diff options
Diffstat (limited to 'simulator/opendc-experiments')
9 files changed, 65 insertions, 141 deletions
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts index 636f291c..2d0da1bf 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts +++ b/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts @@ -31,7 +31,6 @@ plugins { dependencies { api(platform(project(":opendc-platform"))) - api(project(":opendc-core")) api(project(":opendc-harness")) implementation(project(":opendc-format")) implementation(project(":opendc-simulator:opendc-simulator-core")) diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index f2f53917..88460745 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -32,11 +32,7 @@ import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.takeWhile import kotlinx.coroutines.launch import mu.KotlinLogging -import org.opendc.compute.api.ComputeWorkload -import org.opendc.compute.api.Flavor -import org.opendc.compute.api.Server -import org.opendc.compute.api.ServerState -import org.opendc.compute.api.ServerWatcher +import org.opendc.compute.api.* import org.opendc.compute.service.ComputeService import org.opendc.compute.service.ComputeServiceEvent import org.opendc.compute.service.driver.Host @@ -52,6 +48,7 @@ import org.opendc.format.environment.EnvironmentReader import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.SimFairShareHypervisorProvider import org.opendc.simulator.compute.interference.PerformanceInterferenceModel +import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.failures.CorrelatedFaultInjector import org.opendc.simulator.failures.FaultInjector import org.opendc.trace.core.EventTracer @@ -229,7 +226,7 @@ public fun attachMonitor( public suspend fun processTrace( coroutineScope: CoroutineScope, clock: Clock, - reader: TraceReader<ComputeWorkload>, + reader: TraceReader<SimWorkload>, scheduler: ComputeService, chan: Channel<Unit>, monitor: ExperimentMonitor @@ -239,19 +236,20 @@ public suspend fun processTrace( var submitted = 0 while (reader.hasNext()) { - val (time, workload) = reader.next() + val entry = reader.next() submitted++ - delay(max(0, time - clock.millis())) + delay(max(0, entry.start - clock.millis())) coroutineScope.launch { chan.send(Unit) val server = client.newServer( - workload.image.name, - workload.image, + entry.name, + Image(entry.uid, entry.name, emptyMap(), mapOf("workload" to entry.workload)), Flavor( - workload.image.tags["cores"] as Int, - workload.image.tags["required-memory"] as Long - ) + entry.meta["cores"] as Int, + entry.meta["required-memory"] as Long + ), + meta = entry.meta ) server.watch(object : ServerWatcher { diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt index f9630078..a8462a51 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt @@ -22,14 +22,13 @@ package org.opendc.experiments.capelin.trace -import org.opendc.compute.api.ComputeWorkload -import org.opendc.compute.api.Image import org.opendc.experiments.capelin.model.CompositeWorkload import org.opendc.experiments.capelin.model.Workload import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL import org.opendc.simulator.compute.interference.PerformanceInterferenceModel +import org.opendc.simulator.compute.workload.SimWorkload import java.util.TreeSet /** @@ -45,11 +44,11 @@ public class Sc20ParquetTraceReader( performanceInterferenceModel: Map<String, PerformanceInterferenceModel>, workload: Workload, seed: Int -) : TraceReader<ComputeWorkload> { +) : TraceReader<SimWorkload> { /** * The iterator over the actual trace. */ - private val iterator: Iterator<TraceEntry<ComputeWorkload>> = + private val iterator: Iterator<TraceEntry<SimWorkload>> = rawReaders .map { it.read() } .run { @@ -67,19 +66,11 @@ public class Sc20ParquetTraceReader( this else { map { entry -> - val image = entry.workload.image - val id = image.name + val id = entry.name val relevantPerformanceInterferenceModelItems = performanceInterferenceModel[id] ?: PerformanceInterferenceModel(TreeSet()) - val newImage = - Image( - image.uid, - image.name, - image.tags + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), - ) - val newWorkload = entry.workload.copy(image = newImage) - Sc20RawParquetTraceReader.TraceEntryImpl(entry.submissionTime, newWorkload) + entry.copy(meta = entry.meta + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems)) } } } @@ -87,7 +78,7 @@ public class Sc20ParquetTraceReader( override fun hasNext(): Boolean = iterator.hasNext() - override fun next(): TraceEntry<ComputeWorkload> = iterator.next() + override fun next(): TraceEntry<SimWorkload> = iterator.next() override fun close() {} } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt index b29bdc54..718c5e03 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt @@ -26,12 +26,10 @@ import mu.KotlinLogging import org.apache.avro.generic.GenericData import org.apache.hadoop.fs.Path import org.apache.parquet.avro.AvroParquetReader -import org.opendc.compute.api.ComputeWorkload -import org.opendc.compute.api.Image -import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.simulator.compute.workload.SimWorkload import java.io.File import java.util.UUID @@ -59,11 +57,9 @@ public class Sc20RawParquetTraceReader(private val path: File) { val record = reader.read() ?: break val id = record["id"].toString() - val tick = record["time"] as Long val duration = record["duration"] as Long val cores = record["cores"] as Int val cpuUsage = record["cpuUsage"] as Double - val flops = record["flops"] as Long val fragment = SimTraceWorkload.Fragment( duration, @@ -83,13 +79,13 @@ public class Sc20RawParquetTraceReader(private val path: File) { /** * Read the metadata into a workload. */ - private fun parseMeta(path: File, fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<TraceEntryImpl> { + private fun parseMeta(path: File, fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<TraceEntry<SimWorkload>> { val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "meta.parquet")) .disableCompatibility() .build() var counter = 0 - val entries = mutableListOf<TraceEntryImpl>() + val entries = mutableListOf<TraceEntry<SimWorkload>>() return try { while (true) { @@ -109,13 +105,9 @@ public class Sc20RawParquetTraceReader(private val path: File) { val vmFragments = fragments.getValue(id).asSequence() val totalLoad = vmFragments.sumByDouble { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs val workload = SimTraceWorkload(vmFragments) - val vmWorkload = ComputeWorkload( - uid, - id, - UnnamedUser, - Image( - uid, - id, + entries.add( + TraceEntry( + uid, id, submissionTime, workload, mapOf( "submit-time" to submissionTime, "end-time" to endTime, @@ -126,7 +118,6 @@ public class Sc20RawParquetTraceReader(private val path: File) { ) ) ) - entries.add(TraceEntryImpl(submissionTime, vmWorkload)) } entries @@ -141,7 +132,7 @@ public class Sc20RawParquetTraceReader(private val path: File) { /** * The entries in the trace. */ - private val entries: List<TraceEntryImpl> + private val entries: List<TraceEntry<SimWorkload>> init { val fragments = parseFragments(path) @@ -151,21 +142,5 @@ public class Sc20RawParquetTraceReader(private val path: File) { /** * Read the entries in the trace. */ - public fun read(): List<TraceEntry<ComputeWorkload>> = entries - - /** - * An unnamed user. - */ - private object UnnamedUser : User { - override val name: String = "<unnamed>" - override val uid: UUID = UUID.randomUUID() - } - - /** - * An entry in the trace. - */ - internal data class TraceEntryImpl( - override var submissionTime: Long, - override val workload: ComputeWorkload - ) : TraceEntry<ComputeWorkload> + public fun read(): List<TraceEntry<SimWorkload>> = entries } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt index c588fda3..2c3eac3d 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt @@ -31,14 +31,12 @@ import org.apache.parquet.filter2.predicate.FilterApi import org.apache.parquet.filter2.predicate.Statistics import org.apache.parquet.filter2.predicate.UserDefinedPredicate import org.apache.parquet.io.api.Binary -import org.opendc.compute.api.ComputeWorkload -import org.opendc.compute.api.Image -import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.simulator.compute.workload.SimWorkload import java.io.File import java.io.Serializable import java.util.SortedSet @@ -62,11 +60,11 @@ public class Sc20StreamingParquetTraceReader( performanceInterferenceModel: PerformanceInterferenceModel, selectedVms: List<String>, random: Random -) : TraceReader<ComputeWorkload> { +) : TraceReader<SimWorkload> { /** * The internal iterator to use for this reader. */ - private val iterator: Iterator<TraceEntry<ComputeWorkload>> + private val iterator: Iterator<TraceEntry<SimWorkload>> /** * The intermediate buffer to store the read records in. @@ -236,35 +234,25 @@ public class Sc20StreamingParquetTraceReader( Random(random.nextInt()) ) val workload = SimTraceWorkload(fragments) - val vmWorkload = ComputeWorkload( - uid, - "VM Workload $id", - UnnamedUser, - Image( - uid, - id, - mapOf( - IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, - "cores" to maxCores, - "required-memory" to requiredMemory, - "workload" to workload - ) - ) - ) - TraceEntryImpl( - submissionTime, - vmWorkload + TraceEntry( + uid, id, submissionTime, workload, + mapOf( + IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, + "cores" to maxCores, + "required-memory" to requiredMemory, + "workload" to workload + ) ) } - .sortedBy { it.submissionTime } + .sortedBy { it.start } .toList() .iterator() } override fun hasNext(): Boolean = iterator.hasNext() - override fun next(): TraceEntry<ComputeWorkload> = iterator.next() + override fun next(): TraceEntry<SimWorkload> = iterator.next() override fun close() { readerThread.interrupt() @@ -287,20 +275,4 @@ public class Sc20StreamingParquetTraceReader( return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isNotEmpty() } } - - /** - * An unnamed user. - */ - private object UnnamedUser : User { - override val name: String = "<unnamed>" - override val uid: UUID = UUID.randomUUID() - } - - /** - * An entry in the trace. - */ - private data class TraceEntryImpl( - override var submissionTime: Long, - override val workload: ComputeWorkload - ) : TraceEntry<ComputeWorkload> } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt index 881652f6..5c8727ea 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt @@ -23,12 +23,11 @@ package org.opendc.experiments.capelin.trace import mu.KotlinLogging -import org.opendc.compute.api.ComputeWorkload -import org.opendc.compute.api.Image import org.opendc.experiments.capelin.model.CompositeWorkload import org.opendc.experiments.capelin.model.SamplingStrategy import org.opendc.experiments.capelin.model.Workload import org.opendc.format.trace.TraceEntry +import org.opendc.simulator.compute.workload.SimWorkload import java.util.* import kotlin.random.Random @@ -38,11 +37,11 @@ private val logger = KotlinLogging.logger {} * Sample the workload for the specified [run]. */ public fun sampleWorkload( - trace: List<TraceEntry<ComputeWorkload>>, + trace: List<TraceEntry<SimWorkload>>, workload: Workload, subWorkload: Workload, seed: Int -): List<TraceEntry<ComputeWorkload>> { +): List<TraceEntry<SimWorkload>> { return when { workload is CompositeWorkload -> sampleRegularWorkload(trace, workload, subWorkload, seed) workload.samplingStrategy == SamplingStrategy.HPC -> @@ -58,24 +57,24 @@ public fun sampleWorkload( * Sample a regular (non-HPC) workload. */ public fun sampleRegularWorkload( - trace: List<TraceEntry<ComputeWorkload>>, + trace: List<TraceEntry<SimWorkload>>, workload: Workload, subWorkload: Workload, seed: Int -): List<TraceEntry<ComputeWorkload>> { +): List<TraceEntry<SimWorkload>> { val fraction = subWorkload.fraction val shuffled = trace.shuffled(Random(seed)) - val res = mutableListOf<TraceEntry<ComputeWorkload>>() + val res = mutableListOf<TraceEntry<SimWorkload>>() val totalLoad = if (workload is CompositeWorkload) { workload.totalLoad } else { - shuffled.sumByDouble { it.workload.image.tags.getValue("total-load") as Double } + shuffled.sumByDouble { it.meta.getValue("total-load") as Double } } var currentLoad = 0.0 for (entry in shuffled) { - val entryLoad = entry.workload.image.tags.getValue("total-load") as Double + val entryLoad = entry.meta.getValue("total-load") as Double if ((currentLoad + entryLoad) / totalLoad > fraction) { break } @@ -93,23 +92,23 @@ public fun sampleRegularWorkload( * Sample a HPC workload. */ public fun sampleHpcWorkload( - trace: List<TraceEntry<ComputeWorkload>>, + trace: List<TraceEntry<SimWorkload>>, workload: Workload, seed: Int, sampleOnLoad: Boolean -): List<TraceEntry<ComputeWorkload>> { +): List<TraceEntry<SimWorkload>> { val pattern = Regex("^vm__workload__(ComputeNode|cn).*") val random = Random(seed) val fraction = workload.fraction val (hpc, nonHpc) = trace.partition { entry -> - val name = entry.workload.image.name + val name = entry.name name.matches(pattern) } val hpcSequence = generateSequence(0) { it + 1 } .map { index -> - val res = mutableListOf<TraceEntry<ComputeWorkload>>() + val res = mutableListOf<TraceEntry<SimWorkload>>() hpc.mapTo(res) { sample(it, index) } res.shuffle(random) res @@ -118,7 +117,7 @@ public fun sampleHpcWorkload( val nonHpcSequence = generateSequence(0) { it + 1 } .map { index -> - val res = mutableListOf<TraceEntry<ComputeWorkload>>() + val res = mutableListOf<TraceEntry<SimWorkload>>() nonHpc.mapTo(res) { sample(it, index) } res.shuffle(random) res @@ -130,7 +129,7 @@ public fun sampleHpcWorkload( val totalLoad = if (workload is CompositeWorkload) { workload.totalLoad } else { - trace.sumByDouble { it.workload.image.tags.getValue("total-load") as Double } + trace.sumByDouble { it.meta.getValue("total-load") as Double } } logger.debug { "Total trace load: $totalLoad" } @@ -139,12 +138,12 @@ public fun sampleHpcWorkload( var nonHpcCount = 0 var nonHpcLoad = 0.0 - val res = mutableListOf<TraceEntry<ComputeWorkload>>() + val res = mutableListOf<TraceEntry<SimWorkload>>() if (sampleOnLoad) { var currentLoad = 0.0 for (entry in hpcSequence) { - val entryLoad = entry.workload.image.tags.getValue("total-load") as Double + val entryLoad = entry.meta.getValue("total-load") as Double if ((currentLoad + entryLoad) / totalLoad > fraction) { break } @@ -156,7 +155,7 @@ public fun sampleHpcWorkload( } for (entry in nonHpcSequence) { - val entryLoad = entry.workload.image.tags.getValue("total-load") as Double + val entryLoad = entry.meta.getValue("total-load") as Double if ((currentLoad + entryLoad) / totalLoad > 1) { break } @@ -170,7 +169,7 @@ public fun sampleHpcWorkload( hpcSequence .take((fraction * trace.size).toInt()) .forEach { entry -> - hpcLoad += entry.workload.image.tags.getValue("total-load") as Double + hpcLoad += entry.meta.getValue("total-load") as Double hpcCount += 1 res.add(entry) } @@ -178,7 +177,7 @@ public fun sampleHpcWorkload( nonHpcSequence .take(((1 - fraction) * trace.size).toInt()) .forEach { entry -> - nonHpcLoad += entry.workload.image.tags.getValue("total-load") as Double + nonHpcLoad += entry.meta.getValue("total-load") as Double nonHpcCount += 1 res.add(entry) } @@ -194,16 +193,7 @@ public fun sampleHpcWorkload( /** * Sample a random trace entry. */ -private fun sample(entry: TraceEntry<ComputeWorkload>, i: Int): TraceEntry<ComputeWorkload> { - val id = UUID.nameUUIDFromBytes("${entry.workload.image.uid}-$i".toByteArray()) - val image = Image( - id, - entry.workload.image.name, - entry.workload.image.tags - ) - val vmWorkload = entry.workload.copy(uid = id, image = image, name = entry.workload.name) - return VmTraceEntry(vmWorkload, entry.submissionTime) +private fun sample(entry: TraceEntry<SimWorkload>, i: Int): TraceEntry<SimWorkload> { + val uid = UUID.nameUUIDFromBytes("${entry.uid}-$i".toByteArray()) + return entry.copy(uid = uid) } - -private class VmTraceEntry(override val workload: ComputeWorkload, override val submissionTime: Long) : - TraceEntry<ComputeWorkload> diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 9921c209..4e6cfddc 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -32,7 +32,6 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import org.opendc.compute.api.ComputeWorkload import org.opendc.compute.service.driver.Host import org.opendc.compute.service.internal.ComputeServiceImpl import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy @@ -43,6 +42,7 @@ import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader import org.opendc.format.environment.EnvironmentReader import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import org.opendc.format.trace.TraceReader +import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.trace.core.EventTracer import java.io.File @@ -201,7 +201,7 @@ class CapelinIntegrationTest { /** * Obtain the trace reader for the test. */ - private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<ComputeWorkload> { + private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<SimWorkload> { return Sc20ParquetTraceReader( listOf(Sc20RawParquetTraceReader(File("src/test/resources/trace"))), emptyMap(), diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts index 00aa0395..f85d9b19 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts +++ b/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts @@ -30,7 +30,6 @@ plugins { dependencies { api(platform(project(":opendc-platform"))) - api(project(":opendc-core")) api(project(":opendc-harness")) implementation(project(":opendc-format")) implementation(project(":opendc-workflows")) diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt index 57d1a4a0..2be05119 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt @@ -121,9 +121,9 @@ public class UnderspecificationExperiment : Experiment("underspecification") { val reader = GwfTraceReader(File(trace)) while (reader.hasNext()) { - val (time, job) = reader.next() - delay(max(0, time * 1000 - clock.millis())) - scheduler.submit(job) + val entry = reader.next() + delay(max(0, entry.start * 1000 - clock.millis())) + scheduler.submit(entry.workload) } } |
