diff options
Diffstat (limited to 'simulator/opendc-format')
15 files changed, 137 insertions, 346 deletions
diff --git a/simulator/opendc-format/build.gradle.kts b/simulator/opendc-format/build.gradle.kts index 37e9c9c8..385e556d 100644 --- a/simulator/opendc-format/build.gradle.kts +++ b/simulator/opendc-format/build.gradle.kts @@ -30,9 +30,8 @@ plugins { dependencies { api(platform(project(":opendc-platform"))) - api(project(":opendc-core")) api(project(":opendc-compute:opendc-compute-api")) - api(project(":opendc-workflows")) + api(project(":opendc-workflow:opendc-workflow-api")) implementation(project(":opendc-simulator:opendc-simulator-compute")) implementation(project(":opendc-compute:opendc-compute-simulator")) api("com.fasterxml.jackson.module:jackson-module-kotlin:${versions["jackson-module-kotlin"]}") diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt index 1f73bb61..97d6f239 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt @@ -22,17 +22,14 @@ package org.opendc.format.environment -import kotlinx.coroutines.CoroutineScope -import org.opendc.core.Environment import java.io.Closeable -import java.time.Clock /** - * An interface for reading descriptions of topology environments into memory as [Environment]. + * An interface for reading descriptions of topology environments into memory. */ public interface EnvironmentReader : Closeable { /** - * Construct an [Environment] in the specified [CoroutineScope]. + * Read the environment into a list. */ - public suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment + public fun read(): List<MachineDef> } diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceWriter.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/MachineDef.kt index 54fb6214..b5b3b84b 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceWriter.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/MachineDef.kt @@ -1,7 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2019 atlarge-research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,24 +20,16 @@ * SOFTWARE. */ -package org.opendc.format.trace +package org.opendc.format.environment -import org.opendc.core.workload.Workload -import java.io.Closeable +import org.opendc.compute.simulator.power.api.CpuPowerModel +import org.opendc.simulator.compute.SimMachineModel +import java.util.* -/** - * An interface for persisting workload traces (e.g. to disk). - * - * @param T The type of [Workload] supported by this writer. - */ -public interface TraceWriter<T : Workload> : Closeable { - /** - * Write an entry to the trace. - * - * Entries must be written in order of submission time. Failing to do so results in a [IllegalArgumentException]. - * - * @param submissionTime The time of submission of the workload. - * @param workload The workload to write to the trace. - */ - public fun write(submissionTime: Long, workload: T) -} +public data class MachineDef( + val uid: UUID, + val name: String, + val meta: Map<String, Any>, + val model: SimMachineModel, + val powerModel: CpuPowerModel +) diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt index bbbbe87c..3da8d0b3 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt @@ -25,21 +25,14 @@ package org.opendc.format.environment.sc18 import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue -import kotlinx.coroutines.CoroutineScope -import org.opendc.compute.simulator.SimBareMetalDriver -import org.opendc.core.Environment -import org.opendc.core.Platform -import org.opendc.core.Zone -import org.opendc.core.services.ServiceRegistry +import org.opendc.compute.simulator.power.models.ConstantPowerModel import org.opendc.format.environment.EnvironmentReader -import org.opendc.metal.service.ProvisioningService -import org.opendc.metal.service.SimpleProvisioningService +import org.opendc.format.environment.MachineDef import org.opendc.simulator.compute.SimMachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import java.io.InputStream -import java.time.Clock import java.util.* /** @@ -55,9 +48,12 @@ public class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja */ private val setup: Setup = mapper.readValue(input) - override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment { + /** + * Read the environment. + */ + public override fun read(): List<MachineDef> { var counter = 0 - val nodes = setup.rooms.flatMap { room -> + return setup.rooms.flatMap { room -> room.objects.flatMap { roomObject -> when (roomObject) { is RoomObject.Rack -> { @@ -75,35 +71,18 @@ public class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja else -> throw IllegalArgumentException("The cpu id $id is not recognized") } } - SimBareMetalDriver( - coroutineScope, - clock, - UUID.randomUUID(), - "node-${counter++}", + MachineDef( + UUID(0L, counter++.toLong()), + "node-$counter", emptyMap(), - SimMachineModel(cores, listOf(MemoryUnit("", "", 2300.0, 16000))) + SimMachineModel(cores, listOf(MemoryUnit("", "", 2300.0, 16000))), + ConstantPowerModel(0.0) ) } } } } } - - val provisioningService = SimpleProvisioningService() - for (node in nodes) { - provisioningService.create(node) - } - - val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService) - val platform = Platform( - UUID.randomUUID(), - "sc18-platform", - listOf( - Zone(UUID.randomUUID(), "zone", serviceRegistry) - ) - ) - - return Environment(setup.name, null, listOf(platform)) } override fun close() {} diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt index 998f9cd6..9a06a40f 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt @@ -22,17 +22,9 @@ package org.opendc.format.environment.sc20 -import kotlinx.coroutines.CoroutineScope -import org.opendc.compute.simulator.SimBareMetalDriver import org.opendc.compute.simulator.power.models.LinearPowerModel -import org.opendc.core.Environment -import org.opendc.core.Platform -import org.opendc.core.Zone -import org.opendc.core.services.ServiceRegistry import org.opendc.format.environment.EnvironmentReader -import org.opendc.metal.NODE_CLUSTER -import org.opendc.metal.service.ProvisioningService -import org.opendc.metal.service.SimpleProvisioningService +import org.opendc.format.environment.MachineDef import org.opendc.simulator.compute.SimMachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode @@ -40,7 +32,6 @@ import org.opendc.simulator.compute.model.ProcessingUnit import java.io.File import java.io.FileInputStream import java.io.InputStream -import java.time.Clock import java.util.* /** @@ -54,8 +45,7 @@ public class Sc20ClusterEnvironmentReader( public constructor(file: File) : this(FileInputStream(file)) - @Suppress("BlockingMethodInNonBlockingContext") - override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment { + public override fun read(): List<MachineDef> { var clusterIdCol = 0 var speedCol = 0 var numberOfHostsCol = 0 @@ -69,7 +59,7 @@ public class Sc20ClusterEnvironmentReader( var memoryPerHost: Long var coresPerHost: Int - val nodes = mutableListOf<SimBareMetalDriver>() + val nodes = mutableListOf<MachineDef>() val random = Random(0) input.bufferedReader().use { reader -> @@ -103,12 +93,10 @@ public class Sc20ClusterEnvironmentReader( repeat(numberOfHosts) { nodes.add( - SimBareMetalDriver( - coroutineScope, - clock, + MachineDef( UUID(random.nextLong(), random.nextLong()), "node-$clusterId-$it", - mapOf(NODE_CLUSTER to clusterId), + mapOf("cluster" to clusterId), SimMachineModel( List(coresPerHost) { coreId -> ProcessingUnit(unknownProcessingNode, coreId, speed) @@ -125,22 +113,7 @@ public class Sc20ClusterEnvironmentReader( } } - val provisioningService = SimpleProvisioningService() - for (node in nodes) { - provisioningService.create(node) - } - - val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService) - - val platform = Platform( - UUID.randomUUID(), - "sc20-platform", - listOf( - Zone(UUID.randomUUID(), "zone", serviceRegistry) - ) - ) - - return Environment("SC20 Environment", null, listOf(platform)) + return nodes } override fun close() {} diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt index 6cf65f7f..effd0286 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt @@ -25,22 +25,14 @@ package org.opendc.format.environment.sc20 import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue -import kotlinx.coroutines.CoroutineScope -import org.opendc.compute.simulator.SimBareMetalDriver import org.opendc.compute.simulator.power.models.LinearPowerModel -import org.opendc.core.Environment -import org.opendc.core.Platform -import org.opendc.core.Zone -import org.opendc.core.services.ServiceRegistry import org.opendc.format.environment.EnvironmentReader -import org.opendc.metal.service.ProvisioningService -import org.opendc.metal.service.SimpleProvisioningService +import org.opendc.format.environment.MachineDef import org.opendc.simulator.compute.SimMachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import java.io.InputStream -import java.time.Clock import java.util.* /** @@ -55,9 +47,12 @@ public class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja */ private val setup: Setup = mapper.readValue(input) - override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment { + /** + * Read the environment. + */ + public override fun read(): List<MachineDef> { var counter = 0 - val nodes = setup.rooms.flatMap { room -> + return setup.rooms.flatMap { room -> room.objects.flatMap { roomObject -> when (roomObject) { is RoomObject.Rack -> { @@ -81,11 +76,9 @@ public class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja else -> throw IllegalArgumentException("The cpu id $id is not recognized") } } - SimBareMetalDriver( - coroutineScope, - clock, - UUID.randomUUID(), - "node-${counter++}", + MachineDef( + UUID(0L, counter++.toLong()), + "node-$counter", emptyMap(), SimMachineModel(cores, memories), // For now we assume a simple linear load model with an idle draw of ~200W and a maximum @@ -98,23 +91,6 @@ public class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja } } } - - val provisioningService = SimpleProvisioningService() - for (node in nodes) { - provisioningService.create(node) - } - - val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService) - - val platform = Platform( - UUID.randomUUID(), - "sc20-platform", - listOf( - Zone(UUID.randomUUID(), "zone", serviceRegistry) - ) - ) - - return Environment(setup.name, null, listOf(platform)) } override fun close() {} diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt index ec547e84..3ce79d69 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt @@ -24,31 +24,21 @@ package org.opendc.format.trace -import org.opendc.core.workload.Workload +import java.util.UUID /** * An entry in a workload trace. * - * @param T The shape of the workload in this entry. + * @param uid The unique identifier of the entry. + * @param name The name of the entry. + * @param start The start time of the workload. + * @param workload The workload of the entry. + * @param meta The meta-data associated with the workload. */ -public interface TraceEntry<T : Workload> { - /** - * The time of submission of the workload. - */ - public val submissionTime: Long - - /** - * The workload in this trace entry. - */ - public val workload: T - - /** - * Extract the submission time from this entry. - */ - public operator fun component1(): Long = submissionTime - - /** - * Extract the workload from this entry. - */ - public operator fun component2(): T = workload -} +public data class TraceEntry<out T>( + val uid: UUID, + val name: String, + val start: Long, + val workload: T, + val meta: Map<String, Any> +) diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt index a0beec3e..7df1acd3 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt @@ -22,14 +22,13 @@ package org.opendc.format.trace -import org.opendc.core.workload.Workload import java.io.Closeable /** - * An interface for reading [Workload]s into memory. + * An interface for reading workloads into memory. * * This interface must guarantee that the entries are delivered in order of submission time. * * @param T The shape of the workloads supported by this reader. */ -public interface TraceReader<T : Workload> : Iterator<TraceEntry<T>>, Closeable +public interface TraceReader<T> : Iterator<TraceEntry<T>>, Closeable diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt index 1571b17d..769b2b13 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt @@ -22,14 +22,12 @@ package org.opendc.format.trace.bitbrains -import org.opendc.compute.api.ComputeWorkload -import org.opendc.compute.api.Image -import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.simulator.compute.workload.SimWorkload import java.io.BufferedReader import java.io.File import java.io.FileReader @@ -45,17 +43,17 @@ import kotlin.math.min public class BitbrainsTraceReader( traceDirectory: File, performanceInterferenceModel: PerformanceInterferenceModel -) : TraceReader<ComputeWorkload> { +) : TraceReader<SimWorkload> { /** * The internal iterator to use for this reader. */ - private val iterator: Iterator<TraceEntry<ComputeWorkload>> + private val iterator: Iterator<TraceEntry<SimWorkload>> /** * Initialize the reader. */ init { - val entries = mutableMapOf<Long, TraceEntry<ComputeWorkload>>() + val entries = mutableMapOf<Long, TraceEntry<SimWorkload>>() var timestampCol = 0 var coreCol = 0 @@ -132,50 +130,27 @@ public class BitbrainsTraceReader( ) val workload = SimTraceWorkload(flopsHistory.asSequence()) - val vmWorkload = ComputeWorkload( + entries[vmId] = TraceEntry( uuid, - "VM Workload $vmId", - UnnamedUser, - Image( - uuid, - vmId.toString(), - mapOf( - IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, - "cores" to cores, - "required-memory" to requiredMemory, - "workload" to workload - ) - ) - ) - entries[vmId] = TraceEntryImpl( + vmId.toString(), startTime, - vmWorkload + workload, + mapOf( + IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, + "cores" to cores, + "required-memory" to requiredMemory, + "workload" to workload + ) ) } // Create the entry iterator - iterator = entries.values.sortedBy { it.submissionTime }.iterator() + iterator = entries.values.sortedBy { it.start }.iterator() } override fun hasNext(): Boolean = iterator.hasNext() - override fun next(): TraceEntry<ComputeWorkload> = iterator.next() + override fun next(): TraceEntry<SimWorkload> = iterator.next() override fun close() {} - - /** - * An unnamed user. - */ - private object UnnamedUser : User { - override val name: String = "<unnamed>" - override val uid: UUID = UUID.randomUUID() - } - - /** - * An entry in the trace. - */ - private data class TraceEntryImpl( - override var submissionTime: Long, - override val workload: ComputeWorkload - ) : TraceEntry<ComputeWorkload> } diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt index cd7aff3c..e68afeb7 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt @@ -22,15 +22,13 @@ package org.opendc.format.trace.gwf -import org.opendc.compute.api.Image -import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.workload.SimFlopsWorkload -import org.opendc.workflows.workload.Job -import org.opendc.workflows.workload.Task -import org.opendc.workflows.workload.WORKFLOW_TASK_CORES -import org.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE +import org.opendc.workflow.api.Job +import org.opendc.workflow.api.Task +import org.opendc.workflow.api.WORKFLOW_TASK_CORES +import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE import java.io.BufferedReader import java.io.File import java.io.InputStream @@ -88,7 +86,8 @@ public class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> { * Initialize the reader. */ init { - val entries = mutableMapOf<Long, TraceEntryImpl>() + val workflows = mutableMapOf<Long, Job>() + val starts = mutableMapOf<Long, Long>() val tasks = mutableMapOf<Long, Task>() val taskDependencies = mutableMapOf<Task, List<Long>>() @@ -131,22 +130,21 @@ public class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> { val flops: Long = 4000 * runtime * cores - val entry = entries.getOrPut(workflowId) { - TraceEntryImpl(submitTime, Job(UUID(0L, taskId), "<unnamed>", UnnamedUser, HashSet())) + val workflow = workflows.getOrPut(workflowId) { + Job(UUID(0L, workflowId), "<unnamed>", HashSet()) } - val workflow = entry.workload val workload = SimFlopsWorkload(flops) val task = Task( UUID(0L, taskId), "<unnamed>", - Image(UUID.randomUUID(), "<unnamed>", mapOf("workload" to workload)), HashSet(), mapOf( + "workload" to workload, WORKFLOW_TASK_CORES to cores, WORKFLOW_TASK_DEADLINE to (runtime * 1000) ), ) - entry.submissionTime = min(entry.submissionTime, submitTime) + starts.merge(workflowId, submitTime, ::min) (workflow.tasks as MutableSet<Task>).add(task) tasks[taskId] = task taskDependencies[task] = dependencies @@ -165,7 +163,9 @@ public class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> { } // Create the entry iterator - iterator = entries.values.sortedBy { it.submissionTime }.iterator() + iterator = workflows.map { (id, job) -> TraceEntry(job.uid, job.name, starts.getValue(id), job, job.metadata) } + .sortedBy { it.start } + .iterator() } override fun hasNext(): Boolean = iterator.hasNext() @@ -173,20 +173,4 @@ public class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> { override fun next(): TraceEntry<Job> = iterator.next() override fun close() {} - - /** - * An unnamed user. - */ - private object UnnamedUser : User { - override val name: String = "<unnamed>" - override val uid: UUID = UUID.randomUUID() - } - - /** - * An entry in the trace. - */ - private data class TraceEntryImpl( - override var submissionTime: Long, - override val workload: Job - ) : TraceEntry<Job> } diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt index 07785632..1eb4bac2 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt @@ -22,14 +22,12 @@ package org.opendc.format.trace.sc20 -import org.opendc.compute.api.ComputeWorkload -import org.opendc.compute.api.Image -import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.simulator.compute.workload.SimWorkload import java.io.BufferedReader import java.io.File import java.io.FileReader @@ -49,17 +47,17 @@ public class Sc20TraceReader( performanceInterferenceModel: PerformanceInterferenceModel, selectedVms: List<String>, random: Random -) : TraceReader<ComputeWorkload> { +) : TraceReader<SimWorkload> { /** * The internal iterator to use for this reader. */ - private val iterator: Iterator<TraceEntry<ComputeWorkload>> + private val iterator: Iterator<TraceEntry<SimWorkload>> /** * Initialize the reader. */ init { - val entries = mutableMapOf<UUID, TraceEntry<ComputeWorkload>>() + val entries = mutableMapOf<UUID, TraceEntry<SimWorkload>>() val timestampCol = 0 val cpuUsageCol = 1 @@ -85,7 +83,7 @@ public class Sc20TraceReader( var vmId = "" var maxCores = -1 var requiredMemory = -1L - var timestamp = -1L + var timestamp: Long var cores = -1 var minTime = Long.MAX_VALUE @@ -157,50 +155,27 @@ public class Sc20TraceReader( Random(random.nextInt()) ) val workload = SimTraceWorkload(flopsFragments.asSequence()) - val vmWorkload = ComputeWorkload( + entries[uuid] = TraceEntry( uuid, - "VM Workload $vmId", - UnnamedUser, - Image( - uuid, - vmId, - mapOf( - IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, - "cores" to cores, - "required-memory" to requiredMemory, - "workload" to workload - ) - ) - ) - entries[uuid] = TraceEntryImpl( + vmId, minTime, - vmWorkload + workload, + mapOf( + IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, + "cores" to cores, + "required-memory" to requiredMemory, + "workload" to workload + ) ) } // Create the entry iterator - iterator = entries.values.sortedBy { it.submissionTime }.iterator() + iterator = entries.values.sortedBy { it.start }.iterator() } override fun hasNext(): Boolean = iterator.hasNext() - override fun next(): TraceEntry<ComputeWorkload> = iterator.next() + override fun next(): TraceEntry<SimWorkload> = iterator.next() override fun close() {} - - /** - * An unnamed user. - */ - private object UnnamedUser : User { - override val name: String = "<unnamed>" - override val uid: UUID = UUID.randomUUID() - } - - /** - * An entry in the trace. - */ - private data class TraceEntryImpl( - override var submissionTime: Long, - override val workload: ComputeWorkload - ) : TraceEntry<ComputeWorkload> } diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt index ead20c35..0d1f3cea 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt @@ -22,12 +22,10 @@ package org.opendc.format.trace.swf -import org.opendc.compute.api.ComputeWorkload -import org.opendc.compute.api.Image -import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.simulator.compute.workload.SimWorkload import java.io.BufferedReader import java.io.File import java.io.FileReader @@ -43,17 +41,17 @@ import java.util.* public class SwfTraceReader( file: File, maxNumCores: Int = -1 -) : TraceReader<ComputeWorkload> { +) : TraceReader<SimWorkload> { /** * The internal iterator to use for this reader. */ - private val iterator: Iterator<TraceEntry<ComputeWorkload>> + private val iterator: Iterator<TraceEntry<SimWorkload>> /** * Initialize the reader. */ init { - val entries = mutableMapOf<Long, TraceEntry<ComputeWorkload>>() + val entries = mutableMapOf<Long, TraceEntry<SimWorkload>>() val jobNumberCol = 0 val submitTimeCol = 1 // seconds (begin of trace is 0) @@ -73,7 +71,6 @@ public class SwfTraceReader( var slicedWaitTime: Long var flopsPerSecond: Long var flopsPartialSlice: Long - var flopsFullSlice: Long var runtimePartialSliceRemainder: Long BufferedReader(FileReader(file)).use { reader -> @@ -127,7 +124,6 @@ public class SwfTraceReader( flopsPerSecond = 4_000L * cores runtimePartialSliceRemainder = runTime % sliceDuration flopsPartialSlice = flopsPerSecond * runtimePartialSliceRemainder - flopsFullSlice = flopsPerSecond * runTime - flopsPartialSlice for ( tick in (submitTime + slicedWaitTime) @@ -155,48 +151,27 @@ public class SwfTraceReader( val uuid = UUID(0L, jobNumber) val workload = SimTraceWorkload(flopsHistory.asSequence()) - val vmWorkload = ComputeWorkload( + entries[jobNumber] = TraceEntry( uuid, - "SWF Workload $jobNumber", - UnnamedUser, - Image( - uuid, - jobNumber.toString(), - mapOf( - "cores" to cores, - "required-memory" to memory, - "workload" to workload - ) + jobNumber.toString(), + submitTime, + workload, + mapOf( + "cores" to cores, + "required-memory" to memory, + "workload" to workload ) ) - - entries[jobNumber] = TraceEntryImpl(submitTime, vmWorkload) } } // Create the entry iterator - iterator = entries.values.sortedBy { it.submissionTime }.iterator() + iterator = entries.values.sortedBy { it.start }.iterator() } override fun hasNext(): Boolean = iterator.hasNext() - override fun next(): TraceEntry<ComputeWorkload> = iterator.next() + override fun next(): TraceEntry<SimWorkload> = iterator.next() override fun close() {} - - /** - * An unnamed user. - */ - private object UnnamedUser : User { - override val name: String = "<unnamed>" - override val uid: UUID = UUID.randomUUID() - } - - /** - * An entry in the trace. - */ - private data class TraceEntryImpl( - override var submissionTime: Long, - override val workload: ComputeWorkload - ) : TraceEntry<ComputeWorkload> } diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt index 5a271fab..feadf61f 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt @@ -25,15 +25,13 @@ package org.opendc.format.trace.wtf import org.apache.avro.generic.GenericRecord import org.apache.hadoop.fs.Path import org.apache.parquet.avro.AvroParquetReader -import org.opendc.compute.api.Image -import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.workload.SimFlopsWorkload -import org.opendc.workflows.workload.Job -import org.opendc.workflows.workload.Task -import org.opendc.workflows.workload.WORKFLOW_TASK_CORES -import org.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE +import org.opendc.workflow.api.Job +import org.opendc.workflow.api.Task +import org.opendc.workflow.api.WORKFLOW_TASK_CORES +import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE import java.util.UUID import kotlin.math.min @@ -53,10 +51,12 @@ public class WtfTraceReader(path: String) : TraceReader<Job> { * Initialize the reader. */ init { - val entries = mutableMapOf<Long, TraceEntryImpl>() + val workflows = mutableMapOf<Long, Job>() + val starts = mutableMapOf<Long, Long>() val tasks = mutableMapOf<Long, Task>() val taskDependencies = mutableMapOf<Task, List<Long>>() + @Suppress("DEPRECATION") val reader = AvroParquetReader.builder<GenericRecord>(Path(path, "tasks/schema-1.0")).build() while (true) { @@ -74,29 +74,22 @@ public class WtfTraceReader(path: String) : TraceReader<Job> { val flops: Long = 4100 * (runtime / 1000) * cores - val entry = entries.getOrPut(workflowId) { - TraceEntryImpl(submitTime, Job(UUID(0L, taskId), "<unnamed>", UnnamedUser, HashSet())) + val workflow = workflows.getOrPut(workflowId) { + Job(UUID(0L, workflowId), "<unnamed>", HashSet()) } - val workflow = entry.workload val workload = SimFlopsWorkload(flops) val task = Task( UUID(0L, taskId), "<unnamed>", - Image( - UUID.randomUUID(), - "<unnamed>", - mapOf( - "workload" to workload - ) - ), HashSet(), mapOf( + "workload" to workload, WORKFLOW_TASK_CORES to cores, WORKFLOW_TASK_DEADLINE to runtime ) ) - entry.submissionTime = min(entry.submissionTime, submitTime) + starts.merge(workflowId, submitTime, ::min) (workflow.tasks as MutableSet<Task>).add(task) tasks[taskId] = task taskDependencies[task] = dependencies @@ -112,7 +105,9 @@ public class WtfTraceReader(path: String) : TraceReader<Job> { } // Create the entry iterator - iterator = entries.values.sortedBy { it.submissionTime }.iterator() + iterator = workflows.map { (id, job) -> TraceEntry(job.uid, job.name, starts.getValue(id), job, job.metadata) } + .sortedBy { it.start } + .iterator() } override fun hasNext(): Boolean = iterator.hasNext() @@ -120,20 +115,4 @@ public class WtfTraceReader(path: String) : TraceReader<Job> { override fun next(): TraceEntry<Job> = iterator.next() override fun close() {} - - /** - * An unnamed user. - */ - private object UnnamedUser : User { - override val name: String = "<unnamed>" - override val uid: UUID = UUID.randomUUID() - } - - /** - * An entry in the trace. - */ - private data class TraceEntryImpl( - override var submissionTime: Long, - override val workload: Job - ) : TraceEntry<Job> } diff --git a/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt b/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt index 7e3d2623..e0e049cf 100644 --- a/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt +++ b/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt @@ -32,14 +32,14 @@ class SwfTraceReaderTest { internal fun testParseSwf() { val reader = SwfTraceReader(File(SwfTraceReaderTest::class.java.getResource("/swf_trace.txt").toURI())) var entry = reader.next() - assertEquals(0, entry.submissionTime) + assertEquals(0, entry.start) // 1961 slices for waiting, 3 full and 1 partial running slices - assertEquals(1965, (entry.workload.image.tags["workload"] as SimTraceWorkload).trace.toList().size) + assertEquals(1965, (entry.workload as SimTraceWorkload).trace.toList().size) entry = reader.next() - assertEquals(164472, entry.submissionTime) + assertEquals(164472, entry.start) // 1188 slices for waiting, 0 full and 1 partial running slices - assertEquals(1189, (entry.workload.image.tags["workload"] as SimTraceWorkload).trace.toList().size) - assertEquals(0.25, (entry.workload.image.tags["workload"] as SimTraceWorkload).trace.toList().last().usage) + assertEquals(1189, (entry.workload as SimTraceWorkload).trace.toList().size) + assertEquals(0.25, (entry.workload as SimTraceWorkload).trace.toList().last().usage) } } diff --git a/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt b/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt index 58d96657..bcfa7553 100644 --- a/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt +++ b/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt @@ -36,11 +36,11 @@ class WtfTraceReaderTest { fun testParseWtf() { val reader = WtfTraceReader("src/test/resources/wtf-trace") var entry = reader.next() - assertEquals(0, entry.submissionTime) + assertEquals(0, entry.start) assertEquals(23, entry.workload.tasks.size) entry = reader.next() - assertEquals(333387, entry.submissionTime) + assertEquals(333387, entry.start) assertEquals(23, entry.workload.tasks.size) } } |
