summary | refs | log | tree | commit | diff
path: root/simulator/opendc-experiments
diff options
context:
space:
mode:
author: Fabian Mastenbroek <mail.fabianm@gmail.com> 2021-03-09 20:47:06 +0100
committer: GitHub <noreply@github.com> 2021-03-09 20:47:06 +0100
commit: 3b6fbe0b535bf3398f120373f59f87adbba34005 (patch)
tree: bc880252a935cc0b1558c50fe83f71d21b735d29 /simulator/opendc-experiments
parent: 66c2501d95b167f9e7474a45e542f82d2d8e83ff (diff)
parent: 40e5871e01858a55372bfcb51cf90069c080e751 (diff)
compute: Improvements to cloud compute model (v2)
This is the second in the series of pull requests to improve the existing cloud compute model (see #86). This pull request removes the dependency on the bare-metal provisioning code, which simplifies experiment setup tremendously:
- Remove bare-metal provisioning code (opendc-metal)
- Remove opendc-core, which was a relic of the previous codebase and was only used sparingly.
- Move ownership of Server, Image and Flavor to the compute service. Users are expected to create instances via the compute service.
Diffstat (limited to 'simulator/opendc-experiments')
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts1
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt125
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt9
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt13
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt24
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt4
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt4
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt21
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt43
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt58
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt54
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt22
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts3
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt52
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/WorkflowMetrics.kt2
15 files changed, 163 insertions, 272 deletions
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
index 636f291c..2d0da1bf 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
@@ -31,7 +31,6 @@ plugins {
dependencies {
api(platform(project(":opendc-platform")))
- api(project(":opendc-core"))
api(project(":opendc-harness"))
implementation(project(":opendc-format"))
implementation(project(":opendc-simulator:opendc-simulator-core"))
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
index a5cf4fc0..f327b55d 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.experiment
+package org.opendc.experiments.capelin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
@@ -32,30 +32,24 @@ import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.launch
import mu.KotlinLogging
-import org.opendc.compute.api.ComputeWorkload
-import org.opendc.compute.api.Flavor
-import org.opendc.compute.api.Server
-import org.opendc.compute.api.ServerState
-import org.opendc.compute.api.ServerWatcher
+import org.opendc.compute.api.*
import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.ComputeServiceEvent
+import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.driver.HostEvent
+import org.opendc.compute.service.driver.HostListener
+import org.opendc.compute.service.driver.HostState
import org.opendc.compute.service.internal.ComputeServiceImpl
import org.opendc.compute.service.scheduler.AllocationPolicy
-import org.opendc.compute.simulator.SimBareMetalDriver
import org.opendc.compute.simulator.SimHost
-import org.opendc.compute.simulator.SimHostProvisioner
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
import org.opendc.experiments.capelin.trace.Sc20StreamingParquetTraceReader
import org.opendc.format.environment.EnvironmentReader
import org.opendc.format.trace.TraceReader
-import org.opendc.metal.NODE_CLUSTER
-import org.opendc.metal.NodeEvent
-import org.opendc.metal.service.ProvisioningService
import org.opendc.simulator.compute.SimFairShareHypervisorProvider
import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
+import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.failures.CorrelatedFaultInjector
-import org.opendc.simulator.failures.FailureDomain
import org.opendc.simulator.failures.FaultInjector
import org.opendc.trace.core.EventTracer
import java.io.File
@@ -72,20 +66,20 @@ private val logger = KotlinLogging.logger {}
/**
* Construct the failure domain for the experiments.
*/
-public suspend fun createFailureDomain(
+public fun createFailureDomain(
coroutineScope: CoroutineScope,
clock: Clock,
seed: Int,
failureInterval: Double,
- bareMetalProvisioner: ProvisioningService,
+ service: ComputeService,
chan: Channel<Unit>
): CoroutineScope {
val job = coroutineScope.launch {
chan.receive()
val random = Random(seed)
val injectors = mutableMapOf<String, FaultInjector>()
- for (node in bareMetalProvisioner.nodes()) {
- val cluster = node.metadata[NODE_CLUSTER] as String
+ for (host in service.hosts) {
+ val cluster = host.meta["cluster"] as String
val injector =
injectors.getOrPut(cluster) {
createFaultInjector(
@@ -95,7 +89,7 @@ public suspend fun createFailureDomain(
failureInterval
)
}
- injector.enqueue(node.metadata["driver"] as FailureDomain)
+ injector.enqueue(host as SimHost)
}
}
return CoroutineScope(coroutineScope.coroutineContext + job)
@@ -139,41 +133,39 @@ public fun createTraceReader(
)
}
-public data class ProvisionerResult(
- val metal: ProvisioningService,
- val provisioner: SimHostProvisioner,
- val compute: ComputeServiceImpl
-)
-
/**
- * Construct the environment for a VM provisioner and return the provisioner instance.
+ * Construct the environment for a simulated compute service.
*/
-public suspend fun createProvisioner(
+public fun createComputeService(
coroutineScope: CoroutineScope,
clock: Clock,
environmentReader: EnvironmentReader,
allocationPolicy: AllocationPolicy,
eventTracer: EventTracer
-): ProvisionerResult {
- val environment = environmentReader.use { it.construct(coroutineScope, clock) }
- val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService]
-
- // Wait for the bare metal nodes to be spawned
- delay(10)
-
- val provisioner = SimHostProvisioner(coroutineScope.coroutineContext, bareMetalProvisioner, SimFairShareHypervisorProvider())
- val hosts = provisioner.provisionAll()
+): ComputeServiceImpl {
+ val hosts = environmentReader
+ .use { it.read() }
+ .map { def ->
+ SimHost(
+ def.uid,
+ def.name,
+ def.model,
+ def.meta,
+ coroutineScope.coroutineContext,
+ clock,
+ SimFairShareHypervisorProvider(),
+ def.powerModel
+ )
+ }
- val scheduler = ComputeService(coroutineScope.coroutineContext, clock, eventTracer, allocationPolicy) as ComputeServiceImpl
+ val scheduler =
+ ComputeService(coroutineScope.coroutineContext, clock, eventTracer, allocationPolicy) as ComputeServiceImpl
for (host in hosts) {
scheduler.addHost(host)
}
- // Wait for the hypervisors to be spawned
- delay(10)
-
- return ProvisionerResult(bareMetalProvisioner, provisioner, scheduler)
+ return scheduler
}
/**
@@ -186,25 +178,16 @@ public fun attachMonitor(
scheduler: ComputeService,
monitor: ExperimentMonitor
) {
-
- val hypervisors = scheduler.hosts
-
- // Monitor hypervisor events
- for (hypervisor in hypervisors) {
- // TODO Do not expose Host directly but use Hypervisor class.
- val server = (hypervisor as SimHost).node
- monitor.reportHostStateChange(clock.millis(), hypervisor, server)
- server.events
- .onEach { event ->
- val time = clock.millis()
- when (event) {
- is NodeEvent.StateChanged -> {
- monitor.reportHostStateChange(time, hypervisor, event.node)
- }
- }
+ // Monitor host events
+ for (host in scheduler.hosts) {
+ monitor.reportHostStateChange(clock.millis(), host, HostState.UP)
+ host.addListener(object : HostListener {
+ override fun onStateChanged(host: Host, newState: HostState) {
+ monitor.reportHostStateChange(clock.millis(), host, newState)
}
- .launchIn(coroutineScope)
- hypervisor.events
+ })
+
+ host.events
.onEach { event ->
when (event) {
is HostEvent.SliceFinished -> monitor.reportHostSlice(
@@ -216,15 +199,14 @@ public fun attachMonitor(
event.cpuUsage,
event.cpuDemand,
event.numberOfDeployedImages,
- (event.driver as SimHost).node
+ event.driver
)
}
}
.launchIn(coroutineScope)
- val driver = server.metadata["driver"] as SimBareMetalDriver
- driver.powerDraw
- .onEach { monitor.reportPowerConsumption(server, it) }
+ (host as SimHost).powerDraw
+ .onEach { monitor.reportPowerConsumption(host, it) }
.launchIn(coroutineScope)
}
@@ -244,29 +226,32 @@ public fun attachMonitor(
public suspend fun processTrace(
coroutineScope: CoroutineScope,
clock: Clock,
- reader: TraceReader<ComputeWorkload>,
+ reader: TraceReader<SimWorkload>,
scheduler: ComputeService,
chan: Channel<Unit>,
monitor: ExperimentMonitor
) {
val client = scheduler.newClient()
+ val image = client.newImage("vm-image")
try {
var submitted = 0
while (reader.hasNext()) {
- val (time, workload) = reader.next()
+ val entry = reader.next()
submitted++
- delay(max(0, time - clock.millis()))
+ delay(max(0, entry.start - clock.millis()))
coroutineScope.launch {
chan.send(Unit)
val server = client.newServer(
- workload.image.name,
- workload.image,
- Flavor(
- workload.image.tags["cores"] as Int,
- workload.image.tags["required-memory"] as Long
- )
+ entry.name,
+ image,
+ client.newFlavor(
+ entry.name,
+ entry.meta["cores"] as Int,
+ entry.meta["required-memory"] as Long
+ ),
+ meta = entry.meta
)
server.watch(object : ServerWatcher {
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index ff0a026d..f9c96bb6 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -35,10 +35,6 @@ import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolic
import org.opendc.compute.service.scheduler.ProvisionedCoresAllocationPolicy
import org.opendc.compute.service.scheduler.RandomAllocationPolicy
import org.opendc.compute.simulator.allocation.*
-import org.opendc.experiments.capelin.experiment.attachMonitor
-import org.opendc.experiments.capelin.experiment.createFailureDomain
-import org.opendc.experiments.capelin.experiment.createProvisioner
-import org.opendc.experiments.capelin.experiment.processTrace
import org.opendc.experiments.capelin.model.CompositeWorkload
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
@@ -157,7 +153,7 @@ public abstract class Portfolio(name: String) : Experiment(name) {
)
testScope.launch {
- val (bareMetalProvisioner, provisioner, scheduler) = createProvisioner(
+ val scheduler = createComputeService(
this,
clock,
environment,
@@ -172,7 +168,7 @@ public abstract class Portfolio(name: String) : Experiment(name) {
clock,
seeder.nextInt(),
operationalPhenomena.failureFrequency,
- bareMetalProvisioner,
+ scheduler,
chan
)
} else {
@@ -197,7 +193,6 @@ public abstract class Portfolio(name: String) : Experiment(name) {
failureDomain?.cancel()
scheduler.close()
- provisioner.close()
}
try {
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
index 1e42cf56..14cc06dc 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
@@ -26,7 +26,7 @@ import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
import org.opendc.compute.service.ComputeServiceEvent
import org.opendc.compute.service.driver.Host
-import org.opendc.metal.Node
+import org.opendc.compute.service.driver.HostState
import java.io.Closeable
/**
@@ -41,17 +41,12 @@ public interface ExperimentMonitor : Closeable {
/**
* This method is invoked when the state of a host changes.
*/
- public fun reportHostStateChange(
- time: Long,
- driver: Host,
- host: Node
- ) {
- }
+ public fun reportHostStateChange(time: Long, host: Host, newState: HostState) {}
/**
* Report the power consumption of a host.
*/
- public fun reportPowerConsumption(host: Node, draw: Double) {}
+ public fun reportPowerConsumption(host: Host, draw: Double) {}
/**
* This method is invoked for a host for each slice that is finishes.
@@ -65,7 +60,7 @@ public interface ExperimentMonitor : Closeable {
cpuUsage: Double,
cpuDemand: Double,
numberOfDeployedImages: Int,
- host: Node,
+ host: Host,
duration: Long = 5 * 60 * 1000L
) {
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
index 98052214..c9d57a98 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
@@ -27,11 +27,11 @@ import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
import org.opendc.compute.service.ComputeServiceEvent
import org.opendc.compute.service.driver.Host
+import org.opendc.compute.service.driver.HostState
import org.opendc.experiments.capelin.telemetry.HostEvent
import org.opendc.experiments.capelin.telemetry.ProvisionerEvent
import org.opendc.experiments.capelin.telemetry.parquet.ParquetHostEventWriter
import org.opendc.experiments.capelin.telemetry.parquet.ParquetProvisionerEventWriter
-import org.opendc.metal.Node
import java.io.File
/**
@@ -51,7 +51,7 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
File(base, "provisioner-metrics/$partition/data.parquet"),
bufferSize
)
- private val currentHostEvent = mutableMapOf<Node, HostEvent>()
+ private val currentHostEvent = mutableMapOf<Host, HostEvent>()
private var startTime = -1L
override fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {
@@ -63,12 +63,8 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
}
}
- override fun reportHostStateChange(
- time: Long,
- driver: Host,
- host: Node
- ) {
- logger.debug { "Host ${host.uid} changed state ${host.state} [$time]" }
+ override fun reportHostStateChange(time: Long, host: Host, newState: HostState) {
+ logger.debug { "Host ${host.uid} changed state $newState [$time]" }
val previousEvent = currentHostEvent[host]
@@ -97,9 +93,9 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
)
}
- private val lastPowerConsumption = mutableMapOf<Node, Double>()
+ private val lastPowerConsumption = mutableMapOf<Host, Double>()
- override fun reportPowerConsumption(host: Node, draw: Double) {
+ override fun reportPowerConsumption(host: Host, draw: Double) {
lastPowerConsumption[host] = draw
}
@@ -112,7 +108,7 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
cpuUsage: Double,
cpuDemand: Double,
numberOfDeployedImages: Int,
- host: Node,
+ host: Host,
duration: Long
) {
val previousEvent = currentHostEvent[host]
@@ -130,7 +126,7 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
cpuUsage,
cpuDemand,
lastPowerConsumption[host] ?: 200.0,
- host.flavor.cpuCount
+ host.model.cpuCount
)
currentHostEvent[host] = event
@@ -148,7 +144,7 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
cpuUsage,
cpuDemand,
lastPowerConsumption[host] ?: 200.0,
- host.flavor.cpuCount
+ host.model.cpuCount
)
currentHostEvent[host] = event
@@ -168,7 +164,7 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
cpuUsage,
cpuDemand,
lastPowerConsumption[host] ?: 200.0,
- host.flavor.cpuCount
+ host.model.cpuCount
)
currentHostEvent[host] = event
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt
index e7b6a7bb..899fc9b1 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt
@@ -22,7 +22,7 @@
package org.opendc.experiments.capelin.telemetry
-import org.opendc.metal.Node
+import org.opendc.compute.service.driver.Host
/**
* A periodic report of the host machine metrics.
@@ -30,7 +30,7 @@ import org.opendc.metal.Node
public data class HostEvent(
override val timestamp: Long,
public val duration: Long,
- public val node: Node,
+ public val host: Host,
public val vmCount: Int,
public val requestedBurst: Long,
public val grantedBurst: Long,
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt
index b4fdd66a..4a3e7963 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt
@@ -41,8 +41,8 @@ public class ParquetHostEventWriter(path: File, bufferSize: Int) :
// record.put("portfolio_id", event.run.parent.parent.id)
// record.put("scenario_id", event.run.parent.id)
// record.put("run_id", event.run.id)
- record.put("host_id", event.node.name)
- record.put("state", event.node.state.name)
+ record.put("host_id", event.host.name)
+ record.put("state", event.host.state.name)
record.put("timestamp", event.timestamp)
record.put("duration", event.duration)
record.put("vm_count", event.vmCount)
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt
index f9630078..a8462a51 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt
@@ -22,14 +22,13 @@
package org.opendc.experiments.capelin.trace
-import org.opendc.compute.api.ComputeWorkload
-import org.opendc.compute.api.Image
import org.opendc.experiments.capelin.model.CompositeWorkload
import org.opendc.experiments.capelin.model.Workload
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
+import org.opendc.simulator.compute.workload.SimWorkload
import java.util.TreeSet
/**
@@ -45,11 +44,11 @@ public class Sc20ParquetTraceReader(
performanceInterferenceModel: Map<String, PerformanceInterferenceModel>,
workload: Workload,
seed: Int
-) : TraceReader<ComputeWorkload> {
+) : TraceReader<SimWorkload> {
/**
* The iterator over the actual trace.
*/
- private val iterator: Iterator<TraceEntry<ComputeWorkload>> =
+ private val iterator: Iterator<TraceEntry<SimWorkload>> =
rawReaders
.map { it.read() }
.run {
@@ -67,19 +66,11 @@ public class Sc20ParquetTraceReader(
this
else {
map { entry ->
- val image = entry.workload.image
- val id = image.name
+ val id = entry.name
val relevantPerformanceInterferenceModelItems =
performanceInterferenceModel[id] ?: PerformanceInterferenceModel(TreeSet())
- val newImage =
- Image(
- image.uid,
- image.name,
- image.tags + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems),
- )
- val newWorkload = entry.workload.copy(image = newImage)
- Sc20RawParquetTraceReader.TraceEntryImpl(entry.submissionTime, newWorkload)
+ entry.copy(meta = entry.meta + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems))
}
}
}
@@ -87,7 +78,7 @@ public class Sc20ParquetTraceReader(
override fun hasNext(): Boolean = iterator.hasNext()
- override fun next(): TraceEntry<ComputeWorkload> = iterator.next()
+ override fun next(): TraceEntry<SimWorkload> = iterator.next()
override fun close() {}
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt
index b29bdc54..7ea5efe5 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt
@@ -26,12 +26,10 @@ import mu.KotlinLogging
import org.apache.avro.generic.GenericData
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetReader
-import org.opendc.compute.api.ComputeWorkload
-import org.opendc.compute.api.Image
-import org.opendc.core.User
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
import org.opendc.simulator.compute.workload.SimTraceWorkload
+import org.opendc.simulator.compute.workload.SimWorkload
import java.io.File
import java.util.UUID
@@ -48,6 +46,7 @@ public class Sc20RawParquetTraceReader(private val path: File) {
* Read the fragments into memory.
*/
private fun parseFragments(path: File): Map<String, List<SimTraceWorkload.Fragment>> {
+ @Suppress("DEPRECATION")
val reader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "trace.parquet"))
.disableCompatibility()
.build()
@@ -59,11 +58,9 @@ public class Sc20RawParquetTraceReader(private val path: File) {
val record = reader.read() ?: break
val id = record["id"].toString()
- val tick = record["time"] as Long
val duration = record["duration"] as Long
val cores = record["cores"] as Int
val cpuUsage = record["cpuUsage"] as Double
- val flops = record["flops"] as Long
val fragment = SimTraceWorkload.Fragment(
duration,
@@ -83,13 +80,14 @@ public class Sc20RawParquetTraceReader(private val path: File) {
/**
* Read the metadata into a workload.
*/
- private fun parseMeta(path: File, fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<TraceEntryImpl> {
+ private fun parseMeta(path: File, fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<TraceEntry<SimWorkload>> {
+ @Suppress("DEPRECATION")
val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "meta.parquet"))
.disableCompatibility()
.build()
var counter = 0
- val entries = mutableListOf<TraceEntryImpl>()
+ val entries = mutableListOf<TraceEntry<SimWorkload>>()
return try {
while (true) {
@@ -109,13 +107,9 @@ public class Sc20RawParquetTraceReader(private val path: File) {
val vmFragments = fragments.getValue(id).asSequence()
val totalLoad = vmFragments.sumByDouble { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs
val workload = SimTraceWorkload(vmFragments)
- val vmWorkload = ComputeWorkload(
- uid,
- id,
- UnnamedUser,
- Image(
- uid,
- id,
+ entries.add(
+ TraceEntry(
+ uid, id, submissionTime, workload,
mapOf(
"submit-time" to submissionTime,
"end-time" to endTime,
@@ -126,7 +120,6 @@ public class Sc20RawParquetTraceReader(private val path: File) {
)
)
)
- entries.add(TraceEntryImpl(submissionTime, vmWorkload))
}
entries
@@ -141,7 +134,7 @@ public class Sc20RawParquetTraceReader(private val path: File) {
/**
* The entries in the trace.
*/
- private val entries: List<TraceEntryImpl>
+ private val entries: List<TraceEntry<SimWorkload>>
init {
val fragments = parseFragments(path)
@@ -151,21 +144,5 @@ public class Sc20RawParquetTraceReader(private val path: File) {
/**
* Read the entries in the trace.
*/
- public fun read(): List<TraceEntry<ComputeWorkload>> = entries
-
- /**
- * An unnamed user.
- */
- private object UnnamedUser : User {
- override val name: String = "<unnamed>"
- override val uid: UUID = UUID.randomUUID()
- }
-
- /**
- * An entry in the trace.
- */
- internal data class TraceEntryImpl(
- override var submissionTime: Long,
- override val workload: ComputeWorkload
- ) : TraceEntry<ComputeWorkload>
+ public fun read(): List<TraceEntry<SimWorkload>> = entries
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt
index c588fda3..9ab69572 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt
@@ -31,14 +31,12 @@ import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.filter2.predicate.Statistics
import org.apache.parquet.filter2.predicate.UserDefinedPredicate
import org.apache.parquet.io.api.Binary
-import org.opendc.compute.api.ComputeWorkload
-import org.opendc.compute.api.Image
-import org.opendc.core.User
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
import org.opendc.simulator.compute.workload.SimTraceWorkload
+import org.opendc.simulator.compute.workload.SimWorkload
import java.io.File
import java.io.Serializable
import java.util.SortedSet
@@ -62,11 +60,11 @@ public class Sc20StreamingParquetTraceReader(
performanceInterferenceModel: PerformanceInterferenceModel,
selectedVms: List<String>,
random: Random
-) : TraceReader<ComputeWorkload> {
+) : TraceReader<SimWorkload> {
/**
* The internal iterator to use for this reader.
*/
- private val iterator: Iterator<TraceEntry<ComputeWorkload>>
+ private val iterator: Iterator<TraceEntry<SimWorkload>>
/**
* The intermediate buffer to store the read records in.
@@ -98,6 +96,7 @@ public class Sc20StreamingParquetTraceReader(
* The thread to read the records in.
*/
private val readerThread = thread(start = true, name = "sc20-reader") {
+ @Suppress("DEPRECATION")
val reader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "trace.parquet"))
.disableCompatibility()
.run { if (filter != null) withFilter(filter) else this }
@@ -113,11 +112,9 @@ public class Sc20StreamingParquetTraceReader(
}
val id = record["id"].toString()
- val tick = record["time"] as Long
val duration = record["duration"] as Long
val cores = record["cores"] as Int
val cpuUsage = record["cpuUsage"] as Double
- val flops = record["flops"] as Long
val fragment = SimTraceWorkload.Fragment(
duration,
@@ -167,6 +164,7 @@ public class Sc20StreamingParquetTraceReader(
val entries = mutableMapOf<String, GenericData.Record>()
val buffers = mutableMapOf<String, MutableList<MutableList<SimTraceWorkload.Fragment>>>()
+ @Suppress("DEPRECATION")
val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "meta.parquet"))
.disableCompatibility()
.run { if (filter != null) withFilter(filter) else this }
@@ -236,35 +234,25 @@ public class Sc20StreamingParquetTraceReader(
Random(random.nextInt())
)
val workload = SimTraceWorkload(fragments)
- val vmWorkload = ComputeWorkload(
- uid,
- "VM Workload $id",
- UnnamedUser,
- Image(
- uid,
- id,
- mapOf(
- IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems,
- "cores" to maxCores,
- "required-memory" to requiredMemory,
- "workload" to workload
- )
- )
- )
- TraceEntryImpl(
- submissionTime,
- vmWorkload
+ TraceEntry(
+ uid, id, submissionTime, workload,
+ mapOf(
+ IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems,
+ "cores" to maxCores,
+ "required-memory" to requiredMemory,
+ "workload" to workload
+ )
)
}
- .sortedBy { it.submissionTime }
+ .sortedBy { it.start }
.toList()
.iterator()
}
override fun hasNext(): Boolean = iterator.hasNext()
- override fun next(): TraceEntry<ComputeWorkload> = iterator.next()
+ override fun next(): TraceEntry<SimWorkload> = iterator.next()
override fun close() {
readerThread.interrupt()
@@ -287,20 +275,4 @@ public class Sc20StreamingParquetTraceReader(
return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isNotEmpty()
}
}
-
- /**
- * An unnamed user.
- */
- private object UnnamedUser : User {
- override val name: String = "<unnamed>"
- override val uid: UUID = UUID.randomUUID()
- }
-
- /**
- * An entry in the trace.
- */
- private data class TraceEntryImpl(
- override var submissionTime: Long,
- override val workload: ComputeWorkload
- ) : TraceEntry<ComputeWorkload>
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt
index 881652f6..5c8727ea 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt
@@ -23,12 +23,11 @@
package org.opendc.experiments.capelin.trace
import mu.KotlinLogging
-import org.opendc.compute.api.ComputeWorkload
-import org.opendc.compute.api.Image
import org.opendc.experiments.capelin.model.CompositeWorkload
import org.opendc.experiments.capelin.model.SamplingStrategy
import org.opendc.experiments.capelin.model.Workload
import org.opendc.format.trace.TraceEntry
+import org.opendc.simulator.compute.workload.SimWorkload
import java.util.*
import kotlin.random.Random
@@ -38,11 +37,11 @@ private val logger = KotlinLogging.logger {}
* Sample the workload for the specified [run].
*/
public fun sampleWorkload(
- trace: List<TraceEntry<ComputeWorkload>>,
+ trace: List<TraceEntry<SimWorkload>>,
workload: Workload,
subWorkload: Workload,
seed: Int
-): List<TraceEntry<ComputeWorkload>> {
+): List<TraceEntry<SimWorkload>> {
return when {
workload is CompositeWorkload -> sampleRegularWorkload(trace, workload, subWorkload, seed)
workload.samplingStrategy == SamplingStrategy.HPC ->
@@ -58,24 +57,24 @@ public fun sampleWorkload(
* Sample a regular (non-HPC) workload.
*/
public fun sampleRegularWorkload(
- trace: List<TraceEntry<ComputeWorkload>>,
+ trace: List<TraceEntry<SimWorkload>>,
workload: Workload,
subWorkload: Workload,
seed: Int
-): List<TraceEntry<ComputeWorkload>> {
+): List<TraceEntry<SimWorkload>> {
val fraction = subWorkload.fraction
val shuffled = trace.shuffled(Random(seed))
- val res = mutableListOf<TraceEntry<ComputeWorkload>>()
+ val res = mutableListOf<TraceEntry<SimWorkload>>()
val totalLoad = if (workload is CompositeWorkload) {
workload.totalLoad
} else {
- shuffled.sumByDouble { it.workload.image.tags.getValue("total-load") as Double }
+ shuffled.sumByDouble { it.meta.getValue("total-load") as Double }
}
var currentLoad = 0.0
for (entry in shuffled) {
- val entryLoad = entry.workload.image.tags.getValue("total-load") as Double
+ val entryLoad = entry.meta.getValue("total-load") as Double
if ((currentLoad + entryLoad) / totalLoad > fraction) {
break
}
@@ -93,23 +92,23 @@ public fun sampleRegularWorkload(
* Sample a HPC workload.
*/
public fun sampleHpcWorkload(
- trace: List<TraceEntry<ComputeWorkload>>,
+ trace: List<TraceEntry<SimWorkload>>,
workload: Workload,
seed: Int,
sampleOnLoad: Boolean
-): List<TraceEntry<ComputeWorkload>> {
+): List<TraceEntry<SimWorkload>> {
val pattern = Regex("^vm__workload__(ComputeNode|cn).*")
val random = Random(seed)
val fraction = workload.fraction
val (hpc, nonHpc) = trace.partition { entry ->
- val name = entry.workload.image.name
+ val name = entry.name
name.matches(pattern)
}
val hpcSequence = generateSequence(0) { it + 1 }
.map { index ->
- val res = mutableListOf<TraceEntry<ComputeWorkload>>()
+ val res = mutableListOf<TraceEntry<SimWorkload>>()
hpc.mapTo(res) { sample(it, index) }
res.shuffle(random)
res
@@ -118,7 +117,7 @@ public fun sampleHpcWorkload(
val nonHpcSequence = generateSequence(0) { it + 1 }
.map { index ->
- val res = mutableListOf<TraceEntry<ComputeWorkload>>()
+ val res = mutableListOf<TraceEntry<SimWorkload>>()
nonHpc.mapTo(res) { sample(it, index) }
res.shuffle(random)
res
@@ -130,7 +129,7 @@ public fun sampleHpcWorkload(
val totalLoad = if (workload is CompositeWorkload) {
workload.totalLoad
} else {
- trace.sumByDouble { it.workload.image.tags.getValue("total-load") as Double }
+ trace.sumByDouble { it.meta.getValue("total-load") as Double }
}
logger.debug { "Total trace load: $totalLoad" }
@@ -139,12 +138,12 @@ public fun sampleHpcWorkload(
var nonHpcCount = 0
var nonHpcLoad = 0.0
- val res = mutableListOf<TraceEntry<ComputeWorkload>>()
+ val res = mutableListOf<TraceEntry<SimWorkload>>()
if (sampleOnLoad) {
var currentLoad = 0.0
for (entry in hpcSequence) {
- val entryLoad = entry.workload.image.tags.getValue("total-load") as Double
+ val entryLoad = entry.meta.getValue("total-load") as Double
if ((currentLoad + entryLoad) / totalLoad > fraction) {
break
}
@@ -156,7 +155,7 @@ public fun sampleHpcWorkload(
}
for (entry in nonHpcSequence) {
- val entryLoad = entry.workload.image.tags.getValue("total-load") as Double
+ val entryLoad = entry.meta.getValue("total-load") as Double
if ((currentLoad + entryLoad) / totalLoad > 1) {
break
}
@@ -170,7 +169,7 @@ public fun sampleHpcWorkload(
hpcSequence
.take((fraction * trace.size).toInt())
.forEach { entry ->
- hpcLoad += entry.workload.image.tags.getValue("total-load") as Double
+ hpcLoad += entry.meta.getValue("total-load") as Double
hpcCount += 1
res.add(entry)
}
@@ -178,7 +177,7 @@ public fun sampleHpcWorkload(
nonHpcSequence
.take(((1 - fraction) * trace.size).toInt())
.forEach { entry ->
- nonHpcLoad += entry.workload.image.tags.getValue("total-load") as Double
+ nonHpcLoad += entry.meta.getValue("total-load") as Double
nonHpcCount += 1
res.add(entry)
}
@@ -194,16 +193,7 @@ public fun sampleHpcWorkload(
/**
* Sample a random trace entry.
*/
-private fun sample(entry: TraceEntry<ComputeWorkload>, i: Int): TraceEntry<ComputeWorkload> {
- val id = UUID.nameUUIDFromBytes("${entry.workload.image.uid}-$i".toByteArray())
- val image = Image(
- id,
- entry.workload.image.name,
- entry.workload.image.tags
- )
- val vmWorkload = entry.workload.copy(uid = id, image = image, name = entry.workload.name)
- return VmTraceEntry(vmWorkload, entry.submissionTime)
+private fun sample(entry: TraceEntry<SimWorkload>, i: Int): TraceEntry<SimWorkload> {
+ val uid = UUID.nameUUIDFromBytes("${entry.uid}-$i".toByteArray())
+ return entry.copy(uid = uid)
}
-
-private class VmTraceEntry(override val workload: ComputeWorkload, override val submissionTime: Long) :
- TraceEntry<ComputeWorkload>
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index dfc6b90b..4e6cfddc 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -32,13 +32,9 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
-import org.opendc.compute.api.ComputeWorkload
+import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.internal.ComputeServiceImpl
import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy
-import org.opendc.experiments.capelin.experiment.attachMonitor
-import org.opendc.experiments.capelin.experiment.createFailureDomain
-import org.opendc.experiments.capelin.experiment.createProvisioner
-import org.opendc.experiments.capelin.experiment.processTrace
import org.opendc.experiments.capelin.model.Workload
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader
@@ -46,7 +42,7 @@ import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader
import org.opendc.format.environment.EnvironmentReader
import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
import org.opendc.format.trace.TraceReader
-import org.opendc.metal.Node
+import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.utils.DelayControllerClockAdapter
import org.opendc.trace.core.EventTracer
import java.io.File
@@ -101,15 +97,13 @@ class CapelinIntegrationTest {
val tracer = EventTracer(clock)
testScope.launch {
- val res = createProvisioner(
+ scheduler = createComputeService(
this,
clock,
environmentReader,
allocationPolicy,
tracer
)
- val bareMetalProvisioner = res.metal
- scheduler = res.compute
val failureDomain = if (failures) {
println("ENABLING failures")
@@ -118,7 +112,7 @@ class CapelinIntegrationTest {
clock,
seed,
24.0 * 7,
- bareMetalProvisioner,
+ scheduler,
chan
)
} else {
@@ -140,7 +134,6 @@ class CapelinIntegrationTest {
failureDomain?.cancel()
scheduler.close()
monitor.close()
- res.provisioner.close()
}
runSimulation()
@@ -166,7 +159,7 @@ class CapelinIntegrationTest {
val tracer = EventTracer(clock)
testScope.launch {
- val (_, provisioner, scheduler) = createProvisioner(
+ val scheduler = createComputeService(
this,
clock,
environmentReader,
@@ -187,7 +180,6 @@ class CapelinIntegrationTest {
scheduler.close()
monitor.close()
- provisioner.close()
}
runSimulation()
@@ -209,7 +201,7 @@ class CapelinIntegrationTest {
/**
* Obtain the trace reader for the test.
*/
- private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<ComputeWorkload> {
+ private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<SimWorkload> {
return Sc20ParquetTraceReader(
listOf(Sc20RawParquetTraceReader(File("src/test/resources/trace"))),
emptyMap(),
@@ -241,7 +233,7 @@ class CapelinIntegrationTest {
cpuUsage: Double,
cpuDemand: Double,
numberOfDeployedImages: Int,
- host: Node,
+ host: Host,
duration: Long
) {
totalRequestedBurst += requestedBurst
diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts
index 00aa0395..02e77c7c 100644
--- a/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts
+++ b/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts
@@ -30,10 +30,9 @@ plugins {
dependencies {
api(platform(project(":opendc-platform")))
- api(project(":opendc-core"))
api(project(":opendc-harness"))
implementation(project(":opendc-format"))
- implementation(project(":opendc-workflows"))
+ implementation(project(":opendc-workflow:opendc-workflow-service"))
implementation(project(":opendc-simulator:opendc-simulator-core"))
implementation(project(":opendc-compute:opendc-compute-simulator"))
}
diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt
index 7b9d70ed..9e305b3d 100644
--- a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt
+++ b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt
@@ -26,23 +26,22 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.test.TestCoroutineScope
import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy
-import org.opendc.compute.simulator.SimHostProvisioner
+import org.opendc.compute.simulator.SimHost
import org.opendc.format.environment.sc18.Sc18EnvironmentReader
import org.opendc.format.trace.gwf.GwfTraceReader
import org.opendc.harness.dsl.Experiment
import org.opendc.harness.dsl.anyOf
-import org.opendc.metal.service.ProvisioningService
import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider
import org.opendc.simulator.utils.DelayControllerClockAdapter
import org.opendc.trace.core.EventTracer
import org.opendc.trace.core.enable
-import org.opendc.workflows.service.StageWorkflowService
-import org.opendc.workflows.service.WorkflowEvent
-import org.opendc.workflows.service.WorkflowSchedulerMode
-import org.opendc.workflows.service.stage.job.NullJobAdmissionPolicy
-import org.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy
-import org.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy
-import org.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy
+import org.opendc.workflow.service.WorkflowEvent
+import org.opendc.workflow.service.WorkflowService
+import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode
+import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy
+import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy
+import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy
+import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy
import java.io.File
import java.io.FileInputStream
import kotlin.math.max
@@ -84,16 +83,20 @@ public class UnderspecificationExperiment : Experiment("underspecification") {
}
testScope.launch {
- val environment = Sc18EnvironmentReader(FileInputStream(File(environment)))
- .use { it.construct(testScope, clock) }
+ val hosts = Sc18EnvironmentReader(FileInputStream(File(environment)))
+ .use { it.read() }
+ .map { def ->
+ SimHost(
+ def.uid,
+ def.name,
+ def.model,
+ def.meta,
+ testScope.coroutineContext,
+ clock,
+ SimSpaceSharedHypervisorProvider()
+ )
+ }
- val bareMetal = environment.platforms[0].zones[0].services[ProvisioningService]
-
- // Wait for the bare metal nodes to be spawned
- delay(10)
-
- val provisioner = SimHostProvisioner(testScope.coroutineContext, bareMetal, SimSpaceSharedHypervisorProvider())
- val hosts = provisioner.provisionAll()
val compute = ComputeService(
testScope.coroutineContext,
clock,
@@ -103,11 +106,8 @@ public class UnderspecificationExperiment : Experiment("underspecification") {
hosts.forEach { compute.addHost(it) }
- // Wait for the hypervisors to be spawned
- delay(10)
-
- val scheduler = StageWorkflowService(
- testScope,
+ val scheduler = WorkflowService(
+ testScope.coroutineContext,
clock,
tracer,
compute.newClient(),
@@ -121,9 +121,9 @@ public class UnderspecificationExperiment : Experiment("underspecification") {
val reader = GwfTraceReader(File(trace))
while (reader.hasNext()) {
- val (time, job) = reader.next()
- delay(max(0, time * 1000 - clock.millis()))
- scheduler.submit(job)
+ val entry = reader.next()
+ delay(max(0, entry.start * 1000 - clock.millis()))
+ scheduler.submit(entry.workload)
}
}
diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/WorkflowMetrics.kt b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/WorkflowMetrics.kt
index dbd04b87..a8356888 100644
--- a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/WorkflowMetrics.kt
+++ b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/WorkflowMetrics.kt
@@ -24,7 +24,7 @@ package org.opendc.experiments.sc18
import org.opendc.trace.core.EventStream
import org.opendc.trace.core.onEvent
-import org.opendc.workflows.service.WorkflowEvent
+import org.opendc.workflow.service.WorkflowEvent
import java.util.*
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine