summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--simulator/opendc-compute/opendc-compute-api/build.gradle.kts33
-rw-r--r--simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt46
-rw-r--r--simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeWorkload.kt (renamed from simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/workload/VmWorkload.kt)9
-rw-r--r--simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Flavor.kt (renamed from simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/Flavor.kt)6
-rw-r--r--simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Image.kt (renamed from simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/image/Image.kt)19
-rw-r--r--simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt (renamed from simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/Server.kt)52
-rw-r--r--simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerState.kt (renamed from simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/ServerState.kt)6
-rw-r--r--simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerWatcher.kt39
-rw-r--r--simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/Hypervisor.kt56
-rw-r--r--simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/InsufficientMemoryOnServerException.kt3
-rw-r--r--simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/VirtDriver.kt63
-rw-r--r--simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/VirtProvisioningService.kt66
-rw-r--r--simulator/opendc-compute/opendc-compute-service/build.gradle.kts40
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt92
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeServiceEvent.kt (renamed from simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/VirtProvisioningEvent.kt)16
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt104
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostEvent.kt (renamed from simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HypervisorEvent.kt)22
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostListener.kt (renamed from simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/ServerEvent.kt)22
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt31
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostState.kt38
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/HypervisorAvailableEvent.kt (renamed from simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/HypervisorAvailableEvent.kt)2
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/HypervisorUnavailableEvent.kt (renamed from simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/HypervisorUnavailableEvent.kt)2
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmScheduledEvent.kt (renamed from simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmScheduledEvent.kt)2
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmStoppedEvent.kt (renamed from simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmStoppedEvent.kt)2
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmSubmissionEvent.kt (renamed from simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmSubmissionEvent.kt)6
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmSubmissionInvalidEvent.kt (renamed from simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmSubmissionInvalidEvent.kt)2
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt86
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt414
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt (renamed from simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/image/EmptyImage.kt)18
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/AllocationPolicy.kt (renamed from simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AllocationPolicy.kt)13
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/AvailableCoreMemoryAllocationPolicy.kt (renamed from simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AvailableCoreMemoryAllocationPolicy.kt)8
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/AvailableMemoryAllocationPolicy.kt (renamed from simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AvailableMemoryAllocationPolicy.kt)6
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ComparableAllocationPolicyLogic.kt (renamed from simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt)20
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/NumberOfActiveServersAllocationPolicy.kt (renamed from simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/NumberOfActiveServersAllocationPolicy.kt)6
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ProvisionedCoresAllocationPolicy.kt (renamed from simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ProvisionedCoresAllocationPolicy.kt)8
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/RandomAllocationPolicy.kt (renamed from simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/RandomAllocationPolicy.kt)16
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ReplayAllocationPolicy.kt (renamed from simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ReplayAllocationPolicy.kt)19
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts3
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ComputeSimExecutionContext.kt36
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/HypervisorView.kt37
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimBareMetalDriver.kt107
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt303
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHostProvisioner.kt69
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt253
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt418
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimWorkloadImage.kt43
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/power/api/CpuPowerModel.kt2
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/power/models/ZeroIdlePowerDecorator.kt6
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimBareMetalDriverTest.kt25
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt (renamed from simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimVirtDriverTest.kt)82
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimProvisioningServiceTest.kt7
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/power/CpuPowerModelTest.kt2
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt91
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt11
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt20
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt56
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt4
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt2
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt4
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt13
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt21
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt25
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt35
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt35
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt22
-rw-r--r--simulator/opendc-format/build.gradle.kts2
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt4
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt6
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt4
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt27
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt6
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt27
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt27
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt11
-rw-r--r--simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt7
-rw-r--r--simulator/opendc-metal/build.gradle.kts (renamed from simulator/opendc-compute/opendc-compute-core/build.gradle.kts)3
-rw-r--r--simulator/opendc-metal/src/main/kotlin/org/opendc/metal/Metadata.kt (renamed from simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/Metadata.kt)2
-rw-r--r--simulator/opendc-metal/src/main/kotlin/org/opendc/metal/Node.kt (renamed from simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/Node.kt)16
-rw-r--r--simulator/opendc-metal/src/main/kotlin/org/opendc/metal/NodeEvent.kt (renamed from simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/NodeEvent.kt)6
-rw-r--r--simulator/opendc-metal/src/main/kotlin/org/opendc/metal/NodeState.kt (renamed from simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/NodeState.kt)2
-rw-r--r--simulator/opendc-metal/src/main/kotlin/org/opendc/metal/driver/BareMetalDriver.kt (renamed from simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/driver/BareMetalDriver.kt)10
-rw-r--r--simulator/opendc-metal/src/main/kotlin/org/opendc/metal/service/ProvisioningService.kt (renamed from simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/service/ProvisioningService.kt)10
-rw-r--r--simulator/opendc-metal/src/main/kotlin/org/opendc/metal/service/SimpleProvisioningService.kt (renamed from simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/service/SimpleProvisioningService.kt)10
-rw-r--r--simulator/opendc-platform/build.gradle.kts1
-rw-r--r--simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt10
-rw-r--r--simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt6
-rw-r--r--simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt70
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt38
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt5
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt49
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt2
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt40
-rw-r--r--simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt28
-rw-r--r--simulator/opendc-workflows/build.gradle.kts7
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt30
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt2
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt12
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt45
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt43
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt2
-rw-r--r--simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt15
-rw-r--r--simulator/opendc-workflows/src/test/resources/log4j2.xml35
-rw-r--r--simulator/settings.gradle.kts4
103 files changed, 2019 insertions, 1730 deletions
diff --git a/simulator/opendc-compute/opendc-compute-api/build.gradle.kts b/simulator/opendc-compute/opendc-compute-api/build.gradle.kts
new file mode 100644
index 00000000..10046322
--- /dev/null
+++ b/simulator/opendc-compute/opendc-compute-api/build.gradle.kts
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+description = "API interface for the OpenDC Compute service"
+
+/* Build configuration */
+plugins {
+ `kotlin-library-conventions`
+}
+
+dependencies {
+ api(platform(project(":opendc-platform")))
+ api(project(":opendc-core"))
+}
diff --git a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt
new file mode 100644
index 00000000..025513e6
--- /dev/null
+++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.api
+
+/**
+ * A client interface for the OpenDC Compute service.
+ */
+public interface ComputeClient : AutoCloseable {
+ /**
+ * Create a new [Server] instance at this compute service.
+ *
+ * @param name The name of the server to deploy.
+ * @param image The image to be deployed.
+ * @param flavor The flavor of the machine instance to run this [image] on.
+ */
+ public suspend fun newServer(
+ name: String,
+ image: Image,
+ flavor: Flavor
+ ): Server
+
+ /**
+ * Release the resources associated with this client, preventing any further API calls.
+ */
+ public override fun close()
+}
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/workload/VmWorkload.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeWorkload.kt
index 6c724277..64a47277 100644
--- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/workload/VmWorkload.kt
+++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeWorkload.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,9 +20,8 @@
* SOFTWARE.
*/
-package org.opendc.compute.core.workload
+package org.opendc.compute.api
-import org.opendc.compute.core.image.Image
import org.opendc.core.User
import org.opendc.core.workload.Workload
import java.util.UUID
@@ -35,13 +34,13 @@ import java.util.UUID
* @property owner The owner of the VM.
* @property image The image of the VM.
*/
-public data class VmWorkload(
+public data class ComputeWorkload(
override val uid: UUID,
override val name: String,
override val owner: User,
val image: Image
) : Workload {
- override fun equals(other: Any?): Boolean = other is VmWorkload && uid == other.uid
+ override fun equals(other: Any?): Boolean = other is ComputeWorkload && uid == other.uid
override fun hashCode(): Int = uid.hashCode()
}
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/Flavor.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Flavor.kt
index e5ca115f..bf5f0ce4 100644
--- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/Flavor.kt
+++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Flavor.kt
@@ -1,7 +1,5 @@
/*
- * MIT License
- *
- * Copyright (c) 2020 atlarge-research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -22,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.compute.core
+package org.opendc.compute.api
/**
* Flavors define the compute and memory capacity of [Server] instance. To put it simply, a flavor is an available
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/image/Image.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Image.kt
index e481fcc3..280c4d89 100644
--- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/image/Image.kt
+++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Image.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,9 +20,11 @@
* SOFTWARE.
*/
-package org.opendc.compute.core.image
+package org.opendc.compute.api
import org.opendc.core.resource.Resource
+import org.opendc.core.resource.TagContainer
+import java.util.*
/**
* An image containing a bootable operating system that can directly be executed by physical or virtual server.
@@ -32,4 +34,15 @@ import org.opendc.core.resource.Resource
* useful for backup purposes or for producing “gold” server images if you plan to deploy a particular server
* configuration frequently.
*/
-public interface Image : Resource
+public data class Image(
+ public override val uid: UUID,
+ public override val name: String,
+ public override val tags: TagContainer
+) : Resource {
+ public companion object {
+ /**
+ * An empty boot disk [Image] that exits immediately on start.
+ */
+ public val EMPTY: Image = Image(UUID.randomUUID(), "empty", emptyMap())
+ }
+}
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/Server.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt
index 948f622f..ab1eb860 100644
--- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/Server.kt
+++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,59 +20,55 @@
* SOFTWARE.
*/
-package org.opendc.compute.core
+package org.opendc.compute.api
-import kotlinx.coroutines.flow.Flow
-import org.opendc.compute.core.image.Image
import org.opendc.core.resource.Resource
-import org.opendc.core.resource.TagContainer
-import org.opendc.core.services.ServiceRegistry
-import java.util.UUID
/**
- * A server instance that is running on some physical or virtual machine.
+ * A stateful object representing a server instance that is running on some physical or virtual machine.
*/
-public data class Server(
+public interface Server : Resource {
/**
- * The unique identifier of the server.
+ * The name of the server.
*/
- public override val uid: UUID,
+ public override val name: String
/**
- * The optional name of the server.
+ * The flavor of the server.
*/
- public override val name: String,
+ public val flavor: Flavor
/**
- * The tags of this server.
+ * The image of the server.
*/
- public override val tags: TagContainer,
+ public val image: Image
/**
- * The hardware configuration of the server.
+ * The tags assigned to the server.
*/
- public val flavor: Flavor,
+ public override val tags: Map<String, String>
/**
- * The image running on the server.
+ * The last known state of the server.
*/
- public val image: Image,
+ public val state: ServerState
/**
- * The last known state of the server.
+ * Register the specified [ServerWatcher] to watch the state of the server.
+ *
+ * @param watcher The watcher to register for the server.
*/
- public val state: ServerState,
+ public fun watch(watcher: ServerWatcher)
/**
- * The services published by this server.
+ * De-register the specified [ServerWatcher] from the server to stop it from receiving events.
+ *
+ * @param watcher The watcher to de-register from the server.
*/
- public val services: ServiceRegistry,
+ public fun unwatch(watcher: ServerWatcher)
/**
- * The events that are emitted by the server.
+ * Refresh the local state of the resource.
*/
- public val events: Flow<ServerEvent>
-) : Resource {
- override fun hashCode(): Int = uid.hashCode()
- override fun equals(other: Any?): Boolean = other is Server && uid == other.uid
+ public suspend fun refresh()
}
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/ServerState.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerState.kt
index 4b9d7c13..25d2e519 100644
--- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/ServerState.kt
+++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerState.kt
@@ -1,7 +1,5 @@
/*
- * MIT License
- *
- * Copyright (c) 2020 atlarge-research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -22,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.compute.core
+package org.opendc.compute.api
/**
* An enumeration describing the possible states of a server.
diff --git a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerWatcher.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerWatcher.kt
new file mode 100644
index 00000000..48a17b30
--- /dev/null
+++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerWatcher.kt
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.api
+
+/**
+ * An interface used to watch the state of [Server] instances.
+ */
+public interface ServerWatcher {
+ /**
+ * This method is invoked when the state of a [Server] changes.
+ *
+ * Note that the state of [server] might not reflect the state as reported by the invocation, as a call to
+ * [Server.refresh] is required to update its state.
+ *
+ * @param server The server whose state has changed.
+ * @param newState The new state of the server.
+ */
+ public fun onStateChanged(server: Server, newState: ServerState) {}
+}
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/Hypervisor.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/Hypervisor.kt
deleted file mode 100644
index 1ae52baa..00000000
--- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/Hypervisor.kt
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.core.virt
-
-import kotlinx.coroutines.flow.Flow
-import org.opendc.core.Identity
-import java.util.UUID
-
-/**
- * A hypervisor (or virtual machine monitor) is software or firmware that virtualizes the host compute environment
- * into several virtual guest machines.
- */
-public class Hypervisor(
- /**
- * The unique identifier of the hypervisor.
- */
- override val uid: UUID,
-
- /**
- * The optional name of the hypervisor.
- */
- override val name: String,
-
- /**
- * Metadata of the hypervisor.
- */
- public val metadata: Map<String, Any>,
-
- /**
- * The events that are emitted by the hypervisor.
- */
- public val events: Flow<HypervisorEvent>
-) : Identity {
- override fun hashCode(): Int = uid.hashCode()
- override fun equals(other: Any?): Boolean = other is Hypervisor && uid == other.uid
-}
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/InsufficientMemoryOnServerException.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/InsufficientMemoryOnServerException.kt
deleted file mode 100644
index 6fe84ea6..00000000
--- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/InsufficientMemoryOnServerException.kt
+++ /dev/null
@@ -1,3 +0,0 @@
-package org.opendc.compute.core.virt.driver
-
-public class InsufficientMemoryOnServerException : IllegalStateException("Insufficient memory left on server.")
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/VirtDriver.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/VirtDriver.kt
deleted file mode 100644
index 68cc7b50..00000000
--- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/VirtDriver.kt
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.core.virt.driver
-
-import kotlinx.coroutines.flow.Flow
-import org.opendc.compute.core.Flavor
-import org.opendc.compute.core.Server
-import org.opendc.compute.core.image.Image
-import org.opendc.compute.core.virt.HypervisorEvent
-import org.opendc.core.services.AbstractServiceKey
-import java.util.UUID
-
-/**
- * A driver interface for a hypervisor running on some host server and communicating with the central compute service to
- * provide virtualization for that particular resource.
- */
-public interface VirtDriver {
- /**
- * The events emitted by the driver.
- */
- public val events: Flow<HypervisorEvent>
-
- /**
- * Determine whether the specified [flavor] can still fit on this driver.
- */
- public fun canFit(flavor: Flavor): Boolean
-
- /**
- * Spawn the given [Image] on the compute resource of this driver.
- *
- * @param name The name of the server to spawn.
- * @param image The image to deploy.
- * @param flavor The flavor of the server which this driver is controlling.
- * @return The virtual server spawned by this method.
- */
- public suspend fun spawn(
- name: String,
- image: Image,
- flavor: Flavor
- ): Server
-
- public companion object Key : AbstractServiceKey<VirtDriver>(UUID.randomUUID(), "virtual-driver")
-}
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/VirtProvisioningService.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/VirtProvisioningService.kt
deleted file mode 100644
index 3d722110..00000000
--- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/VirtProvisioningService.kt
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.core.virt.service
-
-import kotlinx.coroutines.flow.Flow
-import org.opendc.compute.core.Server
-import org.opendc.compute.core.image.Image
-import org.opendc.compute.core.virt.driver.VirtDriver
-
-/**
- * A service for VM provisioning on a cloud.
- */
-public interface VirtProvisioningService {
- /**
- * The events emitted by the service.
- */
- public val events: Flow<VirtProvisioningEvent>
-
- /**
- * Obtain the active hypervisors for this provisioner.
- */
- public suspend fun drivers(): Set<VirtDriver>
-
- /**
- * The number of hosts available in the system.
- */
- public val hostCount: Int
-
- /**
- * Submit the specified [Image] to the provisioning service.
- *
- * @param name The name of the server to deploy.
- * @param image The image to be deployed.
- * @param flavor The flavor of the machine instance to run this [image] on.
- */
- public suspend fun deploy(
- name: String,
- image: Image,
- flavor: org.opendc.compute.core.Flavor
- ): Server
-
- /**
- * Terminate the provisioning service releasing all the leased bare-metal machines.
- */
- public suspend fun terminate()
-}
diff --git a/simulator/opendc-compute/opendc-compute-service/build.gradle.kts b/simulator/opendc-compute/opendc-compute-service/build.gradle.kts
new file mode 100644
index 00000000..1b09ef6d
--- /dev/null
+++ b/simulator/opendc-compute/opendc-compute-service/build.gradle.kts
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+description = "OpenDC Compute Service implementation"
+
+/* Build configuration */
+plugins {
+ `kotlin-library-conventions`
+ `testing-conventions`
+}
+
+dependencies {
+ api(platform(project(":opendc-platform")))
+ api(project(":opendc-compute:opendc-compute-api"))
+ api(project(":opendc-trace:opendc-trace-core"))
+ implementation(project(":opendc-utils"))
+ implementation("io.github.microutils:kotlin-logging")
+
+ testImplementation(project(":opendc-simulator:opendc-simulator-core"))
+ testRuntimeOnly("org.slf4j:slf4j-simple:${versions.slf4j}")
+}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
new file mode 100644
index 00000000..593e4b56
--- /dev/null
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
@@ -0,0 +1,92 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.service
+
+import kotlinx.coroutines.flow.Flow
+import org.opendc.compute.api.ComputeClient
+import org.opendc.compute.service.driver.Host
+import org.opendc.compute.service.internal.ComputeServiceImpl
+import org.opendc.compute.service.scheduler.AllocationPolicy
+import org.opendc.trace.core.EventTracer
+import java.time.Clock
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * The [ComputeService] hosts the API implementation of the OpenDC Compute service.
+ */
+public interface ComputeService : AutoCloseable {
+ /**
+ * The events emitted by the service.
+ */
+ public val events: Flow<ComputeServiceEvent>
+
+ /**
+ * The hosts that are used by the compute service.
+ */
+ public val hosts: Set<Host>
+
+ /**
+ * The number of hosts available in the system.
+ */
+ public val hostCount: Int
+
+ /**
+ * Create a new [ComputeClient] to control the compute service.
+ */
+ public fun newClient(): ComputeClient
+
+ /**
+ * Add a [host] to the scheduling pool of the compute service.
+ */
+ public fun addHost(host: Host)
+
+ /**
+ * Remove a [host] from the scheduling pool of the compute service.
+ */
+ public fun removeHost(host: Host)
+
+ /**
+ * Terminate the lifecycle of the compute service, stopping all running instances.
+ */
+ public override fun close()
+
+ public companion object {
+ /**
+ * Construct a new [ComputeService] implementation.
+ *
+ * @param context The [CoroutineContext] to use in the service.
+ * @param clock The clock instance to use.
+ * @param tracer The event tracer to use.
+ * @param allocationPolicy The allocation policy to use.
+ */
+ public operator fun invoke(
+ context: CoroutineContext,
+ clock: Clock,
+ tracer: EventTracer,
+ allocationPolicy: AllocationPolicy,
+ schedulingQuantum: Long = 300000,
+ ): ComputeService {
+ return ComputeServiceImpl(context, clock, tracer, allocationPolicy, schedulingQuantum)
+ }
+ }
+}
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/VirtProvisioningEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeServiceEvent.kt
index abd2fc95..193008a7 100644
--- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/VirtProvisioningEvent.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeServiceEvent.kt
@@ -1,7 +1,5 @@
/*
- * MIT License
- *
- * Copyright (c) 2020 atlarge-research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -22,22 +20,22 @@
* SOFTWARE.
*/
-package org.opendc.compute.core.virt.service
+package org.opendc.compute.service
/**
- * An event that is emitted by the [VirtProvisioningService].
+ * An event that is emitted by the [ComputeService].
*/
-public sealed class VirtProvisioningEvent {
+public sealed class ComputeServiceEvent {
/**
* The service that has emitted the event.
*/
- public abstract val provisioner: VirtProvisioningService
+ public abstract val provisioner: ComputeService
/**
* An event emitted for writing metrics.
*/
public data class MetricsAvailable(
- override val provisioner: VirtProvisioningService,
+ override val provisioner: ComputeService,
public val totalHostCount: Int,
public val availableHostCount: Int,
public val totalVmCount: Int,
@@ -45,5 +43,5 @@ public sealed class VirtProvisioningEvent {
public val inactiveVmCount: Int,
public val waitingVmCount: Int,
public val failedVmCount: Int
- ) : VirtProvisioningEvent()
+ ) : ComputeServiceEvent()
}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt
new file mode 100644
index 00000000..2cd91144
--- /dev/null
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt
@@ -0,0 +1,104 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.service.driver
+
+import kotlinx.coroutines.flow.Flow
+import org.opendc.compute.api.Server
+import java.util.*
+
+/**
+ * Base interface for representing compute resources that host virtualized [Server] instances.
+ */
+public interface Host {
+ /**
+ * A unique identifier representing the host.
+ */
+ public val uid: UUID
+
+ /**
+ * The name of this host.
+ */
+ public val name: String
+
+ /**
+ * The machine model of the host.
+ */
+ public val model: HostModel
+
+ /**
+ * The state of the host.
+ */
+ public val state: HostState
+
+ /**
+ * The events emitted by the driver.
+ */
+ public val events: Flow<HostEvent>
+
+ /**
+ * Determine whether the specified [instance][server] can still fit on this host.
+ */
+ public fun canFit(server: Server): Boolean
+
+ /**
+ * Register the specified [instance][server] on the host.
+ *
+ * Once the method returns, the instance should be running if [start] is true or else the instance should be
+ * stopped.
+ */
+ public suspend fun spawn(server: Server, start: Boolean = true)
+
+ /**
+ * Determine whether the specified [instance][server] exists on the host.
+ */
+ public operator fun contains(server: Server): Boolean
+
+ /**
+ * Stat the server [instance][server] if it is currently not running on this host.
+ *
+ * @throws IllegalArgumentException if the server is not present on the host.
+ */
+ public suspend fun start(server: Server)
+
+ /**
+ * Stop the server [instance][server] if it is currently running on this host.
+ *
+ * @throws IllegalArgumentException if the server is not present on the host.
+ */
+ public suspend fun stop(server: Server)
+
+ /**
+ * Terminate the specified [instance][server] on this host and cleanup all resources associated with it.
+ */
+ public suspend fun terminate(server: Server)
+
+ /**
+ * Add a [HostListener] to this host.
+ */
+ public fun addListener(listener: HostListener)
+
+ /**
+ * Remove a [HostListener] from this host.
+ */
+ public fun removeListener(listener: HostListener)
+}
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HypervisorEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostEvent.kt
index 9fb437de..97350679 100644
--- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HypervisorEvent.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostEvent.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,19 +20,16 @@
* SOFTWARE.
*/
-package org.opendc.compute.core.virt
-
-import org.opendc.compute.core.Server
-import org.opendc.compute.core.virt.driver.VirtDriver
+package org.opendc.compute.service.driver
/**
- * An event that is emitted by a [VirtDriver].
+ * An event that is emitted by a [Host].
*/
-public sealed class HypervisorEvent {
+public sealed class HostEvent {
/**
* The driver that emitted the event.
*/
- public abstract val driver: VirtDriver
+ public abstract val driver: Host
/**
* This event is emitted when the number of active servers on the server managed by this driver is updated.
@@ -42,10 +39,10 @@ public sealed class HypervisorEvent {
* @property availableMemory The available memory, in MB.
*/
public data class VmsUpdated(
- override val driver: VirtDriver,
+ override val driver: Host,
public val numberOfActiveServers: Int,
public val availableMemory: Long
- ) : HypervisorEvent()
+ ) : HostEvent()
/**
* This event is emitted when a slice is finished.
@@ -63,7 +60,7 @@ public sealed class HypervisorEvent {
* @property numberOfDeployedImages The number of images deployed on this hypervisor.
*/
public data class SliceFinished(
- override val driver: VirtDriver,
+ override val driver: Host,
public val requestedBurst: Long,
public val grantedBurst: Long,
public val overcommissionedBurst: Long,
@@ -71,6 +68,5 @@ public sealed class HypervisorEvent {
public val cpuUsage: Double,
public val cpuDemand: Double,
public val numberOfDeployedImages: Int,
- public val hostServer: Server
- ) : HypervisorEvent()
+ ) : HostEvent()
}
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/ServerEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostListener.kt
index e9212832..f076cae3 100644
--- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/ServerEvent.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostListener.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,22 +20,22 @@
* SOFTWARE.
*/
-package org.opendc.compute.core
+package org.opendc.compute.service.driver
+
+import org.opendc.compute.api.Server
+import org.opendc.compute.api.ServerState
/**
- * An event that is emitted by a [Server].
+ * Listener interface for events originating from a [Host].
*/
-public sealed class ServerEvent {
+public interface HostListener {
/**
- * The server that emitted the event.
+ * This method is invoked when the state of an [instance][server] on [host] changes.
*/
- public abstract val server: Server
+ public fun onStateChanged(host: Host, server: Server, newState: ServerState) {}
/**
- * This event is emitted when the state of [server] changes.
- *
- * @property server The server of which the state changed.
- * @property previousState The previous state of the server.
+ * This method is invoked when the state of a [Host] has changed.
*/
- public data class StateChanged(override val server: Server, val previousState: ServerState) : ServerEvent()
+ public fun onStateChanged(host: Host, newState: HostState) {}
}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt
new file mode 100644
index 00000000..5632a55e
--- /dev/null
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.service.driver
+
+/**
+ * Describes the static machine properties of the host.
+ *
+ * @property vcpuCount The number of logical processing cores available for this host.
+ * @property memorySize The amount of memory available for this host in MB.
+ */
+public data class HostModel(public val cpuCount: Int, public val memorySize: Long)
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostState.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostState.kt
new file mode 100644
index 00000000..6d85ee2d
--- /dev/null
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostState.kt
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.service.driver
+
+/**
+ * The state of a host.
+ */
+public enum class HostState {
+ /**
+ * The host is up.
+ */
+ UP,
+
+ /**
+ * The host is down.
+ */
+ DOWN
+}
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/HypervisorAvailableEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/HypervisorAvailableEvent.kt
index c1802e64..a7974062 100644
--- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/HypervisorAvailableEvent.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/HypervisorAvailableEvent.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.compute.core.virt.service.events
+package org.opendc.compute.service.events
import org.opendc.trace.core.Event
import java.util.*
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/HypervisorUnavailableEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/HypervisorUnavailableEvent.kt
index 1fea21ea..75bb09ed 100644
--- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/HypervisorUnavailableEvent.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/HypervisorUnavailableEvent.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.compute.core.virt.service.events
+package org.opendc.compute.service.events
import org.opendc.trace.core.Event
import java.util.*
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmScheduledEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmScheduledEvent.kt
index 662068dd..f59c74b7 100644
--- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmScheduledEvent.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmScheduledEvent.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.compute.core.virt.service.events
+package org.opendc.compute.service.events
import org.opendc.trace.core.Event
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmStoppedEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmStoppedEvent.kt
index 96103129..eaf0736b 100644
--- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmStoppedEvent.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmStoppedEvent.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.compute.core.virt.service.events
+package org.opendc.compute.service.events
import org.opendc.trace.core.Event
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmSubmissionEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmSubmissionEvent.kt
index f6b71e22..fa0a8a13 100644
--- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmSubmissionEvent.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmSubmissionEvent.kt
@@ -20,10 +20,10 @@
* SOFTWARE.
*/
-package org.opendc.compute.core.virt.service.events
+package org.opendc.compute.service.events
-import org.opendc.compute.core.Flavor
-import org.opendc.compute.core.image.Image
+import org.opendc.compute.api.Flavor
+import org.opendc.compute.api.Image
import org.opendc.trace.core.Event
/**
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmSubmissionInvalidEvent.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmSubmissionInvalidEvent.kt
index d0e5c102..52b91616 100644
--- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/events/VmSubmissionInvalidEvent.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/events/VmSubmissionInvalidEvent.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.compute.core.virt.service.events
+package org.opendc.compute.service.events
import org.opendc.trace.core.Event
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt
new file mode 100644
index 00000000..f84b7435
--- /dev/null
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.service.internal
+
+import org.opendc.compute.api.Flavor
+import org.opendc.compute.api.Image
+import org.opendc.compute.api.Server
+import org.opendc.compute.api.ServerState
+import org.opendc.compute.api.ServerWatcher
+import java.util.*
+
+/**
+ * A [Server] implementation that is passed to clients but delegates its implementation to another class.
+ */
+internal class ClientServer(private val delegate: Server) : Server, ServerWatcher {
+ private val watchers = mutableListOf<ServerWatcher>()
+
+ override val uid: UUID = delegate.uid
+
+ override var name: String = delegate.name
+ private set
+
+ override var flavor: Flavor = delegate.flavor
+ private set
+
+ override var image: Image = delegate.image
+ private set
+
+ override var tags: Map<String, String> = delegate.tags.toMap()
+ private set
+
+ override var state: ServerState = delegate.state
+ private set
+
+ override fun watch(watcher: ServerWatcher) {
+ if (watchers.isEmpty()) {
+ delegate.watch(this)
+ }
+
+ watchers += watcher
+ }
+
+ override fun unwatch(watcher: ServerWatcher) {
+ watchers += watcher
+
+ if (watchers.isEmpty()) {
+ delegate.unwatch(this)
+ }
+ }
+
+ override suspend fun refresh() {
+ name = delegate.name
+ flavor = delegate.flavor
+ image = delegate.image
+ tags = delegate.tags
+ state = delegate.state
+ }
+
+ override fun onStateChanged(server: Server, newState: ServerState) {
+ val watchers = watchers
+
+ for (watcher in watchers) {
+ watcher.onStateChanged(this, newState)
+ }
+ }
+}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
new file mode 100644
index 00000000..69d6bb59
--- /dev/null
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
@@ -0,0 +1,414 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.service.internal
+
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.cancel
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.suspendCancellableCoroutine
+import mu.KotlinLogging
+import org.opendc.compute.api.*
+import org.opendc.compute.service.ComputeService
+import org.opendc.compute.service.ComputeServiceEvent
+import org.opendc.compute.service.driver.Host
+import org.opendc.compute.service.driver.HostListener
+import org.opendc.compute.service.driver.HostState
+import org.opendc.compute.service.events.*
+import org.opendc.compute.service.scheduler.AllocationPolicy
+import org.opendc.trace.core.EventTracer
+import org.opendc.utils.TimerScheduler
+import org.opendc.utils.flow.EventFlow
+import java.time.Clock
+import java.util.*
+import kotlin.coroutines.Continuation
+import kotlin.coroutines.CoroutineContext
+import kotlin.coroutines.resume
+import kotlin.math.max
+
+/**
+ * Internal implementation of the OpenDC Compute service.
+ *
+ * @param context The [CoroutineContext] to use.
+ * @param clock The clock instance to keep track of time.
+ */
+public class ComputeServiceImpl(
+ private val context: CoroutineContext,
+ private val clock: Clock,
+ private val tracer: EventTracer,
+ private val allocationPolicy: AllocationPolicy,
+ private val schedulingQuantum: Long
+) : ComputeService, HostListener {
+ /**
+ * The [CoroutineScope] of the service bounded by the lifecycle of the service.
+ */
+ private val scope = CoroutineScope(context)
+
+ /**
+ * The logger instance of this server.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * The [Random] instance used to generate unique identifiers for the objects.
+ */
+ private val random = Random(0)
+
+ /**
+ * A mapping from host to host view.
+ */
+ private val hostToView = mutableMapOf<Host, HostView>()
+
+ /**
+ * The available hypervisors.
+ */
+ private val availableHosts: MutableSet<HostView> = mutableSetOf()
+
+ /**
+ * The servers that should be launched by the service.
+ */
+ private val queue: Deque<LaunchRequest> = ArrayDeque()
+
+ /**
+ * The active servers in the system.
+ */
+ private val activeServers: MutableSet<Server> = mutableSetOf()
+
+ public var submittedVms: Int = 0
+ public var queuedVms: Int = 0
+ public var runningVms: Int = 0
+ public var finishedVms: Int = 0
+ public var unscheduledVms: Int = 0
+
+ private var maxCores = 0
+ private var maxMemory = 0L
+
+ /**
+ * The allocation logic to use.
+ */
+ private val allocationLogic = allocationPolicy()
+
+ override val events: Flow<ComputeServiceEvent>
+ get() = _events
+ private val _events = EventFlow<ComputeServiceEvent>()
+
+ /**
+ * The [TimerScheduler] to use for scheduling the scheduler cycles.
+ */
+ private var scheduler: TimerScheduler<Unit> = TimerScheduler(scope, clock)
+
+ override val hosts: Set<Host>
+ get() = hostToView.keys
+
+ override val hostCount: Int
+ get() = hostToView.size
+
+ override fun newClient(): ComputeClient = object : ComputeClient {
+ private var isClosed: Boolean = false
+
+ override suspend fun newServer(name: String, image: Image, flavor: Flavor): Server {
+ check(!isClosed) { "Client is closed" }
+ tracer.commit(VmSubmissionEvent(name, image, flavor))
+
+ _events.emit(
+ ComputeServiceEvent.MetricsAvailable(
+ this@ComputeServiceImpl,
+ hostCount,
+ availableHosts.size,
+ ++submittedVms,
+ runningVms,
+ finishedVms,
+ ++queuedVms,
+ unscheduledVms
+ )
+ )
+
+ return suspendCancellableCoroutine { cont ->
+ val request = LaunchRequest(createServer(name, image, flavor), cont)
+ queue += request
+ requestCycle()
+ }
+ }
+
+ override fun close() {
+ isClosed = true
+ }
+
+ override fun toString(): String = "ComputeClient"
+ }
+
+ override fun addHost(host: Host) {
+ // Check if host is already known
+ if (host in hostToView) {
+ return
+ }
+
+ val hv = HostView(host)
+ maxCores = max(maxCores, host.model.cpuCount)
+ maxMemory = max(maxMemory, host.model.memorySize)
+ hostToView[host] = hv
+
+ if (host.state == HostState.UP) {
+ availableHosts += hv
+ }
+
+ host.addListener(this)
+ }
+
+ override fun removeHost(host: Host) {
+ host.removeListener(this)
+ }
+
+ override fun close() {
+ scope.cancel()
+ }
+
+ private fun createServer(
+ name: String,
+ image: Image,
+ flavor: Flavor
+ ): Server {
+ return ServerImpl(
+ uid = UUID(random.nextLong(), random.nextLong()),
+ name = name,
+ flavor = flavor,
+ image = image
+ )
+ }
+
+ private fun requestCycle() {
+ // Bail out in case we have already requested a new cycle.
+ if (scheduler.isTimerActive(Unit)) {
+ return
+ }
+
+ // We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120).
+ // This is important because the slices of the VMs need to be aligned.
+ // We calculate here the delay until the next scheduling slot.
+ val delay = schedulingQuantum - (clock.millis() % schedulingQuantum)
+
+ scheduler.startSingleTimer(Unit, delay) {
+ schedule()
+ }
+ }
+
+ private fun schedule() {
+ while (queue.isNotEmpty()) {
+ val (server, cont) = queue.peekFirst()
+ val requiredMemory = server.flavor.memorySize
+ val selectedHv = allocationLogic.select(availableHosts, server)
+
+ if (selectedHv == null || !selectedHv.host.canFit(server)) {
+ logger.trace { "Server $server selected for scheduling but no capacity available for it." }
+
+ if (requiredMemory > maxMemory || server.flavor.cpuCount > maxCores) {
+ tracer.commit(VmSubmissionInvalidEvent(server.name))
+
+ _events.emit(
+ ComputeServiceEvent.MetricsAvailable(
+ this@ComputeServiceImpl,
+ hostCount,
+ availableHosts.size,
+ submittedVms,
+ runningVms,
+ finishedVms,
+ --queuedVms,
+ ++unscheduledVms
+ )
+ )
+
+ // Remove the incoming image
+ queue.poll()
+
+ logger.warn("Failed to spawn $server: does not fit [${clock.millis()}]")
+ continue
+ } else {
+ break
+ }
+ }
+
+ logger.info { "[${clock.millis()}] Spawning $server on ${selectedHv.host.uid} ${selectedHv.host.name} ${selectedHv.host.model}" }
+ queue.poll()
+
+ // Speculatively update the hypervisor view information to prevent other images in the queue from
+ // deciding on stale values.
+ selectedHv.numberOfActiveServers++
+ selectedHv.provisionedCores += server.flavor.cpuCount
+ selectedHv.availableMemory -= requiredMemory // XXX Temporary hack
+
+ scope.launch {
+ try {
+ cont.resume(ClientServer(server))
+ selectedHv.host.spawn(server)
+ activeServers += server
+
+ tracer.commit(VmScheduledEvent(server.name))
+ _events.emit(
+ ComputeServiceEvent.MetricsAvailable(
+ this@ComputeServiceImpl,
+ hostCount,
+ availableHosts.size,
+ submittedVms,
+ ++runningVms,
+ finishedVms,
+ --queuedVms,
+ unscheduledVms
+ )
+ )
+ } catch (e: Throwable) {
+ logger.error("Failed to deploy VM", e)
+
+ selectedHv.numberOfActiveServers--
+ selectedHv.provisionedCores -= server.flavor.cpuCount
+ selectedHv.availableMemory += requiredMemory
+ }
+ }
+ }
+ }
+
+ override fun onStateChanged(host: Host, newState: HostState) {
+ when (newState) {
+ HostState.UP -> {
+ logger.debug { "[${clock.millis()}] Host ${host.uid} state changed: $newState" }
+
+ val hv = hostToView[host]
+ if (hv != null) {
+ // Corner case for when the hypervisor already exists
+ availableHosts += hv
+ }
+
+ tracer.commit(HypervisorAvailableEvent(host.uid))
+
+ _events.emit(
+ ComputeServiceEvent.MetricsAvailable(
+ this@ComputeServiceImpl,
+ hostCount,
+ availableHosts.size,
+ submittedVms,
+ runningVms,
+ finishedVms,
+ queuedVms,
+ unscheduledVms
+ )
+ )
+
+ // Re-schedule on the new machine
+ if (queue.isNotEmpty()) {
+ requestCycle()
+ }
+ }
+ HostState.DOWN -> {
+ logger.debug { "[${clock.millis()}] Host ${host.uid} state changed: $newState" }
+
+ val hv = hostToView[host] ?: return
+ availableHosts -= hv
+
+ tracer.commit(HypervisorUnavailableEvent(hv.uid))
+
+ _events.emit(
+ ComputeServiceEvent.MetricsAvailable(
+ this@ComputeServiceImpl,
+ hostCount,
+ availableHosts.size,
+ submittedVms,
+ runningVms,
+ finishedVms,
+ queuedVms,
+ unscheduledVms
+ )
+ )
+
+ if (queue.isNotEmpty()) {
+ requestCycle()
+ }
+ }
+ }
+ }
+
+ override fun onStateChanged(host: Host, server: Server, newState: ServerState) {
+ val serverImpl = server as ServerImpl
+ serverImpl.state = newState
+ serverImpl.watchers.forEach { it.onStateChanged(server, newState) }
+
+ if (newState == ServerState.SHUTOFF) {
+ logger.info { "[${clock.millis()}] Server ${server.uid} ${server.name} ${server.flavor} finished." }
+
+ tracer.commit(VmStoppedEvent(server.name))
+
+ _events.emit(
+ ComputeServiceEvent.MetricsAvailable(
+ this@ComputeServiceImpl,
+ hostCount,
+ availableHosts.size,
+ submittedVms,
+ --runningVms,
+ ++finishedVms,
+ queuedVms,
+ unscheduledVms
+ )
+ )
+
+ activeServers -= server
+ val hv = hostToView[host]
+ if (hv != null) {
+ hv.provisionedCores -= server.flavor.cpuCount
+ hv.numberOfActiveServers--
+ hv.availableMemory += server.flavor.memorySize
+ } else {
+ logger.error { "Unknown host $host" }
+ }
+
+ // Try to reschedule if needed
+ if (queue.isNotEmpty()) {
+ requestCycle()
+ }
+ }
+ }
+
+ public data class LaunchRequest(val server: Server, val cont: Continuation<Server>)
+
+ private class ServerImpl(
+ override val uid: UUID,
+ override val name: String,
+ override val flavor: Flavor,
+ override val image: Image
+ ) : Server {
+ val watchers = mutableListOf<ServerWatcher>()
+
+ override fun watch(watcher: ServerWatcher) {
+ watchers += watcher
+ }
+
+ override fun unwatch(watcher: ServerWatcher) {
+ watchers -= watcher
+ }
+
+ override suspend fun refresh() {
+ // No-op: this object is the source-of-truth
+ }
+
+ override val tags: Map<String, String> = emptyMap()
+
+ override var state: ServerState = ServerState.BUILD
+ }
+}
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/image/EmptyImage.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt
index 01f86a1b..1bdfdf1a 100644
--- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/image/EmptyImage.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt
@@ -20,16 +20,16 @@
* SOFTWARE.
*/
-package org.opendc.compute.core.image
+package org.opendc.compute.service.internal
-import org.opendc.core.resource.TagContainer
+import org.opendc.compute.service.driver.Host
import java.util.UUID
-/**
- * An empty boot disk [Image] that exits immediately on start.
- */
-public object EmptyImage : Image {
- override val uid: UUID = UUID.randomUUID()
- override val name: String = "empty"
- override val tags: TagContainer = emptyMap()
+public class HostView(public val host: Host) {
+ public val uid: UUID
+ get() = host.uid
+
+ public var numberOfActiveServers: Int = 0
+ public var availableMemory: Long = host.model.memorySize
+ public var provisionedCores: Int = 0
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/AllocationPolicy.kt
index 2018b9f2..5ee4c70f 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AllocationPolicy.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/AllocationPolicy.kt
@@ -20,11 +20,10 @@
* SOFTWARE.
*/
-package org.opendc.compute.simulator.allocation
+package org.opendc.compute.service.scheduler
-import org.opendc.compute.core.metal.Node
-import org.opendc.compute.simulator.HypervisorView
-import org.opendc.compute.simulator.SimVirtProvisioningService
+import org.opendc.compute.api.Server
+import org.opendc.compute.service.internal.HostView
/**
* A policy for selecting the [Node] an image should be deployed to,
@@ -38,9 +37,9 @@ public interface AllocationPolicy {
* Select the node on which the server should be scheduled.
*/
public fun select(
- hypervisors: Set<HypervisorView>,
- image: SimVirtProvisioningService.ImageView
- ): HypervisorView?
+ hypervisors: Set<HostView>,
+ server: Server
+ ): HostView?
}
/**
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AvailableCoreMemoryAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/AvailableCoreMemoryAllocationPolicy.kt
index 38a07b2b..ad422415 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AvailableCoreMemoryAllocationPolicy.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/AvailableCoreMemoryAllocationPolicy.kt
@@ -20,9 +20,9 @@
* SOFTWARE.
*/
-package org.opendc.compute.simulator.allocation
+package org.opendc.compute.service.scheduler
-import org.opendc.compute.simulator.HypervisorView
+import org.opendc.compute.service.internal.HostView
/**
* An [AllocationPolicy] that selects the machine with the highest/lowest amount of memory per core.
@@ -31,8 +31,8 @@ import org.opendc.compute.simulator.HypervisorView
*/
public class AvailableCoreMemoryAllocationPolicy(private val reversed: Boolean = false) : AllocationPolicy {
override fun invoke(): AllocationPolicy.Logic = object : ComparableAllocationPolicyLogic {
- override val comparator: Comparator<HypervisorView> =
- compareBy<HypervisorView> { -it.availableMemory / it.server.flavor.cpuCount }
+ override val comparator: Comparator<HostView> =
+ compareBy<HostView> { -it.availableMemory / it.host.model.cpuCount }
.run { if (reversed) reversed() else this }
}
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AvailableMemoryAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/AvailableMemoryAllocationPolicy.kt
index e87abd7b..6712b8a2 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AvailableMemoryAllocationPolicy.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/AvailableMemoryAllocationPolicy.kt
@@ -20,9 +20,9 @@
* SOFTWARE.
*/
-package org.opendc.compute.simulator.allocation
+package org.opendc.compute.service.scheduler
-import org.opendc.compute.simulator.HypervisorView
+import org.opendc.compute.service.internal.HostView
/**
* Allocation policy that selects the node with the most available memory.
@@ -31,7 +31,7 @@ import org.opendc.compute.simulator.HypervisorView
*/
public class AvailableMemoryAllocationPolicy(public val reversed: Boolean = false) : AllocationPolicy {
override fun invoke(): AllocationPolicy.Logic = object : ComparableAllocationPolicyLogic {
- override val comparator: Comparator<HypervisorView> = compareBy<HypervisorView> { -it.availableMemory }
+ override val comparator: Comparator<HostView> = compareBy<HostView> { -it.availableMemory }
.run { if (reversed) reversed() else this }
}
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ComparableAllocationPolicyLogic.kt
index 4470eab9..265d514d 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ComparableAllocationPolicyLogic.kt
@@ -20,10 +20,10 @@
* SOFTWARE.
*/
-package org.opendc.compute.simulator.allocation
+package org.opendc.compute.service.scheduler
-import org.opendc.compute.simulator.HypervisorView
-import org.opendc.compute.simulator.SimVirtProvisioningService
+import org.opendc.compute.api.Server
+import org.opendc.compute.service.internal.HostView
/**
* The logic for an [AllocationPolicy] that uses a [Comparator] to select the appropriate node.
@@ -32,18 +32,18 @@ public interface ComparableAllocationPolicyLogic : AllocationPolicy.Logic {
/**
* The comparator to use.
*/
- public val comparator: Comparator<HypervisorView>
+ public val comparator: Comparator<HostView>
override fun select(
- hypervisors: Set<HypervisorView>,
- image: SimVirtProvisioningService.ImageView
- ): HypervisorView? {
+ hypervisors: Set<HostView>,
+ server: Server
+ ): HostView? {
return hypervisors.asSequence()
.filter { hv ->
- val fitsMemory = hv.availableMemory >= (image.flavor.memorySize)
- val fitsCpu = hv.server.flavor.cpuCount >= image.flavor.cpuCount
+ val fitsMemory = hv.availableMemory >= (server.flavor.memorySize)
+ val fitsCpu = hv.host.model.cpuCount >= server.flavor.cpuCount
fitsMemory && fitsCpu
}
- .minWithOrNull(comparator.thenBy { it.server.uid })
+ .minWithOrNull(comparator.thenBy { it.host.uid })
}
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/NumberOfActiveServersAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/NumberOfActiveServersAllocationPolicy.kt
index 5e2b895c..29eba782 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/NumberOfActiveServersAllocationPolicy.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/NumberOfActiveServersAllocationPolicy.kt
@@ -20,9 +20,9 @@
* SOFTWARE.
*/
-package org.opendc.compute.simulator.allocation
+package org.opendc.compute.service.scheduler
-import org.opendc.compute.simulator.HypervisorView
+import org.opendc.compute.service.internal.HostView
/**
* Allocation policy that selects the node with the least amount of active servers.
@@ -31,7 +31,7 @@ import org.opendc.compute.simulator.HypervisorView
*/
public class NumberOfActiveServersAllocationPolicy(public val reversed: Boolean = false) : AllocationPolicy {
override fun invoke(): AllocationPolicy.Logic = object : ComparableAllocationPolicyLogic {
- override val comparator: Comparator<HypervisorView> = compareBy<HypervisorView> { it.numberOfActiveServers }
+ override val comparator: Comparator<HostView> = compareBy<HostView> { it.numberOfActiveServers }
.run { if (reversed) reversed() else this }
}
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ProvisionedCoresAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ProvisionedCoresAllocationPolicy.kt
index 4344d979..4c196953 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ProvisionedCoresAllocationPolicy.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ProvisionedCoresAllocationPolicy.kt
@@ -20,9 +20,9 @@
* SOFTWARE.
*/
-package org.opendc.compute.simulator.allocation
+package org.opendc.compute.service.scheduler
-import org.opendc.compute.simulator.HypervisorView
+import org.opendc.compute.service.internal.HostView
/**
* An [AllocationPolicy] that takes into account the number of vCPUs that have been provisioned on this machine
@@ -33,8 +33,8 @@ import org.opendc.compute.simulator.HypervisorView
*/
public class ProvisionedCoresAllocationPolicy(private val reversed: Boolean = false) : AllocationPolicy {
override fun invoke(): AllocationPolicy.Logic = object : ComparableAllocationPolicyLogic {
- override val comparator: Comparator<HypervisorView> =
- compareBy<HypervisorView> { it.provisionedCores / it.server.flavor.cpuCount }
+ override val comparator: Comparator<HostView> =
+ compareBy<HostView> { it.provisionedCores / it.host.model.cpuCount }
.run { if (reversed) reversed() else this }
}
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/RandomAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/RandomAllocationPolicy.kt
index ac34f410..3facb182 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/RandomAllocationPolicy.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/RandomAllocationPolicy.kt
@@ -20,10 +20,10 @@
* SOFTWARE.
*/
-package org.opendc.compute.simulator.allocation
+package org.opendc.compute.service.scheduler
-import org.opendc.compute.simulator.HypervisorView
-import org.opendc.compute.simulator.SimVirtProvisioningService
+import org.opendc.compute.api.Server
+import org.opendc.compute.service.internal.HostView
import kotlin.random.Random
/**
@@ -33,13 +33,13 @@ public class RandomAllocationPolicy(private val random: Random = Random(0)) : Al
@OptIn(ExperimentalStdlibApi::class)
override fun invoke(): AllocationPolicy.Logic = object : AllocationPolicy.Logic {
override fun select(
- hypervisors: Set<HypervisorView>,
- image: SimVirtProvisioningService.ImageView
- ): HypervisorView? {
+ hypervisors: Set<HostView>,
+ server: Server
+ ): HostView? {
return hypervisors.asIterable()
.filter { hv ->
- val fitsMemory = hv.availableMemory >= (image.image.tags["required-memory"] as Long)
- val fitsCpu = hv.server.flavor.cpuCount >= image.flavor.cpuCount
+ val fitsMemory = hv.availableMemory >= (server.image.tags["required-memory"] as Long)
+ val fitsCpu = hv.host.model.cpuCount >= server.flavor.cpuCount
fitsMemory && fitsCpu
}
.randomOrNull(random)
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ReplayAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ReplayAllocationPolicy.kt
index 5312f4da..ed1dc662 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ReplayAllocationPolicy.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/ReplayAllocationPolicy.kt
@@ -23,8 +23,9 @@
package org.opendc.compute.simulator.allocation
import mu.KotlinLogging
-import org.opendc.compute.simulator.HypervisorView
-import org.opendc.compute.simulator.SimVirtProvisioningService
+import org.opendc.compute.api.Server
+import org.opendc.compute.service.internal.HostView
+import org.opendc.compute.service.scheduler.AllocationPolicy
private val logger = KotlinLogging.logger {}
@@ -37,15 +38,15 @@ private val logger = KotlinLogging.logger {}
public class ReplayAllocationPolicy(private val vmPlacements: Map<String, String>) : AllocationPolicy {
override fun invoke(): AllocationPolicy.Logic = object : AllocationPolicy.Logic {
override fun select(
- hypervisors: Set<HypervisorView>,
- image: SimVirtProvisioningService.ImageView
- ): HypervisorView? {
- val clusterName = vmPlacements[image.name]
- ?: throw IllegalStateException("Could not find placement data in VM placement file for VM ${image.name}")
- val machinesInCluster = hypervisors.filter { it.server.name.contains(clusterName) }
+ hypervisors: Set<HostView>,
+ server: Server
+ ): HostView? {
+ val clusterName = vmPlacements[server.name]
+ ?: throw IllegalStateException("Could not find placement data in VM placement file for VM ${server.name}")
+ val machinesInCluster = hypervisors.filter { it.host.name.contains(clusterName) }
if (machinesInCluster.isEmpty()) {
- logger.info { "Could not find any machines belonging to cluster $clusterName for image ${image.name}, assigning randomly." }
+ logger.info { "Could not find any machines belonging to cluster $clusterName for image ${server.name}, assigning randomly." }
return hypervisors.maxByOrNull { it.availableMemory }
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts b/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts
index f52d0f97..d7d5f002 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts
+++ b/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts
@@ -30,7 +30,8 @@ plugins {
dependencies {
api(platform(project(":opendc-platform")))
- api(project(":opendc-compute:opendc-compute-core"))
+ api(project(":opendc-compute:opendc-compute-service"))
+ api(project(":opendc-metal"))
api(project(":opendc-simulator:opendc-simulator-compute"))
api(project(":opendc-simulator:opendc-simulator-failures"))
implementation(project(":opendc-utils"))
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ComputeSimExecutionContext.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ComputeSimExecutionContext.kt
deleted file mode 100644
index 153a86b3..00000000
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/ComputeSimExecutionContext.kt
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.simulator
-
-import org.opendc.compute.core.Server
-import org.opendc.simulator.compute.SimExecutionContext
-
-/**
- * Extended [SimExecutionContext] in which workloads within the OpenDC Compute module run.
- */
-public interface ComputeSimExecutionContext : SimExecutionContext {
- /**
- * The server on which the image runs.
- */
- public val server: Server
-}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/HypervisorView.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/HypervisorView.kt
deleted file mode 100644
index 1a79523e..00000000
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/HypervisorView.kt
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.simulator
-
-import org.opendc.compute.core.Server
-import org.opendc.compute.core.virt.driver.VirtDriver
-import java.util.UUID
-
-public class HypervisorView(
- public val uid: UUID,
- public var server: Server,
- public var numberOfActiveServers: Int,
- public var availableMemory: Long,
- public var provisionedCores: Int
-) {
- public lateinit var driver: VirtDriver
-}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimBareMetalDriver.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimBareMetalDriver.kt
index 1e459e6f..2405a8f9 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimBareMetalDriver.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimBareMetalDriver.kt
@@ -24,31 +24,23 @@ package org.opendc.compute.simulator
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
-import org.opendc.compute.core.Flavor
-import org.opendc.compute.core.Server
-import org.opendc.compute.core.ServerEvent
-import org.opendc.compute.core.ServerState
-import org.opendc.compute.core.image.EmptyImage
-import org.opendc.compute.core.image.Image
-import org.opendc.compute.core.metal.Node
-import org.opendc.compute.core.metal.NodeEvent
-import org.opendc.compute.core.metal.NodeState
-import org.opendc.compute.core.metal.driver.BareMetalDriver
+import org.opendc.compute.api.Flavor
+import org.opendc.compute.api.Image
import org.opendc.compute.simulator.power.api.CpuPowerModel
import org.opendc.compute.simulator.power.api.Powerable
import org.opendc.compute.simulator.power.models.ConstantPowerModel
-import org.opendc.core.services.ServiceRegistry
+import org.opendc.metal.Node
+import org.opendc.metal.NodeEvent
+import org.opendc.metal.NodeState
+import org.opendc.metal.driver.BareMetalDriver
import org.opendc.simulator.compute.SimBareMetalMachine
-import org.opendc.simulator.compute.SimExecutionContext
import org.opendc.simulator.compute.SimMachineModel
-import org.opendc.simulator.compute.workload.SimResourceCommand
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.failures.FailureDomain
import org.opendc.utils.flow.EventFlow
import org.opendc.utils.flow.StateFlow
import java.time.Clock
import java.util.UUID
-import kotlin.random.Random
/**
* A basic implementation of the [BareMetalDriver] that simulates an [Image] running on a bare-metal machine.
@@ -88,7 +80,7 @@ public class SimBareMetalDriver(
* The machine state.
*/
private val nodeState =
- StateFlow(Node(uid, name, metadata + ("driver" to this), NodeState.SHUTOFF, EmptyImage, null, events))
+ StateFlow(Node(uid, name, metadata + ("driver" to this), NodeState.SHUTOFF, flavor, Image.EMPTY, events))
/**
* The [SimBareMetalMachine] we use to run the workload.
@@ -103,20 +95,10 @@ public class SimBareMetalDriver(
override val powerDraw: Flow<Double> = cpuPowerModel.getPowerDraw(this)
/**
- * The internal random instance.
- */
- private val random = Random(uid.leastSignificantBits xor uid.mostSignificantBits)
-
- /**
* The [Job] that runs the simulated workload.
*/
private var job: Job? = null
- /**
- * The event stream to publish to for the server.
- */
- private var serverEvents: EventFlow<ServerEvent>? = null
-
override suspend fun init(): Node {
return nodeState.value
}
@@ -127,51 +109,13 @@ public class SimBareMetalDriver(
return node
}
- val events = EventFlow<ServerEvent>()
- serverEvents = events
- val server = Server(
- UUID(random.nextLong(), random.nextLong()),
- node.name,
- emptyMap(),
- flavor,
- node.image,
- ServerState.BUILD,
- ServiceRegistry().put(BareMetalDriver, this@SimBareMetalDriver),
- events
- )
-
- val delegate = (node.image as SimWorkloadImage).workload
- // Wrap the workload to pass in a ComputeSimExecutionContext
- val workload = object : SimWorkload {
- lateinit var wrappedCtx: ComputeSimExecutionContext
-
- override fun onStart(ctx: SimExecutionContext) {
- wrappedCtx = object : ComputeSimExecutionContext, SimExecutionContext by ctx {
- override val server: Server
- get() = nodeState.value.server!!
-
- override fun toString(): String = "WrappedSimExecutionContext"
- }
-
- delegate.onStart(wrappedCtx)
- }
-
- override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand {
- return delegate.onStart(wrappedCtx, cpu)
- }
-
- override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand {
- return delegate.onNext(wrappedCtx, cpu, remainingWork)
- }
-
- override fun toString(): String = "SimWorkloadWrapper(delegate=$delegate)"
- }
+ val workload = node.image.tags["workload"] as SimWorkload
job = coroutineScope.launch {
delay(1) // TODO Introduce boot time
initMachine()
try {
- machine.run(workload)
+ machine.run(workload, mapOf("driver" to this@SimBareMetalDriver, "node" to node))
exitMachine(null)
} catch (_: CancellationException) {
// Ignored
@@ -180,31 +124,21 @@ public class SimBareMetalDriver(
}
}
- setNode(node.copy(state = NodeState.BOOT, server = server))
+ setNode(node.copy(state = NodeState.BOOT))
return nodeState.value
}
private fun initMachine() {
- val server = nodeState.value.server?.copy(state = ServerState.ACTIVE)
- setNode(nodeState.value.copy(state = NodeState.ACTIVE, server = server))
+ setNode(nodeState.value.copy(state = NodeState.ACTIVE))
}
private fun exitMachine(cause: Throwable?) {
- val newServerState =
- if (cause == null)
- ServerState.SHUTOFF
- else
- ServerState.ERROR
val newNodeState =
if (cause == null)
- nodeState.value.state
+ NodeState.SHUTOFF
else
NodeState.ERROR
- val server = nodeState.value.server?.copy(state = newServerState)
- setNode(nodeState.value.copy(state = newNodeState, server = server))
-
- serverEvents?.close()
- serverEvents = null
+ setNode(nodeState.value.copy(state = newNodeState))
}
override suspend fun stop(): Node {
@@ -214,7 +148,7 @@ public class SimBareMetalDriver(
}
job?.cancelAndJoin()
- setNode(node.copy(state = NodeState.SHUTOFF, server = null))
+ setNode(node.copy(state = NodeState.SHUTOFF))
return node
}
@@ -236,13 +170,6 @@ public class SimBareMetalDriver(
events.emit(NodeEvent.StateChanged(value, field.state))
}
- val oldServer = field.server
- val newServer = value.server
-
- if (oldServer != null && newServer != null && oldServer.state != newServer.state) {
- serverEvents?.emit(ServerEvent.StateChanged(newServer, oldServer.state))
- }
-
nodeState.value = value
}
@@ -250,13 +177,11 @@ public class SimBareMetalDriver(
get() = coroutineScope
override suspend fun fail() {
- val server = nodeState.value.server?.copy(state = ServerState.ERROR)
- setNode(nodeState.value.copy(state = NodeState.ERROR, server = server))
+ setNode(nodeState.value.copy(state = NodeState.ERROR))
}
override suspend fun recover() {
- val server = nodeState.value.server?.copy(state = ServerState.ACTIVE)
- setNode(nodeState.value.copy(state = NodeState.ACTIVE, server = server))
+ setNode(nodeState.value.copy(state = NodeState.ACTIVE))
}
override fun toString(): String = "SimBareMetalDriver(node = ${nodeState.value.uid})"
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
new file mode 100644
index 00000000..fd547d3d
--- /dev/null
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -0,0 +1,303 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.Flow
+import mu.KotlinLogging
+import org.opendc.compute.api.Flavor
+import org.opendc.compute.api.Server
+import org.opendc.compute.api.ServerState
+import org.opendc.compute.service.driver.*
+import org.opendc.metal.Node
+import org.opendc.simulator.compute.*
+import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
+import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
+import org.opendc.simulator.compute.model.MemoryUnit
+import org.opendc.simulator.compute.workload.SimResourceCommand
+import org.opendc.simulator.compute.workload.SimWorkload
+import org.opendc.utils.flow.EventFlow
+import java.util.*
+import kotlin.coroutines.resume
+
+/**
+ * A [Host] that is simulates virtual machines on a physical machine using [SimHypervisor].
+ */
+public class SimHost(
+ public val node: Node,
+ private val coroutineScope: CoroutineScope,
+ hypervisor: SimHypervisorProvider
+) : Host, SimWorkload {
+ /**
+ * The logger instance of this server.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * The execution context in which the [Host] runs.
+ */
+ private lateinit var ctx: SimExecutionContext
+
+ override val events: Flow<HostEvent>
+ get() = _events
+ internal val _events = EventFlow<HostEvent>()
+
+ /**
+ * The event listeners registered with this host.
+ */
+ private val listeners = mutableListOf<HostListener>()
+
+ /**
+ * Current total memory use of the images on this hypervisor.
+ */
+ private var availableMemory: Long = 0
+
+ /**
+ * The hypervisor to run multiple workloads.
+ */
+ private val hypervisor = hypervisor.create(
+ object : SimHypervisor.Listener {
+ override fun onSliceFinish(
+ hypervisor: SimHypervisor,
+ requestedWork: Long,
+ grantedWork: Long,
+ overcommittedWork: Long,
+ interferedWork: Long,
+ cpuUsage: Double,
+ cpuDemand: Double
+ ) {
+ _events.emit(
+ HostEvent.SliceFinished(
+ this@SimHost,
+ requestedWork,
+ grantedWork,
+ overcommittedWork,
+ interferedWork,
+ cpuUsage,
+ cpuDemand,
+ guests.size
+ )
+ )
+ }
+ }
+ )
+
+ /**
+ * The virtual machines running on the hypervisor.
+ */
+ private val guests = HashMap<Server, Guest>()
+
+ override val uid: UUID
+ get() = node.uid
+
+ override val name: String
+ get() = node.name
+
+ override val model: HostModel
+ get() = HostModel(node.flavor.cpuCount, node.flavor.memorySize)
+
+ override val state: HostState
+ get() = _state
+ private var _state: HostState = HostState.UP
+ set(value) {
+ listeners.forEach { it.onStateChanged(this, value) }
+ field = value
+ }
+
+ override fun canFit(server: Server): Boolean {
+ val sufficientMemory = availableMemory > server.flavor.memorySize
+ val enoughCpus = ctx.machine.cpus.size >= server.flavor.cpuCount
+ val canFit = hypervisor.canFit(server.flavor.toMachineModel())
+
+ return sufficientMemory && enoughCpus && canFit
+ }
+
+ override suspend fun spawn(server: Server, start: Boolean) {
+ // Return if the server already exists on this host
+ if (server in this) {
+ return
+ }
+
+ require(canFit(server)) { "Server does not fit" }
+ val guest = Guest(server, hypervisor.createMachine(server.flavor.toMachineModel()))
+ guests[server] = guest
+
+ if (start) {
+ guest.start()
+ }
+
+ _events.emit(HostEvent.VmsUpdated(this, guests.count { it.value.state == ServerState.ACTIVE }, availableMemory))
+ }
+
+ override fun contains(server: Server): Boolean {
+ return server in guests
+ }
+
+ override suspend fun start(server: Server) {
+ val guest = requireNotNull(guests[server]) { "Unknown server ${server.uid} at host $uid" }
+ guest.start()
+ }
+
+ override suspend fun stop(server: Server) {
+ val guest = requireNotNull(guests[server]) { "Unknown server ${server.uid} at host $uid" }
+ guest.stop()
+ }
+
+ override suspend fun terminate(server: Server) {
+ val guest = guests.remove(server) ?: return
+ guest.terminate()
+ }
+
+ override fun addListener(listener: HostListener) {
+ listeners.add(listener)
+ }
+
+ override fun removeListener(listener: HostListener) {
+ listeners.remove(listener)
+ }
+
+ /**
+ * Convert flavor to machine model.
+ */
+ private fun Flavor.toMachineModel(): SimMachineModel {
+ val originalCpu = ctx.machine.cpus[0]
+ val processingNode = originalCpu.node.copy(coreCount = cpuCount)
+ val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode) }
+ val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize))
+
+ return SimMachineModel(processingUnits, memoryUnits)
+ }
+
+ private fun onGuestStart(vm: Guest) {
+ guests.forEach { _, guest ->
+ if (guest.state == ServerState.ACTIVE) {
+ vm.performanceInterferenceModel?.onStart(vm.server.image.name)
+ }
+ }
+
+ listeners.forEach { it.onStateChanged(this, vm.server, vm.state) }
+ }
+
+ private fun onGuestStop(vm: Guest) {
+ guests.forEach { _, guest ->
+ if (guest.state == ServerState.ACTIVE) {
+ vm.performanceInterferenceModel?.onStop(vm.server.image.name)
+ }
+ }
+
+ listeners.forEach { it.onStateChanged(this, vm.server, vm.state) }
+
+ _events.emit(HostEvent.VmsUpdated(this@SimHost, guests.count { it.value.state == ServerState.ACTIVE }, availableMemory))
+ }
+
+ /**
+ * A virtual machine instance that the driver manages.
+ */
+ private inner class Guest(val server: Server, val machine: SimMachine) {
+ val performanceInterferenceModel: PerformanceInterferenceModel? = server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
+
+ var state: ServerState = ServerState.SHUTOFF
+
+ suspend fun start() {
+ when (state) {
+ ServerState.SHUTOFF -> {
+ logger.info { "User requested to start server ${server.uid}" }
+ launch()
+ }
+ ServerState.ACTIVE -> return
+ else -> assert(false) { "Invalid state transition" }
+ }
+ }
+
+ suspend fun stop() {
+ when (state) {
+ ServerState.ACTIVE, ServerState.ERROR -> {
+ val job = job ?: throw IllegalStateException("Server should be active")
+ job.cancel()
+ job.join()
+ }
+ ServerState.SHUTOFF -> return
+ else -> assert(false) { "Invalid state transition" }
+ }
+ }
+
+ suspend fun terminate() {
+ stop()
+ }
+
+ private var job: Job? = null
+
+ private suspend fun launch() = suspendCancellableCoroutine<Unit> { cont ->
+ assert(job == null) { "Concurrent job running" }
+ val workload = server.image.tags["workload"] as SimWorkload
+
+ val job = coroutineScope.launch {
+ delay(1) // TODO Introduce boot time
+ init()
+ cont.resume(Unit)
+ try {
+ machine.run(workload, mapOf("driver" to this@SimHost, "server" to server))
+ exit(null)
+ } catch (cause: Throwable) {
+ exit(cause)
+ } finally {
+ machine.close()
+ }
+ }
+ this.job = job
+ job.invokeOnCompletion {
+ this.job = null
+ }
+ }
+
+ private fun init() {
+ state = ServerState.ACTIVE
+ onGuestStart(this)
+ }
+
+ private fun exit(cause: Throwable?) {
+ state =
+ if (cause == null)
+ ServerState.SHUTOFF
+ else
+ ServerState.ERROR
+
+ availableMemory += server.flavor.memorySize
+ onGuestStop(this)
+ }
+ }
+
+ override fun onStart(ctx: SimExecutionContext) {
+ this.ctx = ctx
+ this.availableMemory = ctx.machine.memory.map { it.size }.sum()
+ this.hypervisor.onStart(ctx)
+ }
+
+ override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand {
+ return hypervisor.onStart(ctx, cpu)
+ }
+
+ override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand {
+ return hypervisor.onNext(ctx, cpu, remainingWork)
+ }
+}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHostProvisioner.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHostProvisioner.kt
new file mode 100644
index 00000000..bb03777b
--- /dev/null
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHostProvisioner.kt
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.simulator
+
+import kotlinx.coroutines.*
+import org.opendc.compute.api.Image
+import org.opendc.compute.service.driver.Host
+import org.opendc.metal.Node
+import org.opendc.metal.service.ProvisioningService
+import org.opendc.simulator.compute.SimHypervisorProvider
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * A helper class to provision [SimHost]s on top of bare-metal machines using the [ProvisioningService].
+ *
+ * @param context The [CoroutineContext] to use.
+ * @param metal The [ProvisioningService] to use.
+ * @param hypervisor The type of hypervisor to use.
+ */
+public class SimHostProvisioner(
+ private val context: CoroutineContext,
+ private val metal: ProvisioningService,
+ private val hypervisor: SimHypervisorProvider
+) : AutoCloseable {
+ /**
+ * The [CoroutineScope] of the service bounded by the lifecycle of the service.
+ */
+ private val scope = CoroutineScope(context)
+
+ /**
+ * Provision all machines with a host.
+ */
+ public suspend fun provisionAll(): List<Host> = coroutineScope {
+ metal.nodes().map { node -> async { provision(node) } }.awaitAll()
+ }
+
+ /**
+ * Provision the specified [Node].
+ */
+ public suspend fun provision(node: Node): Host = coroutineScope {
+ val host = SimHost(node, scope, hypervisor)
+ metal.deploy(node, Image(node.uid, node.name, mapOf("workload" to host)))
+ host
+ }
+
+ override fun close() {
+ scope.cancel()
+ }
+}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt
deleted file mode 100644
index d7a8a8b2..00000000
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.simulator
-
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.flow.Flow
-import kotlinx.coroutines.launch
-import org.opendc.compute.core.*
-import org.opendc.compute.core.image.Image
-import org.opendc.compute.core.virt.HypervisorEvent
-import org.opendc.compute.core.virt.driver.InsufficientMemoryOnServerException
-import org.opendc.compute.core.virt.driver.VirtDriver
-import org.opendc.core.services.ServiceRegistry
-import org.opendc.simulator.compute.*
-import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
-import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
-import org.opendc.simulator.compute.model.MemoryUnit
-import org.opendc.simulator.compute.workload.SimResourceCommand
-import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.utils.flow.EventFlow
-import java.util.*
-
-/**
- * A [VirtDriver] that is simulates virtual machines on a physical machine using [SimHypervisor].
- */
-public class SimVirtDriver(private val coroutineScope: CoroutineScope, hypervisor: SimHypervisorProvider) : VirtDriver, SimWorkload {
- /**
- * The execution context in which the [VirtDriver] runs.
- */
- private lateinit var ctx: ComputeSimExecutionContext
-
- /**
- * The server hosting this hypervisor.
- */
- public val server: Server
- get() = ctx.server
-
- /**
- * The [EventFlow] to emit the events.
- */
- internal val eventFlow = EventFlow<HypervisorEvent>()
-
- override val events: Flow<HypervisorEvent> = eventFlow
-
- /**
- * Current total memory use of the images on this hypervisor.
- */
- private var availableMemory: Long = 0
-
- /**
- * The hypervisor to run multiple workloads.
- */
- private val hypervisor = hypervisor.create(
- object : SimHypervisor.Listener {
- override fun onSliceFinish(
- hypervisor: SimHypervisor,
- requestedWork: Long,
- grantedWork: Long,
- overcommittedWork: Long,
- interferedWork: Long,
- cpuUsage: Double,
- cpuDemand: Double
- ) {
- eventFlow.emit(
- HypervisorEvent.SliceFinished(
- this@SimVirtDriver,
- requestedWork,
- grantedWork,
- overcommittedWork,
- interferedWork,
- cpuUsage,
- cpuDemand,
- vms.size,
- ctx.server
- )
- )
- }
- }
- )
-
- /**
- * The virtual machines running on the hypervisor.
- */
- private val vms = HashSet<VirtualMachine>()
-
- override fun canFit(flavor: Flavor): Boolean {
- val sufficientMemory = availableMemory > flavor.memorySize
- val enoughCpus = ctx.machine.cpus.size >= flavor.cpuCount
- val canFit = hypervisor.canFit(flavor.toMachineModel())
-
- return sufficientMemory && enoughCpus && canFit
- }
-
- override suspend fun spawn(name: String, image: Image, flavor: Flavor): Server {
- val requiredMemory = flavor.memorySize
- if (availableMemory - requiredMemory < 0) {
- throw InsufficientMemoryOnServerException()
- }
- require(flavor.cpuCount <= ctx.machine.cpus.size) { "Machine does not fit" }
-
- val events = EventFlow<ServerEvent>()
- val server = Server(
- UUID.randomUUID(),
- name,
- emptyMap(),
- flavor,
- image,
- ServerState.BUILD,
- ServiceRegistry(),
- events
- )
- availableMemory -= requiredMemory
-
- val vm = VirtualMachine(server, events, hypervisor.createMachine(flavor.toMachineModel()))
- vms.add(vm)
- vmStarted(vm)
- eventFlow.emit(HypervisorEvent.VmsUpdated(this, vms.size, availableMemory))
- return server
- }
-
- /**
- * Convert flavor to machine model.
- */
- private fun Flavor.toMachineModel(): SimMachineModel {
- val originalCpu = ctx.machine.cpus[0]
- val processingNode = originalCpu.node.copy(coreCount = cpuCount)
- val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode) }
- val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize))
-
- return SimMachineModel(processingUnits, memoryUnits)
- }
-
- private fun vmStarted(vm: VirtualMachine) {
- vms.forEach { it ->
- vm.performanceInterferenceModel?.onStart(it.server.image.name)
- }
- }
-
- private fun vmStopped(vm: VirtualMachine) {
- vms.forEach { it ->
- vm.performanceInterferenceModel?.onStop(it.server.image.name)
- }
- }
-
- /**
- * A virtual machine instance that the driver manages.
- */
- private inner class VirtualMachine(server: Server, val events: EventFlow<ServerEvent>, val machine: SimMachine) {
- val performanceInterferenceModel: PerformanceInterferenceModel? = server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
-
- val job = coroutineScope.launch {
- val delegate = (server.image as SimWorkloadImage).workload
- // Wrap the workload to pass in a ComputeSimExecutionContext
- val workload = object : SimWorkload {
- lateinit var wrappedCtx: ComputeSimExecutionContext
-
- override fun onStart(ctx: SimExecutionContext) {
- wrappedCtx = object : ComputeSimExecutionContext, SimExecutionContext by ctx {
- override val server: Server
- get() = server
-
- override fun toString(): String = "WrappedSimExecutionContext"
- }
-
- delegate.onStart(wrappedCtx)
- }
-
- override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand {
- return delegate.onStart(wrappedCtx, cpu)
- }
-
- override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand {
- return delegate.onNext(wrappedCtx, cpu, remainingWork)
- }
-
- override fun toString(): String = "SimWorkloadWrapper(delegate=$delegate)"
- }
-
- delay(1) // TODO Introduce boot time
- init()
- try {
- machine.run(workload)
- exit(null)
- } catch (cause: Throwable) {
- exit(cause)
- } finally {
- machine.close()
- }
- }
-
- var server: Server = server
- set(value) {
- if (field.state != value.state) {
- events.emit(ServerEvent.StateChanged(value, field.state))
- }
-
- field = value
- }
-
- private fun init() {
- server = server.copy(state = ServerState.ACTIVE)
- }
-
- private fun exit(cause: Throwable?) {
- val serverState =
- if (cause == null)
- ServerState.SHUTOFF
- else
- ServerState.ERROR
- server = server.copy(state = serverState)
- availableMemory += server.flavor.memorySize
- vms.remove(this)
- vmStopped(this)
- eventFlow.emit(HypervisorEvent.VmsUpdated(this@SimVirtDriver, vms.size, availableMemory))
- events.close()
- }
- }
-
- override fun onStart(ctx: SimExecutionContext) {
- this.ctx = ctx as ComputeSimExecutionContext
- this.availableMemory = ctx.machine.memory.map { it.size }.sum()
- this.hypervisor.onStart(ctx)
- }
-
- override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand {
- return hypervisor.onStart(ctx, cpu)
- }
-
- override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand {
- return hypervisor.onNext(ctx, cpu, remainingWork)
- }
-}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt
deleted file mode 100644
index defea888..00000000
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt
+++ /dev/null
@@ -1,418 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.simulator
-
-import kotlinx.coroutines.*
-import kotlinx.coroutines.flow.Flow
-import kotlinx.coroutines.flow.launchIn
-import kotlinx.coroutines.flow.onEach
-import mu.KotlinLogging
-import org.opendc.compute.core.Flavor
-import org.opendc.compute.core.Server
-import org.opendc.compute.core.ServerEvent
-import org.opendc.compute.core.ServerState
-import org.opendc.compute.core.image.Image
-import org.opendc.compute.core.metal.service.ProvisioningService
-import org.opendc.compute.core.virt.HypervisorEvent
-import org.opendc.compute.core.virt.driver.InsufficientMemoryOnServerException
-import org.opendc.compute.core.virt.driver.VirtDriver
-import org.opendc.compute.core.virt.service.VirtProvisioningEvent
-import org.opendc.compute.core.virt.service.VirtProvisioningService
-import org.opendc.compute.core.virt.service.events.*
-import org.opendc.compute.simulator.allocation.AllocationPolicy
-import org.opendc.simulator.compute.SimHypervisorProvider
-import org.opendc.trace.core.EventTracer
-import org.opendc.utils.TimerScheduler
-import org.opendc.utils.flow.EventFlow
-import java.time.Clock
-import java.util.*
-import kotlin.coroutines.Continuation
-import kotlin.coroutines.resume
-import kotlin.math.max
-
-@OptIn(ExperimentalCoroutinesApi::class)
-public class SimVirtProvisioningService(
- private val coroutineScope: CoroutineScope,
- private val clock: Clock,
- private val provisioningService: ProvisioningService,
- public val allocationPolicy: AllocationPolicy,
- private val tracer: EventTracer,
- private val hypervisor: SimHypervisorProvider,
- private val schedulingQuantum: Long = 300000, // 5 minutes in milliseconds
-) : VirtProvisioningService {
- /**
- * The logger instance to use.
- */
- private val logger = KotlinLogging.logger {}
-
- /**
- * The hypervisors that have been launched by the service.
- */
- private val hypervisors: MutableMap<Server, HypervisorView> = mutableMapOf()
-
- /**
- * The available hypervisors.
- */
- private val availableHypervisors: MutableSet<HypervisorView> = mutableSetOf()
-
- /**
- * The incoming images to be processed by the provisioner.
- */
- private val incomingImages: Deque<ImageView> = ArrayDeque()
-
- /**
- * The active images in the system.
- */
- private val activeImages: MutableSet<ImageView> = mutableSetOf()
-
- public var submittedVms: Int = 0
- public var queuedVms: Int = 0
- public var runningVms: Int = 0
- public var finishedVms: Int = 0
- public var unscheduledVms: Int = 0
-
- private var maxCores = 0
- private var maxMemory = 0L
-
- /**
- * The allocation logic to use.
- */
- private val allocationLogic = allocationPolicy()
-
- /**
- * The [EventFlow] to emit the events.
- */
- internal val eventFlow = EventFlow<VirtProvisioningEvent>()
-
- override val events: Flow<VirtProvisioningEvent> = eventFlow
-
- /**
- * The [TimerScheduler] to use for scheduling the scheduler cycles.
- */
- private var scheduler: TimerScheduler<Unit> = TimerScheduler(coroutineScope, clock)
-
- init {
- coroutineScope.launch {
- val provisionedNodes = provisioningService.nodes()
- provisionedNodes.forEach { node ->
- val workload = SimVirtDriver(coroutineScope, hypervisor)
- val hypervisorImage = SimWorkloadImage(UUID.randomUUID(), "vmm", emptyMap(), workload)
- launch {
- var init = false
- val deployedNode = provisioningService.deploy(node, hypervisorImage)
- val server = deployedNode.server!!
- server.events.onEach { event ->
- when (event) {
- is ServerEvent.StateChanged -> {
- if (!init) {
- init = true
- }
- stateChanged(event.server)
- }
- }
- }.launchIn(this)
-
- delay(1)
- onHypervisorAvailable(server, workload)
- }
- }
- }
- }
-
- override suspend fun drivers(): Set<VirtDriver> {
- return availableHypervisors.map { it.driver }.toSet()
- }
-
- override val hostCount: Int = hypervisors.size
-
- override suspend fun deploy(
- name: String,
- image: Image,
- flavor: Flavor
- ): Server {
- tracer.commit(VmSubmissionEvent(name, image, flavor))
-
- eventFlow.emit(
- VirtProvisioningEvent.MetricsAvailable(
- this@SimVirtProvisioningService,
- hypervisors.size,
- availableHypervisors.size,
- ++submittedVms,
- runningVms,
- finishedVms,
- ++queuedVms,
- unscheduledVms
- )
- )
-
- return suspendCancellableCoroutine<Server> { cont ->
- val vmInstance = ImageView(name, image, flavor, cont)
- incomingImages += vmInstance
- requestCycle()
- }
- }
-
- override suspend fun terminate() {
- val provisionedNodes = provisioningService.nodes()
- provisionedNodes.forEach { node -> provisioningService.stop(node) }
- }
-
- private fun requestCycle() {
- // Bail out in case we have already requested a new cycle.
- if (scheduler.isTimerActive(Unit)) {
- return
- }
-
- // We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120).
- // This is important because the slices of the VMs need to be aligned.
- // We calculate here the delay until the next scheduling slot.
- val delay = schedulingQuantum - (clock.millis() % schedulingQuantum)
-
- scheduler.startSingleTimer(Unit, delay) {
- coroutineScope.launch { schedule() }
- }
- }
-
- private suspend fun schedule() {
- while (incomingImages.isNotEmpty()) {
- val imageInstance = incomingImages.peekFirst()
- val requiredMemory = imageInstance.flavor.memorySize
- val selectedHv = allocationLogic.select(availableHypervisors, imageInstance)
-
- if (selectedHv == null || !selectedHv.driver.canFit(imageInstance.flavor)) {
- logger.trace { "Image ${imageInstance.image} selected for scheduling but no capacity available for it." }
-
- if (requiredMemory > maxMemory || imageInstance.flavor.cpuCount > maxCores) {
- tracer.commit(VmSubmissionInvalidEvent(imageInstance.name))
-
- eventFlow.emit(
- VirtProvisioningEvent.MetricsAvailable(
- this@SimVirtProvisioningService,
- hypervisors.size,
- availableHypervisors.size,
- submittedVms,
- runningVms,
- finishedVms,
- --queuedVms,
- ++unscheduledVms
- )
- )
-
- // Remove the incoming image
- incomingImages.poll()
-
- logger.warn("Failed to spawn ${imageInstance.image}: does not fit [${clock.millis()}]")
- continue
- } else {
- break
- }
- }
-
- try {
- logger.info { "[${clock.millis()}] Spawning ${imageInstance.image} on ${selectedHv.server.uid} ${selectedHv.server.name} ${selectedHv.server.flavor}" }
- incomingImages.poll()
-
- // Speculatively update the hypervisor view information to prevent other images in the queue from
- // deciding on stale values.
- selectedHv.numberOfActiveServers++
- selectedHv.provisionedCores += imageInstance.flavor.cpuCount
- selectedHv.availableMemory -= requiredMemory // XXX Temporary hack
-
- val server = selectedHv.driver.spawn(
- imageInstance.name,
- imageInstance.image,
- imageInstance.flavor
- )
- imageInstance.server = server
- imageInstance.continuation.resume(server)
-
- tracer.commit(VmScheduledEvent(imageInstance.name))
-
- eventFlow.emit(
- VirtProvisioningEvent.MetricsAvailable(
- this@SimVirtProvisioningService,
- hypervisors.size,
- availableHypervisors.size,
- submittedVms,
- ++runningVms,
- finishedVms,
- --queuedVms,
- unscheduledVms
- )
- )
- activeImages += imageInstance
-
- server.events
- .onEach { event ->
- when (event) {
- is ServerEvent.StateChanged -> {
- if (event.server.state == ServerState.SHUTOFF) {
- logger.info { "[${clock.millis()}] Server ${event.server.uid} ${event.server.name} ${event.server.flavor} finished." }
-
- tracer.commit(VmStoppedEvent(event.server.name))
-
- eventFlow.emit(
- VirtProvisioningEvent.MetricsAvailable(
- this@SimVirtProvisioningService,
- hypervisors.size,
- availableHypervisors.size,
- submittedVms,
- --runningVms,
- ++finishedVms,
- queuedVms,
- unscheduledVms
- )
- )
-
- activeImages -= imageInstance
- selectedHv.provisionedCores -= server.flavor.cpuCount
-
- // Try to reschedule if needed
- if (incomingImages.isNotEmpty()) {
- requestCycle()
- }
- }
- }
- }
- }
- .launchIn(coroutineScope)
- } catch (e: InsufficientMemoryOnServerException) {
- logger.error("Failed to deploy VM", e)
-
- selectedHv.numberOfActiveServers--
- selectedHv.provisionedCores -= imageInstance.flavor.cpuCount
- selectedHv.availableMemory += requiredMemory
- } catch (e: Throwable) {
- logger.error("Failed to deploy VM", e)
- }
- }
- }
-
- private fun stateChanged(server: Server) {
- when (server.state) {
- ServerState.ACTIVE -> {
- logger.debug { "[${clock.millis()}] Server ${server.uid} available: ${server.state}" }
-
- if (server in hypervisors) {
- // Corner case for when the hypervisor already exists
- availableHypervisors += hypervisors.getValue(server)
- } else {
- val hv = HypervisorView(
- server.uid,
- server,
- 0,
- server.flavor.memorySize,
- 0
- )
- maxCores = max(maxCores, server.flavor.cpuCount)
- maxMemory = max(maxMemory, server.flavor.memorySize)
- hypervisors[server] = hv
- }
-
- tracer.commit(HypervisorAvailableEvent(server.uid))
-
- eventFlow.emit(
- VirtProvisioningEvent.MetricsAvailable(
- this@SimVirtProvisioningService,
- hypervisors.size,
- availableHypervisors.size,
- submittedVms,
- runningVms,
- finishedVms,
- queuedVms,
- unscheduledVms
- )
- )
-
- // Re-schedule on the new machine
- if (incomingImages.isNotEmpty()) {
- requestCycle()
- }
- }
- ServerState.SHUTOFF, ServerState.ERROR -> {
- logger.debug { "[${clock.millis()}] Server ${server.uid} unavailable: ${server.state}" }
- val hv = hypervisors[server] ?: return
- availableHypervisors -= hv
-
- tracer.commit(HypervisorUnavailableEvent(hv.uid))
-
- eventFlow.emit(
- VirtProvisioningEvent.MetricsAvailable(
- this@SimVirtProvisioningService,
- hypervisors.size,
- availableHypervisors.size,
- submittedVms,
- runningVms,
- finishedVms,
- queuedVms,
- unscheduledVms
- )
- )
-
- if (incomingImages.isNotEmpty()) {
- requestCycle()
- }
- }
- else -> throw IllegalStateException()
- }
- }
-
- private fun onHypervisorAvailable(server: Server, hypervisor: SimVirtDriver) {
- val hv = hypervisors[server] ?: return
- hv.driver = hypervisor
- availableHypervisors += hv
-
- tracer.commit(HypervisorAvailableEvent(hv.uid))
-
- eventFlow.emit(
- VirtProvisioningEvent.MetricsAvailable(
- this@SimVirtProvisioningService,
- hypervisors.size,
- availableHypervisors.size,
- submittedVms,
- runningVms,
- finishedVms,
- queuedVms,
- unscheduledVms
- )
- )
-
- hv.driver.events
- .onEach { event ->
- if (event is HypervisorEvent.VmsUpdated) {
- hv.numberOfActiveServers = event.numberOfActiveServers
- hv.availableMemory = event.availableMemory
- }
- }.launchIn(coroutineScope)
-
- requestCycle()
- }
-
- public data class ImageView(
- public val name: String,
- public val image: Image,
- public val flavor: Flavor,
- public val continuation: Continuation<Server>,
- public var server: Server? = null
- )
-}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimWorkloadImage.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimWorkloadImage.kt
deleted file mode 100644
index b48de1d5..00000000
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimWorkloadImage.kt
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.simulator
-
-import org.opendc.compute.core.image.Image
-import org.opendc.core.resource.TagContainer
-import org.opendc.simulator.compute.workload.SimWorkload
-import java.util.*
-
-/**
- * An application [Image] that runs a [SimWorkload].
- *
- * @property uid The unique identifier of this image.
- * @property name The name of this image.
- * @property tags The tags attached to the image.
- * @property workload The workload to run for this image.
- */
-public data class SimWorkloadImage(
- public override val uid: UUID,
- public override val name: String,
- public override val tags: TagContainer,
- public val workload: SimWorkload
-) : Image
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/power/api/CpuPowerModel.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/power/api/CpuPowerModel.kt
index ee9e130b..0141bc8c 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/power/api/CpuPowerModel.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/power/api/CpuPowerModel.kt
@@ -2,7 +2,7 @@ package org.opendc.compute.simulator.power.api
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
-import org.opendc.compute.core.metal.driver.BareMetalDriver
+import org.opendc.metal.driver.BareMetalDriver
public interface CpuPowerModel {
/**
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/power/models/ZeroIdlePowerDecorator.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/power/models/ZeroIdlePowerDecorator.kt
index 938e5607..b0c3fa4c 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/power/models/ZeroIdlePowerDecorator.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/power/models/ZeroIdlePowerDecorator.kt
@@ -5,10 +5,10 @@ import org.opendc.compute.simulator.power.api.CpuPowerModel
/**
* A decorator for ignoring the idle power when computing energy consumption of components.
*
- * @param cpuModelWrappee The wrappe of a [CpuPowerModel].
+ * @param delegate The [CpuPowerModel] to delegate to.
*/
-public class ZeroIdlePowerDecorator(private val cpuModelWrappee: CpuPowerModel) : CpuPowerModel {
+public class ZeroIdlePowerDecorator(private val delegate: CpuPowerModel) : CpuPowerModel {
override fun computeCpuPower(cpuUtil: Double): Double {
- return if (cpuUtil == 0.0) 0.0 else cpuModelWrappee.computeCpuPower(cpuUtil)
+ return if (cpuUtil == 0.0) 0.0 else delegate.computeCpuPower(cpuUtil)
}
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimBareMetalDriverTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimBareMetalDriverTest.kt
index fb8a5f47..0d90376e 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimBareMetalDriverTest.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimBareMetalDriverTest.kt
@@ -30,8 +30,10 @@ import kotlinx.coroutines.withContext
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
-import org.opendc.compute.core.ServerEvent
-import org.opendc.compute.core.ServerState
+import org.junit.jupiter.api.assertAll
+import org.opendc.compute.api.Image
+import org.opendc.metal.NodeEvent
+import org.opendc.metal.NodeState
import org.opendc.simulator.compute.SimMachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
@@ -59,22 +61,21 @@ internal class SimBareMetalDriverTest {
val testScope = TestCoroutineScope()
val clock = DelayControllerClockAdapter(testScope)
- var finalState: ServerState = ServerState.BUILD
+ var finalState: NodeState = NodeState.UNKNOWN
var finalTime = 0L
testScope.launch {
val driver = SimBareMetalDriver(this, clock, UUID.randomUUID(), "test", emptyMap(), machineModel)
- val image = SimWorkloadImage(UUID.randomUUID(), "<unnamed>", emptyMap(), SimFlopsWorkload(4_000, utilization = 1.0))
-
+ val image = Image(UUID.randomUUID(), "<unnamed>", mapOf("workload" to SimFlopsWorkload(4_000, utilization = 1.0)))
// Batch driver commands
withContext(coroutineContext) {
driver.init()
driver.setImage(image)
- val server = driver.start().server!!
- server.events.collect { event ->
+ val node = driver.start()
+ node.events.collect { event ->
when (event) {
- is ServerEvent.StateChanged -> {
- finalState = event.server.state
+ is NodeEvent.StateChanged -> {
+ finalState = event.node.state
finalTime = clock.millis()
}
}
@@ -83,7 +84,9 @@ internal class SimBareMetalDriverTest {
}
testScope.advanceUntilIdle()
- assertEquals(ServerState.SHUTOFF, finalState)
- assertEquals(501, finalTime)
+ assertAll(
+ { assertEquals(NodeState.SHUTOFF, finalState) },
+ { assertEquals(501, finalTime) }
+ )
}
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimVirtDriverTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index 1831eae0..61bff39f 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimVirtDriverTest.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -24,6 +24,7 @@ package org.opendc.compute.simulator
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
@@ -32,8 +33,14 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
-import org.opendc.compute.core.Flavor
-import org.opendc.compute.core.virt.HypervisorEvent
+import org.opendc.compute.api.Flavor
+import org.opendc.compute.api.Image
+import org.opendc.compute.api.Server
+import org.opendc.compute.api.ServerState
+import org.opendc.compute.api.ServerWatcher
+import org.opendc.compute.service.driver.HostEvent
+import org.opendc.metal.Node
+import org.opendc.metal.NodeState
import org.opendc.simulator.compute.SimFairShareHypervisorProvider
import org.opendc.simulator.compute.SimMachineModel
import org.opendc.simulator.compute.model.MemoryUnit
@@ -48,7 +55,7 @@ import java.util.UUID
* Basic test-suite for the hypervisor.
*/
@OptIn(ExperimentalCoroutinesApi::class)
-internal class SimVirtDriverTest {
+internal class SimHostTest {
private lateinit var scope: TestCoroutineScope
private lateinit var clock: Clock
private lateinit var machineModel: SimMachineModel
@@ -75,35 +82,42 @@ internal class SimVirtDriverTest {
var grantedWork = 0L
var overcommittedWork = 0L
+ val node = Node(
+ UUID.randomUUID(), "name", emptyMap(), NodeState.SHUTOFF,
+ Flavor(machineModel.cpus.size, machineModel.memory.map { it.size }.sum()), Image.EMPTY, emptyFlow()
+ )
+
scope.launch {
- val virtDriver = SimVirtDriver(this, SimFairShareHypervisorProvider())
- val vmm = SimWorkloadImage(UUID.randomUUID(), "vmm", emptyMap(), virtDriver)
+ val virtDriver = SimHost(node, this, SimFairShareHypervisorProvider())
+ val vmm = Image(UUID.randomUUID(), "vmm", mapOf("workload" to virtDriver))
val duration = 5 * 60L
- val vmImageA = SimWorkloadImage(
+ val vmImageA = Image(
UUID.randomUUID(),
"<unnamed>",
- emptyMap(),
- SimTraceWorkload(
- sequenceOf(
- SimTraceWorkload.Fragment(duration * 1000, 28.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 3500.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 0.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 183.0, 2)
- ),
+ mapOf(
+ "workload" to SimTraceWorkload(
+ sequenceOf(
+ SimTraceWorkload.Fragment(duration * 1000, 28.0, 2),
+ SimTraceWorkload.Fragment(duration * 1000, 3500.0, 2),
+ SimTraceWorkload.Fragment(duration * 1000, 0.0, 2),
+ SimTraceWorkload.Fragment(duration * 1000, 183.0, 2)
+ ),
+ )
)
)
- val vmImageB = SimWorkloadImage(
+ val vmImageB = Image(
UUID.randomUUID(),
"<unnamed>",
- emptyMap(),
- SimTraceWorkload(
- sequenceOf(
- SimTraceWorkload.Fragment(duration * 1000, 28.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 3100.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 0.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 73.0, 2)
+ mapOf(
+ "workload" to SimTraceWorkload(
+ sequenceOf(
+ SimTraceWorkload.Fragment(duration * 1000, 28.0, 2),
+ SimTraceWorkload.Fragment(duration * 1000, 3100.0, 2),
+ SimTraceWorkload.Fragment(duration * 1000, 0.0, 2),
+ SimTraceWorkload.Fragment(duration * 1000, 73.0, 2)
+ )
)
- ),
+ )
)
val metalDriver =
@@ -119,7 +133,7 @@ internal class SimVirtDriverTest {
virtDriver.events
.onEach { event ->
when (event) {
- is HypervisorEvent.SliceFinished -> {
+ is HostEvent.SliceFinished -> {
requestedWork += event.requestedBurst
grantedWork += event.grantedBurst
overcommittedWork += event.overcommissionedBurst
@@ -128,8 +142,8 @@ internal class SimVirtDriverTest {
}
.launchIn(this)
- virtDriver.spawn("a", vmImageA, flavor)
- virtDriver.spawn("b", vmImageB, flavor)
+ launch { virtDriver.spawn(MockServer(UUID.randomUUID(), "a", flavor, vmImageA)) }
+ launch { virtDriver.spawn(MockServer(UUID.randomUUID(), "b", flavor, vmImageB)) }
}
scope.advanceUntilIdle()
@@ -142,4 +156,20 @@ internal class SimVirtDriverTest {
{ assertEquals(1200006, scope.currentTime) }
)
}
+
+ private class MockServer(
+ override val uid: UUID,
+ override val name: String,
+ override val flavor: Flavor,
+ override val image: Image
+ ) : Server {
+ override val tags: Map<String, String> = emptyMap()
+ override val state: ServerState = ServerState.BUILD
+
+ override fun watch(watcher: ServerWatcher) {}
+
+ override fun unwatch(watcher: ServerWatcher) {}
+
+ override suspend fun refresh() {}
+ }
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimProvisioningServiceTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimProvisioningServiceTest.kt
index a33a4e5f..33b3db94 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimProvisioningServiceTest.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimProvisioningServiceTest.kt
@@ -29,7 +29,8 @@ import kotlinx.coroutines.launch
import kotlinx.coroutines.test.TestCoroutineScope
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
-import org.opendc.compute.core.metal.service.SimpleProvisioningService
+import org.opendc.compute.api.Image
+import org.opendc.metal.service.SimpleProvisioningService
import org.opendc.simulator.compute.SimMachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
@@ -64,7 +65,7 @@ internal class SimProvisioningServiceTest {
val clock = DelayControllerClockAdapter(testScope)
testScope.launch {
- val image = SimWorkloadImage(UUID.randomUUID(), "<unnamed>", emptyMap(), SimFlopsWorkload(1000))
+ val image = Image(UUID.randomUUID(), "<unnamed>", mapOf("machine" to SimFlopsWorkload(1000)))
val driver = SimBareMetalDriver(this, clock, UUID.randomUUID(), "test", emptyMap(), machineModel)
val provisioner = SimpleProvisioningService()
@@ -72,7 +73,7 @@ internal class SimProvisioningServiceTest {
delay(5)
val nodes = provisioner.nodes()
val node = provisioner.deploy(nodes.first(), image)
- node.server!!.events.collect { println(it) }
+ node.events.collect { println(it) }
}
testScope.advanceUntilIdle()
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/power/CpuPowerModelTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/power/CpuPowerModelTest.kt
index 7b0c7515..d4d88fb1 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/power/CpuPowerModelTest.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/power/CpuPowerModelTest.kt
@@ -7,9 +7,9 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.Arguments
import org.junit.jupiter.params.provider.MethodSource
-import org.opendc.compute.core.metal.driver.BareMetalDriver
import org.opendc.compute.simulator.power.api.CpuPowerModel
import org.opendc.compute.simulator.power.models.*
+import org.opendc.metal.driver.BareMetalDriver
import java.util.stream.Stream
import kotlin.math.pow
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
index 8f3e686a..a5cf4fc0 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -32,22 +32,26 @@ import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.launch
import mu.KotlinLogging
-import org.opendc.compute.core.Flavor
-import org.opendc.compute.core.ServerEvent
-import org.opendc.compute.core.metal.NODE_CLUSTER
-import org.opendc.compute.core.metal.driver.BareMetalDriver
-import org.opendc.compute.core.metal.service.ProvisioningService
-import org.opendc.compute.core.virt.HypervisorEvent
-import org.opendc.compute.core.virt.service.VirtProvisioningEvent
-import org.opendc.compute.core.workload.VmWorkload
+import org.opendc.compute.api.ComputeWorkload
+import org.opendc.compute.api.Flavor
+import org.opendc.compute.api.Server
+import org.opendc.compute.api.ServerState
+import org.opendc.compute.api.ServerWatcher
+import org.opendc.compute.service.ComputeService
+import org.opendc.compute.service.ComputeServiceEvent
+import org.opendc.compute.service.driver.HostEvent
+import org.opendc.compute.service.internal.ComputeServiceImpl
+import org.opendc.compute.service.scheduler.AllocationPolicy
import org.opendc.compute.simulator.SimBareMetalDriver
-import org.opendc.compute.simulator.SimVirtDriver
-import org.opendc.compute.simulator.SimVirtProvisioningService
-import org.opendc.compute.simulator.allocation.AllocationPolicy
+import org.opendc.compute.simulator.SimHost
+import org.opendc.compute.simulator.SimHostProvisioner
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
import org.opendc.experiments.capelin.trace.Sc20StreamingParquetTraceReader
import org.opendc.format.environment.EnvironmentReader
import org.opendc.format.trace.TraceReader
+import org.opendc.metal.NODE_CLUSTER
+import org.opendc.metal.NodeEvent
+import org.opendc.metal.service.ProvisioningService
import org.opendc.simulator.compute.SimFairShareHypervisorProvider
import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
import org.opendc.simulator.failures.CorrelatedFaultInjector
@@ -135,6 +139,12 @@ public fun createTraceReader(
)
}
+public data class ProvisionerResult(
+ val metal: ProvisioningService,
+ val provisioner: SimHostProvisioner,
+ val compute: ComputeServiceImpl
+)
+
/**
* Construct the environment for a VM provisioner and return the provisioner instance.
*/
@@ -144,45 +154,52 @@ public suspend fun createProvisioner(
environmentReader: EnvironmentReader,
allocationPolicy: AllocationPolicy,
eventTracer: EventTracer
-): Pair<ProvisioningService, SimVirtProvisioningService> {
+): ProvisionerResult {
val environment = environmentReader.use { it.construct(coroutineScope, clock) }
val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService]
// Wait for the bare metal nodes to be spawned
delay(10)
- val scheduler = SimVirtProvisioningService(coroutineScope, clock, bareMetalProvisioner, allocationPolicy, eventTracer, SimFairShareHypervisorProvider())
+ val provisioner = SimHostProvisioner(coroutineScope.coroutineContext, bareMetalProvisioner, SimFairShareHypervisorProvider())
+ val hosts = provisioner.provisionAll()
+
+ val scheduler = ComputeService(coroutineScope.coroutineContext, clock, eventTracer, allocationPolicy) as ComputeServiceImpl
+
+ for (host in hosts) {
+ scheduler.addHost(host)
+ }
// Wait for the hypervisors to be spawned
delay(10)
- return bareMetalProvisioner to scheduler
+ return ProvisionerResult(bareMetalProvisioner, provisioner, scheduler)
}
/**
* Attach the specified monitor to the VM provisioner.
*/
@OptIn(ExperimentalCoroutinesApi::class)
-public suspend fun attachMonitor(
+public fun attachMonitor(
coroutineScope: CoroutineScope,
clock: Clock,
- scheduler: SimVirtProvisioningService,
+ scheduler: ComputeService,
monitor: ExperimentMonitor
) {
- val hypervisors = scheduler.drivers()
+ val hypervisors = scheduler.hosts
// Monitor hypervisor events
for (hypervisor in hypervisors) {
- // TODO Do not expose VirtDriver directly but use Hypervisor class.
- val server = (hypervisor as SimVirtDriver).server
+ // TODO Do not expose Host directly but use Hypervisor class.
+ val server = (hypervisor as SimHost).node
monitor.reportHostStateChange(clock.millis(), hypervisor, server)
server.events
.onEach { event ->
val time = clock.millis()
when (event) {
- is ServerEvent.StateChanged -> {
- monitor.reportHostStateChange(time, hypervisor, event.server)
+ is NodeEvent.StateChanged -> {
+ monitor.reportHostStateChange(time, hypervisor, event.node)
}
}
}
@@ -190,7 +207,7 @@ public suspend fun attachMonitor(
hypervisor.events
.onEach { event ->
when (event) {
- is HypervisorEvent.SliceFinished -> monitor.reportHostSlice(
+ is HostEvent.SliceFinished -> monitor.reportHostSlice(
clock.millis(),
event.requestedBurst,
event.grantedBurst,
@@ -199,22 +216,22 @@ public suspend fun attachMonitor(
event.cpuUsage,
event.cpuDemand,
event.numberOfDeployedImages,
- event.hostServer
+ (event.driver as SimHost).node
)
}
}
.launchIn(coroutineScope)
- val driver = hypervisor.server.services[BareMetalDriver.Key] as SimBareMetalDriver
+ val driver = server.metadata["driver"] as SimBareMetalDriver
driver.powerDraw
- .onEach { monitor.reportPowerConsumption(hypervisor.server, it) }
+ .onEach { monitor.reportPowerConsumption(server, it) }
.launchIn(coroutineScope)
}
scheduler.events
.onEach { event ->
when (event) {
- is VirtProvisioningEvent.MetricsAvailable ->
+ is ComputeServiceEvent.MetricsAvailable ->
monitor.reportProvisionerMetrics(clock.millis(), event)
}
}
@@ -227,11 +244,12 @@ public suspend fun attachMonitor(
public suspend fun processTrace(
coroutineScope: CoroutineScope,
clock: Clock,
- reader: TraceReader<VmWorkload>,
- scheduler: SimVirtProvisioningService,
+ reader: TraceReader<ComputeWorkload>,
+ scheduler: ComputeService,
chan: Channel<Unit>,
monitor: ExperimentMonitor
) {
+ val client = scheduler.newClient()
try {
var submitted = 0
@@ -242,7 +260,7 @@ public suspend fun processTrace(
delay(max(0, time - clock.millis()))
coroutineScope.launch {
chan.send(Unit)
- val server = scheduler.deploy(
+ val server = client.newServer(
workload.image.name,
workload.image,
Flavor(
@@ -250,21 +268,19 @@ public suspend fun processTrace(
workload.image.tags["required-memory"] as Long
)
)
- // Monitor server events
- server.events
- .onEach {
- if (it is ServerEvent.StateChanged) {
- monitor.reportVmStateChange(clock.millis(), it.server)
- }
+
+ server.watch(object : ServerWatcher {
+ override fun onStateChanged(server: Server, newState: ServerState) {
+ monitor.reportVmStateChange(clock.millis(), server, newState)
}
- .collect()
+ })
}
}
scheduler.events
.takeWhile {
when (it) {
- is VirtProvisioningEvent.MetricsAvailable ->
+ is ComputeServiceEvent.MetricsAvailable ->
it.inactiveVmCount + it.failedVmCount != submitted
}
}
@@ -272,5 +288,6 @@ public suspend fun processTrace(
delay(1)
} finally {
reader.close()
+ client.close()
}
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index 75b0d735..ff0a026d 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -28,6 +28,12 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.TestCoroutineScope
import mu.KotlinLogging
+import org.opendc.compute.service.scheduler.AllocationPolicy
+import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy
+import org.opendc.compute.service.scheduler.AvailableMemoryAllocationPolicy
+import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy
+import org.opendc.compute.service.scheduler.ProvisionedCoresAllocationPolicy
+import org.opendc.compute.service.scheduler.RandomAllocationPolicy
import org.opendc.compute.simulator.allocation.*
import org.opendc.experiments.capelin.experiment.attachMonitor
import org.opendc.experiments.capelin.experiment.createFailureDomain
@@ -151,7 +157,7 @@ public abstract class Portfolio(name: String) : Experiment(name) {
)
testScope.launch {
- val (bareMetalProvisioner, scheduler) = createProvisioner(
+ val (bareMetalProvisioner, provisioner, scheduler) = createProvisioner(
this,
clock,
environment,
@@ -190,7 +196,8 @@ public abstract class Portfolio(name: String) : Experiment(name) {
logger.debug("FINISHED=${scheduler.finishedVms}")
failureDomain?.cancel()
- scheduler.terminate()
+ scheduler.close()
+ provisioner.close()
}
try {
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
index 3c6637bf..1e42cf56 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
@@ -22,9 +22,11 @@
package org.opendc.experiments.capelin.monitor
-import org.opendc.compute.core.Server
-import org.opendc.compute.core.virt.driver.VirtDriver
-import org.opendc.compute.core.virt.service.VirtProvisioningEvent
+import org.opendc.compute.api.Server
+import org.opendc.compute.api.ServerState
+import org.opendc.compute.service.ComputeServiceEvent
+import org.opendc.compute.service.driver.Host
+import org.opendc.metal.Node
import java.io.Closeable
/**
@@ -34,22 +36,22 @@ public interface ExperimentMonitor : Closeable {
/**
* This method is invoked when the state of a VM changes.
*/
- public fun reportVmStateChange(time: Long, server: Server) {}
+ public fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {}
/**
* This method is invoked when the state of a host changes.
*/
public fun reportHostStateChange(
time: Long,
- driver: VirtDriver,
- server: Server
+ driver: Host,
+ host: Node
) {
}
/**
* Report the power consumption of a host.
*/
- public fun reportPowerConsumption(host: Server, draw: Double) {}
+ public fun reportPowerConsumption(host: Node, draw: Double) {}
/**
* This method is invoked for a host for each slice that is finishes.
@@ -63,7 +65,7 @@ public interface ExperimentMonitor : Closeable {
cpuUsage: Double,
cpuDemand: Double,
numberOfDeployedImages: Int,
- hostServer: Server,
+ host: Node,
duration: Long = 5 * 60 * 1000L
) {
}
@@ -71,5 +73,5 @@ public interface ExperimentMonitor : Closeable {
/**
* This method is invoked for a provisioner event.
*/
- public fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) {}
+ public fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) {}
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
index a0d57656..98052214 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
@@ -23,13 +23,15 @@
package org.opendc.experiments.capelin.monitor
import mu.KotlinLogging
-import org.opendc.compute.core.Server
-import org.opendc.compute.core.virt.driver.VirtDriver
-import org.opendc.compute.core.virt.service.VirtProvisioningEvent
+import org.opendc.compute.api.Server
+import org.opendc.compute.api.ServerState
+import org.opendc.compute.service.ComputeServiceEvent
+import org.opendc.compute.service.driver.Host
import org.opendc.experiments.capelin.telemetry.HostEvent
import org.opendc.experiments.capelin.telemetry.ProvisionerEvent
import org.opendc.experiments.capelin.telemetry.parquet.ParquetHostEventWriter
import org.opendc.experiments.capelin.telemetry.parquet.ParquetProvisionerEventWriter
+import org.opendc.metal.Node
import java.io.File
/**
@@ -49,10 +51,10 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
File(base, "provisioner-metrics/$partition/data.parquet"),
bufferSize
)
- private val currentHostEvent = mutableMapOf<Server, HostEvent>()
+ private val currentHostEvent = mutableMapOf<Node, HostEvent>()
private var startTime = -1L
- override fun reportVmStateChange(time: Long, server: Server) {
+ override fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {
if (startTime < 0) {
startTime = time
@@ -63,12 +65,12 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
override fun reportHostStateChange(
time: Long,
- driver: VirtDriver,
- server: Server
+ driver: Host,
+ host: Node
) {
- logger.debug { "Host ${server.uid} changed state ${server.state} [$time]" }
+ logger.debug { "Host ${host.uid} changed state ${host.state} [$time]" }
- val previousEvent = currentHostEvent[server]
+ val previousEvent = currentHostEvent[host]
val roundedTime = previousEvent?.let {
val duration = time - it.timestamp
@@ -91,13 +93,13 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
0.0,
0.0,
0,
- server
+ host
)
}
- private val lastPowerConsumption = mutableMapOf<Server, Double>()
+ private val lastPowerConsumption = mutableMapOf<Node, Double>()
- override fun reportPowerConsumption(host: Server, draw: Double) {
+ override fun reportPowerConsumption(host: Node, draw: Double) {
lastPowerConsumption[host] = draw
}
@@ -110,16 +112,16 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
cpuUsage: Double,
cpuDemand: Double,
numberOfDeployedImages: Int,
- hostServer: Server,
+ host: Node,
duration: Long
) {
- val previousEvent = currentHostEvent[hostServer]
+ val previousEvent = currentHostEvent[host]
when {
previousEvent == null -> {
val event = HostEvent(
time,
5 * 60 * 1000L,
- hostServer,
+ host,
numberOfDeployedImages,
requestedBurst,
grantedBurst,
@@ -127,17 +129,17 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
interferedBurst,
cpuUsage,
cpuDemand,
- lastPowerConsumption[hostServer] ?: 200.0,
- hostServer.flavor.cpuCount
+ lastPowerConsumption[host] ?: 200.0,
+ host.flavor.cpuCount
)
- currentHostEvent[hostServer] = event
+ currentHostEvent[host] = event
}
previousEvent.timestamp == time -> {
val event = HostEvent(
time,
previousEvent.duration,
- hostServer,
+ host,
numberOfDeployedImages,
requestedBurst,
grantedBurst,
@@ -145,11 +147,11 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
interferedBurst,
cpuUsage,
cpuDemand,
- lastPowerConsumption[hostServer] ?: 200.0,
- hostServer.flavor.cpuCount
+ lastPowerConsumption[host] ?: 200.0,
+ host.flavor.cpuCount
)
- currentHostEvent[hostServer] = event
+ currentHostEvent[host] = event
}
else -> {
hostWriter.write(previousEvent)
@@ -157,7 +159,7 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
val event = HostEvent(
time,
time - previousEvent.timestamp,
- hostServer,
+ host,
numberOfDeployedImages,
requestedBurst,
grantedBurst,
@@ -165,16 +167,16 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
interferedBurst,
cpuUsage,
cpuDemand,
- lastPowerConsumption[hostServer] ?: 200.0,
- hostServer.flavor.cpuCount
+ lastPowerConsumption[host] ?: 200.0,
+ host.flavor.cpuCount
)
- currentHostEvent[hostServer] = event
+ currentHostEvent[host] = event
}
}
}
- override fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) {
+ override fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) {
provisionerWriter.write(
ProvisionerEvent(
time,
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt
index e5e9d520..e7b6a7bb 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt
@@ -22,7 +22,7 @@
package org.opendc.experiments.capelin.telemetry
-import org.opendc.compute.core.Server
+import org.opendc.metal.Node
/**
* A periodic report of the host machine metrics.
@@ -30,7 +30,7 @@ import org.opendc.compute.core.Server
public data class HostEvent(
override val timestamp: Long,
public val duration: Long,
- public val host: Server,
+ public val node: Node,
public val vmCount: Int,
public val requestedBurst: Long,
public val grantedBurst: Long,
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt
index 427c453a..7631f55f 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/VmEvent.kt
@@ -22,7 +22,7 @@
package org.opendc.experiments.capelin.telemetry
-import org.opendc.compute.core.Server
+import org.opendc.compute.api.Server
/**
* A periodic report of a virtual machine's metrics.
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt
index 4a3e7963..b4fdd66a 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt
@@ -41,8 +41,8 @@ public class ParquetHostEventWriter(path: File, bufferSize: Int) :
// record.put("portfolio_id", event.run.parent.parent.id)
// record.put("scenario_id", event.run.parent.id)
// record.put("run_id", event.run.id)
- record.put("host_id", event.host.name)
- record.put("state", event.host.state.name)
+ record.put("host_id", event.node.name)
+ record.put("state", event.node.state.name)
record.put("timestamp", event.timestamp)
record.put("duration", event.duration)
record.put("vm_count", event.vmCount)
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt
index 6cfdae40..f9630078 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt
@@ -22,8 +22,8 @@
package org.opendc.experiments.capelin.trace
-import org.opendc.compute.core.workload.VmWorkload
-import org.opendc.compute.simulator.SimWorkloadImage
+import org.opendc.compute.api.ComputeWorkload
+import org.opendc.compute.api.Image
import org.opendc.experiments.capelin.model.CompositeWorkload
import org.opendc.experiments.capelin.model.Workload
import org.opendc.format.trace.TraceEntry
@@ -45,11 +45,11 @@ public class Sc20ParquetTraceReader(
performanceInterferenceModel: Map<String, PerformanceInterferenceModel>,
workload: Workload,
seed: Int
-) : TraceReader<VmWorkload> {
+) : TraceReader<ComputeWorkload> {
/**
* The iterator over the actual trace.
*/
- private val iterator: Iterator<TraceEntry<VmWorkload>> =
+ private val iterator: Iterator<TraceEntry<ComputeWorkload>> =
rawReaders
.map { it.read() }
.run {
@@ -73,11 +73,10 @@ public class Sc20ParquetTraceReader(
performanceInterferenceModel[id] ?: PerformanceInterferenceModel(TreeSet())
val newImage =
- SimWorkloadImage(
+ Image(
image.uid,
image.name,
image.tags + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems),
- (image as SimWorkloadImage).workload
)
val newWorkload = entry.workload.copy(image = newImage)
Sc20RawParquetTraceReader.TraceEntryImpl(entry.submissionTime, newWorkload)
@@ -88,7 +87,7 @@ public class Sc20ParquetTraceReader(
override fun hasNext(): Boolean = iterator.hasNext()
- override fun next(): TraceEntry<VmWorkload> = iterator.next()
+ override fun next(): TraceEntry<ComputeWorkload> = iterator.next()
override fun close() {}
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt
index d2560d62..b29bdc54 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt
@@ -26,8 +26,8 @@ import mu.KotlinLogging
import org.apache.avro.generic.GenericData
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetReader
-import org.opendc.compute.core.workload.VmWorkload
-import org.opendc.compute.simulator.SimWorkloadImage
+import org.opendc.compute.api.ComputeWorkload
+import org.opendc.compute.api.Image
import org.opendc.core.User
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
@@ -108,11 +108,12 @@ public class Sc20RawParquetTraceReader(private val path: File) {
val vmFragments = fragments.getValue(id).asSequence()
val totalLoad = vmFragments.sumByDouble { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs
- val vmWorkload = VmWorkload(
+ val workload = SimTraceWorkload(vmFragments)
+ val vmWorkload = ComputeWorkload(
uid,
id,
UnnamedUser,
- SimWorkloadImage(
+ Image(
uid,
id,
mapOf(
@@ -120,9 +121,9 @@ public class Sc20RawParquetTraceReader(private val path: File) {
"end-time" to endTime,
"total-load" to totalLoad,
"cores" to maxCores,
- "required-memory" to requiredMemory
- ),
- SimTraceWorkload(vmFragments)
+ "required-memory" to requiredMemory,
+ "workload" to workload
+ )
)
)
entries.add(TraceEntryImpl(submissionTime, vmWorkload))
@@ -150,7 +151,7 @@ public class Sc20RawParquetTraceReader(private val path: File) {
/**
* Read the entries in the trace.
*/
- public fun read(): List<TraceEntry<VmWorkload>> = entries
+ public fun read(): List<TraceEntry<ComputeWorkload>> = entries
/**
* An unnamed user.
@@ -165,6 +166,6 @@ public class Sc20RawParquetTraceReader(private val path: File) {
*/
internal data class TraceEntryImpl(
override var submissionTime: Long,
- override val workload: VmWorkload
- ) : TraceEntry<VmWorkload>
+ override val workload: ComputeWorkload
+ ) : TraceEntry<ComputeWorkload>
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt
index 12705c80..c588fda3 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt
@@ -31,8 +31,8 @@ import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.filter2.predicate.Statistics
import org.apache.parquet.filter2.predicate.UserDefinedPredicate
import org.apache.parquet.io.api.Binary
-import org.opendc.compute.core.workload.VmWorkload
-import org.opendc.compute.simulator.SimWorkloadImage
+import org.opendc.compute.api.ComputeWorkload
+import org.opendc.compute.api.Image
import org.opendc.core.User
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
@@ -62,11 +62,11 @@ public class Sc20StreamingParquetTraceReader(
performanceInterferenceModel: PerformanceInterferenceModel,
selectedVms: List<String>,
random: Random
-) : TraceReader<VmWorkload> {
+) : TraceReader<ComputeWorkload> {
/**
* The internal iterator to use for this reader.
*/
- private val iterator: Iterator<TraceEntry<VmWorkload>>
+ private val iterator: Iterator<TraceEntry<ComputeWorkload>>
/**
* The intermediate buffer to store the read records in.
@@ -235,19 +235,20 @@ public class Sc20StreamingParquetTraceReader(
performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSortedSet(),
Random(random.nextInt())
)
- val vmWorkload = VmWorkload(
+ val workload = SimTraceWorkload(fragments)
+ val vmWorkload = ComputeWorkload(
uid,
"VM Workload $id",
UnnamedUser,
- SimWorkloadImage(
+ Image(
uid,
id,
mapOf(
IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems,
"cores" to maxCores,
- "required-memory" to requiredMemory
- ),
- SimTraceWorkload(fragments),
+ "required-memory" to requiredMemory,
+ "workload" to workload
+ )
)
)
@@ -263,7 +264,7 @@ public class Sc20StreamingParquetTraceReader(
override fun hasNext(): Boolean = iterator.hasNext()
- override fun next(): TraceEntry<VmWorkload> = iterator.next()
+ override fun next(): TraceEntry<ComputeWorkload> = iterator.next()
override fun close() {
readerThread.interrupt()
@@ -300,6 +301,6 @@ public class Sc20StreamingParquetTraceReader(
*/
private data class TraceEntryImpl(
override var submissionTime: Long,
- override val workload: VmWorkload
- ) : TraceEntry<VmWorkload>
+ override val workload: ComputeWorkload
+ ) : TraceEntry<ComputeWorkload>
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt
index 4d9b9df1..881652f6 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt
@@ -23,8 +23,8 @@
package org.opendc.experiments.capelin.trace
import mu.KotlinLogging
-import org.opendc.compute.core.workload.VmWorkload
-import org.opendc.compute.simulator.SimWorkloadImage
+import org.opendc.compute.api.ComputeWorkload
+import org.opendc.compute.api.Image
import org.opendc.experiments.capelin.model.CompositeWorkload
import org.opendc.experiments.capelin.model.SamplingStrategy
import org.opendc.experiments.capelin.model.Workload
@@ -38,11 +38,11 @@ private val logger = KotlinLogging.logger {}
* Sample the workload for the specified [run].
*/
public fun sampleWorkload(
- trace: List<TraceEntry<VmWorkload>>,
+ trace: List<TraceEntry<ComputeWorkload>>,
workload: Workload,
subWorkload: Workload,
seed: Int
-): List<TraceEntry<VmWorkload>> {
+): List<TraceEntry<ComputeWorkload>> {
return when {
workload is CompositeWorkload -> sampleRegularWorkload(trace, workload, subWorkload, seed)
workload.samplingStrategy == SamplingStrategy.HPC ->
@@ -58,15 +58,15 @@ public fun sampleWorkload(
* Sample a regular (non-HPC) workload.
*/
public fun sampleRegularWorkload(
- trace: List<TraceEntry<VmWorkload>>,
+ trace: List<TraceEntry<ComputeWorkload>>,
workload: Workload,
subWorkload: Workload,
seed: Int
-): List<TraceEntry<VmWorkload>> {
+): List<TraceEntry<ComputeWorkload>> {
val fraction = subWorkload.fraction
val shuffled = trace.shuffled(Random(seed))
- val res = mutableListOf<TraceEntry<VmWorkload>>()
+ val res = mutableListOf<TraceEntry<ComputeWorkload>>()
val totalLoad = if (workload is CompositeWorkload) {
workload.totalLoad
} else {
@@ -93,11 +93,11 @@ public fun sampleRegularWorkload(
* Sample a HPC workload.
*/
public fun sampleHpcWorkload(
- trace: List<TraceEntry<VmWorkload>>,
+ trace: List<TraceEntry<ComputeWorkload>>,
workload: Workload,
seed: Int,
sampleOnLoad: Boolean
-): List<TraceEntry<VmWorkload>> {
+): List<TraceEntry<ComputeWorkload>> {
val pattern = Regex("^vm__workload__(ComputeNode|cn).*")
val random = Random(seed)
@@ -109,7 +109,7 @@ public fun sampleHpcWorkload(
val hpcSequence = generateSequence(0) { it + 1 }
.map { index ->
- val res = mutableListOf<TraceEntry<VmWorkload>>()
+ val res = mutableListOf<TraceEntry<ComputeWorkload>>()
hpc.mapTo(res) { sample(it, index) }
res.shuffle(random)
res
@@ -118,7 +118,7 @@ public fun sampleHpcWorkload(
val nonHpcSequence = generateSequence(0) { it + 1 }
.map { index ->
- val res = mutableListOf<TraceEntry<VmWorkload>>()
+ val res = mutableListOf<TraceEntry<ComputeWorkload>>()
nonHpc.mapTo(res) { sample(it, index) }
res.shuffle(random)
res
@@ -139,7 +139,7 @@ public fun sampleHpcWorkload(
var nonHpcCount = 0
var nonHpcLoad = 0.0
- val res = mutableListOf<TraceEntry<VmWorkload>>()
+ val res = mutableListOf<TraceEntry<ComputeWorkload>>()
if (sampleOnLoad) {
var currentLoad = 0.0
@@ -194,17 +194,16 @@ public fun sampleHpcWorkload(
/**
* Sample a random trace entry.
*/
-private fun sample(entry: TraceEntry<VmWorkload>, i: Int): TraceEntry<VmWorkload> {
+private fun sample(entry: TraceEntry<ComputeWorkload>, i: Int): TraceEntry<ComputeWorkload> {
val id = UUID.nameUUIDFromBytes("${entry.workload.image.uid}-$i".toByteArray())
- val image = SimWorkloadImage(
+ val image = Image(
id,
entry.workload.image.name,
- entry.workload.image.tags,
- (entry.workload.image as SimWorkloadImage).workload
+ entry.workload.image.tags
)
val vmWorkload = entry.workload.copy(uid = id, image = image, name = entry.workload.name)
return VmTraceEntry(vmWorkload, entry.submissionTime)
}
-private class VmTraceEntry(override val workload: VmWorkload, override val submissionTime: Long) :
- TraceEntry<VmWorkload>
+private class VmTraceEntry(override val workload: ComputeWorkload, override val submissionTime: Long) :
+ TraceEntry<ComputeWorkload>
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 6a0796f6..dfc6b90b 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -32,10 +32,9 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
-import org.opendc.compute.core.Server
-import org.opendc.compute.core.workload.VmWorkload
-import org.opendc.compute.simulator.SimVirtProvisioningService
-import org.opendc.compute.simulator.allocation.AvailableCoreMemoryAllocationPolicy
+import org.opendc.compute.api.ComputeWorkload
+import org.opendc.compute.service.internal.ComputeServiceImpl
+import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy
import org.opendc.experiments.capelin.experiment.attachMonitor
import org.opendc.experiments.capelin.experiment.createFailureDomain
import org.opendc.experiments.capelin.experiment.createProvisioner
@@ -47,6 +46,7 @@ import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader
import org.opendc.format.environment.EnvironmentReader
import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
import org.opendc.format.trace.TraceReader
+import org.opendc.metal.Node
import org.opendc.simulator.utils.DelayControllerClockAdapter
import org.opendc.trace.core.EventTracer
import java.io.File
@@ -97,7 +97,7 @@ class CapelinIntegrationTest {
val allocationPolicy = AvailableCoreMemoryAllocationPolicy()
val traceReader = createTestTraceReader()
val environmentReader = createTestEnvironmentReader()
- lateinit var scheduler: SimVirtProvisioningService
+ lateinit var scheduler: ComputeServiceImpl
val tracer = EventTracer(clock)
testScope.launch {
@@ -108,8 +108,8 @@ class CapelinIntegrationTest {
allocationPolicy,
tracer
)
- val bareMetalProvisioner = res.first
- scheduler = res.second
+ val bareMetalProvisioner = res.metal
+ scheduler = res.compute
val failureDomain = if (failures) {
println("ENABLING failures")
@@ -138,8 +138,9 @@ class CapelinIntegrationTest {
println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}")
failureDomain?.cancel()
- scheduler.terminate()
+ scheduler.close()
monitor.close()
+ res.provisioner.close()
}
runSimulation()
@@ -148,9 +149,9 @@ class CapelinIntegrationTest {
assertAll(
{ assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") },
{ assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") },
- { assertEquals(1684849230562, monitor.totalRequestedBurst) },
- { assertEquals(447612683996, monitor.totalGrantedBurst) },
- { assertEquals(1219535757406, monitor.totalOvercommissionedBurst) },
+ { assertEquals(1678587333640, monitor.totalRequestedBurst) },
+ { assertEquals(438118200924, monitor.totalGrantedBurst) },
+ { assertEquals(1220323969993, monitor.totalOvercommissionedBurst) },
{ assertEquals(0, monitor.totalInterferedBurst) }
)
}
@@ -162,19 +163,16 @@ class CapelinIntegrationTest {
val allocationPolicy = AvailableCoreMemoryAllocationPolicy()
val traceReader = createTestTraceReader(0.5, seed)
val environmentReader = createTestEnvironmentReader("single")
- lateinit var scheduler: SimVirtProvisioningService
val tracer = EventTracer(clock)
testScope.launch {
- val res = createProvisioner(
+ val (_, provisioner, scheduler) = createProvisioner(
this,
clock,
environmentReader,
allocationPolicy,
tracer
)
- scheduler = res.second
-
attachMonitor(this, clock, scheduler, monitor)
processTrace(
this,
@@ -187,8 +185,9 @@ class CapelinIntegrationTest {
println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}")
- scheduler.terminate()
+ scheduler.close()
monitor.close()
+ provisioner.close()
}
runSimulation()
@@ -210,7 +209,7 @@ class CapelinIntegrationTest {
/**
* Obtain the trace reader for the test.
*/
- private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<VmWorkload> {
+ private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<ComputeWorkload> {
return Sc20ParquetTraceReader(
listOf(Sc20RawParquetTraceReader(File("src/test/resources/trace"))),
emptyMap(),
@@ -242,7 +241,7 @@ class CapelinIntegrationTest {
cpuUsage: Double,
cpuDemand: Double,
numberOfDeployedImages: Int,
- hostServer: Server,
+ host: Node,
duration: Long
) {
totalRequestedBurst += requestedBurst
diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt
index fc979363..7b9d70ed 100644
--- a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt
+++ b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt
@@ -24,13 +24,14 @@ package org.opendc.experiments.sc18
import kotlinx.coroutines.*
import kotlinx.coroutines.test.TestCoroutineScope
-import org.opendc.compute.core.metal.service.ProvisioningService
-import org.opendc.compute.simulator.SimVirtProvisioningService
-import org.opendc.compute.simulator.allocation.NumberOfActiveServersAllocationPolicy
+import org.opendc.compute.service.ComputeService
+import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy
+import org.opendc.compute.simulator.SimHostProvisioner
import org.opendc.format.environment.sc18.Sc18EnvironmentReader
import org.opendc.format.trace.gwf.GwfTraceReader
import org.opendc.harness.dsl.Experiment
import org.opendc.harness.dsl.anyOf
+import org.opendc.metal.service.ProvisioningService
import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider
import org.opendc.simulator.utils.DelayControllerClockAdapter
import org.opendc.trace.core.EventTracer
@@ -91,16 +92,17 @@ public class UnderspecificationExperiment : Experiment("underspecification") {
// Wait for the bare metal nodes to be spawned
delay(10)
- val provisioner = SimVirtProvisioningService(
- testScope,
+ val provisioner = SimHostProvisioner(testScope.coroutineContext, bareMetal, SimSpaceSharedHypervisorProvider())
+ val hosts = provisioner.provisionAll()
+ val compute = ComputeService(
+ testScope.coroutineContext,
clock,
- bareMetal,
- NumberOfActiveServersAllocationPolicy(),
tracer,
- SimSpaceSharedHypervisorProvider(),
- schedulingQuantum = 1000
+ NumberOfActiveServersAllocationPolicy(),
)
+ hosts.forEach { compute.addHost(it) }
+
// Wait for the hypervisors to be spawned
delay(10)
@@ -108,7 +110,7 @@ public class UnderspecificationExperiment : Experiment("underspecification") {
testScope,
clock,
tracer,
- provisioner,
+ compute.newClient(),
mode = WorkflowSchedulerMode.Batch(100),
jobAdmissionPolicy = NullJobAdmissionPolicy,
jobOrderPolicy = SubmissionTimeJobOrderPolicy(),
diff --git a/simulator/opendc-format/build.gradle.kts b/simulator/opendc-format/build.gradle.kts
index cd26c077..37e9c9c8 100644
--- a/simulator/opendc-format/build.gradle.kts
+++ b/simulator/opendc-format/build.gradle.kts
@@ -31,7 +31,7 @@ plugins {
dependencies {
api(platform(project(":opendc-platform")))
api(project(":opendc-core"))
- api(project(":opendc-compute:opendc-compute-core"))
+ api(project(":opendc-compute:opendc-compute-api"))
api(project(":opendc-workflows"))
implementation(project(":opendc-simulator:opendc-simulator-compute"))
implementation(project(":opendc-compute:opendc-compute-simulator"))
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
index 2e3e4a73..bbbbe87c 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
@@ -26,14 +26,14 @@ import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import kotlinx.coroutines.CoroutineScope
-import org.opendc.compute.core.metal.service.ProvisioningService
-import org.opendc.compute.core.metal.service.SimpleProvisioningService
import org.opendc.compute.simulator.SimBareMetalDriver
import org.opendc.core.Environment
import org.opendc.core.Platform
import org.opendc.core.Zone
import org.opendc.core.services.ServiceRegistry
import org.opendc.format.environment.EnvironmentReader
+import org.opendc.metal.service.ProvisioningService
+import org.opendc.metal.service.SimpleProvisioningService
import org.opendc.simulator.compute.SimMachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
index 6ec8ba4a..998f9cd6 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
@@ -23,9 +23,6 @@
package org.opendc.format.environment.sc20
import kotlinx.coroutines.CoroutineScope
-import org.opendc.compute.core.metal.NODE_CLUSTER
-import org.opendc.compute.core.metal.service.ProvisioningService
-import org.opendc.compute.core.metal.service.SimpleProvisioningService
import org.opendc.compute.simulator.SimBareMetalDriver
import org.opendc.compute.simulator.power.models.LinearPowerModel
import org.opendc.core.Environment
@@ -33,6 +30,9 @@ import org.opendc.core.Platform
import org.opendc.core.Zone
import org.opendc.core.services.ServiceRegistry
import org.opendc.format.environment.EnvironmentReader
+import org.opendc.metal.NODE_CLUSTER
+import org.opendc.metal.service.ProvisioningService
+import org.opendc.metal.service.SimpleProvisioningService
import org.opendc.simulator.compute.SimMachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
index a58a2524..6cf65f7f 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
@@ -26,8 +26,6 @@ import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import kotlinx.coroutines.CoroutineScope
-import org.opendc.compute.core.metal.service.ProvisioningService
-import org.opendc.compute.core.metal.service.SimpleProvisioningService
import org.opendc.compute.simulator.SimBareMetalDriver
import org.opendc.compute.simulator.power.models.LinearPowerModel
import org.opendc.core.Environment
@@ -35,6 +33,8 @@ import org.opendc.core.Platform
import org.opendc.core.Zone
import org.opendc.core.services.ServiceRegistry
import org.opendc.format.environment.EnvironmentReader
+import org.opendc.metal.service.ProvisioningService
+import org.opendc.metal.service.SimpleProvisioningService
import org.opendc.simulator.compute.SimMachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
index 90d751ea..1571b17d 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
@@ -22,8 +22,8 @@
package org.opendc.format.trace.bitbrains
-import org.opendc.compute.core.workload.VmWorkload
-import org.opendc.compute.simulator.SimWorkloadImage
+import org.opendc.compute.api.ComputeWorkload
+import org.opendc.compute.api.Image
import org.opendc.core.User
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
@@ -45,17 +45,17 @@ import kotlin.math.min
public class BitbrainsTraceReader(
traceDirectory: File,
performanceInterferenceModel: PerformanceInterferenceModel
-) : TraceReader<VmWorkload> {
+) : TraceReader<ComputeWorkload> {
/**
* The internal iterator to use for this reader.
*/
- private val iterator: Iterator<TraceEntry<VmWorkload>>
+ private val iterator: Iterator<TraceEntry<ComputeWorkload>>
/**
* Initialize the reader.
*/
init {
- val entries = mutableMapOf<Long, TraceEntry<VmWorkload>>()
+ val entries = mutableMapOf<Long, TraceEntry<ComputeWorkload>>()
var timestampCol = 0
var coreCol = 0
@@ -131,19 +131,20 @@ public class BitbrainsTraceReader(
.toSortedSet()
)
- val vmWorkload = VmWorkload(
+ val workload = SimTraceWorkload(flopsHistory.asSequence())
+ val vmWorkload = ComputeWorkload(
uuid,
"VM Workload $vmId",
UnnamedUser,
- SimWorkloadImage(
+ Image(
uuid,
vmId.toString(),
mapOf(
IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems,
"cores" to cores,
- "required-memory" to requiredMemory
- ),
- SimTraceWorkload(flopsHistory.asSequence())
+ "required-memory" to requiredMemory,
+ "workload" to workload
+ )
)
)
entries[vmId] = TraceEntryImpl(
@@ -158,7 +159,7 @@ public class BitbrainsTraceReader(
override fun hasNext(): Boolean = iterator.hasNext()
- override fun next(): TraceEntry<VmWorkload> = iterator.next()
+ override fun next(): TraceEntry<ComputeWorkload> = iterator.next()
override fun close() {}
@@ -175,6 +176,6 @@ public class BitbrainsTraceReader(
*/
private data class TraceEntryImpl(
override var submissionTime: Long,
- override val workload: VmWorkload
- ) : TraceEntry<VmWorkload>
+ override val workload: ComputeWorkload
+ ) : TraceEntry<ComputeWorkload>
}
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt
index c76889c8..cd7aff3c 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt
@@ -22,7 +22,7 @@
package org.opendc.format.trace.gwf
-import org.opendc.compute.simulator.SimWorkloadImage
+import org.opendc.compute.api.Image
import org.opendc.core.User
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
@@ -41,7 +41,6 @@ import kotlin.collections.List
import kotlin.collections.MutableSet
import kotlin.collections.component1
import kotlin.collections.component2
-import kotlin.collections.emptyMap
import kotlin.collections.filter
import kotlin.collections.forEach
import kotlin.collections.getOrPut
@@ -136,10 +135,11 @@ public class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> {
TraceEntryImpl(submitTime, Job(UUID(0L, taskId), "<unnamed>", UnnamedUser, HashSet()))
}
val workflow = entry.workload
+ val workload = SimFlopsWorkload(flops)
val task = Task(
UUID(0L, taskId),
"<unnamed>",
- SimWorkloadImage(UUID.randomUUID(), "<unnamed>", emptyMap(), SimFlopsWorkload(flops)),
+ Image(UUID.randomUUID(), "<unnamed>", mapOf("workload" to workload)),
HashSet(),
mapOf(
WORKFLOW_TASK_CORES to cores,
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt
index 78f581ca..07785632 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt
@@ -22,8 +22,8 @@
package org.opendc.format.trace.sc20
-import org.opendc.compute.core.workload.VmWorkload
-import org.opendc.compute.simulator.SimWorkloadImage
+import org.opendc.compute.api.ComputeWorkload
+import org.opendc.compute.api.Image
import org.opendc.core.User
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
@@ -49,17 +49,17 @@ public class Sc20TraceReader(
performanceInterferenceModel: PerformanceInterferenceModel,
selectedVms: List<String>,
random: Random
-) : TraceReader<VmWorkload> {
+) : TraceReader<ComputeWorkload> {
/**
* The internal iterator to use for this reader.
*/
- private val iterator: Iterator<TraceEntry<VmWorkload>>
+ private val iterator: Iterator<TraceEntry<ComputeWorkload>>
/**
* Initialize the reader.
*/
init {
- val entries = mutableMapOf<UUID, TraceEntry<VmWorkload>>()
+ val entries = mutableMapOf<UUID, TraceEntry<ComputeWorkload>>()
val timestampCol = 0
val cpuUsageCol = 1
@@ -156,19 +156,20 @@ public class Sc20TraceReader(
performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId) }.toSortedSet(),
Random(random.nextInt())
)
- val vmWorkload = VmWorkload(
+ val workload = SimTraceWorkload(flopsFragments.asSequence())
+ val vmWorkload = ComputeWorkload(
uuid,
"VM Workload $vmId",
UnnamedUser,
- SimWorkloadImage(
+ Image(
uuid,
vmId,
mapOf(
IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems,
"cores" to cores,
- "required-memory" to requiredMemory
- ),
- SimTraceWorkload(flopsFragments.asSequence())
+ "required-memory" to requiredMemory,
+ "workload" to workload
+ )
)
)
entries[uuid] = TraceEntryImpl(
@@ -183,7 +184,7 @@ public class Sc20TraceReader(
override fun hasNext(): Boolean = iterator.hasNext()
- override fun next(): TraceEntry<VmWorkload> = iterator.next()
+ override fun next(): TraceEntry<ComputeWorkload> = iterator.next()
override fun close() {}
@@ -200,6 +201,6 @@ public class Sc20TraceReader(
*/
private data class TraceEntryImpl(
override var submissionTime: Long,
- override val workload: VmWorkload
- ) : TraceEntry<VmWorkload>
+ override val workload: ComputeWorkload
+ ) : TraceEntry<ComputeWorkload>
}
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt
index 80c54354..ead20c35 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt
@@ -22,8 +22,8 @@
package org.opendc.format.trace.swf
-import org.opendc.compute.core.workload.VmWorkload
-import org.opendc.compute.simulator.SimWorkloadImage
+import org.opendc.compute.api.ComputeWorkload
+import org.opendc.compute.api.Image
import org.opendc.core.User
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
@@ -43,17 +43,17 @@ import java.util.*
public class SwfTraceReader(
file: File,
maxNumCores: Int = -1
-) : TraceReader<VmWorkload> {
+) : TraceReader<ComputeWorkload> {
/**
* The internal iterator to use for this reader.
*/
- private val iterator: Iterator<TraceEntry<VmWorkload>>
+ private val iterator: Iterator<TraceEntry<ComputeWorkload>>
/**
* Initialize the reader.
*/
init {
- val entries = mutableMapOf<Long, TraceEntry<VmWorkload>>()
+ val entries = mutableMapOf<Long, TraceEntry<ComputeWorkload>>()
val jobNumberCol = 0
val submitTimeCol = 1 // seconds (begin of trace is 0)
@@ -154,18 +154,19 @@ public class SwfTraceReader(
}
val uuid = UUID(0L, jobNumber)
- val vmWorkload = VmWorkload(
+ val workload = SimTraceWorkload(flopsHistory.asSequence())
+ val vmWorkload = ComputeWorkload(
uuid,
"SWF Workload $jobNumber",
UnnamedUser,
- SimWorkloadImage(
+ Image(
uuid,
jobNumber.toString(),
mapOf(
"cores" to cores,
- "required-memory" to memory
- ),
- SimTraceWorkload(flopsHistory.asSequence())
+ "required-memory" to memory,
+ "workload" to workload
+ )
)
)
@@ -179,7 +180,7 @@ public class SwfTraceReader(
override fun hasNext(): Boolean = iterator.hasNext()
- override fun next(): TraceEntry<VmWorkload> = iterator.next()
+ override fun next(): TraceEntry<ComputeWorkload> = iterator.next()
override fun close() {}
@@ -196,6 +197,6 @@ public class SwfTraceReader(
*/
private data class TraceEntryImpl(
override var submissionTime: Long,
- override val workload: VmWorkload
- ) : TraceEntry<VmWorkload>
+ override val workload: ComputeWorkload
+ ) : TraceEntry<ComputeWorkload>
}
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
index d7dc09fa..5a271fab 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
@@ -25,7 +25,7 @@ package org.opendc.format.trace.wtf
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetReader
-import org.opendc.compute.simulator.SimWorkloadImage
+import org.opendc.compute.api.Image
import org.opendc.core.User
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
@@ -78,10 +78,17 @@ public class WtfTraceReader(path: String) : TraceReader<Job> {
TraceEntryImpl(submitTime, Job(UUID(0L, taskId), "<unnamed>", UnnamedUser, HashSet()))
}
val workflow = entry.workload
+ val workload = SimFlopsWorkload(flops)
val task = Task(
UUID(0L, taskId),
"<unnamed>",
- SimWorkloadImage(UUID.randomUUID(), "<unnamed>", emptyMap(), SimFlopsWorkload(flops)),
+ Image(
+ UUID.randomUUID(),
+ "<unnamed>",
+ mapOf(
+ "workload" to workload
+ )
+ ),
HashSet(),
mapOf(
WORKFLOW_TASK_CORES to cores,
diff --git a/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt b/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt
index 45c125c4..7e3d2623 100644
--- a/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt
+++ b/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt
@@ -24,7 +24,6 @@ package org.opendc.format.trace.swf
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
-import org.opendc.compute.simulator.SimWorkloadImage
import org.opendc.simulator.compute.workload.SimTraceWorkload
import java.io.File
@@ -35,12 +34,12 @@ class SwfTraceReaderTest {
var entry = reader.next()
assertEquals(0, entry.submissionTime)
// 1961 slices for waiting, 3 full and 1 partial running slices
- assertEquals(1965, ((entry.workload.image as SimWorkloadImage).workload as SimTraceWorkload).trace.toList().size)
+ assertEquals(1965, (entry.workload.image.tags["workload"] as SimTraceWorkload).trace.toList().size)
entry = reader.next()
assertEquals(164472, entry.submissionTime)
// 1188 slices for waiting, 0 full and 1 partial running slices
- assertEquals(1189, ((entry.workload.image as SimWorkloadImage).workload as SimTraceWorkload).trace.toList().size)
- assertEquals(0.25, ((entry.workload.image as SimWorkloadImage).workload as SimTraceWorkload).trace.toList().last().usage)
+ assertEquals(1189, (entry.workload.image.tags["workload"] as SimTraceWorkload).trace.toList().size)
+ assertEquals(0.25, (entry.workload.image.tags["workload"] as SimTraceWorkload).trace.toList().last().usage)
}
}
diff --git a/simulator/opendc-compute/opendc-compute-core/build.gradle.kts b/simulator/opendc-metal/build.gradle.kts
index a1b6ec0f..9207de18 100644
--- a/simulator/opendc-compute/opendc-compute-core/build.gradle.kts
+++ b/simulator/opendc-metal/build.gradle.kts
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-description = "Core implementation of the OpenDC Compute service"
+description = "Bare-metal provisioning in OpenDC"
/* Build configuration */
plugins {
@@ -30,6 +30,7 @@ plugins {
dependencies {
api(platform(project(":opendc-platform")))
api(project(":opendc-core"))
+ api(project(":opendc-compute:opendc-compute-api"))
api(project(":opendc-trace:opendc-trace-core"))
implementation(project(":opendc-utils"))
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/Metadata.kt b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/Metadata.kt
index 11eadd87..ca98dab0 100644
--- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/Metadata.kt
+++ b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/Metadata.kt
@@ -22,7 +22,7 @@
* SOFTWARE.
*/
-package org.opendc.compute.core.metal
+package org.opendc.metal
/*
* Common metadata keys for bare-metal nodes.
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/Node.kt b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/Node.kt
index 6d9506f1..1c5c7a8d 100644
--- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/Node.kt
+++ b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/Node.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,11 +20,11 @@
* SOFTWARE.
*/
-package org.opendc.compute.core.metal
+package org.opendc.metal
import kotlinx.coroutines.flow.Flow
-import org.opendc.compute.core.Server
-import org.opendc.compute.core.image.Image
+import org.opendc.compute.api.Flavor
+import org.opendc.compute.api.Image
import org.opendc.core.Identity
import java.util.UUID
@@ -53,14 +53,14 @@ public data class Node(
public val state: NodeState,
/**
- * The boot image of the node.
+ * The flavor of the node.
*/
- public val image: Image,
+ public val flavor: Flavor,
/**
- * The server instance that is running on the node or `null` if no server is running.
+ * The boot image of the node.
*/
- public val server: Server?,
+ public val image: Image,
/**
* The events that are emitted by the node.
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/NodeEvent.kt b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/NodeEvent.kt
index 4423e2bf..30ce423c 100644
--- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/NodeEvent.kt
+++ b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/NodeEvent.kt
@@ -1,7 +1,5 @@
/*
- * MIT License
- *
- * Copyright (c) 2020 atlarge-research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -22,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.compute.core.metal
+package org.opendc.metal
/**
* An event that is emitted by a [Node].
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/NodeState.kt b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/NodeState.kt
index bdc4841e..f1d4ea2e 100644
--- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/NodeState.kt
+++ b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/NodeState.kt
@@ -22,7 +22,7 @@
* SOFTWARE.
*/
-package org.opendc.compute.core.metal
+package org.opendc.metal
/**
* An enumeration describing the possible states of a bare-metal compute node.
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/driver/BareMetalDriver.kt b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/driver/BareMetalDriver.kt
index 9db57127..3b15be94 100644
--- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/driver/BareMetalDriver.kt
+++ b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/driver/BareMetalDriver.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,13 +20,13 @@
* SOFTWARE.
*/
-package org.opendc.compute.core.metal.driver
+package org.opendc.metal.driver
import kotlinx.coroutines.flow.Flow
-import org.opendc.compute.core.Server
-import org.opendc.compute.core.image.Image
-import org.opendc.compute.core.metal.Node
+import org.opendc.compute.api.Image
+import org.opendc.compute.api.Server
import org.opendc.core.services.AbstractServiceKey
+import org.opendc.metal.Node
import java.util.UUID
/**
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/service/ProvisioningService.kt b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/service/ProvisioningService.kt
index bad5b47c..6548767e 100644
--- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/service/ProvisioningService.kt
+++ b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/service/ProvisioningService.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,12 +20,12 @@
* SOFTWARE.
*/
-package org.opendc.compute.core.metal.service
+package org.opendc.metal.service
-import org.opendc.compute.core.image.Image
-import org.opendc.compute.core.metal.Node
-import org.opendc.compute.core.metal.driver.BareMetalDriver
+import org.opendc.compute.api.Image
import org.opendc.core.services.AbstractServiceKey
+import org.opendc.metal.Node
+import org.opendc.metal.driver.BareMetalDriver
import java.util.UUID
/**
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/service/SimpleProvisioningService.kt b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/service/SimpleProvisioningService.kt
index 5222f2fb..2d6353c8 100644
--- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/metal/service/SimpleProvisioningService.kt
+++ b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/service/SimpleProvisioningService.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,12 +20,12 @@
* SOFTWARE.
*/
-package org.opendc.compute.core.metal.service
+package org.opendc.metal.service
import kotlinx.coroutines.CancellationException
-import org.opendc.compute.core.image.Image
-import org.opendc.compute.core.metal.Node
-import org.opendc.compute.core.metal.driver.BareMetalDriver
+import org.opendc.compute.api.Image
+import org.opendc.metal.Node
+import org.opendc.metal.driver.BareMetalDriver
/**
* A very basic implementation of the [ProvisioningService].
diff --git a/simulator/opendc-platform/build.gradle.kts b/simulator/opendc-platform/build.gradle.kts
index 0ae1c72a..ea0591ad 100644
--- a/simulator/opendc-platform/build.gradle.kts
+++ b/simulator/opendc-platform/build.gradle.kts
@@ -33,5 +33,6 @@ dependencies {
api("io.github.microutils:kotlin-logging:${versions.kotlinLogging}")
runtime("org.slf4j:slf4j-simple:${versions.slf4j}")
+ runtime("org.apache.logging.log4j:log4j-slf4j-impl:${versions.log4j}")
}
}
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
index 533bf321..482fe754 100644
--- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
+++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
@@ -40,6 +40,11 @@ import kotlinx.coroutines.test.TestCoroutineScope
import mu.KotlinLogging
import org.bson.Document
import org.bson.types.ObjectId
+import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy
+import org.opendc.compute.service.scheduler.AvailableMemoryAllocationPolicy
+import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy
+import org.opendc.compute.service.scheduler.ProvisionedCoresAllocationPolicy
+import org.opendc.compute.service.scheduler.RandomAllocationPolicy
import org.opendc.compute.simulator.allocation.*
import org.opendc.experiments.capelin.experiment.attachMonitor
import org.opendc.experiments.capelin.experiment.createFailureDomain
@@ -242,7 +247,7 @@ public class RunnerCli : CliktCommand(name = "runner") {
val tracer = EventTracer(clock)
testScope.launch {
- val (bareMetalProvisioner, scheduler) = createProvisioner(
+ val (bareMetalProvisioner, provisioner, scheduler) = createProvisioner(
this,
clock,
environment,
@@ -281,7 +286,8 @@ public class RunnerCli : CliktCommand(name = "runner") {
logger.debug("FINISHED=${scheduler.finishedVms}")
failureDomain?.cancel()
- scheduler.terminate()
+ scheduler.close()
+ provisioner.close()
}
try {
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt
index f43d0869..2f11347d 100644
--- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt
+++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt
@@ -32,9 +32,6 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import org.bson.Document
import org.bson.types.ObjectId
-import org.opendc.compute.core.metal.NODE_CLUSTER
-import org.opendc.compute.core.metal.service.ProvisioningService
-import org.opendc.compute.core.metal.service.SimpleProvisioningService
import org.opendc.compute.simulator.SimBareMetalDriver
import org.opendc.compute.simulator.power.models.LinearPowerModel
import org.opendc.core.Environment
@@ -42,6 +39,9 @@ import org.opendc.core.Platform
import org.opendc.core.Zone
import org.opendc.core.services.ServiceRegistry
import org.opendc.format.environment.EnvironmentReader
+import org.opendc.metal.NODE_CLUSTER
+import org.opendc.metal.service.ProvisioningService
+import org.opendc.metal.service.SimpleProvisioningService
import org.opendc.simulator.compute.SimMachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt
index f16f9b90..fe814c76 100644
--- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt
+++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt
@@ -23,12 +23,14 @@
package org.opendc.runner.web
import mu.KotlinLogging
-import org.opendc.compute.core.Server
-import org.opendc.compute.core.ServerState
-import org.opendc.compute.core.virt.driver.VirtDriver
-import org.opendc.compute.core.virt.service.VirtProvisioningEvent
+import org.opendc.compute.api.Server
+import org.opendc.compute.api.ServerState
+import org.opendc.compute.service.ComputeServiceEvent
+import org.opendc.compute.service.driver.Host
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
import org.opendc.experiments.capelin.telemetry.HostEvent
+import org.opendc.metal.Node
+import org.opendc.metal.NodeState
import kotlin.math.max
/**
@@ -36,10 +38,10 @@ import kotlin.math.max
*/
public class WebExperimentMonitor : ExperimentMonitor {
private val logger = KotlinLogging.logger {}
- private val currentHostEvent = mutableMapOf<Server, HostEvent>()
+ private val currentHostEvent = mutableMapOf<Node, HostEvent>()
private var startTime = -1L
- override fun reportVmStateChange(time: Long, server: Server) {
+ override fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {
if (startTime < 0) {
startTime = time
@@ -50,12 +52,12 @@ public class WebExperimentMonitor : ExperimentMonitor {
override fun reportHostStateChange(
time: Long,
- driver: VirtDriver,
- server: Server
+ driver: Host,
+ host: Node
) {
- logger.debug { "Host ${server.uid} changed state ${server.state} [$time]" }
+ logger.debug { "Host ${host.uid} changed state ${host.state} [$time]" }
- val previousEvent = currentHostEvent[server]
+ val previousEvent = currentHostEvent[host]
val roundedTime = previousEvent?.let {
val duration = time - it.timestamp
@@ -78,13 +80,13 @@ public class WebExperimentMonitor : ExperimentMonitor {
0.0,
0.0,
0,
- server
+ host
)
}
- private val lastPowerConsumption = mutableMapOf<Server, Double>()
+ private val lastPowerConsumption = mutableMapOf<Node, Double>()
- override fun reportPowerConsumption(host: Server, draw: Double) {
+ override fun reportPowerConsumption(host: Node, draw: Double) {
lastPowerConsumption[host] = draw
}
@@ -97,16 +99,16 @@ public class WebExperimentMonitor : ExperimentMonitor {
cpuUsage: Double,
cpuDemand: Double,
numberOfDeployedImages: Int,
- hostServer: Server,
+ host: Node,
duration: Long
) {
- val previousEvent = currentHostEvent[hostServer]
+ val previousEvent = currentHostEvent[host]
when {
previousEvent == null -> {
val event = HostEvent(
time,
5 * 60 * 1000L,
- hostServer,
+ host,
numberOfDeployedImages,
requestedBurst,
grantedBurst,
@@ -114,17 +116,17 @@ public class WebExperimentMonitor : ExperimentMonitor {
interferedBurst,
cpuUsage,
cpuDemand,
- lastPowerConsumption[hostServer] ?: 200.0,
- hostServer.flavor.cpuCount
+ lastPowerConsumption[host] ?: 200.0,
+ host.flavor.cpuCount
)
- currentHostEvent[hostServer] = event
+ currentHostEvent[host] = event
}
previousEvent.timestamp == time -> {
val event = HostEvent(
time,
previousEvent.duration,
- hostServer,
+ host,
numberOfDeployedImages,
requestedBurst,
grantedBurst,
@@ -132,11 +134,11 @@ public class WebExperimentMonitor : ExperimentMonitor {
interferedBurst,
cpuUsage,
cpuDemand,
- lastPowerConsumption[hostServer] ?: 200.0,
- hostServer.flavor.cpuCount
+ lastPowerConsumption[host] ?: 200.0,
+ host.flavor.cpuCount
)
- currentHostEvent[hostServer] = event
+ currentHostEvent[host] = event
}
else -> {
processHostEvent(previousEvent)
@@ -144,7 +146,7 @@ public class WebExperimentMonitor : ExperimentMonitor {
val event = HostEvent(
time,
time - previousEvent.timestamp,
- hostServer,
+ host,
numberOfDeployedImages,
requestedBurst,
grantedBurst,
@@ -152,17 +154,17 @@ public class WebExperimentMonitor : ExperimentMonitor {
interferedBurst,
cpuUsage,
cpuDemand,
- lastPowerConsumption[hostServer] ?: 200.0,
- hostServer.flavor.cpuCount
+ lastPowerConsumption[host] ?: 200.0,
+ host.flavor.cpuCount
)
- currentHostEvent[hostServer] = event
+ currentHostEvent[host] = event
}
}
}
private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics()
- private val hostMetrics: MutableMap<Server, HostMetrics> = mutableMapOf()
+ private val hostMetrics: MutableMap<Node, HostMetrics> = mutableMapOf()
private fun processHostEvent(event: HostEvent) {
val slices = event.duration / SLICE_LENGTH
@@ -173,14 +175,14 @@ public class WebExperimentMonitor : ExperimentMonitor {
hostAggregateMetrics.totalOvercommittedBurst + event.overcommissionedBurst,
hostAggregateMetrics.totalInterferedBurst + event.interferedBurst,
hostAggregateMetrics.totalPowerDraw + (slices * (event.powerDraw / 12)),
- hostAggregateMetrics.totalFailureSlices + if (event.host.state != ServerState.ACTIVE) slices.toLong() else 0,
- hostAggregateMetrics.totalFailureVmSlices + if (event.host.state != ServerState.ACTIVE) event.vmCount * slices.toLong() else 0
+ hostAggregateMetrics.totalFailureSlices + if (event.node.state != NodeState.ACTIVE) slices.toLong() else 0,
+ hostAggregateMetrics.totalFailureVmSlices + if (event.node.state != NodeState.ACTIVE) event.vmCount * slices.toLong() else 0
)
- hostMetrics.compute(event.host) { key, prev ->
+ hostMetrics.compute(event.node) { _, prev ->
HostMetrics(
- (event.cpuUsage.takeIf { event.host.state == ServerState.ACTIVE } ?: 0.0) + (prev?.cpuUsage ?: 0.0),
- (event.cpuDemand.takeIf { event.host.state == ServerState.ACTIVE } ?: 0.0) + (prev?.cpuDemand ?: 0.0),
+ (event.cpuUsage.takeIf { event.node.state == NodeState.ACTIVE } ?: 0.0) + (prev?.cpuUsage ?: 0.0),
+ (event.cpuDemand.takeIf { event.node.state == NodeState.ACTIVE } ?: 0.0) + (prev?.cpuDemand ?: 0.0),
event.vmCount + (prev?.vmCount ?: 0),
1 + (prev?.count ?: 0)
)
@@ -208,7 +210,7 @@ public class WebExperimentMonitor : ExperimentMonitor {
private var provisionerMetrics: AggregateProvisionerMetrics = AggregateProvisionerMetrics()
- override fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) {
+ override fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) {
provisionerMetrics = AggregateProvisionerMetrics(
max(event.totalVmCount, provisionerMetrics.vmTotalCount),
max(event.waitingVmCount, provisionerMetrics.vmWaitingCount),
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
index 812b5f20..f74c5697 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
@@ -84,33 +84,33 @@ public class SimBareMetalMachine(
private val scheduler = TimerScheduler<Cpu>(coroutineScope, clock)
/**
- * The execution context in which the workload runs.
- */
- private val ctx = object : SimExecutionContext {
- override val machine: SimMachineModel
- get() = this@SimBareMetalMachine.model
-
- override val clock: Clock
- get() = this@SimBareMetalMachine.clock
-
- override fun interrupt(cpu: Int) {
- require(cpu < cpus.size) { "Invalid CPU identifier" }
- cpus[cpu].interrupt()
- }
- }
-
- /**
* Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
*/
- override suspend fun run(workload: SimWorkload) {
+ override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) {
require(!isTerminated) { "Machine is terminated" }
require(cont == null) { "Run should not be called concurrently" }
+ val ctx = object : SimExecutionContext {
+ override val machine: SimMachineModel
+ get() = this@SimBareMetalMachine.model
+
+ override val clock: Clock
+ get() = this@SimBareMetalMachine.clock
+
+ override val meta: Map<String, Any>
+ get() = meta
+
+ override fun interrupt(cpu: Int) {
+ require(cpu < cpus.size) { "Invalid CPU identifier" }
+ cpus[cpu].interrupt()
+ }
+ }
+
workload.onStart(ctx)
return suspendCancellableCoroutine { cont ->
this.cont = cont
- this.cpus = model.cpus.map { Cpu(it, workload) }
+ this.cpus = model.cpus.map { Cpu(ctx, it, workload) }
for (cpu in cpus) {
cpu.start()
@@ -161,7 +161,7 @@ public class SimBareMetalMachine(
/**
* A physical CPU of the machine.
*/
- private inner class Cpu(val model: ProcessingUnit, val workload: SimWorkload) {
+ private inner class Cpu(val ctx: SimExecutionContext, val model: ProcessingUnit, val workload: SimWorkload) {
/**
* The current command.
*/
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt
index c7c3d3cc..657dac66 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimExecutionContext.kt
@@ -41,6 +41,11 @@ public interface SimExecutionContext {
public val machine: SimMachineModel
/**
+ * The metadata associated with the context.
+ */
+ public val meta: Map<String, Any>
+
+ /**
* Ask the host machine to interrupt the specified vCPU.
*
* @param cpu The id of the vCPU to interrupt.
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
index 5e86d32b..bf6d8a5e 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
@@ -336,33 +336,33 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener
private var cpus: List<VCpu> = emptyList()
/**
- * The execution context in which the workload runs.
- */
- val ctx = object : SimExecutionContext {
- override val machine: SimMachineModel
- get() = model
-
- override val clock: Clock
- get() = this@SimFairShareHypervisor.ctx.clock
-
- override fun interrupt(cpu: Int) {
- require(cpu < cpus.size) { "Invalid CPU identifier" }
- cpus[cpu].interrupt()
- }
- }
-
- /**
* Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
*/
- override suspend fun run(workload: SimWorkload) {
+ override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) {
require(!isTerminated) { "Machine is terminated" }
require(cont == null) { "Run should not be called concurrently" }
+ val ctx = object : SimExecutionContext {
+ override val machine: SimMachineModel
+ get() = model
+
+ override val clock: Clock
+ get() = this@SimFairShareHypervisor.ctx.clock
+
+ override val meta: Map<String, Any>
+ get() = meta
+
+ override fun interrupt(cpu: Int) {
+ require(cpu < cpus.size) { "Invalid CPU identifier" }
+ cpus[cpu].interrupt()
+ }
+ }
+
workload.onStart(ctx)
return suspendCancellableCoroutine { cont ->
this.cont = cont
- this.cpus = model.cpus.map { VCpu(this, it, workload) }
+ this.cpus = model.cpus.map { VCpu(this, ctx, it, workload) }
for (cpu in cpus) {
// Register vCPU to scheduler
@@ -417,7 +417,12 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener
/**
* A CPU of the virtual machine.
*/
- private inner class VCpu(val vm: SimVm, val model: ProcessingUnit, val workload: SimWorkload) : Comparable<VCpu> {
+ private inner class VCpu(
+ val vm: SimVm,
+ val ctx: SimExecutionContext,
+ val model: ProcessingUnit,
+ val workload: SimWorkload
+ ) : Comparable<VCpu> {
/**
* The latest command processed by the CPU.
*/
@@ -488,7 +493,7 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener
isIntermediate = true
latestFlush = ctx.clock.millis()
- process(workload.onStart(vm.ctx, model.id))
+ process(workload.onStart(ctx, model.id))
} catch (e: Throwable) {
fail(e)
} finally {
@@ -514,7 +519,7 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener
// Act like nothing has happened in case the vCPU did not reach its deadline or was not
// interrupted by the user.
if (interrupt || command.deadline <= now) {
- process(workload.onNext(vm.ctx, model.id, 0.0))
+ process(workload.onNext(ctx, model.id, 0.0))
}
}
is SimResourceCommand.Consume -> {
@@ -545,7 +550,7 @@ public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener
totalOvercommittedWork += remainingWork
}
- process(workload.onNext(vm.ctx, model.id, remainingWork))
+ process(workload.onNext(ctx, model.id, remainingWork))
} else {
process(SimResourceCommand.Consume(remainingWork, command.limit, command.deadline))
}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt
index ea8eeb37..bfaa60bc 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt
@@ -42,7 +42,7 @@ public interface SimMachine : AutoCloseable {
/**
* Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
*/
- public suspend fun run(workload: SimWorkload)
+ public suspend fun run(workload: SimWorkload, meta: Map<String, Any> = emptyMap())
/**
* Terminate this machine.
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt
index 66d3eda7..778b68ca 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt
@@ -117,33 +117,33 @@ public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listen
private var cpus: List<VCpu> = emptyList()
/**
- * The execution context in which the workload runs.
- */
- val ctx = object : SimExecutionContext {
- override val machine: SimMachineModel
- get() = model
-
- override val clock: Clock
- get() = this@SimSpaceSharedHypervisor.ctx.clock
-
- override fun interrupt(cpu: Int) {
- require(cpu < cpus.size) { "Invalid CPU identifier" }
- cpus[cpu].interrupt()
- }
- }
-
- /**
* Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
*/
- override suspend fun run(workload: SimWorkload) {
+ override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) {
require(!isTerminated) { "Machine is terminated" }
require(cont == null) { "Run should not be called concurrently" }
+ val ctx = object : SimExecutionContext {
+ override val machine: SimMachineModel
+ get() = model
+
+ override val clock: Clock
+ get() = this@SimSpaceSharedHypervisor.ctx.clock
+
+ override val meta: Map<String, Any>
+ get() = meta
+
+ override fun interrupt(cpu: Int) {
+ require(cpu < cpus.size) { "Invalid CPU identifier" }
+ cpus[cpu].interrupt()
+ }
+ }
+
workload.onStart(ctx)
return suspendCancellableCoroutine { cont ->
this.cont = cont
- this.cpus = model.cpus.mapIndexed { index, model -> VCpu(this, model, workload, pCPUs[index]) }
+ this.cpus = model.cpus.mapIndexed { index, model -> VCpu(this, ctx, model, workload, pCPUs[index]) }
for (cpu in cpus) {
cpu.start()
@@ -193,7 +193,7 @@ public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listen
/**
* A CPU of the virtual machine.
*/
- private inner class VCpu(val vm: SimVm, val model: ProcessingUnit, val workload: SimWorkload, val pCPU: Int) {
+ private inner class VCpu(val vm: SimVm, val ctx: SimExecutionContext, val model: ProcessingUnit, val workload: SimWorkload, val pCPU: Int) {
/**
* The processing speed of the vCPU.
*/
@@ -267,7 +267,7 @@ public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listen
* Interrupt the CPU.
*/
fun interrupt() {
- ctx.interrupt(pCPU)
+ this@SimSpaceSharedHypervisor.ctx.interrupt(pCPU)
}
/**
diff --git a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt
index 948595b1..10f29f4e 100644
--- a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt
+++ b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/EventFlow.kt
@@ -58,12 +58,22 @@ public fun <T> EventFlow(): EventFlow<T> = EventFlowImpl()
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
private class EventFlowImpl<T> : EventFlow<T> {
private var closed: Boolean = false
- private val subscribers = HashMap<SendChannel<T>, Unit>()
+ private val subscribers = mutableListOf<SendChannel<T>>()
override fun emit(event: T) {
+ if (closed) {
+ return
+ }
+
+ val it = subscribers.iterator()
synchronized(this) {
- for ((chan, _) in subscribers) {
- chan.offer(event)
+ while (it.hasNext()) {
+ val chan = it.next()
+ if (chan.isClosedForSend) {
+ it.remove()
+ } else {
+ chan.offer(event)
+ }
}
}
}
@@ -72,9 +82,11 @@ private class EventFlowImpl<T> : EventFlow<T> {
synchronized(this) {
closed = true
- for ((chan, _) in subscribers) {
+ for (chan in subscribers) {
chan.close()
}
+
+ subscribers.clear()
}
}
@@ -87,9 +99,13 @@ private class EventFlowImpl<T> : EventFlow<T> {
}
channel = Channel(Channel.UNLIMITED)
- subscribers[channel] = Unit
+ subscribers.add(channel)
+ }
+ try {
+ channel.consumeAsFlow().collect(collector)
+ } finally {
+ channel.close()
}
- channel.consumeAsFlow().collect(collector)
}
override fun toString(): String = "EventFlow"
diff --git a/simulator/opendc-workflows/build.gradle.kts b/simulator/opendc-workflows/build.gradle.kts
index b4ffac7d..b6a2fc45 100644
--- a/simulator/opendc-workflows/build.gradle.kts
+++ b/simulator/opendc-workflows/build.gradle.kts
@@ -29,15 +29,16 @@ plugins {
}
dependencies {
+ api(platform(project(":opendc-platform")))
api(project(":opendc-core"))
- api(project(":opendc-compute:opendc-compute-core"))
+ api(project(":opendc-compute:opendc-compute-api"))
api(project(":opendc-trace:opendc-trace-core"))
implementation(project(":opendc-utils"))
- implementation("io.github.microutils:kotlin-logging:${versions.kotlinLogging}")
+ implementation("io.github.microutils:kotlin-logging")
testImplementation(project(":opendc-simulator:opendc-simulator-core"))
testImplementation(project(":opendc-compute:opendc-compute-simulator"))
testImplementation(project(":opendc-format"))
testImplementation("com.fasterxml.jackson.module:jackson-module-kotlin:${versions["jackson-module-kotlin"]}")
- testRuntimeOnly("org.slf4j:slf4j-simple:${versions.slf4j}")
+ testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j-impl")
}
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt
index e04c8a4c..6b348ed4 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt
+++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt
@@ -25,16 +25,10 @@ package org.opendc.workflows.service
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
-import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map
-import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import mu.KotlinLogging
-import org.opendc.compute.core.Flavor
-import org.opendc.compute.core.Server
-import org.opendc.compute.core.ServerEvent
-import org.opendc.compute.core.ServerState
-import org.opendc.compute.core.virt.service.VirtProvisioningService
+import org.opendc.compute.api.*
import org.opendc.trace.core.EventTracer
import org.opendc.trace.core.consumeAsFlow
import org.opendc.trace.core.enable
@@ -55,13 +49,13 @@ public class StageWorkflowService(
internal val coroutineScope: CoroutineScope,
internal val clock: Clock,
internal val tracer: EventTracer,
- private val provisioningService: VirtProvisioningService,
+ private val computeClient: ComputeClient,
mode: WorkflowSchedulerMode,
jobAdmissionPolicy: JobAdmissionPolicy,
jobOrderPolicy: JobOrderPolicy,
taskEligibilityPolicy: TaskEligibilityPolicy,
taskOrderPolicy: TaskOrderPolicy
-) : WorkflowService {
+) : WorkflowService, ServerWatcher {
/**
* The logger instance to use.
*/
@@ -103,12 +97,6 @@ public class StageWorkflowService(
internal val taskByServer = mutableMapOf<Server, TaskState>()
/**
- * The load of the system.
- */
- internal val load: Double
- get() = (activeTasks.size / provisioningService.hostCount.toDouble())
-
- /**
* The root listener of this scheduler.
*/
private val rootListener = object : StageWorkflowSchedulerListener {
@@ -205,7 +193,7 @@ public class StageWorkflowService(
/**
* Indicate to the scheduler that a scheduling cycle is needed.
*/
- private suspend fun requestCycle() = mode.requestCycle()
+ private fun requestCycle() = mode.requestCycle()
/**
* Perform a scheduling cycle immediately.
@@ -273,15 +261,13 @@ public class StageWorkflowService(
val flavor = Flavor(cores, 1000) // TODO How to determine memory usage for workflow task
val image = instance.task.image
coroutineScope.launch {
- val server = provisioningService.deploy(instance.task.name, image, flavor)
+ val server = computeClient.newServer(instance.task.name, image, flavor)
instance.state = TaskStatus.ACTIVE
instance.server = server
taskByServer[server] = instance
- server.events
- .onEach { event -> if (event is ServerEvent.StateChanged) stateChanged(event.server) }
- .launchIn(coroutineScope)
+ server.watch(this@StageWorkflowService)
}
activeTasks += instance
@@ -290,8 +276,8 @@ public class StageWorkflowService(
}
}
- private suspend fun stateChanged(server: Server) {
- when (server.state) {
+ public override fun onStateChanged(server: Server, newState: ServerState) {
+ when (newState) {
ServerState.ACTIVE -> {
val task = taskByServer.getValue(server)
task.startedAt = clock.millis()
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt
index d1eb6704..ef9714c2 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt
+++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt
@@ -22,7 +22,7 @@
package org.opendc.workflows.service
-import org.opendc.compute.core.Server
+import org.opendc.compute.api.Server
import org.opendc.workflows.workload.Task
public class TaskState(public val job: JobState, public val task: Task) {
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt
index d03adc61..cf8f92e0 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt
+++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt
@@ -24,7 +24,6 @@ package org.opendc.workflows.service
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
-import kotlinx.coroutines.yield
import org.opendc.workflows.service.stage.StagePolicy
/**
@@ -38,7 +37,7 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo
/**
* Request a new scheduling cycle to be performed.
*/
- public suspend fun requestCycle()
+ public fun requestCycle()
}
/**
@@ -46,9 +45,8 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo
*/
public object Interactive : WorkflowSchedulerMode() {
override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic {
- override suspend fun requestCycle() {
- yield()
- scheduler.schedule()
+ override fun requestCycle() {
+ scheduler.coroutineScope.launch { scheduler.schedule() }
}
}
@@ -62,7 +60,7 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo
private var next: kotlinx.coroutines.Job? = null
override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic {
- override suspend fun requestCycle() {
+ override fun requestCycle() {
if (next == null) {
// In batch mode, we assume that the scheduler runs at a fixed slot every time
// quantum (e.g t=0, t=60, t=120). We calculate here the delay until the next scheduling slot.
@@ -88,7 +86,7 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo
private var next: kotlinx.coroutines.Job? = null
override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic {
- override suspend fun requestCycle() {
+ override fun requestCycle() {
if (next == null) {
val delay = random.nextInt(200).toLong()
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt
deleted file mode 100644
index 4f0c269a..00000000
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.workflows.service.stage.job
-
-import org.opendc.workflows.service.JobState
-import org.opendc.workflows.service.StageWorkflowService
-
-/**
- * A [JobAdmissionPolicy] that limits the amount of jobs based on the average system load.
- *
- * @property limit The maximum load before stopping admission.
- */
-public data class LoadJobAdmissionPolicy(public val limit: Double) : JobAdmissionPolicy {
- override fun invoke(scheduler: StageWorkflowService): JobAdmissionPolicy.Logic = object : JobAdmissionPolicy.Logic {
- override fun invoke(
- job: JobState
- ): JobAdmissionPolicy.Advice =
- if (scheduler.load < limit)
- JobAdmissionPolicy.Advice.ADMIT
- else
- JobAdmissionPolicy.Advice.STOP
- }
-
- override fun toString(): String = "Limit-Load($limit)"
-}
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt
deleted file mode 100644
index a80a8c63..00000000
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.workflows.service.stage.task
-
-import org.opendc.workflows.service.StageWorkflowService
-import org.opendc.workflows.service.TaskState
-
-/**
- * A [TaskEligibilityPolicy] that limits the number of active tasks in the system based on the average system load.
- */
-public data class LoadTaskEligibilityPolicy(val limit: Double) : TaskEligibilityPolicy {
- override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic = object : TaskEligibilityPolicy.Logic {
- override fun invoke(
- task: TaskState
- ): TaskEligibilityPolicy.Advice =
- if (scheduler.load < limit)
- TaskEligibilityPolicy.Advice.ADMIT
- else
- TaskEligibilityPolicy.Advice.STOP
- }
-
- override fun toString(): String = "Limit-Load($limit)"
-}
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt
index 1834a4c8..4c6d2842 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt
+++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt
@@ -24,7 +24,7 @@
package org.opendc.workflows.workload
-import org.opendc.compute.core.image.Image
+import org.opendc.compute.api.Image
import org.opendc.core.Identity
import java.util.*
diff --git a/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
index 2bfcba35..4207cdfd 100644
--- a/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
+++ b/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
@@ -36,11 +36,12 @@ import org.junit.jupiter.api.Assertions.assertNotEquals
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
-import org.opendc.compute.core.metal.service.ProvisioningService
-import org.opendc.compute.simulator.SimVirtProvisioningService
-import org.opendc.compute.simulator.allocation.NumberOfActiveServersAllocationPolicy
+import org.opendc.compute.service.ComputeService
+import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy
+import org.opendc.compute.simulator.SimHostProvisioner
import org.opendc.format.environment.sc18.Sc18EnvironmentReader
import org.opendc.format.trace.gwf.GwfTraceReader
+import org.opendc.metal.service.ProvisioningService
import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider
import org.opendc.simulator.utils.DelayControllerClockAdapter
import org.opendc.trace.core.EventTracer
@@ -80,7 +81,11 @@ internal class StageWorkflowSchedulerIntegrationTest {
// Wait for the bare metal nodes to be spawned
delay(10)
- val provisioner = SimVirtProvisioningService(testScope, clock, bareMetal, NumberOfActiveServersAllocationPolicy(), tracer, SimSpaceSharedHypervisorProvider(), schedulingQuantum = 1000)
+ val provisioner = SimHostProvisioner(testScope.coroutineContext, bareMetal, SimSpaceSharedHypervisorProvider())
+ val hosts = provisioner.provisionAll()
+ val compute = ComputeService(testScope.coroutineContext, clock, tracer, NumberOfActiveServersAllocationPolicy(), schedulingQuantum = 1000)
+
+ hosts.forEach { compute.addHost(it) }
// Wait for the hypervisors to be spawned
delay(10)
@@ -89,7 +94,7 @@ internal class StageWorkflowSchedulerIntegrationTest {
testScope,
clock,
tracer,
- provisioner,
+ compute.newClient(),
mode = WorkflowSchedulerMode.Batch(100),
jobAdmissionPolicy = NullJobAdmissionPolicy,
jobOrderPolicy = SubmissionTimeJobOrderPolicy(),
diff --git a/simulator/opendc-workflows/src/test/resources/log4j2.xml b/simulator/opendc-workflows/src/test/resources/log4j2.xml
new file mode 100644
index 00000000..70a0eacc
--- /dev/null
+++ b/simulator/opendc-workflows/src/test/resources/log4j2.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright (c) 2021 AtLarge Research
+ ~
+ ~ Permission is hereby granted, free of charge, to any person obtaining a copy
+ ~ of this software and associated documentation files (the "Software"), to deal
+ ~ in the Software without restriction, including without limitation the rights
+ ~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ ~ copies of the Software, and to permit persons to whom the Software is
+ ~ furnished to do so, subject to the following conditions:
+ ~
+ ~ The above copyright notice and this permission notice shall be included in all
+ ~ copies or substantial portions of the Software.
+ ~
+ ~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ ~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ ~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ ~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ ~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ ~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ ~ SOFTWARE.
+ -->
+
+<Configuration status="WARN" packages="org.apache.logging.log4j.core">
+ <Appenders>
+ <Console name="Console" target="SYSTEM_OUT">
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} [%highlight{%-5level}] %logger{36} - %msg%n" disableAnsi="false"/>
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Root level="warn">
+ <AppenderRef ref="Console"/>
+ </Root>
+ </Loggers>
+</Configuration>
diff --git a/simulator/settings.gradle.kts b/simulator/settings.gradle.kts
index 77d78318..7a82adcd 100644
--- a/simulator/settings.gradle.kts
+++ b/simulator/settings.gradle.kts
@@ -23,8 +23,10 @@ rootProject.name = "opendc-simulator"
include(":opendc-platform")
include(":opendc-core")
-include(":opendc-compute:opendc-compute-core")
+include(":opendc-compute:opendc-compute-api")
+include(":opendc-compute:opendc-compute-service")
include(":opendc-compute:opendc-compute-simulator")
+include(":opendc-metal")
include(":opendc-workflows")
include(":opendc-format")
include(":opendc-experiments:opendc-experiments-sc18")