diff options
| author | Georgios Andreadis <info@gandreadis.com> | 2020-07-20 12:44:04 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-08-24 19:48:12 +0200 |
| commit | 53e60ccf0636e0076837d66a7dbea527e3b6e98d (patch) | |
| tree | 041f83ae919d7ee9b5691a1666dbb61af26967d0 /simulator/opendc | |
| parent | d8479e7e3744b8d1d31ac4d9f972e560eacd2cf8 (diff) | |
| parent | 2a5f50e591f5e9c1da5db2f2011c779a88121675 (diff) | |
Merge pull request #9 from atlarge-research/feat/opendc-node
Add simulator integration
Diffstat (limited to 'simulator/opendc')
44 files changed, 963 insertions, 106 deletions
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt index 01968cd8..fd0fc836 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt @@ -28,8 +28,8 @@ import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.core.resource.Resource import com.atlarge.opendc.core.resource.TagContainer import com.atlarge.opendc.core.services.ServiceRegistry -import kotlinx.coroutines.flow.Flow import java.util.UUID +import kotlinx.coroutines.flow.Flow /** * A server instance that is running on some physical or virtual machine. diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt index 7cb4c0c5..cb637aea 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt @@ -27,8 +27,8 @@ package com.atlarge.opendc.compute.metal import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.core.Identity -import kotlinx.coroutines.flow.Flow import java.util.UUID +import kotlinx.coroutines.flow.Flow /** * A bare-metal compute node. diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt index 41cec291..17d8ee53 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt @@ -30,8 +30,8 @@ import com.atlarge.opendc.compute.metal.Node import com.atlarge.opendc.core.failure.FailureDomain import com.atlarge.opendc.core.power.Powerable import com.atlarge.opendc.core.services.AbstractServiceKey -import kotlinx.coroutines.flow.Flow import java.util.UUID +import kotlinx.coroutines.flow.Flow /** * A driver interface for the management interface of a bare-metal compute node. diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt index 6a77415c..a453e459 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt @@ -28,10 +28,10 @@ import com.atlarge.odcsim.Domain import com.atlarge.odcsim.SimulationContext import com.atlarge.odcsim.flow.EventFlow import com.atlarge.odcsim.flow.StateFlow -import com.atlarge.opendc.compute.core.ProcessingUnit -import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.MemoryUnit +import com.atlarge.opendc.compute.core.ProcessingUnit +import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerEvent import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.execution.ServerContext @@ -46,6 +46,14 @@ import com.atlarge.opendc.compute.metal.power.ConstantPowerModel import com.atlarge.opendc.core.power.PowerModel import com.atlarge.opendc.core.services.ServiceKey import com.atlarge.opendc.core.services.ServiceRegistry +import java.lang.Exception +import java.time.Clock +import java.util.UUID +import kotlin.coroutines.ContinuationInterceptor +import kotlin.math.ceil +import kotlin.math.max +import kotlin.math.min +import kotlin.random.Random import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Delay import kotlinx.coroutines.DisposableHandle @@ -59,15 +67,7 @@ import kotlinx.coroutines.intrinsics.startCoroutineCancellable import kotlinx.coroutines.launch import kotlinx.coroutines.selects.SelectClause0 import kotlinx.coroutines.selects.SelectInstance -import java.util.UUID -import kotlin.math.ceil -import kotlin.math.max -import kotlin.math.min import kotlinx.coroutines.withContext -import java.lang.Exception -import java.time.Clock -import kotlin.coroutines.ContinuationInterceptor -import kotlin.random.Random /** * A basic implementation of the [BareMetalDriver] that simulates an [Image] running on a bare-metal machine. diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt index 69b0124d..1e7e351f 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt @@ -25,8 +25,8 @@ package com.atlarge.opendc.compute.virt import com.atlarge.opendc.core.Identity -import kotlinx.coroutines.flow.Flow import java.util.UUID +import kotlinx.coroutines.flow.Flow /** * A hypervisor (or virtual machine monitor) is software or firmware that virtualizes the host compute environment diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt index bd395f0d..607759a8 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt @@ -29,9 +29,9 @@ import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver import com.atlarge.opendc.compute.virt.driver.VirtDriver import com.atlarge.opendc.core.resource.TagContainer +import java.util.UUID import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.suspendCancellableCoroutine -import java.util.UUID /** * A hypervisor managing the VMs of a node. diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt index 3c41f52e..192db413 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt @@ -35,11 +35,15 @@ import com.atlarge.opendc.compute.core.execution.ServerContext import com.atlarge.opendc.compute.core.execution.ServerManagementContext import com.atlarge.opendc.compute.core.execution.ShutdownException import com.atlarge.opendc.compute.core.image.Image +import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel import com.atlarge.opendc.compute.virt.HypervisorEvent import com.atlarge.opendc.core.services.ServiceKey import com.atlarge.opendc.core.services.ServiceRegistry -import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL -import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel +import java.util.UUID +import kotlin.math.ceil +import kotlin.math.max +import kotlin.math.min import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.DisposableHandle @@ -55,10 +59,6 @@ import kotlinx.coroutines.launch import kotlinx.coroutines.selects.SelectClause0 import kotlinx.coroutines.selects.SelectInstance import kotlinx.coroutines.selects.select -import java.util.UUID -import kotlin.math.ceil -import kotlin.math.max -import kotlin.math.min /** * A [VirtDriver] that is backed by a simple hypervisor implementation. diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt index 1002d382..b1844f67 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt @@ -29,8 +29,8 @@ import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.virt.HypervisorEvent import com.atlarge.opendc.core.services.AbstractServiceKey -import kotlinx.coroutines.flow.Flow import java.util.UUID +import kotlinx.coroutines.flow.Flow /** * A driver interface for a hypervisor running on some host server and communicating with the central compute service to diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index ff4aa3d7..79388bc3 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -11,11 +11,14 @@ import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.core.image.VmImage import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.compute.virt.HypervisorEvent -import com.atlarge.opendc.compute.virt.driver.VirtDriver import com.atlarge.opendc.compute.virt.HypervisorImage import com.atlarge.opendc.compute.virt.driver.InsufficientMemoryOnServerException +import com.atlarge.opendc.compute.virt.driver.VirtDriver import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy import com.atlarge.opendc.core.services.ServiceKey +import kotlin.coroutines.Continuation +import kotlin.coroutines.resume +import kotlin.math.max import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.Job @@ -27,9 +30,6 @@ import kotlinx.coroutines.launch import kotlinx.coroutines.suspendCancellableCoroutine import kotlinx.coroutines.withContext import mu.KotlinLogging -import kotlin.coroutines.Continuation -import kotlin.coroutines.resume -import kotlin.math.max private val logger = KotlinLogging.logger {} diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt index 417db77d..1c7b751c 100644 --- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt +++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt @@ -24,10 +24,10 @@ package com.atlarge.opendc.compute.core.image +import java.util.UUID import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows -import java.util.UUID /** * Test suite for [FlopsApplicationImage] diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt index 071c0626..af9d3421 100644 --- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt +++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt @@ -31,6 +31,8 @@ import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.ServerEvent import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.image.FlopsApplicationImage +import java.util.ServiceLoader +import java.util.UUID import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach @@ -39,8 +41,6 @@ import kotlinx.coroutines.runBlocking import kotlinx.coroutines.withContext import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test -import java.util.ServiceLoader -import java.util.UUID internal class SimpleBareMetalDriverTest { /** diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt index f8bd786e..ed2256c0 100644 --- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt +++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt @@ -29,13 +29,13 @@ import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.image.FlopsApplicationImage import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver +import java.util.ServiceLoader +import java.util.UUID import kotlinx.coroutines.delay import kotlinx.coroutines.flow.collect import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Test -import java.util.ServiceLoader -import java.util.UUID /** * Test suite for the [SimpleProvisioningService]. diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt index ca00fc94..622b185e 100644 --- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt +++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt @@ -25,14 +25,16 @@ package com.atlarge.opendc.compute.virt import com.atlarge.odcsim.SimulationEngineProvider -import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.ProcessingNode +import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.image.FlopsApplicationImage import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment import com.atlarge.opendc.compute.core.image.VmImage import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver import com.atlarge.opendc.compute.virt.driver.VirtDriver +import java.util.ServiceLoader +import java.util.UUID import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay import kotlinx.coroutines.flow.launchIn @@ -43,8 +45,6 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Disabled import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import java.util.ServiceLoader -import java.util.UUID /** * Basic test-suite for the hypervisor. diff --git a/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt b/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt index 50261db5..f77a581e 100644 --- a/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt +++ b/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt @@ -26,14 +26,14 @@ package com.atlarge.opendc.core.failure import com.atlarge.odcsim.Domain import com.atlarge.odcsim.simulationContext -import kotlinx.coroutines.Job -import kotlinx.coroutines.delay -import kotlinx.coroutines.ensureActive -import kotlinx.coroutines.launch import kotlin.math.exp import kotlin.math.max import kotlin.random.Random import kotlin.random.asJavaRandom +import kotlinx.coroutines.Job +import kotlinx.coroutines.delay +import kotlinx.coroutines.ensureActive +import kotlinx.coroutines.launch /** * A [FaultInjector] that injects fault in the system which are correlated to each other. Failures do not occur in diff --git a/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt b/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt index 1b896858..0f62667f 100644 --- a/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt +++ b/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt @@ -25,11 +25,11 @@ package com.atlarge.opendc.core.failure import com.atlarge.odcsim.simulationContext -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch import kotlin.math.ln1p import kotlin.math.pow import kotlin.random.Random +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch /** * A [FaultInjector] that injects uncorrelated faults into the system, meaning that failures of the subsystems are diff --git a/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt b/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt index b0182ab3..7659b18e 100644 --- a/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt +++ b/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt @@ -38,6 +38,8 @@ import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelec import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy +import java.io.File +import java.util.ServiceLoader import kotlin.math.max import kotlinx.coroutines.async import kotlinx.coroutines.channels.Channel @@ -46,8 +48,6 @@ import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking -import java.io.File -import java.util.ServiceLoader /** * Main entry point of the experiment. diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt index 677af381..faa68e34 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt @@ -48,9 +48,9 @@ import com.github.ajalt.clikt.parameters.options.required import com.github.ajalt.clikt.parameters.types.choice import com.github.ajalt.clikt.parameters.types.file import com.github.ajalt.clikt.parameters.types.int -import mu.KotlinLogging import java.io.File import java.io.InputStream +import mu.KotlinLogging /** * The logger for this experiment. diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt index a70297d2..b09c0dbb 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt @@ -45,19 +45,20 @@ import com.atlarge.opendc.experiments.sc20.experiment.monitor.ExperimentMonitor import com.atlarge.opendc.experiments.sc20.trace.Sc20StreamingParquetTraceReader import com.atlarge.opendc.format.environment.EnvironmentReader import com.atlarge.opendc.format.trace.TraceReader +import java.io.File +import kotlin.math.ln +import kotlin.math.max +import kotlin.random.Random import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.takeWhile import kotlinx.coroutines.launch import kotlinx.coroutines.withContext import mu.KotlinLogging -import java.io.File -import kotlin.math.ln -import kotlin.math.max -import kotlin.random.Random /** * The logger for this experiment. @@ -209,7 +210,6 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP try { var submitted = 0 - val finished = Channel<Unit>(Channel.CONFLATED) while (reader.hasNext()) { val (time, workload) = reader.next() @@ -228,17 +228,20 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP if (it is ServerEvent.StateChanged) { monitor.reportVmStateChange(simulationContext.clock.millis(), it.server) } - - delay(1) - finished.send(Unit) } .collect() } } - while (scheduler.finishedVms + scheduler.unscheduledVms != submitted) { - finished.receive() - } + scheduler.events + .takeWhile { + when (it) { + is VirtProvisioningEvent.MetricsAvailable -> + it.inactiveVmCount + it.failedVmCount != submitted + } + } + .collect() + delay(1) } finally { reader.close() } diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt index 5d1c29e2..1580e4dd 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt @@ -38,13 +38,13 @@ import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionC import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader +import java.io.File +import java.util.ServiceLoader +import kotlin.random.Random import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.launch import mu.KotlinLogging -import java.io.File -import java.util.ServiceLoader -import kotlin.random.Random /** * The logger for the experiment scenario. @@ -106,7 +106,11 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int) ?.construct(seeder) ?: emptyMap() val trace = Sc20ParquetTraceReader(rawReaders, performanceInterferenceModel, parent.workload, seed) - val monitor = ParquetExperimentMonitor(this) + val monitor = ParquetExperimentMonitor( + parent.parent.parent.output, + "portfolio_id=${parent.parent.id}/scenario_id=${parent.id}/run_id=$id", + parent.parent.parent.bufferSize + ) root.launch { val (bareMetalProvisioner, scheduler) = createProvisioner( diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt index be60e5b7..b931fef9 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt @@ -27,13 +27,12 @@ package com.atlarge.opendc.experiments.sc20.experiment.monitor import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.virt.driver.VirtDriver import com.atlarge.opendc.compute.virt.service.VirtProvisioningEvent -import com.atlarge.opendc.experiments.sc20.experiment.Run import com.atlarge.opendc.experiments.sc20.telemetry.HostEvent import com.atlarge.opendc.experiments.sc20.telemetry.ProvisionerEvent import com.atlarge.opendc.experiments.sc20.telemetry.parquet.ParquetHostEventWriter import com.atlarge.opendc.experiments.sc20.telemetry.parquet.ParquetProvisionerEventWriter -import mu.KotlinLogging import java.io.File +import mu.KotlinLogging /** * The logger instance to use. @@ -43,15 +42,14 @@ private val logger = KotlinLogging.logger {} /** * An [ExperimentMonitor] that logs the events to a Parquet file. */ -class ParquetExperimentMonitor(val run: Run) : ExperimentMonitor { - private val partition = "portfolio_id=${run.parent.parent.id}/scenario_id=${run.parent.id}/run_id=${run.id}" +class ParquetExperimentMonitor(base: File, partition: String, bufferSize: Int) : ExperimentMonitor { private val hostWriter = ParquetHostEventWriter( - File(run.parent.parent.parent.output, "host-metrics/$partition/data.parquet"), - run.parent.parent.parent.bufferSize + File(base, "host-metrics/$partition/data.parquet"), + bufferSize ) private val provisionerWriter = ParquetProvisionerEventWriter( - File(run.parent.parent.parent.output, "provisioner-metrics/$partition/data.parquet"), - run.parent.parent.parent.bufferSize + File(base, "provisioner-metrics/$partition/data.parquet"), + bufferSize ) private val currentHostEvent = mutableMapOf<Server, HostEvent>() private var startTime = -1L diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt index 31632b8c..a7c8ba4d 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt @@ -25,12 +25,12 @@ package com.atlarge.opendc.experiments.sc20.runner.execution import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor +import java.util.concurrent.Executors import kotlinx.coroutines.asCoroutineDispatcher import kotlinx.coroutines.launch import kotlinx.coroutines.supervisorScope import kotlinx.coroutines.sync.Semaphore import kotlinx.coroutines.withContext -import java.util.concurrent.Executors /** * An [ExperimentScheduler] that runs experiments using a local thread pool. diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt index 3b80276f..28a19172 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt @@ -30,8 +30,8 @@ import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionC import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionListener import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionResult import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentScheduler -import kotlinx.coroutines.runBlocking import java.util.concurrent.ConcurrentHashMap +import kotlinx.coroutines.runBlocking /** * The default implementation of the [ExperimentRunner] interface. diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt index a69bd4b2..e42ac654 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt @@ -25,17 +25,17 @@ package com.atlarge.opendc.experiments.sc20.telemetry.parquet import com.atlarge.opendc.experiments.sc20.telemetry.Event +import java.io.Closeable +import java.io.File +import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.BlockingQueue +import kotlin.concurrent.thread import mu.KotlinLogging import org.apache.avro.Schema import org.apache.avro.generic.GenericData import org.apache.hadoop.fs.Path import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName -import java.io.Closeable -import java.io.File -import java.util.concurrent.ArrayBlockingQueue -import java.util.concurrent.BlockingQueue -import kotlin.concurrent.thread /** * The logging instance to use. diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt index 3bc09435..9fa4e0fb 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt @@ -25,10 +25,10 @@ package com.atlarge.opendc.experiments.sc20.telemetry.parquet import com.atlarge.opendc.experiments.sc20.telemetry.HostEvent +import java.io.File import org.apache.avro.Schema import org.apache.avro.SchemaBuilder import org.apache.avro.generic.GenericData -import java.io.File /** * A Parquet event writer for [HostEvent]s. diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt index 1f3b0472..3d28860c 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt @@ -25,10 +25,10 @@ package com.atlarge.opendc.experiments.sc20.telemetry.parquet import com.atlarge.opendc.experiments.sc20.telemetry.ProvisionerEvent +import java.io.File import org.apache.avro.Schema import org.apache.avro.SchemaBuilder import org.apache.avro.generic.GenericData -import java.io.File /** * A Parquet event writer for [ProvisionerEvent]s. diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt index 1549b8d2..c1724369 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt @@ -25,10 +25,10 @@ package com.atlarge.opendc.experiments.sc20.telemetry.parquet import com.atlarge.opendc.experiments.sc20.telemetry.RunEvent +import java.io.File import org.apache.avro.Schema import org.apache.avro.SchemaBuilder import org.apache.avro.generic.GenericData -import java.io.File /** * A Parquet event writer for [RunEvent]s. diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt index 652f7746..f9709b9f 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt @@ -30,12 +30,12 @@ import com.atlarge.opendc.compute.core.workload.VmWorkload import com.atlarge.opendc.core.User import com.atlarge.opendc.format.trace.TraceEntry import com.atlarge.opendc.format.trace.TraceReader +import java.io.File +import java.util.UUID import mu.KotlinLogging import org.apache.avro.generic.GenericData import org.apache.hadoop.fs.Path import org.apache.parquet.avro.AvroParquetReader -import java.io.File -import java.util.UUID private val logger = KotlinLogging.logger {} diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt index f6d6e6fd..8b7b222f 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt @@ -32,6 +32,14 @@ import com.atlarge.opendc.compute.core.workload.VmWorkload import com.atlarge.opendc.core.User import com.atlarge.opendc.format.trace.TraceEntry import com.atlarge.opendc.format.trace.TraceReader +import java.io.File +import java.io.Serializable +import java.util.SortedSet +import java.util.TreeSet +import java.util.UUID +import java.util.concurrent.ArrayBlockingQueue +import kotlin.concurrent.thread +import kotlin.random.Random import mu.KotlinLogging import org.apache.avro.generic.GenericData import org.apache.hadoop.fs.Path @@ -41,14 +49,6 @@ import org.apache.parquet.filter2.predicate.FilterApi import org.apache.parquet.filter2.predicate.Statistics import org.apache.parquet.filter2.predicate.UserDefinedPredicate import org.apache.parquet.io.api.Binary -import java.io.File -import java.io.Serializable -import java.util.SortedSet -import java.util.TreeSet -import java.util.UUID -import java.util.concurrent.ArrayBlockingQueue -import kotlin.concurrent.thread -import kotlin.random.Random private val logger = KotlinLogging.logger {} diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt index 0877ad52..d6726910 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt @@ -25,6 +25,12 @@ package com.atlarge.opendc.experiments.sc20.trace import com.atlarge.opendc.format.trace.sc20.Sc20VmPlacementReader +import java.io.BufferedReader +import java.io.File +import java.io.FileReader +import java.util.Random +import kotlin.math.max +import kotlin.math.min import me.tongfei.progressbar.ProgressBar import org.apache.avro.Schema import org.apache.avro.SchemaBuilder @@ -33,12 +39,6 @@ import org.apache.hadoop.fs.Path import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.ParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName -import java.io.BufferedReader -import java.io.File -import java.io.FileReader -import java.util.Random -import kotlin.math.max -import kotlin.math.min /** * A script to convert a trace in text format into a Parquet trace. diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt index dd70d4f1..f2a0e627 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt @@ -28,8 +28,8 @@ import com.atlarge.opendc.compute.core.workload.VmWorkload import com.atlarge.opendc.experiments.sc20.experiment.model.CompositeWorkload import com.atlarge.opendc.experiments.sc20.experiment.model.Workload import com.atlarge.opendc.format.trace.TraceEntry -import mu.KotlinLogging import kotlin.random.Random +import mu.KotlinLogging private val logger = KotlinLogging.logger {} diff --git a/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt index 5ecf7605..a79e9a5a 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt @@ -42,6 +42,8 @@ import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader import com.atlarge.opendc.format.environment.EnvironmentReader import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import com.atlarge.opendc.format.trace.TraceReader +import java.io.File +import java.util.ServiceLoader import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.launch @@ -52,8 +54,6 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import java.io.File -import java.util.ServiceLoader /** * An integration test suite for the SC20 experiments. diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt index 5f220ad0..a9aa3337 100644 --- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt +++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt @@ -25,9 +25,9 @@ package com.atlarge.opendc.format.environment.sc18 import com.atlarge.odcsim.Domain -import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.MemoryUnit import com.atlarge.opendc.compute.core.ProcessingNode +import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.compute.metal.service.SimpleProvisioningService diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt index 2a8fefeb..1cabc8bc 100644 --- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt +++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt @@ -26,10 +26,10 @@ package com.atlarge.opendc.format.trace.bitbrains import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment import com.atlarge.opendc.compute.core.image.VmImage -import com.atlarge.opendc.compute.core.workload.VmWorkload -import com.atlarge.opendc.core.User import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.compute.core.workload.VmWorkload +import com.atlarge.opendc.core.User import com.atlarge.opendc.format.trace.TraceEntry import com.atlarge.opendc.format.trace.TraceReader import java.io.BufferedReader diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt index 076274d5..8e34505a 100644 --- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt +++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt @@ -26,10 +26,10 @@ package com.atlarge.opendc.format.trace.sc20 import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment import com.atlarge.opendc.compute.core.image.VmImage -import com.atlarge.opendc.compute.core.workload.VmWorkload -import com.atlarge.opendc.core.User import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.compute.core.workload.VmWorkload +import com.atlarge.opendc.core.User import com.atlarge.opendc.format.trace.TraceEntry import com.atlarge.opendc.format.trace.TraceReader import java.io.BufferedReader diff --git a/simulator/opendc/opendc-format/src/test/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReaderTest.kt b/simulator/opendc/opendc-format/src/test/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReaderTest.kt index 41ad8aba..94e4b0fc 100644 --- a/simulator/opendc/opendc-format/src/test/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReaderTest.kt +++ b/simulator/opendc/opendc-format/src/test/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReaderTest.kt @@ -1,8 +1,8 @@ package com.atlarge.opendc.format.trace.swf +import java.io.File import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test -import java.io.File class SwfTraceReaderTest { @Test diff --git a/simulator/opendc/opendc-runner-web/build.gradle.kts b/simulator/opendc/opendc-runner-web/build.gradle.kts new file mode 100644 index 00000000..6f725de1 --- /dev/null +++ b/simulator/opendc/opendc-runner-web/build.gradle.kts @@ -0,0 +1,55 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "Experiment runner for OpenDC" + +/* Build configuration */ +plugins { + `kotlin-library-convention` + application +} + +application { + mainClassName = "com.atlarge.opendc.runner.web.MainKt" +} + +dependencies { + api(project(":opendc:opendc-core")) + implementation(project(":opendc:opendc-compute")) + implementation(project(":opendc:opendc-format")) + implementation(project(":opendc:opendc-experiments-sc20")) + + implementation("com.github.ajalt:clikt:2.8.0") + implementation("io.github.microutils:kotlin-logging:1.7.10") + + implementation("org.mongodb:mongodb-driver-sync:4.0.5") + implementation("org.apache.spark:spark-sql_2.12:3.0.0") { + exclude(group = "org.slf4j", module = "slf4j-log4j12") + exclude(group = "log4j") + } + + runtimeOnly(project(":odcsim:odcsim-engine-omega")) + runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.13.1") + runtimeOnly("org.apache.logging.log4j:log4j-1.2-api:2.13.1") +} diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt new file mode 100644 index 00000000..0ff9b870 --- /dev/null +++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt @@ -0,0 +1,338 @@ +package com.atlarge.opendc.runner.web + +import com.atlarge.odcsim.SimulationEngineProvider +import com.atlarge.opendc.compute.virt.service.allocation.* +import com.atlarge.opendc.experiments.sc20.experiment.attachMonitor +import com.atlarge.opendc.experiments.sc20.experiment.createFailureDomain +import com.atlarge.opendc.experiments.sc20.experiment.createProvisioner +import com.atlarge.opendc.experiments.sc20.experiment.model.Workload +import com.atlarge.opendc.experiments.sc20.experiment.monitor.ParquetExperimentMonitor +import com.atlarge.opendc.experiments.sc20.experiment.processTrace +import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader +import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader +import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader +import com.github.ajalt.clikt.core.CliktCommand +import com.github.ajalt.clikt.parameters.options.* +import com.github.ajalt.clikt.parameters.types.file +import com.github.ajalt.clikt.parameters.types.int +import com.mongodb.MongoClientSettings +import com.mongodb.MongoCredential +import com.mongodb.ServerAddress +import com.mongodb.client.MongoClients +import com.mongodb.client.MongoCollection +import com.mongodb.client.MongoDatabase +import com.mongodb.client.model.Filters +import java.io.File +import java.util.* +import kotlin.random.Random +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.Channel +import mu.KotlinLogging +import org.bson.Document + +private val logger = KotlinLogging.logger {} + +/** + * The provider for the simulation engine to use. + */ +private val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() + +/** + * Represents the CLI command for starting the OpenDC web runner. + */ +class RunnerCli : CliktCommand(name = "runner") { + /** + * The name of the database to use. + */ + private val mongoDb by option( + "--mongo-db", + help = "name of the database to use", + envvar = "OPENDC_DB" + ) + .default("opendc") + + /** + * The database host to connect to. + */ + private val mongoHost by option( + "--mongo-host", + help = "database host to connect to", + envvar = "OPENDC_DB_HOST" + ) + .default("localhost") + + /** + * The database port to connect to. + */ + private val mongoPort by option( + "--mongo-port", + help = "database port to connect to", + envvar = "OPENDC_DB_PORT" + ) + .int() + .default(27017) + + /** + * The database user to connect with. + */ + private val mongoUser by option( + "--mongo-user", + help = "database user to connect with", + envvar = "OPENDC_DB_USER" + ) + .default("opendc") + + /** + * The database password to connect with. + */ + private val mongoPassword by option( + "--mongo-password", + help = "database password to connect with", + envvar = "OPENDC_DB_PASSWORD" + ) + .convert { it.toCharArray() } + .required() + + /** + * The path to the traces directory. + */ + private val tracePath by option( + "--traces", + help = "path to the directory containing the traces", + envvar = "OPENDC_TRACES" + ) + .file(canBeFile = false) + .defaultLazy { File("traces/") } + + /** + * The path to the output directory. + */ + private val outputPath by option( + "--output", + help = "path to the results directory", + envvar = "OPENDC_OUTPUT" + ) + .file(canBeFile = false) + .defaultLazy { File("results/") } + + /** + * The Spark master to connect to. + */ + private val spark by option( + "--spark", + help = "Spark master to connect to", + envvar = "OPENDC_SPARK" + ) + .default("local[*]") + + /** + * Connect to the user-specified database. + */ + private fun createDatabase(): MongoDatabase { + val credential = MongoCredential.createScramSha1Credential( + mongoUser, + mongoDb, + mongoPassword + ) + + val settings = MongoClientSettings.builder() + .credential(credential) + .applyToClusterSettings { it.hosts(listOf(ServerAddress(mongoHost, mongoPort))) } + .build() + val client = MongoClients.create(settings) + return client.getDatabase(mongoDb) + } + + /** + * Run a single scenario. + */ + private suspend fun runScenario(portfolio: Document, scenario: Document, topologies: MongoCollection<Document>) { + val id = scenario.getString("_id") + + logger.info { "Constructing performance interference model" } + + val traceDir = File( + tracePath, + scenario.getEmbedded(listOf("trace", "traceId"), String::class.java) + ) + val traceReader = Sc20RawParquetTraceReader(traceDir) + val performanceInterferenceReader = let { + val path = File(traceDir, "performance-interference-model.json") + val operational = scenario.get("operational", Document::class.java) + val enabled = operational.getBoolean("performanceInterferenceEnabled") + + if (!enabled || !path.exists()) { + return@let null + } + + path.inputStream().use { Sc20PerformanceInterferenceReader(it) } + } + + val targets = portfolio.get("targets", Document::class.java) + + repeat(targets.getInteger("repeatsPerScenario")) { + logger.info { "Starting repeat $it" } + runRepeat(scenario, it, topologies, traceReader, performanceInterferenceReader) + } + + logger.info { "Finished simulation for scenario $id" } + } + + /** + * Run a single repeat. + */ + private suspend fun runRepeat( + scenario: Document, + repeat: Int, + topologies: MongoCollection<Document>, + traceReader: Sc20RawParquetTraceReader, + performanceInterferenceReader: Sc20PerformanceInterferenceReader? + ) { + val id = scenario.getString("_id") + val seed = repeat + val traceDocument = scenario.get("trace", Document::class.java) + val workloadName = traceDocument.getString("traceId") + val workloadFraction = traceDocument.get("loadSamplingFraction", Number::class.java).toDouble() + + val seeder = Random(seed) + val system = provider("experiment-$id") + val root = system.newDomain("root") + + val chan = Channel<Unit>(Channel.CONFLATED) + + val operational = scenario.get("operational", Document::class.java) + val allocationPolicy = + when (val policyName = operational.getString("schedulerName")) { + "mem" -> AvailableMemoryAllocationPolicy() + "mem-inv" -> AvailableMemoryAllocationPolicy(true) + "core-mem" -> AvailableCoreMemoryAllocationPolicy() + "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true) + "active-servers" -> NumberOfActiveServersAllocationPolicy() + "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true) + "provisioned-cores" -> ProvisionedCoresAllocationPolicy() + "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true) + "random" -> RandomAllocationPolicy(Random(seeder.nextInt())) + else -> throw IllegalArgumentException("Unknown policy $policyName") + } + + val performanceInterferenceModel = performanceInterferenceReader?.construct(seeder) ?: emptyMap() + val trace = Sc20ParquetTraceReader( + listOf(traceReader), + performanceInterferenceModel, + Workload(workloadName, workloadFraction), + seed + ) + val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), String::class.java) + val environment = TopologyParser(topologies, topologyId) + val monitor = ParquetExperimentMonitor( + outputPath, + "scenario_id=$id/run_id=$repeat", + 4096 + ) + + root.launch { + val (bareMetalProvisioner, scheduler) = createProvisioner( + root, + environment, + allocationPolicy + ) + + val failureDomain = if (operational.getBoolean("failuresEnabled")) { + logger.debug("ENABLING failures") + createFailureDomain( + seeder.nextInt(), + operational.get("failureFrequency", Number::class.java)?.toDouble() ?: 24.0 * 7, + bareMetalProvisioner, + chan + ) + } else { + null + } + + attachMonitor(scheduler, monitor) + processTrace( + trace, + scheduler, + chan, + monitor, + emptyMap() + ) + + logger.debug("SUBMIT=${scheduler.submittedVms}") + logger.debug("FAIL=${scheduler.unscheduledVms}") + logger.debug("QUEUED=${scheduler.queuedVms}") + logger.debug("RUNNING=${scheduler.runningVms}") + logger.debug("FINISHED=${scheduler.finishedVms}") + + failureDomain?.cancel() + scheduler.terminate() + } + + try { + system.run() + } finally { + system.terminate() + monitor.close() + } + } + + override fun run() = runBlocking(Dispatchers.Default) { + logger.info { "Starting OpenDC web runner" } + logger.info { "Connecting to MongoDB instance" } + val database = createDatabase() + val manager = ScenarioManager(database.getCollection("scenarios")) + val portfolios = database.getCollection("portfolios") + val topologies = database.getCollection("topologies") + + logger.info { "Launching Spark" } + val resultProcessor = ResultProcessor(spark, outputPath) + + logger.info { "Watching for queued scenarios" } + + while (true) { + val scenario = manager.findNext() + + if (scenario == null) { + delay(5000) + continue + } + + val id = scenario.getString("_id") + + logger.info { "Found queued scenario $id: attempting to claim" } + + if (!manager.claim(id)) { + logger.info { "Failed to claim scenario" } + continue + } + + coroutineScope { + // Launch heartbeat process + launch { + delay(60000) + manager.heartbeat(id) + } + + try { + val portfolio = portfolios.find(Filters.eq("_id", scenario.getString("portfolioId"))).first()!! + runScenario(portfolio, scenario, topologies) + + logger.info { "Starting result processing" } + + val result = resultProcessor.process(id) + manager.finish(id, result) + + logger.info { "Successfully finished scenario $id" } + } catch (e: Exception) { + logger.warn(e) { "Scenario failed to finish" } + manager.fail(id) + } + } + } + } +} + +/** + * Main entry point of the runner. + */ +fun main(args: Array<String>) = RunnerCli().main(args) diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt new file mode 100644 index 00000000..39092653 --- /dev/null +++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt @@ -0,0 +1,187 @@ +package com.atlarge.opendc.runner.web + +import java.io.File +import org.apache.spark.sql.Column +import org.apache.spark.sql.Dataset +import org.apache.spark.sql.Row +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.functions.* + +/** + * A helper class for processing the experiment results using Apache Spark. + */ +class ResultProcessor(private val master: String, private val outputPath: File) { + /** + * Process the results of the scenario with the given [id]. + */ + fun process(id: String): Result { + val spark = SparkSession.builder() + .master(master) + .appName("opendc-simulator-$id") + .config("spark.driver.bindAddress", "0.0.0.0") // Needed to allow the worker to connect to driver + .orCreate + + try { + val hostMetrics = spark.read().parquet(File(outputPath, "host-metrics/scenario_id=$id").path) + val provisionerMetrics = spark.read().parquet(File(outputPath, "provisioner-metrics/scenario_id=$id").path) + val res = aggregate(hostMetrics, provisionerMetrics).first() + + return Result( + res.getList<Long>(1), + res.getList<Long>(2), + res.getList<Long>(3), + res.getList<Long>(4), + res.getList<Double>(5), + res.getList<Double>(6), + res.getList<Double>(7), + res.getList<Int>(8), + res.getList<Long>(9), + res.getList<Long>(10), + res.getList<Long>(11), + res.getList<Int>(12), + res.getList<Int>(13), + res.getList<Int>(14), + res.getList<Int>(15) + ) + } finally { + spark.close() + } + } + + data class Result( + val totalRequestedBurst: List<Long>, + val totalGrantedBurst: List<Long>, + val totalOvercommittedBurst: List<Long>, + val totalInterferedBurst: List<Long>, + val meanCpuUsage: List<Double>, + val meanCpuDemand: List<Double>, + val meanNumDeployedImages: List<Double>, + val maxNumDeployedImages: List<Int>, + val totalPowerDraw: List<Long>, + val totalFailureSlices: List<Long>, + val totalFailureVmSlices: List<Long>, + val totalVmsSubmitted: List<Int>, + val totalVmsQueued: List<Int>, + val totalVmsFinished: List<Int>, + val totalVmsFailed: List<Int> + ) + + /** + * Perform aggregation of the experiment results. + */ + private fun aggregate(hostMetrics: Dataset<Row>, provisionerMetrics: Dataset<Row>): Dataset<Row> { + // Extrapolate the duration of the entries to span the entire trace + val hostMetricsExtra = hostMetrics + .withColumn("slice_counts", floor(col("duration") / lit(sliceLength))) + .withColumn("power_draw", col("power_draw") * col("slice_counts")) + .withColumn("state_int", states[col("state")]) + .withColumn("state_opposite_int", oppositeStates[col("state")]) + .withColumn("cpu_usage", col("cpu_usage") * col("slice_counts") * col("state_opposite_int")) + .withColumn("cpu_demand", col("cpu_demand") * col("slice_counts")) + .withColumn("failure_slice_count", col("slice_counts") * col("state_int")) + .withColumn("failure_vm_slice_count", col("slice_counts") * col("state_int") * col("vm_count")) + + // Process all data in a single run + val hostMetricsGrouped = hostMetricsExtra.groupBy("run_id") + + // Aggregate the summed total metrics + val systemMetrics = hostMetricsGrouped.agg( + sum("requested_burst").alias("total_requested_burst"), + sum("granted_burst").alias("total_granted_burst"), + sum("overcommissioned_burst").alias("total_overcommitted_burst"), + sum("interfered_burst").alias("total_interfered_burst"), + sum("power_draw").alias("total_power_draw"), + sum("failure_slice_count").alias("total_failure_slices"), + sum("failure_vm_slice_count").alias("total_failure_vm_slices") + ) + + // Aggregate metrics per host + val hvMetrics = hostMetrics + .groupBy("run_id", "host_id") + .agg( + sum("cpu_usage").alias("mean_cpu_usage"), + sum("cpu_demand").alias("mean_cpu_demand"), + avg("vm_count").alias("mean_num_deployed_images"), + count(lit(1)).alias("num_rows") + ) + .withColumn("mean_cpu_usage", col("mean_cpu_usage") / col("num_rows")) + .withColumn("mean_cpu_demand", col("mean_cpu_demand") / col("num_rows")) + .groupBy("run_id") + .agg( + avg("mean_cpu_usage").alias("mean_cpu_usage"), + avg("mean_cpu_demand").alias("mean_cpu_demand"), + avg("mean_num_deployed_images").alias("mean_num_deployed_images"), + max("mean_num_deployed_images").alias("max_num_deployed_images") + ) + + // Group the provisioner metrics per run + val provisionerMetricsGrouped = provisionerMetrics.groupBy("run_id") + + // Aggregate the provisioner metrics + val provisionerMetricsAggregated = provisionerMetricsGrouped.agg( + max("vm_total_count").alias("total_vms_submitted"), + max("vm_waiting_count").alias("total_vms_queued"), + max("vm_active_count").alias("total_vms_running"), + max("vm_inactive_count").alias("total_vms_finished"), + max("vm_failed_count").alias("total_vms_failed") + ) + + // Join the results into a single data frame + return systemMetrics + .join(hvMetrics, "run_id") + .join(provisionerMetricsAggregated, "run_id") + .select( + col("total_requested_burst"), + col("total_granted_burst"), + col("total_overcommitted_burst"), + col("total_interfered_burst"), + col("mean_cpu_usage"), + col("mean_cpu_demand"), + col("mean_num_deployed_images"), + col("max_num_deployed_images"), + col("total_power_draw"), + col("total_failure_slices"), + col("total_failure_vm_slices"), + col("total_vms_submitted"), + col("total_vms_queued"), + col("total_vms_finished"), + col("total_vms_failed") + ) + .groupBy(lit(1)) + .agg( + // TODO Check if order of values is correct + collect_list(col("total_requested_burst")).alias("total_requested_burst"), + collect_list(col("total_granted_burst")).alias("total_granted_burst"), + collect_list(col("total_overcommitted_burst")).alias("total_overcommitted_burst"), + collect_list(col("total_interfered_burst")).alias("total_interfered_burst"), + collect_list(col("mean_cpu_usage")).alias("mean_cpu_usage"), + collect_list(col("mean_cpu_demand")).alias("mean_cpu_demand"), + collect_list(col("mean_num_deployed_images")).alias("mean_num_deployed_images"), + collect_list(col("max_num_deployed_images")).alias("max_num_deployed_images"), + collect_list(col("total_power_draw")).alias("total_power_draw"), + collect_list(col("total_failure_slices")).alias("total_failure_slices"), + collect_list(col("total_failure_vm_slices")).alias("total_failure_vm_slices"), + collect_list(col("total_vms_submitted")).alias("total_vms_submitted"), + collect_list(col("total_vms_queued")).alias("total_vms_queued"), + collect_list(col("total_vms_finished")).alias("total_vms_finished"), + collect_list(col("total_vms_failed")).alias("total_vms_failed") + ) + } + + // Spark helper functions + operator fun Column.times(other: Column): Column = `$times`(other) + operator fun Column.div(other: Column): Column = `$div`(other) + operator fun Column.get(other: Column): Column = this.apply(other) + + val sliceLength = 5 * 60 * 1000 + val states = map( + lit("ERROR"), lit(1), + lit("ACTIVE"), lit(0), + lit("SHUTOFF"), lit(0) + ) + val oppositeStates = map( + lit("ERROR"), lit(0), + lit("ACTIVE"), lit(1), + lit("SHUTOFF"), lit(1) + ) +} diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt new file mode 100644 index 00000000..40ffd282 --- /dev/null +++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt @@ -0,0 +1,93 @@ +package com.atlarge.opendc.runner.web + +import com.mongodb.client.MongoCollection +import com.mongodb.client.model.Filters +import com.mongodb.client.model.Updates +import java.time.Instant +import org.bson.Document + +/** + * Manages the queue of scenarios that need to be processed. + */ +class ScenarioManager(private val collection: MongoCollection<Document>) { + /** + * Find the next scenario that the simulator needs to process. + */ + fun findNext(): Document? { + return collection + .find(Filters.eq("simulation.state", "QUEUED")) + .first() + } + + /** + * Claim the scenario in the database with the specified id. + */ + fun claim(id: String): Boolean { + val res = collection.findOneAndUpdate( + Filters.and( + Filters.eq("_id", id), + Filters.eq("simulation.state", "QUEUED") + ), + Updates.combine( + Updates.set("simulation.state", "RUNNING"), + Updates.set("simulation.heartbeat", Instant.now()) + ) + ) + return res != null + } + + /** + * Update the heartbeat of the specified scenario. + */ + fun heartbeat(id: String) { + collection.findOneAndUpdate( + Filters.and( + Filters.eq("_id", id), + Filters.eq("simulation.state", "RUNNING") + ), + Updates.set("simulation.heartbeat", Instant.now()) + ) + } + + /** + * Mark the scenario as failed. + */ + fun fail(id: String) { + collection.findOneAndUpdate( + Filters.eq("_id", id), + Updates.combine( + Updates.set("simulation.state", "FAILED"), + Updates.set("simulation.heartbeat", Instant.now()) + ) + ) + } + + /** + * Persist the specified results. + */ + fun finish(id: String, result: ResultProcessor.Result) { + collection.findOneAndUpdate( + Filters.eq("_id", id), + Updates.combine( + Updates.set("simulation.state", "FINISHED"), + Updates.unset("simulation.time"), + Updates.set("results.total_requested_burst", result.totalRequestedBurst), + Updates.set("results.total_granted_burst", result.totalGrantedBurst), + Updates.set("results.total_overcommitted_burst", result.totalOvercommittedBurst), + Updates.set("results.total_interfered_burst", result.totalInterferedBurst), + Updates.set("results.mean_cpu_usage", result.meanCpuUsage), + Updates.set("results.mean_cpu_demand", result.meanCpuDemand), + Updates.set("results.mean_num_deployed_images", result.meanNumDeployedImages), + Updates.set("results.max_num_deployed_images", result.maxNumDeployedImages), + Updates.set("results.max_num_deployed_images", result.maxNumDeployedImages), + Updates.set("results.total_power_draw", result.totalPowerDraw), + Updates.set("results.total_failure_slices", result.totalFailureSlices), + Updates.set("results.total_failure_vm_slices", result.totalFailureVmSlices), + Updates.set("results.total_vms_submitted", result.totalVmsSubmitted), + Updates.set("results.total_vms_queued", result.totalVmsQueued), + Updates.set("results.total_vms_finished", result.totalVmsFinished), + Updates.set("results.total_vms_failed", result.totalVmsFailed) + ) + ) + } +} diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt new file mode 100644 index 00000000..499585ec --- /dev/null +++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt @@ -0,0 +1,127 @@ +package com.atlarge.opendc.runner.web + +import com.atlarge.odcsim.Domain +import com.atlarge.opendc.compute.core.MemoryUnit +import com.atlarge.opendc.compute.core.ProcessingNode +import com.atlarge.opendc.compute.core.ProcessingUnit +import com.atlarge.opendc.compute.metal.NODE_CLUSTER +import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver +import com.atlarge.opendc.compute.metal.power.LinearLoadPowerModel +import com.atlarge.opendc.compute.metal.service.ProvisioningService +import com.atlarge.opendc.compute.metal.service.SimpleProvisioningService +import com.atlarge.opendc.core.Environment +import com.atlarge.opendc.core.Platform +import com.atlarge.opendc.core.Zone +import com.atlarge.opendc.core.services.ServiceRegistry +import com.atlarge.opendc.format.environment.EnvironmentReader +import com.mongodb.client.AggregateIterable +import com.mongodb.client.MongoCollection +import com.mongodb.client.model.Aggregates +import com.mongodb.client.model.Field +import com.mongodb.client.model.Filters +import com.mongodb.client.model.Projections +import java.util.* +import kotlinx.coroutines.launch +import org.bson.Document + +/** + * A helper class that converts the MongoDB topology into an OpenDC environment. + */ +class TopologyParser(private val collection: MongoCollection<Document>, private val id: String) : EnvironmentReader { + /** + * Parse the topology with the specified [id]. + */ + override suspend fun construct(dom: Domain): Environment { + val nodes = mutableListOf<SimpleBareMetalDriver>() + val random = Random(0) + + for (machine in fetchMachines(id)) { + val machineId = machine.getString("_id") + val clusterId = machine.getString("rack_id") + val position = machine.getInteger("position") + + val processors = machine.getList("cpus", Document::class.java).flatMap { cpu -> + val cores = cpu.getInteger("numberOfCores") + val speed = cpu.get("clockRateMhz", Number::class.java).toDouble() + // TODO Remove hardcoding of vendor + val node = ProcessingNode("Intel", "amd64", cpu.getString("name"), cores) + List(cores) { coreId -> + ProcessingUnit(node, coreId, speed) + } + } + val memoryUnits = machine.getList("memories", Document::class.java).map { memory -> + MemoryUnit( + "Samsung", + memory.getString("name"), + memory.get("speedMbPerS", Number::class.java).toDouble(), + memory.get("sizeMb", Number::class.java).toLong() + ) + } + nodes.add( + SimpleBareMetalDriver( + dom.newDomain(machineId), + UUID(random.nextLong(), random.nextLong()), + "node-$clusterId-$position", + mapOf(NODE_CLUSTER to clusterId), + processors, + memoryUnits, + // For now we assume a simple linear load model with an idle draw of ~200W and a maximum + // power draw of 350W. + // Source: https://stackoverflow.com/questions/6128960 + LinearLoadPowerModel(200.0, 350.0) + ) + ) + } + + val provisioningService = SimpleProvisioningService(dom.newDomain("provisioner")) + dom.launch { + for (node in nodes) { + provisioningService.create(node) + } + } + + val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService) + + val platform = Platform( + UUID.randomUUID(), "opendc-platform", listOf( + Zone(UUID.randomUUID(), "zone", serviceRegistry) + ) + ) + + return Environment(fetchName(id), null, listOf(platform)) + } + + override fun close() {} + + /** + * Fetch the metadata of the topology. + */ + private fun fetchName(id: String): String { + return collection.aggregate( + listOf( + Aggregates.match(Filters.eq("_id", id)), + Aggregates.project(Projections.include("name")) + ) + ) + .first()!! + .getString("name") + } + + /** + * Fetch a topology from the database with the specified [id]. + */ + private fun fetchMachines(id: String): AggregateIterable<Document> { + return collection.aggregate( + listOf( + Aggregates.match(Filters.eq("_id", id)), + Aggregates.project(Projections.fields(Document("racks", "\$rooms.tiles.rack"))), + Aggregates.unwind("\$racks"), + Aggregates.unwind("\$racks"), + Aggregates.replaceRoot("\$racks"), + Aggregates.addFields(Field("machines.rack_id", "\$_id")), + Aggregates.unwind("\$machines"), + Aggregates.replaceRoot("\$machines") + ) + ) + } +} diff --git a/simulator/opendc/opendc-runner-web/src/main/resources/log4j2.xml b/simulator/opendc/opendc-runner-web/src/main/resources/log4j2.xml new file mode 100644 index 00000000..1d873554 --- /dev/null +++ b/simulator/opendc/opendc-runner-web/src/main/resources/log4j2.xml @@ -0,0 +1,52 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ MIT License + ~ + ~ Copyright (c) 2020 atlarge-research + ~ + ~ Permission is hereby granted, free of charge, to any person obtaining a copy + ~ of this software and associated documentation files (the "Software"), to deal + ~ in the Software without restriction, including without limitation the rights + ~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + ~ copies of the Software, and to permit persons to whom the Software is + ~ furnished to do so, subject to the following conditions: + ~ + ~ The above copyright notice and this permission notice shall be included in all + ~ copies or substantial portions of the Software. + ~ + ~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + ~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + ~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + ~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + ~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + ~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + ~ SOFTWARE. + --> + +<Configuration status="WARN"> + <Appenders> + <Console name="Console" target="SYSTEM_OUT"> + <PatternLayout pattern="%d{HH:mm:ss.SSS} [%highlight{%-5level}] %logger{36} - %msg%n" disableAnsi="false" /> + </Console> + </Appenders> + <Loggers> + <Logger name="com.atlarge.odcsim" level="info" additivity="false"> + <AppenderRef ref="Console"/> + </Logger> + <Logger name="com.atlarge.opendc" level="warn" additivity="false"> + <AppenderRef ref="Console"/> + </Logger> + <Logger name="com.atlarge.opendc.runner" level="info" additivity="false"> + <AppenderRef ref="Console"/> + </Logger> + <Logger name="org.apache.hadoop" level="warn" additivity="false"> + <AppenderRef ref="Console"/> + </Logger> + <Logger name="org.apache.spark" level="info" additivity="false"> + <AppenderRef ref="Console"/> + </Logger> + <Root level="error"> + <AppenderRef ref="Console"/> + </Root> + </Loggers> +</Configuration> diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt index 7c7990e2..1193f7b2 100644 --- a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt @@ -39,13 +39,13 @@ import com.atlarge.opendc.workflows.service.stage.resource.ResourceSelectionPoli import com.atlarge.opendc.workflows.service.stage.task.TaskEligibilityPolicy import com.atlarge.opendc.workflows.service.stage.task.TaskOrderPolicy import com.atlarge.opendc.workflows.workload.Job +import java.util.PriorityQueue +import java.util.Queue import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach -import java.util.PriorityQueue -import java.util.Queue import kotlinx.coroutines.launch import kotlinx.coroutines.withContext diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt index ad818dde..a60ba0e2 100644 --- a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt @@ -26,8 +26,8 @@ package com.atlarge.opendc.workflows.service import com.atlarge.opendc.core.services.AbstractServiceKey import com.atlarge.opendc.workflows.workload.Job -import kotlinx.coroutines.flow.Flow import java.util.UUID +import kotlinx.coroutines.flow.Flow /** * A service for cloud workflow management. diff --git a/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt index 5ee6d5e6..5c129e37 100644 --- a/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt +++ b/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt @@ -35,6 +35,8 @@ import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelec import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy +import java.util.ServiceLoader +import kotlin.math.max import kotlinx.coroutines.async import kotlinx.coroutines.delay import kotlinx.coroutines.flow.collect @@ -44,8 +46,6 @@ import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test -import java.util.ServiceLoader -import kotlin.math.max /** * Integration test suite for the [StageWorkflowService]. |
