diff options
Diffstat (limited to 'simulator/opendc-experiments/opendc-experiments-capelin/src/main')
30 files changed, 0 insertions, 3327 deletions
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt deleted file mode 100644 index faabe5cb..00000000 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CompositeWorkloadPortfolio.kt +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin - -import org.opendc.experiments.capelin.model.CompositeWorkload -import org.opendc.experiments.capelin.model.OperationalPhenomena -import org.opendc.experiments.capelin.model.Topology -import org.opendc.experiments.capelin.model.Workload -import org.opendc.harness.dsl.anyOf - -/** - * A [Portfolio] that explores the effect of a composite workload. - */ -public class CompositeWorkloadPortfolio : Portfolio("composite-workload") { - private val totalSampleLoad = 1.3301733005049648E12 - - override val topology: Topology by anyOf( - Topology("base"), - Topology("exp-vol-hor-hom"), - Topology("exp-vol-ver-hom"), - Topology("exp-vel-ver-hom") - ) - - override val workload: Workload by anyOf( - CompositeWorkload( - "all-azure", - listOf(Workload("solvinity-short", 0.0), Workload("azure", 1.0)), - totalSampleLoad - ), - CompositeWorkload( - "solvinity-25-azure-75", - listOf(Workload("solvinity-short", 0.25), Workload("azure", 0.75)), - totalSampleLoad - ), - CompositeWorkload( - "solvinity-50-azure-50", - listOf(Workload("solvinity-short", 0.5), Workload("azure", 0.5)), - totalSampleLoad - ), - CompositeWorkload( - "solvinity-75-azure-25", - listOf(Workload("solvinity-short", 0.75), Workload("azure", 0.25)), - totalSampleLoad - ), - CompositeWorkload( - "all-solvinity", - listOf(Workload("solvinity-short", 1.0), Workload("azure", 0.0)), - totalSampleLoad - ) - ) - - override val operationalPhenomena: OperationalPhenomena by anyOf( - OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = false) - ) - - override val allocationPolicy: String by anyOf( - "active-servers" - ) -} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt deleted file mode 100644 index 763234f8..00000000 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ /dev/null @@ -1,297 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin - -import io.opentelemetry.api.metrics.MeterProvider -import io.opentelemetry.sdk.metrics.export.MetricProducer -import kotlinx.coroutines.* -import kotlinx.coroutines.channels.Channel -import mu.KotlinLogging -import org.opendc.compute.api.* -import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.driver.Host -import org.opendc.compute.service.driver.HostListener -import org.opendc.compute.service.driver.HostState -import org.opendc.compute.service.scheduler.ComputeScheduler -import org.opendc.compute.simulator.SimHost -import org.opendc.experiments.capelin.monitor.ExperimentMetricExporter -import org.opendc.experiments.capelin.monitor.ExperimentMonitor -import org.opendc.experiments.capelin.trace.Sc20StreamingParquetTraceReader -import org.opendc.format.environment.EnvironmentReader -import org.opendc.format.trace.TraceReader -import org.opendc.simulator.compute.SimFairShareHypervisorProvider -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import org.opendc.simulator.compute.workload.SimTraceWorkload -import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.failures.CorrelatedFaultInjector -import org.opendc.simulator.failures.FaultInjector -import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader -import java.io.File -import java.time.Clock -import kotlin.coroutines.coroutineContext -import kotlin.coroutines.resume -import kotlin.math.ln -import kotlin.math.max -import kotlin.random.Random - -/** - * The logger for this experiment. - */ -private val logger = KotlinLogging.logger {} - -/** - * Construct the failure domain for the experiments. - */ -public fun createFailureDomain( - coroutineScope: CoroutineScope, - clock: Clock, - seed: Int, - failureInterval: Double, - service: ComputeService, - chan: Channel<Unit> -): CoroutineScope { - val job = coroutineScope.launch { - chan.receive() - val random = Random(seed) - val injectors = mutableMapOf<String, FaultInjector>() - for (host in service.hosts) { - val cluster = host.meta["cluster"] as String - val injector = - injectors.getOrPut(cluster) { - createFaultInjector( - this, - clock, - random, - failureInterval - ) - } - injector.enqueue(host as SimHost) - } - } - return CoroutineScope(coroutineScope.coroutineContext + job) -} - -/** - * Obtain the [FaultInjector] to use for the experiments. - */ -public fun createFaultInjector( - coroutineScope: CoroutineScope, - clock: Clock, - random: Random, - failureInterval: Double -): FaultInjector { - // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 - // GRID'5000 - return CorrelatedFaultInjector( - coroutineScope, - clock, - iatScale = ln(failureInterval), iatShape = 1.03, // Hours - sizeScale = ln(2.0), sizeShape = ln(1.0), // Expect 2 machines, with variation of 1 - dScale = ln(60.0), dShape = ln(60.0 * 8), // Minutes - random = random - ) -} - -/** - * Create the trace reader from which the VM workloads are read. - */ -public fun createTraceReader( - path: File, - performanceInterferenceModel: PerformanceInterferenceModel, - vms: List<String>, - seed: Int -): Sc20StreamingParquetTraceReader { - return Sc20StreamingParquetTraceReader( - path, - performanceInterferenceModel, - vms, - Random(seed) - ) -} - -/** - * Construct the environment for a simulated compute service.. - */ -public suspend fun withComputeService( - clock: Clock, - meterProvider: MeterProvider, - environmentReader: EnvironmentReader, - scheduler: ComputeScheduler, - block: suspend CoroutineScope.(ComputeService) -> Unit -): Unit = coroutineScope { - val hosts = environmentReader - .use { it.read() } - .map { def -> - SimHost( - def.uid, - def.name, - def.model, - def.meta, - coroutineContext, - clock, - meterProvider.get("opendc-compute-simulator"), - SimFairShareHypervisorProvider(), - def.powerModel - ) - } - - val serviceMeter = meterProvider.get("opendc-compute") - val service = - ComputeService(coroutineContext, clock, serviceMeter, scheduler) - - for (host in hosts) { - service.addHost(host) - } - - try { - block(this, service) - } finally { - service.close() - hosts.forEach(SimHost::close) - } -} - -/** - * Attach the specified monitor to the VM provisioner. - */ -@OptIn(ExperimentalCoroutinesApi::class) -public suspend fun withMonitor( - monitor: ExperimentMonitor, - clock: Clock, - metricProducer: MetricProducer, - scheduler: ComputeService, - block: suspend CoroutineScope.() -> Unit -): Unit = coroutineScope { - val monitorJobs = mutableSetOf<Job>() - - // Monitor host events - for (host in scheduler.hosts) { - monitor.reportHostStateChange(clock.millis(), host, HostState.UP) - host.addListener(object : HostListener { - override fun onStateChanged(host: Host, newState: HostState) { - monitor.reportHostStateChange(clock.millis(), host, newState) - } - }) - } - - val reader = CoroutineMetricReader( - this, - listOf(metricProducer), - ExperimentMetricExporter(monitor, clock, scheduler.hosts.associateBy { it.uid.toString() }), - exportInterval = 5 * 60 * 1000 /* Every 5 min (which is the granularity of the workload trace) */ - ) - - try { - block(this) - } finally { - monitorJobs.forEach(Job::cancel) - reader.close() - monitor.close() - } -} - -public class ComputeMetrics { - public var submittedVms: Int = 0 - public var queuedVms: Int = 0 - public var runningVms: Int = 0 - public var unscheduledVms: Int = 0 - public var finishedVms: Int = 0 -} - -/** - * Collect the metrics of the compute service. - */ -public fun collectMetrics(metricProducer: MetricProducer): ComputeMetrics { - val metrics = metricProducer.collectAllMetrics().associateBy { it.name } - val res = ComputeMetrics() - try { - // Hack to extract metrics from OpenTelemetry SDK - res.submittedVms = metrics["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - res.queuedVms = metrics["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - res.unscheduledVms = metrics["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - res.runningVms = metrics["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - res.finishedVms = metrics["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - } catch (cause: Throwable) { - logger.warn(cause) { "Failed to collect metrics" } - } - return res -} - -/** - * Process the trace. - */ -public suspend fun processTrace( - clock: Clock, - reader: TraceReader<SimWorkload>, - scheduler: ComputeService, - chan: Channel<Unit>, - monitor: ExperimentMonitor -) { - val client = scheduler.newClient() - val image = client.newImage("vm-image") - var offset = Long.MIN_VALUE - try { - coroutineScope { - while (reader.hasNext()) { - val entry = reader.next() - - if (offset < 0) { - offset = entry.start - clock.millis() - } - - delay(max(0, (entry.start - offset) - clock.millis())) - launch { - chan.send(Unit) - val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace) - val server = client.newServer( - entry.name, - image, - client.newFlavor( - entry.name, - entry.meta["cores"] as Int, - entry.meta["required-memory"] as Long - ), - meta = entry.meta + mapOf("workload" to workload) - ) - - suspendCancellableCoroutine { cont -> - server.watch(object : ServerWatcher { - override fun onStateChanged(server: Server, newState: ServerState) { - monitor.reportVmStateChange(clock.millis(), server, newState) - - if (newState == ServerState.TERMINATED || newState == ServerState.ERROR) { - cont.resume(Unit) - } - } - }) - } - } - } - } - - yield() - } finally { - reader.close() - client.close() - } -} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt deleted file mode 100644 index e1cf8517..00000000 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/HorVerPortfolio.kt +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin - -import org.opendc.experiments.capelin.model.OperationalPhenomena -import org.opendc.experiments.capelin.model.Topology -import org.opendc.experiments.capelin.model.Workload -import org.opendc.harness.dsl.anyOf - -/** - * A [Portfolio] that explores the difference between horizontal and vertical scaling. - */ -public class HorVerPortfolio : Portfolio("horizontal_vs_vertical") { - override val topology: Topology by anyOf( - Topology("base"), - Topology("rep-vol-hor-hom"), - Topology("rep-vol-hor-het"), - Topology("rep-vol-ver-hom"), - Topology("rep-vol-ver-het"), - Topology("exp-vol-hor-hom"), - Topology("exp-vol-hor-het"), - Topology("exp-vol-ver-hom"), - Topology("exp-vol-ver-het") - ) - - override val workload: Workload by anyOf( - Workload("solvinity", 0.1), - Workload("solvinity", 0.25), - Workload("solvinity", 0.5), - Workload("solvinity", 1.0) - ) - - override val operationalPhenomena: OperationalPhenomena by anyOf( - OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true) - ) - - override val allocationPolicy: String by anyOf( - "active-servers" - ) -} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt deleted file mode 100644 index a995e467..00000000 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreHpcPortfolio.kt +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin - -import org.opendc.experiments.capelin.model.OperationalPhenomena -import org.opendc.experiments.capelin.model.SamplingStrategy -import org.opendc.experiments.capelin.model.Topology -import org.opendc.experiments.capelin.model.Workload -import org.opendc.harness.dsl.anyOf - -/** - * A [Portfolio] to explore the effect of HPC workloads. - */ -public class MoreHpcPortfolio : Portfolio("more_hpc") { - override val topology: Topology by anyOf( - Topology("base"), - Topology("exp-vol-hor-hom"), - Topology("exp-vol-ver-hom"), - Topology("exp-vel-ver-hom") - ) - - override val workload: Workload by anyOf( - Workload("solvinity", 0.0, samplingStrategy = SamplingStrategy.HPC), - Workload("solvinity", 0.25, samplingStrategy = SamplingStrategy.HPC), - Workload("solvinity", 0.5, samplingStrategy = SamplingStrategy.HPC), - Workload("solvinity", 1.0, samplingStrategy = SamplingStrategy.HPC), - Workload("solvinity", 0.25, samplingStrategy = SamplingStrategy.HPC_LOAD), - Workload("solvinity", 0.5, samplingStrategy = SamplingStrategy.HPC_LOAD), - Workload("solvinity", 1.0, samplingStrategy = SamplingStrategy.HPC_LOAD) - ) - - override val operationalPhenomena: OperationalPhenomena by anyOf( - OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true) - ) - - override val allocationPolicy: String by anyOf( - "active-servers" - ) -} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt deleted file mode 100644 index 49559e0e..00000000 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/MoreVelocityPortfolio.kt +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin - -import org.opendc.experiments.capelin.model.OperationalPhenomena -import org.opendc.experiments.capelin.model.Topology -import org.opendc.experiments.capelin.model.Workload -import org.opendc.harness.dsl.anyOf - -/** - * A [Portfolio] that explores the effect of adding more velocity to a cluster (e.g., faster machines). - */ -public class MoreVelocityPortfolio : Portfolio("more_velocity") { - override val topology: Topology by anyOf( - Topology("base"), - Topology("rep-vel-ver-hom"), - Topology("rep-vel-ver-het"), - Topology("exp-vel-ver-hom"), - Topology("exp-vel-ver-het") - ) - - override val workload: Workload by anyOf( - Workload("solvinity", 0.1), - Workload("solvinity", 0.25), - Workload("solvinity", 0.5), - Workload("solvinity", 1.0) - ) - - override val operationalPhenomena: OperationalPhenomena by anyOf( - OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true) - ) - - override val allocationPolicy: String by anyOf( - "active-servers" - ) -} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt deleted file mode 100644 index 1aac4f9e..00000000 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/OperationalPhenomenaPortfolio.kt +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin - -import org.opendc.experiments.capelin.model.OperationalPhenomena -import org.opendc.experiments.capelin.model.Topology -import org.opendc.experiments.capelin.model.Workload -import org.opendc.harness.dsl.anyOf - -/** - * A [Portfolio] that explores the effect of operational phenomena on metrics. - */ -public class OperationalPhenomenaPortfolio : Portfolio("operational_phenomena") { - override val topology: Topology by anyOf( - Topology("base") - ) - - override val workload: Workload by anyOf( - Workload("solvinity", 0.1), - Workload("solvinity", 0.25), - Workload("solvinity", 0.5), - Workload("solvinity", 1.0) - ) - - override val operationalPhenomena: OperationalPhenomena by anyOf( - OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true), - OperationalPhenomena(failureFrequency = 0.0, hasInterference = true), - OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = false), - OperationalPhenomena(failureFrequency = 0.0, hasInterference = false) - ) - - override val allocationPolicy: String by anyOf( - "mem", - "mem-inv", - "core-mem", - "core-mem-inv", - "active-servers", - "active-servers-inv", - "random" - ) -} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt deleted file mode 100644 index b969366c..00000000 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ /dev/null @@ -1,233 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin - -import io.opentelemetry.api.metrics.MeterProvider -import io.opentelemetry.sdk.metrics.SdkMeterProvider -import io.opentelemetry.sdk.metrics.export.MetricProducer -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.cancel -import kotlinx.coroutines.channels.Channel -import mu.KotlinLogging -import org.opendc.compute.service.scheduler.* -import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter -import org.opendc.compute.service.scheduler.filters.ComputeFilter -import org.opendc.compute.service.scheduler.weights.* -import org.opendc.experiments.capelin.model.CompositeWorkload -import org.opendc.experiments.capelin.model.OperationalPhenomena -import org.opendc.experiments.capelin.model.Topology -import org.opendc.experiments.capelin.model.Workload -import org.opendc.experiments.capelin.monitor.ParquetExperimentMonitor -import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader -import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader -import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader -import org.opendc.format.trace.PerformanceInterferenceModelReader -import org.opendc.harness.dsl.Experiment -import org.opendc.harness.dsl.anyOf -import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.telemetry.sdk.toOtelClock -import java.io.File -import java.util.* -import java.util.concurrent.ConcurrentHashMap -import kotlin.random.asKotlinRandom - -/** - * A portfolio represents a collection of scenarios are tested for the work. - * - * @param name The name of the portfolio. - */ -public abstract class Portfolio(name: String) : Experiment(name) { - /** - * The logger for this portfolio instance. - */ - private val logger = KotlinLogging.logger {} - - /** - * The path to where the environments are located. - */ - private val environmentPath by anyOf(File("input/environments/")) - - /** - * The path to where the traces are located. - */ - private val tracePath by anyOf(File("input/traces/")) - - /** - * The path to where the output results should be written. - */ - private val outputPath by anyOf(File("output/")) - - /** - * The path to the original VM placements file. - */ - private val vmPlacements by anyOf(emptyMap<String, String>()) - - /** - * The path to the performance interference model. - */ - private val performanceInterferenceModel by anyOf<PerformanceInterferenceModelReader?>(null) - - /** - * The topology to test. - */ - public abstract val topology: Topology - - /** - * The workload to test. - */ - public abstract val workload: Workload - - /** - * The operational phenomenas to consider. - */ - public abstract val operationalPhenomena: OperationalPhenomena - - /** - * The allocation policies to consider. - */ - public abstract val allocationPolicy: String - - /** - * A map of trace readers. - */ - private val traceReaders = ConcurrentHashMap<String, Sc20RawParquetTraceReader>() - - /** - * Perform a single trial for this portfolio. - */ - @OptIn(ExperimentalCoroutinesApi::class) - override fun doRun(repeat: Int): Unit = runBlockingSimulation { - val seeder = Random(repeat.toLong()) - val environment = Sc20ClusterEnvironmentReader(File(environmentPath, "${topology.name}.txt")) - - val chan = Channel<Unit>(Channel.CONFLATED) - val allocationPolicy = createComputeScheduler(seeder) - - val meterProvider: MeterProvider = SdkMeterProvider - .builder() - .setClock(clock.toOtelClock()) - .build() - - val workload = workload - val workloadNames = if (workload is CompositeWorkload) { - workload.workloads.map { it.name } - } else { - listOf(workload.name) - } - - val rawReaders = workloadNames.map { workloadName -> - traceReaders.computeIfAbsent(workloadName) { - logger.info { "Loading trace $workloadName" } - Sc20RawParquetTraceReader(File(tracePath, workloadName)) - } - } - - val performanceInterferenceModel = performanceInterferenceModel - ?.takeIf { operationalPhenomena.hasInterference } - ?.construct(seeder.asKotlinRandom()) ?: emptyMap() - val trace = Sc20ParquetTraceReader(rawReaders, performanceInterferenceModel, workload, seeder.nextInt()) - - val monitor = ParquetExperimentMonitor( - outputPath, - "portfolio_id=$name/scenario_id=$id/run_id=$repeat", - 4096 - ) - - withComputeService(clock, meterProvider, environment, allocationPolicy) { scheduler -> - val failureDomain = if (operationalPhenomena.failureFrequency > 0) { - logger.debug("ENABLING failures") - createFailureDomain( - this, - clock, - seeder.nextInt(), - operationalPhenomena.failureFrequency, - scheduler, - chan - ) - } else { - null - } - - withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { - processTrace( - clock, - trace, - scheduler, - chan, - monitor - ) - } - - failureDomain?.cancel() - } - - val monitorResults = collectMetrics(meterProvider as MetricProducer) - logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}" } - } - - /** - * Create the [ComputeScheduler] instance to use for the trial. - */ - private fun createComputeScheduler(seeder: Random): ComputeScheduler { - return when (allocationPolicy) { - "mem" -> FilterScheduler( - filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), - weighers = listOf(MemoryWeigher() to -1.0) - ) - "mem-inv" -> FilterScheduler( - filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), - weighers = listOf(MemoryWeigher() to -1.0) - ) - "core-mem" -> FilterScheduler( - filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), - weighers = listOf(CoreMemoryWeigher() to -1.0) - ) - "core-mem-inv" -> FilterScheduler( - filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), - weighers = listOf(CoreMemoryWeigher() to -1.0) - ) - "active-servers" -> FilterScheduler( - filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), - weighers = listOf(ProvisionedCoresWeigher() to -1.0) - ) - "active-servers-inv" -> FilterScheduler( - filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), - weighers = listOf(InstanceCountWeigher() to 1.0) - ) - "provisioned-cores" -> FilterScheduler( - filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), - weighers = listOf(ProvisionedCoresWeigher() to -1.0) - ) - "provisioned-cores-inv" -> FilterScheduler( - filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), - weighers = listOf(ProvisionedCoresWeigher() to 1.0) - ) - "random" -> FilterScheduler( - filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), - weighers = listOf(RandomWeigher(Random(seeder.nextLong())) to 1.0) - ) - "replay" -> ReplayScheduler(vmPlacements) - else -> throw IllegalArgumentException("Unknown policy $allocationPolicy") - } - } -} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt deleted file mode 100644 index b6d3b30c..00000000 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ReplayPortfolio.kt +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin - -import org.opendc.experiments.capelin.model.OperationalPhenomena -import org.opendc.experiments.capelin.model.Topology -import org.opendc.experiments.capelin.model.Workload -import org.opendc.harness.dsl.anyOf - -/** - * A [Portfolio] that compares the original VM placements against our policies. - */ -public class ReplayPortfolio : Portfolio("replay") { - override val topology: Topology by anyOf( - Topology("base") - ) - - override val workload: Workload by anyOf( - Workload("solvinity", 1.0) - ) - - override val operationalPhenomena: OperationalPhenomena by anyOf( - OperationalPhenomena(failureFrequency = 0.0, hasInterference = false) - ) - - override val allocationPolicy: String by anyOf( - "replay", - "active-servers" - ) -} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt deleted file mode 100644 index 90840db8..00000000 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/TestPortfolio.kt +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin - -import org.opendc.experiments.capelin.model.OperationalPhenomena -import org.opendc.experiments.capelin.model.Topology -import org.opendc.experiments.capelin.model.Workload -import org.opendc.harness.dsl.anyOf - -/** - * A [Portfolio] to perform a simple test run. - */ -public class TestPortfolio : Portfolio("test") { - override val topology: Topology by anyOf( - Topology("base") - ) - - override val workload: Workload by anyOf( - Workload("solvinity", 1.0) - ) - - override val operationalPhenomena: OperationalPhenomena by anyOf( - OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true) - ) - - override val allocationPolicy: String by anyOf("active-servers") -} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/OperationalPhenomena.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/OperationalPhenomena.kt deleted file mode 100644 index b53b3617..00000000 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/OperationalPhenomena.kt +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.model - -/** - * Operation phenomena during experiments. - * - * @param failureFrequency The average time between failures in hours. - * @param hasInterference A flag to enable performance interference between VMs. - */ -public data class OperationalPhenomena(val failureFrequency: Double, val hasInterference: Boolean) diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Topology.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Topology.kt deleted file mode 100644 index fe16a294..00000000 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Topology.kt +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.model - -/** - * The topology topology on which we test the workload. - */ -public data class Topology(val name: String) diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt deleted file mode 100644 index c4ddd158..00000000 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/model/Workload.kt +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.model - -public enum class SamplingStrategy { - REGULAR, - HPC, - HPC_LOAD -} - -/** - * A workload that is considered for a scenario. - */ -public open class Workload( - public open val name: String, - public val fraction: Double, - public val samplingStrategy: SamplingStrategy = SamplingStrategy.REGULAR -) - -/** - * A workload that is composed of multiple workloads. - */ -public class CompositeWorkload(override val name: String, public val workloads: List<Workload>, public val totalLoad: Double) : - Workload(name, -1.0) diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt deleted file mode 100644 index 5f8002e2..00000000 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.monitor - -import io.opentelemetry.sdk.common.CompletableResultCode -import io.opentelemetry.sdk.metrics.data.MetricData -import io.opentelemetry.sdk.metrics.export.MetricExporter -import org.opendc.compute.service.driver.Host -import java.time.Clock - -/** - * A [MetricExporter] that exports the metrics to the [ExperimentMonitor]. - */ -public class ExperimentMetricExporter( - private val monitor: ExperimentMonitor, - private val clock: Clock, - private val hosts: Map<String, Host> -) : MetricExporter { - override fun export(metrics: Collection<MetricData>): CompletableResultCode { - val metricsByName = metrics.associateBy { it.name } - reportHostMetrics(metricsByName) - reportProvisionerMetrics(metricsByName) - return CompletableResultCode.ofSuccess() - } - - private fun reportHostMetrics(metrics: Map<String, MetricData>) { - val hostMetrics = mutableMapOf<String, HostMetrics>() - hosts.mapValuesTo(hostMetrics) { HostMetrics() } - - mapDoubleSummary(metrics["cpu.demand"], hostMetrics) { m, v -> - m.cpuDemand = v - } - - mapDoubleSummary(metrics["cpu.usage"], hostMetrics) { m, v -> - m.cpuUsage = v - } - - mapDoubleSummary(metrics["power.usage"], hostMetrics) { m, v -> - m.powerDraw = v - } - - mapDoubleSummary(metrics["cpu.work.total"], hostMetrics) { m, v -> - m.requestedBurst = v.toLong() - } - - mapDoubleSummary(metrics["cpu.work.granted"], hostMetrics) { m, v -> - m.grantedBurst = v.toLong() - } - - mapDoubleSummary(metrics["cpu.work.overcommit"], hostMetrics) { m, v -> - m.overcommissionedBurst = v.toLong() - } - - mapDoubleSummary(metrics["cpu.work.interfered"], hostMetrics) { m, v -> - m.interferedBurst = v.toLong() - } - - mapLongSum(metrics["guests.active"], hostMetrics) { m, v -> - m.numberOfDeployedImages = v.toInt() - } - - for ((id, hostMetric) in hostMetrics) { - val host = hosts.getValue(id) - monitor.reportHostSlice( - clock.millis(), - hostMetric.requestedBurst, - hostMetric.grantedBurst, - hostMetric.overcommissionedBurst, - hostMetric.interferedBurst, - hostMetric.cpuUsage, - hostMetric.cpuDemand, - hostMetric.powerDraw, - hostMetric.numberOfDeployedImages, - host - ) - } - } - - private fun mapDoubleSummary(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) { - val points = data?.doubleSummaryData?.points ?: emptyList() - for (point in points) { - val uid = point.labels["host"] - val hostMetric = hostMetrics[uid] - - if (hostMetric != null) { - // Take the average of the summary - val avg = (point.percentileValues[0].value + point.percentileValues[1].value) / 2 - block(hostMetric, avg) - } - } - } - - private fun mapLongSum(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Long) -> Unit) { - val points = data?.longSumData?.points ?: emptyList() - for (point in points) { - val uid = point.labels["host"] - val hostMetric = hostMetrics[uid] - - if (hostMetric != null) { - block(hostMetric, point.value) - } - } - } - - private fun reportProvisionerMetrics(metrics: Map<String, MetricData>) { - val submittedVms = metrics["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - val queuedVms = metrics["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - val unscheduledVms = metrics["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - val runningVms = metrics["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - val finishedVms = metrics["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - val hosts = metrics["hosts.total"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - val availableHosts = metrics["hosts.available"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - - monitor.reportProvisionerMetrics( - clock.millis(), - hosts, - availableHosts, - submittedVms, - runningVms, - finishedVms, - queuedVms, - unscheduledVms - ) - } - - private class HostMetrics { - var requestedBurst: Long = 0 - var grantedBurst: Long = 0 - var overcommissionedBurst: Long = 0 - var interferedBurst: Long = 0 - var cpuUsage: Double = 0.0 - var cpuDemand: Double = 0.0 - var numberOfDeployedImages: Int = 0 - var powerDraw: Double = 0.0 - } - - override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess() - - override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() -} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt deleted file mode 100644 index 68631dee..00000000 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.monitor - -import org.opendc.compute.api.Server -import org.opendc.compute.api.ServerState -import org.opendc.compute.service.driver.Host -import org.opendc.compute.service.driver.HostState - -/** - * A monitor watches the events of an experiment. - */ -public interface ExperimentMonitor : AutoCloseable { - /** - * This method is invoked when the state of a VM changes. - */ - public fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {} - - /** - * This method is invoked when the state of a host changes. - */ - public fun reportHostStateChange(time: Long, host: Host, newState: HostState) {} - - /** - * This method is invoked for a host for each slice that is finishes. - */ - public fun reportHostSlice( - time: Long, - requestedBurst: Long, - grantedBurst: Long, - overcommissionedBurst: Long, - interferedBurst: Long, - cpuUsage: Double, - cpuDemand: Double, - powerDraw: Double, - numberOfDeployedImages: Int, - host: Host - ) { - } - - /** - * This method is invoked for a provisioner event. - */ - public fun reportProvisionerMetrics( - time: Long, - totalHostCount: Int, - availableHostCount: Int, - totalVmCount: Int, - activeVmCount: Int, - inactiveVmCount: Int, - waitingVmCount: Int, - failedVmCount: Int - ) {} -} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt deleted file mode 100644 index 983b4cff..00000000 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.monitor - -import mu.KotlinLogging -import org.opendc.compute.api.Server -import org.opendc.compute.api.ServerState -import org.opendc.compute.service.driver.Host -import org.opendc.compute.service.driver.HostState -import org.opendc.experiments.capelin.telemetry.HostEvent -import org.opendc.experiments.capelin.telemetry.ProvisionerEvent -import org.opendc.experiments.capelin.telemetry.parquet.ParquetHostEventWriter -import org.opendc.experiments.capelin.telemetry.parquet.ParquetProvisionerEventWriter -import java.io.File - -/** - * The logger instance to use. - */ -private val logger = KotlinLogging.logger {} - -/** - * An [ExperimentMonitor] that logs the events to a Parquet file. - */ -public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: Int) : ExperimentMonitor { - private val hostWriter = ParquetHostEventWriter( - File(base, "host-metrics/$partition/data.parquet"), - bufferSize - ) - private val provisionerWriter = ParquetProvisionerEventWriter( - File(base, "provisioner-metrics/$partition/data.parquet"), - bufferSize - ) - - override fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {} - - override fun reportHostStateChange(time: Long, host: Host, newState: HostState) { - logger.debug { "Host ${host.uid} changed state $newState [$time]" } - } - - override fun reportHostSlice( - time: Long, - requestedBurst: Long, - grantedBurst: Long, - overcommissionedBurst: Long, - interferedBurst: Long, - cpuUsage: Double, - cpuDemand: Double, - powerDraw: Double, - numberOfDeployedImages: Int, - host: Host - ) { - hostWriter.write( - HostEvent( - time, - 5 * 60 * 1000L, - host, - numberOfDeployedImages, - requestedBurst, - grantedBurst, - overcommissionedBurst, - interferedBurst, - cpuUsage, - cpuDemand, - powerDraw, - host.model.cpuCount - ) - ) - } - - override fun reportProvisionerMetrics( - time: Long, - totalHostCount: Int, - availableHostCount: Int, - totalVmCount: Int, - activeVmCount: Int, - inactiveVmCount: Int, - waitingVmCount: Int, - failedVmCount: Int - ) { - provisionerWriter.write( - ProvisionerEvent( - time, - totalHostCount, - availableHostCount, - totalVmCount, - activeVmCount, - inactiveVmCount, - waitingVmCount, - failedVmCount - ) - ) - } - - override fun close() { - hostWriter.close() - provisionerWriter.close() - } -} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/Event.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/Event.kt deleted file mode 100644 index c29e116e..00000000 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/Event.kt +++ /dev/null @@ -1,35 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.telemetry - -/** - * An event that occurs within the system. - */ -public abstract class Event(public val name: String) { - /** - * The time of occurrence of this event. - */ - public abstract val timestamp: Long -} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt deleted file mode 100644 index 899fc9b1..00000000 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.telemetry - -import org.opendc.compute.service.driver.Host - -/** - * A periodic report of the host machine metrics. - */ -public data class HostEvent( - override val timestamp: Long, - public val duration: Long, - public val host: Host, - public val vmCount: Int, - public val requestedBurst: Long, - public val grantedBurst: Long, - public val overcommissionedBurst: Long, - public val interferedBurst: Long, - public val cpuUsage: Double, - public val cpuDemand: Double, - public val powerDraw: Double, - public val cores: Int -) : Event("host-metrics") diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt deleted file mode 100644 index 539c9bc9..00000000 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/ProvisionerEvent.kt +++ /dev/null @@ -1,39 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.telemetry - -/** - * A periodic report of the provisioner's metrics. - */ -public data class ProvisionerEvent( - override val timestamp: Long, - public val totalHostCount: Int, - public val availableHostCount: Int, - public val totalVmCount: Int, - public val activeVmCount: Int, - public val inactiveVmCount: Int, - public val waitingVmCount: Int, - public val failedVmCount: Int -) : Event("provisioner-metrics") diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt deleted file mode 100644 index 6c8fc941..00000000 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/RunEvent.kt +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.telemetry - -import org.opendc.experiments.capelin.Portfolio - -/** - * A periodic report of the host machine metrics. - */ -public data class RunEvent( - val portfolio: Portfolio, - val repeat: Int, - override val timestamp: Long -) : Event("run") diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt deleted file mode 100644 index 7631f55f..00000000 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.telemetry - -import org.opendc.compute.api.Server - -/** - * A periodic report of a virtual machine's metrics. - */ -public data class VmEvent( - override val timestamp: Long, - public val duration: Long, - public val vm: Server, - public val host: Server, - public val requestedBurst: Long, - public val grantedBurst: Long, - public val overcommissionedBurst: Long, - public val interferedBurst: Long, - public val cpuUsage: Double, - public val cpuDemand: Double -) : Event("vm-metrics") diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt deleted file mode 100644 index 38930ee5..00000000 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.telemetry.parquet - -import mu.KotlinLogging -import org.apache.avro.Schema -import org.apache.avro.generic.GenericData -import org.apache.hadoop.fs.Path -import org.apache.parquet.avro.AvroParquetWriter -import org.apache.parquet.hadoop.metadata.CompressionCodecName -import org.opendc.experiments.capelin.telemetry.Event -import java.io.Closeable -import java.io.File -import java.util.concurrent.ArrayBlockingQueue -import java.util.concurrent.BlockingQueue -import kotlin.concurrent.thread - -/** - * The logging instance to use. - */ -private val logger = KotlinLogging.logger {} - -/** - * A writer that writes events in Parquet format. - */ -public open class ParquetEventWriter<in T : Event>( - private val path: File, - private val schema: Schema, - private val converter: (T, GenericData.Record) -> Unit, - private val bufferSize: Int = 4096 -) : Runnable, Closeable { - /** - * The writer to write the Parquet file. - */ - private val writer = AvroParquetWriter.builder<GenericData.Record>(Path(path.absolutePath)) - .withSchema(schema) - .withCompressionCodec(CompressionCodecName.SNAPPY) - .withPageSize(4 * 1024 * 1024) // For compression - .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) - .build() - - /** - * The queue of commands to process. - */ - private val queue: BlockingQueue<Action> = ArrayBlockingQueue(bufferSize) - - /** - * The thread that is responsible for writing the Parquet records. - */ - private val writerThread = thread(start = false, name = "parquet-writer") { run() } - - /** - * Write the specified metrics to the database. - */ - public fun write(event: T) { - queue.put(Action.Write(event)) - } - - /** - * Signal the writer to stop. - */ - public override fun close() { - queue.put(Action.Stop) - writerThread.join() - } - - init { - writerThread.start() - } - - /** - * Start the writer thread. - */ - override fun run() { - try { - loop@ while (true) { - val action = queue.take() - when (action) { - is Action.Stop -> break@loop - is Action.Write<*> -> { - val record = GenericData.Record(schema) - @Suppress("UNCHECKED_CAST") - converter(action.event as T, record) - writer.write(record) - } - } - } - } catch (e: Throwable) { - logger.error("Writer failed", e) - } finally { - writer.close() - } - } - - public sealed class Action { - /** - * A poison pill that will stop the writer thread. - */ - public object Stop : Action() - - /** - * Write the specified metrics to the database. - */ - public data class Write<out T : Event>(val event: T) : Action() - } -} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt deleted file mode 100644 index c8fe1cb2..00000000 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.telemetry.parquet - -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import org.opendc.experiments.capelin.telemetry.HostEvent -import java.io.File - -/** - * A Parquet event writer for [HostEvent]s. - */ -public class ParquetHostEventWriter(path: File, bufferSize: Int) : - ParquetEventWriter<HostEvent>(path, schema, convert, bufferSize) { - - override fun toString(): String = "host-writer" - - public companion object { - private val convert: (HostEvent, GenericData.Record) -> Unit = { event, record -> - // record.put("portfolio_id", event.run.parent.parent.id) - // record.put("scenario_id", event.run.parent.id) - // record.put("run_id", event.run.id) - record.put("host_id", event.host.name) - record.put("state", event.host.state.name) - record.put("timestamp", event.timestamp) - record.put("duration", event.duration) - record.put("vm_count", event.vmCount) - record.put("requested_burst", event.requestedBurst) - record.put("granted_burst", event.grantedBurst) - record.put("overcommissioned_burst", event.overcommissionedBurst) - record.put("interfered_burst", event.interferedBurst) - record.put("cpu_usage", event.cpuUsage) - record.put("cpu_demand", event.cpuDemand) - record.put("power_draw", event.powerDraw) - record.put("cores", event.cores) - } - - private val schema: Schema = SchemaBuilder - .record("host_metrics") - .namespace("org.opendc.experiments.sc20") - .fields() - // .name("portfolio_id").type().intType().noDefault() - // .name("scenario_id").type().intType().noDefault() - // .name("run_id").type().intType().noDefault() - .name("timestamp").type().longType().noDefault() - .name("duration").type().longType().noDefault() - .name("host_id").type().stringType().noDefault() - .name("state").type().stringType().noDefault() - .name("vm_count").type().intType().noDefault() - .name("requested_burst").type().longType().noDefault() - .name("granted_burst").type().longType().noDefault() - .name("overcommissioned_burst").type().longType().noDefault() - .name("interfered_burst").type().longType().noDefault() - .name("cpu_usage").type().doubleType().noDefault() - .name("cpu_demand").type().doubleType().noDefault() - .name("power_draw").type().doubleType().noDefault() - .name("cores").type().intType().noDefault() - .endRecord() - } -} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetProvisionerEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetProvisionerEventWriter.kt deleted file mode 100644 index 8feff8d9..00000000 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetProvisionerEventWriter.kt +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.telemetry.parquet - -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import org.opendc.experiments.capelin.telemetry.ProvisionerEvent -import java.io.File - -/** - * A Parquet event writer for [ProvisionerEvent]s. - */ -public class ParquetProvisionerEventWriter(path: File, bufferSize: Int) : - ParquetEventWriter<ProvisionerEvent>(path, schema, convert, bufferSize) { - - override fun toString(): String = "provisioner-writer" - - public companion object { - private val convert: (ProvisionerEvent, GenericData.Record) -> Unit = { event, record -> - record.put("timestamp", event.timestamp) - record.put("host_total_count", event.totalHostCount) - record.put("host_available_count", event.availableHostCount) - record.put("vm_total_count", event.totalVmCount) - record.put("vm_active_count", event.activeVmCount) - record.put("vm_inactive_count", event.inactiveVmCount) - record.put("vm_waiting_count", event.waitingVmCount) - record.put("vm_failed_count", event.failedVmCount) - } - - private val schema: Schema = SchemaBuilder - .record("provisioner_metrics") - .namespace("org.opendc.experiments.sc20") - .fields() - .name("timestamp").type().longType().noDefault() - .name("host_total_count").type().intType().noDefault() - .name("host_available_count").type().intType().noDefault() - .name("vm_total_count").type().intType().noDefault() - .name("vm_active_count").type().intType().noDefault() - .name("vm_inactive_count").type().intType().noDefault() - .name("vm_waiting_count").type().intType().noDefault() - .name("vm_failed_count").type().intType().noDefault() - .endRecord() - } -} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt deleted file mode 100644 index 946410eb..00000000 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetRunEventWriter.kt +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.telemetry.parquet - -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import org.opendc.experiments.capelin.telemetry.RunEvent -import java.io.File - -/** - * A Parquet event writer for [RunEvent]s. - */ -public class ParquetRunEventWriter(path: File, bufferSize: Int) : - ParquetEventWriter<RunEvent>(path, schema, convert, bufferSize) { - - override fun toString(): String = "run-writer" - - public companion object { - private val convert: (RunEvent, GenericData.Record) -> Unit = { event, record -> - val portfolio = event.portfolio - record.put("portfolio_name", portfolio.name) - record.put("scenario_id", portfolio.id) - record.put("run_id", event.repeat) - record.put("topology", portfolio.topology.name) - record.put("workload_name", portfolio.workload.name) - record.put("workload_fraction", portfolio.workload.fraction) - record.put("workload_sampler", portfolio.workload.samplingStrategy) - record.put("allocation_policy", portfolio.allocationPolicy) - record.put("failure_frequency", portfolio.operationalPhenomena.failureFrequency) - record.put("interference", portfolio.operationalPhenomena.hasInterference) - record.put("seed", event.repeat) - } - - private val schema: Schema = SchemaBuilder - .record("runs") - .namespace("org.opendc.experiments.sc20") - .fields() - .name("portfolio_name").type().stringType().noDefault() - .name("scenario_id").type().intType().noDefault() - .name("run_id").type().intType().noDefault() - .name("topology").type().stringType().noDefault() - .name("workload_name").type().stringType().noDefault() - .name("workload_fraction").type().doubleType().noDefault() - .name("workload_sampler").type().stringType().noDefault() - .name("allocation_policy").type().stringType().noDefault() - .name("failure_frequency").type().doubleType().noDefault() - .name("interference").type().booleanType().noDefault() - .name("seed").type().intType().noDefault() - .endRecord() - } -} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt deleted file mode 100644 index a8462a51..00000000 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace - -import org.opendc.experiments.capelin.model.CompositeWorkload -import org.opendc.experiments.capelin.model.Workload -import org.opendc.format.trace.TraceEntry -import org.opendc.format.trace.TraceReader -import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import org.opendc.simulator.compute.workload.SimWorkload -import java.util.TreeSet - -/** - * A [TraceReader] for the internal VM workload trace format. - * - * @param reader The internal trace reader to use. - * @param performanceInterferenceModel The performance model covering the workload in the VM trace. - * @param run The run to which this reader belongs. - */ -@OptIn(ExperimentalStdlibApi::class) -public class Sc20ParquetTraceReader( - rawReaders: List<Sc20RawParquetTraceReader>, - performanceInterferenceModel: Map<String, PerformanceInterferenceModel>, - workload: Workload, - seed: Int -) : TraceReader<SimWorkload> { - /** - * The iterator over the actual trace. - */ - private val iterator: Iterator<TraceEntry<SimWorkload>> = - rawReaders - .map { it.read() } - .run { - if (workload is CompositeWorkload) { - this.zip(workload.workloads) - } else { - this.zip(listOf(workload)) - } - } - .map { sampleWorkload(it.first, workload, it.second, seed) } - .flatten() - .run { - // Apply performance interference model - if (performanceInterferenceModel.isEmpty()) - this - else { - map { entry -> - val id = entry.name - val relevantPerformanceInterferenceModelItems = - performanceInterferenceModel[id] ?: PerformanceInterferenceModel(TreeSet()) - - entry.copy(meta = entry.meta + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems)) - } - } - } - .iterator() - - override fun hasNext(): Boolean = iterator.hasNext() - - override fun next(): TraceEntry<SimWorkload> = iterator.next() - - override fun close() {} -} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt deleted file mode 100644 index ffbf46d4..00000000 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace - -import mu.KotlinLogging -import org.apache.avro.generic.GenericData -import org.apache.hadoop.fs.Path -import org.apache.parquet.avro.AvroParquetReader -import org.opendc.format.trace.TraceEntry -import org.opendc.format.trace.TraceReader -import org.opendc.simulator.compute.workload.SimTraceWorkload -import org.opendc.simulator.compute.workload.SimWorkload -import java.io.File -import java.util.UUID - -private val logger = KotlinLogging.logger {} - -/** - * A [TraceReader] for the internal VM workload trace format. - * - * @param path The directory of the traces. - */ -@OptIn(ExperimentalStdlibApi::class) -public class Sc20RawParquetTraceReader(private val path: File) { - /** - * Read the fragments into memory. - */ - private fun parseFragments(path: File): Map<String, List<SimTraceWorkload.Fragment>> { - @Suppress("DEPRECATION") - val reader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "trace.parquet")) - .disableCompatibility() - .build() - - val fragments = mutableMapOf<String, MutableList<SimTraceWorkload.Fragment>>() - - return try { - while (true) { - val record = reader.read() ?: break - - val id = record["id"].toString() - val duration = record["duration"] as Long - val cores = record["cores"] as Int - val cpuUsage = record["cpuUsage"] as Double - - val fragment = SimTraceWorkload.Fragment( - duration, - cpuUsage, - cores - ) - - fragments.getOrPut(id) { mutableListOf() }.add(fragment) - } - - fragments - } finally { - reader.close() - } - } - - /** - * Read the metadata into a workload. - */ - private fun parseMeta(path: File, fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<TraceEntry<SimWorkload>> { - @Suppress("DEPRECATION") - val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "meta.parquet")) - .disableCompatibility() - .build() - - var counter = 0 - val entries = mutableListOf<TraceEntry<SimWorkload>>() - - return try { - while (true) { - val record = metaReader.read() ?: break - - val id = record["id"].toString() - if (!fragments.containsKey(id)) { - continue - } - - val submissionTime = record["submissionTime"] as Long - val endTime = record["endTime"] as Long - val maxCores = record["maxCores"] as Int - val requiredMemory = record["requiredMemory"] as Long - val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray()) - - val vmFragments = fragments.getValue(id).asSequence() - val totalLoad = vmFragments.sumByDouble { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs - val workload = SimTraceWorkload(vmFragments) - entries.add( - TraceEntry( - uid, id, submissionTime, workload, - mapOf( - "submit-time" to submissionTime, - "end-time" to endTime, - "total-load" to totalLoad, - "cores" to maxCores, - "required-memory" to requiredMemory, - "workload" to workload - ) - ) - ) - } - - entries - } catch (e: Exception) { - e.printStackTrace() - throw e - } finally { - metaReader.close() - } - } - - /** - * The entries in the trace. - */ - private val entries: List<TraceEntry<SimWorkload>> - - init { - val fragments = parseFragments(path) - entries = parseMeta(path, fragments) - } - - /** - * Read the entries in the trace. - */ - public fun read(): List<TraceEntry<SimWorkload>> = entries - - /** - * Create a [TraceReader] instance. - */ - public fun createReader(): TraceReader<SimWorkload> { - return object : TraceReader<SimWorkload>, Iterator<TraceEntry<SimWorkload>> by entries.iterator() { - override fun close() {} - } - } -} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt deleted file mode 100644 index c5294b55..00000000 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt +++ /dev/null @@ -1,284 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace - -import mu.KotlinLogging -import org.apache.avro.generic.GenericData -import org.apache.hadoop.fs.Path -import org.apache.parquet.avro.AvroParquetReader -import org.apache.parquet.filter2.compat.FilterCompat -import org.apache.parquet.filter2.predicate.FilterApi -import org.apache.parquet.filter2.predicate.Statistics -import org.apache.parquet.filter2.predicate.UserDefinedPredicate -import org.apache.parquet.io.api.Binary -import org.opendc.format.trace.TraceEntry -import org.opendc.format.trace.TraceReader -import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import org.opendc.simulator.compute.workload.SimTraceWorkload -import org.opendc.simulator.compute.workload.SimWorkload -import java.io.File -import java.io.Serializable -import java.util.SortedSet -import java.util.TreeSet -import java.util.UUID -import java.util.concurrent.ArrayBlockingQueue -import kotlin.concurrent.thread -import kotlin.random.Random - -private val logger = KotlinLogging.logger {} - -/** - * A [TraceReader] for the internal VM workload trace format that streams workloads on the fly. - * - * @param traceFile The directory of the traces. - * @param performanceInterferenceModel The performance model covering the workload in the VM trace. - */ -@OptIn(ExperimentalStdlibApi::class) -public class Sc20StreamingParquetTraceReader( - traceFile: File, - performanceInterferenceModel: PerformanceInterferenceModel? = null, - selectedVms: List<String> = emptyList(), - random: Random -) : TraceReader<SimWorkload> { - /** - * The internal iterator to use for this reader. - */ - private val iterator: Iterator<TraceEntry<SimWorkload>> - - /** - * The intermediate buffer to store the read records in. - */ - private val queue = ArrayBlockingQueue<Pair<String, SimTraceWorkload.Fragment>>(1024) - - /** - * An optional filter for filtering the selected VMs - */ - private val filter = - if (selectedVms.isEmpty()) - null - else - FilterCompat.get( - FilterApi.userDefined( - FilterApi.binaryColumn("id"), - SelectedVmFilter( - TreeSet(selectedVms) - ) - ) - ) - - /** - * A poisonous fragment. - */ - private val poison = Pair("\u0000", SimTraceWorkload.Fragment(0, 0.0, 0)) - - /** - * The thread to read the records in. - */ - private val readerThread = thread(start = true, name = "sc20-reader") { - @Suppress("DEPRECATION") - val reader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "trace.parquet")) - .disableCompatibility() - .run { if (filter != null) withFilter(filter) else this } - .build() - - try { - while (true) { - val record = reader.read() - - if (record == null) { - queue.put(poison) - break - } - - val id = record["id"].toString() - val duration = record["duration"] as Long - val cores = record["cores"] as Int - val cpuUsage = record["cpuUsage"] as Double - - val fragment = SimTraceWorkload.Fragment( - duration, - cpuUsage, - cores - ) - - queue.put(id to fragment) - } - } catch (e: InterruptedException) { - // Do not rethrow this - } finally { - reader.close() - } - } - - /** - * Fill the buffers with the VMs - */ - private fun pull(buffers: Map<String, List<MutableList<SimTraceWorkload.Fragment>>>) { - if (!hasNext) { - return - } - - val fragments = mutableListOf<Pair<String, SimTraceWorkload.Fragment>>() - queue.drainTo(fragments) - - for ((id, fragment) in fragments) { - if (id == poison.first) { - hasNext = false - return - } - buffers[id]?.forEach { it.add(fragment) } - } - } - - /** - * A flag to indicate whether the reader has more entries. - */ - private var hasNext: Boolean = true - - /** - * Initialize the reader. - */ - init { - val takenIds = mutableSetOf<UUID>() - val entries = mutableMapOf<String, GenericData.Record>() - val buffers = mutableMapOf<String, MutableList<MutableList<SimTraceWorkload.Fragment>>>() - - @Suppress("DEPRECATION") - val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "meta.parquet")) - .disableCompatibility() - .run { if (filter != null) withFilter(filter) else this } - .build() - - while (true) { - val record = metaReader.read() ?: break - val id = record["id"].toString() - entries[id] = record - } - - metaReader.close() - - val selection = if (selectedVms.isEmpty()) entries.keys else selectedVms - - // Create the entry iterator - iterator = selection.asSequence() - .mapNotNull { entries[it] } - .mapIndexed { index, record -> - val id = record["id"].toString() - val submissionTime = record["submissionTime"] as Long - val endTime = record["endTime"] as Long - val maxCores = record["maxCores"] as Int - val requiredMemory = record["requiredMemory"] as Long - val uid = UUID.nameUUIDFromBytes("$id-$index".toByteArray()) - - assert(uid !in takenIds) - takenIds += uid - - logger.info("Processing VM $id") - - val internalBuffer = mutableListOf<SimTraceWorkload.Fragment>() - val externalBuffer = mutableListOf<SimTraceWorkload.Fragment>() - buffers.getOrPut(id) { mutableListOf() }.add(externalBuffer) - val fragments = sequence { - var time = submissionTime - repeat@ while (true) { - if (externalBuffer.isEmpty()) { - if (hasNext) { - pull(buffers) - continue - } else { - break - } - } - - internalBuffer.addAll(externalBuffer) - externalBuffer.clear() - - for (fragment in internalBuffer) { - yield(fragment) - - time += fragment.duration - if (time >= endTime) { - break@repeat - } - } - - internalBuffer.clear() - } - - buffers.remove(id) - } - val relevantPerformanceInterferenceModelItems = - if (performanceInterferenceModel != null) - PerformanceInterferenceModel( - performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSortedSet(), - Random(random.nextInt()) - ) - else - null - val workload = SimTraceWorkload(fragments) - val meta = mapOf( - "cores" to maxCores, - "required-memory" to requiredMemory, - "workload" to workload - ) - - TraceEntry( - uid, id, submissionTime, workload, - if (performanceInterferenceModel != null) - meta + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems as Any) - else - meta - ) - } - .sortedBy { it.start } - .toList() - .iterator() - } - - override fun hasNext(): Boolean = iterator.hasNext() - - override fun next(): TraceEntry<SimWorkload> = iterator.next() - - override fun close() { - readerThread.interrupt() - } - - private class SelectedVmFilter(val selectedVms: SortedSet<String>) : UserDefinedPredicate<Binary>(), Serializable { - override fun keep(value: Binary?): Boolean = value != null && selectedVms.contains(value.toStringUsingUTF8()) - - override fun canDrop(statistics: Statistics<Binary>): Boolean { - val min = statistics.min - val max = statistics.max - - return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isEmpty() - } - - override fun inverseCanDrop(statistics: Statistics<Binary>): Boolean { - val min = statistics.min - val max = statistics.max - - return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isNotEmpty() - } - } -} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt deleted file mode 100644 index 7713c06f..00000000 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt +++ /dev/null @@ -1,621 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace - -import com.github.ajalt.clikt.core.CliktCommand -import com.github.ajalt.clikt.parameters.arguments.argument -import com.github.ajalt.clikt.parameters.groups.OptionGroup -import com.github.ajalt.clikt.parameters.groups.groupChoice -import com.github.ajalt.clikt.parameters.options.convert -import com.github.ajalt.clikt.parameters.options.default -import com.github.ajalt.clikt.parameters.options.defaultLazy -import com.github.ajalt.clikt.parameters.options.option -import com.github.ajalt.clikt.parameters.options.required -import com.github.ajalt.clikt.parameters.options.split -import com.github.ajalt.clikt.parameters.types.file -import com.github.ajalt.clikt.parameters.types.long -import me.tongfei.progressbar.ProgressBar -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import org.apache.hadoop.fs.Path -import org.apache.parquet.avro.AvroParquetWriter -import org.apache.parquet.hadoop.ParquetWriter -import org.apache.parquet.hadoop.metadata.CompressionCodecName -import org.opendc.format.trace.sc20.Sc20VmPlacementReader -import java.io.BufferedReader -import java.io.File -import java.io.FileReader -import java.util.Random -import kotlin.math.max -import kotlin.math.min - -/** - * Represents the command for converting traces - */ -public class TraceConverterCli : CliktCommand(name = "trace-converter") { - /** - * The directory where the trace should be stored. - */ - private val outputPath by option("-O", "--output", help = "path to store the trace") - .file(canBeFile = false, mustExist = false) - .defaultLazy { File("output") } - - /** - * The directory where the input trace is located. - */ - private val inputPath by argument("input", help = "path to the input trace") - .file(canBeFile = false) - - /** - * The input type of the trace. - */ - private val type by option("-t", "--type", help = "input type of trace").groupChoice( - "solvinity" to SolvinityConversion(), - "bitbrains" to BitbrainsConversion(), - "azure" to AzureConversion() - ) - - override fun run() { - val metaSchema = SchemaBuilder - .record("meta") - .namespace("org.opendc.format.sc20") - .fields() - .name("id").type().stringType().noDefault() - .name("submissionTime").type().longType().noDefault() - .name("endTime").type().longType().noDefault() - .name("maxCores").type().intType().noDefault() - .name("requiredMemory").type().longType().noDefault() - .endRecord() - val schema = SchemaBuilder - .record("trace") - .namespace("org.opendc.format.sc20") - .fields() - .name("id").type().stringType().noDefault() - .name("time").type().longType().noDefault() - .name("duration").type().longType().noDefault() - .name("cores").type().intType().noDefault() - .name("cpuUsage").type().doubleType().noDefault() - .name("flops").type().longType().noDefault() - .endRecord() - - val metaParquet = File(outputPath, "meta.parquet") - val traceParquet = File(outputPath, "trace.parquet") - - if (metaParquet.exists()) { - metaParquet.delete() - } - if (traceParquet.exists()) { - traceParquet.delete() - } - - val metaWriter = AvroParquetWriter.builder<GenericData.Record>(Path(metaParquet.toURI())) - .withSchema(metaSchema) - .withCompressionCodec(CompressionCodecName.SNAPPY) - .withPageSize(4 * 1024 * 1024) // For compression - .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) - .build() - - val writer = AvroParquetWriter.builder<GenericData.Record>(Path(traceParquet.toURI())) - .withSchema(schema) - .withCompressionCodec(CompressionCodecName.SNAPPY) - .withPageSize(4 * 1024 * 1024) // For compression - .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) - .build() - - try { - val type = type ?: throw IllegalArgumentException("Invalid trace conversion") - val allFragments = type.read(inputPath, metaSchema, metaWriter) - allFragments.sortWith(compareBy<Fragment> { it.tick }.thenBy { it.id }) - - for (fragment in allFragments) { - val record = GenericData.Record(schema) - record.put("id", fragment.id) - record.put("time", fragment.tick) - record.put("duration", fragment.duration) - record.put("cores", fragment.cores) - record.put("cpuUsage", fragment.usage) - record.put("flops", fragment.flops) - - writer.write(record) - } - } finally { - writer.close() - metaWriter.close() - } - } -} - -/** - * The supported trace conversions. - */ -public sealed class TraceConversion(name: String) : OptionGroup(name) { - /** - * Read the fragments of the trace. - */ - public abstract fun read( - traceDirectory: File, - metaSchema: Schema, - metaWriter: ParquetWriter<GenericData.Record> - ): MutableList<Fragment> -} - -public class SolvinityConversion : TraceConversion("Solvinity") { - private val clusters by option() - .split(",") - - private val vmPlacements by option("--vm-placements", help = "file containing the VM placements") - .file(canBeDir = false) - .convert { it.inputStream().buffered().use { Sc20VmPlacementReader(it).construct() } } - .required() - - override fun read( - traceDirectory: File, - metaSchema: Schema, - metaWriter: ParquetWriter<GenericData.Record> - ): MutableList<Fragment> { - val clusters = clusters?.toSet() ?: emptySet() - val timestampCol = 0 - val cpuUsageCol = 1 - val coreCol = 12 - val provisionedMemoryCol = 20 - val traceInterval = 5 * 60 * 1000L - - // Identify start time of the entire trace - var minTimestamp = Long.MAX_VALUE - traceDirectory.walk() - .filterNot { it.isDirectory } - .filter { it.extension == "csv" || it.extension == "txt" } - .toList() - .forEach file@{ vmFile -> - BufferedReader(FileReader(vmFile)).use { reader -> - reader.lineSequence() - .chunked(128) - .forEach { lines -> - for (line in lines) { - // Ignore comments in the trace - if (line.startsWith("#") || line.isBlank()) { - continue - } - - val vmId = vmFile.name - - // Check if VM in topology - val clusterName = vmPlacements[vmId] - if (clusterName == null || !clusters.contains(clusterName)) { - continue - } - - val values = line.split("\t") - val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - - if (timestamp < minTimestamp) { - minTimestamp = timestamp - } - return@file - } - } - } - } - - println("Start of trace at $minTimestamp") - - val allFragments = mutableListOf<Fragment>() - - val begin = 15 * 24 * 60 * 60 * 1000L - val end = 45 * 24 * 60 * 60 * 1000L - - traceDirectory.walk() - .filterNot { it.isDirectory } - .filter { it.extension == "csv" || it.extension == "txt" } - .toList() - .forEach { vmFile -> - println(vmFile) - - var vmId = "" - var maxCores = -1 - var requiredMemory = -1L - var cores: Int - var minTime = Long.MAX_VALUE - - val flopsFragments = sequence { - var last: Fragment? = null - - BufferedReader(FileReader(vmFile)).use { reader -> - reader.lineSequence() - .chunked(128) - .forEach { lines -> - for (line in lines) { - // Ignore comments in the trace - if (line.startsWith("#") || line.isBlank()) { - continue - } - - val values = line.split("\t") - - vmId = vmFile.name - - // Check if VM in topology - val clusterName = vmPlacements[vmId] - if (clusterName == null || !clusters.contains(clusterName)) { - continue - } - - val timestamp = - (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - minTimestamp - if (begin > timestamp || timestamp > end) { - continue - } - - cores = values[coreCol].trim().toInt() - requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) - maxCores = max(maxCores, cores) - minTime = min(minTime, timestamp) - val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz - requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) - maxCores = max(maxCores, cores) - - val flops: Long = (cpuUsage * 5 * 60).toLong() - - last = if (last != null && last!!.flops == 0L && flops == 0L) { - val oldFragment = last!! - Fragment( - vmId, - oldFragment.tick, - oldFragment.flops + flops, - oldFragment.duration + traceInterval, - cpuUsage, - cores - ) - } else { - val fragment = - Fragment( - vmId, - timestamp, - flops, - traceInterval, - cpuUsage, - cores - ) - if (last != null) { - yield(last!!) - } - fragment - } - } - } - } - - if (last != null) { - yield(last!!) - } - } - - var maxTime = Long.MIN_VALUE - flopsFragments.filter { it.tick in begin until end }.forEach { fragment -> - allFragments.add(fragment) - maxTime = max(maxTime, fragment.tick) - } - - if (minTime in begin until end) { - val metaRecord = GenericData.Record(metaSchema) - metaRecord.put("id", vmId) - metaRecord.put("submissionTime", minTime) - metaRecord.put("endTime", maxTime) - metaRecord.put("maxCores", maxCores) - metaRecord.put("requiredMemory", requiredMemory) - metaWriter.write(metaRecord) - } - } - - return allFragments - } -} - -/** - * Conversion of the Bitbrains public trace. - */ -public class BitbrainsConversion : TraceConversion("Bitbrains") { - override fun read( - traceDirectory: File, - metaSchema: Schema, - metaWriter: ParquetWriter<GenericData.Record> - ): MutableList<Fragment> { - val timestampCol = 0 - val cpuUsageCol = 3 - val coreCol = 1 - val provisionedMemoryCol = 5 - val traceInterval = 5 * 60 * 1000L - - val allFragments = mutableListOf<Fragment>() - - traceDirectory.walk() - .filterNot { it.isDirectory } - .filter { it.extension == "csv" || it.extension == "txt" } - .toList() - .forEach { vmFile -> - println(vmFile) - - var vmId = "" - var maxCores = -1 - var requiredMemory = -1L - var cores: Int - var minTime = Long.MAX_VALUE - - val flopsFragments = sequence { - var last: Fragment? = null - - BufferedReader(FileReader(vmFile)).use { reader -> - reader.lineSequence() - .drop(1) - .chunked(128) - .forEach { lines -> - for (line in lines) { - // Ignore comments in the trace - if (line.startsWith("#") || line.isBlank()) { - continue - } - - val values = line.split(";\t") - - vmId = vmFile.name - - val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - - cores = values[coreCol].trim().toInt() - val provisionedMemory = values[provisionedMemoryCol].trim().toDouble() // KB - requiredMemory = max(requiredMemory, (provisionedMemory / 1000).toLong()) - maxCores = max(maxCores, cores) - minTime = min(minTime, timestamp) - val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz - - val flops: Long = (cpuUsage * 5 * 60).toLong() - - last = if (last != null && last!!.flops == 0L && flops == 0L) { - val oldFragment = last!! - Fragment( - vmId, - oldFragment.tick, - oldFragment.flops + flops, - oldFragment.duration + traceInterval, - cpuUsage, - cores - ) - } else { - val fragment = - Fragment( - vmId, - timestamp, - flops, - traceInterval, - cpuUsage, - cores - ) - if (last != null) { - yield(last!!) - } - fragment - } - } - } - } - - if (last != null) { - yield(last!!) - } - } - - var maxTime = Long.MIN_VALUE - flopsFragments.forEach { fragment -> - allFragments.add(fragment) - maxTime = max(maxTime, fragment.tick) - } - - val metaRecord = GenericData.Record(metaSchema) - metaRecord.put("id", vmId) - metaRecord.put("submissionTime", minTime) - metaRecord.put("endTime", maxTime) - metaRecord.put("maxCores", maxCores) - metaRecord.put("requiredMemory", requiredMemory) - metaWriter.write(metaRecord) - } - - return allFragments - } -} - -/** - * Conversion of the Azure public VM trace. - */ -public class AzureConversion : TraceConversion("Azure") { - private val seed by option(help = "seed for trace sampling") - .long() - .default(0) - - override fun read( - traceDirectory: File, - metaSchema: Schema, - metaWriter: ParquetWriter<GenericData.Record> - ): MutableList<Fragment> { - val random = Random(seed) - val fraction = 0.01 - - // Read VM table - val vmIdTableCol = 0 - val coreTableCol = 9 - val provisionedMemoryTableCol = 10 - - var vmId: String - var cores: Int - var requiredMemory: Long - - val vmIds = mutableSetOf<String>() - val vmIdToMetadata = mutableMapOf<String, VmInfo>() - - BufferedReader(FileReader(File(traceDirectory, "vmtable.csv"))).use { reader -> - reader.lineSequence() - .chunked(1024) - .forEach { lines -> - for (line in lines) { - // Ignore comments in the trace - if (line.startsWith("#") || line.isBlank()) { - continue - } - // Sample only a fraction of the VMs - if (random.nextDouble() > fraction) { - continue - } - - val values = line.split(",") - - // Exclude VMs with a large number of cores (not specified exactly) - if (values[coreTableCol].contains(">")) { - continue - } - - vmId = values[vmIdTableCol].trim() - cores = values[coreTableCol].trim().toInt() - requiredMemory = values[provisionedMemoryTableCol].trim().toInt() * 1_000L // GB -> MB - - vmIds.add(vmId) - vmIdToMetadata[vmId] = VmInfo(cores, requiredMemory, Long.MAX_VALUE, -1L) - } - } - } - - // Read VM metric reading files - val timestampCol = 0 - val vmIdCol = 1 - val cpuUsageCol = 4 - val traceInterval = 5 * 60 * 1000L - - val vmIdToFragments = mutableMapOf<String, MutableList<Fragment>>() - val vmIdToLastFragment = mutableMapOf<String, Fragment?>() - val allFragments = mutableListOf<Fragment>() - - for (i in ProgressBar.wrap((1..195).toList(), "Reading Trace")) { - val readingsFile = File(File(traceDirectory, "readings"), "readings-$i.csv") - var timestamp: Long - var cpuUsage: Double - - BufferedReader(FileReader(readingsFile)).use { reader -> - reader.lineSequence() - .chunked(128) - .forEach { lines -> - for (line in lines) { - // Ignore comments in the trace - if (line.startsWith("#") || line.isBlank()) { - continue - } - - val values = line.split(",") - vmId = values[vmIdCol].trim() - - // Ignore readings for VMs not in the sample - if (!vmIds.contains(vmId)) { - continue - } - - timestamp = values[timestampCol].trim().toLong() * 1000L - vmIdToMetadata[vmId]!!.minTime = min(vmIdToMetadata[vmId]!!.minTime, timestamp) - cpuUsage = values[cpuUsageCol].trim().toDouble() * 3_000 // MHz - vmIdToMetadata[vmId]!!.maxTime = max(vmIdToMetadata[vmId]!!.maxTime, timestamp) - - val flops: Long = (cpuUsage * 5 * 60).toLong() - val lastFragment = vmIdToLastFragment[vmId] - - vmIdToLastFragment[vmId] = - if (lastFragment != null && lastFragment.flops == 0L && flops == 0L) { - Fragment( - vmId, - lastFragment.tick, - lastFragment.flops + flops, - lastFragment.duration + traceInterval, - cpuUsage, - vmIdToMetadata[vmId]!!.cores - ) - } else { - val fragment = - Fragment( - vmId, - timestamp, - flops, - traceInterval, - cpuUsage, - vmIdToMetadata[vmId]!!.cores - ) - if (lastFragment != null) { - if (vmIdToFragments[vmId] == null) { - vmIdToFragments[vmId] = mutableListOf() - } - vmIdToFragments[vmId]!!.add(lastFragment) - allFragments.add(lastFragment) - } - fragment - } - } - } - } - } - - for (entry in vmIdToLastFragment) { - if (entry.value != null) { - if (vmIdToFragments[entry.key] == null) { - vmIdToFragments[entry.key] = mutableListOf() - } - vmIdToFragments[entry.key]!!.add(entry.value!!) - } - } - - println("Read ${vmIdToLastFragment.size} VMs") - - for (entry in vmIdToMetadata) { - val metaRecord = GenericData.Record(metaSchema) - metaRecord.put("id", entry.key) - metaRecord.put("submissionTime", entry.value.minTime) - metaRecord.put("endTime", entry.value.maxTime) - println("${entry.value.minTime} - ${entry.value.maxTime}") - metaRecord.put("maxCores", entry.value.cores) - metaRecord.put("requiredMemory", entry.value.requiredMemory) - metaWriter.write(metaRecord) - } - - return allFragments - } -} - -public data class Fragment( - public val id: String, - public val tick: Long, - public val flops: Long, - public val duration: Long, - public val usage: Double, - public val cores: Int -) - -public class VmInfo(public val cores: Int, public val requiredMemory: Long, public var minTime: Long, public var maxTime: Long) - -/** - * A script to convert a trace in text format into a Parquet trace. - */ -public fun main(args: Array<String>): Unit = TraceConverterCli().main(args) diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt deleted file mode 100644 index 5c8727ea..00000000 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt +++ /dev/null @@ -1,199 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.experiments.capelin.trace - -import mu.KotlinLogging -import org.opendc.experiments.capelin.model.CompositeWorkload -import org.opendc.experiments.capelin.model.SamplingStrategy -import org.opendc.experiments.capelin.model.Workload -import org.opendc.format.trace.TraceEntry -import org.opendc.simulator.compute.workload.SimWorkload -import java.util.* -import kotlin.random.Random - -private val logger = KotlinLogging.logger {} - -/** - * Sample the workload for the specified [run]. - */ -public fun sampleWorkload( - trace: List<TraceEntry<SimWorkload>>, - workload: Workload, - subWorkload: Workload, - seed: Int -): List<TraceEntry<SimWorkload>> { - return when { - workload is CompositeWorkload -> sampleRegularWorkload(trace, workload, subWorkload, seed) - workload.samplingStrategy == SamplingStrategy.HPC -> - sampleHpcWorkload(trace, workload, seed, sampleOnLoad = false) - workload.samplingStrategy == SamplingStrategy.HPC_LOAD -> - sampleHpcWorkload(trace, workload, seed, sampleOnLoad = true) - else -> - sampleRegularWorkload(trace, workload, workload, seed) - } -} - -/** - * Sample a regular (non-HPC) workload. - */ -public fun sampleRegularWorkload( - trace: List<TraceEntry<SimWorkload>>, - workload: Workload, - subWorkload: Workload, - seed: Int -): List<TraceEntry<SimWorkload>> { - val fraction = subWorkload.fraction - - val shuffled = trace.shuffled(Random(seed)) - val res = mutableListOf<TraceEntry<SimWorkload>>() - val totalLoad = if (workload is CompositeWorkload) { - workload.totalLoad - } else { - shuffled.sumByDouble { it.meta.getValue("total-load") as Double } - } - var currentLoad = 0.0 - - for (entry in shuffled) { - val entryLoad = entry.meta.getValue("total-load") as Double - if ((currentLoad + entryLoad) / totalLoad > fraction) { - break - } - - currentLoad += entryLoad - res += entry - } - - logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } - - return res -} - -/** - * Sample a HPC workload. - */ -public fun sampleHpcWorkload( - trace: List<TraceEntry<SimWorkload>>, - workload: Workload, - seed: Int, - sampleOnLoad: Boolean -): List<TraceEntry<SimWorkload>> { - val pattern = Regex("^vm__workload__(ComputeNode|cn).*") - val random = Random(seed) - - val fraction = workload.fraction - val (hpc, nonHpc) = trace.partition { entry -> - val name = entry.name - name.matches(pattern) - } - - val hpcSequence = generateSequence(0) { it + 1 } - .map { index -> - val res = mutableListOf<TraceEntry<SimWorkload>>() - hpc.mapTo(res) { sample(it, index) } - res.shuffle(random) - res - } - .flatten() - - val nonHpcSequence = generateSequence(0) { it + 1 } - .map { index -> - val res = mutableListOf<TraceEntry<SimWorkload>>() - nonHpc.mapTo(res) { sample(it, index) } - res.shuffle(random) - res - } - .flatten() - - logger.debug { "Found ${hpc.size} HPC workloads and ${nonHpc.size} non-HPC workloads" } - - val totalLoad = if (workload is CompositeWorkload) { - workload.totalLoad - } else { - trace.sumByDouble { it.meta.getValue("total-load") as Double } - } - - logger.debug { "Total trace load: $totalLoad" } - var hpcCount = 0 - var hpcLoad = 0.0 - var nonHpcCount = 0 - var nonHpcLoad = 0.0 - - val res = mutableListOf<TraceEntry<SimWorkload>>() - - if (sampleOnLoad) { - var currentLoad = 0.0 - for (entry in hpcSequence) { - val entryLoad = entry.meta.getValue("total-load") as Double - if ((currentLoad + entryLoad) / totalLoad > fraction) { - break - } - - hpcLoad += entryLoad - hpcCount += 1 - currentLoad += entryLoad - res += entry - } - - for (entry in nonHpcSequence) { - val entryLoad = entry.meta.getValue("total-load") as Double - if ((currentLoad + entryLoad) / totalLoad > 1) { - break - } - - nonHpcLoad += entryLoad - nonHpcCount += 1 - currentLoad += entryLoad - res += entry - } - } else { - hpcSequence - .take((fraction * trace.size).toInt()) - .forEach { entry -> - hpcLoad += entry.meta.getValue("total-load") as Double - hpcCount += 1 - res.add(entry) - } - - nonHpcSequence - .take(((1 - fraction) * trace.size).toInt()) - .forEach { entry -> - nonHpcLoad += entry.meta.getValue("total-load") as Double - nonHpcCount += 1 - res.add(entry) - } - } - - logger.debug { "HPC $hpcCount (load $hpcLoad) and non-HPC $nonHpcCount (load $nonHpcLoad)" } - logger.debug { "Total sampled load: ${hpcLoad + nonHpcLoad}" } - logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } - - return res -} - -/** - * Sample a random trace entry. - */ -private fun sample(entry: TraceEntry<SimWorkload>, i: Int): TraceEntry<SimWorkload> { - val uid = UUID.nameUUIDFromBytes("${entry.uid}-$i".toByteArray()) - return entry.copy(uid = uid) -} diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml deleted file mode 100644 index d1c01b8e..00000000 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/resources/log4j2.xml +++ /dev/null @@ -1,49 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ MIT License - ~ - ~ Copyright (c) 2020 atlarge-research - ~ - ~ Permission is hereby granted, free of charge, to any person obtaining a copy - ~ of this software and associated documentation files (the "Software"), to deal - ~ in the Software without restriction, including without limitation the rights - ~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - ~ copies of the Software, and to permit persons to whom the Software is - ~ furnished to do so, subject to the following conditions: - ~ - ~ The above copyright notice and this permission notice shall be included in all - ~ copies or substantial portions of the Software. - ~ - ~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - ~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - ~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - ~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - ~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - ~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - ~ SOFTWARE. - --> - -<Configuration status="WARN"> - <Appenders> - <Console name="Console" target="SYSTEM_OUT"> - <PatternLayout pattern="%d{HH:mm:ss.SSS} [%highlight{%-5level}] %logger{36} - %msg%n" disableAnsi="false"/> - </Console> - </Appenders> - <Loggers> - <Logger name="org.opendc" level="debug" additivity="false"> - <AppenderRef ref="Console"/> - </Logger> - <Logger name="org.opendc.experiments.capelin" level="info" additivity="false"> - <AppenderRef ref="Console"/> - </Logger> - <Logger name="org.opendc.experiments.capelin.trace" level="debug" additivity="false"> - <AppenderRef ref="Console"/> - </Logger> - <Logger name="org.apache.hadoop" level="warn" additivity="false"> - <AppenderRef ref="Console"/> - </Logger> - <Root level="error"> - <AppenderRef ref="Console"/> - </Root> - </Loggers> -</Configuration> |
