summaryrefslogtreecommitdiff
path: root/simulator
diff options
context:
space:
mode:
Diffstat (limited to 'simulator')
-rw-r--r--simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt25
-rw-r--r--simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Image.kt14
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientImage.kt55
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt2
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt42
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalImage.kt54
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt5
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimMetaWorkloadMapper.kt2
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt19
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt3
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt2
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt4
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt3
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt2
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt2
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt11
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt8
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt2
18 files changed, 221 insertions, 34 deletions
diff --git a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt
index 4fd32f98..b4fc03f7 100644
--- a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt
+++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt
@@ -29,6 +29,31 @@ import java.util.UUID
*/
public interface ComputeClient : AutoCloseable {
/**
+ * Obtain the list of [Image]s accessible by the requesting user.
+ */
+ public suspend fun queryImages(): List<Image>
+
+ /**
+ * Obtain a [Image] by its unique identifier.
+ *
+ * @param id The identifier of the image.
+ */
+ public suspend fun findImage(id: UUID): Image?
+
+ /**
+ * Create a new [Image] instance at this compute service.
+ *
+ * @param name The name of the image.
+ * @param labels The identifying labels of the image.
+ * @param meta The non-identifying meta-data of the image.
+ */
+ public suspend fun newImage(
+ name: String,
+ labels: Map<String, String> = emptyMap(),
+ meta: Map<String, Any> = emptyMap()
+ ): Image
+
+ /**
* Obtain the list of [Server]s accessible by the requesting user.
*/
public suspend fun queryServers(): List<Server>
diff --git a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Image.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Image.kt
index 8b673e84..83e63b81 100644
--- a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Image.kt
+++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Image.kt
@@ -22,16 +22,12 @@
package org.opendc.compute.api
-import java.util.UUID
-
/**
* An image containing a bootable operating system that can directly be executed by physical or virtual server.
*/
-public data class Image(
- public override val uid: UUID,
- public override val name: String,
- override val labels: Map<String, String>,
- override val meta: Map<String, Any>
-) : Resource {
- override suspend fun refresh() {}
+public interface Image : Resource {
+ /**
+ * Delete the image instance.
+ */
+ public suspend fun delete()
}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientImage.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientImage.kt
new file mode 100644
index 00000000..6c5b2ab0
--- /dev/null
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientImage.kt
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.service.internal
+
+import org.opendc.compute.api.Image
+import java.util.*
+
+/**
+ * An [Image] implementation that is passed to clients but delegates its implementation to another class.
+ */
+internal class ClientImage(private val delegate: Image) : Image {
+ override val uid: UUID = delegate.uid
+
+ override var name: String = delegate.name
+ private set
+
+ override var labels: Map<String, String> = delegate.labels.toMap()
+ private set
+
+ override var meta: Map<String, Any> = delegate.meta.toMap()
+ private set
+
+ override suspend fun delete() {
+ delegate.delete()
+ refresh()
+ }
+
+ override suspend fun refresh() {
+ delegate.refresh()
+
+ name = delegate.name
+ labels = delegate.labels
+ meta = delegate.meta
+ }
+}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt
index bca1ad44..ae4cee3b 100644
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt
@@ -87,6 +87,8 @@ internal class ClientServer(private val delegate: Server) : Server, ServerWatche
}
override suspend fun refresh() {
+ delegate.refresh()
+
name = delegate.name
flavor = delegate.flavor
image = delegate.image
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
index 3feb80ad..3b694537 100644
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
@@ -90,6 +90,11 @@ public class ComputeServiceImpl(
private val activeServers: MutableMap<Server, Host> = mutableMapOf()
/**
+ * The registered images for this compute service.
+ */
+ internal val images = mutableMapOf<UUID, InternalImage>()
+
+ /**
* The registered servers for this compute service.
*/
private val servers = mutableMapOf<UUID, InternalServer>()
@@ -126,6 +131,29 @@ public class ComputeServiceImpl(
override fun newClient(): ComputeClient = object : ComputeClient {
private var isClosed: Boolean = false
+ override suspend fun queryImages(): List<Image> {
+ check(!isClosed) { "Client is already closed" }
+
+ return images.values.map { ClientImage(it) }
+ }
+
+ override suspend fun findImage(id: UUID): Image? {
+ check(!isClosed) { "Client is already closed" }
+
+ return images[id]?.let { ClientImage(it) }
+ }
+
+ override suspend fun newImage(name: String, labels: Map<String, String>, meta: Map<String, Any>): Image {
+ check(!isClosed) { "Client is already closed" }
+
+ val uid = UUID(clock.millis(), random.nextLong())
+ val image = InternalImage(this@ComputeServiceImpl, uid, name, labels, meta)
+
+ images[uid] = image
+
+ return ClientImage(image)
+ }
+
override suspend fun newServer(
name: String,
image: Image,
@@ -150,15 +178,19 @@ public class ComputeServiceImpl(
)
)
+ val uid = UUID(clock.millis(), random.nextLong())
val server = InternalServer(
this@ComputeServiceImpl,
- uid = UUID(random.nextLong(), random.nextLong()),
+ uid,
name,
flavor,
image,
labels.toMutableMap(),
meta.toMutableMap()
)
+
+ servers[uid] = server
+
if (start) {
server.start()
}
@@ -218,6 +250,14 @@ public class ComputeServiceImpl(
requestSchedulingCycle()
}
+ internal fun delete(server: InternalServer) {
+ checkNotNull(servers.remove(server.uid)) { "Server was not know" }
+ }
+
+ internal fun delete(image: InternalImage) {
+ checkNotNull(images.remove(image.uid)) { "Server was not know" }
+ }
+
/**
* Indicate that a new scheduling cycle is needed due to a change to the service's state.
*/
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalImage.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalImage.kt
new file mode 100644
index 00000000..86f2f6b9
--- /dev/null
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalImage.kt
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.service.internal
+
+import org.opendc.compute.api.Image
+import java.util.*
+
+/**
+ * Internal stateful representation of an [Image].
+ */
+internal class InternalImage(
+ private val service: ComputeServiceImpl,
+ override val uid: UUID,
+ override val name: String,
+ labels: Map<String, String>,
+ meta: Map<String, Any>
+) : Image {
+
+ override val labels: MutableMap<String, String> = labels.toMutableMap()
+
+ override val meta: MutableMap<String, Any> = meta.toMutableMap()
+
+ override suspend fun refresh() {
+ // No-op: this object is the source-of-truth
+ }
+
+ override suspend fun delete() {
+ service.delete(this)
+ }
+
+ override fun equals(other: Any?): Boolean = other is InternalImage && uid == other.uid
+
+ override fun hashCode(): Int = uid.hashCode()
+}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt
index 2656a488..ff7c1d15 100644
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt
@@ -94,6 +94,7 @@ internal class InternalServer(
ServerState.RUNNING -> {
val host = checkNotNull(host) { "Server not running" }
host.delete(this)
+ service.delete(this)
}
else -> {} // No work needed
}
@@ -123,4 +124,8 @@ internal class InternalServer(
internal fun assignHost(host: Host) {
this.host = host
}
+
+ override fun equals(other: Any?): Boolean = other is InternalServer && uid == other.uid
+
+ override fun hashCode(): Int = uid.hashCode()
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimMetaWorkloadMapper.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimMetaWorkloadMapper.kt
index e10bc56f..c05f1a2c 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimMetaWorkloadMapper.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimMetaWorkloadMapper.kt
@@ -30,6 +30,6 @@ import org.opendc.simulator.compute.workload.SimWorkload
*/
public class SimMetaWorkloadMapper(private val key: String = "workload") : SimWorkloadMapper {
override fun createWorkload(server: Server): SimWorkload {
- return server.image.meta[key] as SimWorkload
+ return requireNotNull(server.meta[key] ?: server.image.meta[key]) as SimWorkload
}
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index 0b37d766..0672047c 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -82,7 +82,7 @@ internal class SimHostTest {
scope.launch {
val virtDriver = SimHost(UUID.randomUUID(), "test", machineModel, emptyMap(), coroutineContext, clock, SimFairShareHypervisorProvider())
val duration = 5 * 60L
- val vmImageA = Image(
+ val vmImageA = MockImage(
UUID.randomUUID(),
"<unnamed>",
emptyMap(),
@@ -97,7 +97,7 @@ internal class SimHostTest {
)
)
)
- val vmImageB = Image(
+ val vmImageB = MockImage(
UUID.randomUUID(),
"<unnamed>",
emptyMap(),
@@ -143,6 +143,21 @@ internal class SimHostTest {
)
}
+ private class MockImage(
+ override val uid: UUID,
+ override val name: String,
+ override val labels: Map<String, String>,
+ override val meta: Map<String, Any>
+ ) : Image {
+ override suspend fun delete() {
+ throw NotImplementedError()
+ }
+
+ override suspend fun refresh() {
+ throw NotImplementedError()
+ }
+ }
+
private class MockServer(
override val uid: UUID,
override val name: String,
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
index 88460745..c94ee5d4 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -232,6 +232,7 @@ public suspend fun processTrace(
monitor: ExperimentMonitor
) {
val client = scheduler.newClient()
+ val image = client.newImage("vm-image")
try {
var submitted = 0
@@ -244,7 +245,7 @@ public suspend fun processTrace(
chan.send(Unit)
val server = client.newServer(
entry.name,
- Image(entry.uid, entry.name, emptyMap(), mapOf("workload" to entry.workload)),
+ image,
Flavor(
entry.meta["cores"] as Int,
entry.meta["required-memory"] as Long
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt
index 718c5e03..7ea5efe5 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt
@@ -46,6 +46,7 @@ public class Sc20RawParquetTraceReader(private val path: File) {
* Read the fragments into memory.
*/
private fun parseFragments(path: File): Map<String, List<SimTraceWorkload.Fragment>> {
+ @Suppress("DEPRECATION")
val reader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "trace.parquet"))
.disableCompatibility()
.build()
@@ -80,6 +81,7 @@ public class Sc20RawParquetTraceReader(private val path: File) {
* Read the metadata into a workload.
*/
private fun parseMeta(path: File, fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<TraceEntry<SimWorkload>> {
+ @Suppress("DEPRECATION")
val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "meta.parquet"))
.disableCompatibility()
.build()
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt
index 2c3eac3d..9ab69572 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt
@@ -96,6 +96,7 @@ public class Sc20StreamingParquetTraceReader(
* The thread to read the records in.
*/
private val readerThread = thread(start = true, name = "sc20-reader") {
+ @Suppress("DEPRECATION")
val reader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "trace.parquet"))
.disableCompatibility()
.run { if (filter != null) withFilter(filter) else this }
@@ -111,11 +112,9 @@ public class Sc20StreamingParquetTraceReader(
}
val id = record["id"].toString()
- val tick = record["time"] as Long
val duration = record["duration"] as Long
val cores = record["cores"] as Int
val cpuUsage = record["cpuUsage"] as Double
- val flops = record["flops"] as Long
val fragment = SimTraceWorkload.Fragment(
duration,
@@ -165,6 +164,7 @@ public class Sc20StreamingParquetTraceReader(
val entries = mutableMapOf<String, GenericData.Record>()
val buffers = mutableMapOf<String, MutableList<MutableList<SimTraceWorkload.Fragment>>>()
+ @Suppress("DEPRECATION")
val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "meta.parquet"))
.disableCompatibility()
.run { if (filter != null) withFilter(filter) else this }
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt
index a521dd22..f510271b 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt
@@ -22,7 +22,6 @@
package org.opendc.format.trace.gwf
-import org.opendc.compute.api.Image
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
import org.opendc.simulator.compute.workload.SimFlopsWorkload
@@ -138,9 +137,9 @@ public class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> {
val task = Task(
UUID(0L, taskId),
"<unnamed>",
- Image(UUID.randomUUID(), "<unnamed>", emptyMap(), mapOf("workload" to workload)),
HashSet(),
mapOf(
+ "workload" to workload,
WORKFLOW_TASK_CORES to cores,
WORKFLOW_TASK_DEADLINE to (runtime * 1000)
),
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt
index dd12a380..1eb4bac2 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt
@@ -83,7 +83,7 @@ public class Sc20TraceReader(
var vmId = ""
var maxCores = -1
var requiredMemory = -1L
- var timestamp = -1L
+ var timestamp: Long
var cores = -1
var minTime = Long.MAX_VALUE
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt
index 375330f1..0d1f3cea 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt
@@ -71,7 +71,6 @@ public class SwfTraceReader(
var slicedWaitTime: Long
var flopsPerSecond: Long
var flopsPartialSlice: Long
- var flopsFullSlice: Long
var runtimePartialSliceRemainder: Long
BufferedReader(FileReader(file)).use { reader ->
@@ -125,7 +124,6 @@ public class SwfTraceReader(
flopsPerSecond = 4_000L * cores
runtimePartialSliceRemainder = runTime % sliceDuration
flopsPartialSlice = flopsPerSecond * runtimePartialSliceRemainder
- flopsFullSlice = flopsPerSecond * runTime - flopsPartialSlice
for (
tick in (submitTime + slicedWaitTime)
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
index c004162a..3d969eb7 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
@@ -25,7 +25,6 @@ package org.opendc.format.trace.wtf
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetReader
-import org.opendc.compute.api.Image
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
import org.opendc.simulator.compute.workload.SimFlopsWorkload
@@ -57,6 +56,7 @@ public class WtfTraceReader(path: String) : TraceReader<Job> {
val tasks = mutableMapOf<Long, Task>()
val taskDependencies = mutableMapOf<Task, List<Long>>()
+ @Suppress("DEPRECATION")
val reader = AvroParquetReader.builder<GenericRecord>(Path(path, "tasks/schema-1.0")).build()
while (true) {
@@ -81,16 +81,9 @@ public class WtfTraceReader(path: String) : TraceReader<Job> {
val task = Task(
UUID(0L, taskId),
"<unnamed>",
- Image(
- UUID.randomUUID(),
- "<unnamed>",
- emptyMap(),
- mapOf(
- "workload" to workload
- )
- ),
HashSet(),
mapOf(
+ "workload" to workload,
WORKFLOW_TASK_CORES to cores,
WORKFLOW_TASK_DEADLINE to runtime
)
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt
index 5ae503a7..34d19e4f 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt
+++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt
@@ -145,6 +145,7 @@ public class StageWorkflowService(
private val mode: WorkflowSchedulerMode.Logic
private val jobAdmissionPolicy: JobAdmissionPolicy.Logic
private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic
+ private lateinit var image: Image
init {
this.mode = mode(this)
@@ -152,6 +153,9 @@ public class StageWorkflowService(
this.jobQueue = PriorityQueue(100, jobOrderPolicy(this).thenBy { it.job.uid })
this.taskEligibilityPolicy = taskEligibilityPolicy(this)
this.taskQueue = PriorityQueue(1000, taskOrderPolicy(this).thenBy { it.task.uid })
+ coroutineScope.launch {
+ image = computeClient.newImage("workflow-runner")
+ }
}
override val events: Flow<WorkflowEvent> = tracer.openRecording().let {
@@ -259,9 +263,9 @@ public class StageWorkflowService(
val cores = instance.task.metadata[WORKFLOW_TASK_CORES] as? Int ?: 1
val flavor = Flavor(cores, 1000) // TODO How to determine memory usage for workflow task
- val image = instance.task.image
+ val image = image
coroutineScope.launch {
- val server = computeClient.newServer(instance.task.name, image, flavor, start = false)
+ val server = computeClient.newServer(instance.task.name, image, flavor, start = false, meta = instance.task.metadata)
instance.state = TaskStatus.ACTIVE
instance.server = server
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt
index 9ed3a9a5..4ccefef9 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt
+++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt
@@ -24,7 +24,6 @@
package org.opendc.workflows.workload
-import org.opendc.compute.api.Image
import java.util.*
/**
@@ -39,7 +38,6 @@ import java.util.*
public data class Task(
val uid: UUID,
val name: String,
- val image: Image,
val dependencies: Set<Task>,
val metadata: Map<String, Any> = emptyMap()
) {