summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--simulator/README.md2
-rw-r--r--simulator/opendc-compute/opendc-compute-api/build.gradle.kts1
-rw-r--r--simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt76
-rw-r--r--simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Flavor.kt11
-rw-r--r--simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Image.kt25
-rw-r--r--simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/InsufficientServerCapacityException.kt (renamed from simulator/opendc-metal/src/main/kotlin/org/opendc/metal/Metadata.kt)15
-rw-r--r--simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Resource.kt (renamed from simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeWorkload.kt)41
-rw-r--r--simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt39
-rw-r--r--simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerState.kt18
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt11
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientFlavor.kt (renamed from simulator/opendc-metal/src/main/kotlin/org/opendc/metal/service/ProvisioningService.kt)60
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientImage.kt (renamed from simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowService.kt)51
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt25
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt317
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalFlavor.kt (renamed from simulator/opendc-metal/src/main/kotlin/org/opendc/metal/Node.kt)66
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalImage.kt (renamed from simulator/opendc-core/src/main/kotlin/org/opendc/core/services/ServiceKey.kt)41
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt131
-rw-r--r--simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/RandomAllocationPolicy.kt2
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts1
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimBareMetalDriver.kt188
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt142
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHostProvisioner.kt69
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimMetaWorkloadMapper.kt (renamed from simulator/opendc-metal/src/main/kotlin/org/opendc/metal/NodeEvent.kt)24
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimWorkloadMapper.kt (renamed from simulator/opendc-core/src/main/kotlin/org/opendc/core/resource/Resource.kt)15
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/power/api/CpuPowerModel.kt8
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimBareMetalDriverTest.kt92
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt72
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimProvisioningServiceTest.kt81
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/power/CpuPowerModelTest.kt22
-rw-r--r--simulator/opendc-core/build.gradle.kts33
-rw-r--r--simulator/opendc-core/src/main/kotlin/org/opendc/core/Environment.kt34
-rw-r--r--simulator/opendc-core/src/main/kotlin/org/opendc/core/Platform.kt34
-rw-r--r--simulator/opendc-core/src/main/kotlin/org/opendc/core/Zone.kt46
-rw-r--r--simulator/opendc-core/src/main/kotlin/org/opendc/core/resource/TagContainer.kt34
-rw-r--r--simulator/opendc-core/src/main/kotlin/org/opendc/core/services/ServiceRegistry.kt61
-rw-r--r--simulator/opendc-core/src/main/kotlin/org/opendc/core/services/ServiceRegistryImpl.kt43
-rw-r--r--simulator/opendc-core/src/main/kotlin/org/opendc/core/workload/Workload.kt37
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts1
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt125
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt9
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt13
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt24
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt4
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt4
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt21
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt43
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt58
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt54
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt22
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts3
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt52
-rw-r--r--simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/WorkflowMetrics.kt2
-rw-r--r--simulator/opendc-format/build.gradle.kts3
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt9
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/MachineDef.kt (renamed from simulator/opendc-core/src/main/kotlin/org/opendc/core/Identity.kt)27
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt45
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt39
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt42
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt36
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt5
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceWriter.kt45
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt55
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt42
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt57
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt53
-rw-r--r--simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt49
-rw-r--r--simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt10
-rw-r--r--simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt4
-rw-r--r--simulator/opendc-metal/src/main/kotlin/org/opendc/metal/NodeState.kt55
-rw-r--r--simulator/opendc-metal/src/main/kotlin/org/opendc/metal/driver/BareMetalDriver.kt84
-rw-r--r--simulator/opendc-metal/src/main/kotlin/org/opendc/metal/service/SimpleProvisioningService.kt65
-rw-r--r--simulator/opendc-runner-web/build.gradle.kts1
-rw-r--r--simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt13
-rw-r--r--simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt45
-rw-r--r--simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt37
-rw-r--r--simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/StateFlow.kt79
-rw-r--r--simulator/opendc-workflow/build.gradle.kts (renamed from simulator/opendc-core/src/main/kotlin/org/opendc/core/User.kt)14
-rw-r--r--simulator/opendc-workflow/opendc-workflow-api/build.gradle.kts (renamed from simulator/opendc-metal/build.gradle.kts)7
-rw-r--r--simulator/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Job.kt (renamed from simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Job.kt)18
-rw-r--r--simulator/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Metadata.kt (renamed from simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Metadata.kt)2
-rw-r--r--simulator/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Task.kt (renamed from simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt)19
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/build.gradle.kts (renamed from simulator/opendc-workflows/build.gradle.kts)6
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowEvent.kt (renamed from simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowEvent.kt)8
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt97
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/JobState.kt (renamed from simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/JobState.kt)6
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/TaskState.kt (renamed from simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt)10
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/TaskStatus.kt (renamed from simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskStatus.kt)6
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowSchedulerListener.kt (renamed from simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerListener.kt)10
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt (renamed from simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt)95
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/StagePolicy.kt (renamed from simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/StagePolicy.kt)8
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/WorkflowSchedulerMode.kt (renamed from simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt)18
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/DurationJobOrderPolicy.kt (renamed from simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt)22
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/JobAdmissionPolicy.kt (renamed from simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt)8
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/JobOrderPolicy.kt (renamed from simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/JobOrderPolicy.kt)8
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/LimitJobAdmissionPolicy.kt (renamed from simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/LimitJobAdmissionPolicy.kt)10
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/NullJobAdmissionPolicy.kt (renamed from simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt)10
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/RandomJobOrderPolicy.kt (renamed from simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/RandomJobOrderPolicy.kt)20
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/SizeJobOrderPolicy.kt (renamed from simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/SizeJobOrderPolicy.kt)10
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/SubmissionTimeJobOrderPolicy.kt (renamed from simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/SubmissionTimeJobOrderPolicy.kt)10
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/ActiveTaskOrderPolicy.kt (renamed from simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/ActiveTaskOrderPolicy.kt)16
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/BalancingTaskEligibilityPolicy.kt (renamed from simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/BalancingTaskEligibilityPolicy.kt)16
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/CompletionTaskOrderPolicy.kt (renamed from simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/CompletionTaskOrderPolicy.kt)16
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependenciesTaskOrderPolicy.kt (renamed from simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DependenciesTaskOrderPolicy.kt)10
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependentsTaskOrderPolicy.kt (renamed from simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DependentsTaskOrderPolicy.kt)10
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationHistoryTaskOrderPolicy.kt (renamed from simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DurationHistoryTaskOrderPolicy.kt)16
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationTaskOrderPolicy.kt (renamed from simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt)17
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitPerJobTaskEligibilityPolicy.kt (renamed from simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt)16
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitTaskEligibilityPolicy.kt (renamed from simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt)10
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/NullTaskEligibilityPolicy.kt (renamed from simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/NullTaskEligibilityPolicy.kt)10
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskEligibilityPolicy.kt (renamed from simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt)10
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskOrderPolicy.kt (renamed from simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/RandomTaskOrderPolicy.kt)18
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/SubmissionTimeTaskOrderPolicy.kt (renamed from simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/SubmissionTimeTaskOrderPolicy.kt)12
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/TaskEligibilityPolicy.kt (renamed from simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt)10
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/TaskOrderPolicy.kt (renamed from simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/TaskOrderPolicy.kt)10
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/StageWorkflowSchedulerIntegrationTest.kt (renamed from simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt)63
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/test/resources/environment.json (renamed from simulator/opendc-workflows/src/test/resources/environment.json)0
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/test/resources/log4j2.xml (renamed from simulator/opendc-workflows/src/test/resources/log4j2.xml)0
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/test/resources/trace.gwf (renamed from simulator/opendc-workflows/src/test/resources/trace.gwf)0
-rw-r--r--simulator/settings.gradle.kts5
119 files changed, 1534 insertions, 2456 deletions
diff --git a/simulator/README.md b/simulator/README.md
index 61ef1d43..d02925d6 100644
--- a/simulator/README.md
+++ b/simulator/README.md
@@ -23,7 +23,7 @@ This component is responsible for modelling and simulation of datacenters and th
- **[opendc-compute](opendc-compute)**
The [Infrastructure as a Service](https://en.wikipedia.org/wiki/Infrastructure_as_a_Service) (IaaS) component of OpenDC for computing infrastructure (similar to
[Amazon EC2](https://aws.amazon.com/ec2/) and [Google Compute Engine](https://cloud.google.com/compute)).
-- **[opendc-workflows](opendc-workflows)**
+- **[opendc-workflow](opendc-workflow)**
Workflow orchestration service built on top of OpenDC.
- **[opendc-format](opendc-format)**
Collection of libraries for processing data formats related to (simulation of) cloud computing and datacenters.
diff --git a/simulator/opendc-compute/opendc-compute-api/build.gradle.kts b/simulator/opendc-compute/opendc-compute-api/build.gradle.kts
index 10046322..835dbbb8 100644
--- a/simulator/opendc-compute/opendc-compute-api/build.gradle.kts
+++ b/simulator/opendc-compute/opendc-compute-api/build.gradle.kts
@@ -29,5 +29,4 @@ plugins {
dependencies {
api(platform(project(":opendc-platform")))
- api(project(":opendc-core"))
}
diff --git a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt
index 025513e6..baa1ba2f 100644
--- a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt
+++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeClient.kt
@@ -22,21 +22,95 @@
package org.opendc.compute.api
+import java.util.UUID
+
/**
* A client interface for the OpenDC Compute service.
*/
public interface ComputeClient : AutoCloseable {
/**
+ * Obtain the list of [Flavor]s accessible by the requesting user.
+ */
+ public suspend fun queryFlavors(): List<Flavor>
+
+ /**
+ * Obtain a [Flavor] by its unique identifier.
+ *
+ * @param id The identifier of the flavor.
+ */
+ public suspend fun findFlavor(id: UUID): Flavor?
+
+ /**
+ * Create a new [Flavor] instance at this compute service.
+ *
+ * @param name The name of the flavor.
+ * @param cpuCount The amount of CPU cores for this flavor.
+ * @param memorySize The size of the memory.
+ * @param labels The identifying labels of the image.
+ * @param meta The non-identifying meta-data of the image.
+ */
+ public suspend fun newFlavor(
+ name: String,
+ cpuCount: Int,
+ memorySize: Long,
+ labels: Map<String, String> = emptyMap(),
+ meta: Map<String, Any> = emptyMap()
+ ): Flavor
+
+ /**
+ * Obtain the list of [Image]s accessible by the requesting user.
+ */
+ public suspend fun queryImages(): List<Image>
+
+ /**
+ * Obtain a [Image] by its unique identifier.
+ *
+ * @param id The identifier of the image.
+ */
+ public suspend fun findImage(id: UUID): Image?
+
+ /**
+ * Create a new [Image] instance at this compute service.
+ *
+ * @param name The name of the image.
+ * @param labels The identifying labels of the image.
+ * @param meta The non-identifying meta-data of the image.
+ */
+ public suspend fun newImage(
+ name: String,
+ labels: Map<String, String> = emptyMap(),
+ meta: Map<String, Any> = emptyMap()
+ ): Image
+
+ /**
+ * Obtain the list of [Server]s accessible by the requesting user.
+ */
+ public suspend fun queryServers(): List<Server>
+
+ /**
+ * Obtain a [Server] by its unique identifier.
+ *
+ * @param id The identifier of the server.
+ */
+ public suspend fun findServer(id: UUID): Server?
+
+ /**
* Create a new [Server] instance at this compute service.
*
* @param name The name of the server to deploy.
* @param image The image to be deployed.
* @param flavor The flavor of the machine instance to run this [image] on.
+ * @param labels The identifying labels of the server.
+ * @param meta The non-identifying meta-data of the server.
+ * @param start A flag to indicate that the server should be started immediately.
*/
public suspend fun newServer(
name: String,
image: Image,
- flavor: Flavor
+ flavor: Flavor,
+ labels: Map<String, String> = emptyMap(),
+ meta: Map<String, Any> = emptyMap(),
+ start: Boolean = true
): Server
/**
diff --git a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Flavor.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Flavor.kt
index bf5f0ce4..5f511f91 100644
--- a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Flavor.kt
+++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Flavor.kt
@@ -26,14 +26,19 @@ package org.opendc.compute.api
* Flavors define the compute and memory capacity of [Server] instance. To put it simply, a flavor is an available
* hardware configuration for a server. It defines the size of a virtual server that can be launched.
*/
-public data class Flavor(
+public interface Flavor : Resource {
/**
* The number of (virtual) processing cores to use.
*/
- public val cpuCount: Int,
+ public val cpuCount: Int
/**
* The amount of RAM available to the server (in MB).
*/
public val memorySize: Long
-)
+
+ /**
+ * Delete the flavor instance.
+ */
+ public suspend fun delete()
+}
diff --git a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Image.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Image.kt
index 280c4d89..83e63b81 100644
--- a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Image.kt
+++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Image.kt
@@ -22,27 +22,12 @@
package org.opendc.compute.api
-import org.opendc.core.resource.Resource
-import org.opendc.core.resource.TagContainer
-import java.util.*
-
/**
* An image containing a bootable operating system that can directly be executed by physical or virtual server.
- *
- * OpenStack: A collection of files used to create or rebuild a server. Operators provide a number of pre-built OS
- * images by default. You may also create custom images from cloud servers you have launched. These custom images are
- * useful for backup purposes or for producing “gold” server images if you plan to deploy a particular server
- * configuration frequently.
*/
-public data class Image(
- public override val uid: UUID,
- public override val name: String,
- public override val tags: TagContainer
-) : Resource {
- public companion object {
- /**
- * An empty boot disk [Image] that exits immediately on start.
- */
- public val EMPTY: Image = Image(UUID.randomUUID(), "empty", emptyMap())
- }
+public interface Image : Resource {
+ /**
+ * Delete the image instance.
+ */
+ public suspend fun delete()
}
diff --git a/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/Metadata.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/InsufficientServerCapacityException.kt
index ca98dab0..8fbb7308 100644
--- a/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/Metadata.kt
+++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/InsufficientServerCapacityException.kt
@@ -1,7 +1,5 @@
/*
- * MIT License
- *
- * Copyright (c) 2020 atlarge-research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -22,13 +20,10 @@
* SOFTWARE.
*/
-package org.opendc.metal
-
-/*
- * Common metadata keys for bare-metal nodes.
- */
+package org.opendc.compute.api
/**
- * The cluster to which the node belongs.
+ * This exception is thrown to indicate that the compute service does not have enough capacity at the moment to
+ * fulfill a launch request.
*/
-public const val NODE_CLUSTER: String = "bare-metal:cluster"
+public class InsufficientServerCapacityException(override val cause: Throwable? = null) : Exception("There was insufficient capacity available to satisfy the launch request")
diff --git a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeWorkload.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Resource.kt
index 64a47277..08120848 100644
--- a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ComputeWorkload.kt
+++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Resource.kt
@@ -22,25 +22,34 @@
package org.opendc.compute.api
-import org.opendc.core.User
-import org.opendc.core.workload.Workload
import java.util.UUID
/**
- * A workload that represents a VM.
- *
- * @property uid A unique identified of this VM.
- * @property name The name of this VM.
- * @property owner The owner of the VM.
- * @property image The image of the VM.
+ * A generic resource provided by the OpenDC Compute service.
*/
-public data class ComputeWorkload(
- override val uid: UUID,
- override val name: String,
- override val owner: User,
- val image: Image
-) : Workload {
- override fun equals(other: Any?): Boolean = other is ComputeWorkload && uid == other.uid
+public interface Resource {
+ /**
+ * The unique identifier of the resource.
+ */
+ public val uid: UUID
+
+ /**
+ * The name of the resource.
+ */
+ public val name: String
+
+ /**
+ * The identifying labels attached to the resource.
+ */
+ public val labels: Map<String, String>
+
+ /**
+ * The non-identifying metadata attached to the resource.
+ */
+ public val meta: Map<String, Any>
- override fun hashCode(): Int = uid.hashCode()
+ /**
+ * Refresh the local state of the resource.
+ */
+ public suspend fun refresh()
}
diff --git a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt
index ab1eb860..b508a9f8 100644
--- a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt
+++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Server.kt
@@ -22,18 +22,11 @@
package org.opendc.compute.api
-import org.opendc.core.resource.Resource
-
/**
* A stateful object representing a server instance that is running on some physical or virtual machine.
*/
public interface Server : Resource {
/**
- * The name of the server.
- */
- public override val name: String
-
- /**
* The flavor of the server.
*/
public val flavor: Flavor
@@ -44,14 +37,33 @@ public interface Server : Resource {
public val image: Image
/**
- * The tags assigned to the server.
+ * The last known state of the server.
*/
- public override val tags: Map<String, String>
+ public val state: ServerState
/**
- * The last known state of the server.
+ * Request the server to be started.
+ *
+ * This method is guaranteed to return after the request was acknowledged, but might return before the server was
+ * started.
*/
- public val state: ServerState
+ public suspend fun start()
+
+ /**
+ * Request the server to be stopped.
+ *
+ * This method is guaranteed to return after the request was acknowledged, but might return before the server was
+ * stopped.
+ */
+ public suspend fun stop()
+
+ /**
+ * Request the server to be deleted.
+ *
+ * This method is guaranteed to return after the request was acknowledged, but might return before the server was
+ * deleted.
+ */
+ public suspend fun delete()
/**
* Register the specified [ServerWatcher] to watch the state of the server.
@@ -66,9 +78,4 @@ public interface Server : Resource {
* @param watcher The watcher to de-register from the server.
*/
public fun unwatch(watcher: ServerWatcher)
-
- /**
- * Refresh the local state of the resource.
- */
- public suspend fun refresh()
}
diff --git a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerState.kt b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerState.kt
index 25d2e519..a4d7d7d7 100644
--- a/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerState.kt
+++ b/simulator/opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/ServerState.kt
@@ -27,27 +27,27 @@ package org.opendc.compute.api
*/
public enum class ServerState {
/**
- * The server has not yet finished the original build process.
+ * Resources are being allocated for the instance. The instance is not running yet.
*/
- BUILD,
+ PROVISIONING,
/**
- * The server was powered down by the user.
+ * A user shut down the instance.
*/
- SHUTOFF,
+ TERMINATED,
/**
- * The server is active and running.
+ * The server instance is booting up or running.
*/
- ACTIVE,
+ RUNNING,
/**
- * The server is in error.
+ * The server is in an error state.
*/
ERROR,
/**
- * The state of the server is unknown.
+ * The server has been deleted and cannot be started later on.
*/
- UNKNOWN,
+ DELETED,
}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt
index 2cd91144..c3c39572 100644
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/Host.kt
@@ -51,6 +51,11 @@ public interface Host {
public val state: HostState
/**
+ * Meta-data associated with the host.
+ */
+ public val meta: Map<String, Any>
+
+ /**
* The events emitted by the driver.
*/
public val events: Flow<HostEvent>
@@ -74,7 +79,7 @@ public interface Host {
public operator fun contains(server: Server): Boolean
/**
- * Stat the server [instance][server] if it is currently not running on this host.
+ * Start the server [instance][server] if it is currently not running on this host.
*
* @throws IllegalArgumentException if the server is not present on the host.
*/
@@ -88,9 +93,9 @@ public interface Host {
public suspend fun stop(server: Server)
/**
- * Terminate the specified [instance][server] on this host and cleanup all resources associated with it.
+ * Delete the specified [instance][server] on this host and cleanup all resources associated with it.
*/
- public suspend fun terminate(server: Server)
+ public suspend fun delete(server: Server)
/**
* Add a [HostListener] to this host.
diff --git a/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/service/ProvisioningService.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientFlavor.kt
index 6548767e..29f10e27 100644
--- a/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/service/ProvisioningService.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientFlavor.kt
@@ -20,45 +20,43 @@
* SOFTWARE.
*/
-package org.opendc.metal.service
+package org.opendc.compute.service.internal
-import org.opendc.compute.api.Image
-import org.opendc.core.services.AbstractServiceKey
-import org.opendc.metal.Node
-import org.opendc.metal.driver.BareMetalDriver
+import org.opendc.compute.api.Flavor
import java.util.UUID
/**
- * A cloud platform service for provisioning bare-metal compute nodes on the platform.
+ * A [Flavor] implementation that is passed to clients but delegates its implementation to another class.
*/
-public interface ProvisioningService {
- /**
- * Create a new bare-metal compute node.
- */
- public suspend fun create(driver: BareMetalDriver): Node
+internal class ClientFlavor(private val delegate: Flavor) : Flavor {
+ override val uid: UUID = delegate.uid
- /**
- * Obtain the available nodes.
- */
- public suspend fun nodes(): Set<Node>
+ override var name: String = delegate.name
+ private set
- /**
- * Refresh the state of a compute node.
- */
- public suspend fun refresh(node: Node): Node
+ override var cpuCount: Int = delegate.cpuCount
+ private set
- /**
- * Deploy the specified [Image] on a compute node.
- */
- public suspend fun deploy(node: Node, image: Image): Node
+ override var memorySize: Long = delegate.memorySize
+ private set
- /**
- * Stop the specified [Node] .
- */
- public suspend fun stop(node: Node): Node
+ override var labels: Map<String, String> = delegate.labels.toMap()
+ private set
- /**
- * The service key of this service.
- */
- public companion object Key : AbstractServiceKey<ProvisioningService>(UUID.randomUUID(), "provisioner")
+ override var meta: Map<String, Any> = delegate.meta.toMap()
+ private set
+
+ override suspend fun delete() {
+ delegate.delete()
+ }
+
+ override suspend fun refresh() {
+ delegate.refresh()
+
+ name = delegate.name
+ cpuCount = delegate.cpuCount
+ memorySize = delegate.memorySize
+ labels = delegate.labels
+ meta = delegate.meta
+ }
}
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowService.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientImage.kt
index b24f80da..6c5b2ab0 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowService.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientImage.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,31 +20,36 @@
* SOFTWARE.
*/
-package org.opendc.workflows.service
+package org.opendc.compute.service.internal
-import kotlinx.coroutines.flow.Flow
-import org.opendc.core.services.AbstractServiceKey
-import org.opendc.workflows.workload.Job
+import org.opendc.compute.api.Image
import java.util.*
/**
- * A service for cloud workflow management.
- *
- * The workflow scheduler is modelled after the Reference Architecture for Topology Scheduling by Andreadis et al.
+ * An [Image] implementation that is passed to clients but delegates its implementation to another class.
*/
-public interface WorkflowService {
- /**
- * The events emitted by the workflow scheduler.
- */
- public val events: Flow<WorkflowEvent>
-
- /**
- * Submit the specified [Job] to the workflow service for scheduling.
- */
- public suspend fun submit(job: Job)
-
- /**
- * The service key for the workflow scheduler.
- */
- public companion object Key : AbstractServiceKey<WorkflowService>(UUID.randomUUID(), "workflows")
+internal class ClientImage(private val delegate: Image) : Image {
+ override val uid: UUID = delegate.uid
+
+ override var name: String = delegate.name
+ private set
+
+ override var labels: Map<String, String> = delegate.labels.toMap()
+ private set
+
+ override var meta: Map<String, Any> = delegate.meta.toMap()
+ private set
+
+ override suspend fun delete() {
+ delegate.delete()
+ refresh()
+ }
+
+ override suspend fun refresh() {
+ delegate.refresh()
+
+ name = delegate.name
+ labels = delegate.labels
+ meta = delegate.meta
+ }
}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt
index f84b7435..ae4cee3b 100644
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ClientServer.kt
@@ -46,12 +46,30 @@ internal class ClientServer(private val delegate: Server) : Server, ServerWatche
override var image: Image = delegate.image
private set
- override var tags: Map<String, String> = delegate.tags.toMap()
+ override var labels: Map<String, String> = delegate.labels.toMap()
+ private set
+
+ override var meta: Map<String, Any> = delegate.meta.toMap()
private set
override var state: ServerState = delegate.state
private set
+ override suspend fun start() {
+ delegate.start()
+ refresh()
+ }
+
+ override suspend fun stop() {
+ delegate.stop()
+ refresh()
+ }
+
+ override suspend fun delete() {
+ delegate.delete()
+ refresh()
+ }
+
override fun watch(watcher: ServerWatcher) {
if (watchers.isEmpty()) {
delegate.watch(this)
@@ -69,10 +87,13 @@ internal class ClientServer(private val delegate: Server) : Server, ServerWatche
}
override suspend fun refresh() {
+ delegate.refresh()
+
name = delegate.name
flavor = delegate.flavor
image = delegate.image
- tags = delegate.tags
+ labels = delegate.labels
+ meta = delegate.meta
state = delegate.state
}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
index 69d6bb59..2c38f7cb 100644
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
@@ -22,11 +22,8 @@
package org.opendc.compute.service.internal
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.cancel
+import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.suspendCancellableCoroutine
import mu.KotlinLogging
import org.opendc.compute.api.*
import org.opendc.compute.service.ComputeService
@@ -41,9 +38,7 @@ import org.opendc.utils.TimerScheduler
import org.opendc.utils.flow.EventFlow
import java.time.Clock
import java.util.*
-import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
-import kotlin.coroutines.resume
import kotlin.math.max
/**
@@ -87,12 +82,27 @@ public class ComputeServiceImpl(
/**
* The servers that should be launched by the service.
*/
- private val queue: Deque<LaunchRequest> = ArrayDeque()
+ private val queue: Deque<SchedulingRequest> = ArrayDeque()
/**
* The active servers in the system.
*/
- private val activeServers: MutableSet<Server> = mutableSetOf()
+ private val activeServers: MutableMap<Server, Host> = mutableMapOf()
+
+ /**
+ * The registered flavors for this compute service.
+ */
+ internal val flavors = mutableMapOf<UUID, InternalFlavor>()
+
+ /**
+ * The registered images for this compute service.
+ */
+ internal val images = mutableMapOf<UUID, InternalImage>()
+
+ /**
+ * The registered servers for this compute service.
+ */
+ private val servers = mutableMapOf<UUID, InternalServer>()
public var submittedVms: Int = 0
public var queuedVms: Int = 0
@@ -126,7 +136,74 @@ public class ComputeServiceImpl(
override fun newClient(): ComputeClient = object : ComputeClient {
private var isClosed: Boolean = false
- override suspend fun newServer(name: String, image: Image, flavor: Flavor): Server {
+ override suspend fun queryFlavors(): List<Flavor> {
+ check(!isClosed) { "Client is already closed" }
+
+ return flavors.values.map { ClientFlavor(it) }
+ }
+
+ override suspend fun findFlavor(id: UUID): Flavor? {
+ check(!isClosed) { "Client is already closed" }
+
+ return flavors[id]?.let { ClientFlavor(it) }
+ }
+
+ override suspend fun newFlavor(
+ name: String,
+ cpuCount: Int,
+ memorySize: Long,
+ labels: Map<String, String>,
+ meta: Map<String, Any>
+ ): Flavor {
+ check(!isClosed) { "Client is already closed" }
+
+ val uid = UUID(clock.millis(), random.nextLong())
+ val flavor = InternalFlavor(
+ this@ComputeServiceImpl,
+ uid,
+ name,
+ cpuCount,
+ memorySize,
+ labels,
+ meta
+ )
+
+ flavors[uid] = flavor
+
+ return ClientFlavor(flavor)
+ }
+
+ override suspend fun queryImages(): List<Image> {
+ check(!isClosed) { "Client is already closed" }
+
+ return images.values.map { ClientImage(it) }
+ }
+
+ override suspend fun findImage(id: UUID): Image? {
+ check(!isClosed) { "Client is already closed" }
+
+ return images[id]?.let { ClientImage(it) }
+ }
+
+ override suspend fun newImage(name: String, labels: Map<String, String>, meta: Map<String, Any>): Image {
+ check(!isClosed) { "Client is already closed" }
+
+ val uid = UUID(clock.millis(), random.nextLong())
+ val image = InternalImage(this@ComputeServiceImpl, uid, name, labels, meta)
+
+ images[uid] = image
+
+ return ClientImage(image)
+ }
+
+ override suspend fun newServer(
+ name: String,
+ image: Image,
+ flavor: Flavor,
+ labels: Map<String, String>,
+ meta: Map<String, Any>,
+ start: Boolean
+ ): Server {
check(!isClosed) { "Client is closed" }
tracer.commit(VmSubmissionEvent(name, image, flavor))
@@ -143,11 +220,36 @@ public class ComputeServiceImpl(
)
)
- return suspendCancellableCoroutine { cont ->
- val request = LaunchRequest(createServer(name, image, flavor), cont)
- queue += request
- requestCycle()
+ val uid = UUID(clock.millis(), random.nextLong())
+ val server = InternalServer(
+ this@ComputeServiceImpl,
+ uid,
+ name,
+ flavor,
+ image,
+ labels.toMutableMap(),
+ meta.toMutableMap()
+ )
+
+ servers[uid] = server
+
+ if (start) {
+ server.start()
}
+
+ return ClientServer(server)
+ }
+
+ override suspend fun findServer(id: UUID): Server? {
+ check(!isClosed) { "Client is already closed" }
+
+ return servers[id]?.let { ClientServer(it) }
+ }
+
+ override suspend fun queryServers(): List<Server> {
+ check(!isClosed) { "Client is already closed" }
+
+ return servers.values.map { ClientServer(it) }
}
override fun close() {
@@ -183,22 +285,31 @@ public class ComputeServiceImpl(
scope.cancel()
}
- private fun createServer(
- name: String,
- image: Image,
- flavor: Flavor
- ): Server {
- return ServerImpl(
- uid = UUID(random.nextLong(), random.nextLong()),
- name = name,
- flavor = flavor,
- image = image
- )
+ internal fun schedule(server: InternalServer) {
+ logger.debug { "Enqueueing server ${server.uid} to be assigned to host." }
+
+ queue.add(SchedulingRequest(server))
+ requestSchedulingCycle()
+ }
+
+ internal fun delete(flavor: InternalFlavor) {
+ checkNotNull(flavors.remove(flavor.uid)) { "Flavor was not known" }
}
- private fun requestCycle() {
- // Bail out in case we have already requested a new cycle.
- if (scheduler.isTimerActive(Unit)) {
+ internal fun delete(image: InternalImage) {
+ checkNotNull(images.remove(image.uid)) { "Image was not known" }
+ }
+
+ internal fun delete(server: InternalServer) {
+ checkNotNull(servers.remove(server.uid)) { "Server was not known" }
+ }
+
+ /**
+ * Indicate that a new scheduling cycle is needed due to a change to the service's state.
+ */
+ private fun requestSchedulingCycle() {
+ // Bail out in case we have already requested a new cycle or the queue is empty.
+ if (scheduler.isTimerActive(Unit) || queue.isEmpty()) {
return
}
@@ -208,20 +319,28 @@ public class ComputeServiceImpl(
val delay = schedulingQuantum - (clock.millis() % schedulingQuantum)
scheduler.startSingleTimer(Unit, delay) {
- schedule()
+ doSchedule()
}
}
- private fun schedule() {
+ /**
+ * Run a single scheduling iteration.
+ */
+ private fun doSchedule() {
while (queue.isNotEmpty()) {
- val (server, cont) = queue.peekFirst()
- val requiredMemory = server.flavor.memorySize
- val selectedHv = allocationLogic.select(availableHosts, server)
+ val request = queue.peek()
- if (selectedHv == null || !selectedHv.host.canFit(server)) {
+ if (request.isCancelled) {
+ queue.poll()
+ continue
+ }
+
+ val server = request.server
+ val hv = allocationLogic.select(availableHosts, request.server)
+ if (hv == null || !hv.host.canFit(server)) {
logger.trace { "Server $server selected for scheduling but no capacity available for it." }
- if (requiredMemory > maxMemory || server.flavor.cpuCount > maxCores) {
+ if (server.flavor.memorySize > maxMemory || server.flavor.cpuCount > maxCores) {
tracer.commit(VmSubmissionInvalidEvent(server.name))
_events.emit(
@@ -247,45 +366,62 @@ public class ComputeServiceImpl(
}
}
- logger.info { "[${clock.millis()}] Spawning $server on ${selectedHv.host.uid} ${selectedHv.host.name} ${selectedHv.host.model}" }
- queue.poll()
-
- // Speculatively update the hypervisor view information to prevent other images in the queue from
- // deciding on stale values.
- selectedHv.numberOfActiveServers++
- selectedHv.provisionedCores += server.flavor.cpuCount
- selectedHv.availableMemory -= requiredMemory // XXX Temporary hack
+ val host = hv.host
- scope.launch {
- try {
- cont.resume(ClientServer(server))
- selectedHv.host.spawn(server)
- activeServers += server
+ // Remove request from queue
+ queue.poll()
- tracer.commit(VmScheduledEvent(server.name))
- _events.emit(
- ComputeServiceEvent.MetricsAvailable(
- this@ComputeServiceImpl,
- hostCount,
- availableHosts.size,
- submittedVms,
- ++runningVms,
- finishedVms,
- --queuedVms,
- unscheduledVms
+ logger.info { "Assigned server $server to host $host." }
+ try {
+ // Speculatively update the hypervisor view information to prevent other images in the queue from
+ // deciding on stale values.
+ hv.numberOfActiveServers++
+ hv.provisionedCores += server.flavor.cpuCount
+ hv.availableMemory -= server.flavor.memorySize // XXX Temporary hack
+
+ scope.launch {
+ try {
+ server.assignHost(host)
+ host.spawn(server)
+ activeServers[server] = host
+
+ tracer.commit(VmScheduledEvent(server.name))
+ _events.emit(
+ ComputeServiceEvent.MetricsAvailable(
+ this@ComputeServiceImpl,
+ hostCount,
+ availableHosts.size,
+ submittedVms,
+ ++runningVms,
+ finishedVms,
+ --queuedVms,
+ unscheduledVms
+ )
)
- )
- } catch (e: Throwable) {
- logger.error("Failed to deploy VM", e)
+ } catch (e: Throwable) {
+ logger.error("Failed to deploy VM", e)
- selectedHv.numberOfActiveServers--
- selectedHv.provisionedCores -= server.flavor.cpuCount
- selectedHv.availableMemory += requiredMemory
+ hv.numberOfActiveServers--
+ hv.provisionedCores -= server.flavor.cpuCount
+ hv.availableMemory += server.flavor.memorySize
+ }
}
+ } catch (e: Exception) {
+ logger.warn(e) { "Failed to assign server $server to $host. " }
}
}
}
+ /**
+ * A request to schedule an [InternalServer] onto one of the [Host]s.
+ */
+ private data class SchedulingRequest(val server: InternalServer) {
+ /**
+ * A flag to indicate that the request is cancelled.
+ */
+ var isCancelled: Boolean = false
+ }
+
override fun onStateChanged(host: Host, newState: HostState) {
when (newState) {
HostState.UP -> {
@@ -313,9 +449,7 @@ public class ComputeServiceImpl(
)
// Re-schedule on the new machine
- if (queue.isNotEmpty()) {
- requestCycle()
- }
+ requestSchedulingCycle()
}
HostState.DOWN -> {
logger.debug { "[${clock.millis()}] Host ${host.uid} state changed: $newState" }
@@ -338,19 +472,23 @@ public class ComputeServiceImpl(
)
)
- if (queue.isNotEmpty()) {
- requestCycle()
- }
+ requestSchedulingCycle()
}
}
}
override fun onStateChanged(host: Host, server: Server, newState: ServerState) {
- val serverImpl = server as ServerImpl
- serverImpl.state = newState
- serverImpl.watchers.forEach { it.onStateChanged(server, newState) }
+ require(server is InternalServer) { "Invalid server type passed to service" }
- if (newState == ServerState.SHUTOFF) {
+ if (server.host != host) {
+ // This can happen when a server is rescheduled and started on another machine, while being deleted from
+ // the old machine.
+ return
+ }
+
+ server.state = newState
+
+ if (newState == ServerState.TERMINATED || newState == ServerState.DELETED) {
logger.info { "[${clock.millis()}] Server ${server.uid} ${server.name} ${server.flavor} finished." }
tracer.commit(VmStoppedEvent(server.name))
@@ -379,36 +517,7 @@ public class ComputeServiceImpl(
}
// Try to reschedule if needed
- if (queue.isNotEmpty()) {
- requestCycle()
- }
+ requestSchedulingCycle()
}
}
-
- public data class LaunchRequest(val server: Server, val cont: Continuation<Server>)
-
- private class ServerImpl(
- override val uid: UUID,
- override val name: String,
- override val flavor: Flavor,
- override val image: Image
- ) : Server {
- val watchers = mutableListOf<ServerWatcher>()
-
- override fun watch(watcher: ServerWatcher) {
- watchers += watcher
- }
-
- override fun unwatch(watcher: ServerWatcher) {
- watchers -= watcher
- }
-
- override suspend fun refresh() {
- // No-op: this object is the source-of-truth
- }
-
- override val tags: Map<String, String> = emptyMap()
-
- override var state: ServerState = ServerState.BUILD
- }
}
diff --git a/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/Node.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalFlavor.kt
index 1c5c7a8d..95e280df 100644
--- a/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/Node.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalFlavor.kt
@@ -20,53 +20,45 @@
* SOFTWARE.
*/
-package org.opendc.metal
+package org.opendc.compute.service.internal
-import kotlinx.coroutines.flow.Flow
import org.opendc.compute.api.Flavor
-import org.opendc.compute.api.Image
-import org.opendc.core.Identity
-import java.util.UUID
+import java.util.*
/**
- * A bare-metal compute node.
+ * Internal stateful representation of a [Flavor].
*/
-public data class Node(
- /**
- * The unique identifier of the node.
- */
- public override val uid: UUID,
+internal class InternalFlavor(
+ private val service: ComputeServiceImpl,
+ override val uid: UUID,
+ name: String,
+ cpuCount: Int,
+ memorySize: Long,
+ labels: Map<String, String>,
+ meta: Map<String, Any>
+) : Flavor {
+ override var name: String = name
+ private set
- /**
- * The optional name of the node.
- */
- public override val name: String,
+ override var cpuCount: Int = cpuCount
+ private set
- /**
- * Metadata of the node.
- */
- public val metadata: Map<String, Any>,
+ override var memorySize: Long = memorySize
+ private set
- /**
- * The last known state of the compute node.
- */
- public val state: NodeState,
+ override val labels: MutableMap<String, String> = labels.toMutableMap()
- /**
- * The flavor of the node.
- */
- public val flavor: Flavor,
+ override val meta: MutableMap<String, Any> = meta.toMutableMap()
- /**
- * The boot image of the node.
- */
- public val image: Image,
+ override suspend fun refresh() {
+ // No-op: this object is the source-of-truth
+ }
+
+ override suspend fun delete() {
+ service.delete(this)
+ }
+
+ override fun equals(other: Any?): Boolean = other is InternalFlavor && uid == other.uid
- /**
- * The events that are emitted by the node.
- */
- public val events: Flow<NodeEvent>
-) : Identity {
override fun hashCode(): Int = uid.hashCode()
- override fun equals(other: Any?): Boolean = other is Node && uid == other.uid
}
diff --git a/simulator/opendc-core/src/main/kotlin/org/opendc/core/services/ServiceKey.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalImage.kt
index 9078ecdd..86f2f6b9 100644
--- a/simulator/opendc-core/src/main/kotlin/org/opendc/core/services/ServiceKey.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalImage.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,26 +20,35 @@
* SOFTWARE.
*/
-package org.opendc.core.services
+package org.opendc.compute.service.internal
-import org.opendc.core.Identity
+import org.opendc.compute.api.Image
import java.util.*
/**
- * An interface for identifying service implementations of the same type (providing the same service).
- *
- * @param T The shape of the messages the service responds to.
+ * Internal stateful representation of an [Image].
*/
-public interface ServiceKey<T : Any> : Identity
+internal class InternalImage(
+ private val service: ComputeServiceImpl,
+ override val uid: UUID,
+ override val name: String,
+ labels: Map<String, String>,
+ meta: Map<String, Any>
+) : Image {
+
+ override val labels: MutableMap<String, String> = labels.toMutableMap()
+
+ override val meta: MutableMap<String, Any> = meta.toMutableMap()
+
+ override suspend fun refresh() {
+ // No-op: this object is the source-of-truth
+ }
+
+ override suspend fun delete() {
+ service.delete(this)
+ }
+
+ override fun equals(other: Any?): Boolean = other is InternalImage && uid == other.uid
-/**
- * Helper class for constructing a [ServiceKey].
- *
- * @property uid The unique identifier of the service.
- * @property name The name of the service.
- */
-public abstract class AbstractServiceKey<T : Any>(override val uid: UUID, override val name: String) : ServiceKey<T> {
- override fun equals(other: Any?): Boolean = other is ServiceKey<*> && uid == other.uid
override fun hashCode(): Int = uid.hashCode()
- override fun toString(): String = "ServiceKey[uid=$uid, name=$name]"
}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt
new file mode 100644
index 00000000..ff7c1d15
--- /dev/null
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt
@@ -0,0 +1,131 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.service.internal
+
+import mu.KotlinLogging
+import org.opendc.compute.api.*
+import org.opendc.compute.service.driver.Host
+import java.util.UUID
+
+/**
+ * Internal implementation of the [Server] interface.
+ */
+internal class InternalServer(
+ private val service: ComputeServiceImpl,
+ override val uid: UUID,
+ override val name: String,
+ override val flavor: Flavor,
+ override val image: Image,
+ override val labels: MutableMap<String, String>,
+ override val meta: MutableMap<String, Any>
+) : Server {
+ /**
+ * The logger instance of this server.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * The watchers of this server object.
+ */
+ private val watchers = mutableListOf<ServerWatcher>()
+
+ /**
+ * The [Host] that has been assigned to host the server.
+ */
+ internal var host: Host? = null
+
+ override suspend fun start() {
+ when (state) {
+ ServerState.RUNNING -> {
+ logger.debug { "User tried to start server but server is already running" }
+ return
+ }
+ ServerState.PROVISIONING -> {
+ logger.debug { "User tried to start server but request is already pending: doing nothing" }
+ return
+ }
+ ServerState.DELETED -> {
+ logger.warn { "User tried to start terminated server" }
+ throw IllegalArgumentException("Server is terminated")
+ }
+ else -> {
+ logger.info { "User requested to start server $uid" }
+ state = ServerState.PROVISIONING
+ service.schedule(this)
+ }
+ }
+ }
+
+ override suspend fun stop() {
+ when (state) {
+ ServerState.PROVISIONING -> {} // TODO Find way to interrupt these
+ ServerState.RUNNING, ServerState.ERROR -> {
+ val host = checkNotNull(host) { "Server not running" }
+ host.stop(this)
+ }
+ ServerState.TERMINATED -> {} // No work needed
+ ServerState.DELETED -> throw IllegalStateException("Server is terminated")
+ }
+ }
+
+ override suspend fun delete() {
+ when (state) {
+ ServerState.PROVISIONING -> {} // TODO Find way to interrupt these
+ ServerState.RUNNING -> {
+ val host = checkNotNull(host) { "Server not running" }
+ host.delete(this)
+ service.delete(this)
+ }
+ else -> {} // No work needed
+ }
+ }
+
+ override fun watch(watcher: ServerWatcher) {
+ watchers += watcher
+ }
+
+ override fun unwatch(watcher: ServerWatcher) {
+ watchers -= watcher
+ }
+
+ override suspend fun refresh() {
+ // No-op: this object is the source-of-truth
+ }
+
+ override var state: ServerState = ServerState.TERMINATED
+ set(value) {
+ if (value != field) {
+ watchers.forEach { it.onStateChanged(this, value) }
+ }
+
+ field = value
+ }
+
+ internal fun assignHost(host: Host) {
+ this.host = host
+ }
+
+ override fun equals(other: Any?): Boolean = other is InternalServer && uid == other.uid
+
+ override fun hashCode(): Int = uid.hashCode()
+}
diff --git a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/RandomAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/RandomAllocationPolicy.kt
index 3facb182..ac7b351d 100644
--- a/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/RandomAllocationPolicy.kt
+++ b/simulator/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/RandomAllocationPolicy.kt
@@ -38,7 +38,7 @@ public class RandomAllocationPolicy(private val random: Random = Random(0)) : Al
): HostView? {
return hypervisors.asIterable()
.filter { hv ->
- val fitsMemory = hv.availableMemory >= (server.image.tags["required-memory"] as Long)
+ val fitsMemory = hv.availableMemory >= (server.image.meta["required-memory"] as Long)
val fitsCpu = hv.host.model.cpuCount >= server.flavor.cpuCount
fitsMemory && fitsCpu
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts b/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts
index d7d5f002..31fcda2f 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts
+++ b/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts
@@ -31,7 +31,6 @@ plugins {
dependencies {
api(platform(project(":opendc-platform")))
api(project(":opendc-compute:opendc-compute-service"))
- api(project(":opendc-metal"))
api(project(":opendc-simulator:opendc-simulator-compute"))
api(project(":opendc-simulator:opendc-simulator-failures"))
implementation(project(":opendc-utils"))
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimBareMetalDriver.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimBareMetalDriver.kt
deleted file mode 100644
index 2405a8f9..00000000
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimBareMetalDriver.kt
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.simulator
-
-import kotlinx.coroutines.*
-import kotlinx.coroutines.flow.Flow
-import org.opendc.compute.api.Flavor
-import org.opendc.compute.api.Image
-import org.opendc.compute.simulator.power.api.CpuPowerModel
-import org.opendc.compute.simulator.power.api.Powerable
-import org.opendc.compute.simulator.power.models.ConstantPowerModel
-import org.opendc.metal.Node
-import org.opendc.metal.NodeEvent
-import org.opendc.metal.NodeState
-import org.opendc.metal.driver.BareMetalDriver
-import org.opendc.simulator.compute.SimBareMetalMachine
-import org.opendc.simulator.compute.SimMachineModel
-import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.simulator.failures.FailureDomain
-import org.opendc.utils.flow.EventFlow
-import org.opendc.utils.flow.StateFlow
-import java.time.Clock
-import java.util.UUID
-
-/**
- * A basic implementation of the [BareMetalDriver] that simulates an [Image] running on a bare-metal machine.
- *
- * @param coroutineScope The [CoroutineScope] the driver runs in.
- * @param clock The virtual clock to keep track of time.
- * @param uid The unique identifier of the machine.
- * @param name An optional name of the machine.
- * @param metadata The initial metadata of the node.
- * @param machine The machine model to simulate.
- * @param cpuPowerModel The CPU power model of this machine.
- */
-@OptIn(ExperimentalCoroutinesApi::class)
-public class SimBareMetalDriver(
- private val coroutineScope: CoroutineScope,
- private val clock: Clock,
- uid: UUID,
- name: String,
- metadata: Map<String, Any>,
- machine: SimMachineModel,
- cpuPowerModel: CpuPowerModel = ConstantPowerModel(0.0),
-) : BareMetalDriver, FailureDomain, Powerable {
- /**
- * The flavor that corresponds to this machine.
- */
- private val flavor = Flavor(
- machine.cpus.size,
- machine.memory.map { it.size }.sum()
- )
-
- /**
- * The events of the machine.
- */
- private val events = EventFlow<NodeEvent>()
-
- /**
- * The machine state.
- */
- private val nodeState =
- StateFlow(Node(uid, name, metadata + ("driver" to this), NodeState.SHUTOFF, flavor, Image.EMPTY, events))
-
- /**
- * The [SimBareMetalMachine] we use to run the workload.
- */
- private val machine = SimBareMetalMachine(coroutineScope, clock, machine)
-
- override val node: Flow<Node> = nodeState
-
- override val usage: Flow<Double>
- get() = this.machine.usage
-
- override val powerDraw: Flow<Double> = cpuPowerModel.getPowerDraw(this)
-
- /**
- * The [Job] that runs the simulated workload.
- */
- private var job: Job? = null
-
- override suspend fun init(): Node {
- return nodeState.value
- }
-
- override suspend fun start(): Node {
- val node = nodeState.value
- if (node.state != NodeState.SHUTOFF) {
- return node
- }
-
- val workload = node.image.tags["workload"] as SimWorkload
-
- job = coroutineScope.launch {
- delay(1) // TODO Introduce boot time
- initMachine()
- try {
- machine.run(workload, mapOf("driver" to this@SimBareMetalDriver, "node" to node))
- exitMachine(null)
- } catch (_: CancellationException) {
- // Ignored
- } catch (cause: Throwable) {
- exitMachine(cause)
- }
- }
-
- setNode(node.copy(state = NodeState.BOOT))
- return nodeState.value
- }
-
- private fun initMachine() {
- setNode(nodeState.value.copy(state = NodeState.ACTIVE))
- }
-
- private fun exitMachine(cause: Throwable?) {
- val newNodeState =
- if (cause == null)
- NodeState.SHUTOFF
- else
- NodeState.ERROR
- setNode(nodeState.value.copy(state = newNodeState))
- }
-
- override suspend fun stop(): Node {
- val node = nodeState.value
- if (node.state == NodeState.SHUTOFF) {
- return node
- }
-
- job?.cancelAndJoin()
- setNode(node.copy(state = NodeState.SHUTOFF))
- return node
- }
-
- override suspend fun reboot(): Node {
- stop()
- return start()
- }
-
- override suspend fun setImage(image: Image): Node {
- setNode(nodeState.value.copy(image = image))
- return nodeState.value
- }
-
- override suspend fun refresh(): Node = nodeState.value
-
- private fun setNode(value: Node) {
- val field = nodeState.value
- if (field.state != value.state) {
- events.emit(NodeEvent.StateChanged(value, field.state))
- }
-
- nodeState.value = value
- }
-
- override val scope: CoroutineScope
- get() = coroutineScope
-
- override suspend fun fail() {
- setNode(nodeState.value.copy(state = NodeState.ERROR))
- }
-
- override suspend fun recover() {
- setNode(nodeState.value.copy(state = NodeState.ACTIVE))
- }
-
- override fun toString(): String = "SimBareMetalDriver(node = ${nodeState.value.uid})"
-}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index fd547d3d..19fa3e97 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -29,34 +29,43 @@ import org.opendc.compute.api.Flavor
import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
import org.opendc.compute.service.driver.*
-import org.opendc.metal.Node
+import org.opendc.compute.simulator.power.api.CpuPowerModel
+import org.opendc.compute.simulator.power.api.Powerable
+import org.opendc.compute.simulator.power.models.ConstantPowerModel
import org.opendc.simulator.compute.*
import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
import org.opendc.simulator.compute.model.MemoryUnit
-import org.opendc.simulator.compute.workload.SimResourceCommand
-import org.opendc.simulator.compute.workload.SimWorkload
+import org.opendc.simulator.failures.FailureDomain
import org.opendc.utils.flow.EventFlow
+import java.time.Clock
import java.util.*
+import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resume
/**
* A [Host] that is simulates virtual machines on a physical machine using [SimHypervisor].
*/
public class SimHost(
- public val node: Node,
- private val coroutineScope: CoroutineScope,
- hypervisor: SimHypervisorProvider
-) : Host, SimWorkload {
+ override val uid: UUID,
+ override val name: String,
+ model: SimMachineModel,
+ override val meta: Map<String, Any>,
+ context: CoroutineContext,
+ clock: Clock,
+ hypervisor: SimHypervisorProvider,
+ cpuPowerModel: CpuPowerModel = ConstantPowerModel(0.0),
+ private val mapper: SimWorkloadMapper = SimMetaWorkloadMapper()
+) : Host, FailureDomain, Powerable, AutoCloseable {
/**
- * The logger instance of this server.
+ * The [CoroutineScope] of the host bounded by the lifecycle of the host.
*/
- private val logger = KotlinLogging.logger {}
+ override val scope: CoroutineScope = CoroutineScope(context)
/**
- * The execution context in which the [Host] runs.
+ * The logger instance of this server.
*/
- private lateinit var ctx: SimExecutionContext
+ private val logger = KotlinLogging.logger {}
override val events: Flow<HostEvent>
get() = _events
@@ -70,12 +79,17 @@ public class SimHost(
/**
* Current total memory use of the images on this hypervisor.
*/
- private var availableMemory: Long = 0
+ private var availableMemory: Long = model.memory.map { it.size }.sum()
+
+ /**
+ * The machine to run on.
+ */
+ public val machine: SimBareMetalMachine = SimBareMetalMachine(scope, clock, model)
/**
* The hypervisor to run multiple workloads.
*/
- private val hypervisor = hypervisor.create(
+ public val hypervisor: SimHypervisor = hypervisor.create(
object : SimHypervisor.Listener {
override fun onSliceFinish(
hypervisor: SimHypervisor,
@@ -107,26 +121,40 @@ public class SimHost(
*/
private val guests = HashMap<Server, Guest>()
- override val uid: UUID
- get() = node.uid
-
- override val name: String
- get() = node.name
-
- override val model: HostModel
- get() = HostModel(node.flavor.cpuCount, node.flavor.memorySize)
-
override val state: HostState
get() = _state
- private var _state: HostState = HostState.UP
+ private var _state: HostState = HostState.DOWN
set(value) {
- listeners.forEach { it.onStateChanged(this, value) }
+ if (value != field) {
+ listeners.forEach { it.onStateChanged(this, value) }
+ }
field = value
}
+ override val model: HostModel = HostModel(model.cpus.size, model.memory.map { it.size }.sum())
+
+ override val powerDraw: Flow<Double> = cpuPowerModel.getPowerDraw(this)
+
+ init {
+ // Launch hypervisor onto machine
+ scope.launch {
+ try {
+ _state = HostState.UP
+ machine.run(this@SimHost.hypervisor, emptyMap())
+ } catch (_: CancellationException) {
+ // Ignored
+ } catch (cause: Throwable) {
+ logger.error(cause) { "Host failed" }
+ throw cause
+ } finally {
+ _state = HostState.DOWN
+ }
+ }
+ }
+
override fun canFit(server: Server): Boolean {
val sufficientMemory = availableMemory > server.flavor.memorySize
- val enoughCpus = ctx.machine.cpus.size >= server.flavor.cpuCount
+ val enoughCpus = machine.model.cpus.size >= server.flavor.cpuCount
val canFit = hypervisor.canFit(server.flavor.toMachineModel())
return sufficientMemory && enoughCpus && canFit
@@ -146,7 +174,7 @@ public class SimHost(
guest.start()
}
- _events.emit(HostEvent.VmsUpdated(this, guests.count { it.value.state == ServerState.ACTIVE }, availableMemory))
+ _events.emit(HostEvent.VmsUpdated(this, guests.count { it.value.state == ServerState.RUNNING }, availableMemory))
}
override fun contains(server: Server): Boolean {
@@ -163,7 +191,7 @@ public class SimHost(
guest.stop()
}
- override suspend fun terminate(server: Server) {
+ override suspend fun delete(server: Server) {
val guest = guests.remove(server) ?: return
guest.terminate()
}
@@ -176,11 +204,16 @@ public class SimHost(
listeners.remove(listener)
}
+ override fun close() {
+ scope.cancel()
+ _state = HostState.DOWN
+ }
+
/**
* Convert flavor to machine model.
*/
private fun Flavor.toMachineModel(): SimMachineModel {
- val originalCpu = ctx.machine.cpus[0]
+ val originalCpu = machine.model.cpus[0]
val processingNode = originalCpu.node.copy(coreCount = cpuCount)
val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode) }
val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize))
@@ -190,7 +223,7 @@ public class SimHost(
private fun onGuestStart(vm: Guest) {
guests.forEach { _, guest ->
- if (guest.state == ServerState.ACTIVE) {
+ if (guest.state == ServerState.RUNNING) {
vm.performanceInterferenceModel?.onStart(vm.server.image.name)
}
}
@@ -200,58 +233,71 @@ public class SimHost(
private fun onGuestStop(vm: Guest) {
guests.forEach { _, guest ->
- if (guest.state == ServerState.ACTIVE) {
+ if (guest.state == ServerState.RUNNING) {
vm.performanceInterferenceModel?.onStop(vm.server.image.name)
}
}
listeners.forEach { it.onStateChanged(this, vm.server, vm.state) }
- _events.emit(HostEvent.VmsUpdated(this@SimHost, guests.count { it.value.state == ServerState.ACTIVE }, availableMemory))
+ _events.emit(HostEvent.VmsUpdated(this@SimHost, guests.count { it.value.state == ServerState.RUNNING }, availableMemory))
+ }
+
+ override suspend fun fail() {
+ _state = HostState.DOWN
+ }
+
+ override suspend fun recover() {
+ _state = HostState.UP
}
/**
* A virtual machine instance that the driver manages.
*/
private inner class Guest(val server: Server, val machine: SimMachine) {
- val performanceInterferenceModel: PerformanceInterferenceModel? = server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
+ val performanceInterferenceModel: PerformanceInterferenceModel? = server.meta[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
- var state: ServerState = ServerState.SHUTOFF
+ var state: ServerState = ServerState.TERMINATED
suspend fun start() {
when (state) {
- ServerState.SHUTOFF -> {
+ ServerState.TERMINATED -> {
logger.info { "User requested to start server ${server.uid}" }
launch()
}
- ServerState.ACTIVE -> return
+ ServerState.RUNNING -> return
+ ServerState.DELETED -> {
+ logger.warn { "User tried to start terminated server" }
+ throw IllegalArgumentException("Server is terminated")
+ }
else -> assert(false) { "Invalid state transition" }
}
}
suspend fun stop() {
when (state) {
- ServerState.ACTIVE, ServerState.ERROR -> {
+ ServerState.RUNNING, ServerState.ERROR -> {
val job = job ?: throw IllegalStateException("Server should be active")
job.cancel()
job.join()
}
- ServerState.SHUTOFF -> return
+ ServerState.TERMINATED, ServerState.DELETED -> return
else -> assert(false) { "Invalid state transition" }
}
}
suspend fun terminate() {
stop()
+ state = ServerState.DELETED
}
private var job: Job? = null
private suspend fun launch() = suspendCancellableCoroutine<Unit> { cont ->
assert(job == null) { "Concurrent job running" }
- val workload = server.image.tags["workload"] as SimWorkload
+ val workload = mapper.createWorkload(server)
- val job = coroutineScope.launch {
+ val job = scope.launch {
delay(1) // TODO Introduce boot time
init()
cont.resume(Unit)
@@ -271,14 +317,14 @@ public class SimHost(
}
private fun init() {
- state = ServerState.ACTIVE
+ state = ServerState.RUNNING
onGuestStart(this)
}
private fun exit(cause: Throwable?) {
state =
if (cause == null)
- ServerState.SHUTOFF
+ ServerState.TERMINATED
else
ServerState.ERROR
@@ -286,18 +332,4 @@ public class SimHost(
onGuestStop(this)
}
}
-
- override fun onStart(ctx: SimExecutionContext) {
- this.ctx = ctx
- this.availableMemory = ctx.machine.memory.map { it.size }.sum()
- this.hypervisor.onStart(ctx)
- }
-
- override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand {
- return hypervisor.onStart(ctx, cpu)
- }
-
- override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand {
- return hypervisor.onNext(ctx, cpu, remainingWork)
- }
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHostProvisioner.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHostProvisioner.kt
deleted file mode 100644
index bb03777b..00000000
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHostProvisioner.kt
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.simulator
-
-import kotlinx.coroutines.*
-import org.opendc.compute.api.Image
-import org.opendc.compute.service.driver.Host
-import org.opendc.metal.Node
-import org.opendc.metal.service.ProvisioningService
-import org.opendc.simulator.compute.SimHypervisorProvider
-import kotlin.coroutines.CoroutineContext
-
-/**
- * A helper class to provision [SimHost]s on top of bare-metal machines using the [ProvisioningService].
- *
- * @param context The [CoroutineContext] to use.
- * @param metal The [ProvisioningService] to use.
- * @param hypervisor The type of hypervisor to use.
- */
-public class SimHostProvisioner(
- private val context: CoroutineContext,
- private val metal: ProvisioningService,
- private val hypervisor: SimHypervisorProvider
-) : AutoCloseable {
- /**
- * The [CoroutineScope] of the service bounded by the lifecycle of the service.
- */
- private val scope = CoroutineScope(context)
-
- /**
- * Provision all machines with a host.
- */
- public suspend fun provisionAll(): List<Host> = coroutineScope {
- metal.nodes().map { node -> async { provision(node) } }.awaitAll()
- }
-
- /**
- * Provision the specified [Node].
- */
- public suspend fun provision(node: Node): Host = coroutineScope {
- val host = SimHost(node, scope, hypervisor)
- metal.deploy(node, Image(node.uid, node.name, mapOf("workload" to host)))
- host
- }
-
- override fun close() {
- scope.cancel()
- }
-}
diff --git a/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/NodeEvent.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimMetaWorkloadMapper.kt
index 30ce423c..c05f1a2c 100644
--- a/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/NodeEvent.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimMetaWorkloadMapper.kt
@@ -20,22 +20,16 @@
* SOFTWARE.
*/
-package org.opendc.metal
+package org.opendc.compute.simulator
+
+import org.opendc.compute.api.Server
+import org.opendc.simulator.compute.workload.SimWorkload
/**
- * An event that is emitted by a [Node].
+ * A [SimWorkloadMapper] that maps a [Server] to a workload via the meta-data.
*/
-public sealed class NodeEvent {
- /**
- * The node that emitted the event.
- */
- public abstract val node: Node
-
- /**
- * This event is emitted when the state of [node] changes.
- *
- * @property node The node of which the state changed.
- * @property previousState The previous state of the node.
- */
- public data class StateChanged(override val node: Node, val previousState: NodeState) : NodeEvent()
+public class SimMetaWorkloadMapper(private val key: String = "workload") : SimWorkloadMapper {
+ override fun createWorkload(server: Server): SimWorkload {
+ return requireNotNull(server.meta[key] ?: server.image.meta[key]) as SimWorkload
+ }
}
diff --git a/simulator/opendc-core/src/main/kotlin/org/opendc/core/resource/Resource.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimWorkloadMapper.kt
index 5bb2c2ce..7082c5cf 100644
--- a/simulator/opendc-core/src/main/kotlin/org/opendc/core/resource/Resource.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimWorkloadMapper.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,16 +20,17 @@
* SOFTWARE.
*/
-package org.opendc.core.resource
+package org.opendc.compute.simulator
-import org.opendc.core.Identity
+import org.opendc.compute.api.Server
+import org.opendc.simulator.compute.workload.SimWorkload
/**
- * Represents a generic cloud resource.
+ * A [SimWorkloadMapper] is responsible for mapping a [Server] and [Image] to a [SimWorkload] that can be simulated.
*/
-public interface Resource : Identity {
+public fun interface SimWorkloadMapper {
/**
- * The tags of this cloud resource.
+ * Map the specified [server] to a [SimWorkload] that can be simulated.
*/
- public val tags: TagContainer
+ public fun createWorkload(server: Server): SimWorkload
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/power/api/CpuPowerModel.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/power/api/CpuPowerModel.kt
index 0141bc8c..604b69c0 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/power/api/CpuPowerModel.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/power/api/CpuPowerModel.kt
@@ -2,7 +2,7 @@ package org.opendc.compute.simulator.power.api
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
-import org.opendc.metal.driver.BareMetalDriver
+import org.opendc.compute.simulator.SimHost
public interface CpuPowerModel {
/**
@@ -18,14 +18,14 @@ public interface CpuPowerModel {
/**
* Emits the values of power consumption for servers.
*
- * @param driver A [BareMetalDriver] that offers host CPU utilization.
+ * @param host A [SimHost] that offers host CPU utilization.
* @param withoutIdle A [Boolean] flag indicates whether (false) add a constant
* power consumption value when the server is idle or (true) not
* with a default value being false.
* @return A [Flow] of values representing the server power draw.
*/
- public fun getPowerDraw(driver: BareMetalDriver, withoutIdle: Boolean = false): Flow<Double> =
- driver.usage.map {
+ public fun getPowerDraw(host: SimHost, withoutIdle: Boolean = false): Flow<Double> =
+ host.machine.usage.map {
computeCpuPower(it)
}
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimBareMetalDriverTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimBareMetalDriverTest.kt
deleted file mode 100644
index 0d90376e..00000000
--- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimBareMetalDriverTest.kt
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.simulator
-
-import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.flow.collect
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.test.TestCoroutineScope
-import kotlinx.coroutines.withContext
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.BeforeEach
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertAll
-import org.opendc.compute.api.Image
-import org.opendc.metal.NodeEvent
-import org.opendc.metal.NodeState
-import org.opendc.simulator.compute.SimMachineModel
-import org.opendc.simulator.compute.model.MemoryUnit
-import org.opendc.simulator.compute.model.ProcessingNode
-import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.compute.workload.SimFlopsWorkload
-import org.opendc.simulator.utils.DelayControllerClockAdapter
-import java.util.UUID
-
-@OptIn(ExperimentalCoroutinesApi::class)
-internal class SimBareMetalDriverTest {
- private lateinit var machineModel: SimMachineModel
-
- @BeforeEach
- fun setUp() {
- val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4)
-
- machineModel = SimMachineModel(
- cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 2000.0) },
- memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
- )
- }
-
- @Test
- fun testFlopsWorkload() {
- val testScope = TestCoroutineScope()
- val clock = DelayControllerClockAdapter(testScope)
-
- var finalState: NodeState = NodeState.UNKNOWN
- var finalTime = 0L
-
- testScope.launch {
- val driver = SimBareMetalDriver(this, clock, UUID.randomUUID(), "test", emptyMap(), machineModel)
- val image = Image(UUID.randomUUID(), "<unnamed>", mapOf("workload" to SimFlopsWorkload(4_000, utilization = 1.0)))
- // Batch driver commands
- withContext(coroutineContext) {
- driver.init()
- driver.setImage(image)
- val node = driver.start()
- node.events.collect { event ->
- when (event) {
- is NodeEvent.StateChanged -> {
- finalState = event.node.state
- finalTime = clock.millis()
- }
- }
- }
- }
- }
-
- testScope.advanceUntilIdle()
- assertAll(
- { assertEquals(NodeState.SHUTOFF, finalState) },
- { assertEquals(501, finalTime) }
- )
- }
-}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index 61bff39f..e1a1d87e 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -24,7 +24,6 @@ package org.opendc.compute.simulator
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
-import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
@@ -39,8 +38,6 @@ import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
import org.opendc.compute.api.ServerWatcher
import org.opendc.compute.service.driver.HostEvent
-import org.opendc.metal.Node
-import org.opendc.metal.NodeState
import org.opendc.simulator.compute.SimFairShareHypervisorProvider
import org.opendc.simulator.compute.SimMachineModel
import org.opendc.simulator.compute.model.MemoryUnit
@@ -82,18 +79,13 @@ internal class SimHostTest {
var grantedWork = 0L
var overcommittedWork = 0L
- val node = Node(
- UUID.randomUUID(), "name", emptyMap(), NodeState.SHUTOFF,
- Flavor(machineModel.cpus.size, machineModel.memory.map { it.size }.sum()), Image.EMPTY, emptyFlow()
- )
-
scope.launch {
- val virtDriver = SimHost(node, this, SimFairShareHypervisorProvider())
- val vmm = Image(UUID.randomUUID(), "vmm", mapOf("workload" to virtDriver))
+ val virtDriver = SimHost(UUID.randomUUID(), "test", machineModel, emptyMap(), coroutineContext, clock, SimFairShareHypervisorProvider())
val duration = 5 * 60L
- val vmImageA = Image(
+ val vmImageA = MockImage(
UUID.randomUUID(),
"<unnamed>",
+ emptyMap(),
mapOf(
"workload" to SimTraceWorkload(
sequenceOf(
@@ -105,9 +97,10 @@ internal class SimHostTest {
)
)
)
- val vmImageB = Image(
+ val vmImageB = MockImage(
UUID.randomUUID(),
"<unnamed>",
+ emptyMap(),
mapOf(
"workload" to SimTraceWorkload(
sequenceOf(
@@ -120,16 +113,9 @@ internal class SimHostTest {
)
)
- val metalDriver =
- SimBareMetalDriver(this, clock, UUID.randomUUID(), "test", emptyMap(), machineModel)
-
- metalDriver.init()
- metalDriver.setImage(vmm)
- metalDriver.start()
-
delay(5)
- val flavor = Flavor(2, 0)
+ val flavor = MockFlavor(2, 0)
virtDriver.events
.onEach { event ->
when (event) {
@@ -157,14 +143,56 @@ internal class SimHostTest {
)
}
+ private class MockFlavor(
+ override val cpuCount: Int,
+ override val memorySize: Long
+ ) : Flavor {
+ override val uid: UUID = UUID.randomUUID()
+ override val name: String = "test"
+ override val labels: Map<String, String> = emptyMap()
+ override val meta: Map<String, Any> = emptyMap()
+
+ override suspend fun delete() {
+ throw NotImplementedError()
+ }
+
+ override suspend fun refresh() {
+ throw NotImplementedError()
+ }
+ }
+
+ private class MockImage(
+ override val uid: UUID,
+ override val name: String,
+ override val labels: Map<String, String>,
+ override val meta: Map<String, Any>
+ ) : Image {
+ override suspend fun delete() {
+ throw NotImplementedError()
+ }
+
+ override suspend fun refresh() {
+ throw NotImplementedError()
+ }
+ }
+
private class MockServer(
override val uid: UUID,
override val name: String,
override val flavor: Flavor,
override val image: Image
) : Server {
- override val tags: Map<String, String> = emptyMap()
- override val state: ServerState = ServerState.BUILD
+ override val labels: Map<String, String> = emptyMap()
+
+ override val meta: Map<String, Any> = emptyMap()
+
+ override val state: ServerState = ServerState.TERMINATED
+
+ override suspend fun start() {}
+
+ override suspend fun stop() {}
+
+ override suspend fun delete() {}
override fun watch(watcher: ServerWatcher) {}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimProvisioningServiceTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimProvisioningServiceTest.kt
deleted file mode 100644
index 33b3db94..00000000
--- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimProvisioningServiceTest.kt
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.simulator
-
-import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.flow.collect
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.test.TestCoroutineScope
-import org.junit.jupiter.api.BeforeEach
-import org.junit.jupiter.api.Test
-import org.opendc.compute.api.Image
-import org.opendc.metal.service.SimpleProvisioningService
-import org.opendc.simulator.compute.SimMachineModel
-import org.opendc.simulator.compute.model.MemoryUnit
-import org.opendc.simulator.compute.model.ProcessingNode
-import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.compute.workload.SimFlopsWorkload
-import org.opendc.simulator.utils.DelayControllerClockAdapter
-import java.util.UUID
-
-/**
- * Test suite for the [SimpleProvisioningService].
- */
-@OptIn(ExperimentalCoroutinesApi::class)
-internal class SimProvisioningServiceTest {
- private lateinit var machineModel: SimMachineModel
-
- @BeforeEach
- fun setUp() {
- val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4)
-
- machineModel = SimMachineModel(
- cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 2000.0) },
- memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
- )
- }
-
- /**
- * A basic smoke test.
- */
- @Test
- fun testSmoke() {
- val testScope = TestCoroutineScope()
- val clock = DelayControllerClockAdapter(testScope)
-
- testScope.launch {
- val image = Image(UUID.randomUUID(), "<unnamed>", mapOf("machine" to SimFlopsWorkload(1000)))
- val driver = SimBareMetalDriver(this, clock, UUID.randomUUID(), "test", emptyMap(), machineModel)
-
- val provisioner = SimpleProvisioningService()
- provisioner.create(driver)
- delay(5)
- val nodes = provisioner.nodes()
- val node = provisioner.deploy(nodes.first(), image)
- node.events.collect { println(it) }
- }
-
- testScope.advanceUntilIdle()
- }
-}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/power/CpuPowerModelTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/power/CpuPowerModelTest.kt
index d4d88fb1..9d034a5d 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/power/CpuPowerModelTest.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/power/CpuPowerModelTest.kt
@@ -1,18 +1,21 @@
package org.opendc.compute.simulator.power
import io.mockk.*
+import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.*
-import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.test.runBlockingTest
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.Arguments
import org.junit.jupiter.params.provider.MethodSource
+import org.opendc.compute.simulator.SimHost
import org.opendc.compute.simulator.power.api.CpuPowerModel
import org.opendc.compute.simulator.power.models.*
-import org.opendc.metal.driver.BareMetalDriver
+import org.opendc.simulator.compute.SimBareMetalMachine
import java.util.stream.Stream
import kotlin.math.pow
+@OptIn(ExperimentalCoroutinesApi::class)
internal class CpuPowerModelTest {
private val epsilon = 10.0.pow(-3)
private val cpuUtil = .9
@@ -44,21 +47,24 @@ internal class CpuPowerModelTest {
powerModel: CpuPowerModel,
expectedPowerConsumption: Double
) {
- val cpuLoads = flowOf(cpuUtil, cpuUtil, cpuUtil)
- val bareMetalDriver = mockkClass(BareMetalDriver::class)
- every { bareMetalDriver.usage } returns cpuLoads
+ runBlockingTest {
+ val cpuLoads = flowOf(cpuUtil, cpuUtil, cpuUtil).stateIn(this)
+ val bareMetalDriver = mockkClass(SimHost::class)
+ val machine = mockkClass(SimBareMetalMachine::class)
+ every { bareMetalDriver.machine } returns machine
+ every { machine.usage } returns cpuLoads
- runBlocking {
val serverPowerDraw = powerModel.getPowerDraw(bareMetalDriver)
- assertEquals(serverPowerDraw.count(), cpuLoads.count())
assertEquals(
serverPowerDraw.first().toDouble(),
flowOf(expectedPowerConsumption).first().toDouble(),
epsilon
)
+
+ verify(exactly = 1) { bareMetalDriver.machine }
+ verify(exactly = 1) { machine.usage }
}
- verify(exactly = 1) { bareMetalDriver.usage }
}
@Suppress("unused")
diff --git a/simulator/opendc-core/build.gradle.kts b/simulator/opendc-core/build.gradle.kts
deleted file mode 100644
index 7e1a4b97..00000000
--- a/simulator/opendc-core/build.gradle.kts
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Copyright (c) 2017 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-description = "Base model for datacenter simulation"
-
-/* Build configuration */
-plugins {
- `kotlin-library-conventions`
-}
-
-dependencies {
- api(platform(project(":opendc-platform")))
- api("org.jetbrains.kotlinx:kotlinx-coroutines-core")
-}
diff --git a/simulator/opendc-core/src/main/kotlin/org/opendc/core/Environment.kt b/simulator/opendc-core/src/main/kotlin/org/opendc/core/Environment.kt
deleted file mode 100644
index a5055cff..00000000
--- a/simulator/opendc-core/src/main/kotlin/org/opendc/core/Environment.kt
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.core
-
-/**
- * A description of a large-scale computing environment. This description includes including key size and topology
- * information of the environment, types of resources, but also various operational and management rules such as
- * scheduled maintenance, allocation and other constraints.
- *
- * @property name The name of the environment.
- * @property description A small textual description about the environment that is being modeled.
- * @property platforms The cloud platforms (such as AWS or GCE) in this environment.
- */
-public data class Environment(val name: String, val description: String?, val platforms: List<Platform>)
diff --git a/simulator/opendc-core/src/main/kotlin/org/opendc/core/Platform.kt b/simulator/opendc-core/src/main/kotlin/org/opendc/core/Platform.kt
deleted file mode 100644
index 5550ffed..00000000
--- a/simulator/opendc-core/src/main/kotlin/org/opendc/core/Platform.kt
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.core
-
-import java.util.*
-
-/**
- * A representation of a cloud platform such as Amazon Web Services (AWS), Microsoft Azure or Google Cloud.
- *
- * @property uid The unique identifier of this topology.
- * @property name the name of the platform.
- * @property zones The availability zones available on this platform.
- */
-public data class Platform(override val uid: UUID, override val name: String, val zones: List<Zone>) : Identity
diff --git a/simulator/opendc-core/src/main/kotlin/org/opendc/core/Zone.kt b/simulator/opendc-core/src/main/kotlin/org/opendc/core/Zone.kt
deleted file mode 100644
index 834f6cf2..00000000
--- a/simulator/opendc-core/src/main/kotlin/org/opendc/core/Zone.kt
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.core
-
-import org.opendc.core.services.ServiceRegistry
-import java.util.*
-
-/**
- * An isolated location within a topology region from which public cloud services operate, roughly equivalent to a
- * single topology. Zones contain one or more clusters and secondary storage.
- *
- * This class models *only* the static information of a zone, with dynamic information being contained within the zone's
- * actor. During runtime, it's actor acts as a registry for all the cloud services provided by the zone.
- *
- * @property uid The unique identifier of this availability zone.
- * @property name The name of the zone within its platform.
- * @property services The service registry containing the services of the zone.
- */
-public data class Zone(
- override val uid: UUID,
- override val name: String,
- val services: ServiceRegistry
-) : Identity {
- override fun equals(other: Any?): Boolean = other is Zone && uid == other.uid
- override fun hashCode(): Int = uid.hashCode()
-}
diff --git a/simulator/opendc-core/src/main/kotlin/org/opendc/core/resource/TagContainer.kt b/simulator/opendc-core/src/main/kotlin/org/opendc/core/resource/TagContainer.kt
deleted file mode 100644
index 6a4ff102..00000000
--- a/simulator/opendc-core/src/main/kotlin/org/opendc/core/resource/TagContainer.kt
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.core.resource
-
-/**
- * An immutable map containing the tags of some resource.
- */
-public typealias TagContainer = Map<String, Any>
-
-/**
- * Obtain the value of the tag with the specified [key] of type [T]. If the tag does not exist or the tag is of
- * different type, `null` is returned.
- */
-public inline fun <reified T : Any> TagContainer.typed(key: String): T? = this[key] as? T
diff --git a/simulator/opendc-core/src/main/kotlin/org/opendc/core/services/ServiceRegistry.kt b/simulator/opendc-core/src/main/kotlin/org/opendc/core/services/ServiceRegistry.kt
deleted file mode 100644
index 7434d91c..00000000
--- a/simulator/opendc-core/src/main/kotlin/org/opendc/core/services/ServiceRegistry.kt
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.core.services
-
-/**
- * An immutable service registry interface.
- */
-public interface ServiceRegistry {
- /**
- * The keys in this registry.
- */
- public val keys: Collection<ServiceKey<*>>
-
- /**
- * Determine if this map contains the service with the specified [ServiceKey].
- *
- * @param key The key of the service to check for.
- * @return `true` if the service is in the map, `false` otherwise.
- */
- public operator fun contains(key: ServiceKey<*>): Boolean
-
- /**
- * Obtain the service with the specified [ServiceKey].
- *
- * @param key The key of the service to obtain.
- * @return The references to the service.
- * @throws IllegalArgumentException if the key does not exist in the map.
- */
- public operator fun <T : Any> get(key: ServiceKey<T>): T
-
- /**
- * Return the result of associating the specified [service] with the given [key] in this registry.
- */
- public fun <T : Any> put(key: ServiceKey<T>, service: T): ServiceRegistry
-}
-
-/**
- * Construct an empty [ServiceRegistry].
- */
-@Suppress("FunctionName")
-public fun ServiceRegistry(): ServiceRegistry = ServiceRegistryImpl(emptyMap())
diff --git a/simulator/opendc-core/src/main/kotlin/org/opendc/core/services/ServiceRegistryImpl.kt b/simulator/opendc-core/src/main/kotlin/org/opendc/core/services/ServiceRegistryImpl.kt
deleted file mode 100644
index e117bec6..00000000
--- a/simulator/opendc-core/src/main/kotlin/org/opendc/core/services/ServiceRegistryImpl.kt
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.core.services
-
-/**
- * Default implementation of the [ServiceRegistry] interface.
- */
-internal class ServiceRegistryImpl(private val map: Map<ServiceKey<*>, Any>) : ServiceRegistry {
- override val keys: Collection<ServiceKey<*>>
- get() = map.keys
-
- override fun contains(key: ServiceKey<*>): Boolean = key in map
-
- override fun <T : Any> get(key: ServiceKey<T>): T {
- @Suppress("UNCHECKED_CAST")
- return map[key] as T
- }
-
- override fun <T : Any> put(key: ServiceKey<T>, service: T): ServiceRegistry =
- ServiceRegistryImpl(map.plus(key to service))
-
- override fun toString(): String = map.toString()
-}
diff --git a/simulator/opendc-core/src/main/kotlin/org/opendc/core/workload/Workload.kt b/simulator/opendc-core/src/main/kotlin/org/opendc/core/workload/Workload.kt
deleted file mode 100644
index f0bd1137..00000000
--- a/simulator/opendc-core/src/main/kotlin/org/opendc/core/workload/Workload.kt
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.core.workload
-
-import org.opendc.core.Identity
-import org.opendc.core.User
-
-/**
- * A high-level abstraction that represents the actual work that a set of compute resources perform, such
- * as running an application on a machine or a whole workflow running multiple tasks on numerous machines.
- */
-public interface Workload : Identity {
- /**
- * The owner of this workload.
- */
- public val owner: User
-}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
index 636f291c..2d0da1bf 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
@@ -31,7 +31,6 @@ plugins {
dependencies {
api(platform(project(":opendc-platform")))
- api(project(":opendc-core"))
api(project(":opendc-harness"))
implementation(project(":opendc-format"))
implementation(project(":opendc-simulator:opendc-simulator-core"))
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
index a5cf4fc0..f327b55d 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.experiments.capelin.experiment
+package org.opendc.experiments.capelin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
@@ -32,30 +32,24 @@ import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.launch
import mu.KotlinLogging
-import org.opendc.compute.api.ComputeWorkload
-import org.opendc.compute.api.Flavor
-import org.opendc.compute.api.Server
-import org.opendc.compute.api.ServerState
-import org.opendc.compute.api.ServerWatcher
+import org.opendc.compute.api.*
import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.ComputeServiceEvent
+import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.driver.HostEvent
+import org.opendc.compute.service.driver.HostListener
+import org.opendc.compute.service.driver.HostState
import org.opendc.compute.service.internal.ComputeServiceImpl
import org.opendc.compute.service.scheduler.AllocationPolicy
-import org.opendc.compute.simulator.SimBareMetalDriver
import org.opendc.compute.simulator.SimHost
-import org.opendc.compute.simulator.SimHostProvisioner
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
import org.opendc.experiments.capelin.trace.Sc20StreamingParquetTraceReader
import org.opendc.format.environment.EnvironmentReader
import org.opendc.format.trace.TraceReader
-import org.opendc.metal.NODE_CLUSTER
-import org.opendc.metal.NodeEvent
-import org.opendc.metal.service.ProvisioningService
import org.opendc.simulator.compute.SimFairShareHypervisorProvider
import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
+import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.failures.CorrelatedFaultInjector
-import org.opendc.simulator.failures.FailureDomain
import org.opendc.simulator.failures.FaultInjector
import org.opendc.trace.core.EventTracer
import java.io.File
@@ -72,20 +66,20 @@ private val logger = KotlinLogging.logger {}
/**
* Construct the failure domain for the experiments.
*/
-public suspend fun createFailureDomain(
+public fun createFailureDomain(
coroutineScope: CoroutineScope,
clock: Clock,
seed: Int,
failureInterval: Double,
- bareMetalProvisioner: ProvisioningService,
+ service: ComputeService,
chan: Channel<Unit>
): CoroutineScope {
val job = coroutineScope.launch {
chan.receive()
val random = Random(seed)
val injectors = mutableMapOf<String, FaultInjector>()
- for (node in bareMetalProvisioner.nodes()) {
- val cluster = node.metadata[NODE_CLUSTER] as String
+ for (host in service.hosts) {
+ val cluster = host.meta["cluster"] as String
val injector =
injectors.getOrPut(cluster) {
createFaultInjector(
@@ -95,7 +89,7 @@ public suspend fun createFailureDomain(
failureInterval
)
}
- injector.enqueue(node.metadata["driver"] as FailureDomain)
+ injector.enqueue(host as SimHost)
}
}
return CoroutineScope(coroutineScope.coroutineContext + job)
@@ -139,41 +133,39 @@ public fun createTraceReader(
)
}
-public data class ProvisionerResult(
- val metal: ProvisioningService,
- val provisioner: SimHostProvisioner,
- val compute: ComputeServiceImpl
-)
-
/**
- * Construct the environment for a VM provisioner and return the provisioner instance.
+ * Construct the environment for a simulated compute service..
*/
-public suspend fun createProvisioner(
+public fun createComputeService(
coroutineScope: CoroutineScope,
clock: Clock,
environmentReader: EnvironmentReader,
allocationPolicy: AllocationPolicy,
eventTracer: EventTracer
-): ProvisionerResult {
- val environment = environmentReader.use { it.construct(coroutineScope, clock) }
- val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService]
-
- // Wait for the bare metal nodes to be spawned
- delay(10)
-
- val provisioner = SimHostProvisioner(coroutineScope.coroutineContext, bareMetalProvisioner, SimFairShareHypervisorProvider())
- val hosts = provisioner.provisionAll()
+): ComputeServiceImpl {
+ val hosts = environmentReader
+ .use { it.read() }
+ .map { def ->
+ SimHost(
+ def.uid,
+ def.name,
+ def.model,
+ def.meta,
+ coroutineScope.coroutineContext,
+ clock,
+ SimFairShareHypervisorProvider(),
+ def.powerModel
+ )
+ }
- val scheduler = ComputeService(coroutineScope.coroutineContext, clock, eventTracer, allocationPolicy) as ComputeServiceImpl
+ val scheduler =
+ ComputeService(coroutineScope.coroutineContext, clock, eventTracer, allocationPolicy) as ComputeServiceImpl
for (host in hosts) {
scheduler.addHost(host)
}
- // Wait for the hypervisors to be spawned
- delay(10)
-
- return ProvisionerResult(bareMetalProvisioner, provisioner, scheduler)
+ return scheduler
}
/**
@@ -186,25 +178,16 @@ public fun attachMonitor(
scheduler: ComputeService,
monitor: ExperimentMonitor
) {
-
- val hypervisors = scheduler.hosts
-
- // Monitor hypervisor events
- for (hypervisor in hypervisors) {
- // TODO Do not expose Host directly but use Hypervisor class.
- val server = (hypervisor as SimHost).node
- monitor.reportHostStateChange(clock.millis(), hypervisor, server)
- server.events
- .onEach { event ->
- val time = clock.millis()
- when (event) {
- is NodeEvent.StateChanged -> {
- monitor.reportHostStateChange(time, hypervisor, event.node)
- }
- }
+ // Monitor host events
+ for (host in scheduler.hosts) {
+ monitor.reportHostStateChange(clock.millis(), host, HostState.UP)
+ host.addListener(object : HostListener {
+ override fun onStateChanged(host: Host, newState: HostState) {
+ monitor.reportHostStateChange(clock.millis(), host, newState)
}
- .launchIn(coroutineScope)
- hypervisor.events
+ })
+
+ host.events
.onEach { event ->
when (event) {
is HostEvent.SliceFinished -> monitor.reportHostSlice(
@@ -216,15 +199,14 @@ public fun attachMonitor(
event.cpuUsage,
event.cpuDemand,
event.numberOfDeployedImages,
- (event.driver as SimHost).node
+ event.driver
)
}
}
.launchIn(coroutineScope)
- val driver = server.metadata["driver"] as SimBareMetalDriver
- driver.powerDraw
- .onEach { monitor.reportPowerConsumption(server, it) }
+ (host as SimHost).powerDraw
+ .onEach { monitor.reportPowerConsumption(host, it) }
.launchIn(coroutineScope)
}
@@ -244,29 +226,32 @@ public fun attachMonitor(
public suspend fun processTrace(
coroutineScope: CoroutineScope,
clock: Clock,
- reader: TraceReader<ComputeWorkload>,
+ reader: TraceReader<SimWorkload>,
scheduler: ComputeService,
chan: Channel<Unit>,
monitor: ExperimentMonitor
) {
val client = scheduler.newClient()
+ val image = client.newImage("vm-image")
try {
var submitted = 0
while (reader.hasNext()) {
- val (time, workload) = reader.next()
+ val entry = reader.next()
submitted++
- delay(max(0, time - clock.millis()))
+ delay(max(0, entry.start - clock.millis()))
coroutineScope.launch {
chan.send(Unit)
val server = client.newServer(
- workload.image.name,
- workload.image,
- Flavor(
- workload.image.tags["cores"] as Int,
- workload.image.tags["required-memory"] as Long
- )
+ entry.name,
+ image,
+ client.newFlavor(
+ entry.name,
+ entry.meta["cores"] as Int,
+ entry.meta["required-memory"] as Long
+ ),
+ meta = entry.meta
)
server.watch(object : ServerWatcher {
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index ff0a026d..f9c96bb6 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -35,10 +35,6 @@ import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolic
import org.opendc.compute.service.scheduler.ProvisionedCoresAllocationPolicy
import org.opendc.compute.service.scheduler.RandomAllocationPolicy
import org.opendc.compute.simulator.allocation.*
-import org.opendc.experiments.capelin.experiment.attachMonitor
-import org.opendc.experiments.capelin.experiment.createFailureDomain
-import org.opendc.experiments.capelin.experiment.createProvisioner
-import org.opendc.experiments.capelin.experiment.processTrace
import org.opendc.experiments.capelin.model.CompositeWorkload
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
@@ -157,7 +153,7 @@ public abstract class Portfolio(name: String) : Experiment(name) {
)
testScope.launch {
- val (bareMetalProvisioner, provisioner, scheduler) = createProvisioner(
+ val scheduler = createComputeService(
this,
clock,
environment,
@@ -172,7 +168,7 @@ public abstract class Portfolio(name: String) : Experiment(name) {
clock,
seeder.nextInt(),
operationalPhenomena.failureFrequency,
- bareMetalProvisioner,
+ scheduler,
chan
)
} else {
@@ -197,7 +193,6 @@ public abstract class Portfolio(name: String) : Experiment(name) {
failureDomain?.cancel()
scheduler.close()
- provisioner.close()
}
try {
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
index 1e42cf56..14cc06dc 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
@@ -26,7 +26,7 @@ import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
import org.opendc.compute.service.ComputeServiceEvent
import org.opendc.compute.service.driver.Host
-import org.opendc.metal.Node
+import org.opendc.compute.service.driver.HostState
import java.io.Closeable
/**
@@ -41,17 +41,12 @@ public interface ExperimentMonitor : Closeable {
/**
* This method is invoked when the state of a host changes.
*/
- public fun reportHostStateChange(
- time: Long,
- driver: Host,
- host: Node
- ) {
- }
+ public fun reportHostStateChange(time: Long, host: Host, newState: HostState) {}
/**
* Report the power consumption of a host.
*/
- public fun reportPowerConsumption(host: Node, draw: Double) {}
+ public fun reportPowerConsumption(host: Host, draw: Double) {}
/**
* This method is invoked for a host for each slice that is finishes.
@@ -65,7 +60,7 @@ public interface ExperimentMonitor : Closeable {
cpuUsage: Double,
cpuDemand: Double,
numberOfDeployedImages: Int,
- host: Node,
+ host: Host,
duration: Long = 5 * 60 * 1000L
) {
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
index 98052214..c9d57a98 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
@@ -27,11 +27,11 @@ import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
import org.opendc.compute.service.ComputeServiceEvent
import org.opendc.compute.service.driver.Host
+import org.opendc.compute.service.driver.HostState
import org.opendc.experiments.capelin.telemetry.HostEvent
import org.opendc.experiments.capelin.telemetry.ProvisionerEvent
import org.opendc.experiments.capelin.telemetry.parquet.ParquetHostEventWriter
import org.opendc.experiments.capelin.telemetry.parquet.ParquetProvisionerEventWriter
-import org.opendc.metal.Node
import java.io.File
/**
@@ -51,7 +51,7 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
File(base, "provisioner-metrics/$partition/data.parquet"),
bufferSize
)
- private val currentHostEvent = mutableMapOf<Node, HostEvent>()
+ private val currentHostEvent = mutableMapOf<Host, HostEvent>()
private var startTime = -1L
override fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {
@@ -63,12 +63,8 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
}
}
- override fun reportHostStateChange(
- time: Long,
- driver: Host,
- host: Node
- ) {
- logger.debug { "Host ${host.uid} changed state ${host.state} [$time]" }
+ override fun reportHostStateChange(time: Long, host: Host, newState: HostState) {
+ logger.debug { "Host ${host.uid} changed state $newState [$time]" }
val previousEvent = currentHostEvent[host]
@@ -97,9 +93,9 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
)
}
- private val lastPowerConsumption = mutableMapOf<Node, Double>()
+ private val lastPowerConsumption = mutableMapOf<Host, Double>()
- override fun reportPowerConsumption(host: Node, draw: Double) {
+ override fun reportPowerConsumption(host: Host, draw: Double) {
lastPowerConsumption[host] = draw
}
@@ -112,7 +108,7 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
cpuUsage: Double,
cpuDemand: Double,
numberOfDeployedImages: Int,
- host: Node,
+ host: Host,
duration: Long
) {
val previousEvent = currentHostEvent[host]
@@ -130,7 +126,7 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
cpuUsage,
cpuDemand,
lastPowerConsumption[host] ?: 200.0,
- host.flavor.cpuCount
+ host.model.cpuCount
)
currentHostEvent[host] = event
@@ -148,7 +144,7 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
cpuUsage,
cpuDemand,
lastPowerConsumption[host] ?: 200.0,
- host.flavor.cpuCount
+ host.model.cpuCount
)
currentHostEvent[host] = event
@@ -168,7 +164,7 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
cpuUsage,
cpuDemand,
lastPowerConsumption[host] ?: 200.0,
- host.flavor.cpuCount
+ host.model.cpuCount
)
currentHostEvent[host] = event
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt
index e7b6a7bb..899fc9b1 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/HostEvent.kt
@@ -22,7 +22,7 @@
package org.opendc.experiments.capelin.telemetry
-import org.opendc.metal.Node
+import org.opendc.compute.service.driver.Host
/**
* A periodic report of the host machine metrics.
@@ -30,7 +30,7 @@ import org.opendc.metal.Node
public data class HostEvent(
override val timestamp: Long,
public val duration: Long,
- public val node: Node,
+ public val host: Host,
public val vmCount: Int,
public val requestedBurst: Long,
public val grantedBurst: Long,
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt
index b4fdd66a..4a3e7963 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetHostEventWriter.kt
@@ -41,8 +41,8 @@ public class ParquetHostEventWriter(path: File, bufferSize: Int) :
// record.put("portfolio_id", event.run.parent.parent.id)
// record.put("scenario_id", event.run.parent.id)
// record.put("run_id", event.run.id)
- record.put("host_id", event.node.name)
- record.put("state", event.node.state.name)
+ record.put("host_id", event.host.name)
+ record.put("state", event.host.state.name)
record.put("timestamp", event.timestamp)
record.put("duration", event.duration)
record.put("vm_count", event.vmCount)
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt
index f9630078..a8462a51 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt
@@ -22,14 +22,13 @@
package org.opendc.experiments.capelin.trace
-import org.opendc.compute.api.ComputeWorkload
-import org.opendc.compute.api.Image
import org.opendc.experiments.capelin.model.CompositeWorkload
import org.opendc.experiments.capelin.model.Workload
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
+import org.opendc.simulator.compute.workload.SimWorkload
import java.util.TreeSet
/**
@@ -45,11 +44,11 @@ public class Sc20ParquetTraceReader(
performanceInterferenceModel: Map<String, PerformanceInterferenceModel>,
workload: Workload,
seed: Int
-) : TraceReader<ComputeWorkload> {
+) : TraceReader<SimWorkload> {
/**
* The iterator over the actual trace.
*/
- private val iterator: Iterator<TraceEntry<ComputeWorkload>> =
+ private val iterator: Iterator<TraceEntry<SimWorkload>> =
rawReaders
.map { it.read() }
.run {
@@ -67,19 +66,11 @@ public class Sc20ParquetTraceReader(
this
else {
map { entry ->
- val image = entry.workload.image
- val id = image.name
+ val id = entry.name
val relevantPerformanceInterferenceModelItems =
performanceInterferenceModel[id] ?: PerformanceInterferenceModel(TreeSet())
- val newImage =
- Image(
- image.uid,
- image.name,
- image.tags + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems),
- )
- val newWorkload = entry.workload.copy(image = newImage)
- Sc20RawParquetTraceReader.TraceEntryImpl(entry.submissionTime, newWorkload)
+ entry.copy(meta = entry.meta + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems))
}
}
}
@@ -87,7 +78,7 @@ public class Sc20ParquetTraceReader(
override fun hasNext(): Boolean = iterator.hasNext()
- override fun next(): TraceEntry<ComputeWorkload> = iterator.next()
+ override fun next(): TraceEntry<SimWorkload> = iterator.next()
override fun close() {}
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt
index b29bdc54..7ea5efe5 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt
@@ -26,12 +26,10 @@ import mu.KotlinLogging
import org.apache.avro.generic.GenericData
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetReader
-import org.opendc.compute.api.ComputeWorkload
-import org.opendc.compute.api.Image
-import org.opendc.core.User
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
import org.opendc.simulator.compute.workload.SimTraceWorkload
+import org.opendc.simulator.compute.workload.SimWorkload
import java.io.File
import java.util.UUID
@@ -48,6 +46,7 @@ public class Sc20RawParquetTraceReader(private val path: File) {
* Read the fragments into memory.
*/
private fun parseFragments(path: File): Map<String, List<SimTraceWorkload.Fragment>> {
+ @Suppress("DEPRECATION")
val reader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "trace.parquet"))
.disableCompatibility()
.build()
@@ -59,11 +58,9 @@ public class Sc20RawParquetTraceReader(private val path: File) {
val record = reader.read() ?: break
val id = record["id"].toString()
- val tick = record["time"] as Long
val duration = record["duration"] as Long
val cores = record["cores"] as Int
val cpuUsage = record["cpuUsage"] as Double
- val flops = record["flops"] as Long
val fragment = SimTraceWorkload.Fragment(
duration,
@@ -83,13 +80,14 @@ public class Sc20RawParquetTraceReader(private val path: File) {
/**
* Read the metadata into a workload.
*/
- private fun parseMeta(path: File, fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<TraceEntryImpl> {
+ private fun parseMeta(path: File, fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<TraceEntry<SimWorkload>> {
+ @Suppress("DEPRECATION")
val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "meta.parquet"))
.disableCompatibility()
.build()
var counter = 0
- val entries = mutableListOf<TraceEntryImpl>()
+ val entries = mutableListOf<TraceEntry<SimWorkload>>()
return try {
while (true) {
@@ -109,13 +107,9 @@ public class Sc20RawParquetTraceReader(private val path: File) {
val vmFragments = fragments.getValue(id).asSequence()
val totalLoad = vmFragments.sumByDouble { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs
val workload = SimTraceWorkload(vmFragments)
- val vmWorkload = ComputeWorkload(
- uid,
- id,
- UnnamedUser,
- Image(
- uid,
- id,
+ entries.add(
+ TraceEntry(
+ uid, id, submissionTime, workload,
mapOf(
"submit-time" to submissionTime,
"end-time" to endTime,
@@ -126,7 +120,6 @@ public class Sc20RawParquetTraceReader(private val path: File) {
)
)
)
- entries.add(TraceEntryImpl(submissionTime, vmWorkload))
}
entries
@@ -141,7 +134,7 @@ public class Sc20RawParquetTraceReader(private val path: File) {
/**
* The entries in the trace.
*/
- private val entries: List<TraceEntryImpl>
+ private val entries: List<TraceEntry<SimWorkload>>
init {
val fragments = parseFragments(path)
@@ -151,21 +144,5 @@ public class Sc20RawParquetTraceReader(private val path: File) {
/**
* Read the entries in the trace.
*/
- public fun read(): List<TraceEntry<ComputeWorkload>> = entries
-
- /**
- * An unnamed user.
- */
- private object UnnamedUser : User {
- override val name: String = "<unnamed>"
- override val uid: UUID = UUID.randomUUID()
- }
-
- /**
- * An entry in the trace.
- */
- internal data class TraceEntryImpl(
- override var submissionTime: Long,
- override val workload: ComputeWorkload
- ) : TraceEntry<ComputeWorkload>
+ public fun read(): List<TraceEntry<SimWorkload>> = entries
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt
index c588fda3..9ab69572 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt
@@ -31,14 +31,12 @@ import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.filter2.predicate.Statistics
import org.apache.parquet.filter2.predicate.UserDefinedPredicate
import org.apache.parquet.io.api.Binary
-import org.opendc.compute.api.ComputeWorkload
-import org.opendc.compute.api.Image
-import org.opendc.core.User
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
import org.opendc.simulator.compute.workload.SimTraceWorkload
+import org.opendc.simulator.compute.workload.SimWorkload
import java.io.File
import java.io.Serializable
import java.util.SortedSet
@@ -62,11 +60,11 @@ public class Sc20StreamingParquetTraceReader(
performanceInterferenceModel: PerformanceInterferenceModel,
selectedVms: List<String>,
random: Random
-) : TraceReader<ComputeWorkload> {
+) : TraceReader<SimWorkload> {
/**
* The internal iterator to use for this reader.
*/
- private val iterator: Iterator<TraceEntry<ComputeWorkload>>
+ private val iterator: Iterator<TraceEntry<SimWorkload>>
/**
* The intermediate buffer to store the read records in.
@@ -98,6 +96,7 @@ public class Sc20StreamingParquetTraceReader(
* The thread to read the records in.
*/
private val readerThread = thread(start = true, name = "sc20-reader") {
+ @Suppress("DEPRECATION")
val reader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "trace.parquet"))
.disableCompatibility()
.run { if (filter != null) withFilter(filter) else this }
@@ -113,11 +112,9 @@ public class Sc20StreamingParquetTraceReader(
}
val id = record["id"].toString()
- val tick = record["time"] as Long
val duration = record["duration"] as Long
val cores = record["cores"] as Int
val cpuUsage = record["cpuUsage"] as Double
- val flops = record["flops"] as Long
val fragment = SimTraceWorkload.Fragment(
duration,
@@ -167,6 +164,7 @@ public class Sc20StreamingParquetTraceReader(
val entries = mutableMapOf<String, GenericData.Record>()
val buffers = mutableMapOf<String, MutableList<MutableList<SimTraceWorkload.Fragment>>>()
+ @Suppress("DEPRECATION")
val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "meta.parquet"))
.disableCompatibility()
.run { if (filter != null) withFilter(filter) else this }
@@ -236,35 +234,25 @@ public class Sc20StreamingParquetTraceReader(
Random(random.nextInt())
)
val workload = SimTraceWorkload(fragments)
- val vmWorkload = ComputeWorkload(
- uid,
- "VM Workload $id",
- UnnamedUser,
- Image(
- uid,
- id,
- mapOf(
- IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems,
- "cores" to maxCores,
- "required-memory" to requiredMemory,
- "workload" to workload
- )
- )
- )
- TraceEntryImpl(
- submissionTime,
- vmWorkload
+ TraceEntry(
+ uid, id, submissionTime, workload,
+ mapOf(
+ IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems,
+ "cores" to maxCores,
+ "required-memory" to requiredMemory,
+ "workload" to workload
+ )
)
}
- .sortedBy { it.submissionTime }
+ .sortedBy { it.start }
.toList()
.iterator()
}
override fun hasNext(): Boolean = iterator.hasNext()
- override fun next(): TraceEntry<ComputeWorkload> = iterator.next()
+ override fun next(): TraceEntry<SimWorkload> = iterator.next()
override fun close() {
readerThread.interrupt()
@@ -287,20 +275,4 @@ public class Sc20StreamingParquetTraceReader(
return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isNotEmpty()
}
}
-
- /**
- * An unnamed user.
- */
- private object UnnamedUser : User {
- override val name: String = "<unnamed>"
- override val uid: UUID = UUID.randomUUID()
- }
-
- /**
- * An entry in the trace.
- */
- private data class TraceEntryImpl(
- override var submissionTime: Long,
- override val workload: ComputeWorkload
- ) : TraceEntry<ComputeWorkload>
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt
index 881652f6..5c8727ea 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/WorkloadSampler.kt
@@ -23,12 +23,11 @@
package org.opendc.experiments.capelin.trace
import mu.KotlinLogging
-import org.opendc.compute.api.ComputeWorkload
-import org.opendc.compute.api.Image
import org.opendc.experiments.capelin.model.CompositeWorkload
import org.opendc.experiments.capelin.model.SamplingStrategy
import org.opendc.experiments.capelin.model.Workload
import org.opendc.format.trace.TraceEntry
+import org.opendc.simulator.compute.workload.SimWorkload
import java.util.*
import kotlin.random.Random
@@ -38,11 +37,11 @@ private val logger = KotlinLogging.logger {}
* Sample the workload for the specified [run].
*/
public fun sampleWorkload(
- trace: List<TraceEntry<ComputeWorkload>>,
+ trace: List<TraceEntry<SimWorkload>>,
workload: Workload,
subWorkload: Workload,
seed: Int
-): List<TraceEntry<ComputeWorkload>> {
+): List<TraceEntry<SimWorkload>> {
return when {
workload is CompositeWorkload -> sampleRegularWorkload(trace, workload, subWorkload, seed)
workload.samplingStrategy == SamplingStrategy.HPC ->
@@ -58,24 +57,24 @@ public fun sampleWorkload(
* Sample a regular (non-HPC) workload.
*/
public fun sampleRegularWorkload(
- trace: List<TraceEntry<ComputeWorkload>>,
+ trace: List<TraceEntry<SimWorkload>>,
workload: Workload,
subWorkload: Workload,
seed: Int
-): List<TraceEntry<ComputeWorkload>> {
+): List<TraceEntry<SimWorkload>> {
val fraction = subWorkload.fraction
val shuffled = trace.shuffled(Random(seed))
- val res = mutableListOf<TraceEntry<ComputeWorkload>>()
+ val res = mutableListOf<TraceEntry<SimWorkload>>()
val totalLoad = if (workload is CompositeWorkload) {
workload.totalLoad
} else {
- shuffled.sumByDouble { it.workload.image.tags.getValue("total-load") as Double }
+ shuffled.sumByDouble { it.meta.getValue("total-load") as Double }
}
var currentLoad = 0.0
for (entry in shuffled) {
- val entryLoad = entry.workload.image.tags.getValue("total-load") as Double
+ val entryLoad = entry.meta.getValue("total-load") as Double
if ((currentLoad + entryLoad) / totalLoad > fraction) {
break
}
@@ -93,23 +92,23 @@ public fun sampleRegularWorkload(
* Sample a HPC workload.
*/
public fun sampleHpcWorkload(
- trace: List<TraceEntry<ComputeWorkload>>,
+ trace: List<TraceEntry<SimWorkload>>,
workload: Workload,
seed: Int,
sampleOnLoad: Boolean
-): List<TraceEntry<ComputeWorkload>> {
+): List<TraceEntry<SimWorkload>> {
val pattern = Regex("^vm__workload__(ComputeNode|cn).*")
val random = Random(seed)
val fraction = workload.fraction
val (hpc, nonHpc) = trace.partition { entry ->
- val name = entry.workload.image.name
+ val name = entry.name
name.matches(pattern)
}
val hpcSequence = generateSequence(0) { it + 1 }
.map { index ->
- val res = mutableListOf<TraceEntry<ComputeWorkload>>()
+ val res = mutableListOf<TraceEntry<SimWorkload>>()
hpc.mapTo(res) { sample(it, index) }
res.shuffle(random)
res
@@ -118,7 +117,7 @@ public fun sampleHpcWorkload(
val nonHpcSequence = generateSequence(0) { it + 1 }
.map { index ->
- val res = mutableListOf<TraceEntry<ComputeWorkload>>()
+ val res = mutableListOf<TraceEntry<SimWorkload>>()
nonHpc.mapTo(res) { sample(it, index) }
res.shuffle(random)
res
@@ -130,7 +129,7 @@ public fun sampleHpcWorkload(
val totalLoad = if (workload is CompositeWorkload) {
workload.totalLoad
} else {
- trace.sumByDouble { it.workload.image.tags.getValue("total-load") as Double }
+ trace.sumByDouble { it.meta.getValue("total-load") as Double }
}
logger.debug { "Total trace load: $totalLoad" }
@@ -139,12 +138,12 @@ public fun sampleHpcWorkload(
var nonHpcCount = 0
var nonHpcLoad = 0.0
- val res = mutableListOf<TraceEntry<ComputeWorkload>>()
+ val res = mutableListOf<TraceEntry<SimWorkload>>()
if (sampleOnLoad) {
var currentLoad = 0.0
for (entry in hpcSequence) {
- val entryLoad = entry.workload.image.tags.getValue("total-load") as Double
+ val entryLoad = entry.meta.getValue("total-load") as Double
if ((currentLoad + entryLoad) / totalLoad > fraction) {
break
}
@@ -156,7 +155,7 @@ public fun sampleHpcWorkload(
}
for (entry in nonHpcSequence) {
- val entryLoad = entry.workload.image.tags.getValue("total-load") as Double
+ val entryLoad = entry.meta.getValue("total-load") as Double
if ((currentLoad + entryLoad) / totalLoad > 1) {
break
}
@@ -170,7 +169,7 @@ public fun sampleHpcWorkload(
hpcSequence
.take((fraction * trace.size).toInt())
.forEach { entry ->
- hpcLoad += entry.workload.image.tags.getValue("total-load") as Double
+ hpcLoad += entry.meta.getValue("total-load") as Double
hpcCount += 1
res.add(entry)
}
@@ -178,7 +177,7 @@ public fun sampleHpcWorkload(
nonHpcSequence
.take(((1 - fraction) * trace.size).toInt())
.forEach { entry ->
- nonHpcLoad += entry.workload.image.tags.getValue("total-load") as Double
+ nonHpcLoad += entry.meta.getValue("total-load") as Double
nonHpcCount += 1
res.add(entry)
}
@@ -194,16 +193,7 @@ public fun sampleHpcWorkload(
/**
* Sample a random trace entry.
*/
-private fun sample(entry: TraceEntry<ComputeWorkload>, i: Int): TraceEntry<ComputeWorkload> {
- val id = UUID.nameUUIDFromBytes("${entry.workload.image.uid}-$i".toByteArray())
- val image = Image(
- id,
- entry.workload.image.name,
- entry.workload.image.tags
- )
- val vmWorkload = entry.workload.copy(uid = id, image = image, name = entry.workload.name)
- return VmTraceEntry(vmWorkload, entry.submissionTime)
+private fun sample(entry: TraceEntry<SimWorkload>, i: Int): TraceEntry<SimWorkload> {
+ val uid = UUID.nameUUIDFromBytes("${entry.uid}-$i".toByteArray())
+ return entry.copy(uid = uid)
}
-
-private class VmTraceEntry(override val workload: ComputeWorkload, override val submissionTime: Long) :
- TraceEntry<ComputeWorkload>
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index dfc6b90b..4e6cfddc 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -32,13 +32,9 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
-import org.opendc.compute.api.ComputeWorkload
+import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.internal.ComputeServiceImpl
import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy
-import org.opendc.experiments.capelin.experiment.attachMonitor
-import org.opendc.experiments.capelin.experiment.createFailureDomain
-import org.opendc.experiments.capelin.experiment.createProvisioner
-import org.opendc.experiments.capelin.experiment.processTrace
import org.opendc.experiments.capelin.model.Workload
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader
@@ -46,7 +42,7 @@ import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader
import org.opendc.format.environment.EnvironmentReader
import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
import org.opendc.format.trace.TraceReader
-import org.opendc.metal.Node
+import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.utils.DelayControllerClockAdapter
import org.opendc.trace.core.EventTracer
import java.io.File
@@ -101,15 +97,13 @@ class CapelinIntegrationTest {
val tracer = EventTracer(clock)
testScope.launch {
- val res = createProvisioner(
+ scheduler = createComputeService(
this,
clock,
environmentReader,
allocationPolicy,
tracer
)
- val bareMetalProvisioner = res.metal
- scheduler = res.compute
val failureDomain = if (failures) {
println("ENABLING failures")
@@ -118,7 +112,7 @@ class CapelinIntegrationTest {
clock,
seed,
24.0 * 7,
- bareMetalProvisioner,
+ scheduler,
chan
)
} else {
@@ -140,7 +134,6 @@ class CapelinIntegrationTest {
failureDomain?.cancel()
scheduler.close()
monitor.close()
- res.provisioner.close()
}
runSimulation()
@@ -166,7 +159,7 @@ class CapelinIntegrationTest {
val tracer = EventTracer(clock)
testScope.launch {
- val (_, provisioner, scheduler) = createProvisioner(
+ val scheduler = createComputeService(
this,
clock,
environmentReader,
@@ -187,7 +180,6 @@ class CapelinIntegrationTest {
scheduler.close()
monitor.close()
- provisioner.close()
}
runSimulation()
@@ -209,7 +201,7 @@ class CapelinIntegrationTest {
/**
* Obtain the trace reader for the test.
*/
- private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<ComputeWorkload> {
+ private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<SimWorkload> {
return Sc20ParquetTraceReader(
listOf(Sc20RawParquetTraceReader(File("src/test/resources/trace"))),
emptyMap(),
@@ -241,7 +233,7 @@ class CapelinIntegrationTest {
cpuUsage: Double,
cpuDemand: Double,
numberOfDeployedImages: Int,
- host: Node,
+ host: Host,
duration: Long
) {
totalRequestedBurst += requestedBurst
diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts
index 00aa0395..02e77c7c 100644
--- a/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts
+++ b/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts
@@ -30,10 +30,9 @@ plugins {
dependencies {
api(platform(project(":opendc-platform")))
- api(project(":opendc-core"))
api(project(":opendc-harness"))
implementation(project(":opendc-format"))
- implementation(project(":opendc-workflows"))
+ implementation(project(":opendc-workflow:opendc-workflow-service"))
implementation(project(":opendc-simulator:opendc-simulator-core"))
implementation(project(":opendc-compute:opendc-compute-simulator"))
}
diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt
index 7b9d70ed..9e305b3d 100644
--- a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt
+++ b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt
@@ -26,23 +26,22 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.test.TestCoroutineScope
import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy
-import org.opendc.compute.simulator.SimHostProvisioner
+import org.opendc.compute.simulator.SimHost
import org.opendc.format.environment.sc18.Sc18EnvironmentReader
import org.opendc.format.trace.gwf.GwfTraceReader
import org.opendc.harness.dsl.Experiment
import org.opendc.harness.dsl.anyOf
-import org.opendc.metal.service.ProvisioningService
import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider
import org.opendc.simulator.utils.DelayControllerClockAdapter
import org.opendc.trace.core.EventTracer
import org.opendc.trace.core.enable
-import org.opendc.workflows.service.StageWorkflowService
-import org.opendc.workflows.service.WorkflowEvent
-import org.opendc.workflows.service.WorkflowSchedulerMode
-import org.opendc.workflows.service.stage.job.NullJobAdmissionPolicy
-import org.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy
-import org.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy
-import org.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy
+import org.opendc.workflow.service.WorkflowEvent
+import org.opendc.workflow.service.WorkflowService
+import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode
+import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy
+import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy
+import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy
+import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy
import java.io.File
import java.io.FileInputStream
import kotlin.math.max
@@ -84,16 +83,20 @@ public class UnderspecificationExperiment : Experiment("underspecification") {
}
testScope.launch {
- val environment = Sc18EnvironmentReader(FileInputStream(File(environment)))
- .use { it.construct(testScope, clock) }
+ val hosts = Sc18EnvironmentReader(FileInputStream(File(environment)))
+ .use { it.read() }
+ .map { def ->
+ SimHost(
+ def.uid,
+ def.name,
+ def.model,
+ def.meta,
+ testScope.coroutineContext,
+ clock,
+ SimSpaceSharedHypervisorProvider()
+ )
+ }
- val bareMetal = environment.platforms[0].zones[0].services[ProvisioningService]
-
- // Wait for the bare metal nodes to be spawned
- delay(10)
-
- val provisioner = SimHostProvisioner(testScope.coroutineContext, bareMetal, SimSpaceSharedHypervisorProvider())
- val hosts = provisioner.provisionAll()
val compute = ComputeService(
testScope.coroutineContext,
clock,
@@ -103,11 +106,8 @@ public class UnderspecificationExperiment : Experiment("underspecification") {
hosts.forEach { compute.addHost(it) }
- // Wait for the hypervisors to be spawned
- delay(10)
-
- val scheduler = StageWorkflowService(
- testScope,
+ val scheduler = WorkflowService(
+ testScope.coroutineContext,
clock,
tracer,
compute.newClient(),
@@ -121,9 +121,9 @@ public class UnderspecificationExperiment : Experiment("underspecification") {
val reader = GwfTraceReader(File(trace))
while (reader.hasNext()) {
- val (time, job) = reader.next()
- delay(max(0, time * 1000 - clock.millis()))
- scheduler.submit(job)
+ val entry = reader.next()
+ delay(max(0, entry.start * 1000 - clock.millis()))
+ scheduler.submit(entry.workload)
}
}
diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/WorkflowMetrics.kt b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/WorkflowMetrics.kt
index dbd04b87..a8356888 100644
--- a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/WorkflowMetrics.kt
+++ b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/WorkflowMetrics.kt
@@ -24,7 +24,7 @@ package org.opendc.experiments.sc18
import org.opendc.trace.core.EventStream
import org.opendc.trace.core.onEvent
-import org.opendc.workflows.service.WorkflowEvent
+import org.opendc.workflow.service.WorkflowEvent
import java.util.*
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine
diff --git a/simulator/opendc-format/build.gradle.kts b/simulator/opendc-format/build.gradle.kts
index 37e9c9c8..385e556d 100644
--- a/simulator/opendc-format/build.gradle.kts
+++ b/simulator/opendc-format/build.gradle.kts
@@ -30,9 +30,8 @@ plugins {
dependencies {
api(platform(project(":opendc-platform")))
- api(project(":opendc-core"))
api(project(":opendc-compute:opendc-compute-api"))
- api(project(":opendc-workflows"))
+ api(project(":opendc-workflow:opendc-workflow-api"))
implementation(project(":opendc-simulator:opendc-simulator-compute"))
implementation(project(":opendc-compute:opendc-compute-simulator"))
api("com.fasterxml.jackson.module:jackson-module-kotlin:${versions["jackson-module-kotlin"]}")
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt
index 1f73bb61..97d6f239 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt
@@ -22,17 +22,14 @@
package org.opendc.format.environment
-import kotlinx.coroutines.CoroutineScope
-import org.opendc.core.Environment
import java.io.Closeable
-import java.time.Clock
/**
- * An interface for reading descriptions of topology environments into memory as [Environment].
+ * An interface for reading descriptions of topology environments into memory.
*/
public interface EnvironmentReader : Closeable {
/**
- * Construct an [Environment] in the specified [CoroutineScope].
+ * Read the environment into a list.
*/
- public suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment
+ public fun read(): List<MachineDef>
}
diff --git a/simulator/opendc-core/src/main/kotlin/org/opendc/core/Identity.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/MachineDef.kt
index 252c40f5..b5b3b84b 100644
--- a/simulator/opendc-core/src/main/kotlin/org/opendc/core/Identity.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/MachineDef.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,21 +20,16 @@
* SOFTWARE.
*/
-package org.opendc.core
+package org.opendc.format.environment
+import org.opendc.compute.simulator.power.api.CpuPowerModel
+import org.opendc.simulator.compute.SimMachineModel
import java.util.*
-/**
- * An object that has a unique identity.
- */
-public interface Identity {
- /**
- * A unique, opaque, system-generated value, representing the object.
- */
- public val uid: UUID
-
- /**
- * A non-empty, human-readable string representing the object.
- */
- public val name: String
-}
+public data class MachineDef(
+ val uid: UUID,
+ val name: String,
+ val meta: Map<String, Any>,
+ val model: SimMachineModel,
+ val powerModel: CpuPowerModel
+)
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
index bbbbe87c..3da8d0b3 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
@@ -25,21 +25,14 @@ package org.opendc.format.environment.sc18
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
-import kotlinx.coroutines.CoroutineScope
-import org.opendc.compute.simulator.SimBareMetalDriver
-import org.opendc.core.Environment
-import org.opendc.core.Platform
-import org.opendc.core.Zone
-import org.opendc.core.services.ServiceRegistry
+import org.opendc.compute.simulator.power.models.ConstantPowerModel
import org.opendc.format.environment.EnvironmentReader
-import org.opendc.metal.service.ProvisioningService
-import org.opendc.metal.service.SimpleProvisioningService
+import org.opendc.format.environment.MachineDef
import org.opendc.simulator.compute.SimMachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import java.io.InputStream
-import java.time.Clock
import java.util.*
/**
@@ -55,9 +48,12 @@ public class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja
*/
private val setup: Setup = mapper.readValue(input)
- override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment {
+ /**
+ * Read the environment.
+ */
+ public override fun read(): List<MachineDef> {
var counter = 0
- val nodes = setup.rooms.flatMap { room ->
+ return setup.rooms.flatMap { room ->
room.objects.flatMap { roomObject ->
when (roomObject) {
is RoomObject.Rack -> {
@@ -75,35 +71,18 @@ public class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja
else -> throw IllegalArgumentException("The cpu id $id is not recognized")
}
}
- SimBareMetalDriver(
- coroutineScope,
- clock,
- UUID.randomUUID(),
- "node-${counter++}",
+ MachineDef(
+ UUID(0L, counter++.toLong()),
+ "node-$counter",
emptyMap(),
- SimMachineModel(cores, listOf(MemoryUnit("", "", 2300.0, 16000)))
+ SimMachineModel(cores, listOf(MemoryUnit("", "", 2300.0, 16000))),
+ ConstantPowerModel(0.0)
)
}
}
}
}
}
-
- val provisioningService = SimpleProvisioningService()
- for (node in nodes) {
- provisioningService.create(node)
- }
-
- val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService)
- val platform = Platform(
- UUID.randomUUID(),
- "sc18-platform",
- listOf(
- Zone(UUID.randomUUID(), "zone", serviceRegistry)
- )
- )
-
- return Environment(setup.name, null, listOf(platform))
}
override fun close() {}
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
index 998f9cd6..9a06a40f 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
@@ -22,17 +22,9 @@
package org.opendc.format.environment.sc20
-import kotlinx.coroutines.CoroutineScope
-import org.opendc.compute.simulator.SimBareMetalDriver
import org.opendc.compute.simulator.power.models.LinearPowerModel
-import org.opendc.core.Environment
-import org.opendc.core.Platform
-import org.opendc.core.Zone
-import org.opendc.core.services.ServiceRegistry
import org.opendc.format.environment.EnvironmentReader
-import org.opendc.metal.NODE_CLUSTER
-import org.opendc.metal.service.ProvisioningService
-import org.opendc.metal.service.SimpleProvisioningService
+import org.opendc.format.environment.MachineDef
import org.opendc.simulator.compute.SimMachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
@@ -40,7 +32,6 @@ import org.opendc.simulator.compute.model.ProcessingUnit
import java.io.File
import java.io.FileInputStream
import java.io.InputStream
-import java.time.Clock
import java.util.*
/**
@@ -54,8 +45,7 @@ public class Sc20ClusterEnvironmentReader(
public constructor(file: File) : this(FileInputStream(file))
- @Suppress("BlockingMethodInNonBlockingContext")
- override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment {
+ public override fun read(): List<MachineDef> {
var clusterIdCol = 0
var speedCol = 0
var numberOfHostsCol = 0
@@ -69,7 +59,7 @@ public class Sc20ClusterEnvironmentReader(
var memoryPerHost: Long
var coresPerHost: Int
- val nodes = mutableListOf<SimBareMetalDriver>()
+ val nodes = mutableListOf<MachineDef>()
val random = Random(0)
input.bufferedReader().use { reader ->
@@ -103,12 +93,10 @@ public class Sc20ClusterEnvironmentReader(
repeat(numberOfHosts) {
nodes.add(
- SimBareMetalDriver(
- coroutineScope,
- clock,
+ MachineDef(
UUID(random.nextLong(), random.nextLong()),
"node-$clusterId-$it",
- mapOf(NODE_CLUSTER to clusterId),
+ mapOf("cluster" to clusterId),
SimMachineModel(
List(coresPerHost) { coreId ->
ProcessingUnit(unknownProcessingNode, coreId, speed)
@@ -125,22 +113,7 @@ public class Sc20ClusterEnvironmentReader(
}
}
- val provisioningService = SimpleProvisioningService()
- for (node in nodes) {
- provisioningService.create(node)
- }
-
- val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService)
-
- val platform = Platform(
- UUID.randomUUID(),
- "sc20-platform",
- listOf(
- Zone(UUID.randomUUID(), "zone", serviceRegistry)
- )
- )
-
- return Environment("SC20 Environment", null, listOf(platform))
+ return nodes
}
override fun close() {}
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
index 6cf65f7f..effd0286 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
@@ -25,22 +25,14 @@ package org.opendc.format.environment.sc20
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
-import kotlinx.coroutines.CoroutineScope
-import org.opendc.compute.simulator.SimBareMetalDriver
import org.opendc.compute.simulator.power.models.LinearPowerModel
-import org.opendc.core.Environment
-import org.opendc.core.Platform
-import org.opendc.core.Zone
-import org.opendc.core.services.ServiceRegistry
import org.opendc.format.environment.EnvironmentReader
-import org.opendc.metal.service.ProvisioningService
-import org.opendc.metal.service.SimpleProvisioningService
+import org.opendc.format.environment.MachineDef
import org.opendc.simulator.compute.SimMachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import java.io.InputStream
-import java.time.Clock
import java.util.*
/**
@@ -55,9 +47,12 @@ public class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja
*/
private val setup: Setup = mapper.readValue(input)
- override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment {
+ /**
+ * Read the environment.
+ */
+ public override fun read(): List<MachineDef> {
var counter = 0
- val nodes = setup.rooms.flatMap { room ->
+ return setup.rooms.flatMap { room ->
room.objects.flatMap { roomObject ->
when (roomObject) {
is RoomObject.Rack -> {
@@ -81,11 +76,9 @@ public class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja
else -> throw IllegalArgumentException("The cpu id $id is not recognized")
}
}
- SimBareMetalDriver(
- coroutineScope,
- clock,
- UUID.randomUUID(),
- "node-${counter++}",
+ MachineDef(
+ UUID(0L, counter++.toLong()),
+ "node-$counter",
emptyMap(),
SimMachineModel(cores, memories),
// For now we assume a simple linear load model with an idle draw of ~200W and a maximum
@@ -98,23 +91,6 @@ public class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja
}
}
}
-
- val provisioningService = SimpleProvisioningService()
- for (node in nodes) {
- provisioningService.create(node)
- }
-
- val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService)
-
- val platform = Platform(
- UUID.randomUUID(),
- "sc20-platform",
- listOf(
- Zone(UUID.randomUUID(), "zone", serviceRegistry)
- )
- )
-
- return Environment(setup.name, null, listOf(platform))
}
override fun close() {}
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt
index ec547e84..3ce79d69 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt
@@ -24,31 +24,21 @@
package org.opendc.format.trace
-import org.opendc.core.workload.Workload
+import java.util.UUID
/**
* An entry in a workload trace.
*
- * @param T The shape of the workload in this entry.
+ * @param uid The unique identifier of the entry.
+ * @param name The name of the entry.
+ * @param start The start time of the workload.
+ * @param workload The workload of the entry.
+ * @param meta The meta-data associated with the workload.
*/
-public interface TraceEntry<T : Workload> {
- /**
- * The time of submission of the workload.
- */
- public val submissionTime: Long
-
- /**
- * The workload in this trace entry.
- */
- public val workload: T
-
- /**
- * Extract the submission time from this entry.
- */
- public operator fun component1(): Long = submissionTime
-
- /**
- * Extract the workload from this entry.
- */
- public operator fun component2(): T = workload
-}
+public data class TraceEntry<out T>(
+ val uid: UUID,
+ val name: String,
+ val start: Long,
+ val workload: T,
+ val meta: Map<String, Any>
+)
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt
index a0beec3e..7df1acd3 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt
@@ -22,14 +22,13 @@
package org.opendc.format.trace
-import org.opendc.core.workload.Workload
import java.io.Closeable
/**
- * An interface for reading [Workload]s into memory.
+ * An interface for reading workloads into memory.
*
* This interface must guarantee that the entries are delivered in order of submission time.
*
* @param T The shape of the workloads supported by this reader.
*/
-public interface TraceReader<T : Workload> : Iterator<TraceEntry<T>>, Closeable
+public interface TraceReader<T> : Iterator<TraceEntry<T>>, Closeable
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceWriter.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceWriter.kt
deleted file mode 100644
index 54fb6214..00000000
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceWriter.kt
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2019 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.format.trace
-
-import org.opendc.core.workload.Workload
-import java.io.Closeable
-
-/**
- * An interface for persisting workload traces (e.g. to disk).
- *
- * @param T The type of [Workload] supported by this writer.
- */
-public interface TraceWriter<T : Workload> : Closeable {
- /**
- * Write an entry to the trace.
- *
- * Entries must be written in order of submission time. Failing to do so results in a [IllegalArgumentException].
- *
- * @param submissionTime The time of submission of the workload.
- * @param workload The workload to write to the trace.
- */
- public fun write(submissionTime: Long, workload: T)
-}
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
index 1571b17d..769b2b13 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
@@ -22,14 +22,12 @@
package org.opendc.format.trace.bitbrains
-import org.opendc.compute.api.ComputeWorkload
-import org.opendc.compute.api.Image
-import org.opendc.core.User
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
import org.opendc.simulator.compute.workload.SimTraceWorkload
+import org.opendc.simulator.compute.workload.SimWorkload
import java.io.BufferedReader
import java.io.File
import java.io.FileReader
@@ -45,17 +43,17 @@ import kotlin.math.min
public class BitbrainsTraceReader(
traceDirectory: File,
performanceInterferenceModel: PerformanceInterferenceModel
-) : TraceReader<ComputeWorkload> {
+) : TraceReader<SimWorkload> {
/**
* The internal iterator to use for this reader.
*/
- private val iterator: Iterator<TraceEntry<ComputeWorkload>>
+ private val iterator: Iterator<TraceEntry<SimWorkload>>
/**
* Initialize the reader.
*/
init {
- val entries = mutableMapOf<Long, TraceEntry<ComputeWorkload>>()
+ val entries = mutableMapOf<Long, TraceEntry<SimWorkload>>()
var timestampCol = 0
var coreCol = 0
@@ -132,50 +130,27 @@ public class BitbrainsTraceReader(
)
val workload = SimTraceWorkload(flopsHistory.asSequence())
- val vmWorkload = ComputeWorkload(
+ entries[vmId] = TraceEntry(
uuid,
- "VM Workload $vmId",
- UnnamedUser,
- Image(
- uuid,
- vmId.toString(),
- mapOf(
- IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems,
- "cores" to cores,
- "required-memory" to requiredMemory,
- "workload" to workload
- )
- )
- )
- entries[vmId] = TraceEntryImpl(
+ vmId.toString(),
startTime,
- vmWorkload
+ workload,
+ mapOf(
+ IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems,
+ "cores" to cores,
+ "required-memory" to requiredMemory,
+ "workload" to workload
+ )
)
}
// Create the entry iterator
- iterator = entries.values.sortedBy { it.submissionTime }.iterator()
+ iterator = entries.values.sortedBy { it.start }.iterator()
}
override fun hasNext(): Boolean = iterator.hasNext()
- override fun next(): TraceEntry<ComputeWorkload> = iterator.next()
+ override fun next(): TraceEntry<SimWorkload> = iterator.next()
override fun close() {}
-
- /**
- * An unnamed user.
- */
- private object UnnamedUser : User {
- override val name: String = "<unnamed>"
- override val uid: UUID = UUID.randomUUID()
- }
-
- /**
- * An entry in the trace.
- */
- private data class TraceEntryImpl(
- override var submissionTime: Long,
- override val workload: ComputeWorkload
- ) : TraceEntry<ComputeWorkload>
}
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt
index cd7aff3c..e68afeb7 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt
@@ -22,15 +22,13 @@
package org.opendc.format.trace.gwf
-import org.opendc.compute.api.Image
-import org.opendc.core.User
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
import org.opendc.simulator.compute.workload.SimFlopsWorkload
-import org.opendc.workflows.workload.Job
-import org.opendc.workflows.workload.Task
-import org.opendc.workflows.workload.WORKFLOW_TASK_CORES
-import org.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE
+import org.opendc.workflow.api.Job
+import org.opendc.workflow.api.Task
+import org.opendc.workflow.api.WORKFLOW_TASK_CORES
+import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE
import java.io.BufferedReader
import java.io.File
import java.io.InputStream
@@ -88,7 +86,8 @@ public class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> {
* Initialize the reader.
*/
init {
- val entries = mutableMapOf<Long, TraceEntryImpl>()
+ val workflows = mutableMapOf<Long, Job>()
+ val starts = mutableMapOf<Long, Long>()
val tasks = mutableMapOf<Long, Task>()
val taskDependencies = mutableMapOf<Task, List<Long>>()
@@ -131,22 +130,21 @@ public class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> {
val flops: Long = 4000 * runtime * cores
- val entry = entries.getOrPut(workflowId) {
- TraceEntryImpl(submitTime, Job(UUID(0L, taskId), "<unnamed>", UnnamedUser, HashSet()))
+ val workflow = workflows.getOrPut(workflowId) {
+ Job(UUID(0L, workflowId), "<unnamed>", HashSet())
}
- val workflow = entry.workload
val workload = SimFlopsWorkload(flops)
val task = Task(
UUID(0L, taskId),
"<unnamed>",
- Image(UUID.randomUUID(), "<unnamed>", mapOf("workload" to workload)),
HashSet(),
mapOf(
+ "workload" to workload,
WORKFLOW_TASK_CORES to cores,
WORKFLOW_TASK_DEADLINE to (runtime * 1000)
),
)
- entry.submissionTime = min(entry.submissionTime, submitTime)
+ starts.merge(workflowId, submitTime, ::min)
(workflow.tasks as MutableSet<Task>).add(task)
tasks[taskId] = task
taskDependencies[task] = dependencies
@@ -165,7 +163,9 @@ public class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> {
}
// Create the entry iterator
- iterator = entries.values.sortedBy { it.submissionTime }.iterator()
+ iterator = workflows.map { (id, job) -> TraceEntry(job.uid, job.name, starts.getValue(id), job, job.metadata) }
+ .sortedBy { it.start }
+ .iterator()
}
override fun hasNext(): Boolean = iterator.hasNext()
@@ -173,20 +173,4 @@ public class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> {
override fun next(): TraceEntry<Job> = iterator.next()
override fun close() {}
-
- /**
- * An unnamed user.
- */
- private object UnnamedUser : User {
- override val name: String = "<unnamed>"
- override val uid: UUID = UUID.randomUUID()
- }
-
- /**
- * An entry in the trace.
- */
- private data class TraceEntryImpl(
- override var submissionTime: Long,
- override val workload: Job
- ) : TraceEntry<Job>
}
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt
index 07785632..1eb4bac2 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt
@@ -22,14 +22,12 @@
package org.opendc.format.trace.sc20
-import org.opendc.compute.api.ComputeWorkload
-import org.opendc.compute.api.Image
-import org.opendc.core.User
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
import org.opendc.simulator.compute.workload.SimTraceWorkload
+import org.opendc.simulator.compute.workload.SimWorkload
import java.io.BufferedReader
import java.io.File
import java.io.FileReader
@@ -49,17 +47,17 @@ public class Sc20TraceReader(
performanceInterferenceModel: PerformanceInterferenceModel,
selectedVms: List<String>,
random: Random
-) : TraceReader<ComputeWorkload> {
+) : TraceReader<SimWorkload> {
/**
* The internal iterator to use for this reader.
*/
- private val iterator: Iterator<TraceEntry<ComputeWorkload>>
+ private val iterator: Iterator<TraceEntry<SimWorkload>>
/**
* Initialize the reader.
*/
init {
- val entries = mutableMapOf<UUID, TraceEntry<ComputeWorkload>>()
+ val entries = mutableMapOf<UUID, TraceEntry<SimWorkload>>()
val timestampCol = 0
val cpuUsageCol = 1
@@ -85,7 +83,7 @@ public class Sc20TraceReader(
var vmId = ""
var maxCores = -1
var requiredMemory = -1L
- var timestamp = -1L
+ var timestamp: Long
var cores = -1
var minTime = Long.MAX_VALUE
@@ -157,50 +155,27 @@ public class Sc20TraceReader(
Random(random.nextInt())
)
val workload = SimTraceWorkload(flopsFragments.asSequence())
- val vmWorkload = ComputeWorkload(
+ entries[uuid] = TraceEntry(
uuid,
- "VM Workload $vmId",
- UnnamedUser,
- Image(
- uuid,
- vmId,
- mapOf(
- IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems,
- "cores" to cores,
- "required-memory" to requiredMemory,
- "workload" to workload
- )
- )
- )
- entries[uuid] = TraceEntryImpl(
+ vmId,
minTime,
- vmWorkload
+ workload,
+ mapOf(
+ IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems,
+ "cores" to cores,
+ "required-memory" to requiredMemory,
+ "workload" to workload
+ )
)
}
// Create the entry iterator
- iterator = entries.values.sortedBy { it.submissionTime }.iterator()
+ iterator = entries.values.sortedBy { it.start }.iterator()
}
override fun hasNext(): Boolean = iterator.hasNext()
- override fun next(): TraceEntry<ComputeWorkload> = iterator.next()
+ override fun next(): TraceEntry<SimWorkload> = iterator.next()
override fun close() {}
-
- /**
- * An unnamed user.
- */
- private object UnnamedUser : User {
- override val name: String = "<unnamed>"
- override val uid: UUID = UUID.randomUUID()
- }
-
- /**
- * An entry in the trace.
- */
- private data class TraceEntryImpl(
- override var submissionTime: Long,
- override val workload: ComputeWorkload
- ) : TraceEntry<ComputeWorkload>
}
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt
index ead20c35..0d1f3cea 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt
@@ -22,12 +22,10 @@
package org.opendc.format.trace.swf
-import org.opendc.compute.api.ComputeWorkload
-import org.opendc.compute.api.Image
-import org.opendc.core.User
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
import org.opendc.simulator.compute.workload.SimTraceWorkload
+import org.opendc.simulator.compute.workload.SimWorkload
import java.io.BufferedReader
import java.io.File
import java.io.FileReader
@@ -43,17 +41,17 @@ import java.util.*
public class SwfTraceReader(
file: File,
maxNumCores: Int = -1
-) : TraceReader<ComputeWorkload> {
+) : TraceReader<SimWorkload> {
/**
* The internal iterator to use for this reader.
*/
- private val iterator: Iterator<TraceEntry<ComputeWorkload>>
+ private val iterator: Iterator<TraceEntry<SimWorkload>>
/**
* Initialize the reader.
*/
init {
- val entries = mutableMapOf<Long, TraceEntry<ComputeWorkload>>()
+ val entries = mutableMapOf<Long, TraceEntry<SimWorkload>>()
val jobNumberCol = 0
val submitTimeCol = 1 // seconds (begin of trace is 0)
@@ -73,7 +71,6 @@ public class SwfTraceReader(
var slicedWaitTime: Long
var flopsPerSecond: Long
var flopsPartialSlice: Long
- var flopsFullSlice: Long
var runtimePartialSliceRemainder: Long
BufferedReader(FileReader(file)).use { reader ->
@@ -127,7 +124,6 @@ public class SwfTraceReader(
flopsPerSecond = 4_000L * cores
runtimePartialSliceRemainder = runTime % sliceDuration
flopsPartialSlice = flopsPerSecond * runtimePartialSliceRemainder
- flopsFullSlice = flopsPerSecond * runTime - flopsPartialSlice
for (
tick in (submitTime + slicedWaitTime)
@@ -155,48 +151,27 @@ public class SwfTraceReader(
val uuid = UUID(0L, jobNumber)
val workload = SimTraceWorkload(flopsHistory.asSequence())
- val vmWorkload = ComputeWorkload(
+ entries[jobNumber] = TraceEntry(
uuid,
- "SWF Workload $jobNumber",
- UnnamedUser,
- Image(
- uuid,
- jobNumber.toString(),
- mapOf(
- "cores" to cores,
- "required-memory" to memory,
- "workload" to workload
- )
+ jobNumber.toString(),
+ submitTime,
+ workload,
+ mapOf(
+ "cores" to cores,
+ "required-memory" to memory,
+ "workload" to workload
)
)
-
- entries[jobNumber] = TraceEntryImpl(submitTime, vmWorkload)
}
}
// Create the entry iterator
- iterator = entries.values.sortedBy { it.submissionTime }.iterator()
+ iterator = entries.values.sortedBy { it.start }.iterator()
}
override fun hasNext(): Boolean = iterator.hasNext()
- override fun next(): TraceEntry<ComputeWorkload> = iterator.next()
+ override fun next(): TraceEntry<SimWorkload> = iterator.next()
override fun close() {}
-
- /**
- * An unnamed user.
- */
- private object UnnamedUser : User {
- override val name: String = "<unnamed>"
- override val uid: UUID = UUID.randomUUID()
- }
-
- /**
- * An entry in the trace.
- */
- private data class TraceEntryImpl(
- override var submissionTime: Long,
- override val workload: ComputeWorkload
- ) : TraceEntry<ComputeWorkload>
}
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
index 5a271fab..feadf61f 100644
--- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
@@ -25,15 +25,13 @@ package org.opendc.format.trace.wtf
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetReader
-import org.opendc.compute.api.Image
-import org.opendc.core.User
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
import org.opendc.simulator.compute.workload.SimFlopsWorkload
-import org.opendc.workflows.workload.Job
-import org.opendc.workflows.workload.Task
-import org.opendc.workflows.workload.WORKFLOW_TASK_CORES
-import org.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE
+import org.opendc.workflow.api.Job
+import org.opendc.workflow.api.Task
+import org.opendc.workflow.api.WORKFLOW_TASK_CORES
+import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE
import java.util.UUID
import kotlin.math.min
@@ -53,10 +51,12 @@ public class WtfTraceReader(path: String) : TraceReader<Job> {
* Initialize the reader.
*/
init {
- val entries = mutableMapOf<Long, TraceEntryImpl>()
+ val workflows = mutableMapOf<Long, Job>()
+ val starts = mutableMapOf<Long, Long>()
val tasks = mutableMapOf<Long, Task>()
val taskDependencies = mutableMapOf<Task, List<Long>>()
+ @Suppress("DEPRECATION")
val reader = AvroParquetReader.builder<GenericRecord>(Path(path, "tasks/schema-1.0")).build()
while (true) {
@@ -74,29 +74,22 @@ public class WtfTraceReader(path: String) : TraceReader<Job> {
val flops: Long = 4100 * (runtime / 1000) * cores
- val entry = entries.getOrPut(workflowId) {
- TraceEntryImpl(submitTime, Job(UUID(0L, taskId), "<unnamed>", UnnamedUser, HashSet()))
+ val workflow = workflows.getOrPut(workflowId) {
+ Job(UUID(0L, workflowId), "<unnamed>", HashSet())
}
- val workflow = entry.workload
val workload = SimFlopsWorkload(flops)
val task = Task(
UUID(0L, taskId),
"<unnamed>",
- Image(
- UUID.randomUUID(),
- "<unnamed>",
- mapOf(
- "workload" to workload
- )
- ),
HashSet(),
mapOf(
+ "workload" to workload,
WORKFLOW_TASK_CORES to cores,
WORKFLOW_TASK_DEADLINE to runtime
)
)
- entry.submissionTime = min(entry.submissionTime, submitTime)
+ starts.merge(workflowId, submitTime, ::min)
(workflow.tasks as MutableSet<Task>).add(task)
tasks[taskId] = task
taskDependencies[task] = dependencies
@@ -112,7 +105,9 @@ public class WtfTraceReader(path: String) : TraceReader<Job> {
}
// Create the entry iterator
- iterator = entries.values.sortedBy { it.submissionTime }.iterator()
+ iterator = workflows.map { (id, job) -> TraceEntry(job.uid, job.name, starts.getValue(id), job, job.metadata) }
+ .sortedBy { it.start }
+ .iterator()
}
override fun hasNext(): Boolean = iterator.hasNext()
@@ -120,20 +115,4 @@ public class WtfTraceReader(path: String) : TraceReader<Job> {
override fun next(): TraceEntry<Job> = iterator.next()
override fun close() {}
-
- /**
- * An unnamed user.
- */
- private object UnnamedUser : User {
- override val name: String = "<unnamed>"
- override val uid: UUID = UUID.randomUUID()
- }
-
- /**
- * An entry in the trace.
- */
- private data class TraceEntryImpl(
- override var submissionTime: Long,
- override val workload: Job
- ) : TraceEntry<Job>
}
diff --git a/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt b/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt
index 7e3d2623..e0e049cf 100644
--- a/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt
+++ b/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/swf/SwfTraceReaderTest.kt
@@ -32,14 +32,14 @@ class SwfTraceReaderTest {
internal fun testParseSwf() {
val reader = SwfTraceReader(File(SwfTraceReaderTest::class.java.getResource("/swf_trace.txt").toURI()))
var entry = reader.next()
- assertEquals(0, entry.submissionTime)
+ assertEquals(0, entry.start)
// 1961 slices for waiting, 3 full and 1 partial running slices
- assertEquals(1965, (entry.workload.image.tags["workload"] as SimTraceWorkload).trace.toList().size)
+ assertEquals(1965, (entry.workload as SimTraceWorkload).trace.toList().size)
entry = reader.next()
- assertEquals(164472, entry.submissionTime)
+ assertEquals(164472, entry.start)
// 1188 slices for waiting, 0 full and 1 partial running slices
- assertEquals(1189, (entry.workload.image.tags["workload"] as SimTraceWorkload).trace.toList().size)
- assertEquals(0.25, (entry.workload.image.tags["workload"] as SimTraceWorkload).trace.toList().last().usage)
+ assertEquals(1189, (entry.workload as SimTraceWorkload).trace.toList().size)
+ assertEquals(0.25, (entry.workload as SimTraceWorkload).trace.toList().last().usage)
}
}
diff --git a/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt b/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt
index 58d96657..bcfa7553 100644
--- a/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt
+++ b/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt
@@ -36,11 +36,11 @@ class WtfTraceReaderTest {
fun testParseWtf() {
val reader = WtfTraceReader("src/test/resources/wtf-trace")
var entry = reader.next()
- assertEquals(0, entry.submissionTime)
+ assertEquals(0, entry.start)
assertEquals(23, entry.workload.tasks.size)
entry = reader.next()
- assertEquals(333387, entry.submissionTime)
+ assertEquals(333387, entry.start)
assertEquals(23, entry.workload.tasks.size)
}
}
diff --git a/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/NodeState.kt b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/NodeState.kt
deleted file mode 100644
index f1d4ea2e..00000000
--- a/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/NodeState.kt
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2020 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.metal
-
-/**
- * An enumeration describing the possible states of a bare-metal compute node.
- */
-public enum class NodeState {
- /**
- * The node is booting.
- */
- BOOT,
-
- /**
- * The node is powered off.
- */
- SHUTOFF,
-
- /**
- * The node is active and running.
- */
- ACTIVE,
-
- /**
- * The node is in error.
- */
- ERROR,
-
- /**
- * The state of the node is unknown.
- */
- UNKNOWN,
-}
diff --git a/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/driver/BareMetalDriver.kt b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/driver/BareMetalDriver.kt
deleted file mode 100644
index 3b15be94..00000000
--- a/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/driver/BareMetalDriver.kt
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.metal.driver
-
-import kotlinx.coroutines.flow.Flow
-import org.opendc.compute.api.Image
-import org.opendc.compute.api.Server
-import org.opendc.core.services.AbstractServiceKey
-import org.opendc.metal.Node
-import java.util.UUID
-
-/**
- * A driver interface for the management interface of a bare-metal compute node.
- */
-public interface BareMetalDriver {
- /**
- * The [Node] that is controlled by this driver.
- */
- public val node: Flow<Node>
-
- /**
- * The amount of work done by the machine in percentage with respect to the total amount of processing power
- * available.
- */
- public val usage: Flow<Double>
-
- /**
- * Initialize the driver.
- */
- public suspend fun init(): Node
-
- /**
- * Start the bare metal node with the specified boot disk image.
- */
- public suspend fun start(): Node
-
- /**
- * Stop the bare metal node if it is running.
- */
- public suspend fun stop(): Node
-
- /**
- * Reboot the bare metal node.
- */
- public suspend fun reboot(): Node
-
- /**
- * Update the boot disk image of the compute node.
- *
- * Changing the boot disk image of node does not affect it while the node is running. In order to start the new boot
- * disk image, the compute node must be restarted.
- */
- public suspend fun setImage(image: Image): Node
-
- /**
- * Obtain the state of the compute node.
- */
- public suspend fun refresh(): Node
-
- /**
- * A key that allows access to the [BareMetalDriver] instance from a [Server] that runs on the bare-metal machine.
- */
- public companion object Key : AbstractServiceKey<BareMetalDriver>(UUID.randomUUID(), "bare-metal:driver")
-}
diff --git a/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/service/SimpleProvisioningService.kt b/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/service/SimpleProvisioningService.kt
deleted file mode 100644
index 2d6353c8..00000000
--- a/simulator/opendc-metal/src/main/kotlin/org/opendc/metal/service/SimpleProvisioningService.kt
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.metal.service
-
-import kotlinx.coroutines.CancellationException
-import org.opendc.compute.api.Image
-import org.opendc.metal.Node
-import org.opendc.metal.driver.BareMetalDriver
-
-/**
- * A very basic implementation of the [ProvisioningService].
- */
-public class SimpleProvisioningService : ProvisioningService {
- /**
- * The active nodes in this service.
- */
- private val nodes: MutableMap<Node, BareMetalDriver> = mutableMapOf()
-
- override suspend fun create(driver: BareMetalDriver): Node {
- val node = driver.init()
- nodes[node] = driver
- return node
- }
-
- override suspend fun nodes(): Set<Node> = nodes.keys
-
- override suspend fun refresh(node: Node): Node {
- return nodes[node]!!.refresh()
- }
-
- override suspend fun deploy(node: Node, image: Image): Node {
- val driver = nodes[node]!!
- driver.setImage(image)
- return driver.reboot()
- }
-
- override suspend fun stop(node: Node): Node {
- val driver = nodes[node]!!
- return try {
- driver.stop()
- } catch (e: CancellationException) {
- node
- }
- }
-}
diff --git a/simulator/opendc-runner-web/build.gradle.kts b/simulator/opendc-runner-web/build.gradle.kts
index d0b80cc7..d07fe7a6 100644
--- a/simulator/opendc-runner-web/build.gradle.kts
+++ b/simulator/opendc-runner-web/build.gradle.kts
@@ -34,7 +34,6 @@ application {
dependencies {
api(platform(project(":opendc-platform")))
- api(project(":opendc-core"))
implementation(project(":opendc-compute:opendc-compute-simulator"))
implementation(project(":opendc-format"))
implementation(project(":opendc-experiments:opendc-experiments-capelin"))
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
index 482fe754..b9aeecb8 100644
--- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
+++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
@@ -46,11 +46,11 @@ import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolic
import org.opendc.compute.service.scheduler.ProvisionedCoresAllocationPolicy
import org.opendc.compute.service.scheduler.RandomAllocationPolicy
import org.opendc.compute.simulator.allocation.*
-import org.opendc.experiments.capelin.experiment.attachMonitor
-import org.opendc.experiments.capelin.experiment.createFailureDomain
-import org.opendc.experiments.capelin.experiment.createProvisioner
-import org.opendc.experiments.capelin.experiment.processTrace
+import org.opendc.experiments.capelin.attachMonitor
+import org.opendc.experiments.capelin.createComputeService
+import org.opendc.experiments.capelin.createFailureDomain
import org.opendc.experiments.capelin.model.Workload
+import org.opendc.experiments.capelin.processTrace
import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader
import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader
import org.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader
@@ -247,7 +247,7 @@ public class RunnerCli : CliktCommand(name = "runner") {
val tracer = EventTracer(clock)
testScope.launch {
- val (bareMetalProvisioner, provisioner, scheduler) = createProvisioner(
+ val scheduler = createComputeService(
this,
clock,
environment,
@@ -262,7 +262,7 @@ public class RunnerCli : CliktCommand(name = "runner") {
clock,
seeder.nextInt(),
operational.get("failureFrequency", Number::class.java)?.toDouble() ?: 24.0 * 7,
- bareMetalProvisioner,
+ scheduler,
chan
)
} else {
@@ -287,7 +287,6 @@ public class RunnerCli : CliktCommand(name = "runner") {
failureDomain?.cancel()
scheduler.close()
- provisioner.close()
}
try {
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt
index 2f11347d..e7e99a3d 100644
--- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt
+++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt
@@ -28,36 +28,24 @@ import com.mongodb.client.model.Aggregates
import com.mongodb.client.model.Field
import com.mongodb.client.model.Filters
import com.mongodb.client.model.Projections
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.launch
import org.bson.Document
import org.bson.types.ObjectId
-import org.opendc.compute.simulator.SimBareMetalDriver
import org.opendc.compute.simulator.power.models.LinearPowerModel
-import org.opendc.core.Environment
-import org.opendc.core.Platform
-import org.opendc.core.Zone
-import org.opendc.core.services.ServiceRegistry
import org.opendc.format.environment.EnvironmentReader
-import org.opendc.metal.NODE_CLUSTER
-import org.opendc.metal.service.ProvisioningService
-import org.opendc.metal.service.SimpleProvisioningService
+import org.opendc.format.environment.MachineDef
import org.opendc.simulator.compute.SimMachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
-import java.time.Clock
import java.util.*
/**
* A helper class that converts the MongoDB topology into an OpenDC environment.
*/
public class TopologyParser(private val collection: MongoCollection<Document>, private val id: ObjectId) : EnvironmentReader {
- /**
- * Parse the topology with the specified [id].
- */
- override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment {
- val nodes = mutableListOf<SimBareMetalDriver>()
+
+ public override fun read(): List<MachineDef> {
+ val nodes = mutableListOf<MachineDef>()
val random = Random(0)
for (machine in fetchMachines(id)) {
@@ -85,36 +73,17 @@ public class TopologyParser(private val collection: MongoCollection<Document>, p
val energyConsumptionW = machine.getList("cpus", Document::class.java).sumBy { it.getInteger("energyConsumptionW") }.toDouble()
nodes.add(
- SimBareMetalDriver(
- coroutineScope,
- clock,
+ MachineDef(
UUID(random.nextLong(), random.nextLong()),
"node-$clusterId-$position",
- mapOf(NODE_CLUSTER to clusterId),
+ mapOf("cluster" to clusterId),
SimMachineModel(processors, memoryUnits),
LinearPowerModel(2 * energyConsumptionW, .5)
)
)
}
- val provisioningService = SimpleProvisioningService()
- coroutineScope.launch {
- for (node in nodes) {
- provisioningService.create(node)
- }
- }
-
- val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService)
-
- val platform = Platform(
- UUID.randomUUID(),
- "opendc-platform",
- listOf(
- Zone(UUID.randomUUID(), "zone", serviceRegistry)
- )
- )
-
- return Environment(fetchName(id), null, listOf(platform))
+ return nodes
}
override fun close() {}
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt
index fe814c76..a8ac6c10 100644
--- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt
+++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt
@@ -27,10 +27,9 @@ import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
import org.opendc.compute.service.ComputeServiceEvent
import org.opendc.compute.service.driver.Host
+import org.opendc.compute.service.driver.HostState
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
import org.opendc.experiments.capelin.telemetry.HostEvent
-import org.opendc.metal.Node
-import org.opendc.metal.NodeState
import kotlin.math.max
/**
@@ -38,7 +37,7 @@ import kotlin.math.max
*/
public class WebExperimentMonitor : ExperimentMonitor {
private val logger = KotlinLogging.logger {}
- private val currentHostEvent = mutableMapOf<Node, HostEvent>()
+ private val currentHostEvent = mutableMapOf<Host, HostEvent>()
private var startTime = -1L
override fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {
@@ -50,12 +49,8 @@ public class WebExperimentMonitor : ExperimentMonitor {
}
}
- override fun reportHostStateChange(
- time: Long,
- driver: Host,
- host: Node
- ) {
- logger.debug { "Host ${host.uid} changed state ${host.state} [$time]" }
+ override fun reportHostStateChange(time: Long, host: Host, newState: HostState) {
+ logger.debug { "Host ${host.uid} changed state $newState [$time]" }
val previousEvent = currentHostEvent[host]
@@ -84,9 +79,9 @@ public class WebExperimentMonitor : ExperimentMonitor {
)
}
- private val lastPowerConsumption = mutableMapOf<Node, Double>()
+ private val lastPowerConsumption = mutableMapOf<Host, Double>()
- override fun reportPowerConsumption(host: Node, draw: Double) {
+ override fun reportPowerConsumption(host: Host, draw: Double) {
lastPowerConsumption[host] = draw
}
@@ -99,7 +94,7 @@ public class WebExperimentMonitor : ExperimentMonitor {
cpuUsage: Double,
cpuDemand: Double,
numberOfDeployedImages: Int,
- host: Node,
+ host: Host,
duration: Long
) {
val previousEvent = currentHostEvent[host]
@@ -117,7 +112,7 @@ public class WebExperimentMonitor : ExperimentMonitor {
cpuUsage,
cpuDemand,
lastPowerConsumption[host] ?: 200.0,
- host.flavor.cpuCount
+ host.model.cpuCount
)
currentHostEvent[host] = event
@@ -135,7 +130,7 @@ public class WebExperimentMonitor : ExperimentMonitor {
cpuUsage,
cpuDemand,
lastPowerConsumption[host] ?: 200.0,
- host.flavor.cpuCount
+ host.model.cpuCount
)
currentHostEvent[host] = event
@@ -155,7 +150,7 @@ public class WebExperimentMonitor : ExperimentMonitor {
cpuUsage,
cpuDemand,
lastPowerConsumption[host] ?: 200.0,
- host.flavor.cpuCount
+ host.model.cpuCount
)
currentHostEvent[host] = event
@@ -164,7 +159,7 @@ public class WebExperimentMonitor : ExperimentMonitor {
}
private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics()
- private val hostMetrics: MutableMap<Node, HostMetrics> = mutableMapOf()
+ private val hostMetrics: MutableMap<Host, HostMetrics> = mutableMapOf()
private fun processHostEvent(event: HostEvent) {
val slices = event.duration / SLICE_LENGTH
@@ -175,14 +170,14 @@ public class WebExperimentMonitor : ExperimentMonitor {
hostAggregateMetrics.totalOvercommittedBurst + event.overcommissionedBurst,
hostAggregateMetrics.totalInterferedBurst + event.interferedBurst,
hostAggregateMetrics.totalPowerDraw + (slices * (event.powerDraw / 12)),
- hostAggregateMetrics.totalFailureSlices + if (event.node.state != NodeState.ACTIVE) slices.toLong() else 0,
- hostAggregateMetrics.totalFailureVmSlices + if (event.node.state != NodeState.ACTIVE) event.vmCount * slices.toLong() else 0
+ hostAggregateMetrics.totalFailureSlices + if (event.host.state != HostState.UP) slices else 0,
+ hostAggregateMetrics.totalFailureVmSlices + if (event.host.state != HostState.UP) event.vmCount * slices else 0
)
- hostMetrics.compute(event.node) { _, prev ->
+ hostMetrics.compute(event.host) { _, prev ->
HostMetrics(
- (event.cpuUsage.takeIf { event.node.state == NodeState.ACTIVE } ?: 0.0) + (prev?.cpuUsage ?: 0.0),
- (event.cpuDemand.takeIf { event.node.state == NodeState.ACTIVE } ?: 0.0) + (prev?.cpuDemand ?: 0.0),
+ (event.cpuUsage.takeIf { event.host.state == HostState.UP } ?: 0.0) + (prev?.cpuUsage ?: 0.0),
+ (event.cpuDemand.takeIf { event.host.state == HostState.UP } ?: 0.0) + (prev?.cpuDemand ?: 0.0),
event.vmCount + (prev?.vmCount ?: 0),
1 + (prev?.count ?: 0)
)
diff --git a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/StateFlow.kt b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/StateFlow.kt
deleted file mode 100644
index 996e7700..00000000
--- a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/flow/StateFlow.kt
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.utils.flow
-
-import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.FlowPreview
-import kotlinx.coroutines.InternalCoroutinesApi
-import kotlinx.coroutines.channels.BroadcastChannel
-import kotlinx.coroutines.channels.ConflatedBroadcastChannel
-import kotlinx.coroutines.flow.Flow
-import kotlinx.coroutines.flow.FlowCollector
-import kotlinx.coroutines.flow.asFlow
-
-/**
- * A [Flow] that contains a single value that changes over time.
- *
- * This class exists to implement the DataFlow/StateFlow functionality that will be implemented in `kotlinx-coroutines`
- * in the future, but is not available yet.
- * See: https://github.com/Kotlin/kotlinx.coroutines/pull/1354
- */
-public interface StateFlow<T> : Flow<T> {
- /**
- * The current value of this flow.
- *
- * Setting a value that is [equal][Any.equals] to the previous one does nothing.
- */
- public var value: T
-}
-
-/**
- * Creates a [StateFlow] with a given initial [value].
- */
-@Suppress("FunctionName")
-public fun <T> StateFlow(value: T): StateFlow<T> = StateFlowImpl(value)
-
-/**
- * Internal implementation of the [StateFlow] interface.
- */
-@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
-private class StateFlowImpl<T>(initialValue: T) : StateFlow<T> {
- /**
- * The [BroadcastChannel] to back this flow.
- */
- private val chan = ConflatedBroadcastChannel(initialValue)
-
- /**
- * The internal [Flow] backing this flow.
- */
- private val flow = chan.asFlow()
-
- public override var value: T = initialValue
- set(value) {
- chan.offer(value)
- field = value
- }
-
- @InternalCoroutinesApi
- override suspend fun collect(collector: FlowCollector<T>) = flow.collect(collector)
-}
diff --git a/simulator/opendc-core/src/main/kotlin/org/opendc/core/User.kt b/simulator/opendc-workflow/build.gradle.kts
index fc542cef..3cefa409 100644
--- a/simulator/opendc-core/src/main/kotlin/org/opendc/core/User.kt
+++ b/simulator/opendc-workflow/build.gradle.kts
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,14 +20,4 @@
* SOFTWARE.
*/
-package org.opendc.core
-
-/**
- * A user of the cloud network.
- */
-public interface User : Identity {
- /**
- * The name of the user.
- */
- override val name: String
-}
+description = "Workflow orchestration for OpenDC"
diff --git a/simulator/opendc-metal/build.gradle.kts b/simulator/opendc-workflow/opendc-workflow-api/build.gradle.kts
index 9207de18..d3e67bee 100644
--- a/simulator/opendc-metal/build.gradle.kts
+++ b/simulator/opendc-workflow/opendc-workflow-api/build.gradle.kts
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-description = "Bare-metal provisioning in OpenDC"
+description = "Workflow orchestration service API for OpenDC"
/* Build configuration */
plugins {
@@ -29,10 +29,7 @@ plugins {
dependencies {
api(platform(project(":opendc-platform")))
- api(project(":opendc-core"))
api(project(":opendc-compute:opendc-compute-api"))
- api(project(":opendc-trace:opendc-trace-core"))
implementation(project(":opendc-utils"))
-
implementation("io.github.microutils:kotlin-logging")
}
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Job.kt b/simulator/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Job.kt
index f1cfdf65..5e8b0b9e 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Job.kt
+++ b/simulator/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Job.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,10 +20,8 @@
* SOFTWARE.
*/
-package org.opendc.workflows.workload
+package org.opendc.workflow.api
-import org.opendc.core.User
-import org.opendc.core.workload.Workload
import java.util.*
/**
@@ -31,17 +29,15 @@ import java.util.*
*
* @property uid A unique identified of this workflow.
* @property name The name of this workflow.
- * @property owner The owner of the workflow.
* @property tasks The tasks that are part of this workflow.
* @property metadata Additional metadata for the job.
*/
public data class Job(
- override val uid: UUID,
- override val name: String,
- override val owner: User,
- public val tasks: Set<Task>,
- public val metadata: Map<String, Any> = emptyMap()
-) : Workload {
+ val uid: UUID,
+ val name: String,
+ val tasks: Set<Task>,
+ val metadata: Map<String, Any> = emptyMap()
+) {
override fun equals(other: Any?): Boolean = other is Job && uid == other.uid
override fun hashCode(): Int = uid.hashCode()
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Metadata.kt b/simulator/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Metadata.kt
index 4305aa57..db208998 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Metadata.kt
+++ b/simulator/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Metadata.kt
@@ -22,7 +22,7 @@
* SOFTWARE.
*/
-package org.opendc.workflows.workload
+package org.opendc.workflow.api
/**
* Meta-data key for the deadline of a task.
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt b/simulator/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Task.kt
index 4c6d2842..d91f9879 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt
+++ b/simulator/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Task.kt
@@ -1,7 +1,5 @@
/*
- * MIT License
- *
- * Copyright (c) 2019 atlarge-research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -22,10 +20,8 @@
* SOFTWARE.
*/
-package org.opendc.workflows.workload
+package org.opendc.workflow.api
-import org.opendc.compute.api.Image
-import org.opendc.core.Identity
import java.util.*
/**
@@ -38,12 +34,11 @@ import java.util.*
* @property metadata Additional metadata for this task.
*/
public data class Task(
- override val uid: UUID,
- override val name: String,
- public val image: Image,
- public val dependencies: Set<Task>,
- public val metadata: Map<String, Any> = emptyMap()
-) : Identity {
+ val uid: UUID,
+ val name: String,
+ val dependencies: Set<Task>,
+ val metadata: Map<String, Any> = emptyMap()
+) {
override fun equals(other: Any?): Boolean = other is Task && uid == other.uid
override fun hashCode(): Int = uid.hashCode()
diff --git a/simulator/opendc-workflows/build.gradle.kts b/simulator/opendc-workflow/opendc-workflow-service/build.gradle.kts
index b6a2fc45..12a54235 100644
--- a/simulator/opendc-workflows/build.gradle.kts
+++ b/simulator/opendc-workflow/opendc-workflow-service/build.gradle.kts
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2019 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-description = "Workflow service for OpenDC"
+description = "Workflow orchestration service for OpenDC"
/* Build configuration */
plugins {
@@ -30,7 +30,7 @@ plugins {
dependencies {
api(platform(project(":opendc-platform")))
- api(project(":opendc-core"))
+ api(project(":opendc-workflow:opendc-workflow-api"))
api(project(":opendc-compute:opendc-compute-api"))
api(project(":opendc-trace:opendc-trace-core"))
implementation(project(":opendc-utils"))
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowEvent.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowEvent.kt
index bcf93562..bb2ad6c6 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowEvent.kt
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowEvent.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,11 +20,11 @@
* SOFTWARE.
*/
-package org.opendc.workflows.service
+package org.opendc.workflow.service
import org.opendc.trace.core.Event
-import org.opendc.workflows.workload.Job
-import org.opendc.workflows.workload.Task
+import org.opendc.workflow.api.Job
+import org.opendc.workflow.api.Task
/**
* An event emitted by the [WorkflowService].
diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
new file mode 100644
index 00000000..2f83e376
--- /dev/null
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
@@ -0,0 +1,97 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.workflow.service
+
+import kotlinx.coroutines.flow.Flow
+import org.opendc.compute.api.ComputeClient
+import org.opendc.trace.core.EventTracer
+import org.opendc.workflow.api.Job
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
+import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode
+import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy
+import org.opendc.workflow.service.scheduler.job.JobOrderPolicy
+import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy
+import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy
+import java.time.Clock
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * A service for cloud workflow management.
+ *
+ * The workflow scheduler is modelled after the Reference Architecture for Topology Scheduling by Andreadis et al.
+ */
+public interface WorkflowService : AutoCloseable {
+ /**
+ * The events emitted by the workflow scheduler.
+ */
+ public val events: Flow<WorkflowEvent>
+
+ /**
+ * Submit the specified [Job] to the workflow service for scheduling.
+ */
+ public suspend fun submit(job: Job)
+
+ /**
+ * Terminate the lifecycle of the workflow service, stopping all running workflows.
+ */
+ public override fun close()
+
+ public companion object {
+ /**
+ * Construct a new [WorkflowService] implementation.
+ *
+ * @param context The [CoroutineContext] to use in the service.
+ * @param clock The clock instance to use.
+ * @param tracer The event tracer to use.
+ * @param compute The compute client to use.
+ * @param mode The scheduling mode to use.
+ * @param jobAdmissionPolicy The job admission policy to use.
+ * @param jobOrderPolicy The job order policy to use.
+ * @param taskEligibilityPolicy The task eligibility policy to use.
+ * @param taskOrderPolicy The task order policy to use.
+ */
+ public operator fun invoke(
+ context: CoroutineContext,
+ clock: Clock,
+ tracer: EventTracer,
+ compute: ComputeClient,
+ mode: WorkflowSchedulerMode,
+ jobAdmissionPolicy: JobAdmissionPolicy,
+ jobOrderPolicy: JobOrderPolicy,
+ taskEligibilityPolicy: TaskEligibilityPolicy,
+ taskOrderPolicy: TaskOrderPolicy
+ ): WorkflowService {
+ return WorkflowServiceImpl(
+ context,
+ clock,
+ tracer,
+ compute,
+ mode,
+ jobAdmissionPolicy,
+ jobOrderPolicy,
+ taskEligibilityPolicy,
+ taskOrderPolicy
+ )
+ }
+ }
+}
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/JobState.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/JobState.kt
index 89849f6a..1bb67169 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/JobState.kt
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/JobState.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,9 +20,9 @@
* SOFTWARE.
*/
-package org.opendc.workflows.service
+package org.opendc.workflow.service.internal
-import org.opendc.workflows.workload.Job
+import org.opendc.workflow.api.Job
public class JobState(public val job: Job, public val submittedAt: Long) {
/**
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/TaskState.kt
index ef9714c2..c3ce1492 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/TaskState.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,10 +20,10 @@
* SOFTWARE.
*/
-package org.opendc.workflows.service
+package org.opendc.workflow.service.internal
import org.opendc.compute.api.Server
-import org.opendc.workflows.workload.Task
+import org.opendc.workflow.api.Task
public class TaskState(public val job: JobState, public val task: Task) {
/**
@@ -39,12 +39,12 @@ public class TaskState(public val job: JobState, public val task: Task) {
/**
* The dependencies of this task.
*/
- public val dependencies: HashSet<TaskState> = HashSet<TaskState>()
+ public val dependencies: HashSet<TaskState> = HashSet()
/**
* The dependents of this task.
*/
- public val dependents: HashSet<TaskState> = HashSet<TaskState>()
+ public val dependents: HashSet<TaskState> = HashSet()
/**
* A flag to indicate whether this workflow task instance is a workflow root.
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskStatus.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/TaskStatus.kt
index 99f5bb87..fe941d09 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskStatus.kt
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/TaskStatus.kt
@@ -1,7 +1,5 @@
/*
- * MIT License
- *
- * Copyright (c) 2020 atlarge-research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -22,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.workflows.service
+package org.opendc.workflow.service.internal
/**
* The state of a workflow task.
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerListener.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowSchedulerListener.kt
index 18721889..29c6aeea 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerListener.kt
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowSchedulerListener.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,11 +20,11 @@
* SOFTWARE.
*/
-package org.opendc.workflows.service
+package org.opendc.workflow.service.internal
-public interface StageWorkflowSchedulerListener {
- public fun cycleStarted(scheduler: StageWorkflowService) {}
- public fun cycleFinished(scheduler: StageWorkflowService) {}
+public interface WorkflowSchedulerListener {
+ public fun cycleStarted(scheduler: WorkflowServiceImpl) {}
+ public fun cycleFinished(scheduler: WorkflowServiceImpl) {}
public fun jobSubmitted(job: JobState) {}
public fun jobStarted(job: JobState) {}
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
index 6b348ed4..85a88acd 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,10 +20,11 @@
* SOFTWARE.
*/
-package org.opendc.workflows.service
+package org.opendc.workflow.service.internal
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch
@@ -32,21 +33,24 @@ import org.opendc.compute.api.*
import org.opendc.trace.core.EventTracer
import org.opendc.trace.core.consumeAsFlow
import org.opendc.trace.core.enable
-import org.opendc.workflows.service.stage.job.JobAdmissionPolicy
-import org.opendc.workflows.service.stage.job.JobOrderPolicy
-import org.opendc.workflows.service.stage.task.TaskEligibilityPolicy
-import org.opendc.workflows.service.stage.task.TaskOrderPolicy
-import org.opendc.workflows.workload.Job
-import org.opendc.workflows.workload.WORKFLOW_TASK_CORES
+import org.opendc.workflow.api.Job
+import org.opendc.workflow.api.WORKFLOW_TASK_CORES
+import org.opendc.workflow.service.*
+import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode
+import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy
+import org.opendc.workflow.service.scheduler.job.JobOrderPolicy
+import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy
+import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy
import java.time.Clock
import java.util.*
+import kotlin.coroutines.CoroutineContext
/**
* A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for
* Datacenter Scheduling.
*/
-public class StageWorkflowService(
- internal val coroutineScope: CoroutineScope,
+public class WorkflowServiceImpl(
+ context: CoroutineContext,
internal val clock: Clock,
internal val tracer: EventTracer,
private val computeClient: ComputeClient,
@@ -57,6 +61,11 @@ public class StageWorkflowService(
taskOrderPolicy: TaskOrderPolicy
) : WorkflowService, ServerWatcher {
/**
+ * The [CoroutineScope] of the service bounded by the lifecycle of the service.
+ */
+ internal val scope = CoroutineScope(context)
+
+ /**
* The logger instance to use.
*/
private val logger = KotlinLogging.logger {}
@@ -99,17 +108,17 @@ public class StageWorkflowService(
/**
* The root listener of this scheduler.
*/
- private val rootListener = object : StageWorkflowSchedulerListener {
+ private val rootListener = object : WorkflowSchedulerListener {
/**
* The listeners to delegate to.
*/
- val listeners = mutableSetOf<StageWorkflowSchedulerListener>()
+ val listeners = mutableSetOf<WorkflowSchedulerListener>()
- override fun cycleStarted(scheduler: StageWorkflowService) {
+ override fun cycleStarted(scheduler: WorkflowServiceImpl) {
listeners.forEach { it.cycleStarted(scheduler) }
}
- override fun cycleFinished(scheduler: StageWorkflowService) {
+ override fun cycleFinished(scheduler: WorkflowServiceImpl) {
listeners.forEach { it.cycleFinished(scheduler) }
}
@@ -145,6 +154,7 @@ public class StageWorkflowService(
private val mode: WorkflowSchedulerMode.Logic
private val jobAdmissionPolicy: JobAdmissionPolicy.Logic
private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic
+ private lateinit var image: Image
init {
this.mode = mode(this)
@@ -152,6 +162,9 @@ public class StageWorkflowService(
this.jobQueue = PriorityQueue(100, jobOrderPolicy(this).thenBy { it.job.uid })
this.taskEligibilityPolicy = taskEligibilityPolicy(this)
this.taskQueue = PriorityQueue(1000, taskOrderPolicy(this).thenBy { it.task.uid })
+ scope.launch {
+ image = computeClient.newImage("workflow-runner")
+ }
}
override val events: Flow<WorkflowEvent> = tracer.openRecording().let {
@@ -190,6 +203,10 @@ public class StageWorkflowService(
requestCycle()
}
+ override fun close() {
+ scope.cancel()
+ }
+
/**
* Indicate to the scheduler that a scheduling cycle is needed.
*/
@@ -214,7 +231,12 @@ public class StageWorkflowService(
iterator.remove()
jobQueue.add(jobInstance)
activeJobs += jobInstance
- tracer.commit(WorkflowEvent.JobStarted(this, jobInstance.job))
+ tracer.commit(
+ WorkflowEvent.JobStarted(
+ this,
+ jobInstance.job
+ )
+ )
rootListener.jobStarted(jobInstance)
}
@@ -258,16 +280,27 @@ public class StageWorkflowService(
val instance = taskQueue.peek()
val cores = instance.task.metadata[WORKFLOW_TASK_CORES] as? Int ?: 1
- val flavor = Flavor(cores, 1000) // TODO How to determine memory usage for workflow task
- val image = instance.task.image
- coroutineScope.launch {
- val server = computeClient.newServer(instance.task.name, image, flavor)
+ val image = image
+ scope.launch {
+ val flavor = computeClient.newFlavor(
+ instance.task.name,
+ cores,
+ 1000
+ ) // TODO How to determine memory usage for workflow task
+ val server = computeClient.newServer(
+ instance.task.name,
+ image,
+ flavor,
+ start = false,
+ meta = instance.task.metadata
+ )
instance.state = TaskStatus.ACTIVE
instance.server = server
taskByServer[server] = instance
- server.watch(this@StageWorkflowService)
+ server.watch(this@WorkflowServiceImpl)
+ server.start()
}
activeTasks += instance
@@ -278,20 +311,28 @@ public class StageWorkflowService(
public override fun onStateChanged(server: Server, newState: ServerState) {
when (newState) {
- ServerState.ACTIVE -> {
+ ServerState.PROVISIONING -> {
+ }
+ ServerState.RUNNING -> {
val task = taskByServer.getValue(server)
task.startedAt = clock.millis()
tracer.commit(
WorkflowEvent.TaskStarted(
- this@StageWorkflowService,
+ this@WorkflowServiceImpl,
task.job.job,
task.task
)
)
rootListener.taskStarted(task)
}
- ServerState.SHUTOFF, ServerState.ERROR -> {
+ ServerState.TERMINATED, ServerState.ERROR -> {
val task = taskByServer.remove(server) ?: throw IllegalStateException()
+
+ scope.launch {
+ server.delete()
+ server.flavor.delete()
+ }
+
val job = task.job
task.state = TaskStatus.FINISHED
task.finishedAt = clock.millis()
@@ -299,7 +340,7 @@ public class StageWorkflowService(
activeTasks -= task
tracer.commit(
WorkflowEvent.TaskFinished(
- this@StageWorkflowService,
+ this@WorkflowServiceImpl,
task.job.job,
task.task
)
@@ -322,6 +363,8 @@ public class StageWorkflowService(
requestCycle()
}
+ ServerState.DELETED -> {
+ }
else -> throw IllegalStateException()
}
}
@@ -332,11 +375,11 @@ public class StageWorkflowService(
rootListener.jobFinished(job)
}
- public fun addListener(listener: StageWorkflowSchedulerListener) {
+ public fun addListener(listener: WorkflowSchedulerListener) {
rootListener.listeners += listener
}
- public fun removeListener(listener: StageWorkflowSchedulerListener) {
+ public fun removeListener(listener: WorkflowSchedulerListener) {
rootListener.listeners -= listener
}
}
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/StagePolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/StagePolicy.kt
index d76579f9..359fc223 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/StagePolicy.kt
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/StagePolicy.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,9 +20,9 @@
* SOFTWARE.
*/
-package org.opendc.workflows.service.stage
+package org.opendc.workflow.service.scheduler
-import org.opendc.workflows.service.StageWorkflowService
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
import java.io.Serializable
/**
@@ -32,5 +32,5 @@ public interface StagePolicy<T : Any> : Serializable {
/**
* Build the logic of the stage policy.
*/
- public operator fun invoke(scheduler: StageWorkflowService): T
+ public operator fun invoke(scheduler: WorkflowServiceImpl): T
}
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/WorkflowSchedulerMode.kt
index cf8f92e0..58e7893f 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/WorkflowSchedulerMode.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,11 +20,11 @@
* SOFTWARE.
*/
-package org.opendc.workflows.service
+package org.opendc.workflow.service.scheduler
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
-import org.opendc.workflows.service.stage.StagePolicy
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
/**
* The operating mode of a workflow scheduler.
@@ -44,9 +44,9 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo
* An interactive scheduler immediately triggers a new scheduling cycle when a workflow is received.
*/
public object Interactive : WorkflowSchedulerMode() {
- override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic {
+ override fun invoke(scheduler: WorkflowServiceImpl): Logic = object : Logic {
override fun requestCycle() {
- scheduler.coroutineScope.launch { scheduler.schedule() }
+ scheduler.scope.launch { scheduler.schedule() }
}
}
@@ -59,14 +59,14 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo
public data class Batch(val quantum: Long) : WorkflowSchedulerMode() {
private var next: kotlinx.coroutines.Job? = null
- override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic {
+ override fun invoke(scheduler: WorkflowServiceImpl): Logic = object : Logic {
override fun requestCycle() {
if (next == null) {
// In batch mode, we assume that the scheduler runs at a fixed slot every time
// quantum (e.g t=0, t=60, t=120). We calculate here the delay until the next scheduling slot.
val delay = quantum - (scheduler.clock.millis() % quantum)
- val job = scheduler.coroutineScope.launch {
+ val job = scheduler.scope.launch {
delay(delay)
next = null
scheduler.schedule()
@@ -85,12 +85,12 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo
public data class Random(private val random: java.util.Random = java.util.Random(123)) : WorkflowSchedulerMode() {
private var next: kotlinx.coroutines.Job? = null
- override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic {
+ override fun invoke(scheduler: WorkflowServiceImpl): Logic = object : Logic {
override fun requestCycle() {
if (next == null) {
val delay = random.nextInt(200).toLong()
- val job = scheduler.coroutineScope.launch {
+ val job = scheduler.scope.launch {
delay(delay)
next = null
scheduler.schedule()
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/DurationJobOrderPolicy.kt
index 1190a408..1b5b91b9 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/DurationJobOrderPolicy.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,21 +20,23 @@
* SOFTWARE.
*/
-package org.opendc.workflows.service.stage.job
+package org.opendc.workflow.service.scheduler.job
-import org.opendc.workflows.service.JobState
-import org.opendc.workflows.service.StageWorkflowSchedulerListener
-import org.opendc.workflows.service.StageWorkflowService
-import org.opendc.workflows.workload.Job
-import org.opendc.workflows.workload.Task
-import org.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE
+import org.opendc.workflow.api.Job
+import org.opendc.workflow.api.Task
+import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE
+import org.opendc.workflow.service.internal.JobState
+import org.opendc.workflow.service.internal.WorkflowSchedulerListener
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
/**
* A [JobOrderPolicy] that orders jobs based on its critical path length.
*/
public data class DurationJobOrderPolicy(val ascending: Boolean = true) : JobOrderPolicy {
- override fun invoke(scheduler: StageWorkflowService): Comparator<JobState> =
- object : Comparator<JobState>, StageWorkflowSchedulerListener {
+ override fun invoke(scheduler: WorkflowServiceImpl): Comparator<JobState> =
+ object :
+ Comparator<JobState>,
+ WorkflowSchedulerListener {
private val results = HashMap<Job, Long>()
init {
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/JobAdmissionPolicy.kt
index 0e5a42c0..ed3acff7 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/JobAdmissionPolicy.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,10 +20,10 @@
* SOFTWARE.
*/
-package org.opendc.workflows.service.stage.job
+package org.opendc.workflow.service.scheduler.job
-import org.opendc.workflows.service.JobState
-import org.opendc.workflows.service.stage.StagePolicy
+import org.opendc.workflow.service.internal.JobState
+import org.opendc.workflow.service.scheduler.StagePolicy
/**
* A policy interface for admitting [JobState]s to a scheduling cycle.
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/JobOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/JobOrderPolicy.kt
index 83d42b2d..adaa6671 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/JobOrderPolicy.kt
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/JobOrderPolicy.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,10 +20,10 @@
* SOFTWARE.
*/
-package org.opendc.workflows.service.stage.job
+package org.opendc.workflow.service.scheduler.job
-import org.opendc.workflows.service.JobState
-import org.opendc.workflows.service.stage.StagePolicy
+import org.opendc.workflow.service.internal.JobState
+import org.opendc.workflow.service.scheduler.StagePolicy
/**
* A policy interface for ordering admitted workflows in the scheduling queue.
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/LimitJobAdmissionPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/LimitJobAdmissionPolicy.kt
index 6f6ccb50..6a0bfeb9 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/LimitJobAdmissionPolicy.kt
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/LimitJobAdmissionPolicy.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,10 +20,10 @@
* SOFTWARE.
*/
-package org.opendc.workflows.service.stage.job
+package org.opendc.workflow.service.scheduler.job
-import org.opendc.workflows.service.JobState
-import org.opendc.workflows.service.StageWorkflowService
+import org.opendc.workflow.service.internal.JobState
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
/**
* A [JobAdmissionPolicy] that limits the amount of active jobs in the system.
@@ -31,7 +31,7 @@ import org.opendc.workflows.service.StageWorkflowService
* @property limit The maximum number of concurrent jobs in the system.
*/
public data class LimitJobAdmissionPolicy(public val limit: Int) : JobAdmissionPolicy {
- override fun invoke(scheduler: StageWorkflowService): JobAdmissionPolicy.Logic = object : JobAdmissionPolicy.Logic {
+ override fun invoke(scheduler: WorkflowServiceImpl): JobAdmissionPolicy.Logic = object : JobAdmissionPolicy.Logic {
override fun invoke(
job: JobState
): JobAdmissionPolicy.Advice =
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/NullJobAdmissionPolicy.kt
index ac74f090..31f8f8db 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/NullJobAdmissionPolicy.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,16 +20,16 @@
* SOFTWARE.
*/
-package org.opendc.workflows.service.stage.job
+package org.opendc.workflow.service.scheduler.job
-import org.opendc.workflows.service.JobState
-import org.opendc.workflows.service.StageWorkflowService
+import org.opendc.workflow.service.internal.JobState
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
/**
* A [JobAdmissionPolicy] that admits all jobs.
*/
public object NullJobAdmissionPolicy : JobAdmissionPolicy {
- override fun invoke(scheduler: StageWorkflowService): JobAdmissionPolicy.Logic = object : JobAdmissionPolicy.Logic {
+ override fun invoke(scheduler: WorkflowServiceImpl): JobAdmissionPolicy.Logic = object : JobAdmissionPolicy.Logic {
override fun invoke(job: JobState): JobAdmissionPolicy.Advice = JobAdmissionPolicy.Advice.ADMIT
}
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/RandomJobOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/RandomJobOrderPolicy.kt
index 6c747261..1b359125 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/RandomJobOrderPolicy.kt
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/RandomJobOrderPolicy.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,23 +20,23 @@
* SOFTWARE.
*/
-package org.opendc.workflows.service.stage.job
+package org.opendc.workflow.service.scheduler.job
-import org.opendc.workflows.service.JobState
-import org.opendc.workflows.service.StageWorkflowSchedulerListener
-import org.opendc.workflows.service.StageWorkflowService
-import org.opendc.workflows.workload.Job
+import org.opendc.workflow.api.Job
+import org.opendc.workflow.service.internal.JobState
+import org.opendc.workflow.service.internal.WorkflowSchedulerListener
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
import java.util.*
import kotlin.collections.HashMap
-import kotlin.collections.getValue
-import kotlin.collections.set
/**
* A [JobOrderPolicy] that randomly orders jobs.
*/
public object RandomJobOrderPolicy : JobOrderPolicy {
- override fun invoke(scheduler: StageWorkflowService): Comparator<JobState> =
- object : Comparator<JobState>, StageWorkflowSchedulerListener {
+ override fun invoke(scheduler: WorkflowServiceImpl): Comparator<JobState> =
+ object :
+ Comparator<JobState>,
+ WorkflowSchedulerListener {
private val random = Random(123)
private val ids = HashMap<Job, Int>()
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/SizeJobOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/SizeJobOrderPolicy.kt
index c1c244c3..6998606d 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/SizeJobOrderPolicy.kt
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/SizeJobOrderPolicy.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,16 +20,16 @@
* SOFTWARE.
*/
-package org.opendc.workflows.service.stage.job
+package org.opendc.workflow.service.scheduler.job
-import org.opendc.workflows.service.JobState
-import org.opendc.workflows.service.StageWorkflowService
+import org.opendc.workflow.service.internal.JobState
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
/**
* A [SizeJobOrderPolicy] that orders jobs based on the number of tasks it has.
*/
public data class SizeJobOrderPolicy(public val ascending: Boolean = true) : JobOrderPolicy {
- override fun invoke(scheduler: StageWorkflowService): Comparator<JobState> =
+ override fun invoke(scheduler: WorkflowServiceImpl): Comparator<JobState> =
compareBy { it.tasks.size.let { if (ascending) it else -it } }
override fun toString(): String {
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/SubmissionTimeJobOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/SubmissionTimeJobOrderPolicy.kt
index 005f8153..53d06023 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/SubmissionTimeJobOrderPolicy.kt
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/SubmissionTimeJobOrderPolicy.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,16 +20,16 @@
* SOFTWARE.
*/
-package org.opendc.workflows.service.stage.job
+package org.opendc.workflow.service.scheduler.job
-import org.opendc.workflows.service.JobState
-import org.opendc.workflows.service.StageWorkflowService
+import org.opendc.workflow.service.internal.JobState
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
/**
* A [JobOrderPolicy] orders jobs in FIFO order.
*/
public data class SubmissionTimeJobOrderPolicy(public val ascending: Boolean = true) : JobOrderPolicy {
- override fun invoke(scheduler: StageWorkflowService): Comparator<JobState> =
+ override fun invoke(scheduler: WorkflowServiceImpl): Comparator<JobState> =
compareBy { it.submittedAt.let { if (ascending) it else -it } }
override fun toString(): String {
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/ActiveTaskOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/ActiveTaskOrderPolicy.kt
index 6a465746..821d4964 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/ActiveTaskOrderPolicy.kt
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/ActiveTaskOrderPolicy.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,19 +20,19 @@
* SOFTWARE.
*/
-package org.opendc.workflows.service.stage.task
+package org.opendc.workflow.service.scheduler.task
-import org.opendc.workflows.service.JobState
-import org.opendc.workflows.service.StageWorkflowSchedulerListener
-import org.opendc.workflows.service.StageWorkflowService
-import org.opendc.workflows.service.TaskState
+import org.opendc.workflow.service.internal.JobState
+import org.opendc.workflow.service.internal.TaskState
+import org.opendc.workflow.service.internal.WorkflowSchedulerListener
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
/**
* A [TaskOrderPolicy] that orders tasks based on the number of active relative tasks (w.r.t. its job) in the system.
*/
public data class ActiveTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy {
- override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> =
- object : Comparator<TaskState>, StageWorkflowSchedulerListener {
+ override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> =
+ object : Comparator<TaskState>, WorkflowSchedulerListener {
private val active = mutableMapOf<JobState, Int>()
init {
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/BalancingTaskEligibilityPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/BalancingTaskEligibilityPolicy.kt
index f3f19ef5..42804f5a 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/BalancingTaskEligibilityPolicy.kt
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/BalancingTaskEligibilityPolicy.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,12 +20,12 @@
* SOFTWARE.
*/
-package org.opendc.workflows.service.stage.task
+package org.opendc.workflow.service.scheduler.task
-import org.opendc.workflows.service.JobState
-import org.opendc.workflows.service.StageWorkflowSchedulerListener
-import org.opendc.workflows.service.StageWorkflowService
-import org.opendc.workflows.service.TaskState
+import org.opendc.workflow.service.internal.JobState
+import org.opendc.workflow.service.internal.TaskState
+import org.opendc.workflow.service.internal.WorkflowSchedulerListener
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
import kotlin.math.max
/**
@@ -36,8 +36,8 @@ import kotlin.math.max
* the average.
*/
public data class BalancingTaskEligibilityPolicy(public val tolerance: Double = 1.5) : TaskEligibilityPolicy {
- override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic =
- object : TaskEligibilityPolicy.Logic, StageWorkflowSchedulerListener {
+ override fun invoke(scheduler: WorkflowServiceImpl): TaskEligibilityPolicy.Logic =
+ object : TaskEligibilityPolicy.Logic, WorkflowSchedulerListener {
private val active = mutableMapOf<JobState, Int>()
init {
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/CompletionTaskOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/CompletionTaskOrderPolicy.kt
index 0020023f..dae7ad99 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/CompletionTaskOrderPolicy.kt
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/CompletionTaskOrderPolicy.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,19 +20,19 @@
* SOFTWARE.
*/
-package org.opendc.workflows.service.stage.task
+package org.opendc.workflow.service.scheduler.task
-import org.opendc.workflows.service.JobState
-import org.opendc.workflows.service.StageWorkflowSchedulerListener
-import org.opendc.workflows.service.StageWorkflowService
-import org.opendc.workflows.service.TaskState
+import org.opendc.workflow.service.internal.JobState
+import org.opendc.workflow.service.internal.TaskState
+import org.opendc.workflow.service.internal.WorkflowSchedulerListener
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
/**
* A [TaskOrderPolicy] that orders tasks based on the number of completed relative tasks.
*/
public data class CompletionTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy {
- override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> =
- object : Comparator<TaskState>, StageWorkflowSchedulerListener {
+ override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> =
+ object : Comparator<TaskState>, WorkflowSchedulerListener {
private val finished = mutableMapOf<JobState, Int>()
init {
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DependenciesTaskOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependenciesTaskOrderPolicy.kt
index a9f5eb84..7786f6ec 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DependenciesTaskOrderPolicy.kt
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependenciesTaskOrderPolicy.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,16 +20,16 @@
* SOFTWARE.
*/
-package org.opendc.workflows.service.stage.task
+package org.opendc.workflow.service.scheduler.task
-import org.opendc.workflows.service.StageWorkflowService
-import org.opendc.workflows.service.TaskState
+import org.opendc.workflow.service.internal.TaskState
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
/**
* A [TaskOrderPolicy] that orders tasks based on the number of dependency tasks it has.
*/
public data class DependenciesTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy {
- override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> = compareBy<TaskState> {
+ override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = compareBy {
it.task.dependencies.size.let { if (ascending) it else -it }
}
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DependentsTaskOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependentsTaskOrderPolicy.kt
index e5a9f159..4fb835d7 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DependentsTaskOrderPolicy.kt
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependentsTaskOrderPolicy.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,16 +20,16 @@
* SOFTWARE.
*/
-package org.opendc.workflows.service.stage.task
+package org.opendc.workflow.service.scheduler.task
-import org.opendc.workflows.service.StageWorkflowService
-import org.opendc.workflows.service.TaskState
+import org.opendc.workflow.service.internal.TaskState
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
/**
* A [TaskOrderPolicy] that orders tasks based on the number of dependent tasks it has.
*/
public data class DependentsTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy {
- override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> = compareBy<TaskState> {
+ override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = compareBy {
it.dependents.size.let { if (ascending) it else -it }
}
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DurationHistoryTaskOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationHistoryTaskOrderPolicy.kt
index 7ce8ccce..3a634de7 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DurationHistoryTaskOrderPolicy.kt
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationHistoryTaskOrderPolicy.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,19 +20,19 @@
* SOFTWARE.
*/
-package org.opendc.workflows.service.stage.task
+package org.opendc.workflow.service.scheduler.task
-import org.opendc.workflows.service.JobState
-import org.opendc.workflows.service.StageWorkflowSchedulerListener
-import org.opendc.workflows.service.StageWorkflowService
-import org.opendc.workflows.service.TaskState
+import org.opendc.workflow.service.internal.JobState
+import org.opendc.workflow.service.internal.TaskState
+import org.opendc.workflow.service.internal.WorkflowSchedulerListener
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
/**
* A [TaskOrderPolicy] that orders tasks based on the average duration of the preceding tasks in the job.
*/
public data class DurationHistoryTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy {
- override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> =
- object : Comparator<TaskState>, StageWorkflowSchedulerListener {
+ override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> =
+ object : Comparator<TaskState>, WorkflowSchedulerListener {
private val results = HashMap<JobState, MutableList<Long>>()
init {
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationTaskOrderPolicy.kt
index 3674eb01..d9fde53a 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationTaskOrderPolicy.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,16 +20,15 @@
* SOFTWARE.
*/
-package org.opendc.workflows.service.stage.task
+package org.opendc.workflow.service.scheduler.task
-import org.opendc.workflows.service.StageWorkflowSchedulerListener
-import org.opendc.workflows.service.StageWorkflowService
-import org.opendc.workflows.service.TaskState
-import org.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE
+import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE
+import org.opendc.workflow.service.internal.TaskState
+import org.opendc.workflow.service.internal.WorkflowSchedulerListener
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
import java.util.*
import kotlin.collections.HashMap
import kotlin.collections.getValue
-import kotlin.collections.minusAssign
import kotlin.collections.set
/**
@@ -37,8 +36,8 @@ import kotlin.collections.set
*/
public data class DurationTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy {
- override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> =
- object : Comparator<TaskState>, StageWorkflowSchedulerListener {
+ override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> =
+ object : Comparator<TaskState>, WorkflowSchedulerListener {
private val results = HashMap<UUID, Long>()
init {
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitPerJobTaskEligibilityPolicy.kt
index 2dddbc7c..229460df 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitPerJobTaskEligibilityPolicy.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,19 +20,19 @@
* SOFTWARE.
*/
-package org.opendc.workflows.service.stage.task
+package org.opendc.workflow.service.scheduler.task
-import org.opendc.workflows.service.JobState
-import org.opendc.workflows.service.StageWorkflowSchedulerListener
-import org.opendc.workflows.service.StageWorkflowService
-import org.opendc.workflows.service.TaskState
+import org.opendc.workflow.service.internal.JobState
+import org.opendc.workflow.service.internal.TaskState
+import org.opendc.workflow.service.internal.WorkflowSchedulerListener
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
/**
* A [TaskEligibilityPolicy] that limits the number of active tasks of a job in the system.
*/
public data class LimitPerJobTaskEligibilityPolicy(public val limit: Int) : TaskEligibilityPolicy {
- override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic =
- object : TaskEligibilityPolicy.Logic, StageWorkflowSchedulerListener {
+ override fun invoke(scheduler: WorkflowServiceImpl): TaskEligibilityPolicy.Logic =
+ object : TaskEligibilityPolicy.Logic, WorkflowSchedulerListener {
private val active = mutableMapOf<JobState, Int>()
init {
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitTaskEligibilityPolicy.kt
index fdc1fd5e..57aa0d58 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitTaskEligibilityPolicy.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,16 +20,16 @@
* SOFTWARE.
*/
-package org.opendc.workflows.service.stage.task
+package org.opendc.workflow.service.scheduler.task
-import org.opendc.workflows.service.StageWorkflowService
-import org.opendc.workflows.service.TaskState
+import org.opendc.workflow.service.internal.TaskState
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
/**
* A [TaskEligibilityPolicy] that limits the total number of active tasks in the system.
*/
public data class LimitTaskEligibilityPolicy(val limit: Int) : TaskEligibilityPolicy {
- override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic = object : TaskEligibilityPolicy.Logic {
+ override fun invoke(scheduler: WorkflowServiceImpl): TaskEligibilityPolicy.Logic = object : TaskEligibilityPolicy.Logic {
override fun invoke(
task: TaskState
): TaskEligibilityPolicy.Advice =
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/NullTaskEligibilityPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/NullTaskEligibilityPolicy.kt
index b40f9823..cfe2aeed 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/NullTaskEligibilityPolicy.kt
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/NullTaskEligibilityPolicy.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,16 +20,16 @@
* SOFTWARE.
*/
-package org.opendc.workflows.service.stage.task
+package org.opendc.workflow.service.scheduler.task
-import org.opendc.workflows.service.StageWorkflowService
-import org.opendc.workflows.service.TaskState
+import org.opendc.workflow.service.internal.TaskState
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
/**
* A [TaskEligibilityPolicy] that always allows new tasks to enter.
*/
public object NullTaskEligibilityPolicy : TaskEligibilityPolicy {
- override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic = Logic
+ override fun invoke(scheduler: WorkflowServiceImpl): TaskEligibilityPolicy.Logic = Logic
private object Logic : TaskEligibilityPolicy.Logic {
override fun invoke(
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskEligibilityPolicy.kt
index a0691b23..a01439c2 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskEligibilityPolicy.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 AtLarge Research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,17 +20,17 @@
* SOFTWARE.
*/
-package org.opendc.workflows.service.stage.task
+package org.opendc.workflow.service.scheduler.task
-import org.opendc.workflows.service.StageWorkflowService
-import org.opendc.workflows.service.TaskState
+import org.opendc.workflow.service.internal.TaskState
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
import java.util.*
/**
* A [TaskEligibilityPolicy] that randomly accepts tasks in the system with some [probability].
*/
public data class RandomTaskEligibilityPolicy(val probability: Double = 0.5) : TaskEligibilityPolicy {
- override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic = object : TaskEligibilityPolicy.Logic {
+ override fun invoke(scheduler: WorkflowServiceImpl): TaskEligibilityPolicy.Logic = object : TaskEligibilityPolicy.Logic {
val random = Random(123)
override fun invoke(task: TaskState): TaskEligibilityPolicy.Advice =
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/RandomTaskOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskOrderPolicy.kt
index 890e7165..c12d6a66 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/RandomTaskOrderPolicy.kt
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskOrderPolicy.kt
@@ -1,7 +1,5 @@
/*
- * MIT License
- *
- * Copyright (c) 2019 atlarge-research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -22,20 +20,20 @@
* SOFTWARE.
*/
-package org.opendc.workflows.service.stage.task
+package org.opendc.workflow.service.scheduler.task
-import org.opendc.workflows.service.StageWorkflowSchedulerListener
-import org.opendc.workflows.service.StageWorkflowService
-import org.opendc.workflows.service.TaskState
-import org.opendc.workflows.workload.Task
+import org.opendc.workflow.api.Task
+import org.opendc.workflow.service.internal.TaskState
+import org.opendc.workflow.service.internal.WorkflowSchedulerListener
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
import kotlin.random.Random
/**
* A [TaskOrderPolicy] that orders the tasks randomly.
*/
public object RandomTaskOrderPolicy : TaskOrderPolicy {
- override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> =
- object : Comparator<TaskState>, StageWorkflowSchedulerListener {
+ override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> =
+ object : Comparator<TaskState>, WorkflowSchedulerListener {
private val random = Random(123)
private val ids = HashMap<Task, Int>()
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/SubmissionTimeTaskOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/SubmissionTimeTaskOrderPolicy.kt
index 6b0199b8..e9bbf815 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/SubmissionTimeTaskOrderPolicy.kt
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/SubmissionTimeTaskOrderPolicy.kt
@@ -1,7 +1,5 @@
/*
- * MIT License
- *
- * Copyright (c) 2020 atlarge-research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -22,16 +20,16 @@
* SOFTWARE.
*/
-package org.opendc.workflows.service.stage.task
+package org.opendc.workflow.service.scheduler.task
-import org.opendc.workflows.service.StageWorkflowService
-import org.opendc.workflows.service.TaskState
+import org.opendc.workflow.service.internal.TaskState
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
/**
* A [TaskOrderPolicy] that orders tasks based on the order of arrival in the queue.
*/
public data class SubmissionTimeTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy {
- override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> = compareBy<TaskState> {
+ override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = compareBy {
it.job.submittedAt.let { if (ascending) it else -it }
}
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/TaskEligibilityPolicy.kt
index 37597709..ee31aee2 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/TaskEligibilityPolicy.kt
@@ -1,7 +1,5 @@
/*
- * MIT License
- *
- * Copyright (c) 2019 atlarge-research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -22,10 +20,10 @@
* SOFTWARE.
*/
-package org.opendc.workflows.service.stage.task
+package org.opendc.workflow.service.scheduler.task
-import org.opendc.workflows.service.TaskState
-import org.opendc.workflows.service.stage.StagePolicy
+import org.opendc.workflow.service.internal.TaskState
+import org.opendc.workflow.service.scheduler.StagePolicy
/**
* A policy interface for determining the eligibility of tasks in a scheduling cycle.
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/TaskOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/TaskOrderPolicy.kt
index 5feac6d0..fffcb765 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/TaskOrderPolicy.kt
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/TaskOrderPolicy.kt
@@ -1,7 +1,5 @@
/*
- * MIT License
- *
- * Copyright (c) 2020 atlarge-research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -22,10 +20,10 @@
* SOFTWARE.
*/
-package org.opendc.workflows.service.stage.task
+package org.opendc.workflow.service.scheduler.task
-import org.opendc.workflows.service.TaskState
-import org.opendc.workflows.service.stage.StagePolicy
+import org.opendc.workflow.service.internal.TaskState
+import org.opendc.workflow.service.scheduler.StagePolicy
/**
* This interface represents the **T2** stage of the Reference Architecture for Topology Schedulers and provides the
diff --git a/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/StageWorkflowSchedulerIntegrationTest.kt
index 4207cdfd..2161f5f2 100644
--- a/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/StageWorkflowSchedulerIntegrationTest.kt
@@ -1,7 +1,5 @@
/*
- * MIT License
- *
- * Copyright (c) 2020 atlarge-research
+ * Copyright (c) 2021 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -22,10 +20,9 @@
* SOFTWARE.
*/
-package org.opendc.workflows.service
+package org.opendc.workflow.service
import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach
@@ -38,23 +35,24 @@ import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.scheduler.NumberOfActiveServersAllocationPolicy
-import org.opendc.compute.simulator.SimHostProvisioner
+import org.opendc.compute.simulator.SimHost
import org.opendc.format.environment.sc18.Sc18EnvironmentReader
import org.opendc.format.trace.gwf.GwfTraceReader
-import org.opendc.metal.service.ProvisioningService
import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider
import org.opendc.simulator.utils.DelayControllerClockAdapter
import org.opendc.trace.core.EventTracer
-import org.opendc.workflows.service.stage.job.NullJobAdmissionPolicy
-import org.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy
-import org.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy
-import org.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
+import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode
+import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy
+import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy
+import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy
+import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy
import kotlin.math.max
/**
- * Integration test suite for the [StageWorkflowService].
+ * Integration test suite for the [WorkflowServiceImpl].
*/
-@DisplayName("StageWorkflowService")
+@DisplayName("WorkflowServiceImpl")
@OptIn(ExperimentalCoroutinesApi::class)
internal class StageWorkflowSchedulerIntegrationTest {
/**
@@ -72,26 +70,27 @@ internal class StageWorkflowSchedulerIntegrationTest {
val clock = DelayControllerClockAdapter(testScope)
val tracer = EventTracer(clock)
- val schedulerAsync = testScope.async {
- val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json"))
- .use { it.construct(testScope, clock) }
-
- val bareMetal = environment.platforms[0].zones[0].services[ProvisioningService]
-
- // Wait for the bare metal nodes to be spawned
- delay(10)
+ val scheduler = let {
+ val hosts = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json"))
+ .use { it.read() }
+ .map { def ->
+ SimHost(
+ def.uid,
+ def.name,
+ def.model,
+ def.meta,
+ testScope.coroutineContext,
+ clock,
+ SimSpaceSharedHypervisorProvider()
+ )
+ }
- val provisioner = SimHostProvisioner(testScope.coroutineContext, bareMetal, SimSpaceSharedHypervisorProvider())
- val hosts = provisioner.provisionAll()
val compute = ComputeService(testScope.coroutineContext, clock, tracer, NumberOfActiveServersAllocationPolicy(), schedulingQuantum = 1000)
hosts.forEach { compute.addHost(it) }
- // Wait for the hypervisors to be spawned
- delay(10)
-
- StageWorkflowService(
- testScope,
+ WorkflowService(
+ testScope.coroutineContext,
clock,
tracer,
compute.newClient(),
@@ -104,7 +103,6 @@ internal class StageWorkflowSchedulerIntegrationTest {
}
testScope.launch {
- val scheduler = schedulerAsync.await()
scheduler.events
.onEach { event ->
when (event) {
@@ -119,13 +117,12 @@ internal class StageWorkflowSchedulerIntegrationTest {
testScope.launch {
val reader = GwfTraceReader(object {}.javaClass.getResourceAsStream("/trace.gwf"))
- val scheduler = schedulerAsync.await()
while (reader.hasNext()) {
- val (time, job) = reader.next()
+ val entry = reader.next()
jobsSubmitted++
- delay(max(0, time - clock.millis()))
- scheduler.submit(job)
+ delay(max(0, entry.start - clock.millis()))
+ scheduler.submit(entry.workload)
}
}
diff --git a/simulator/opendc-workflows/src/test/resources/environment.json b/simulator/opendc-workflow/opendc-workflow-service/src/test/resources/environment.json
index 0965b250..0965b250 100644
--- a/simulator/opendc-workflows/src/test/resources/environment.json
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/test/resources/environment.json
diff --git a/simulator/opendc-workflows/src/test/resources/log4j2.xml b/simulator/opendc-workflow/opendc-workflow-service/src/test/resources/log4j2.xml
index 70a0eacc..70a0eacc 100644
--- a/simulator/opendc-workflows/src/test/resources/log4j2.xml
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/test/resources/log4j2.xml
diff --git a/simulator/opendc-workflows/src/test/resources/trace.gwf b/simulator/opendc-workflow/opendc-workflow-service/src/test/resources/trace.gwf
index d264b9c3..d264b9c3 100644
--- a/simulator/opendc-workflows/src/test/resources/trace.gwf
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/test/resources/trace.gwf
diff --git a/simulator/settings.gradle.kts b/simulator/settings.gradle.kts
index 7a82adcd..e87dd4d8 100644
--- a/simulator/settings.gradle.kts
+++ b/simulator/settings.gradle.kts
@@ -22,12 +22,11 @@
rootProject.name = "opendc-simulator"
include(":opendc-platform")
-include(":opendc-core")
include(":opendc-compute:opendc-compute-api")
include(":opendc-compute:opendc-compute-service")
include(":opendc-compute:opendc-compute-simulator")
-include(":opendc-metal")
-include(":opendc-workflows")
+include(":opendc-workflow:opendc-workflow-api")
+include(":opendc-workflow:opendc-workflow-service")
include(":opendc-format")
include(":opendc-experiments:opendc-experiments-sc18")
include(":opendc-experiments:opendc-experiments-capelin")