diff options
Diffstat (limited to 'simulator/opendc-experiments')
13 files changed, 184 insertions, 155 deletions
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 8f3e686a..a5cf4fc0 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -32,22 +32,26 @@ import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.takeWhile import kotlinx.coroutines.launch import mu.KotlinLogging -import org.opendc.compute.core.Flavor -import org.opendc.compute.core.ServerEvent -import org.opendc.compute.core.metal.NODE_CLUSTER -import org.opendc.compute.core.metal.driver.BareMetalDriver -import org.opendc.compute.core.metal.service.ProvisioningService -import org.opendc.compute.core.virt.HypervisorEvent -import org.opendc.compute.core.virt.service.VirtProvisioningEvent -import org.opendc.compute.core.workload.VmWorkload +import org.opendc.compute.api.ComputeWorkload +import org.opendc.compute.api.Flavor +import org.opendc.compute.api.Server +import org.opendc.compute.api.ServerState +import org.opendc.compute.api.ServerWatcher +import org.opendc.compute.service.ComputeService +import org.opendc.compute.service.ComputeServiceEvent +import org.opendc.compute.service.driver.HostEvent +import org.opendc.compute.service.internal.ComputeServiceImpl +import org.opendc.compute.service.scheduler.AllocationPolicy import org.opendc.compute.simulator.SimBareMetalDriver -import org.opendc.compute.simulator.SimVirtDriver -import org.opendc.compute.simulator.SimVirtProvisioningService -import org.opendc.compute.simulator.allocation.AllocationPolicy +import org.opendc.compute.simulator.SimHost +import org.opendc.compute.simulator.SimHostProvisioner import org.opendc.experiments.capelin.monitor.ExperimentMonitor import org.opendc.experiments.capelin.trace.Sc20StreamingParquetTraceReader import org.opendc.format.environment.EnvironmentReader import org.opendc.format.trace.TraceReader +import org.opendc.metal.NODE_CLUSTER +import org.opendc.metal.NodeEvent +import org.opendc.metal.service.ProvisioningService import org.opendc.simulator.compute.SimFairShareHypervisorProvider import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import org.opendc.simulator.failures.CorrelatedFaultInjector @@ -135,6 +139,12 @@ public fun createTraceReader( ) } +public data class ProvisionerResult( + val metal: ProvisioningService, + val provisioner: SimHostProvisioner, + val compute: ComputeServiceImpl +) + /** * Construct the environment for a VM provisioner and return the provisioner instance. */ @@ -144,45 +154,52 @@ public suspend fun createProvisioner( environmentReader: EnvironmentReader, allocationPolicy: AllocationPolicy, eventTracer: EventTracer -): Pair<ProvisioningService, SimVirtProvisioningService> { +): ProvisionerResult { val environment = environmentReader.use { it.construct(coroutineScope, clock) } val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService] // Wait for the bare metal nodes to be spawned delay(10) - val scheduler = SimVirtProvisioningService(coroutineScope, clock, bareMetalProvisioner, allocationPolicy, eventTracer, SimFairShareHypervisorProvider()) + val provisioner = SimHostProvisioner(coroutineScope.coroutineContext, bareMetalProvisioner, SimFairShareHypervisorProvider()) + val hosts = provisioner.provisionAll() + + val scheduler = ComputeService(coroutineScope.coroutineContext, clock, eventTracer, allocationPolicy) as ComputeServiceImpl + + for (host in hosts) { + scheduler.addHost(host) + } // Wait for the hypervisors to be spawned delay(10) - return bareMetalProvisioner to scheduler + return ProvisionerResult(bareMetalProvisioner, provisioner, scheduler) } /** * Attach the specified monitor to the VM provisioner. */ @OptIn(ExperimentalCoroutinesApi::class) -public suspend fun attachMonitor( +public fun attachMonitor( coroutineScope: CoroutineScope, clock: Clock, - scheduler: SimVirtProvisioningService, + scheduler: ComputeService, monitor: ExperimentMonitor ) { - val hypervisors = scheduler.drivers() + val hypervisors = scheduler.hosts // Monitor hypervisor events for (hypervisor in hypervisors) { - // TODO Do not expose VirtDriver directly but use Hypervisor class. - val server = (hypervisor as SimVirtDriver).server + // TODO Do not expose Host directly but use Hypervisor class. + val server = (hypervisor as SimHost).node monitor.reportHostStateChange(clock.millis(), hypervisor, server) server.events .onEach { event -> val time = clock.millis() when (event) { - is ServerEvent.StateChanged -> { - monitor.reportHostStateChange(time, hypervisor, event.server) + is NodeEvent.StateChanged -> { + monitor.reportHostStateChange(time, hypervisor, event.node) } } } @@ -190,7 +207,7 @@ public suspend fun attachMonitor( hypervisor.events .onEach { event -> when (event) { - is HypervisorEvent.SliceFinished -> monitor.reportHostSlice( + is HostEvent.SliceFinished -> monitor.reportHostSlice( clock.millis(), event.requestedBurst, event.grantedBurst, @@ -199,22 +216,22 @@ public suspend fun attachMonitor( event.cpuUsage, event.cpuDemand, event.numberOfDeployedImages, - event.hostServer + (event.driver as SimHost).node ) } } .launchIn(coroutineScope) - val driver = hypervisor.server.services[BareMetalDriver.Key] as SimBareMetalDriver + val driver = server.metadata["driver"] as SimBareMetalDriver driver.powerDraw - .onEach { monitor.reportPowerConsumption(hypervisor.server, it) } + .onEach { monitor.reportPowerConsumption(server, it) } .launchIn(coroutineScope) } scheduler.events .onEach { event -> when (event) { - is VirtProvisioningEvent.MetricsAvailable -> + is ComputeServiceEvent.MetricsAvailable -> monitor.reportProvisionerMetrics(clock.millis(), event) } } @@ -227,11 +244,12 @@ public suspend fun attachMonitor( public suspend fun processTrace( coroutineScope: CoroutineScope, clock: Clock, - reader: TraceReader<VmWorkload>, - scheduler: SimVirtProvisioningService, + reader: TraceReader<ComputeWorkload>, + scheduler: ComputeService, chan: Channel<Unit>, monitor: ExperimentMonitor ) { + val client = scheduler.newClient() try { var submitted = 0 @@ -242,7 +260,7 @@ public suspend fun processTrace( delay(max(0, time - clock.millis())) coroutineScope.launch { chan.send(Unit) - val server = scheduler.deploy( + val server = client.newServer( workload.image.name, workload.image, Flavor( @@ -250,21 +268,19 @@ public suspend fun processTrace( workload.image.tags["required-memory"] as Long ) ) - // Monitor server events - server.events - .onEach { - if (it is ServerEvent.StateChanged) { - monitor.reportVmStateChange(clock.millis(), it.server) - } + + server.watch(object : ServerWatcher { + override fun onStateChanged(server: Server, newState: ServerState) { + monitor.reportVmStateChange(clock.millis(), server, newState) } - .collect() + }) } } scheduler.events .takeWhile { when (it) { - is VirtProvisioningEvent.MetricsAvailable -> + is ComputeServiceEvent.MetricsAvailable -> it.inactiveVmCount + it.failedVmCount != submitted } } @@ -272,5 +288,6 @@ public suspend fun processTrace( delay(1) } finally { reader.close() + client.close() } } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 75b0d735..ff0a026d 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -28,6 +28,12 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.launch import kotlinx.coroutines.test.TestCoroutineScope import mu.KotlinLogging +import org.opendc.compute.service.scheduler.AllocationPolicy +import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy +import org.opendc.compute.service.scheduler.AvailableMemoryAllocationPolicy +import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy +import org.opendc.compute.service.scheduler.ProvisionedCoresAllocationPolicy +import org.opendc.compute.service.scheduler.RandomAllocationPolicy import org.opendc.compute.simulator.allocation.* import org.opendc.experiments.capelin.experiment.attachMonitor import org.opendc.experiments.capelin.experiment.createFailureDomain @@ -151,7 +157,7 @@ public abstract class Portfolio(name: String) : Experiment(name) { ) testScope.launch { - val (bareMetalProvisioner, scheduler) = createProvisioner( + val (bareMetalProvisioner, provisioner, scheduler) = createProvisioner( this, clock, environment, @@ -190,7 +196,8 @@ public abstract class Portfolio(name: String) : Experiment(name) { logger.debug("FINISHED=${scheduler.finishedVms}") failureDomain?.cancel() - scheduler.terminate() + scheduler.close() + provisioner.close() } try { diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt index 3c6637bf..1e42cf56 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt @@ -22,9 +22,11 @@ package org.opendc.experiments.capelin.monitor -import org.opendc.compute.core.Server -import org.opendc.compute.core.virt.driver.VirtDriver -import org.opendc.compute.core.virt.service.VirtProvisioningEvent +import org.opendc.compute.api.Server +import org.opendc.compute.api.ServerState +import org.opendc.compute.service.ComputeServiceEvent +import org.opendc.compute.service.driver.Host +import org.opendc.metal.Node import java.io.Closeable /** @@ -34,22 +36,22 @@ public interface ExperimentMonitor : Closeable { /** * This method is invoked when the state of a VM changes. */ - public fun reportVmStateChange(time: Long, server: Server) {} + public fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {} /** * This method is invoked when the state of a host changes. */ public fun reportHostStateChange( time: Long, - driver: VirtDriver, - server: Server + driver: Host, + host: Node ) { } /** * Report the power consumption of a host. */ - public fun reportPowerConsumption(host: Server, draw: Double) {} + public fun reportPowerConsumption(host: Node, draw: Double) {} /** * This method is invoked for a host for each slice that is finishes. @@ -63,7 +65,7 @@ public interface ExperimentMonitor : Closeable { cpuUsage: Double, cpuDemand: Double, numberOfDeployedImages: Int, - hostServer: Server, + host: Node, duration: Long = 5 * 60 * 1000L ) { } @@ -71,5 +73,5 @@ public interface ExperimentMonitor : Closeable { /** * This method is invoked for a provisioner event. */ - public fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) {} + public fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) {} } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt index a0d57656..98052214 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt @@ -23,13 +23,15 @@ package org.opendc.experiments.capelin.monitor import mu.KotlinLogging -import org.opendc.compute.core.Server -import org.opendc.compute.core.virt.driver.VirtDriver -import org.opendc.compute.core.virt.service.VirtProvisioningEvent +import org.opendc.compute.api.Server +import org.opendc.compute.api.ServerState +import org.opendc.compute.service.ComputeServiceEvent +import org.opendc.compute.service.driver.Host import org.opendc.experiments.capelin.telemetry.HostEvent import org.opendc.experiments.capelin.telemetry.ProvisionerEvent import org.opendc.experiments.capelin.telemetry.parquet.ParquetHostEventWriter import org.opendc.experiments.capelin.telemetry.parquet.ParquetProvisionerEventWriter +import org.opendc.metal.Node import java.io.File /** @@ -49,10 +51,10 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: File(base, "provisioner-metrics/$partition/data.parquet"), bufferSize ) - private val currentHostEvent = mutableMapOf<Server, HostEvent>() + private val currentHostEvent = mutableMapOf<Node, HostEvent>() private var startTime = -1L - override fun reportVmStateChange(time: Long, server: Server) { + override fun reportVmStateChange(time: Long, server: Server, newState: ServerState) { if (startTime < 0) { startTime = time @@ -63,12 +65,12 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: override fun reportHostStateChange( time: Long, - driver: VirtDriver, - server: Server + driver: Host, + host: Node ) { - logger.debug { "Host ${server.uid} changed state ${server.state} [$time]" } + logger.debug { "Host ${host.uid} changed state ${host.state} [$time]" } - val previousEvent = currentHostEvent[server] + val previousEvent = currentHostEvent[host] val roundedTime = previousEvent?.let { val duration = time - it.timestamp @@ -91,13 +93,13 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: 0.0, 0.0, 0, - server + host ) } - private val lastPowerConsumption = mutableMapOf<Server, Double>() + private val lastPowerConsumption = mutableMapOf<Node, Double>() - override fun reportPowerConsumption(host: Server, draw: Double) { + override fun reportPowerConsumption(host: Node, draw: Double) { lastPowerConsumption[host] = draw } @@ -110,16 +112,16 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: cpuUsage: Double, cpuDemand: Double, numberOfDeployedImages: Int, - hostServer: Server, + host: Node, duration: Long ) { - val previousEvent = currentHostEvent[hostServer] + val previousEvent = currentHostEvent[host] when { previousEvent == null -> { val event = HostEvent( time, 5 * 60 * 1000L, - hostServer, + host, numberOfDeployedImages, requestedBurst, grantedBurst, @@ -127,17 +129,17 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: interferedBurst, cpuUsage, cpuDemand, - lastPowerConsumption[hostServer] ?: 200.0, - hostServer.flavor.cpuCount + lastPowerConsumption[host] ?: 200.0, + host.flavor.cpuCount ) - currentHostEvent[hostServer] = event + currentHostEvent[host] = event } previousEvent.timestamp == time -> { val event = HostEvent( time, previousEvent.duration, - hostServer, + host, numberOfDeployedImages, requestedBurst, grantedBurst, @@ -145,11 +147,11 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: interferedBurst, cpuUsage, cpuDemand, - lastPowerConsumption[hostServer] ?: 200.0, - hostServer.flavor.cpuCount + lastPowerConsumption[host] ?: 200.0, + host.flavor.cpuCount ) - currentHostEvent[hostServer] = event + currentHostEvent[host] = event } else -> { hostWriter.write(previousEvent) @@ -157,7 +159,7 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: val event = HostEvent( time, time - previousEvent.timestamp, - hostServer, + host, numberOfDeployedImages, requestedBurst, grantedBurst, @@ -165,16 +167,16 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: interferedBurst, cpuUsage, cpuDemand, - lastPowerConsumption[hostServer] ?: 200.0, - hostServer.flavor.cpuCount + lastPowerConsumption[host] ?: 200.0, + host.flavor.cpuCount ) - currentHostEvent[hostServer] = event + currentHostEvent[host] = event } } } - override fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) { + override fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) { provisionerWriter.write( ProvisionerEvent( time, diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt index e5e9d520..e7b6a7bb 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt @@ -22,7 +22,7 @@ package org.opendc.experiments.capelin.telemetry -import org.opendc.compute.core.Server +import org.opendc.metal.Node /** * A periodic report of the host machine metrics. @@ -30,7 +30,7 @@ import org.opendc.compute.core.Server public data class HostEvent( override val timestamp: Long, public val duration: Long, - public val host: Server, + public val node: Node, public val vmCount: Int, public val requestedBurst: Long, public val grantedBurst: Long, diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt index 427c453a..7631f55f 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt @@ -22,7 +22,7 @@ package org.opendc.experiments.capelin.telemetry -import org.opendc.compute.core.Server +import org.opendc.compute.api.Server /** * A periodic report of a virtual machine's metrics. diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt index 4a3e7963..b4fdd66a 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt @@ -41,8 +41,8 @@ public class ParquetHostEventWriter(path: File, bufferSize: Int) : // record.put("portfolio_id", event.run.parent.parent.id) // record.put("scenario_id", event.run.parent.id) // record.put("run_id", event.run.id) - record.put("host_id", event.host.name) - record.put("state", event.host.state.name) + record.put("host_id", event.node.name) + record.put("state", event.node.state.name) record.put("timestamp", event.timestamp) record.put("duration", event.duration) record.put("vm_count", event.vmCount) diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt index 6cfdae40..f9630078 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt @@ -22,8 +22,8 @@ package org.opendc.experiments.capelin.trace -import org.opendc.compute.core.workload.VmWorkload -import org.opendc.compute.simulator.SimWorkloadImage +import org.opendc.compute.api.ComputeWorkload +import org.opendc.compute.api.Image import org.opendc.experiments.capelin.model.CompositeWorkload import org.opendc.experiments.capelin.model.Workload import org.opendc.format.trace.TraceEntry @@ -45,11 +45,11 @@ public class Sc20ParquetTraceReader( performanceInterferenceModel: Map<String, PerformanceInterferenceModel>, workload: Workload, seed: Int -) : TraceReader<VmWorkload> { +) : TraceReader<ComputeWorkload> { /** * The iterator over the actual trace. */ - private val iterator: Iterator<TraceEntry<VmWorkload>> = + private val iterator: Iterator<TraceEntry<ComputeWorkload>> = rawReaders .map { it.read() } .run { @@ -73,11 +73,10 @@ public class Sc20ParquetTraceReader( performanceInterferenceModel[id] ?: PerformanceInterferenceModel(TreeSet()) val newImage = - SimWorkloadImage( + Image( image.uid, image.name, image.tags + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), - (image as SimWorkloadImage).workload ) val newWorkload = entry.workload.copy(image = newImage) Sc20RawParquetTraceReader.TraceEntryImpl(entry.submissionTime, newWorkload) @@ -88,7 +87,7 @@ public class Sc20ParquetTraceReader( override fun hasNext(): Boolean = iterator.hasNext() - override fun next(): TraceEntry<VmWorkload> = iterator.next() + override fun next(): TraceEntry<ComputeWorkload> = iterator.next() override fun close() {} } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt index d2560d62..b29bdc54 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt @@ -26,8 +26,8 @@ import mu.KotlinLogging import org.apache.avro.generic.GenericData import org.apache.hadoop.fs.Path import org.apache.parquet.avro.AvroParquetReader -import org.opendc.compute.core.workload.VmWorkload -import org.opendc.compute.simulator.SimWorkloadImage +import org.opendc.compute.api.ComputeWorkload +import org.opendc.compute.api.Image import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader @@ -108,11 +108,12 @@ public class Sc20RawParquetTraceReader(private val path: File) { val vmFragments = fragments.getValue(id).asSequence() val totalLoad = vmFragments.sumByDouble { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs - val vmWorkload = VmWorkload( + val workload = SimTraceWorkload(vmFragments) + val vmWorkload = ComputeWorkload( uid, id, UnnamedUser, - SimWorkloadImage( + Image( uid, id, mapOf( @@ -120,9 +121,9 @@ public class Sc20RawParquetTraceReader(private val path: File) { "end-time" to endTime, "total-load" to totalLoad, "cores" to maxCores, - "required-memory" to requiredMemory - ), - SimTraceWorkload(vmFragments) + "required-memory" to requiredMemory, + "workload" to workload + ) ) ) entries.add(TraceEntryImpl(submissionTime, vmWorkload)) @@ -150,7 +151,7 @@ public class Sc20RawParquetTraceReader(private val path: File) { /** * Read the entries in the trace. */ - public fun read(): List<TraceEntry<VmWorkload>> = entries + public fun read(): List<TraceEntry<ComputeWorkload>> = entries /** * An unnamed user. @@ -165,6 +166,6 @@ public class Sc20RawParquetTraceReader(private val path: File) { */ internal data class TraceEntryImpl( override var submissionTime: Long, - override val workload: VmWorkload - ) : TraceEntry<VmWorkload> + override val workload: ComputeWorkload + ) : TraceEntry<ComputeWorkload> } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt index 12705c80..c588fda3 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt @@ -31,8 +31,8 @@ import org.apache.parquet.filter2.predicate.FilterApi import org.apache.parquet.filter2.predicate.Statistics import org.apache.parquet.filter2.predicate.UserDefinedPredicate import org.apache.parquet.io.api.Binary -import org.opendc.compute.core.workload.VmWorkload -import org.opendc.compute.simulator.SimWorkloadImage +import org.opendc.compute.api.ComputeWorkload +import org.opendc.compute.api.Image import org.opendc.core.User import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader @@ -62,11 +62,11 @@ public class Sc20StreamingParquetTraceReader( performanceInterferenceModel: PerformanceInterferenceModel, selectedVms: List<String>, random: Random -) : TraceReader<VmWorkload> { +) : TraceReader<ComputeWorkload> { /** * The internal iterator to use for this reader. */ - private val iterator: Iterator<TraceEntry<VmWorkload>> + private val iterator: Iterator<TraceEntry<ComputeWorkload>> /** * The intermediate buffer to store the read records in. @@ -235,19 +235,20 @@ public class Sc20StreamingParquetTraceReader( performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSortedSet(), Random(random.nextInt()) ) - val vmWorkload = VmWorkload( + val workload = SimTraceWorkload(fragments) + val vmWorkload = ComputeWorkload( uid, "VM Workload $id", UnnamedUser, - SimWorkloadImage( + Image( uid, id, mapOf( IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, "cores" to maxCores, - "required-memory" to requiredMemory - ), - SimTraceWorkload(fragments), + "required-memory" to requiredMemory, + "workload" to workload + ) ) ) @@ -263,7 +264,7 @@ public class Sc20StreamingParquetTraceReader( override fun hasNext(): Boolean = iterator.hasNext() - override fun next(): TraceEntry<VmWorkload> = iterator.next() + override fun next(): TraceEntry<ComputeWorkload> = iterator.next() override fun close() { readerThread.interrupt() @@ -300,6 +301,6 @@ public class Sc20StreamingParquetTraceReader( */ private data class TraceEntryImpl( override var submissionTime: Long, - override val workload: VmWorkload - ) : TraceEntry<VmWorkload> + override val workload: ComputeWorkload + ) : TraceEntry<ComputeWorkload> } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt index 4d9b9df1..881652f6 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt @@ -23,8 +23,8 @@ package org.opendc.experiments.capelin.trace import mu.KotlinLogging -import org.opendc.compute.core.workload.VmWorkload -import org.opendc.compute.simulator.SimWorkloadImage +import org.opendc.compute.api.ComputeWorkload +import org.opendc.compute.api.Image import org.opendc.experiments.capelin.model.CompositeWorkload import org.opendc.experiments.capelin.model.SamplingStrategy import org.opendc.experiments.capelin.model.Workload @@ -38,11 +38,11 @@ private val logger = KotlinLogging.logger {} * Sample the workload for the specified [run]. */ public fun sampleWorkload( - trace: List<TraceEntry<VmWorkload>>, + trace: List<TraceEntry<ComputeWorkload>>, workload: Workload, subWorkload: Workload, seed: Int -): List<TraceEntry<VmWorkload>> { +): List<TraceEntry<ComputeWorkload>> { return when { workload is CompositeWorkload -> sampleRegularWorkload(trace, workload, subWorkload, seed) workload.samplingStrategy == SamplingStrategy.HPC -> @@ -58,15 +58,15 @@ public fun sampleWorkload( * Sample a regular (non-HPC) workload. */ public fun sampleRegularWorkload( - trace: List<TraceEntry<VmWorkload>>, + trace: List<TraceEntry<ComputeWorkload>>, workload: Workload, subWorkload: Workload, seed: Int -): List<TraceEntry<VmWorkload>> { +): List<TraceEntry<ComputeWorkload>> { val fraction = subWorkload.fraction val shuffled = trace.shuffled(Random(seed)) - val res = mutableListOf<TraceEntry<VmWorkload>>() + val res = mutableListOf<TraceEntry<ComputeWorkload>>() val totalLoad = if (workload is CompositeWorkload) { workload.totalLoad } else { @@ -93,11 +93,11 @@ public fun sampleRegularWorkload( * Sample a HPC workload. */ public fun sampleHpcWorkload( - trace: List<TraceEntry<VmWorkload>>, + trace: List<TraceEntry<ComputeWorkload>>, workload: Workload, seed: Int, sampleOnLoad: Boolean -): List<TraceEntry<VmWorkload>> { +): List<TraceEntry<ComputeWorkload>> { val pattern = Regex("^vm__workload__(ComputeNode|cn).*") val random = Random(seed) @@ -109,7 +109,7 @@ public fun sampleHpcWorkload( val hpcSequence = generateSequence(0) { it + 1 } .map { index -> - val res = mutableListOf<TraceEntry<VmWorkload>>() + val res = mutableListOf<TraceEntry<ComputeWorkload>>() hpc.mapTo(res) { sample(it, index) } res.shuffle(random) res @@ -118,7 +118,7 @@ public fun sampleHpcWorkload( val nonHpcSequence = generateSequence(0) { it + 1 } .map { index -> - val res = mutableListOf<TraceEntry<VmWorkload>>() + val res = mutableListOf<TraceEntry<ComputeWorkload>>() nonHpc.mapTo(res) { sample(it, index) } res.shuffle(random) res @@ -139,7 +139,7 @@ public fun sampleHpcWorkload( var nonHpcCount = 0 var nonHpcLoad = 0.0 - val res = mutableListOf<TraceEntry<VmWorkload>>() + val res = mutableListOf<TraceEntry<ComputeWorkload>>() if (sampleOnLoad) { var currentLoad = 0.0 @@ -194,17 +194,16 @@ public fun sampleHpcWorkload( /** * Sample a random trace entry. */ -private fun sample(entry: TraceEntry<VmWorkload>, i: Int): TraceEntry<VmWorkload> { +private fun sample(entry: TraceEntry<ComputeWorkload>, i: Int): TraceEntry<ComputeWorkload> { val id = UUID.nameUUIDFromBytes("${entry.workload.image.uid}-$i".toByteArray()) - val image = SimWorkloadImage( + val image = Image( id, entry.workload.image.name, - entry.workload.image.tags, - (entry.workload.image as SimWorkloadImage).workload + entry.workload.image.tags ) val vmWorkload = entry.workload.copy(uid = id, image = image, name = entry.workload.name) return VmTraceEntry(vmWorkload, entry.submissionTime) } -private class VmTraceEntry(override val workload: VmWorkload, override val submissionTime: Long) : - TraceEntry<VmWorkload> +private class VmTraceEntry(override val workload: ComputeWorkload, override val submissionTime: Long) : + TraceEntry<ComputeWorkload> diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 6a0796f6..dfc6b90b 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -32,10 +32,9 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import org.opendc.compute.core.Server -import org.opendc.compute.core.workload.VmWorkload -import org.opendc.compute.simulator.SimVirtProvisioningService -import org.opendc.compute.simulator.allocation.AvailableCoreMemoryAllocationPolicy +import org.opendc.compute.api.ComputeWorkload +import org.opendc.compute.service.internal.ComputeServiceImpl +import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy import org.opendc.experiments.capelin.experiment.attachMonitor import org.opendc.experiments.capelin.experiment.createFailureDomain import org.opendc.experiments.capelin.experiment.createProvisioner @@ -47,6 +46,7 @@ import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader import org.opendc.format.environment.EnvironmentReader import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import org.opendc.format.trace.TraceReader +import org.opendc.metal.Node import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.trace.core.EventTracer import java.io.File @@ -97,7 +97,7 @@ class CapelinIntegrationTest { val allocationPolicy = AvailableCoreMemoryAllocationPolicy() val traceReader = createTestTraceReader() val environmentReader = createTestEnvironmentReader() - lateinit var scheduler: SimVirtProvisioningService + lateinit var scheduler: ComputeServiceImpl val tracer = EventTracer(clock) testScope.launch { @@ -108,8 +108,8 @@ class CapelinIntegrationTest { allocationPolicy, tracer ) - val bareMetalProvisioner = res.first - scheduler = res.second + val bareMetalProvisioner = res.metal + scheduler = res.compute val failureDomain = if (failures) { println("ENABLING failures") @@ -138,8 +138,9 @@ class CapelinIntegrationTest { println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") failureDomain?.cancel() - scheduler.terminate() + scheduler.close() monitor.close() + res.provisioner.close() } runSimulation() @@ -148,9 +149,9 @@ class CapelinIntegrationTest { assertAll( { assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") }, { assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") }, - { assertEquals(1684849230562, monitor.totalRequestedBurst) }, - { assertEquals(447612683996, monitor.totalGrantedBurst) }, - { assertEquals(1219535757406, monitor.totalOvercommissionedBurst) }, + { assertEquals(1678587333640, monitor.totalRequestedBurst) }, + { assertEquals(438118200924, monitor.totalGrantedBurst) }, + { assertEquals(1220323969993, monitor.totalOvercommissionedBurst) }, { assertEquals(0, monitor.totalInterferedBurst) } ) } @@ -162,19 +163,16 @@ class CapelinIntegrationTest { val allocationPolicy = AvailableCoreMemoryAllocationPolicy() val traceReader = createTestTraceReader(0.5, seed) val environmentReader = createTestEnvironmentReader("single") - lateinit var scheduler: SimVirtProvisioningService val tracer = EventTracer(clock) testScope.launch { - val res = createProvisioner( + val (_, provisioner, scheduler) = createProvisioner( this, clock, environmentReader, allocationPolicy, tracer ) - scheduler = res.second - attachMonitor(this, clock, scheduler, monitor) processTrace( this, @@ -187,8 +185,9 @@ class CapelinIntegrationTest { println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") - scheduler.terminate() + scheduler.close() monitor.close() + provisioner.close() } runSimulation() @@ -210,7 +209,7 @@ class CapelinIntegrationTest { /** * Obtain the trace reader for the test. */ - private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<VmWorkload> { + private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<ComputeWorkload> { return Sc20ParquetTraceReader( listOf(Sc20RawParquetTraceReader(File("src/test/resources/trace"))), emptyMap(), @@ -242,7 +241,7 @@ class CapelinIntegrationTest { cpuUsage: Double, cpuDemand: Double, numberOfDeployedImages: Int, - hostServer: Server, + host: Node, duration: Long ) { totalRequestedBurst += requestedBurst diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt index fc979363..7b9d70ed 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt @@ -24,13 +24,14 @@ package org.opendc.experiments.sc18 import kotlinx.coroutines.* import kotlinx.coroutines.test.TestCoroutineScope -import org.opendc.compute.core.metal.service.ProvisioningService -import org.opendc.compute.simulator.SimVirtProvisioningService -import org.opendc.compute.simulator.allocation.NumberOfActiveServersAllocationPolicy +import org.opendc.compute.service.ComputeService +import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy +import org.opendc.compute.simulator.SimHostProvisioner import org.opendc.format.environment.sc18.Sc18EnvironmentReader import org.opendc.format.trace.gwf.GwfTraceReader import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf +import org.opendc.metal.service.ProvisioningService import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.trace.core.EventTracer @@ -91,16 +92,17 @@ public class UnderspecificationExperiment : Experiment("underspecification") { // Wait for the bare metal nodes to be spawned delay(10) - val provisioner = SimVirtProvisioningService( - testScope, + val provisioner = SimHostProvisioner(testScope.coroutineContext, bareMetal, SimSpaceSharedHypervisorProvider()) + val hosts = provisioner.provisionAll() + val compute = ComputeService( + testScope.coroutineContext, clock, - bareMetal, - NumberOfActiveServersAllocationPolicy(), tracer, - SimSpaceSharedHypervisorProvider(), - schedulingQuantum = 1000 + NumberOfActiveServersAllocationPolicy(), ) + hosts.forEach { compute.addHost(it) } + // Wait for the hypervisors to be spawned delay(10) @@ -108,7 +110,7 @@ public class UnderspecificationExperiment : Experiment("underspecification") { testScope, clock, tracer, - provisioner, + compute.newClient(), mode = WorkflowSchedulerMode.Batch(100), jobAdmissionPolicy = NullJobAdmissionPolicy, jobOrderPolicy = SubmissionTimeJobOrderPolicy(), |
