summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--api/opendc/api/v2/portfolios/portfolioId/scenarios/endpoint.py2
-rw-r--r--api/opendc/api/v2/portfolios/portfolioId/scenarios/test_endpoint.py10
-rw-r--r--frontend/src/shapes/index.js4
-rw-r--r--opendc-api-spec.yml7
-rw-r--r--simulator/.editorconfig2
-rw-r--r--simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt2
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt2
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt2
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt2
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt20
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt2
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt2
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt12
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt2
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt8
-rw-r--r--simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt2
-rw-r--r--simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt4
-rw-r--r--simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt4
-rw-r--r--simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt6
-rw-r--r--simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt8
-rw-r--r--simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt4
-rw-r--r--simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt4
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt2
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt25
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt12
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt14
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt2
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt2
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt10
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt2
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt2
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt2
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt4
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt16
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt12
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt2
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt4
-rw-r--r--simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt2
-rw-r--r--simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt4
-rw-r--r--simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt4
-rw-r--r--simulator/opendc/opendc-format/src/test/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReaderTest.kt2
-rw-r--r--simulator/opendc/opendc-runner-web/build.gradle.kts5
-rw-r--r--simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt285
-rw-r--r--simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt77
-rw-r--r--simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt127
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt4
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt2
-rw-r--r--simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt4
48 files changed, 621 insertions, 117 deletions
diff --git a/api/opendc/api/v2/portfolios/portfolioId/scenarios/endpoint.py b/api/opendc/api/v2/portfolios/portfolioId/scenarios/endpoint.py
index 1c5e0ab6..ca1db36a 100644
--- a/api/opendc/api/v2/portfolios/portfolioId/scenarios/endpoint.py
+++ b/api/opendc/api/v2/portfolios/portfolioId/scenarios/endpoint.py
@@ -33,7 +33,7 @@ def POST(request):
scenario = Scenario(request.params_body['scenario'])
scenario.set_property('portfolioId', request.params_path['portfolioId'])
- scenario.set_property('simulationState', 'QUEUED')
+ scenario.set_property('simulation', {'state': 'QUEUED'})
scenario.insert()
diff --git a/api/opendc/api/v2/portfolios/portfolioId/scenarios/test_endpoint.py b/api/opendc/api/v2/portfolios/portfolioId/scenarios/test_endpoint.py
index 8b55bab0..329e68e8 100644
--- a/api/opendc/api/v2/portfolios/portfolioId/scenarios/test_endpoint.py
+++ b/api/opendc/api/v2/portfolios/portfolioId/scenarios/test_endpoint.py
@@ -72,7 +72,9 @@ def test_add_scenario(client, mocker):
'projectId': '1',
'authorizationLevel': 'EDIT'
}],
- 'simulationState': 'QUEUED',
+ 'simulation': {
+ 'state': 'QUEUED',
+ },
})
mocker.patch.object(DB,
'insert',
@@ -92,7 +94,9 @@ def test_add_scenario(client, mocker):
'schedulerName': 'DEFAULT',
},
'portfolioId': '1',
- 'simulationState': 'QUEUED',
+ 'simulationState': {
+ 'state': 'QUEUED',
+ },
})
mocker.patch.object(DB, 'update', return_value=None)
res = client.post(
@@ -115,5 +119,5 @@ def test_add_scenario(client, mocker):
}
})
assert 'portfolioId' in res.json['content']
- assert 'simulationState' in res.json['content']
+ assert 'simulation' in res.json['content']
assert '200' in res.status
diff --git a/frontend/src/shapes/index.js b/frontend/src/shapes/index.js
index 32914f25..8296055a 100644
--- a/frontend/src/shapes/index.js
+++ b/frontend/src/shapes/index.js
@@ -111,7 +111,9 @@ Shapes.Scenario = PropTypes.shape({
_id: PropTypes.string.isRequired,
portfolioId: PropTypes.string.isRequired,
name: PropTypes.string.isRequired,
- simulationState: PropTypes.string.isRequired,
+ simulation: PropTypes.shape({
+ state: PropTypes.string.isRequired,
+ }).isRequired,
trace: PropTypes.shape({
traceId: PropTypes.string.isRequired,
trace: Shapes.Trace,
diff --git a/opendc-api-spec.yml b/opendc-api-spec.yml
index 39cb4f1c..36ef3c83 100644
--- a/opendc-api-spec.yml
+++ b/opendc-api-spec.yml
@@ -860,8 +860,11 @@ definitions:
type: string
name:
type: string
- simulationState:
- type: string
+ simulation:
+ type: object
+ properties:
+ state:
+ type: string
trace:
type: object
properties:
diff --git a/simulator/.editorconfig b/simulator/.editorconfig
index a17544c9..a5584e95 100644
--- a/simulator/.editorconfig
+++ b/simulator/.editorconfig
@@ -4,4 +4,4 @@
# ktlint
[*.{kt, kts}]
-disabled_rules = import-ordering
+disabled_rules = no-wildcard-imports
diff --git a/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt b/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt
index 5d9af9ec..0e18f82f 100644
--- a/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt
+++ b/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt
@@ -24,6 +24,7 @@
package com.atlarge.odcsim.flow
+import java.util.WeakHashMap
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.InternalCoroutinesApi
@@ -32,7 +33,6 @@ import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.consumeAsFlow
-import java.util.WeakHashMap
/**
* A [Flow] that can be used to emit events.
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt
index 01968cd8..fd0fc836 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt
@@ -28,8 +28,8 @@ import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.core.resource.Resource
import com.atlarge.opendc.core.resource.TagContainer
import com.atlarge.opendc.core.services.ServiceRegistry
-import kotlinx.coroutines.flow.Flow
import java.util.UUID
+import kotlinx.coroutines.flow.Flow
/**
* A server instance that is running on some physical or virtual machine.
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt
index 7cb4c0c5..cb637aea 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt
@@ -27,8 +27,8 @@ package com.atlarge.opendc.compute.metal
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.core.Identity
-import kotlinx.coroutines.flow.Flow
import java.util.UUID
+import kotlinx.coroutines.flow.Flow
/**
* A bare-metal compute node.
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt
index 41cec291..17d8ee53 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt
@@ -30,8 +30,8 @@ import com.atlarge.opendc.compute.metal.Node
import com.atlarge.opendc.core.failure.FailureDomain
import com.atlarge.opendc.core.power.Powerable
import com.atlarge.opendc.core.services.AbstractServiceKey
-import kotlinx.coroutines.flow.Flow
import java.util.UUID
+import kotlinx.coroutines.flow.Flow
/**
* A driver interface for the management interface of a bare-metal compute node.
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
index 6a77415c..a453e459 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
@@ -28,10 +28,10 @@ import com.atlarge.odcsim.Domain
import com.atlarge.odcsim.SimulationContext
import com.atlarge.odcsim.flow.EventFlow
import com.atlarge.odcsim.flow.StateFlow
-import com.atlarge.opendc.compute.core.ProcessingUnit
-import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.MemoryUnit
+import com.atlarge.opendc.compute.core.ProcessingUnit
+import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.execution.ServerContext
@@ -46,6 +46,14 @@ import com.atlarge.opendc.compute.metal.power.ConstantPowerModel
import com.atlarge.opendc.core.power.PowerModel
import com.atlarge.opendc.core.services.ServiceKey
import com.atlarge.opendc.core.services.ServiceRegistry
+import java.lang.Exception
+import java.time.Clock
+import java.util.UUID
+import kotlin.coroutines.ContinuationInterceptor
+import kotlin.math.ceil
+import kotlin.math.max
+import kotlin.math.min
+import kotlin.random.Random
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Delay
import kotlinx.coroutines.DisposableHandle
@@ -59,15 +67,7 @@ import kotlinx.coroutines.intrinsics.startCoroutineCancellable
import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.SelectClause0
import kotlinx.coroutines.selects.SelectInstance
-import java.util.UUID
-import kotlin.math.ceil
-import kotlin.math.max
-import kotlin.math.min
import kotlinx.coroutines.withContext
-import java.lang.Exception
-import java.time.Clock
-import kotlin.coroutines.ContinuationInterceptor
-import kotlin.random.Random
/**
* A basic implementation of the [BareMetalDriver] that simulates an [Image] running on a bare-metal machine.
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt
index 69b0124d..1e7e351f 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt
@@ -25,8 +25,8 @@
package com.atlarge.opendc.compute.virt
import com.atlarge.opendc.core.Identity
-import kotlinx.coroutines.flow.Flow
import java.util.UUID
+import kotlinx.coroutines.flow.Flow
/**
* A hypervisor (or virtual machine monitor) is software or firmware that virtualizes the host compute environment
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt
index bd395f0d..607759a8 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt
@@ -29,9 +29,9 @@ import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver
import com.atlarge.opendc.compute.virt.driver.VirtDriver
import com.atlarge.opendc.core.resource.TagContainer
+import java.util.UUID
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.suspendCancellableCoroutine
-import java.util.UUID
/**
* A hypervisor managing the VMs of a node.
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
index 3c41f52e..192db413 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
@@ -35,11 +35,15 @@ import com.atlarge.opendc.compute.core.execution.ServerContext
import com.atlarge.opendc.compute.core.execution.ServerManagementContext
import com.atlarge.opendc.compute.core.execution.ShutdownException
import com.atlarge.opendc.compute.core.image.Image
+import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
+import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
import com.atlarge.opendc.compute.virt.HypervisorEvent
import com.atlarge.opendc.core.services.ServiceKey
import com.atlarge.opendc.core.services.ServiceRegistry
-import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
-import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
+import java.util.UUID
+import kotlin.math.ceil
+import kotlin.math.max
+import kotlin.math.min
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DisposableHandle
@@ -55,10 +59,6 @@ import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.SelectClause0
import kotlinx.coroutines.selects.SelectInstance
import kotlinx.coroutines.selects.select
-import java.util.UUID
-import kotlin.math.ceil
-import kotlin.math.max
-import kotlin.math.min
/**
* A [VirtDriver] that is backed by a simple hypervisor implementation.
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt
index 1002d382..b1844f67 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt
@@ -29,8 +29,8 @@ import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.compute.virt.HypervisorEvent
import com.atlarge.opendc.core.services.AbstractServiceKey
-import kotlinx.coroutines.flow.Flow
import java.util.UUID
+import kotlinx.coroutines.flow.Flow
/**
* A driver interface for a hypervisor running on some host server and communicating with the central compute service to
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
index ff4aa3d7..79388bc3 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
@@ -11,11 +11,14 @@ import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.compute.core.image.VmImage
import com.atlarge.opendc.compute.metal.service.ProvisioningService
import com.atlarge.opendc.compute.virt.HypervisorEvent
-import com.atlarge.opendc.compute.virt.driver.VirtDriver
import com.atlarge.opendc.compute.virt.HypervisorImage
import com.atlarge.opendc.compute.virt.driver.InsufficientMemoryOnServerException
+import com.atlarge.opendc.compute.virt.driver.VirtDriver
import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy
import com.atlarge.opendc.core.services.ServiceKey
+import kotlin.coroutines.Continuation
+import kotlin.coroutines.resume
+import kotlin.math.max
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
@@ -27,9 +30,6 @@ import kotlinx.coroutines.launch
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.withContext
import mu.KotlinLogging
-import kotlin.coroutines.Continuation
-import kotlin.coroutines.resume
-import kotlin.math.max
private val logger = KotlinLogging.logger {}
diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt
index 417db77d..1c7b751c 100644
--- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt
+++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt
@@ -24,10 +24,10 @@
package com.atlarge.opendc.compute.core.image
+import java.util.UUID
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
-import java.util.UUID
/**
* Test suite for [FlopsApplicationImage]
diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
index 071c0626..af9d3421 100644
--- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
+++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
@@ -31,6 +31,8 @@ import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.image.FlopsApplicationImage
+import java.util.ServiceLoader
+import java.util.UUID
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
@@ -39,8 +41,6 @@ import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
-import java.util.ServiceLoader
-import java.util.UUID
internal class SimpleBareMetalDriverTest {
/**
diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
index f8bd786e..ed2256c0 100644
--- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
+++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
@@ -29,13 +29,13 @@ import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.image.FlopsApplicationImage
import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver
+import java.util.ServiceLoader
+import java.util.UUID
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Test
-import java.util.ServiceLoader
-import java.util.UUID
/**
* Test suite for the [SimpleProvisioningService].
diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt
index ca00fc94..622b185e 100644
--- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt
+++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt
@@ -25,14 +25,16 @@
package com.atlarge.opendc.compute.virt
import com.atlarge.odcsim.SimulationEngineProvider
-import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.ProcessingNode
+import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.image.FlopsApplicationImage
import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment
import com.atlarge.opendc.compute.core.image.VmImage
import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver
import com.atlarge.opendc.compute.virt.driver.VirtDriver
+import java.util.ServiceLoader
+import java.util.UUID
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.launchIn
@@ -43,8 +45,6 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
-import java.util.ServiceLoader
-import java.util.UUID
/**
* Basic test-suite for the hypervisor.
diff --git a/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt b/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt
index 50261db5..f77a581e 100644
--- a/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt
+++ b/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt
@@ -26,14 +26,14 @@ package com.atlarge.opendc.core.failure
import com.atlarge.odcsim.Domain
import com.atlarge.odcsim.simulationContext
-import kotlinx.coroutines.Job
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.ensureActive
-import kotlinx.coroutines.launch
import kotlin.math.exp
import kotlin.math.max
import kotlin.random.Random
import kotlin.random.asJavaRandom
+import kotlinx.coroutines.Job
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.ensureActive
+import kotlinx.coroutines.launch
/**
* A [FaultInjector] that injects fault in the system which are correlated to each other. Failures do not occur in
diff --git a/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt b/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt
index 1b896858..0f62667f 100644
--- a/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt
+++ b/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt
@@ -25,11 +25,11 @@
package com.atlarge.opendc.core.failure
import com.atlarge.odcsim.simulationContext
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.launch
import kotlin.math.ln1p
import kotlin.math.pow
import kotlin.random.Random
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
/**
* A [FaultInjector] that injects uncorrelated faults into the system, meaning that failures of the subsystems are
diff --git a/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt b/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
index b0182ab3..7659b18e 100644
--- a/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
+++ b/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
@@ -38,6 +38,8 @@ import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelec
import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy
import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy
import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy
+import java.io.File
+import java.util.ServiceLoader
import kotlin.math.max
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
@@ -46,8 +48,6 @@ import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
-import java.io.File
-import java.util.ServiceLoader
/**
* Main entry point of the experiment.
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt
index 677af381..faa68e34 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt
@@ -48,9 +48,9 @@ import com.github.ajalt.clikt.parameters.options.required
import com.github.ajalt.clikt.parameters.types.choice
import com.github.ajalt.clikt.parameters.types.file
import com.github.ajalt.clikt.parameters.types.int
-import mu.KotlinLogging
import java.io.File
import java.io.InputStream
+import mu.KotlinLogging
/**
* The logger for this experiment.
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
index a70297d2..b09c0dbb 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
@@ -45,19 +45,20 @@ import com.atlarge.opendc.experiments.sc20.experiment.monitor.ExperimentMonitor
import com.atlarge.opendc.experiments.sc20.trace.Sc20StreamingParquetTraceReader
import com.atlarge.opendc.format.environment.EnvironmentReader
import com.atlarge.opendc.format.trace.TraceReader
+import java.io.File
+import kotlin.math.ln
+import kotlin.math.max
+import kotlin.random.Random
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
+import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import mu.KotlinLogging
-import java.io.File
-import kotlin.math.ln
-import kotlin.math.max
-import kotlin.random.Random
/**
* The logger for this experiment.
@@ -209,7 +210,6 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP
try {
var submitted = 0
- val finished = Channel<Unit>(Channel.CONFLATED)
while (reader.hasNext()) {
val (time, workload) = reader.next()
@@ -228,17 +228,20 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP
if (it is ServerEvent.StateChanged) {
monitor.reportVmStateChange(simulationContext.clock.millis(), it.server)
}
-
- delay(1)
- finished.send(Unit)
}
.collect()
}
}
- while (scheduler.finishedVms + scheduler.unscheduledVms != submitted) {
- finished.receive()
- }
+ scheduler.events
+ .takeWhile {
+ when (it) {
+ is VirtProvisioningEvent.MetricsAvailable ->
+ it.inactiveVmCount + it.failedVmCount != submitted
+ }
+ }
+ .collect()
+ delay(1)
} finally {
reader.close()
}
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt
index 5d1c29e2..1580e4dd 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt
@@ -38,13 +38,13 @@ import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionC
import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader
import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader
import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
+import java.io.File
+import java.util.ServiceLoader
+import kotlin.random.Random
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import mu.KotlinLogging
-import java.io.File
-import java.util.ServiceLoader
-import kotlin.random.Random
/**
* The logger for the experiment scenario.
@@ -106,7 +106,11 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int)
?.construct(seeder) ?: emptyMap()
val trace = Sc20ParquetTraceReader(rawReaders, performanceInterferenceModel, parent.workload, seed)
- val monitor = ParquetExperimentMonitor(this)
+ val monitor = ParquetExperimentMonitor(
+ parent.parent.parent.output,
+ "portfolio_id=${parent.parent.id}/scenario_id=${parent.id}/run_id=$id",
+ parent.parent.parent.bufferSize
+ )
root.launch {
val (bareMetalProvisioner, scheduler) = createProvisioner(
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt
index be60e5b7..b931fef9 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt
@@ -27,13 +27,12 @@ package com.atlarge.opendc.experiments.sc20.experiment.monitor
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.virt.driver.VirtDriver
import com.atlarge.opendc.compute.virt.service.VirtProvisioningEvent
-import com.atlarge.opendc.experiments.sc20.experiment.Run
import com.atlarge.opendc.experiments.sc20.telemetry.HostEvent
import com.atlarge.opendc.experiments.sc20.telemetry.ProvisionerEvent
import com.atlarge.opendc.experiments.sc20.telemetry.parquet.ParquetHostEventWriter
import com.atlarge.opendc.experiments.sc20.telemetry.parquet.ParquetProvisionerEventWriter
-import mu.KotlinLogging
import java.io.File
+import mu.KotlinLogging
/**
* The logger instance to use.
@@ -43,15 +42,14 @@ private val logger = KotlinLogging.logger {}
/**
* An [ExperimentMonitor] that logs the events to a Parquet file.
*/
-class ParquetExperimentMonitor(val run: Run) : ExperimentMonitor {
- private val partition = "portfolio_id=${run.parent.parent.id}/scenario_id=${run.parent.id}/run_id=${run.id}"
+class ParquetExperimentMonitor(base: File, partition: String, bufferSize: Int) : ExperimentMonitor {
private val hostWriter = ParquetHostEventWriter(
- File(run.parent.parent.parent.output, "host-metrics/$partition/data.parquet"),
- run.parent.parent.parent.bufferSize
+ File(base, "host-metrics/$partition/data.parquet"),
+ bufferSize
)
private val provisionerWriter = ParquetProvisionerEventWriter(
- File(run.parent.parent.parent.output, "provisioner-metrics/$partition/data.parquet"),
- run.parent.parent.parent.bufferSize
+ File(base, "provisioner-metrics/$partition/data.parquet"),
+ bufferSize
)
private val currentHostEvent = mutableMapOf<Server, HostEvent>()
private var startTime = -1L
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt
index 31632b8c..a7c8ba4d 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt
@@ -25,12 +25,12 @@
package com.atlarge.opendc.experiments.sc20.runner.execution
import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor
+import java.util.concurrent.Executors
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.launch
import kotlinx.coroutines.supervisorScope
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.withContext
-import java.util.concurrent.Executors
/**
* An [ExperimentScheduler] that runs experiments using a local thread pool.
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt
index 3b80276f..28a19172 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt
@@ -30,8 +30,8 @@ import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionC
import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionListener
import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionResult
import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentScheduler
-import kotlinx.coroutines.runBlocking
import java.util.concurrent.ConcurrentHashMap
+import kotlinx.coroutines.runBlocking
/**
* The default implementation of the [ExperimentRunner] interface.
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt
index a69bd4b2..e42ac654 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt
@@ -25,17 +25,17 @@
package com.atlarge.opendc.experiments.sc20.telemetry.parquet
import com.atlarge.opendc.experiments.sc20.telemetry.Event
+import java.io.Closeable
+import java.io.File
+import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.BlockingQueue
+import kotlin.concurrent.thread
import mu.KotlinLogging
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import java.io.Closeable
-import java.io.File
-import java.util.concurrent.ArrayBlockingQueue
-import java.util.concurrent.BlockingQueue
-import kotlin.concurrent.thread
/**
* The logging instance to use.
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt
index 3bc09435..9fa4e0fb 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt
@@ -25,10 +25,10 @@
package com.atlarge.opendc.experiments.sc20.telemetry.parquet
import com.atlarge.opendc.experiments.sc20.telemetry.HostEvent
+import java.io.File
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
-import java.io.File
/**
* A Parquet event writer for [HostEvent]s.
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt
index 1f3b0472..3d28860c 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt
@@ -25,10 +25,10 @@
package com.atlarge.opendc.experiments.sc20.telemetry.parquet
import com.atlarge.opendc.experiments.sc20.telemetry.ProvisionerEvent
+import java.io.File
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
-import java.io.File
/**
* A Parquet event writer for [ProvisionerEvent]s.
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt
index 1549b8d2..c1724369 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt
@@ -25,10 +25,10 @@
package com.atlarge.opendc.experiments.sc20.telemetry.parquet
import com.atlarge.opendc.experiments.sc20.telemetry.RunEvent
+import java.io.File
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
-import java.io.File
/**
* A Parquet event writer for [RunEvent]s.
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt
index 652f7746..f9709b9f 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt
@@ -30,12 +30,12 @@ import com.atlarge.opendc.compute.core.workload.VmWorkload
import com.atlarge.opendc.core.User
import com.atlarge.opendc.format.trace.TraceEntry
import com.atlarge.opendc.format.trace.TraceReader
+import java.io.File
+import java.util.UUID
import mu.KotlinLogging
import org.apache.avro.generic.GenericData
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetReader
-import java.io.File
-import java.util.UUID
private val logger = KotlinLogging.logger {}
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt
index f6d6e6fd..8b7b222f 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt
@@ -32,6 +32,14 @@ import com.atlarge.opendc.compute.core.workload.VmWorkload
import com.atlarge.opendc.core.User
import com.atlarge.opendc.format.trace.TraceEntry
import com.atlarge.opendc.format.trace.TraceReader
+import java.io.File
+import java.io.Serializable
+import java.util.SortedSet
+import java.util.TreeSet
+import java.util.UUID
+import java.util.concurrent.ArrayBlockingQueue
+import kotlin.concurrent.thread
+import kotlin.random.Random
import mu.KotlinLogging
import org.apache.avro.generic.GenericData
import org.apache.hadoop.fs.Path
@@ -41,14 +49,6 @@ import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.filter2.predicate.Statistics
import org.apache.parquet.filter2.predicate.UserDefinedPredicate
import org.apache.parquet.io.api.Binary
-import java.io.File
-import java.io.Serializable
-import java.util.SortedSet
-import java.util.TreeSet
-import java.util.UUID
-import java.util.concurrent.ArrayBlockingQueue
-import kotlin.concurrent.thread
-import kotlin.random.Random
private val logger = KotlinLogging.logger {}
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
index 0877ad52..d6726910 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
@@ -25,6 +25,12 @@
package com.atlarge.opendc.experiments.sc20.trace
import com.atlarge.opendc.format.trace.sc20.Sc20VmPlacementReader
+import java.io.BufferedReader
+import java.io.File
+import java.io.FileReader
+import java.util.Random
+import kotlin.math.max
+import kotlin.math.min
import me.tongfei.progressbar.ProgressBar
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
@@ -33,12 +39,6 @@ import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import java.io.BufferedReader
-import java.io.File
-import java.io.FileReader
-import java.util.Random
-import kotlin.math.max
-import kotlin.math.min
/**
* A script to convert a trace in text format into a Parquet trace.
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt
index dd70d4f1..f2a0e627 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt
@@ -28,8 +28,8 @@ import com.atlarge.opendc.compute.core.workload.VmWorkload
import com.atlarge.opendc.experiments.sc20.experiment.model.CompositeWorkload
import com.atlarge.opendc.experiments.sc20.experiment.model.Workload
import com.atlarge.opendc.format.trace.TraceEntry
-import mu.KotlinLogging
import kotlin.random.Random
+import mu.KotlinLogging
private val logger = KotlinLogging.logger {}
diff --git a/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
index 5ecf7605..a79e9a5a 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
@@ -42,6 +42,8 @@ import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader
import com.atlarge.opendc.format.environment.EnvironmentReader
import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
import com.atlarge.opendc.format.trace.TraceReader
+import java.io.File
+import java.util.ServiceLoader
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
@@ -52,8 +54,6 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
-import java.io.File
-import java.util.ServiceLoader
/**
* An integration test suite for the SC20 experiments.
diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
index 5f220ad0..a9aa3337 100644
--- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
+++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
@@ -25,9 +25,9 @@
package com.atlarge.opendc.format.environment.sc18
import com.atlarge.odcsim.Domain
-import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.MemoryUnit
import com.atlarge.opendc.compute.core.ProcessingNode
+import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver
import com.atlarge.opendc.compute.metal.service.ProvisioningService
import com.atlarge.opendc.compute.metal.service.SimpleProvisioningService
diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
index 2a8fefeb..1cabc8bc 100644
--- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
+++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
@@ -26,10 +26,10 @@ package com.atlarge.opendc.format.trace.bitbrains
import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment
import com.atlarge.opendc.compute.core.image.VmImage
-import com.atlarge.opendc.compute.core.workload.VmWorkload
-import com.atlarge.opendc.core.User
import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
+import com.atlarge.opendc.compute.core.workload.VmWorkload
+import com.atlarge.opendc.core.User
import com.atlarge.opendc.format.trace.TraceEntry
import com.atlarge.opendc.format.trace.TraceReader
import java.io.BufferedReader
diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt
index 076274d5..8e34505a 100644
--- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt
+++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt
@@ -26,10 +26,10 @@ package com.atlarge.opendc.format.trace.sc20
import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment
import com.atlarge.opendc.compute.core.image.VmImage
-import com.atlarge.opendc.compute.core.workload.VmWorkload
-import com.atlarge.opendc.core.User
import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
+import com.atlarge.opendc.compute.core.workload.VmWorkload
+import com.atlarge.opendc.core.User
import com.atlarge.opendc.format.trace.TraceEntry
import com.atlarge.opendc.format.trace.TraceReader
import java.io.BufferedReader
diff --git a/simulator/opendc/opendc-format/src/test/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReaderTest.kt b/simulator/opendc/opendc-format/src/test/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReaderTest.kt
index 41ad8aba..94e4b0fc 100644
--- a/simulator/opendc/opendc-format/src/test/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReaderTest.kt
+++ b/simulator/opendc/opendc-format/src/test/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReaderTest.kt
@@ -1,8 +1,8 @@
package com.atlarge.opendc.format.trace.swf
+import java.io.File
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
-import java.io.File
class SwfTraceReaderTest {
@Test
diff --git a/simulator/opendc/opendc-runner-web/build.gradle.kts b/simulator/opendc/opendc-runner-web/build.gradle.kts
index 50789789..52a59694 100644
--- a/simulator/opendc/opendc-runner-web/build.gradle.kts
+++ b/simulator/opendc/opendc-runner-web/build.gradle.kts
@@ -36,10 +36,13 @@ application {
dependencies {
api(project(":opendc:opendc-core"))
+ implementation(project(":opendc:opendc-compute"))
+ implementation(project(":opendc:opendc-format"))
+ implementation(project(":opendc:opendc-experiments-sc20"))
implementation("com.github.ajalt:clikt:2.8.0")
implementation("io.github.microutils:kotlin-logging:1.7.10")
- implementation("org.mongodb:mongo-java-driver:3.12.6")
+ implementation("org.mongodb:mongodb-driver-sync:4.0.5")
runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.13.1")
runtimeOnly(project(":odcsim:odcsim-engine-omega"))
diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt
index 3cee259c..d70ad6bd 100644
--- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt
+++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt
@@ -1,16 +1,299 @@
package com.atlarge.opendc.runner.web
+import com.atlarge.odcsim.SimulationEngineProvider
+import com.atlarge.opendc.compute.virt.service.allocation.*
+import com.atlarge.opendc.experiments.sc20.experiment.attachMonitor
+import com.atlarge.opendc.experiments.sc20.experiment.createFailureDomain
+import com.atlarge.opendc.experiments.sc20.experiment.createProvisioner
+import com.atlarge.opendc.experiments.sc20.experiment.model.Workload
+import com.atlarge.opendc.experiments.sc20.experiment.monitor.ParquetExperimentMonitor
+import com.atlarge.opendc.experiments.sc20.experiment.processTrace
+import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader
+import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader
import com.github.ajalt.clikt.core.CliktCommand
+import com.github.ajalt.clikt.parameters.options.*
+import com.github.ajalt.clikt.parameters.types.file
+import com.github.ajalt.clikt.parameters.types.int
+import com.mongodb.MongoClientSettings
+import com.mongodb.MongoCredential
+import com.mongodb.ServerAddress
+import com.mongodb.client.MongoClients
+import com.mongodb.client.MongoCollection
+import com.mongodb.client.MongoDatabase
+import com.mongodb.client.model.Filters
+import java.io.File
+import java.util.*
+import kotlin.random.Random
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.Channel
import mu.KotlinLogging
+import org.bson.Document
private val logger = KotlinLogging.logger {}
/**
+ * The provider for the simulation engine to use.
+ */
+private val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
+
+/**
* Represents the CLI command for starting the OpenDC web runner.
*/
class RunnerCli : CliktCommand(name = "runner") {
- override fun run() {
+ /**
+ * The name of the database to use.
+ */
+ private val mongoDb by option(
+ "--mongo-db",
+ help = "name of the database to use",
+ envvar = "OPENDC_DB"
+ )
+ .default("opendc")
+
+ /**
+ * The database host to connect to.
+ */
+ private val mongoHost by option(
+ "--mongo-host",
+ help = "database host to connect to",
+ envvar = "OPENDC_DB_HOST"
+ )
+ .default("localhost")
+
+ /**
+ * The database port to connect to.
+ */
+ private val mongoPort by option(
+ "--mongo-port",
+ help = "database port to connect to",
+ envvar = "OPENDC_DB_PORT"
+ )
+ .int()
+ .default(27017)
+
+ /**
+ * The database user to connect with.
+ */
+ private val mongoUser by option(
+ "--mongo-user",
+ help = "database user to connect with",
+ envvar = "OPENDC_DB_USER"
+ )
+ .default("opendc")
+
+ /**
+ * The database password to connect with.
+ */
+ private val mongoPassword by option(
+ "--mongo-password",
+ help = "database password to connect with",
+ envvar = "OPENDC_DB_PASSWORD"
+ )
+ .convert { it.toCharArray() }
+ .required()
+
+ /**
+ * The path to the traces directory.
+ */
+ private val tracePath by option(
+ "--traces",
+ help = "path to the directory containing the traces",
+ envvar = "OPENDC_TRACES"
+ )
+ .file(canBeFile = false)
+ .defaultLazy { File("traces/") }
+
+ /**
+ * The path to the output directory.
+ */
+ private val outputPath by option(
+ "--output",
+ help = "path to the results directory"
+ )
+ .file(canBeFile = false)
+ .defaultLazy { File("results/") }
+
+ /**
+ * Connect to the user-specified database.
+ */
+ private fun createDatabase(): MongoDatabase {
+ val credential = MongoCredential.createScramSha1Credential(
+ mongoUser,
+ mongoDb,
+ mongoPassword
+ )
+
+ val settings = MongoClientSettings.builder()
+ .credential(credential)
+ .applyToClusterSettings { it.hosts(listOf(ServerAddress(mongoHost, mongoPort))) }
+ .build()
+ val client = MongoClients.create(settings)
+ return client.getDatabase(mongoDb)
+ }
+
+ /**
+ * Run a single scenario.
+ */
+ private suspend fun runScenario(portfolio: Document, scenario: Document, topologies: MongoCollection<Document>) {
+ val id = scenario.getString("_id")
+ val traceReader = Sc20RawParquetTraceReader(
+ File(
+ tracePath,
+ scenario.getEmbedded(listOf("trace", "traceId"), String::class.java)
+ )
+ )
+
+ val targets = portfolio.get("targets", Document::class.java)
+
+ repeat(targets.getInteger("repeatsPerScenario")) {
+ logger.info { "Starting repeat $it" }
+ runRepeat(scenario, it, topologies, traceReader)
+ }
+
+ logger.info { "Finished scenario $id" }
+ }
+
+ /**
+ * Run a single repeat.
+ */
+ private suspend fun runRepeat(
+ scenario: Document,
+ repeat: Int,
+ topologies: MongoCollection<Document>,
+ traceReader: Sc20RawParquetTraceReader
+ ) {
+ val id = scenario.getString("_id")
+ val seed = repeat
+ val traceDocument = scenario.get("trace", Document::class.java)
+ val workloadName = traceDocument.getString("traceId")
+ val workloadFraction = traceDocument.get("loadSamplingFraction", Number::class.java).toDouble()
+
+ val seeder = Random(seed)
+ val system = provider("experiment-$id")
+ val root = system.newDomain("root")
+
+ val chan = Channel<Unit>(Channel.CONFLATED)
+
+ val operational = scenario.get("operational", Document::class.java)
+ val allocationPolicy =
+ when (val policyName = operational.getString("schedulerName")) {
+ "mem" -> AvailableMemoryAllocationPolicy()
+ "mem-inv" -> AvailableMemoryAllocationPolicy(true)
+ "core-mem" -> AvailableCoreMemoryAllocationPolicy()
+ "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true)
+ "active-servers" -> NumberOfActiveServersAllocationPolicy()
+ "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true)
+ "provisioned-cores" -> ProvisionedCoresAllocationPolicy()
+ "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true)
+ "random" -> RandomAllocationPolicy(Random(seeder.nextInt()))
+ else -> throw IllegalArgumentException("Unknown policy $policyName")
+ }
+
+ val trace = Sc20ParquetTraceReader(
+ listOf(traceReader),
+ emptyMap(),
+ Workload(workloadName, workloadFraction),
+ seed
+ )
+ val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), String::class.java)
+ val environment = TopologyParser(topologies, topologyId)
+ val monitor = ParquetExperimentMonitor(
+ outputPath,
+ "scenario_id=$id/run_id=$repeat",
+ 4096
+ )
+
+ root.launch {
+ val (bareMetalProvisioner, scheduler) = createProvisioner(
+ root,
+ environment,
+ allocationPolicy
+ )
+
+ val failureDomain = if (operational.getBoolean("failuresEnabled")) {
+ logger.debug("ENABLING failures")
+ createFailureDomain(
+ seeder.nextInt(),
+ operational.getDouble("failureFrequency"),
+ bareMetalProvisioner,
+ chan
+ )
+ } else {
+ null
+ }
+
+ attachMonitor(scheduler, monitor)
+ processTrace(
+ trace,
+ scheduler,
+ chan,
+ monitor,
+ emptyMap()
+ )
+
+ logger.debug("SUBMIT=${scheduler.submittedVms}")
+ logger.debug("FAIL=${scheduler.unscheduledVms}")
+ logger.debug("QUEUED=${scheduler.queuedVms}")
+ logger.debug("RUNNING=${scheduler.runningVms}")
+ logger.debug("FINISHED=${scheduler.finishedVms}")
+
+ failureDomain?.cancel()
+ scheduler.terminate()
+ }
+
+ try {
+ system.run()
+ } finally {
+ system.terminate()
+ monitor.close()
+ }
+ }
+
+ override fun run() = runBlocking(Dispatchers.Default) {
logger.info { "Starting OpenDC web runner" }
+
+ logger.info { "Connecting to MongoDB instance" }
+ val database = createDatabase()
+ val manager = ScenarioManager(database.getCollection("scenarios"))
+ val portfolios = database.getCollection("portfolios")
+ val topologies = database.getCollection("topologies")
+
+ logger.info { "Watching for queued scenarios" }
+
+ while (true) {
+ val scenario = manager.findNext()
+
+ if (scenario == null) {
+ delay(5000)
+ continue
+ }
+
+ val id = scenario.getString("_id")
+
+ logger.info { "Found queued scenario $id: attempting to claim" }
+
+ if (!manager.claim(id)) {
+ logger.info { "Failed to claim scenario" }
+ continue
+ }
+
+ coroutineScope {
+ // Launch heartbeat process
+ launch {
+ delay(60000)
+ manager.heartbeat(id)
+ }
+
+ try {
+ val portfolio = portfolios.find(Filters.eq("_id", scenario.getString("portfolioId"))).first()!!
+ runScenario(portfolio, scenario, topologies)
+ manager.finish(id)
+ } catch (e: Exception) {
+ logger.warn(e) { "Scenario failed to finish" }
+ manager.fail(id)
+ }
+ }
+ }
}
}
diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt
new file mode 100644
index 00000000..0f375385
--- /dev/null
+++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt
@@ -0,0 +1,77 @@
+package com.atlarge.opendc.runner.web
+
+import com.mongodb.client.MongoCollection
+import com.mongodb.client.model.Filters
+import com.mongodb.client.model.Updates
+import java.time.Instant
+import org.bson.Document
+
+/**
+ * Manages the queue of scenarios that need to be processed.
+ */
+class ScenarioManager(private val collection: MongoCollection<Document>) {
+ /**
+ * Find the next scenario that the simulator needs to process.
+ */
+ fun findNext(): Document? {
+ return collection
+ .find(Filters.eq("simulation.state", "QUEUED"))
+ .first()
+ }
+
+ /**
+ * Claim the scenario in the database with the specified id.
+ */
+ fun claim(id: String): Boolean {
+ val res = collection.findOneAndUpdate(
+ Filters.and(
+ Filters.eq("_id", id),
+ Filters.eq("simulation.state", "QUEUED")
+ ),
+ Updates.combine(
+ Updates.set("simulation.state", "RUNNING"),
+ Updates.set("simulation.time", Instant.now())
+ )
+ )
+ return res != null
+ }
+
+ /**
+ * Update the heartbeat of the specified scenario.
+ */
+ fun heartbeat(id: String) {
+ collection.findOneAndUpdate(
+ Filters.and(
+ Filters.eq("_id", id),
+ Filters.eq("simulation.state", "RUNNING")
+ ),
+ Updates.set("simulation.time", Instant.now())
+ )
+ }
+
+ /**
+ * Mark the scenario as failed.
+ */
+ fun fail(id: String) {
+ collection.findOneAndUpdate(
+ Filters.and(
+ Filters.eq("_id", id),
+ Filters.eq("simulation.state", "FAILED")
+ ),
+ Updates.set("simulation.time", Instant.now())
+ )
+ }
+
+ /**
+ * Mark the scenario as finished.
+ */
+ fun finish(id: String) {
+ collection.findOneAndUpdate(
+ Filters.and(
+ Filters.eq("_id", id),
+ Filters.eq("simulation.state", "FINISHED")
+ ),
+ Updates.set("simulation.time", Instant.now())
+ )
+ }
+}
diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt
new file mode 100644
index 00000000..499585ec
--- /dev/null
+++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt
@@ -0,0 +1,127 @@
+package com.atlarge.opendc.runner.web
+
+import com.atlarge.odcsim.Domain
+import com.atlarge.opendc.compute.core.MemoryUnit
+import com.atlarge.opendc.compute.core.ProcessingNode
+import com.atlarge.opendc.compute.core.ProcessingUnit
+import com.atlarge.opendc.compute.metal.NODE_CLUSTER
+import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver
+import com.atlarge.opendc.compute.metal.power.LinearLoadPowerModel
+import com.atlarge.opendc.compute.metal.service.ProvisioningService
+import com.atlarge.opendc.compute.metal.service.SimpleProvisioningService
+import com.atlarge.opendc.core.Environment
+import com.atlarge.opendc.core.Platform
+import com.atlarge.opendc.core.Zone
+import com.atlarge.opendc.core.services.ServiceRegistry
+import com.atlarge.opendc.format.environment.EnvironmentReader
+import com.mongodb.client.AggregateIterable
+import com.mongodb.client.MongoCollection
+import com.mongodb.client.model.Aggregates
+import com.mongodb.client.model.Field
+import com.mongodb.client.model.Filters
+import com.mongodb.client.model.Projections
+import java.util.*
+import kotlinx.coroutines.launch
+import org.bson.Document
+
+/**
+ * A helper class that converts the MongoDB topology into an OpenDC environment.
+ */
+class TopologyParser(private val collection: MongoCollection<Document>, private val id: String) : EnvironmentReader {
+ /**
+ * Parse the topology with the specified [id].
+ */
+ override suspend fun construct(dom: Domain): Environment {
+ val nodes = mutableListOf<SimpleBareMetalDriver>()
+ val random = Random(0)
+
+ for (machine in fetchMachines(id)) {
+ val machineId = machine.getString("_id")
+ val clusterId = machine.getString("rack_id")
+ val position = machine.getInteger("position")
+
+ val processors = machine.getList("cpus", Document::class.java).flatMap { cpu ->
+ val cores = cpu.getInteger("numberOfCores")
+ val speed = cpu.get("clockRateMhz", Number::class.java).toDouble()
+ // TODO Remove hardcoding of vendor
+ val node = ProcessingNode("Intel", "amd64", cpu.getString("name"), cores)
+ List(cores) { coreId ->
+ ProcessingUnit(node, coreId, speed)
+ }
+ }
+ val memoryUnits = machine.getList("memories", Document::class.java).map { memory ->
+ MemoryUnit(
+ "Samsung",
+ memory.getString("name"),
+ memory.get("speedMbPerS", Number::class.java).toDouble(),
+ memory.get("sizeMb", Number::class.java).toLong()
+ )
+ }
+ nodes.add(
+ SimpleBareMetalDriver(
+ dom.newDomain(machineId),
+ UUID(random.nextLong(), random.nextLong()),
+ "node-$clusterId-$position",
+ mapOf(NODE_CLUSTER to clusterId),
+ processors,
+ memoryUnits,
+ // For now we assume a simple linear load model with an idle draw of ~200W and a maximum
+ // power draw of 350W.
+ // Source: https://stackoverflow.com/questions/6128960
+ LinearLoadPowerModel(200.0, 350.0)
+ )
+ )
+ }
+
+ val provisioningService = SimpleProvisioningService(dom.newDomain("provisioner"))
+ dom.launch {
+ for (node in nodes) {
+ provisioningService.create(node)
+ }
+ }
+
+ val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService)
+
+ val platform = Platform(
+ UUID.randomUUID(), "opendc-platform", listOf(
+ Zone(UUID.randomUUID(), "zone", serviceRegistry)
+ )
+ )
+
+ return Environment(fetchName(id), null, listOf(platform))
+ }
+
+ override fun close() {}
+
+ /**
+ * Fetch the metadata of the topology.
+ */
+ private fun fetchName(id: String): String {
+ return collection.aggregate(
+ listOf(
+ Aggregates.match(Filters.eq("_id", id)),
+ Aggregates.project(Projections.include("name"))
+ )
+ )
+ .first()!!
+ .getString("name")
+ }
+
+ /**
+ * Fetch a topology from the database with the specified [id].
+ */
+ private fun fetchMachines(id: String): AggregateIterable<Document> {
+ return collection.aggregate(
+ listOf(
+ Aggregates.match(Filters.eq("_id", id)),
+ Aggregates.project(Projections.fields(Document("racks", "\$rooms.tiles.rack"))),
+ Aggregates.unwind("\$racks"),
+ Aggregates.unwind("\$racks"),
+ Aggregates.replaceRoot("\$racks"),
+ Aggregates.addFields(Field("machines.rack_id", "\$_id")),
+ Aggregates.unwind("\$machines"),
+ Aggregates.replaceRoot("\$machines")
+ )
+ )
+ }
+}
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
index 7c7990e2..1193f7b2 100644
--- a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
@@ -39,13 +39,13 @@ import com.atlarge.opendc.workflows.service.stage.resource.ResourceSelectionPoli
import com.atlarge.opendc.workflows.service.stage.task.TaskEligibilityPolicy
import com.atlarge.opendc.workflows.service.stage.task.TaskOrderPolicy
import com.atlarge.opendc.workflows.workload.Job
+import java.util.PriorityQueue
+import java.util.Queue
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
-import java.util.PriorityQueue
-import java.util.Queue
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt
index ad818dde..a60ba0e2 100644
--- a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt
@@ -26,8 +26,8 @@ package com.atlarge.opendc.workflows.service
import com.atlarge.opendc.core.services.AbstractServiceKey
import com.atlarge.opendc.workflows.workload.Job
-import kotlinx.coroutines.flow.Flow
import java.util.UUID
+import kotlinx.coroutines.flow.Flow
/**
* A service for cloud workflow management.
diff --git a/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
index 5ee6d5e6..5c129e37 100644
--- a/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
+++ b/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
@@ -35,6 +35,8 @@ import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelec
import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy
import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy
import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy
+import java.util.ServiceLoader
+import kotlin.math.max
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
@@ -44,8 +46,6 @@ import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
-import java.util.ServiceLoader
-import kotlin.math.max
/**
* Integration test suite for the [StageWorkflowService].