summaryrefslogtreecommitdiff
path: root/simulator/opendc-experiments/opendc-experiments-sc20/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-10-05 15:26:46 +0200
committerGitHub <noreply@github.com>2020-10-05 15:26:46 +0200
commit1f0e5a17861e8a8de623b540596aee162e712521 (patch)
tree207f2deb8e3782f1a52e6b32290e274c6ff7ba1f /simulator/opendc-experiments/opendc-experiments-sc20/src
parent0119ca3b2e05b06f9646149c2d7bfe3d4b57c380 (diff)
parent374960cd15a2893a124c42975167f8bf6e45e868 (diff)
Merge pull request #45 from atlarge-research/refactor/decouple
Decouple simulation logic from OpenDC Compute
Diffstat (limited to 'simulator/opendc-experiments/opendc-experiments-sc20/src')
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/Main.kt3
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ExperimentHelpers.kt47
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/Run.kt5
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/monitor/ExperimentMonitor.kt4
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt6
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt14
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt22
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt40
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20TraceConverter.kt14
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/WorkloadSampler.kt9
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/main/resources/log4j2.xml2
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt22
12 files changed, 93 insertions, 95 deletions
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/Main.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/Main.kt
index b7bb0c23..8916261b 100644
--- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/Main.kt
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/Main.kt
@@ -41,7 +41,6 @@ import org.opendc.experiments.sc20.runner.internal.DefaultExperimentRunner
import org.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader
import org.opendc.format.trace.sc20.Sc20VmPlacementReader
import java.io.File
-import java.io.InputStream
/**
* The logger for this experiment.
@@ -74,7 +73,7 @@ public class ExperimentCli : CliktCommand(name = "sc20-experiment") {
help = "path to the performance interference file"
)
.file(canBeDir = false)
- .convert { it.inputStream() as InputStream }
+ .convert { it.inputStream() }
/**
* The path to the original VM placements file.
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
index 97d7d5da..09f44199 100644
--- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
@@ -34,23 +34,24 @@ import kotlinx.coroutines.launch
import mu.KotlinLogging
import org.opendc.compute.core.Flavor
import org.opendc.compute.core.ServerEvent
-import org.opendc.compute.core.workload.PerformanceInterferenceModel
+import org.opendc.compute.core.metal.NODE_CLUSTER
+import org.opendc.compute.core.metal.driver.BareMetalDriver
+import org.opendc.compute.core.metal.service.ProvisioningService
+import org.opendc.compute.core.virt.HypervisorEvent
+import org.opendc.compute.core.virt.service.VirtProvisioningEvent
import org.opendc.compute.core.workload.VmWorkload
-import org.opendc.compute.metal.NODE_CLUSTER
-import org.opendc.compute.metal.driver.BareMetalDriver
-import org.opendc.compute.metal.service.ProvisioningService
-import org.opendc.compute.virt.HypervisorEvent
-import org.opendc.compute.virt.driver.SimpleVirtDriver
-import org.opendc.compute.virt.service.SimpleVirtProvisioningService
-import org.opendc.compute.virt.service.VirtProvisioningEvent
-import org.opendc.compute.virt.service.allocation.AllocationPolicy
-import org.opendc.core.failure.CorrelatedFaultInjector
-import org.opendc.core.failure.FailureDomain
-import org.opendc.core.failure.FaultInjector
+import org.opendc.compute.simulator.SimBareMetalDriver
+import org.opendc.compute.simulator.SimVirtDriver
+import org.opendc.compute.simulator.SimVirtProvisioningService
+import org.opendc.compute.simulator.allocation.AllocationPolicy
import org.opendc.experiments.sc20.experiment.monitor.ExperimentMonitor
import org.opendc.experiments.sc20.trace.Sc20StreamingParquetTraceReader
import org.opendc.format.environment.EnvironmentReader
import org.opendc.format.trace.TraceReader
+import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
+import org.opendc.simulator.failures.CorrelatedFaultInjector
+import org.opendc.simulator.failures.FailureDomain
+import org.opendc.simulator.failures.FaultInjector
import java.io.File
import java.time.Clock
import kotlin.math.ln
@@ -140,14 +141,14 @@ public suspend fun createProvisioner(
clock: Clock,
environmentReader: EnvironmentReader,
allocationPolicy: AllocationPolicy
-): Pair<ProvisioningService, SimpleVirtProvisioningService> {
+): Pair<ProvisioningService, SimVirtProvisioningService> {
val environment = environmentReader.use { it.construct(coroutineScope, clock) }
val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService]
// Wait for the bare metal nodes to be spawned
delay(10)
- val scheduler = SimpleVirtProvisioningService(coroutineScope, clock, bareMetalProvisioner, allocationPolicy)
+ val scheduler = SimVirtProvisioningService(coroutineScope, clock, bareMetalProvisioner, allocationPolicy)
// Wait for the hypervisors to be spawned
delay(10)
@@ -162,7 +163,7 @@ public suspend fun createProvisioner(
public suspend fun attachMonitor(
coroutineScope: CoroutineScope,
clock: Clock,
- scheduler: SimpleVirtProvisioningService,
+ scheduler: SimVirtProvisioningService,
monitor: ExperimentMonitor
) {
@@ -171,8 +172,9 @@ public suspend fun attachMonitor(
// Monitor hypervisor events
for (hypervisor in hypervisors) {
// TODO Do not expose VirtDriver directly but use Hypervisor class.
- monitor.reportHostStateChange(clock.millis(), hypervisor, (hypervisor as SimpleVirtDriver).server)
- hypervisor.server.events
+ val server = (hypervisor as SimVirtDriver).server
+ monitor.reportHostStateChange(clock.millis(), hypervisor, server)
+ server.events
.onEach { event ->
val time = clock.millis()
when (event) {
@@ -200,7 +202,7 @@ public suspend fun attachMonitor(
}
.launchIn(coroutineScope)
- val driver = hypervisor.server.services[BareMetalDriver.Key]
+ val driver = hypervisor.server.services[BareMetalDriver.Key] as SimBareMetalDriver
driver.powerDraw
.onEach { monitor.reportPowerConsumption(hypervisor.server, it) }
.launchIn(coroutineScope)
@@ -223,10 +225,9 @@ public suspend fun processTrace(
coroutineScope: CoroutineScope,
clock: Clock,
reader: TraceReader<VmWorkload>,
- scheduler: SimpleVirtProvisioningService,
+ scheduler: SimVirtProvisioningService,
chan: Channel<Unit>,
- monitor: ExperimentMonitor,
- vmPlacements: Map<String, String> = emptyMap()
+ monitor: ExperimentMonitor
) {
try {
var submitted = 0
@@ -242,8 +243,8 @@ public suspend fun processTrace(
workload.image.name,
workload.image,
Flavor(
- workload.image.maxCores,
- workload.image.requiredMemory
+ workload.image.tags["cores"] as Int,
+ workload.image.tags["required-memory"] as Long
)
)
// Monitor server events
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/Run.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/Run.kt
index 95987d07..660fc882 100644
--- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/Run.kt
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/Run.kt
@@ -28,7 +28,7 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.TestCoroutineScope
import mu.KotlinLogging
-import org.opendc.compute.virt.service.allocation.*
+import org.opendc.compute.simulator.allocation.*
import org.opendc.experiments.sc20.experiment.model.CompositeWorkload
import org.opendc.experiments.sc20.experiment.monitor.ParquetExperimentMonitor
import org.opendc.experiments.sc20.runner.TrialExperimentDescriptor
@@ -131,8 +131,7 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int)
trace,
scheduler,
chan,
- monitor,
- experiment.vmPlacements
+ monitor
)
logger.debug("SUBMIT=${scheduler.submittedVms}")
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/monitor/ExperimentMonitor.kt
index 0ac532a4..18ba2c33 100644
--- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/monitor/ExperimentMonitor.kt
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/monitor/ExperimentMonitor.kt
@@ -23,8 +23,8 @@
package org.opendc.experiments.sc20.experiment.monitor
import org.opendc.compute.core.Server
-import org.opendc.compute.virt.driver.VirtDriver
-import org.opendc.compute.virt.service.VirtProvisioningEvent
+import org.opendc.compute.core.virt.driver.VirtDriver
+import org.opendc.compute.core.virt.service.VirtProvisioningEvent
import java.io.Closeable
/**
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt
index bb512ef7..3eb9362c 100644
--- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt
@@ -24,8 +24,8 @@ package org.opendc.experiments.sc20.experiment.monitor
import mu.KotlinLogging
import org.opendc.compute.core.Server
-import org.opendc.compute.virt.driver.VirtDriver
-import org.opendc.compute.virt.service.VirtProvisioningEvent
+import org.opendc.compute.core.virt.driver.VirtDriver
+import org.opendc.compute.core.virt.service.VirtProvisioningEvent
import org.opendc.experiments.sc20.telemetry.HostEvent
import org.opendc.experiments.sc20.telemetry.ProvisionerEvent
import org.opendc.experiments.sc20.telemetry.parquet.ParquetHostEventWriter
@@ -57,7 +57,7 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
startTime = time
// Update timestamp of initial event
- currentHostEvent.replaceAll { k, v -> v.copy(timestamp = startTime) }
+ currentHostEvent.replaceAll { _, v -> v.copy(timestamp = startTime) }
}
}
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt
index 5a336865..d735ea4b 100644
--- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt
@@ -22,14 +22,14 @@
package org.opendc.experiments.sc20.trace
-import org.opendc.compute.core.image.VmImage
-import org.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
-import org.opendc.compute.core.workload.PerformanceInterferenceModel
import org.opendc.compute.core.workload.VmWorkload
+import org.opendc.compute.simulator.SimWorkloadImage
import org.opendc.experiments.sc20.experiment.model.CompositeWorkload
import org.opendc.experiments.sc20.experiment.model.Workload
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
+import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
+import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
import java.util.TreeSet
/**
@@ -73,13 +73,11 @@ public class Sc20ParquetTraceReader(
performanceInterferenceModel[id] ?: PerformanceInterferenceModel(TreeSet())
val newImage =
- VmImage(
+ SimWorkloadImage(
image.uid,
image.name,
- mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems),
- image.flopsHistory,
- image.maxCores,
- image.requiredMemory
+ image.tags + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems),
+ (image as SimWorkloadImage).workload
)
val newWorkload = entry.workload.copy(image = newImage)
Sc20RawParquetTraceReader.TraceEntryImpl(entry.submissionTime, newWorkload)
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt
index 8965cf43..9bc1a58e 100644
--- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt
@@ -26,12 +26,12 @@ import mu.KotlinLogging
import org.apache.avro.generic.GenericData
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetReader
-import org.opendc.compute.core.image.FlopsHistoryFragment
-import org.opendc.compute.core.image.VmImage
import org.opendc.compute.core.workload.VmWorkload
+import org.opendc.compute.simulator.SimWorkloadImage
import org.opendc.core.User
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
+import org.opendc.simulator.compute.workload.SimTraceWorkload
import java.io.File
import java.util.UUID
@@ -47,12 +47,12 @@ public class Sc20RawParquetTraceReader(private val path: File) {
/**
* Read the fragments into memory.
*/
- private fun parseFragments(path: File): Map<String, List<FlopsHistoryFragment>> {
+ private fun parseFragments(path: File): Map<String, List<SimTraceWorkload.Fragment>> {
val reader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "trace.parquet"))
.disableCompatibility()
.build()
- val fragments = mutableMapOf<String, MutableList<FlopsHistoryFragment>>()
+ val fragments = mutableMapOf<String, MutableList<SimTraceWorkload.Fragment>>()
return try {
while (true) {
@@ -65,7 +65,7 @@ public class Sc20RawParquetTraceReader(private val path: File) {
val cpuUsage = record["cpuUsage"] as Double
val flops = record["flops"] as Long
- val fragment = FlopsHistoryFragment(
+ val fragment = SimTraceWorkload.Fragment(
tick,
flops,
duration,
@@ -85,7 +85,7 @@ public class Sc20RawParquetTraceReader(private val path: File) {
/**
* Read the metadata into a workload.
*/
- private fun parseMeta(path: File, fragments: Map<String, List<FlopsHistoryFragment>>): List<TraceEntryImpl> {
+ private fun parseMeta(path: File, fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<TraceEntryImpl> {
val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "meta.parquet"))
.disableCompatibility()
.build()
@@ -114,17 +114,17 @@ public class Sc20RawParquetTraceReader(private val path: File) {
uid,
id,
UnnamedUser,
- VmImage(
+ SimWorkloadImage(
uid,
id,
mapOf(
"submit-time" to submissionTime,
"end-time" to endTime,
- "total-load" to totalLoad
+ "total-load" to totalLoad,
+ "cores" to maxCores,
+ "required-memory" to requiredMemory
),
- vmFragments,
- maxCores,
- requiredMemory
+ SimTraceWorkload(vmFragments)
)
)
entries.add(TraceEntryImpl(submissionTime, vmWorkload))
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt
index 41dc4b49..edef276c 100644
--- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt
@@ -31,14 +31,14 @@ import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.filter2.predicate.Statistics
import org.apache.parquet.filter2.predicate.UserDefinedPredicate
import org.apache.parquet.io.api.Binary
-import org.opendc.compute.core.image.FlopsHistoryFragment
-import org.opendc.compute.core.image.VmImage
-import org.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
-import org.opendc.compute.core.workload.PerformanceInterferenceModel
import org.opendc.compute.core.workload.VmWorkload
+import org.opendc.compute.simulator.SimWorkloadImage
import org.opendc.core.User
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
+import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
+import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
+import org.opendc.simulator.compute.workload.SimTraceWorkload
import java.io.File
import java.io.Serializable
import java.util.SortedSet
@@ -71,7 +71,7 @@ public class Sc20StreamingParquetTraceReader(
/**
* The intermediate buffer to store the read records in.
*/
- private val queue = ArrayBlockingQueue<Pair<String, FlopsHistoryFragment>>(1024)
+ private val queue = ArrayBlockingQueue<Pair<String, SimTraceWorkload.Fragment>>(1024)
/**
* An optional filter for filtering the selected VMs
@@ -92,7 +92,7 @@ public class Sc20StreamingParquetTraceReader(
/**
* A poisonous fragment.
*/
- private val poison = Pair("\u0000", FlopsHistoryFragment(0, 0, 0, 0.0, 0))
+ private val poison = Pair("\u0000", SimTraceWorkload.Fragment(0, 0, 0, 0.0, 0))
/**
* The thread to read the records in.
@@ -119,7 +119,7 @@ public class Sc20StreamingParquetTraceReader(
val cpuUsage = record["cpuUsage"] as Double
val flops = record["flops"] as Long
- val fragment = FlopsHistoryFragment(
+ val fragment = SimTraceWorkload.Fragment(
tick,
flops,
duration,
@@ -139,12 +139,12 @@ public class Sc20StreamingParquetTraceReader(
/**
* Fill the buffers with the VMs
*/
- private fun pull(buffers: Map<String, List<MutableList<FlopsHistoryFragment>>>) {
+ private fun pull(buffers: Map<String, List<MutableList<SimTraceWorkload.Fragment>>>) {
if (!hasNext) {
return
}
- val fragments = mutableListOf<Pair<String, FlopsHistoryFragment>>()
+ val fragments = mutableListOf<Pair<String, SimTraceWorkload.Fragment>>()
queue.drainTo(fragments)
for ((id, fragment) in fragments) {
@@ -167,7 +167,7 @@ public class Sc20StreamingParquetTraceReader(
init {
val takenIds = mutableSetOf<UUID>()
val entries = mutableMapOf<String, GenericData.Record>()
- val buffers = mutableMapOf<String, MutableList<MutableList<FlopsHistoryFragment>>>()
+ val buffers = mutableMapOf<String, MutableList<MutableList<SimTraceWorkload.Fragment>>>()
val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "meta.parquet"))
.disableCompatibility()
@@ -200,10 +200,10 @@ public class Sc20StreamingParquetTraceReader(
logger.info("Processing VM $id")
- val internalBuffer = mutableListOf<FlopsHistoryFragment>()
- val externalBuffer = mutableListOf<FlopsHistoryFragment>()
+ val internalBuffer = mutableListOf<SimTraceWorkload.Fragment>()
+ val externalBuffer = mutableListOf<SimTraceWorkload.Fragment>()
buffers.getOrPut(id) { mutableListOf() }.add(externalBuffer)
- val fragments = sequence<FlopsHistoryFragment> {
+ val fragments = sequence {
repeat@ while (true) {
if (externalBuffer.isEmpty()) {
if (hasNext) {
@@ -220,7 +220,7 @@ public class Sc20StreamingParquetTraceReader(
for (fragment in internalBuffer) {
yield(fragment)
- if (fragment.tick >= endTime) {
+ if (fragment.time >= endTime) {
break@repeat
}
}
@@ -239,13 +239,15 @@ public class Sc20StreamingParquetTraceReader(
uid,
"VM Workload $id",
UnnamedUser,
- VmImage(
+ SimWorkloadImage(
uid,
id,
- mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems),
- fragments,
- maxCores,
- requiredMemory
+ mapOf(
+ IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems,
+ "cores" to maxCores,
+ "required-memory" to requiredMemory
+ ),
+ SimTraceWorkload(fragments),
)
)
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20TraceConverter.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
index dd6b15d0..bb2a75ee 100644
--- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
@@ -187,11 +187,11 @@ public class SolvinityConversion : TraceConversion("Solvinity") {
.filterNot { it.isDirectory }
.filter { it.extension == "csv" || it.extension == "txt" }
.toList()
- .forEach { vmFile ->
+ .forEach file@{ vmFile ->
BufferedReader(FileReader(vmFile)).use { reader ->
reader.lineSequence()
.chunked(128)
- .forEachIndexed { idx, lines ->
+ .forEach { lines ->
for (line in lines) {
// Ignore comments in the trace
if (line.startsWith("#") || line.isBlank()) {
@@ -212,7 +212,7 @@ public class SolvinityConversion : TraceConversion("Solvinity") {
if (timestamp < minTimestamp) {
minTimestamp = timestamp
}
- return@forEach
+ return@file
}
}
}
@@ -229,13 +229,13 @@ public class SolvinityConversion : TraceConversion("Solvinity") {
.filterNot { it.isDirectory }
.filter { it.extension == "csv" || it.extension == "txt" }
.toList()
- .forEachIndexed { idx, vmFile ->
+ .forEach { vmFile ->
println(vmFile)
var vmId = ""
var maxCores = -1
var requiredMemory = -1L
- var cores = -1
+ var cores: Int
var minTime = Long.MAX_VALUE
val flopsFragments = sequence {
@@ -353,13 +353,13 @@ public class BitbrainsConversion : TraceConversion("Bitbrains") {
.filterNot { it.isDirectory }
.filter { it.extension == "csv" || it.extension == "txt" }
.toList()
- .forEachIndexed { idx, vmFile ->
+ .forEach { vmFile ->
println(vmFile)
var vmId = ""
var maxCores = -1
var requiredMemory = -1L
- var cores = -1
+ var cores: Int
var minTime = Long.MAX_VALUE
val flopsFragments = sequence {
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/WorkloadSampler.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/WorkloadSampler.kt
index 19da97bb..a8b83aef 100644
--- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/WorkloadSampler.kt
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/kotlin/org/opendc/experiments/sc20/trace/WorkloadSampler.kt
@@ -23,8 +23,8 @@
package org.opendc.experiments.sc20.trace
import mu.KotlinLogging
-import org.opendc.compute.core.image.VmImage
import org.opendc.compute.core.workload.VmWorkload
+import org.opendc.compute.simulator.SimWorkloadImage
import org.opendc.experiments.sc20.experiment.model.CompositeWorkload
import org.opendc.experiments.sc20.experiment.model.SamplingStrategy
import org.opendc.experiments.sc20.experiment.model.Workload
@@ -143,7 +143,6 @@ public fun sampleHpcWorkload(
if (sampleOnLoad) {
var currentLoad = 0.0
- var i = 0
for (entry in hpcSequence) {
val entryLoad = entry.workload.image.tags.getValue("total-load") as Double
if ((currentLoad + entryLoad) / totalLoad > fraction) {
@@ -197,13 +196,11 @@ public fun sampleHpcWorkload(
*/
private fun sample(entry: TraceEntry<VmWorkload>, i: Int): TraceEntry<VmWorkload> {
val id = UUID.nameUUIDFromBytes("${entry.workload.image.uid}-$i".toByteArray())
- val image = VmImage(
+ val image = SimWorkloadImage(
id,
entry.workload.image.name,
entry.workload.image.tags,
- entry.workload.image.flopsHistory,
- entry.workload.image.maxCores,
- entry.workload.image.requiredMemory
+ (entry.workload.image as SimWorkloadImage).workload
)
val vmWorkload = entry.workload.copy(uid = id, image = image, name = entry.workload.name)
return VmTraceEntry(vmWorkload, entry.submissionTime)
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/resources/log4j2.xml b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/resources/log4j2.xml
index 5ce99dfb..8029092e 100644
--- a/simulator/opendc-experiments/opendc-experiments-sc20/src/main/resources/log4j2.xml
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/main/resources/log4j2.xml
@@ -30,7 +30,7 @@
</Console>
</Appenders>
<Loggers>
- <Logger name="org.opendc" level="warn" additivity="false">
+ <Logger name="org.opendc" level="debug" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
<Logger name="org.opendc.experiments.sc20" level="info" additivity="false">
diff --git a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt
index 230e7f36..9c44edfc 100644
--- a/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt
+++ b/simulator/opendc-experiments/opendc-experiments-sc20/src/test/kotlin/org/opendc/experiments/sc20/Sc20IntegrationTest.kt
@@ -34,8 +34,8 @@ import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
import org.opendc.compute.core.Server
import org.opendc.compute.core.workload.VmWorkload
-import org.opendc.compute.virt.service.SimpleVirtProvisioningService
-import org.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy
+import org.opendc.compute.simulator.SimVirtProvisioningService
+import org.opendc.compute.simulator.allocation.AvailableCoreMemoryAllocationPolicy
import org.opendc.experiments.sc20.experiment.attachMonitor
import org.opendc.experiments.sc20.experiment.createFailureDomain
import org.opendc.experiments.sc20.experiment.createProvisioner
@@ -96,7 +96,7 @@ class Sc20IntegrationTest {
val allocationPolicy = AvailableCoreMemoryAllocationPolicy()
val traceReader = createTestTraceReader()
val environmentReader = createTestEnvironmentReader()
- lateinit var scheduler: SimpleVirtProvisioningService
+ lateinit var scheduler: SimVirtProvisioningService
testScope.launch {
val res = createProvisioner(
@@ -142,12 +142,14 @@ class Sc20IntegrationTest {
runSimulation()
// Note that these values have been verified beforehand
- assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs")
- assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run")
- assertEquals(207379117949, monitor.totalRequestedBurst)
- assertEquals(203388071813, monitor.totalGrantedBurst)
- assertEquals(3991046136, monitor.totalOvercommissionedBurst)
- assertEquals(0, monitor.totalInterferedBurst)
+ assertAll(
+ { assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") },
+ { assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") },
+ { assertEquals(207379117949, monitor.totalRequestedBurst) },
+ { assertEquals(203388071813, monitor.totalGrantedBurst) },
+ { assertEquals(3991046136, monitor.totalOvercommissionedBurst) },
+ { assertEquals(0, monitor.totalInterferedBurst) }
+ )
}
@Test
@@ -157,7 +159,7 @@ class Sc20IntegrationTest {
val allocationPolicy = AvailableCoreMemoryAllocationPolicy()
val traceReader = createTestTraceReader(0.5, seed)
val environmentReader = createTestEnvironmentReader("single")
- lateinit var scheduler: SimpleVirtProvisioningService
+ lateinit var scheduler: SimVirtProvisioningService
testScope.launch {
val res = createProvisioner(