Diffstat (limited to 'simulator/opendc-experiments/opendc-experiments-capelin/src')
-rw-r--r--  simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt  91
-rw-r--r--  simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt  11
-rw-r--r--  simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt  20
-rw-r--r--  simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt  56
-rw-r--r--  simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt  4
-rw-r--r--  simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt  2
-rw-r--r--  simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt  4
-rw-r--r--  simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt  13
-rw-r--r--  simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt  21
-rw-r--r--  simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt  25
-rw-r--r--  simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt  35
-rw-r--r--  simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt  35
12 files changed, 172 insertions, 145 deletions
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
index 8f3e686a..a5cf4fc0 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -32,22 +32,26 @@ import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.launch
import mu.KotlinLogging
-import org.opendc.compute.core.Flavor
-import org.opendc.compute.core.ServerEvent
-import org.opendc.compute.core.metal.NODE_CLUSTER
-import org.opendc.compute.core.metal.driver.BareMetalDriver
-import org.opendc.compute.core.metal.service.ProvisioningService
-import org.opendc.compute.core.virt.HypervisorEvent
-import org.opendc.compute.core.virt.service.VirtProvisioningEvent
-import org.opendc.compute.core.workload.VmWorkload
+import org.opendc.compute.api.ComputeWorkload
+import org.opendc.compute.api.Flavor
+import org.opendc.compute.api.Server
+import org.opendc.compute.api.ServerState
+import org.opendc.compute.api.ServerWatcher
+import org.opendc.compute.service.ComputeService
+import org.opendc.compute.service.ComputeServiceEvent
+import org.opendc.compute.service.driver.HostEvent
+import org.opendc.compute.service.internal.ComputeServiceImpl
+import org.opendc.compute.service.scheduler.AllocationPolicy
import org.opendc.compute.simulator.SimBareMetalDriver
-import org.opendc.compute.simulator.SimVirtDriver
-import org.opendc.compute.simulator.SimVirtProvisioningService
-import org.opendc.compute.simulator.allocation.AllocationPolicy
+import org.opendc.compute.simulator.SimHost
+import org.opendc.compute.simulator.SimHostProvisioner
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
import org.opendc.experiments.capelin.trace.Sc20StreamingParquetTraceReader
import org.opendc.format.environment.EnvironmentReader
import org.opendc.format.trace.TraceReader
+import org.opendc.metal.NODE_CLUSTER
+import org.opendc.metal.NodeEvent
+import org.opendc.metal.service.ProvisioningService
import org.opendc.simulator.compute.SimFairShareHypervisorProvider
import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
import org.opendc.simulator.failures.CorrelatedFaultInjector
@@ -135,6 +139,12 @@ public fun createTraceReader(
)
}
+public data class ProvisionerResult(
+ val metal: ProvisioningService,
+ val provisioner: SimHostProvisioner,
+ val compute: ComputeServiceImpl
+)
+
/**
* Construct the environment for a VM provisioner and return the provisioner instance.
*/
@@ -144,45 +154,52 @@ public suspend fun createProvisioner(
environmentReader: EnvironmentReader,
allocationPolicy: AllocationPolicy,
eventTracer: EventTracer
-): Pair<ProvisioningService, SimVirtProvisioningService> {
+): ProvisionerResult {
val environment = environmentReader.use { it.construct(coroutineScope, clock) }
val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService]
// Wait for the bare metal nodes to be spawned
delay(10)
- val scheduler = SimVirtProvisioningService(coroutineScope, clock, bareMetalProvisioner, allocationPolicy, eventTracer, SimFairShareHypervisorProvider())
+ val provisioner = SimHostProvisioner(coroutineScope.coroutineContext, bareMetalProvisioner, SimFairShareHypervisorProvider())
+ val hosts = provisioner.provisionAll()
+
+ val scheduler = ComputeService(coroutineScope.coroutineContext, clock, eventTracer, allocationPolicy) as ComputeServiceImpl
+
+ for (host in hosts) {
+ scheduler.addHost(host)
+ }
// Wait for the hypervisors to be spawned
delay(10)
- return bareMetalProvisioner to scheduler
+ return ProvisionerResult(bareMetalProvisioner, provisioner, scheduler)
}
/**
* Attach the specified monitor to the VM provisioner.
*/
@OptIn(ExperimentalCoroutinesApi::class)
-public suspend fun attachMonitor(
+public fun attachMonitor(
coroutineScope: CoroutineScope,
clock: Clock,
- scheduler: SimVirtProvisioningService,
+ scheduler: ComputeService,
monitor: ExperimentMonitor
) {
- val hypervisors = scheduler.drivers()
+ val hypervisors = scheduler.hosts
// Monitor hypervisor events
for (hypervisor in hypervisors) {
- // TODO Do not expose VirtDriver directly but use Hypervisor class.
- val server = (hypervisor as SimVirtDriver).server
+ // TODO Do not expose Host directly but use Hypervisor class.
+ val server = (hypervisor as SimHost).node
monitor.reportHostStateChange(clock.millis(), hypervisor, server)
server.events
.onEach { event ->
val time = clock.millis()
when (event) {
- is ServerEvent.StateChanged -> {
- monitor.reportHostStateChange(time, hypervisor, event.server)
+ is NodeEvent.StateChanged -> {
+ monitor.reportHostStateChange(time, hypervisor, event.node)
}
}
}
@@ -190,7 +207,7 @@ public suspend fun attachMonitor(
hypervisor.events
.onEach { event ->
when (event) {
- is HypervisorEvent.SliceFinished -> monitor.reportHostSlice(
+ is HostEvent.SliceFinished -> monitor.reportHostSlice(
clock.millis(),
event.requestedBurst,
event.grantedBurst,
@@ -199,22 +216,22 @@ public suspend fun attachMonitor(
event.cpuUsage,
event.cpuDemand,
event.numberOfDeployedImages,
- event.hostServer
+ (event.driver as SimHost).node
)
}
}
.launchIn(coroutineScope)
- val driver = hypervisor.server.services[BareMetalDriver.Key] as SimBareMetalDriver
+ val driver = server.metadata["driver"] as SimBareMetalDriver
driver.powerDraw
- .onEach { monitor.reportPowerConsumption(hypervisor.server, it) }
+ .onEach { monitor.reportPowerConsumption(server, it) }
.launchIn(coroutineScope)
}
scheduler.events
.onEach { event ->
when (event) {
- is VirtProvisioningEvent.MetricsAvailable ->
+ is ComputeServiceEvent.MetricsAvailable ->
monitor.reportProvisionerMetrics(clock.millis(), event)
}
}
@@ -227,11 +244,12 @@ public suspend fun attachMonitor(
public suspend fun processTrace(
coroutineScope: CoroutineScope,
clock: Clock,
- reader: TraceReader<VmWorkload>,
- scheduler: SimVirtProvisioningService,
+ reader: TraceReader<ComputeWorkload>,
+ scheduler: ComputeService,
chan: Channel<Unit>,
monitor: ExperimentMonitor
) {
+ val client = scheduler.newClient()
try {
var submitted = 0
@@ -242,7 +260,7 @@ public suspend fun processTrace(
delay(max(0, time - clock.millis()))
coroutineScope.launch {
chan.send(Unit)
- val server = scheduler.deploy(
+ val server = client.newServer(
workload.image.name,
workload.image,
Flavor(
@@ -250,21 +268,19 @@ public suspend fun processTrace(
workload.image.tags["required-memory"] as Long
)
)
- // Monitor server events
- server.events
- .onEach {
- if (it is ServerEvent.StateChanged) {
- monitor.reportVmStateChange(clock.millis(), it.server)
- }
+
+ server.watch(object : ServerWatcher {
+ override fun onStateChanged(server: Server, newState: ServerState) {
+ monitor.reportVmStateChange(clock.millis(), server, newState)
}
- .collect()
+ })
}
}
scheduler.events
.takeWhile {
when (it) {
- is VirtProvisioningEvent.MetricsAvailable ->
+ is ComputeServiceEvent.MetricsAvailable ->
it.inactiveVmCount + it.failedVmCount != submitted
}
}
@@ -272,5 +288,6 @@ public suspend fun processTrace(
delay(1)
} finally {
reader.close()
+ client.close()
}
}
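
For call sites migrating from the old Pair return value, the flow after this change looks roughly as follows. This is a sketch, not an excerpt from the commit: the coroutine scope, clock, environment reader, allocation policy, tracer, channel and monitor are assumed to be set up as before.

// Sketch of the migrated call flow (inside a coroutine, e.g. testScope.launch).
// ProvisionerResult is a data class, so it can be destructured directly.
val (bareMetalProvisioner, provisioner, scheduler) =
    createProvisioner(this, clock, environmentReader, allocationPolicy, tracer)
attachMonitor(this, clock, scheduler, monitor)                   // no longer a suspend function
processTrace(this, clock, traceReader, scheduler, chan, monitor) // opens and closes its own ComputeService client
scheduler.close()                                                // replaces scheduler.terminate()
provisioner.close()
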
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index 75b0d735..ff0a026d 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -28,6 +28,12 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.TestCoroutineScope
import mu.KotlinLogging
+import org.opendc.compute.service.scheduler.AllocationPolicy
+import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy
+import org.opendc.compute.service.scheduler.AvailableMemoryAllocationPolicy
+import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy
+import org.opendc.compute.service.scheduler.ProvisionedCoresAllocationPolicy
+import org.opendc.compute.service.scheduler.RandomAllocationPolicy
import org.opendc.compute.simulator.allocation.*
import org.opendc.experiments.capelin.experiment.attachMonitor
import org.opendc.experiments.capelin.experiment.createFailureDomain
@@ -151,7 +157,7 @@ public abstract class Portfolio(name: String) : Experiment(name) {
)
testScope.launch {
- val (bareMetalProvisioner, scheduler) = createProvisioner(
+ val (bareMetalProvisioner, provisioner, scheduler) = createProvisioner(
this,
clock,
environment,
@@ -190,7 +196,8 @@ public abstract class Portfolio(name: String) : Experiment(name) {
logger.debug("FINISHED=${scheduler.finishedVms}")
failureDomain?.cancel()
- scheduler.terminate()
+ scheduler.close()
+ provisioner.close()
}
try {
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
index 3c6637bf..1e42cf56 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
@@ -22,9 +22,11 @@
package org.opendc.experiments.capelin.monitor
-import org.opendc.compute.core.Server
-import org.opendc.compute.core.virt.driver.VirtDriver
-import org.opendc.compute.core.virt.service.VirtProvisioningEvent
+import org.opendc.compute.api.Server
+import org.opendc.compute.api.ServerState
+import org.opendc.compute.service.ComputeServiceEvent
+import org.opendc.compute.service.driver.Host
+import org.opendc.metal.Node
import java.io.Closeable
/**
@@ -34,22 +36,22 @@ public interface ExperimentMonitor : Closeable {
/**
* This method is invoked when the state of a VM changes.
*/
- public fun reportVmStateChange(time: Long, server: Server) {}
+ public fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {}
/**
* This method is invoked when the state of a host changes.
*/
public fun reportHostStateChange(
time: Long,
- driver: VirtDriver,
- server: Server
+ driver: Host,
+ host: Node
) {
}
/**
* Report the power consumption of a host.
*/
- public fun reportPowerConsumption(host: Server, draw: Double) {}
+ public fun reportPowerConsumption(host: Node, draw: Double) {}
/**
* This method is invoked for a host for each slice that it finishes.
@@ -63,7 +65,7 @@ public interface ExperimentMonitor : Closeable {
cpuUsage: Double,
cpuDemand: Double,
numberOfDeployedImages: Int,
- hostServer: Server,
+ host: Node,
duration: Long = 5 * 60 * 1000L
) {
}
@@ -71,5 +73,5 @@ public interface ExperimentMonitor : Closeable {
/**
* This method is invoked for a provisioner event.
*/
- public fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) {}
+ public fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) {}
}
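
To illustrate the revised callback signatures (Host and Node in place of VirtDriver and Server, plus the explicit ServerState), a minimal logging implementation of the interface. Only the types are taken from this change; the class name and bodies are placeholders.

import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
import org.opendc.compute.service.ComputeServiceEvent
import org.opendc.compute.service.driver.Host
import org.opendc.metal.Node

class LoggingExperimentMonitor : ExperimentMonitor {
    override fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {
        println("[$time] VM state change: $server -> $newState")
    }

    override fun reportHostStateChange(time: Long, driver: Host, host: Node) {
        println("[$time] host ${host.uid} is now ${host.state}")
    }

    override fun reportPowerConsumption(host: Node, draw: Double) {
        println("host ${host.uid} draws $draw W")
    }

    override fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) {
        println("[$time] $event")
    }

    override fun close() {}
}
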
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
index a0d57656..98052214 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
@@ -23,13 +23,15 @@
package org.opendc.experiments.capelin.monitor
import mu.KotlinLogging
-import org.opendc.compute.core.Server
-import org.opendc.compute.core.virt.driver.VirtDriver
-import org.opendc.compute.core.virt.service.VirtProvisioningEvent
+import org.opendc.compute.api.Server
+import org.opendc.compute.api.ServerState
+import org.opendc.compute.service.ComputeServiceEvent
+import org.opendc.compute.service.driver.Host
import org.opendc.experiments.capelin.telemetry.HostEvent
import org.opendc.experiments.capelin.telemetry.ProvisionerEvent
import org.opendc.experiments.capelin.telemetry.parquet.ParquetHostEventWriter
import org.opendc.experiments.capelin.telemetry.parquet.ParquetProvisionerEventWriter
+import org.opendc.metal.Node
import java.io.File
/**
@@ -49,10 +51,10 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
File(base, "provisioner-metrics/$partition/data.parquet"),
bufferSize
)
- private val currentHostEvent = mutableMapOf<Server, HostEvent>()
+ private val currentHostEvent = mutableMapOf<Node, HostEvent>()
private var startTime = -1L
- override fun reportVmStateChange(time: Long, server: Server) {
+ override fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {
if (startTime < 0) {
startTime = time
@@ -63,12 +65,12 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
override fun reportHostStateChange(
time: Long,
- driver: VirtDriver,
- server: Server
+ driver: Host,
+ host: Node
) {
- logger.debug { "Host ${server.uid} changed state ${server.state} [$time]" }
+ logger.debug { "Host ${host.uid} changed state ${host.state} [$time]" }
- val previousEvent = currentHostEvent[server]
+ val previousEvent = currentHostEvent[host]
val roundedTime = previousEvent?.let {
val duration = time - it.timestamp
@@ -91,13 +93,13 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
0.0,
0.0,
0,
- server
+ host
)
}
- private val lastPowerConsumption = mutableMapOf<Server, Double>()
+ private val lastPowerConsumption = mutableMapOf<Node, Double>()
- override fun reportPowerConsumption(host: Server, draw: Double) {
+ override fun reportPowerConsumption(host: Node, draw: Double) {
lastPowerConsumption[host] = draw
}
@@ -110,16 +112,16 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
cpuUsage: Double,
cpuDemand: Double,
numberOfDeployedImages: Int,
- hostServer: Server,
+ host: Node,
duration: Long
) {
- val previousEvent = currentHostEvent[hostServer]
+ val previousEvent = currentHostEvent[host]
when {
previousEvent == null -> {
val event = HostEvent(
time,
5 * 60 * 1000L,
- hostServer,
+ host,
numberOfDeployedImages,
requestedBurst,
grantedBurst,
@@ -127,17 +129,17 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
interferedBurst,
cpuUsage,
cpuDemand,
- lastPowerConsumption[hostServer] ?: 200.0,
- hostServer.flavor.cpuCount
+ lastPowerConsumption[host] ?: 200.0,
+ host.flavor.cpuCount
)
- currentHostEvent[hostServer] = event
+ currentHostEvent[host] = event
}
previousEvent.timestamp == time -> {
val event = HostEvent(
time,
previousEvent.duration,
- hostServer,
+ host,
numberOfDeployedImages,
requestedBurst,
grantedBurst,
@@ -145,11 +147,11 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
interferedBurst,
cpuUsage,
cpuDemand,
- lastPowerConsumption[hostServer] ?: 200.0,
- hostServer.flavor.cpuCount
+ lastPowerConsumption[host] ?: 200.0,
+ host.flavor.cpuCount
)
- currentHostEvent[hostServer] = event
+ currentHostEvent[host] = event
}
else -> {
hostWriter.write(previousEvent)
@@ -157,7 +159,7 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
val event = HostEvent(
time,
time - previousEvent.timestamp,
- hostServer,
+ host,
numberOfDeployedImages,
requestedBurst,
grantedBurst,
@@ -165,16 +167,16 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
interferedBurst,
cpuUsage,
cpuDemand,
- lastPowerConsumption[hostServer] ?: 200.0,
- hostServer.flavor.cpuCount
+ lastPowerConsumption[host] ?: 200.0,
+ host.flavor.cpuCount
)
- currentHostEvent[hostServer] = event
+ currentHostEvent[host] = event
}
}
}
- override fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) {
+ override fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) {
provisionerWriter.write(
ProvisionerEvent(
time,
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt
index e5e9d520..e7b6a7bb 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt
@@ -22,7 +22,7 @@
package org.opendc.experiments.capelin.telemetry
-import org.opendc.compute.core.Server
+import org.opendc.metal.Node
/**
* A periodic report of the host machine metrics.
@@ -30,7 +30,7 @@ import org.opendc.compute.core.Server
public data class HostEvent(
override val timestamp: Long,
public val duration: Long,
- public val host: Server,
+ public val node: Node,
public val vmCount: Int,
public val requestedBurst: Long,
public val grantedBurst: Long,
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt
index 427c453a..7631f55f 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt
@@ -22,7 +22,7 @@
package org.opendc.experiments.capelin.telemetry
-import org.opendc.compute.core.Server
+import org.opendc.compute.api.Server
/**
* A periodic report of a virtual machine's metrics.
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt
index 4a3e7963..b4fdd66a 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt
@@ -41,8 +41,8 @@ public class ParquetHostEventWriter(path: File, bufferSize: Int) :
// record.put("portfolio_id", event.run.parent.parent.id)
// record.put("scenario_id", event.run.parent.id)
// record.put("run_id", event.run.id)
- record.put("host_id", event.host.name)
- record.put("state", event.host.state.name)
+ record.put("host_id", event.node.name)
+ record.put("state", event.node.state.name)
record.put("timestamp", event.timestamp)
record.put("duration", event.duration)
record.put("vm_count", event.vmCount)
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt
index 6cfdae40..f9630078 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt
@@ -22,8 +22,8 @@
package org.opendc.experiments.capelin.trace
-import org.opendc.compute.core.workload.VmWorkload
-import org.opendc.compute.simulator.SimWorkloadImage
+import org.opendc.compute.api.ComputeWorkload
+import org.opendc.compute.api.Image
import org.opendc.experiments.capelin.model.CompositeWorkload
import org.opendc.experiments.capelin.model.Workload
import org.opendc.format.trace.TraceEntry
@@ -45,11 +45,11 @@ public class Sc20ParquetTraceReader(
performanceInterferenceModel: Map<String, PerformanceInterferenceModel>,
workload: Workload,
seed: Int
-) : TraceReader<VmWorkload> {
+) : TraceReader<ComputeWorkload> {
/**
* The iterator over the actual trace.
*/
- private val iterator: Iterator<TraceEntry<VmWorkload>> =
+ private val iterator: Iterator<TraceEntry<ComputeWorkload>> =
rawReaders
.map { it.read() }
.run {
@@ -73,11 +73,10 @@ public class Sc20ParquetTraceReader(
performanceInterferenceModel[id] ?: PerformanceInterferenceModel(TreeSet())
val newImage =
- SimWorkloadImage(
+ Image(
image.uid,
image.name,
image.tags + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems),
- (image as SimWorkloadImage).workload
)
val newWorkload = entry.workload.copy(image = newImage)
Sc20RawParquetTraceReader.TraceEntryImpl(entry.submissionTime, newWorkload)
@@ -88,7 +87,7 @@ public class Sc20ParquetTraceReader(
override fun hasNext(): Boolean = iterator.hasNext()
- override fun next(): TraceEntry<VmWorkload> = iterator.next()
+ override fun next(): TraceEntry<ComputeWorkload> = iterator.next()
override fun close() {}
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt
index d2560d62..b29bdc54 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt
@@ -26,8 +26,8 @@ import mu.KotlinLogging
import org.apache.avro.generic.GenericData
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetReader
-import org.opendc.compute.core.workload.VmWorkload
-import org.opendc.compute.simulator.SimWorkloadImage
+import org.opendc.compute.api.ComputeWorkload
+import org.opendc.compute.api.Image
import org.opendc.core.User
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
@@ -108,11 +108,12 @@ public class Sc20RawParquetTraceReader(private val path: File) {
val vmFragments = fragments.getValue(id).asSequence()
val totalLoad = vmFragments.sumByDouble { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs
- val vmWorkload = VmWorkload(
+ val workload = SimTraceWorkload(vmFragments)
+ val vmWorkload = ComputeWorkload(
uid,
id,
UnnamedUser,
- SimWorkloadImage(
+ Image(
uid,
id,
mapOf(
@@ -120,9 +121,9 @@ public class Sc20RawParquetTraceReader(private val path: File) {
"end-time" to endTime,
"total-load" to totalLoad,
"cores" to maxCores,
- "required-memory" to requiredMemory
- ),
- SimTraceWorkload(vmFragments)
+ "required-memory" to requiredMemory,
+ "workload" to workload
+ )
)
)
entries.add(TraceEntryImpl(submissionTime, vmWorkload))
@@ -150,7 +151,7 @@ public class Sc20RawParquetTraceReader(private val path: File) {
/**
* Read the entries in the trace.
*/
- public fun read(): List<TraceEntry<VmWorkload>> = entries
+ public fun read(): List<TraceEntry<ComputeWorkload>> = entries
/**
* An unnamed user.
@@ -165,6 +166,6 @@ public class Sc20RawParquetTraceReader(private val path: File) {
*/
internal data class TraceEntryImpl(
override var submissionTime: Long,
- override val workload: VmWorkload
- ) : TraceEntry<VmWorkload>
+ override val workload: ComputeWorkload
+ ) : TraceEntry<ComputeWorkload>
}
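
With this change the SimTraceWorkload is no longer a field of a SimWorkloadImage but travels through the generic image tags. A consumer would therefore unpack it roughly as below; this is a sketch, and the consuming side is not part of this diff.

val image = entry.workload.image
val simWorkload = image.tags["workload"] as SimTraceWorkload    // stored above under the "workload" key
val requiredMemory = image.tags["required-memory"] as Long      // same pattern as used in processTrace
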
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt
index 12705c80..c588fda3 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt
@@ -31,8 +31,8 @@ import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.filter2.predicate.Statistics
import org.apache.parquet.filter2.predicate.UserDefinedPredicate
import org.apache.parquet.io.api.Binary
-import org.opendc.compute.core.workload.VmWorkload
-import org.opendc.compute.simulator.SimWorkloadImage
+import org.opendc.compute.api.ComputeWorkload
+import org.opendc.compute.api.Image
import org.opendc.core.User
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
@@ -62,11 +62,11 @@ public class Sc20StreamingParquetTraceReader(
performanceInterferenceModel: PerformanceInterferenceModel,
selectedVms: List<String>,
random: Random
-) : TraceReader<VmWorkload> {
+) : TraceReader<ComputeWorkload> {
/**
* The internal iterator to use for this reader.
*/
- private val iterator: Iterator<TraceEntry<VmWorkload>>
+ private val iterator: Iterator<TraceEntry<ComputeWorkload>>
/**
* The intermediate buffer to store the read records in.
@@ -235,19 +235,20 @@ public class Sc20StreamingParquetTraceReader(
performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSortedSet(),
Random(random.nextInt())
)
- val vmWorkload = VmWorkload(
+ val workload = SimTraceWorkload(fragments)
+ val vmWorkload = ComputeWorkload(
uid,
"VM Workload $id",
UnnamedUser,
- SimWorkloadImage(
+ Image(
uid,
id,
mapOf(
IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems,
"cores" to maxCores,
- "required-memory" to requiredMemory
- ),
- SimTraceWorkload(fragments),
+ "required-memory" to requiredMemory,
+ "workload" to workload
+ )
)
)
@@ -263,7 +264,7 @@ public class Sc20StreamingParquetTraceReader(
override fun hasNext(): Boolean = iterator.hasNext()
- override fun next(): TraceEntry<VmWorkload> = iterator.next()
+ override fun next(): TraceEntry<ComputeWorkload> = iterator.next()
override fun close() {
readerThread.interrupt()
@@ -300,6 +301,6 @@ public class Sc20StreamingParquetTraceReader(
*/
private data class TraceEntryImpl(
override var submissionTime: Long,
- override val workload: VmWorkload
- ) : TraceEntry<VmWorkload>
+ override val workload: ComputeWorkload
+ ) : TraceEntry<ComputeWorkload>
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt
index 4d9b9df1..881652f6 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt
@@ -23,8 +23,8 @@
package org.opendc.experiments.capelin.trace
import mu.KotlinLogging
-import org.opendc.compute.core.workload.VmWorkload
-import org.opendc.compute.simulator.SimWorkloadImage
+import org.opendc.compute.api.ComputeWorkload
+import org.opendc.compute.api.Image
import org.opendc.experiments.capelin.model.CompositeWorkload
import org.opendc.experiments.capelin.model.SamplingStrategy
import org.opendc.experiments.capelin.model.Workload
@@ -38,11 +38,11 @@ private val logger = KotlinLogging.logger {}
* Sample the workload for the specified [run].
*/
public fun sampleWorkload(
- trace: List<TraceEntry<VmWorkload>>,
+ trace: List<TraceEntry<ComputeWorkload>>,
workload: Workload,
subWorkload: Workload,
seed: Int
-): List<TraceEntry<VmWorkload>> {
+): List<TraceEntry<ComputeWorkload>> {
return when {
workload is CompositeWorkload -> sampleRegularWorkload(trace, workload, subWorkload, seed)
workload.samplingStrategy == SamplingStrategy.HPC ->
@@ -58,15 +58,15 @@ public fun sampleWorkload(
* Sample a regular (non-HPC) workload.
*/
public fun sampleRegularWorkload(
- trace: List<TraceEntry<VmWorkload>>,
+ trace: List<TraceEntry<ComputeWorkload>>,
workload: Workload,
subWorkload: Workload,
seed: Int
-): List<TraceEntry<VmWorkload>> {
+): List<TraceEntry<ComputeWorkload>> {
val fraction = subWorkload.fraction
val shuffled = trace.shuffled(Random(seed))
- val res = mutableListOf<TraceEntry<VmWorkload>>()
+ val res = mutableListOf<TraceEntry<ComputeWorkload>>()
val totalLoad = if (workload is CompositeWorkload) {
workload.totalLoad
} else {
@@ -93,11 +93,11 @@ public fun sampleRegularWorkload(
* Sample an HPC workload.
*/
public fun sampleHpcWorkload(
- trace: List<TraceEntry<VmWorkload>>,
+ trace: List<TraceEntry<ComputeWorkload>>,
workload: Workload,
seed: Int,
sampleOnLoad: Boolean
-): List<TraceEntry<VmWorkload>> {
+): List<TraceEntry<ComputeWorkload>> {
val pattern = Regex("^vm__workload__(ComputeNode|cn).*")
val random = Random(seed)
@@ -109,7 +109,7 @@ public fun sampleHpcWorkload(
val hpcSequence = generateSequence(0) { it + 1 }
.map { index ->
- val res = mutableListOf<TraceEntry<VmWorkload>>()
+ val res = mutableListOf<TraceEntry<ComputeWorkload>>()
hpc.mapTo(res) { sample(it, index) }
res.shuffle(random)
res
@@ -118,7 +118,7 @@ public fun sampleHpcWorkload(
val nonHpcSequence = generateSequence(0) { it + 1 }
.map { index ->
- val res = mutableListOf<TraceEntry<VmWorkload>>()
+ val res = mutableListOf<TraceEntry<ComputeWorkload>>()
nonHpc.mapTo(res) { sample(it, index) }
res.shuffle(random)
res
@@ -139,7 +139,7 @@ public fun sampleHpcWorkload(
var nonHpcCount = 0
var nonHpcLoad = 0.0
- val res = mutableListOf<TraceEntry<VmWorkload>>()
+ val res = mutableListOf<TraceEntry<ComputeWorkload>>()
if (sampleOnLoad) {
var currentLoad = 0.0
@@ -194,17 +194,16 @@ public fun sampleHpcWorkload(
/**
* Sample a random trace entry.
*/
-private fun sample(entry: TraceEntry<VmWorkload>, i: Int): TraceEntry<VmWorkload> {
+private fun sample(entry: TraceEntry<ComputeWorkload>, i: Int): TraceEntry<ComputeWorkload> {
val id = UUID.nameUUIDFromBytes("${entry.workload.image.uid}-$i".toByteArray())
- val image = SimWorkloadImage(
+ val image = Image(
id,
entry.workload.image.name,
- entry.workload.image.tags,
- (entry.workload.image as SimWorkloadImage).workload
+ entry.workload.image.tags
)
val vmWorkload = entry.workload.copy(uid = id, image = image, name = entry.workload.name)
return VmTraceEntry(vmWorkload, entry.submissionTime)
}
-private class VmTraceEntry(override val workload: VmWorkload, override val submissionTime: Long) :
- TraceEntry<VmWorkload>
+private class VmTraceEntry(override val workload: ComputeWorkload, override val submissionTime: Long) :
+ TraceEntry<ComputeWorkload>
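
A usage sketch of the retyped sampler; only the signatures come from this change, while the raw reader and the Workload model instances are assumed to exist.

val trace: List<TraceEntry<ComputeWorkload>> = rawReader.read()   // e.g. Sc20RawParquetTraceReader.read()
val sampled = sampleWorkload(trace, workload, subWorkload, seed)  // returns List<TraceEntry<ComputeWorkload>>
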
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 6a0796f6..dfc6b90b 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -32,10 +32,9 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
-import org.opendc.compute.core.Server
-import org.opendc.compute.core.workload.VmWorkload
-import org.opendc.compute.simulator.SimVirtProvisioningService
-import org.opendc.compute.simulator.allocation.AvailableCoreMemoryAllocationPolicy
+import org.opendc.compute.api.ComputeWorkload
+import org.opendc.compute.service.internal.ComputeServiceImpl
+import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy
import org.opendc.experiments.capelin.experiment.attachMonitor
import org.opendc.experiments.capelin.experiment.createFailureDomain
import org.opendc.experiments.capelin.experiment.createProvisioner
@@ -47,6 +46,7 @@ import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader
import org.opendc.format.environment.EnvironmentReader
import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
import org.opendc.format.trace.TraceReader
+import org.opendc.metal.Node
import org.opendc.simulator.utils.DelayControllerClockAdapter
import org.opendc.trace.core.EventTracer
import java.io.File
@@ -97,7 +97,7 @@ class CapelinIntegrationTest {
val allocationPolicy = AvailableCoreMemoryAllocationPolicy()
val traceReader = createTestTraceReader()
val environmentReader = createTestEnvironmentReader()
- lateinit var scheduler: SimVirtProvisioningService
+ lateinit var scheduler: ComputeServiceImpl
val tracer = EventTracer(clock)
testScope.launch {
@@ -108,8 +108,8 @@ class CapelinIntegrationTest {
allocationPolicy,
tracer
)
- val bareMetalProvisioner = res.first
- scheduler = res.second
+ val bareMetalProvisioner = res.metal
+ scheduler = res.compute
val failureDomain = if (failures) {
println("ENABLING failures")
@@ -138,8 +138,9 @@ class CapelinIntegrationTest {
println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}")
failureDomain?.cancel()
- scheduler.terminate()
+ scheduler.close()
monitor.close()
+ res.provisioner.close()
}
runSimulation()
@@ -148,9 +149,9 @@ class CapelinIntegrationTest {
assertAll(
{ assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") },
{ assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") },
- { assertEquals(1684849230562, monitor.totalRequestedBurst) },
- { assertEquals(447612683996, monitor.totalGrantedBurst) },
- { assertEquals(1219535757406, monitor.totalOvercommissionedBurst) },
+ { assertEquals(1678587333640, monitor.totalRequestedBurst) },
+ { assertEquals(438118200924, monitor.totalGrantedBurst) },
+ { assertEquals(1220323969993, monitor.totalOvercommissionedBurst) },
{ assertEquals(0, monitor.totalInterferedBurst) }
)
}
@@ -162,19 +163,16 @@ class CapelinIntegrationTest {
val allocationPolicy = AvailableCoreMemoryAllocationPolicy()
val traceReader = createTestTraceReader(0.5, seed)
val environmentReader = createTestEnvironmentReader("single")
- lateinit var scheduler: SimVirtProvisioningService
val tracer = EventTracer(clock)
testScope.launch {
- val res = createProvisioner(
+ val (_, provisioner, scheduler) = createProvisioner(
this,
clock,
environmentReader,
allocationPolicy,
tracer
)
- scheduler = res.second
-
attachMonitor(this, clock, scheduler, monitor)
processTrace(
this,
@@ -187,8 +185,9 @@ class CapelinIntegrationTest {
println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}")
- scheduler.terminate()
+ scheduler.close()
monitor.close()
+ provisioner.close()
}
runSimulation()
@@ -210,7 +209,7 @@ class CapelinIntegrationTest {
/**
* Obtain the trace reader for the test.
*/
- private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<VmWorkload> {
+ private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<ComputeWorkload> {
return Sc20ParquetTraceReader(
listOf(Sc20RawParquetTraceReader(File("src/test/resources/trace"))),
emptyMap(),
@@ -242,7 +241,7 @@ class CapelinIntegrationTest {
cpuUsage: Double,
cpuDemand: Double,
numberOfDeployedImages: Int,
- hostServer: Server,
+ host: Node,
duration: Long
) {
totalRequestedBurst += requestedBurst