diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-09 19:38:29 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-09 20:33:27 +0100 |
| commit | b3f390be783cad21cd4925bcbe8077b91f869b5d (patch) | |
| tree | dadd8b4eb8af12120e8b334270f0d49c725b54e6 | |
| parent | b3a271794d64bd97ef93abf650137c5a0a1785df (diff) | |
compute: Model storage of VM images
18 files changed, 221 insertions, 34 deletions
diff --git a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt index 4fd32f98..b4fc03f7 100644 --- a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt +++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt @@ -29,6 +29,31 @@ import java.util.UUID */ public interface ComputeClient : AutoCloseable { /** + * Obtain the list of [Image]s accessible by the requesting user. + */ + public suspend fun queryImages(): List<Image> + + /** + * Obtain a [Image] by its unique identifier. + * + * @param id The identifier of the image. + */ + public suspend fun findImage(id: UUID): Image? + + /** + * Create a new [Image] instance at this compute service. + * + * @param name The name of the image. + * @param labels The identifying labels of the image. + * @param meta The non-identifying meta-data of the image. + */ + public suspend fun newImage( + name: String, + labels: Map<String, String> = emptyMap(), + meta: Map<String, Any> = emptyMap() + ): Image + + /** * Obtain the list of [Server]s accessible by the requesting user. */ public suspend fun queryServers(): List<Server> diff --git a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Image.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Image.kt index 8b673e84..83e63b81 100644 --- a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Image.kt +++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Image.kt @@ -22,16 +22,12 @@ package org.opendc.compute.api -import java.util.UUID - /** * An image containing a bootable operating system that can directly be executed by physical or virtual server. */ -public data class Image( - public override val uid: UUID, - public override val name: String, - override val labels: Map<String, String>, - override val meta: Map<String, Any> -) : Resource { - override suspend fun refresh() {} +public interface Image : Resource { + /** + * Delete the image instance. + */ + public suspend fun delete() } diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientImage.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientImage.kt new file mode 100644 index 00000000..6c5b2ab0 --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientImage.kt @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.internal + +import org.opendc.compute.api.Image +import java.util.* + +/** + * An [Image] implementation that is passed to clients but delegates its implementation to another class. + */ +internal class ClientImage(private val delegate: Image) : Image { + override val uid: UUID = delegate.uid + + override var name: String = delegate.name + private set + + override var labels: Map<String, String> = delegate.labels.toMap() + private set + + override var meta: Map<String, Any> = delegate.meta.toMap() + private set + + override suspend fun delete() { + delegate.delete() + refresh() + } + + override suspend fun refresh() { + delegate.refresh() + + name = delegate.name + labels = delegate.labels + meta = delegate.meta + } +} diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt index bca1ad44..ae4cee3b 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt @@ -87,6 +87,8 @@ internal class ClientServer(private val delegate: Server) : Server, ServerWatche } override suspend fun refresh() { + delegate.refresh() + name = delegate.name flavor = delegate.flavor image = delegate.image diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index 3feb80ad..3b694537 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -90,6 +90,11 @@ public class ComputeServiceImpl( private val activeServers: MutableMap<Server, Host> = mutableMapOf() /** + * The registered images for this compute service. + */ + internal val images = mutableMapOf<UUID, InternalImage>() + + /** * The registered servers for this compute service. */ private val servers = mutableMapOf<UUID, InternalServer>() @@ -126,6 +131,29 @@ public class ComputeServiceImpl( override fun newClient(): ComputeClient = object : ComputeClient { private var isClosed: Boolean = false + override suspend fun queryImages(): List<Image> { + check(!isClosed) { "Client is already closed" } + + return images.values.map { ClientImage(it) } + } + + override suspend fun findImage(id: UUID): Image? { + check(!isClosed) { "Client is already closed" } + + return images[id]?.let { ClientImage(it) } + } + + override suspend fun newImage(name: String, labels: Map<String, String>, meta: Map<String, Any>): Image { + check(!isClosed) { "Client is already closed" } + + val uid = UUID(clock.millis(), random.nextLong()) + val image = InternalImage(this@ComputeServiceImpl, uid, name, labels, meta) + + images[uid] = image + + return ClientImage(image) + } + override suspend fun newServer( name: String, image: Image, @@ -150,15 +178,19 @@ public class ComputeServiceImpl( ) ) + val uid = UUID(clock.millis(), random.nextLong()) val server = InternalServer( this@ComputeServiceImpl, - uid = UUID(random.nextLong(), random.nextLong()), + uid, name, flavor, image, labels.toMutableMap(), meta.toMutableMap() ) + + servers[uid] = server + if (start) { server.start() } @@ -218,6 +250,14 @@ public class ComputeServiceImpl( requestSchedulingCycle() } + internal fun delete(server: InternalServer) { + checkNotNull(servers.remove(server.uid)) { "Server was not know" } + } + + internal fun delete(image: InternalImage) { + checkNotNull(images.remove(image.uid)) { "Server was not know" } + } + /** * Indicate that a new scheduling cycle is needed due to a change to the service's state. */ diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalImage.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalImage.kt new file mode 100644 index 00000000..86f2f6b9 --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalImage.kt @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.internal + +import org.opendc.compute.api.Image +import java.util.* + +/** + * Internal stateful representation of an [Image]. + */ +internal class InternalImage( + private val service: ComputeServiceImpl, + override val uid: UUID, + override val name: String, + labels: Map<String, String>, + meta: Map<String, Any> +) : Image { + + override val labels: MutableMap<String, String> = labels.toMutableMap() + + override val meta: MutableMap<String, Any> = meta.toMutableMap() + + override suspend fun refresh() { + // No-op: this object is the source-of-truth + } + + override suspend fun delete() { + service.delete(this) + } + + override fun equals(other: Any?): Boolean = other is InternalImage && uid == other.uid + + override fun hashCode(): Int = uid.hashCode() +} diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt index 2656a488..ff7c1d15 100644 --- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt +++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt @@ -94,6 +94,7 @@ internal class InternalServer( ServerState.RUNNING -> { val host = checkNotNull(host) { "Server not running" } host.delete(this) + service.delete(this) } else -> {} // No work needed } @@ -123,4 +124,8 @@ internal class InternalServer( internal fun assignHost(host: Host) { this.host = host } + + override fun equals(other: Any?): Boolean = other is InternalServer && uid == other.uid + + override fun hashCode(): Int = uid.hashCode() } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimMetaWorkloadMapper.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimMetaWorkloadMapper.kt index e10bc56f..c05f1a2c 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimMetaWorkloadMapper.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimMetaWorkloadMapper.kt @@ -30,6 +30,6 @@ import org.opendc.simulator.compute.workload.SimWorkload */ public class SimMetaWorkloadMapper(private val key: String = "workload") : SimWorkloadMapper { override fun createWorkload(server: Server): SimWorkload { - return server.image.meta[key] as SimWorkload + return requireNotNull(server.meta[key] ?: server.image.meta[key]) as SimWorkload } } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 0b37d766..0672047c 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -82,7 +82,7 @@ internal class SimHostTest { scope.launch { val virtDriver = SimHost(UUID.randomUUID(), "test", machineModel, emptyMap(), coroutineContext, clock, SimFairShareHypervisorProvider()) val duration = 5 * 60L - val vmImageA = Image( + val vmImageA = MockImage( UUID.randomUUID(), "<unnamed>", emptyMap(), @@ -97,7 +97,7 @@ internal class SimHostTest { ) ) ) - val vmImageB = Image( + val vmImageB = MockImage( UUID.randomUUID(), "<unnamed>", emptyMap(), @@ -143,6 +143,21 @@ internal class SimHostTest { ) } + private class MockImage( + override val uid: UUID, + override val name: String, + override val labels: Map<String, String>, + override val meta: Map<String, Any> + ) : Image { + override suspend fun delete() { + throw NotImplementedError() + } + + override suspend fun refresh() { + throw NotImplementedError() + } + } + private class MockServer( override val uid: UUID, override val name: String, diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 88460745..c94ee5d4 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -232,6 +232,7 @@ public suspend fun processTrace( monitor: ExperimentMonitor ) { val client = scheduler.newClient() + val image = client.newImage("vm-image") try { var submitted = 0 @@ -244,7 +245,7 @@ public suspend fun processTrace( chan.send(Unit) val server = client.newServer( entry.name, - Image(entry.uid, entry.name, emptyMap(), mapOf("workload" to entry.workload)), + image, Flavor( entry.meta["cores"] as Int, entry.meta["required-memory"] as Long diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt index 718c5e03..7ea5efe5 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt @@ -46,6 +46,7 @@ public class Sc20RawParquetTraceReader(private val path: File) { * Read the fragments into memory. */ private fun parseFragments(path: File): Map<String, List<SimTraceWorkload.Fragment>> { + @Suppress("DEPRECATION") val reader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "trace.parquet")) .disableCompatibility() .build() @@ -80,6 +81,7 @@ public class Sc20RawParquetTraceReader(private val path: File) { * Read the metadata into a workload. */ private fun parseMeta(path: File, fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<TraceEntry<SimWorkload>> { + @Suppress("DEPRECATION") val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "meta.parquet")) .disableCompatibility() .build() diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt index 2c3eac3d..9ab69572 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt @@ -96,6 +96,7 @@ public class Sc20StreamingParquetTraceReader( * The thread to read the records in. */ private val readerThread = thread(start = true, name = "sc20-reader") { + @Suppress("DEPRECATION") val reader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "trace.parquet")) .disableCompatibility() .run { if (filter != null) withFilter(filter) else this } @@ -111,11 +112,9 @@ public class Sc20StreamingParquetTraceReader( } val id = record["id"].toString() - val tick = record["time"] as Long val duration = record["duration"] as Long val cores = record["cores"] as Int val cpuUsage = record["cpuUsage"] as Double - val flops = record["flops"] as Long val fragment = SimTraceWorkload.Fragment( duration, @@ -165,6 +164,7 @@ public class Sc20StreamingParquetTraceReader( val entries = mutableMapOf<String, GenericData.Record>() val buffers = mutableMapOf<String, MutableList<MutableList<SimTraceWorkload.Fragment>>>() + @Suppress("DEPRECATION") val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "meta.parquet")) .disableCompatibility() .run { if (filter != null) withFilter(filter) else this } diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt index a521dd22..f510271b 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt @@ -22,7 +22,6 @@ package org.opendc.format.trace.gwf -import org.opendc.compute.api.Image import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.workload.SimFlopsWorkload @@ -138,9 +137,9 @@ public class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> { val task = Task( UUID(0L, taskId), "<unnamed>", - Image(UUID.randomUUID(), "<unnamed>", emptyMap(), mapOf("workload" to workload)), HashSet(), mapOf( + "workload" to workload, WORKFLOW_TASK_CORES to cores, WORKFLOW_TASK_DEADLINE to (runtime * 1000) ), diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt index dd12a380..1eb4bac2 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt @@ -83,7 +83,7 @@ public class Sc20TraceReader( var vmId = "" var maxCores = -1 var requiredMemory = -1L - var timestamp = -1L + var timestamp: Long var cores = -1 var minTime = Long.MAX_VALUE diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt index 375330f1..0d1f3cea 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt @@ -71,7 +71,6 @@ public class SwfTraceReader( var slicedWaitTime: Long var flopsPerSecond: Long var flopsPartialSlice: Long - var flopsFullSlice: Long var runtimePartialSliceRemainder: Long BufferedReader(FileReader(file)).use { reader -> @@ -125,7 +124,6 @@ public class SwfTraceReader( flopsPerSecond = 4_000L * cores runtimePartialSliceRemainder = runTime % sliceDuration flopsPartialSlice = flopsPerSecond * runtimePartialSliceRemainder - flopsFullSlice = flopsPerSecond * runTime - flopsPartialSlice for ( tick in (submitTime + slicedWaitTime) diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt index c004162a..3d969eb7 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt @@ -25,7 +25,6 @@ package org.opendc.format.trace.wtf import org.apache.avro.generic.GenericRecord import org.apache.hadoop.fs.Path import org.apache.parquet.avro.AvroParquetReader -import org.opendc.compute.api.Image import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.workload.SimFlopsWorkload @@ -57,6 +56,7 @@ public class WtfTraceReader(path: String) : TraceReader<Job> { val tasks = mutableMapOf<Long, Task>() val taskDependencies = mutableMapOf<Task, List<Long>>() + @Suppress("DEPRECATION") val reader = AvroParquetReader.builder<GenericRecord>(Path(path, "tasks/schema-1.0")).build() while (true) { @@ -81,16 +81,9 @@ public class WtfTraceReader(path: String) : TraceReader<Job> { val task = Task( UUID(0L, taskId), "<unnamed>", - Image( - UUID.randomUUID(), - "<unnamed>", - emptyMap(), - mapOf( - "workload" to workload - ) - ), HashSet(), mapOf( + "workload" to workload, WORKFLOW_TASK_CORES to cores, WORKFLOW_TASK_DEADLINE to runtime ) diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt index 5ae503a7..34d19e4f 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt @@ -145,6 +145,7 @@ public class StageWorkflowService( private val mode: WorkflowSchedulerMode.Logic private val jobAdmissionPolicy: JobAdmissionPolicy.Logic private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic + private lateinit var image: Image init { this.mode = mode(this) @@ -152,6 +153,9 @@ public class StageWorkflowService( this.jobQueue = PriorityQueue(100, jobOrderPolicy(this).thenBy { it.job.uid }) this.taskEligibilityPolicy = taskEligibilityPolicy(this) this.taskQueue = PriorityQueue(1000, taskOrderPolicy(this).thenBy { it.task.uid }) + coroutineScope.launch { + image = computeClient.newImage("workflow-runner") + } } override val events: Flow<WorkflowEvent> = tracer.openRecording().let { @@ -259,9 +263,9 @@ public class StageWorkflowService( val cores = instance.task.metadata[WORKFLOW_TASK_CORES] as? Int ?: 1 val flavor = Flavor(cores, 1000) // TODO How to determine memory usage for workflow task - val image = instance.task.image + val image = image coroutineScope.launch { - val server = computeClient.newServer(instance.task.name, image, flavor, start = false) + val server = computeClient.newServer(instance.task.name, image, flavor, start = false, meta = instance.task.metadata) instance.state = TaskStatus.ACTIVE instance.server = server diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt index 9ed3a9a5..4ccefef9 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt @@ -24,7 +24,6 @@ package org.opendc.workflows.workload -import org.opendc.compute.api.Image import java.util.* /** @@ -39,7 +38,6 @@ import java.util.* public data class Task( val uid: UUID, val name: String, - val image: Image, val dependencies: Set<Task>, val metadata: Map<String, Any> = emptyMap() ) { |
