summaryrefslogtreecommitdiff
path: root/simulator/opendc-format
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-03-09 20:47:06 +0100
committerGitHub <noreply@github.com>2021-03-09 20:47:06 +0100
commit3b6fbe0b535bf3398f120373f59f87adbba34005 (patch)
treebc880252a935cc0b1558c50fe83f71d21b735d29 /simulator/opendc-format
parent66c2501d95b167f9e7474a45e542f82d2d8e83ff (diff)
parent40e5871e01858a55372bfcb51cf90069c080e751 (diff)
compute: Improvements to cloud compute model (v2)
This is the second in the series of pull requests to improve the existing cloud compute model (see #86). This pull request removes the dependency on the bare-metal provisioning code which simplifies experiment setup tremendously: - Remove bare-metal provisioning code (opendc-metal) - Remove opendc-core which was a relic of the previous codebase and was only used sparingly. - Move ownership of Server, Image and Flavor to the compute service. Users are expected to create instances via the compute service.
Diffstat (limited to 'simulator/opendc-format')
-rw-r--r--simulator/opendc-format/build.gradle.kts3
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt9
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/MachineDef.kt (renamed from simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceWriter.kt)34
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt45
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt39
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt42
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt36
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt5
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt55
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt42
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt57
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt53
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt49
-rw-r--r--simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt10
-rw-r--r--simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt4
15 files changed, 137 insertions, 346 deletions
diff --git a/simulator/opendc-format/build.gradle.kts b/simulator/opendc-format/build.gradle.kts
index 37e9c9c8..385e556d 100644
--- a/simulator/opendc-format/build.gradle.kts
+++ b/simulator/opendc-format/build.gradle.kts
@@ -30,9 +30,8 @@ plugins {
dependencies {
api(platform(project(":opendc-platform")))
- api(project(":opendc-core"))
api(project(":opendc-compute:opendc-compute-api"))
- api(project(":opendc-workflows"))
+ api(project(":opendc-workflow:opendc-workflow-api"))
implementation(project(":opendc-simulator:opendc-simulator-compute"))
implementation(project(":opendc-compute:opendc-compute-simulator"))
api("com.fasterxml.jackson.module:jackson-module-kotlin:${versions["jackson-module-kotlin"]}")
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt
index 1f73bb61..97d6f239 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt
@@ -22,17 +22,14 @@
package org.opendc.format.environment
-import kotlinx.coroutines.CoroutineScope
-import org.opendc.core.Environment
import java.io.Closeable
-import java.time.Clock
/**
- * An interface for reading descriptions of topology environments into memory as [Environment].
+ * An interface for reading descriptions of topology environments into memory.
*/
public interface EnvironmentReader : Closeable {
/**
- * Construct an [Environment] in the specified [CoroutineScope].
+ * Read the environment into a list.
*/
- public suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment
+ public fun read(): List<MachineDef>
}
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceWriter.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/MachineDef.kt
index 54fb6214..b5b3b84b 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceWriter.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/MachineDef.kt
@@ -1,7 +1,5 @@
/*
- * MIT License
- *
- * Copyright (c) 2019 atlarge-research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -22,24 +20,16 @@
* SOFTWARE.
*/
-package org.opendc.format.trace
+package org.opendc.format.environment
-import org.opendc.core.workload.Workload
-import java.io.Closeable
+import org.opendc.compute.simulator.power.api.CpuPowerModel
+import org.opendc.simulator.compute.SimMachineModel
+import java.util.*
-/**
- * An interface for persisting workload traces (e.g. to disk).
- *
- * @param T The type of [Workload] supported by this writer.
- */
-public interface TraceWriter<T : Workload> : Closeable {
- /**
- * Write an entry to the trace.
- *
- * Entries must be written in order of submission time. Failing to do so results in a [IllegalArgumentException].
- *
- * @param submissionTime The time of submission of the workload.
- * @param workload The workload to write to the trace.
- */
- public fun write(submissionTime: Long, workload: T)
-}
+public data class MachineDef(
+ val uid: UUID,
+ val name: String,
+ val meta: Map<String, Any>,
+ val model: SimMachineModel,
+ val powerModel: CpuPowerModel
+)
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
index bbbbe87c..3da8d0b3 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
@@ -25,21 +25,14 @@ package org.opendc.format.environment.sc18
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
-import kotlinx.coroutines.CoroutineScope
-import org.opendc.compute.simulator.SimBareMetalDriver
-import org.opendc.core.Environment
-import org.opendc.core.Platform
-import org.opendc.core.Zone
-import org.opendc.core.services.ServiceRegistry
+import org.opendc.compute.simulator.power.models.ConstantPowerModel
import org.opendc.format.environment.EnvironmentReader
-import org.opendc.metal.service.ProvisioningService
-import org.opendc.metal.service.SimpleProvisioningService
+import org.opendc.format.environment.MachineDef
import org.opendc.simulator.compute.SimMachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import java.io.InputStream
-import java.time.Clock
import java.util.*
/**
@@ -55,9 +48,12 @@ public class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja
*/
private val setup: Setup = mapper.readValue(input)
- override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment {
+ /**
+ * Read the environment.
+ */
+ public override fun read(): List<MachineDef> {
var counter = 0
- val nodes = setup.rooms.flatMap { room ->
+ return setup.rooms.flatMap { room ->
room.objects.flatMap { roomObject ->
when (roomObject) {
is RoomObject.Rack -> {
@@ -75,35 +71,18 @@ public class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja
else -> throw IllegalArgumentException("The cpu id $id is not recognized")
}
}
- SimBareMetalDriver(
- coroutineScope,
- clock,
- UUID.randomUUID(),
- "node-${counter++}",
+ MachineDef(
+ UUID(0L, counter++.toLong()),
+ "node-$counter",
emptyMap(),
- SimMachineModel(cores, listOf(MemoryUnit("", "", 2300.0, 16000)))
+ SimMachineModel(cores, listOf(MemoryUnit("", "", 2300.0, 16000))),
+ ConstantPowerModel(0.0)
)
}
}
}
}
}
-
- val provisioningService = SimpleProvisioningService()
- for (node in nodes) {
- provisioningService.create(node)
- }
-
- val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService)
- val platform = Platform(
- UUID.randomUUID(),
- "sc18-platform",
- listOf(
- Zone(UUID.randomUUID(), "zone", serviceRegistry)
- )
- )
-
- return Environment(setup.name, null, listOf(platform))
}
override fun close() {}
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
index 998f9cd6..9a06a40f 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
@@ -22,17 +22,9 @@
package org.opendc.format.environment.sc20
-import kotlinx.coroutines.CoroutineScope
-import org.opendc.compute.simulator.SimBareMetalDriver
import org.opendc.compute.simulator.power.models.LinearPowerModel
-import org.opendc.core.Environment
-import org.opendc.core.Platform
-import org.opendc.core.Zone
-import org.opendc.core.services.ServiceRegistry
import org.opendc.format.environment.EnvironmentReader
-import org.opendc.metal.NODE_CLUSTER
-import org.opendc.metal.service.ProvisioningService
-import org.opendc.metal.service.SimpleProvisioningService
+import org.opendc.format.environment.MachineDef
import org.opendc.simulator.compute.SimMachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
@@ -40,7 +32,6 @@ import org.opendc.simulator.compute.model.ProcessingUnit
import java.io.File
import java.io.FileInputStream
import java.io.InputStream
-import java.time.Clock
import java.util.*
/**
@@ -54,8 +45,7 @@ public class Sc20ClusterEnvironmentReader(
public constructor(file: File) : this(FileInputStream(file))
- @Suppress("BlockingMethodInNonBlockingContext")
- override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment {
+ public override fun read(): List<MachineDef> {
var clusterIdCol = 0
var speedCol = 0
var numberOfHostsCol = 0
@@ -69,7 +59,7 @@ public class Sc20ClusterEnvironmentReader(
var memoryPerHost: Long
var coresPerHost: Int
- val nodes = mutableListOf<SimBareMetalDriver>()
+ val nodes = mutableListOf<MachineDef>()
val random = Random(0)
input.bufferedReader().use { reader ->
@@ -103,12 +93,10 @@ public class Sc20ClusterEnvironmentReader(
repeat(numberOfHosts) {
nodes.add(
- SimBareMetalDriver(
- coroutineScope,
- clock,
+ MachineDef(
UUID(random.nextLong(), random.nextLong()),
"node-$clusterId-$it",
- mapOf(NODE_CLUSTER to clusterId),
+ mapOf("cluster" to clusterId),
SimMachineModel(
List(coresPerHost) { coreId ->
ProcessingUnit(unknownProcessingNode, coreId, speed)
@@ -125,22 +113,7 @@ public class Sc20ClusterEnvironmentReader(
}
}
- val provisioningService = SimpleProvisioningService()
- for (node in nodes) {
- provisioningService.create(node)
- }
-
- val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService)
-
- val platform = Platform(
- UUID.randomUUID(),
- "sc20-platform",
- listOf(
- Zone(UUID.randomUUID(), "zone", serviceRegistry)
- )
- )
-
- return Environment("SC20 Environment", null, listOf(platform))
+ return nodes
}
override fun close() {}
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
index 6cf65f7f..effd0286 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
@@ -25,22 +25,14 @@ package org.opendc.format.environment.sc20
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
-import kotlinx.coroutines.CoroutineScope
-import org.opendc.compute.simulator.SimBareMetalDriver
import org.opendc.compute.simulator.power.models.LinearPowerModel
-import org.opendc.core.Environment
-import org.opendc.core.Platform
-import org.opendc.core.Zone
-import org.opendc.core.services.ServiceRegistry
import org.opendc.format.environment.EnvironmentReader
-import org.opendc.metal.service.ProvisioningService
-import org.opendc.metal.service.SimpleProvisioningService
+import org.opendc.format.environment.MachineDef
import org.opendc.simulator.compute.SimMachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import java.io.InputStream
-import java.time.Clock
import java.util.*
/**
@@ -55,9 +47,12 @@ public class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja
*/
private val setup: Setup = mapper.readValue(input)
- override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment {
+ /**
+ * Read the environment.
+ */
+ public override fun read(): List<MachineDef> {
var counter = 0
- val nodes = setup.rooms.flatMap { room ->
+ return setup.rooms.flatMap { room ->
room.objects.flatMap { roomObject ->
when (roomObject) {
is RoomObject.Rack -> {
@@ -81,11 +76,9 @@ public class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja
else -> throw IllegalArgumentException("The cpu id $id is not recognized")
}
}
- SimBareMetalDriver(
- coroutineScope,
- clock,
- UUID.randomUUID(),
- "node-${counter++}",
+ MachineDef(
+ UUID(0L, counter++.toLong()),
+ "node-$counter",
emptyMap(),
SimMachineModel(cores, memories),
// For now we assume a simple linear load model with an idle draw of ~200W and a maximum
@@ -98,23 +91,6 @@ public class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja
}
}
}
-
- val provisioningService = SimpleProvisioningService()
- for (node in nodes) {
- provisioningService.create(node)
- }
-
- val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService)
-
- val platform = Platform(
- UUID.randomUUID(),
- "sc20-platform",
- listOf(
- Zone(UUID.randomUUID(), "zone", serviceRegistry)
- )
- )
-
- return Environment(setup.name, null, listOf(platform))
}
override fun close() {}
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt
index ec547e84..3ce79d69 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt
@@ -24,31 +24,21 @@
package org.opendc.format.trace
-import org.opendc.core.workload.Workload
+import java.util.UUID
/**
* An entry in a workload trace.
*
- * @param T The shape of the workload in this entry.
+ * @param uid The unique identifier of the entry.
+ * @param name The name of the entry.
+ * @param start The start time of the workload.
+ * @param workload The workload of the entry.
+ * @param meta The meta-data associated with the workload.
*/
-public interface TraceEntry<T : Workload> {
- /**
- * The time of submission of the workload.
- */
- public val submissionTime: Long
-
- /**
- * The workload in this trace entry.
- */
- public val workload: T
-
- /**
- * Extract the submission time from this entry.
- */
- public operator fun component1(): Long = submissionTime
-
- /**
- * Extract the workload from this entry.
- */
- public operator fun component2(): T = workload
-}
+public data class TraceEntry<out T>(
+ val uid: UUID,
+ val name: String,
+ val start: Long,
+ val workload: T,
+ val meta: Map<String, Any>
+)
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt
index a0beec3e..7df1acd3 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt
@@ -22,14 +22,13 @@
package org.opendc.format.trace
-import org.opendc.core.workload.Workload
import java.io.Closeable
/**
- * An interface for reading [Workload]s into memory.
+ * An interface for reading workloads into memory.
*
* This interface must guarantee that the entries are delivered in order of submission time.
*
* @param T The shape of the workloads supported by this reader.
*/
-public interface TraceReader<T : Workload> : Iterator<TraceEntry<T>>, Closeable
+public interface TraceReader<T> : Iterator<TraceEntry<T>>, Closeable
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
index 1571b17d..769b2b13 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
@@ -22,14 +22,12 @@
package org.opendc.format.trace.bitbrains
-import org.opendc.compute.api.ComputeWorkload
-import org.opendc.compute.api.Image
-import org.opendc.core.User
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
import org.opendc.simulator.compute.workload.SimTraceWorkload
+import org.opendc.simulator.compute.workload.SimWorkload
import java.io.BufferedReader
import java.io.File
import java.io.FileReader
@@ -45,17 +43,17 @@ import kotlin.math.min
public class BitbrainsTraceReader(
traceDirectory: File,
performanceInterferenceModel: PerformanceInterferenceModel
-) : TraceReader<ComputeWorkload> {
+) : TraceReader<SimWorkload> {
/**
* The internal iterator to use for this reader.
*/
- private val iterator: Iterator<TraceEntry<ComputeWorkload>>
+ private val iterator: Iterator<TraceEntry<SimWorkload>>
/**
* Initialize the reader.
*/
init {
- val entries = mutableMapOf<Long, TraceEntry<ComputeWorkload>>()
+ val entries = mutableMapOf<Long, TraceEntry<SimWorkload>>()
var timestampCol = 0
var coreCol = 0
@@ -132,50 +130,27 @@ public class BitbrainsTraceReader(
)
val workload = SimTraceWorkload(flopsHistory.asSequence())
- val vmWorkload = ComputeWorkload(
+ entries[vmId] = TraceEntry(
uuid,
- "VM Workload $vmId",
- UnnamedUser,
- Image(
- uuid,
- vmId.toString(),
- mapOf(
- IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems,
- "cores" to cores,
- "required-memory" to requiredMemory,
- "workload" to workload
- )
- )
- )
- entries[vmId] = TraceEntryImpl(
+ vmId.toString(),
startTime,
- vmWorkload
+ workload,
+ mapOf(
+ IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems,
+ "cores" to cores,
+ "required-memory" to requiredMemory,
+ "workload" to workload
+ )
)
}
// Create the entry iterator
- iterator = entries.values.sortedBy { it.submissionTime }.iterator()
+ iterator = entries.values.sortedBy { it.start }.iterator()
}
override fun hasNext(): Boolean = iterator.hasNext()
- override fun next(): TraceEntry<ComputeWorkload> = iterator.next()
+ override fun next(): TraceEntry<SimWorkload> = iterator.next()
override fun close() {}
-
- /**
- * An unnamed user.
- */
- private object UnnamedUser : User {
- override val name: String = "<unnamed>"
- override val uid: UUID = UUID.randomUUID()
- }
-
- /**
- * An entry in the trace.
- */
- private data class TraceEntryImpl(
- override var submissionTime: Long,
- override val workload: ComputeWorkload
- ) : TraceEntry<ComputeWorkload>
}
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt
index cd7aff3c..e68afeb7 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt
@@ -22,15 +22,13 @@
package org.opendc.format.trace.gwf
-import org.opendc.compute.api.Image
-import org.opendc.core.User
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
import org.opendc.simulator.compute.workload.SimFlopsWorkload
-import org.opendc.workflows.workload.Job
-import org.opendc.workflows.workload.Task
-import org.opendc.workflows.workload.WORKFLOW_TASK_CORES
-import org.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE
+import org.opendc.workflow.api.Job
+import org.opendc.workflow.api.Task
+import org.opendc.workflow.api.WORKFLOW_TASK_CORES
+import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE
import java.io.BufferedReader
import java.io.File
import java.io.InputStream
@@ -88,7 +86,8 @@ public class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> {
* Initialize the reader.
*/
init {
- val entries = mutableMapOf<Long, TraceEntryImpl>()
+ val workflows = mutableMapOf<Long, Job>()
+ val starts = mutableMapOf<Long, Long>()
val tasks = mutableMapOf<Long, Task>()
val taskDependencies = mutableMapOf<Task, List<Long>>()
@@ -131,22 +130,21 @@ public class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> {
val flops: Long = 4000 * runtime * cores
- val entry = entries.getOrPut(workflowId) {
- TraceEntryImpl(submitTime, Job(UUID(0L, taskId), "<unnamed>", UnnamedUser, HashSet()))
+ val workflow = workflows.getOrPut(workflowId) {
+ Job(UUID(0L, workflowId), "<unnamed>", HashSet())
}
- val workflow = entry.workload
val workload = SimFlopsWorkload(flops)
val task = Task(
UUID(0L, taskId),
"<unnamed>",
- Image(UUID.randomUUID(), "<unnamed>", mapOf("workload" to workload)),
HashSet(),
mapOf(
+ "workload" to workload,
WORKFLOW_TASK_CORES to cores,
WORKFLOW_TASK_DEADLINE to (runtime * 1000)
),
)
- entry.submissionTime = min(entry.submissionTime, submitTime)
+ starts.merge(workflowId, submitTime, ::min)
(workflow.tasks as MutableSet<Task>).add(task)
tasks[taskId] = task
taskDependencies[task] = dependencies
@@ -165,7 +163,9 @@ public class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> {
}
// Create the entry iterator
- iterator = entries.values.sortedBy { it.submissionTime }.iterator()
+ iterator = workflows.map { (id, job) -> TraceEntry(job.uid, job.name, starts.getValue(id), job, job.metadata) }
+ .sortedBy { it.start }
+ .iterator()
}
override fun hasNext(): Boolean = iterator.hasNext()
@@ -173,20 +173,4 @@ public class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> {
override fun next(): TraceEntry<Job> = iterator.next()
override fun close() {}
-
- /**
- * An unnamed user.
- */
- private object UnnamedUser : User {
- override val name: String = "<unnamed>"
- override val uid: UUID = UUID.randomUUID()
- }
-
- /**
- * An entry in the trace.
- */
- private data class TraceEntryImpl(
- override var submissionTime: Long,
- override val workload: Job
- ) : TraceEntry<Job>
}
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt
index 07785632..1eb4bac2 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt
@@ -22,14 +22,12 @@
package org.opendc.format.trace.sc20
-import org.opendc.compute.api.ComputeWorkload
-import org.opendc.compute.api.Image
-import org.opendc.core.User
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
import org.opendc.simulator.compute.workload.SimTraceWorkload
+import org.opendc.simulator.compute.workload.SimWorkload
import java.io.BufferedReader
import java.io.File
import java.io.FileReader
@@ -49,17 +47,17 @@ public class Sc20TraceReader(
performanceInterferenceModel: PerformanceInterferenceModel,
selectedVms: List<String>,
random: Random
-) : TraceReader<ComputeWorkload> {
+) : TraceReader<SimWorkload> {
/**
* The internal iterator to use for this reader.
*/
- private val iterator: Iterator<TraceEntry<ComputeWorkload>>
+ private val iterator: Iterator<TraceEntry<SimWorkload>>
/**
* Initialize the reader.
*/
init {
- val entries = mutableMapOf<UUID, TraceEntry<ComputeWorkload>>()
+ val entries = mutableMapOf<UUID, TraceEntry<SimWorkload>>()
val timestampCol = 0
val cpuUsageCol = 1
@@ -85,7 +83,7 @@ public class Sc20TraceReader(
var vmId = ""
var maxCores = -1
var requiredMemory = -1L
- var timestamp = -1L
+ var timestamp: Long
var cores = -1
var minTime = Long.MAX_VALUE
@@ -157,50 +155,27 @@ public class Sc20TraceReader(
Random(random.nextInt())
)
val workload = SimTraceWorkload(flopsFragments.asSequence())
- val vmWorkload = ComputeWorkload(
+ entries[uuid] = TraceEntry(
uuid,
- "VM Workload $vmId",
- UnnamedUser,
- Image(
- uuid,
- vmId,
- mapOf(
- IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems,
- "cores" to cores,
- "required-memory" to requiredMemory,
- "workload" to workload
- )
- )
- )
- entries[uuid] = TraceEntryImpl(
+ vmId,
minTime,
- vmWorkload
+ workload,
+ mapOf(
+ IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems,
+ "cores" to cores,
+ "required-memory" to requiredMemory,
+ "workload" to workload
+ )
)
}
// Create the entry iterator
- iterator = entries.values.sortedBy { it.submissionTime }.iterator()
+ iterator = entries.values.sortedBy { it.start }.iterator()
}
override fun hasNext(): Boolean = iterator.hasNext()
- override fun next(): TraceEntry<ComputeWorkload> = iterator.next()
+ override fun next(): TraceEntry<SimWorkload> = iterator.next()
override fun close() {}
-
- /**
- * An unnamed user.
- */
- private object UnnamedUser : User {
- override val name: String = "<unnamed>"
- override val uid: UUID = UUID.randomUUID()
- }
-
- /**
- * An entry in the trace.
- */
- private data class TraceEntryImpl(
- override var submissionTime: Long,
- override val workload: ComputeWorkload
- ) : TraceEntry<ComputeWorkload>
}
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt
index ead20c35..0d1f3cea 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt
@@ -22,12 +22,10 @@
package org.opendc.format.trace.swf
-import org.opendc.compute.api.ComputeWorkload
-import org.opendc.compute.api.Image
-import org.opendc.core.User
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
import org.opendc.simulator.compute.workload.SimTraceWorkload
+import org.opendc.simulator.compute.workload.SimWorkload
import java.io.BufferedReader
import java.io.File
import java.io.FileReader
@@ -43,17 +41,17 @@ import java.util.*
public class SwfTraceReader(
file: File,
maxNumCores: Int = -1
-) : TraceReader<ComputeWorkload> {
+) : TraceReader<SimWorkload> {
/**
* The internal iterator to use for this reader.
*/
- private val iterator: Iterator<TraceEntry<ComputeWorkload>>
+ private val iterator: Iterator<TraceEntry<SimWorkload>>
/**
* Initialize the reader.
*/
init {
- val entries = mutableMapOf<Long, TraceEntry<ComputeWorkload>>()
+ val entries = mutableMapOf<Long, TraceEntry<SimWorkload>>()
val jobNumberCol = 0
val submitTimeCol = 1 // seconds (begin of trace is 0)
@@ -73,7 +71,6 @@ public class SwfTraceReader(
var slicedWaitTime: Long
var flopsPerSecond: Long
var flopsPartialSlice: Long
- var flopsFullSlice: Long
var runtimePartialSliceRemainder: Long
BufferedReader(FileReader(file)).use { reader ->
@@ -127,7 +124,6 @@ public class SwfTraceReader(
flopsPerSecond = 4_000L * cores
runtimePartialSliceRemainder = runTime % sliceDuration
flopsPartialSlice = flopsPerSecond * runtimePartialSliceRemainder
- flopsFullSlice = flopsPerSecond * runTime - flopsPartialSlice
for (
tick in (submitTime + slicedWaitTime)
@@ -155,48 +151,27 @@ public class SwfTraceReader(
val uuid = UUID(0L, jobNumber)
val workload = SimTraceWorkload(flopsHistory.asSequence())
- val vmWorkload = ComputeWorkload(
+ entries[jobNumber] = TraceEntry(
uuid,
- "SWF Workload $jobNumber",
- UnnamedUser,
- Image(
- uuid,
- jobNumber.toString(),
- mapOf(
- "cores" to cores,
- "required-memory" to memory,
- "workload" to workload
- )
+ jobNumber.toString(),
+ submitTime,
+ workload,
+ mapOf(
+ "cores" to cores,
+ "required-memory" to memory,
+ "workload" to workload
)
)
-
- entries[jobNumber] = TraceEntryImpl(submitTime, vmWorkload)
}
}
// Create the entry iterator
- iterator = entries.values.sortedBy { it.submissionTime }.iterator()
+ iterator = entries.values.sortedBy { it.start }.iterator()
}
override fun hasNext(): Boolean = iterator.hasNext()
- override fun next(): TraceEntry<ComputeWorkload> = iterator.next()
+ override fun next(): TraceEntry<SimWorkload> = iterator.next()
override fun close() {}
-
- /**
- * An unnamed user.
- */
- private object UnnamedUser : User {
- override val name: String = "<unnamed>"
- override val uid: UUID = UUID.randomUUID()
- }
-
- /**
- * An entry in the trace.
- */
- private data class TraceEntryImpl(
- override var submissionTime: Long,
- override val workload: ComputeWorkload
- ) : TraceEntry<ComputeWorkload>
}
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
index 5a271fab..feadf61f 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
@@ -25,15 +25,13 @@ package org.opendc.format.trace.wtf
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetReader
-import org.opendc.compute.api.Image
-import org.opendc.core.User
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
import org.opendc.simulator.compute.workload.SimFlopsWorkload
-import org.opendc.workflows.workload.Job
-import org.opendc.workflows.workload.Task
-import org.opendc.workflows.workload.WORKFLOW_TASK_CORES
-import org.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE
+import org.opendc.workflow.api.Job
+import org.opendc.workflow.api.Task
+import org.opendc.workflow.api.WORKFLOW_TASK_CORES
+import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE
import java.util.UUID
import kotlin.math.min
@@ -53,10 +51,12 @@ public class WtfTraceReader(path: String) : TraceReader<Job> {
* Initialize the reader.
*/
init {
- val entries = mutableMapOf<Long, TraceEntryImpl>()
+ val workflows = mutableMapOf<Long, Job>()
+ val starts = mutableMapOf<Long, Long>()
val tasks = mutableMapOf<Long, Task>()
val taskDependencies = mutableMapOf<Task, List<Long>>()
+ @Suppress("DEPRECATION")
val reader = AvroParquetReader.builder<GenericRecord>(Path(path, "tasks/schema-1.0")).build()
while (true) {
@@ -74,29 +74,22 @@ public class WtfTraceReader(path: String) : TraceReader<Job> {
val flops: Long = 4100 * (runtime / 1000) * cores
- val entry = entries.getOrPut(workflowId) {
- TraceEntryImpl(submitTime, Job(UUID(0L, taskId), "<unnamed>", UnnamedUser, HashSet()))
+ val workflow = workflows.getOrPut(workflowId) {
+ Job(UUID(0L, workflowId), "<unnamed>", HashSet())
}
- val workflow = entry.workload
val workload = SimFlopsWorkload(flops)
val task = Task(
UUID(0L, taskId),
"<unnamed>",
- Image(
- UUID.randomUUID(),
- "<unnamed>",
- mapOf(
- "workload" to workload
- )
- ),
HashSet(),
mapOf(
+ "workload" to workload,
WORKFLOW_TASK_CORES to cores,
WORKFLOW_TASK_DEADLINE to runtime
)
)
- entry.submissionTime = min(entry.submissionTime, submitTime)
+ starts.merge(workflowId, submitTime, ::min)
(workflow.tasks as MutableSet<Task>).add(task)
tasks[taskId] = task
taskDependencies[task] = dependencies
@@ -112,7 +105,9 @@ public class WtfTraceReader(path: String) : TraceReader<Job> {
}
// Create the entry iterator
- iterator = entries.values.sortedBy { it.submissionTime }.iterator()
+ iterator = workflows.map { (id, job) -> TraceEntry(job.uid, job.name, starts.getValue(id), job, job.metadata) }
+ .sortedBy { it.start }
+ .iterator()
}
override fun hasNext(): Boolean = iterator.hasNext()
@@ -120,20 +115,4 @@ public class WtfTraceReader(path: String) : TraceReader<Job> {
override fun next(): TraceEntry<Job> = iterator.next()
override fun close() {}
-
- /**
- * An unnamed user.
- */
- private object UnnamedUser : User {
- override val name: String = "<unnamed>"
- override val uid: UUID = UUID.randomUUID()
- }
-
- /**
- * An entry in the trace.
- */
- private data class TraceEntryImpl(
- override var submissionTime: Long,
- override val workload: Job
- ) : TraceEntry<Job>
}
diff --git a/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt b/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt
index 7e3d2623..e0e049cf 100644
--- a/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt
+++ b/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt
@@ -32,14 +32,14 @@ class SwfTraceReaderTest {
internal fun testParseSwf() {
val reader = SwfTraceReader(File(SwfTraceReaderTest::class.java.getResource("/swf_trace.txt").toURI()))
var entry = reader.next()
- assertEquals(0, entry.submissionTime)
+ assertEquals(0, entry.start)
// 1961 slices for waiting, 3 full and 1 partial running slices
- assertEquals(1965, (entry.workload.image.tags["workload"] as SimTraceWorkload).trace.toList().size)
+ assertEquals(1965, (entry.workload as SimTraceWorkload).trace.toList().size)
entry = reader.next()
- assertEquals(164472, entry.submissionTime)
+ assertEquals(164472, entry.start)
// 1188 slices for waiting, 0 full and 1 partial running slices
- assertEquals(1189, (entry.workload.image.tags["workload"] as SimTraceWorkload).trace.toList().size)
- assertEquals(0.25, (entry.workload.image.tags["workload"] as SimTraceWorkload).trace.toList().last().usage)
+ assertEquals(1189, (entry.workload as SimTraceWorkload).trace.toList().size)
+ assertEquals(0.25, (entry.workload as SimTraceWorkload).trace.toList().last().usage)
}
}
diff --git a/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt b/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt
index 58d96657..bcfa7553 100644
--- a/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt
+++ b/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt
@@ -36,11 +36,11 @@ class WtfTraceReaderTest {
fun testParseWtf() {
val reader = WtfTraceReader("src/test/resources/wtf-trace")
var entry = reader.next()
- assertEquals(0, entry.submissionTime)
+ assertEquals(0, entry.start)
assertEquals(23, entry.workload.tasks.size)
entry = reader.next()
- assertEquals(333387, entry.submissionTime)
+ assertEquals(333387, entry.start)
assertEquals(23, entry.workload.tasks.size)
}
}