summaryrefslogtreecommitdiff
path: root/simulator
diff options
context:
space:
mode:
authorjc0b <j@jc0b.computer>2020-07-21 21:22:44 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-08-24 19:48:14 +0200
commit67b6ec800df8e023efadb60ae5f7919030b19789 (patch)
tree9aa496408a4097857b6a032b84dd0a396321e1d3 /simulator
parent5b4ab37ac7be2b2c34e2fad928b0cd7f3a837263 (diff)
parent04686bf5cef4aea51fd613a158aa8b155763d0e7 (diff)
Merge branch 'master' onto local working copy
Preserve working copy while updating
Diffstat (limited to 'simulator')
-rw-r--r--simulator/.dockerignore10
-rw-r--r--simulator/.editorconfig2
-rw-r--r--simulator/.gitattributes7
-rw-r--r--simulator/.gitignore2
-rw-r--r--simulator/.gitlab-ci.yml34
-rw-r--r--simulator/.travis.yml1
-rw-r--r--simulator/Dockerfile31
-rw-r--r--simulator/build.gradle.kts4
-rw-r--r--simulator/gradle/wrapper/gradle-wrapper.jarbin55190 -> 58910 bytes
-rw-r--r--simulator/gradle/wrapper/gradle-wrapper.properties2
-rwxr-xr-xsimulator/gradlew53
-rw-r--r--simulator/gradlew.bat22
-rw-r--r--simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt2
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt2
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt2
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt2
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt20
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt2
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt2
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt12
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt2
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt8
-rw-r--r--simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt2
-rw-r--r--simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt4
-rw-r--r--simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt4
-rw-r--r--simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt6
-rw-r--r--simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt8
-rw-r--r--simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt4
-rw-r--r--simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt4
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt2
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt25
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt12
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt14
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt10
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentScheduler.kt3
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt7
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt2
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt10
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt2
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt2
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt2
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt4
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt16
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt12
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt2
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt4
-rw-r--r--simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt2
-rw-r--r--simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt4
-rw-r--r--simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt4
-rw-r--r--simulator/opendc/opendc-format/src/test/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReaderTest.kt2
-rw-r--r--simulator/opendc/opendc-runner-web/build.gradle.kts55
-rw-r--r--simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt338
-rw-r--r--simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt187
-rw-r--r--simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt93
-rw-r--r--simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt127
-rw-r--r--simulator/opendc/opendc-runner-web/src/main/resources/log4j2.xml52
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt4
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt2
-rw-r--r--simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt4
-rw-r--r--simulator/settings.gradle.kts1
60 files changed, 1075 insertions, 183 deletions
diff --git a/simulator/.dockerignore b/simulator/.dockerignore
new file mode 100644
index 00000000..816d338c
--- /dev/null
+++ b/simulator/.dockerignore
@@ -0,0 +1,10 @@
+.git
+
+.idea/
+**/out
+*.iml
+.idea_modules/
+
+.gradle
+**/build/
+
diff --git a/simulator/.editorconfig b/simulator/.editorconfig
index a17544c9..a5584e95 100644
--- a/simulator/.editorconfig
+++ b/simulator/.editorconfig
@@ -4,4 +4,4 @@
# ktlint
[*.{kt, kts}]
-disabled_rules = import-ordering
+disabled_rules = no-wildcard-imports
diff --git a/simulator/.gitattributes b/simulator/.gitattributes
deleted file mode 100644
index 12924725..00000000
--- a/simulator/.gitattributes
+++ /dev/null
@@ -1,7 +0,0 @@
-# https://help.github.com/articles/dealing-with-line-endings/
-#
-# These are explicitly windows files and should use crlf
-*.bat text eol=crlf
-
-# See https://github.com/gradle/gradle/issues/12248
-buildSrc/src/main/**/*.gradle.kts text eol=lf
diff --git a/simulator/.gitignore b/simulator/.gitignore
index 4ec6f778..917f2e6a 100644
--- a/simulator/.gitignore
+++ b/simulator/.gitignore
@@ -38,7 +38,7 @@ data/
# IntelliJ
/out/
.idea/
-*/out
+**/out
*.iml
# mpeltonen/sbt-idea plugin
diff --git a/simulator/.gitlab-ci.yml b/simulator/.gitlab-ci.yml
deleted file mode 100644
index a095f7e7..00000000
--- a/simulator/.gitlab-ci.yml
+++ /dev/null
@@ -1,34 +0,0 @@
-image: gradle:6.1-jdk13
-
-variables:
- GRADLE_OPTS: "-Dorg.gradle.daemon=false"
-
-before_script:
- - export GRADLE_USER_HOME=`pwd`/.gradle
-
-stages:
- - build
- - test
-
-build:
- stage: build
- script:
- - gradle --build-cache assemble
- allow_failure: false
- cache:
- key: "$CI_COMMIT_REF_NAME"
- policy: push
- paths:
- - build
- - .gradle
-
-test:
- stage: test
- script:
- - gradle check
- cache:
- key: "$CI_COMMIT_REF_NAME"
- policy: pull
- paths:
- - build
- - .gradle
diff --git a/simulator/.travis.yml b/simulator/.travis.yml
deleted file mode 100644
index dff5f3a5..00000000
--- a/simulator/.travis.yml
+++ /dev/null
@@ -1 +0,0 @@
-language: java
diff --git a/simulator/Dockerfile b/simulator/Dockerfile
new file mode 100644
index 00000000..e42c7f14
--- /dev/null
+++ b/simulator/Dockerfile
@@ -0,0 +1,31 @@
+FROM openjdk:14-slim AS staging
+MAINTAINER OpenDC Maintainers <opendc@atlarge-research.com>
+
+# Build staging artifacts for dependency caching
+COPY ./ /app
+WORKDIR /app
+RUN mkdir /staging \
+ && cp -r buildSrc/ /staging \
+ && cp gradle.properties /staging 2>/dev/null | true \
+ && find -name "*.gradle.kts" | xargs cp --parents -t /staging
+
+FROM openjdk:14-slim AS builder
+
+# Obtain (cache) Gradle wrapper
+COPY gradlew /app/
+COPY gradle /app/gradle
+WORKDIR /app
+RUN ./gradlew --version
+
+# Install (cache) project dependencies only
+COPY --from=staging /staging/ /app/
+RUN ./gradlew clean build --no-daemon > /dev/null 2>&1 || true
+
+# Build project
+COPY ./ /app/
+RUN ./gradlew --no-daemon :opendc:opendc-runner-web:installDist
+
+FROM openjdk:14-slim
+COPY --from=builder /app/opendc/opendc-runner-web/build/install /app
+WORKDIR /app
+CMD opendc-runner-web/bin/opendc-runner-web
diff --git a/simulator/build.gradle.kts b/simulator/build.gradle.kts
index 90f43749..4775369b 100644
--- a/simulator/build.gradle.kts
+++ b/simulator/build.gradle.kts
@@ -30,7 +30,3 @@ allprojects {
group = "com.atlarge.opendc"
version = "2.0.0"
}
-
-tasks.wrapper {
- gradleVersion = "6.0"
-}
diff --git a/simulator/gradle/wrapper/gradle-wrapper.jar b/simulator/gradle/wrapper/gradle-wrapper.jar
index 87b738cb..62d4c053 100644
--- a/simulator/gradle/wrapper/gradle-wrapper.jar
+++ b/simulator/gradle/wrapper/gradle-wrapper.jar
Binary files differ
diff --git a/simulator/gradle/wrapper/gradle-wrapper.properties b/simulator/gradle/wrapper/gradle-wrapper.properties
index a4b44297..bb8b2fc2 100644
--- a/simulator/gradle/wrapper/gradle-wrapper.properties
+++ b/simulator/gradle/wrapper/gradle-wrapper.properties
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-6.3-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-6.5.1-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
diff --git a/simulator/gradlew b/simulator/gradlew
index af6708ff..fbd7c515 100755
--- a/simulator/gradlew
+++ b/simulator/gradlew
@@ -1,5 +1,21 @@
#!/usr/bin/env sh
+#
+# Copyright 2015 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
##############################################################################
##
## Gradle start up script for UN*X
@@ -28,7 +44,7 @@ APP_NAME="Gradle"
APP_BASE_NAME=`basename "$0"`
# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
-DEFAULT_JVM_OPTS='"-Xmx64m"'
+DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
# Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD="maximum"
@@ -66,6 +82,7 @@ esac
CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
+
# Determine the Java command to use to start the JVM.
if [ -n "$JAVA_HOME" ] ; then
if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
@@ -109,10 +126,11 @@ if $darwin; then
GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\""
fi
-# For Cygwin, switch paths to Windows format before running java
-if $cygwin ; then
+# For Cygwin or MSYS, switch paths to Windows format before running java
+if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then
APP_HOME=`cygpath --path --mixed "$APP_HOME"`
CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`
+
JAVACMD=`cygpath --unix "$JAVACMD"`
# We build the pattern for arguments to be converted via cygpath
@@ -138,19 +156,19 @@ if $cygwin ; then
else
eval `echo args$i`="\"$arg\""
fi
- i=$((i+1))
+ i=`expr $i + 1`
done
case $i in
- (0) set -- ;;
- (1) set -- "$args0" ;;
- (2) set -- "$args0" "$args1" ;;
- (3) set -- "$args0" "$args1" "$args2" ;;
- (4) set -- "$args0" "$args1" "$args2" "$args3" ;;
- (5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;;
- (6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;;
- (7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;;
- (8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;;
- (9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;;
+ 0) set -- ;;
+ 1) set -- "$args0" ;;
+ 2) set -- "$args0" "$args1" ;;
+ 3) set -- "$args0" "$args1" "$args2" ;;
+ 4) set -- "$args0" "$args1" "$args2" "$args3" ;;
+ 5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;;
+ 6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;;
+ 7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;;
+ 8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;;
+ 9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;;
esac
fi
@@ -159,14 +177,9 @@ save () {
for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done
echo " "
}
-APP_ARGS=$(save "$@")
+APP_ARGS=`save "$@"`
# Collect all arguments for the java command, following the shell quoting and substitution rules
eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS"
-# by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong
-if [ "$(uname)" = "Darwin" ] && [ "$HOME" = "$PWD" ]; then
- cd "$(dirname "$0")"
-fi
-
exec "$JAVACMD" "$@"
diff --git a/simulator/gradlew.bat b/simulator/gradlew.bat
index 6d57edc7..5093609d 100644
--- a/simulator/gradlew.bat
+++ b/simulator/gradlew.bat
@@ -1,3 +1,19 @@
+@rem
+@rem Copyright 2015 the original author or authors.
+@rem
+@rem Licensed under the Apache License, Version 2.0 (the "License");
+@rem you may not use this file except in compliance with the License.
+@rem You may obtain a copy of the License at
+@rem
+@rem https://www.apache.org/licenses/LICENSE-2.0
+@rem
+@rem Unless required by applicable law or agreed to in writing, software
+@rem distributed under the License is distributed on an "AS IS" BASIS,
+@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+@rem See the License for the specific language governing permissions and
+@rem limitations under the License.
+@rem
+
@if "%DEBUG%" == "" @echo off
@rem ##########################################################################
@rem
@@ -13,8 +29,11 @@ if "%DIRNAME%" == "" set DIRNAME=.
set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME%
+@rem Resolve any "." and ".." in APP_HOME to make it shorter.
+for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi
+
@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
-set DEFAULT_JVM_OPTS="-Xmx64m"
+set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m"
@rem Find java.exe
if defined JAVA_HOME goto findJavaFromJavaHome
@@ -65,6 +84,7 @@ set CMD_LINE_ARGS=%*
set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
+
@rem Execute Gradle
"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS%
diff --git a/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt b/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt
index 5d9af9ec..0e18f82f 100644
--- a/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt
+++ b/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt
@@ -24,6 +24,7 @@
package com.atlarge.odcsim.flow
+import java.util.WeakHashMap
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.InternalCoroutinesApi
@@ -32,7 +33,6 @@ import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.consumeAsFlow
-import java.util.WeakHashMap
/**
* A [Flow] that can be used to emit events.
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt
index 01968cd8..fd0fc836 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt
@@ -28,8 +28,8 @@ import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.core.resource.Resource
import com.atlarge.opendc.core.resource.TagContainer
import com.atlarge.opendc.core.services.ServiceRegistry
-import kotlinx.coroutines.flow.Flow
import java.util.UUID
+import kotlinx.coroutines.flow.Flow
/**
* A server instance that is running on some physical or virtual machine.
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt
index 7cb4c0c5..cb637aea 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt
@@ -27,8 +27,8 @@ package com.atlarge.opendc.compute.metal
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.core.Identity
-import kotlinx.coroutines.flow.Flow
import java.util.UUID
+import kotlinx.coroutines.flow.Flow
/**
* A bare-metal compute node.
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt
index 41cec291..17d8ee53 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt
@@ -30,8 +30,8 @@ import com.atlarge.opendc.compute.metal.Node
import com.atlarge.opendc.core.failure.FailureDomain
import com.atlarge.opendc.core.power.Powerable
import com.atlarge.opendc.core.services.AbstractServiceKey
-import kotlinx.coroutines.flow.Flow
import java.util.UUID
+import kotlinx.coroutines.flow.Flow
/**
* A driver interface for the management interface of a bare-metal compute node.
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
index 6a77415c..a453e459 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
@@ -28,10 +28,10 @@ import com.atlarge.odcsim.Domain
import com.atlarge.odcsim.SimulationContext
import com.atlarge.odcsim.flow.EventFlow
import com.atlarge.odcsim.flow.StateFlow
-import com.atlarge.opendc.compute.core.ProcessingUnit
-import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.MemoryUnit
+import com.atlarge.opendc.compute.core.ProcessingUnit
+import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.execution.ServerContext
@@ -46,6 +46,14 @@ import com.atlarge.opendc.compute.metal.power.ConstantPowerModel
import com.atlarge.opendc.core.power.PowerModel
import com.atlarge.opendc.core.services.ServiceKey
import com.atlarge.opendc.core.services.ServiceRegistry
+import java.lang.Exception
+import java.time.Clock
+import java.util.UUID
+import kotlin.coroutines.ContinuationInterceptor
+import kotlin.math.ceil
+import kotlin.math.max
+import kotlin.math.min
+import kotlin.random.Random
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Delay
import kotlinx.coroutines.DisposableHandle
@@ -59,15 +67,7 @@ import kotlinx.coroutines.intrinsics.startCoroutineCancellable
import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.SelectClause0
import kotlinx.coroutines.selects.SelectInstance
-import java.util.UUID
-import kotlin.math.ceil
-import kotlin.math.max
-import kotlin.math.min
import kotlinx.coroutines.withContext
-import java.lang.Exception
-import java.time.Clock
-import kotlin.coroutines.ContinuationInterceptor
-import kotlin.random.Random
/**
* A basic implementation of the [BareMetalDriver] that simulates an [Image] running on a bare-metal machine.
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt
index 69b0124d..1e7e351f 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt
@@ -25,8 +25,8 @@
package com.atlarge.opendc.compute.virt
import com.atlarge.opendc.core.Identity
-import kotlinx.coroutines.flow.Flow
import java.util.UUID
+import kotlinx.coroutines.flow.Flow
/**
* A hypervisor (or virtual machine monitor) is software or firmware that virtualizes the host compute environment
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt
index bd395f0d..607759a8 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt
@@ -29,9 +29,9 @@ import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver
import com.atlarge.opendc.compute.virt.driver.VirtDriver
import com.atlarge.opendc.core.resource.TagContainer
+import java.util.UUID
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.suspendCancellableCoroutine
-import java.util.UUID
/**
* A hypervisor managing the VMs of a node.
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
index 3c41f52e..192db413 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
@@ -35,11 +35,15 @@ import com.atlarge.opendc.compute.core.execution.ServerContext
import com.atlarge.opendc.compute.core.execution.ServerManagementContext
import com.atlarge.opendc.compute.core.execution.ShutdownException
import com.atlarge.opendc.compute.core.image.Image
+import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
+import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
import com.atlarge.opendc.compute.virt.HypervisorEvent
import com.atlarge.opendc.core.services.ServiceKey
import com.atlarge.opendc.core.services.ServiceRegistry
-import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
-import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
+import java.util.UUID
+import kotlin.math.ceil
+import kotlin.math.max
+import kotlin.math.min
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DisposableHandle
@@ -55,10 +59,6 @@ import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.SelectClause0
import kotlinx.coroutines.selects.SelectInstance
import kotlinx.coroutines.selects.select
-import java.util.UUID
-import kotlin.math.ceil
-import kotlin.math.max
-import kotlin.math.min
/**
* A [VirtDriver] that is backed by a simple hypervisor implementation.
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt
index 1002d382..b1844f67 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt
@@ -29,8 +29,8 @@ import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.compute.virt.HypervisorEvent
import com.atlarge.opendc.core.services.AbstractServiceKey
-import kotlinx.coroutines.flow.Flow
import java.util.UUID
+import kotlinx.coroutines.flow.Flow
/**
* A driver interface for a hypervisor running on some host server and communicating with the central compute service to
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
index ff4aa3d7..79388bc3 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
@@ -11,11 +11,14 @@ import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.compute.core.image.VmImage
import com.atlarge.opendc.compute.metal.service.ProvisioningService
import com.atlarge.opendc.compute.virt.HypervisorEvent
-import com.atlarge.opendc.compute.virt.driver.VirtDriver
import com.atlarge.opendc.compute.virt.HypervisorImage
import com.atlarge.opendc.compute.virt.driver.InsufficientMemoryOnServerException
+import com.atlarge.opendc.compute.virt.driver.VirtDriver
import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy
import com.atlarge.opendc.core.services.ServiceKey
+import kotlin.coroutines.Continuation
+import kotlin.coroutines.resume
+import kotlin.math.max
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
@@ -27,9 +30,6 @@ import kotlinx.coroutines.launch
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.withContext
import mu.KotlinLogging
-import kotlin.coroutines.Continuation
-import kotlin.coroutines.resume
-import kotlin.math.max
private val logger = KotlinLogging.logger {}
diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt
index 417db77d..1c7b751c 100644
--- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt
+++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImageTest.kt
@@ -24,10 +24,10 @@
package com.atlarge.opendc.compute.core.image
+import java.util.UUID
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
-import java.util.UUID
/**
* Test suite for [FlopsApplicationImage]
diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
index 071c0626..af9d3421 100644
--- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
+++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
@@ -31,6 +31,8 @@ import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.image.FlopsApplicationImage
+import java.util.ServiceLoader
+import java.util.UUID
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
@@ -39,8 +41,6 @@ import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
-import java.util.ServiceLoader
-import java.util.UUID
internal class SimpleBareMetalDriverTest {
/**
diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
index f8bd786e..ed2256c0 100644
--- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
+++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
@@ -29,13 +29,13 @@ import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.image.FlopsApplicationImage
import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver
+import java.util.ServiceLoader
+import java.util.UUID
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Test
-import java.util.ServiceLoader
-import java.util.UUID
/**
* Test suite for the [SimpleProvisioningService].
diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt
index ca00fc94..622b185e 100644
--- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt
+++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt
@@ -25,14 +25,16 @@
package com.atlarge.opendc.compute.virt
import com.atlarge.odcsim.SimulationEngineProvider
-import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.ProcessingNode
+import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.image.FlopsApplicationImage
import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment
import com.atlarge.opendc.compute.core.image.VmImage
import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver
import com.atlarge.opendc.compute.virt.driver.VirtDriver
+import java.util.ServiceLoader
+import java.util.UUID
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.launchIn
@@ -43,8 +45,6 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
-import java.util.ServiceLoader
-import java.util.UUID
/**
* Basic test-suite for the hypervisor.
diff --git a/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt b/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt
index 50261db5..f77a581e 100644
--- a/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt
+++ b/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt
@@ -26,14 +26,14 @@ package com.atlarge.opendc.core.failure
import com.atlarge.odcsim.Domain
import com.atlarge.odcsim.simulationContext
-import kotlinx.coroutines.Job
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.ensureActive
-import kotlinx.coroutines.launch
import kotlin.math.exp
import kotlin.math.max
import kotlin.random.Random
import kotlin.random.asJavaRandom
+import kotlinx.coroutines.Job
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.ensureActive
+import kotlinx.coroutines.launch
/**
* A [FaultInjector] that injects fault in the system which are correlated to each other. Failures do not occur in
diff --git a/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt b/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt
index 1b896858..0f62667f 100644
--- a/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt
+++ b/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt
@@ -25,11 +25,11 @@
package com.atlarge.opendc.core.failure
import com.atlarge.odcsim.simulationContext
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.launch
import kotlin.math.ln1p
import kotlin.math.pow
import kotlin.random.Random
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
/**
* A [FaultInjector] that injects uncorrelated faults into the system, meaning that failures of the subsystems are
diff --git a/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt b/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
index b0182ab3..7659b18e 100644
--- a/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
+++ b/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
@@ -38,6 +38,8 @@ import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelec
import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy
import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy
import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy
+import java.io.File
+import java.util.ServiceLoader
import kotlin.math.max
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
@@ -46,8 +48,6 @@ import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
-import java.io.File
-import java.util.ServiceLoader
/**
* Main entry point of the experiment.
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt
index 677af381..faa68e34 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt
@@ -48,9 +48,9 @@ import com.github.ajalt.clikt.parameters.options.required
import com.github.ajalt.clikt.parameters.types.choice
import com.github.ajalt.clikt.parameters.types.file
import com.github.ajalt.clikt.parameters.types.int
-import mu.KotlinLogging
import java.io.File
import java.io.InputStream
+import mu.KotlinLogging
/**
* The logger for this experiment.
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
index a70297d2..b09c0dbb 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
@@ -45,19 +45,20 @@ import com.atlarge.opendc.experiments.sc20.experiment.monitor.ExperimentMonitor
import com.atlarge.opendc.experiments.sc20.trace.Sc20StreamingParquetTraceReader
import com.atlarge.opendc.format.environment.EnvironmentReader
import com.atlarge.opendc.format.trace.TraceReader
+import java.io.File
+import kotlin.math.ln
+import kotlin.math.max
+import kotlin.random.Random
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
+import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import mu.KotlinLogging
-import java.io.File
-import kotlin.math.ln
-import kotlin.math.max
-import kotlin.random.Random
/**
* The logger for this experiment.
@@ -209,7 +210,6 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP
try {
var submitted = 0
- val finished = Channel<Unit>(Channel.CONFLATED)
while (reader.hasNext()) {
val (time, workload) = reader.next()
@@ -228,17 +228,20 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP
if (it is ServerEvent.StateChanged) {
monitor.reportVmStateChange(simulationContext.clock.millis(), it.server)
}
-
- delay(1)
- finished.send(Unit)
}
.collect()
}
}
- while (scheduler.finishedVms + scheduler.unscheduledVms != submitted) {
- finished.receive()
- }
+ scheduler.events
+ .takeWhile {
+ when (it) {
+ is VirtProvisioningEvent.MetricsAvailable ->
+ it.inactiveVmCount + it.failedVmCount != submitted
+ }
+ }
+ .collect()
+ delay(1)
} finally {
reader.close()
}
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt
index 5d1c29e2..1580e4dd 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt
@@ -38,13 +38,13 @@ import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionC
import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader
import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader
import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
+import java.io.File
+import java.util.ServiceLoader
+import kotlin.random.Random
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import mu.KotlinLogging
-import java.io.File
-import java.util.ServiceLoader
-import kotlin.random.Random
/**
* The logger for the experiment scenario.
@@ -106,7 +106,11 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int)
?.construct(seeder) ?: emptyMap()
val trace = Sc20ParquetTraceReader(rawReaders, performanceInterferenceModel, parent.workload, seed)
- val monitor = ParquetExperimentMonitor(this)
+ val monitor = ParquetExperimentMonitor(
+ parent.parent.parent.output,
+ "portfolio_id=${parent.parent.id}/scenario_id=${parent.id}/run_id=$id",
+ parent.parent.parent.bufferSize
+ )
root.launch {
val (bareMetalProvisioner, scheduler) = createProvisioner(
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt
index be60e5b7..b931fef9 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt
@@ -27,13 +27,12 @@ package com.atlarge.opendc.experiments.sc20.experiment.monitor
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.virt.driver.VirtDriver
import com.atlarge.opendc.compute.virt.service.VirtProvisioningEvent
-import com.atlarge.opendc.experiments.sc20.experiment.Run
import com.atlarge.opendc.experiments.sc20.telemetry.HostEvent
import com.atlarge.opendc.experiments.sc20.telemetry.ProvisionerEvent
import com.atlarge.opendc.experiments.sc20.telemetry.parquet.ParquetHostEventWriter
import com.atlarge.opendc.experiments.sc20.telemetry.parquet.ParquetProvisionerEventWriter
-import mu.KotlinLogging
import java.io.File
+import mu.KotlinLogging
/**
* The logger instance to use.
@@ -43,15 +42,14 @@ private val logger = KotlinLogging.logger {}
/**
* An [ExperimentMonitor] that logs the events to a Parquet file.
*/
-class ParquetExperimentMonitor(val run: Run) : ExperimentMonitor {
- private val partition = "portfolio_id=${run.parent.parent.id}/scenario_id=${run.parent.id}/run_id=${run.id}"
+class ParquetExperimentMonitor(base: File, partition: String, bufferSize: Int) : ExperimentMonitor {
private val hostWriter = ParquetHostEventWriter(
- File(run.parent.parent.parent.output, "host-metrics/$partition/data.parquet"),
- run.parent.parent.parent.bufferSize
+ File(base, "host-metrics/$partition/data.parquet"),
+ bufferSize
)
private val provisionerWriter = ParquetProvisionerEventWriter(
- File(run.parent.parent.parent.output, "provisioner-metrics/$partition/data.parquet"),
- run.parent.parent.parent.bufferSize
+ File(base, "provisioner-metrics/$partition/data.parquet"),
+ bufferSize
)
private val currentHostEvent = mutableMapOf<Server, HostEvent>()
private var startTime = -1L
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt
index f59402d5..b446abc8 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/reporter/ConsoleExperimentReporter.kt
@@ -30,6 +30,7 @@ import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionL
import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionResult
import me.tongfei.progressbar.ProgressBar
import me.tongfei.progressbar.ProgressBarBuilder
+import mu.KotlinLogging
/**
* A reporter that reports the experiment progress to the console.
@@ -46,6 +47,11 @@ public class ConsoleExperimentReporter : ExperimentExecutionListener {
private var total = 0
/**
+ * The logger for this reporter.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
* The progress bar to keep track of the progress.
*/
private val pb: ProgressBar = ProgressBarBuilder()
@@ -69,6 +75,10 @@ public class ConsoleExperimentReporter : ExperimentExecutionListener {
pb.close()
}
}
+
+ if (result is ExperimentExecutionResult.Failed) {
+ logger.warn(result.throwable) { "Descriptor $descriptor failed" }
+ }
}
override fun executionStarted(descriptor: ExperimentDescriptor) {}
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentScheduler.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentScheduler.kt
index 0346a7f8..96678abf 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentScheduler.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ExperimentScheduler.kt
@@ -49,11 +49,10 @@ interface ExperimentScheduler : Closeable {
*
* @param descriptor The descriptor to execute.
* @param context The context to execute the descriptor in.
- * @return The results of the experiment trial.
*/
suspend operator fun invoke(
descriptor: ExperimentDescriptor,
context: ExperimentExecutionContext
- ): ExperimentExecutionResult
+ )
}
}
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt
index 31632b8c..a8ee59a8 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/execution/ThreadPoolExperimentScheduler.kt
@@ -25,12 +25,12 @@
package com.atlarge.opendc.experiments.sc20.runner.execution
import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor
+import java.util.concurrent.Executors
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.launch
import kotlinx.coroutines.supervisorScope
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.withContext
-import java.util.concurrent.Executors
/**
* An [ExperimentScheduler] that runs experiments using a local thread pool.
@@ -47,7 +47,7 @@ class ThreadPoolExperimentScheduler(parallelism: Int = Runtime.getRuntime().avai
override suspend fun invoke(
descriptor: ExperimentDescriptor,
context: ExperimentExecutionContext
- ): ExperimentExecutionResult = supervisorScope {
+ ) = supervisorScope {
val listener =
object : ExperimentExecutionListener {
override fun descriptorRegistered(descriptor: ExperimentDescriptor) {
@@ -70,10 +70,7 @@ class ThreadPoolExperimentScheduler(parallelism: Int = Runtime.getRuntime().avai
try {
withContext(dispatcher) {
descriptor(newContext)
- ExperimentExecutionResult.Success
}
- } catch (e: Throwable) {
- ExperimentExecutionResult.Failed(e)
} finally {
tickets.release()
}
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt
index 3b80276f..28a19172 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/runner/internal/DefaultExperimentRunner.kt
@@ -30,8 +30,8 @@ import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionC
import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionListener
import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionResult
import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentScheduler
-import kotlinx.coroutines.runBlocking
import java.util.concurrent.ConcurrentHashMap
+import kotlinx.coroutines.runBlocking
/**
* The default implementation of the [ExperimentRunner] interface.
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt
index a69bd4b2..e42ac654 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetEventWriter.kt
@@ -25,17 +25,17 @@
package com.atlarge.opendc.experiments.sc20.telemetry.parquet
import com.atlarge.opendc.experiments.sc20.telemetry.Event
+import java.io.Closeable
+import java.io.File
+import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.BlockingQueue
+import kotlin.concurrent.thread
import mu.KotlinLogging
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import java.io.Closeable
-import java.io.File
-import java.util.concurrent.ArrayBlockingQueue
-import java.util.concurrent.BlockingQueue
-import kotlin.concurrent.thread
/**
* The logging instance to use.
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt
index 3bc09435..9fa4e0fb 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt
@@ -25,10 +25,10 @@
package com.atlarge.opendc.experiments.sc20.telemetry.parquet
import com.atlarge.opendc.experiments.sc20.telemetry.HostEvent
+import java.io.File
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
-import java.io.File
/**
* A Parquet event writer for [HostEvent]s.
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt
index 1f3b0472..3d28860c 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetProvisionerEventWriter.kt
@@ -25,10 +25,10 @@
package com.atlarge.opendc.experiments.sc20.telemetry.parquet
import com.atlarge.opendc.experiments.sc20.telemetry.ProvisionerEvent
+import java.io.File
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
-import java.io.File
/**
* A Parquet event writer for [ProvisionerEvent]s.
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt
index 1549b8d2..c1724369 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetRunEventWriter.kt
@@ -25,10 +25,10 @@
package com.atlarge.opendc.experiments.sc20.telemetry.parquet
import com.atlarge.opendc.experiments.sc20.telemetry.RunEvent
+import java.io.File
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
-import java.io.File
/**
* A Parquet event writer for [RunEvent]s.
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt
index 652f7746..f9709b9f 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt
@@ -30,12 +30,12 @@ import com.atlarge.opendc.compute.core.workload.VmWorkload
import com.atlarge.opendc.core.User
import com.atlarge.opendc.format.trace.TraceEntry
import com.atlarge.opendc.format.trace.TraceReader
+import java.io.File
+import java.util.UUID
import mu.KotlinLogging
import org.apache.avro.generic.GenericData
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetReader
-import java.io.File
-import java.util.UUID
private val logger = KotlinLogging.logger {}
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt
index f6d6e6fd..8b7b222f 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20StreamingParquetTraceReader.kt
@@ -32,6 +32,14 @@ import com.atlarge.opendc.compute.core.workload.VmWorkload
import com.atlarge.opendc.core.User
import com.atlarge.opendc.format.trace.TraceEntry
import com.atlarge.opendc.format.trace.TraceReader
+import java.io.File
+import java.io.Serializable
+import java.util.SortedSet
+import java.util.TreeSet
+import java.util.UUID
+import java.util.concurrent.ArrayBlockingQueue
+import kotlin.concurrent.thread
+import kotlin.random.Random
import mu.KotlinLogging
import org.apache.avro.generic.GenericData
import org.apache.hadoop.fs.Path
@@ -41,14 +49,6 @@ import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.filter2.predicate.Statistics
import org.apache.parquet.filter2.predicate.UserDefinedPredicate
import org.apache.parquet.io.api.Binary
-import java.io.File
-import java.io.Serializable
-import java.util.SortedSet
-import java.util.TreeSet
-import java.util.UUID
-import java.util.concurrent.ArrayBlockingQueue
-import kotlin.concurrent.thread
-import kotlin.random.Random
private val logger = KotlinLogging.logger {}
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
index 0877ad52..d6726910 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
@@ -25,6 +25,12 @@
package com.atlarge.opendc.experiments.sc20.trace
import com.atlarge.opendc.format.trace.sc20.Sc20VmPlacementReader
+import java.io.BufferedReader
+import java.io.File
+import java.io.FileReader
+import java.util.Random
+import kotlin.math.max
+import kotlin.math.min
import me.tongfei.progressbar.ProgressBar
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
@@ -33,12 +39,6 @@ import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import java.io.BufferedReader
-import java.io.File
-import java.io.FileReader
-import java.util.Random
-import kotlin.math.max
-import kotlin.math.min
/**
* A script to convert a trace in text format into a Parquet trace.
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt
index dd70d4f1..f2a0e627 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt
@@ -28,8 +28,8 @@ import com.atlarge.opendc.compute.core.workload.VmWorkload
import com.atlarge.opendc.experiments.sc20.experiment.model.CompositeWorkload
import com.atlarge.opendc.experiments.sc20.experiment.model.Workload
import com.atlarge.opendc.format.trace.TraceEntry
-import mu.KotlinLogging
import kotlin.random.Random
+import mu.KotlinLogging
private val logger = KotlinLogging.logger {}
diff --git a/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
index 5ecf7605..a79e9a5a 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
@@ -42,6 +42,8 @@ import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader
import com.atlarge.opendc.format.environment.EnvironmentReader
import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
import com.atlarge.opendc.format.trace.TraceReader
+import java.io.File
+import java.util.ServiceLoader
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
@@ -52,8 +54,6 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
-import java.io.File
-import java.util.ServiceLoader
/**
* An integration test suite for the SC20 experiments.
diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
index 5f220ad0..a9aa3337 100644
--- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
+++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
@@ -25,9 +25,9 @@
package com.atlarge.opendc.format.environment.sc18
import com.atlarge.odcsim.Domain
-import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.MemoryUnit
import com.atlarge.opendc.compute.core.ProcessingNode
+import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver
import com.atlarge.opendc.compute.metal.service.ProvisioningService
import com.atlarge.opendc.compute.metal.service.SimpleProvisioningService
diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
index 2a8fefeb..1cabc8bc 100644
--- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
+++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
@@ -26,10 +26,10 @@ package com.atlarge.opendc.format.trace.bitbrains
import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment
import com.atlarge.opendc.compute.core.image.VmImage
-import com.atlarge.opendc.compute.core.workload.VmWorkload
-import com.atlarge.opendc.core.User
import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
+import com.atlarge.opendc.compute.core.workload.VmWorkload
+import com.atlarge.opendc.core.User
import com.atlarge.opendc.format.trace.TraceEntry
import com.atlarge.opendc.format.trace.TraceReader
import java.io.BufferedReader
diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt
index 076274d5..8e34505a 100644
--- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt
+++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt
@@ -26,10 +26,10 @@ package com.atlarge.opendc.format.trace.sc20
import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment
import com.atlarge.opendc.compute.core.image.VmImage
-import com.atlarge.opendc.compute.core.workload.VmWorkload
-import com.atlarge.opendc.core.User
import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
+import com.atlarge.opendc.compute.core.workload.VmWorkload
+import com.atlarge.opendc.core.User
import com.atlarge.opendc.format.trace.TraceEntry
import com.atlarge.opendc.format.trace.TraceReader
import java.io.BufferedReader
diff --git a/simulator/opendc/opendc-format/src/test/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReaderTest.kt b/simulator/opendc/opendc-format/src/test/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReaderTest.kt
index 41ad8aba..94e4b0fc 100644
--- a/simulator/opendc/opendc-format/src/test/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReaderTest.kt
+++ b/simulator/opendc/opendc-format/src/test/kotlin/com/atlarge/opendc/format/trace/swf/SwfTraceReaderTest.kt
@@ -1,8 +1,8 @@
package com.atlarge.opendc.format.trace.swf
+import java.io.File
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
-import java.io.File
class SwfTraceReaderTest {
@Test
diff --git a/simulator/opendc/opendc-runner-web/build.gradle.kts b/simulator/opendc/opendc-runner-web/build.gradle.kts
new file mode 100644
index 00000000..6f725de1
--- /dev/null
+++ b/simulator/opendc/opendc-runner-web/build.gradle.kts
@@ -0,0 +1,55 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+description = "Experiment runner for OpenDC"
+
+/* Build configuration */
+plugins {
+ `kotlin-library-convention`
+ application
+}
+
+application {
+ mainClassName = "com.atlarge.opendc.runner.web.MainKt"
+}
+
+dependencies {
+ api(project(":opendc:opendc-core"))
+ implementation(project(":opendc:opendc-compute"))
+ implementation(project(":opendc:opendc-format"))
+ implementation(project(":opendc:opendc-experiments-sc20"))
+
+ implementation("com.github.ajalt:clikt:2.8.0")
+ implementation("io.github.microutils:kotlin-logging:1.7.10")
+
+ implementation("org.mongodb:mongodb-driver-sync:4.0.5")
+ implementation("org.apache.spark:spark-sql_2.12:3.0.0") {
+ exclude(group = "org.slf4j", module = "slf4j-log4j12")
+ exclude(group = "log4j")
+ }
+
+ runtimeOnly(project(":odcsim:odcsim-engine-omega"))
+ runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.13.1")
+ runtimeOnly("org.apache.logging.log4j:log4j-1.2-api:2.13.1")
+}
diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt
new file mode 100644
index 00000000..0ff9b870
--- /dev/null
+++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt
@@ -0,0 +1,338 @@
+package com.atlarge.opendc.runner.web
+
+import com.atlarge.odcsim.SimulationEngineProvider
+import com.atlarge.opendc.compute.virt.service.allocation.*
+import com.atlarge.opendc.experiments.sc20.experiment.attachMonitor
+import com.atlarge.opendc.experiments.sc20.experiment.createFailureDomain
+import com.atlarge.opendc.experiments.sc20.experiment.createProvisioner
+import com.atlarge.opendc.experiments.sc20.experiment.model.Workload
+import com.atlarge.opendc.experiments.sc20.experiment.monitor.ParquetExperimentMonitor
+import com.atlarge.opendc.experiments.sc20.experiment.processTrace
+import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader
+import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader
+import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader
+import com.github.ajalt.clikt.core.CliktCommand
+import com.github.ajalt.clikt.parameters.options.*
+import com.github.ajalt.clikt.parameters.types.file
+import com.github.ajalt.clikt.parameters.types.int
+import com.mongodb.MongoClientSettings
+import com.mongodb.MongoCredential
+import com.mongodb.ServerAddress
+import com.mongodb.client.MongoClients
+import com.mongodb.client.MongoCollection
+import com.mongodb.client.MongoDatabase
+import com.mongodb.client.model.Filters
+import java.io.File
+import java.util.*
+import kotlin.random.Random
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.Channel
+import mu.KotlinLogging
+import org.bson.Document
+
+private val logger = KotlinLogging.logger {}
+
+/**
+ * The provider for the simulation engine to use.
+ */
+private val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
+
+/**
+ * Represents the CLI command for starting the OpenDC web runner.
+ */
+class RunnerCli : CliktCommand(name = "runner") {
+ /**
+ * The name of the database to use.
+ */
+ private val mongoDb by option(
+ "--mongo-db",
+ help = "name of the database to use",
+ envvar = "OPENDC_DB"
+ )
+ .default("opendc")
+
+ /**
+ * The database host to connect to.
+ */
+ private val mongoHost by option(
+ "--mongo-host",
+ help = "database host to connect to",
+ envvar = "OPENDC_DB_HOST"
+ )
+ .default("localhost")
+
+ /**
+ * The database port to connect to.
+ */
+ private val mongoPort by option(
+ "--mongo-port",
+ help = "database port to connect to",
+ envvar = "OPENDC_DB_PORT"
+ )
+ .int()
+ .default(27017)
+
+ /**
+ * The database user to connect with.
+ */
+ private val mongoUser by option(
+ "--mongo-user",
+ help = "database user to connect with",
+ envvar = "OPENDC_DB_USER"
+ )
+ .default("opendc")
+
+ /**
+ * The database password to connect with.
+ */
+ private val mongoPassword by option(
+ "--mongo-password",
+ help = "database password to connect with",
+ envvar = "OPENDC_DB_PASSWORD"
+ )
+ .convert { it.toCharArray() }
+ .required()
+
+ /**
+ * The path to the traces directory.
+ */
+ private val tracePath by option(
+ "--traces",
+ help = "path to the directory containing the traces",
+ envvar = "OPENDC_TRACES"
+ )
+ .file(canBeFile = false)
+ .defaultLazy { File("traces/") }
+
+ /**
+ * The path to the output directory.
+ */
+ private val outputPath by option(
+ "--output",
+ help = "path to the results directory",
+ envvar = "OPENDC_OUTPUT"
+ )
+ .file(canBeFile = false)
+ .defaultLazy { File("results/") }
+
+ /**
+ * The Spark master to connect to.
+ */
+ private val spark by option(
+ "--spark",
+ help = "Spark master to connect to",
+ envvar = "OPENDC_SPARK"
+ )
+ .default("local[*]")
+
+ /**
+ * Connect to the user-specified database.
+ */
+ private fun createDatabase(): MongoDatabase {
+ val credential = MongoCredential.createScramSha1Credential(
+ mongoUser,
+ mongoDb,
+ mongoPassword
+ )
+
+ val settings = MongoClientSettings.builder()
+ .credential(credential)
+ .applyToClusterSettings { it.hosts(listOf(ServerAddress(mongoHost, mongoPort))) }
+ .build()
+ val client = MongoClients.create(settings)
+ return client.getDatabase(mongoDb)
+ }
+
+ /**
+ * Run a single scenario.
+ */
+ private suspend fun runScenario(portfolio: Document, scenario: Document, topologies: MongoCollection<Document>) {
+ val id = scenario.getString("_id")
+
+ logger.info { "Constructing performance interference model" }
+
+ val traceDir = File(
+ tracePath,
+ scenario.getEmbedded(listOf("trace", "traceId"), String::class.java)
+ )
+ val traceReader = Sc20RawParquetTraceReader(traceDir)
+ val performanceInterferenceReader = let {
+ val path = File(traceDir, "performance-interference-model.json")
+ val operational = scenario.get("operational", Document::class.java)
+ val enabled = operational.getBoolean("performanceInterferenceEnabled")
+
+ if (!enabled || !path.exists()) {
+ return@let null
+ }
+
+ path.inputStream().use { Sc20PerformanceInterferenceReader(it) }
+ }
+
+ val targets = portfolio.get("targets", Document::class.java)
+
+ repeat(targets.getInteger("repeatsPerScenario")) {
+ logger.info { "Starting repeat $it" }
+ runRepeat(scenario, it, topologies, traceReader, performanceInterferenceReader)
+ }
+
+ logger.info { "Finished simulation for scenario $id" }
+ }
+
+ /**
+ * Run a single repeat.
+ */
+ private suspend fun runRepeat(
+ scenario: Document,
+ repeat: Int,
+ topologies: MongoCollection<Document>,
+ traceReader: Sc20RawParquetTraceReader,
+ performanceInterferenceReader: Sc20PerformanceInterferenceReader?
+ ) {
+ val id = scenario.getString("_id")
+ val seed = repeat
+ val traceDocument = scenario.get("trace", Document::class.java)
+ val workloadName = traceDocument.getString("traceId")
+ val workloadFraction = traceDocument.get("loadSamplingFraction", Number::class.java).toDouble()
+
+ val seeder = Random(seed)
+ val system = provider("experiment-$id")
+ val root = system.newDomain("root")
+
+ val chan = Channel<Unit>(Channel.CONFLATED)
+
+ val operational = scenario.get("operational", Document::class.java)
+ val allocationPolicy =
+ when (val policyName = operational.getString("schedulerName")) {
+ "mem" -> AvailableMemoryAllocationPolicy()
+ "mem-inv" -> AvailableMemoryAllocationPolicy(true)
+ "core-mem" -> AvailableCoreMemoryAllocationPolicy()
+ "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true)
+ "active-servers" -> NumberOfActiveServersAllocationPolicy()
+ "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true)
+ "provisioned-cores" -> ProvisionedCoresAllocationPolicy()
+ "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true)
+ "random" -> RandomAllocationPolicy(Random(seeder.nextInt()))
+ else -> throw IllegalArgumentException("Unknown policy $policyName")
+ }
+
+ val performanceInterferenceModel = performanceInterferenceReader?.construct(seeder) ?: emptyMap()
+ val trace = Sc20ParquetTraceReader(
+ listOf(traceReader),
+ performanceInterferenceModel,
+ Workload(workloadName, workloadFraction),
+ seed
+ )
+ val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), String::class.java)
+ val environment = TopologyParser(topologies, topologyId)
+ val monitor = ParquetExperimentMonitor(
+ outputPath,
+ "scenario_id=$id/run_id=$repeat",
+ 4096
+ )
+
+ root.launch {
+ val (bareMetalProvisioner, scheduler) = createProvisioner(
+ root,
+ environment,
+ allocationPolicy
+ )
+
+ val failureDomain = if (operational.getBoolean("failuresEnabled")) {
+ logger.debug("ENABLING failures")
+ createFailureDomain(
+ seeder.nextInt(),
+ operational.get("failureFrequency", Number::class.java)?.toDouble() ?: 24.0 * 7,
+ bareMetalProvisioner,
+ chan
+ )
+ } else {
+ null
+ }
+
+ attachMonitor(scheduler, monitor)
+ processTrace(
+ trace,
+ scheduler,
+ chan,
+ monitor,
+ emptyMap()
+ )
+
+ logger.debug("SUBMIT=${scheduler.submittedVms}")
+ logger.debug("FAIL=${scheduler.unscheduledVms}")
+ logger.debug("QUEUED=${scheduler.queuedVms}")
+ logger.debug("RUNNING=${scheduler.runningVms}")
+ logger.debug("FINISHED=${scheduler.finishedVms}")
+
+ failureDomain?.cancel()
+ scheduler.terminate()
+ }
+
+ try {
+ system.run()
+ } finally {
+ system.terminate()
+ monitor.close()
+ }
+ }
+
+ override fun run() = runBlocking(Dispatchers.Default) {
+ logger.info { "Starting OpenDC web runner" }
+ logger.info { "Connecting to MongoDB instance" }
+ val database = createDatabase()
+ val manager = ScenarioManager(database.getCollection("scenarios"))
+ val portfolios = database.getCollection("portfolios")
+ val topologies = database.getCollection("topologies")
+
+ logger.info { "Launching Spark" }
+ val resultProcessor = ResultProcessor(spark, outputPath)
+
+ logger.info { "Watching for queued scenarios" }
+
+ while (true) {
+ val scenario = manager.findNext()
+
+ if (scenario == null) {
+ delay(5000)
+ continue
+ }
+
+ val id = scenario.getString("_id")
+
+ logger.info { "Found queued scenario $id: attempting to claim" }
+
+ if (!manager.claim(id)) {
+ logger.info { "Failed to claim scenario" }
+ continue
+ }
+
+ coroutineScope {
+                // Launch heartbeat process (NOTE(review): fires a single heartbeat after 60 s, not periodically — confirm this is intended or wrap in a loop)
+ launch {
+ delay(60000)
+ manager.heartbeat(id)
+ }
+
+ try {
+ val portfolio = portfolios.find(Filters.eq("_id", scenario.getString("portfolioId"))).first()!!
+ runScenario(portfolio, scenario, topologies)
+
+ logger.info { "Starting result processing" }
+
+ val result = resultProcessor.process(id)
+ manager.finish(id, result)
+
+ logger.info { "Successfully finished scenario $id" }
+ } catch (e: Exception) {
+ logger.warn(e) { "Scenario failed to finish" }
+ manager.fail(id)
+ }
+ }
+ }
+ }
+}
+
+/**
+ * Main entry point of the runner.
+ */
+fun main(args: Array<String>) = RunnerCli().main(args)
diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt
new file mode 100644
index 00000000..39092653
--- /dev/null
+++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt
@@ -0,0 +1,187 @@
+package com.atlarge.opendc.runner.web
+
+import java.io.File
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions.*
+
+/**
+ * A helper class for processing the experiment results using Apache Spark.
+ */
+class ResultProcessor(private val master: String, private val outputPath: File) {
+ /**
+ * Process the results of the scenario with the given [id].
+ */
+ fun process(id: String): Result {
+ val spark = SparkSession.builder()
+ .master(master)
+ .appName("opendc-simulator-$id")
+ .config("spark.driver.bindAddress", "0.0.0.0") // Needed to allow the worker to connect to driver
+ .orCreate
+
+ try {
+ val hostMetrics = spark.read().parquet(File(outputPath, "host-metrics/scenario_id=$id").path)
+ val provisionerMetrics = spark.read().parquet(File(outputPath, "provisioner-metrics/scenario_id=$id").path)
+ val res = aggregate(hostMetrics, provisionerMetrics).first()
+
+ return Result(
+ res.getList<Long>(1),
+ res.getList<Long>(2),
+ res.getList<Long>(3),
+ res.getList<Long>(4),
+ res.getList<Double>(5),
+ res.getList<Double>(6),
+ res.getList<Double>(7),
+ res.getList<Int>(8),
+ res.getList<Long>(9),
+ res.getList<Long>(10),
+ res.getList<Long>(11),
+ res.getList<Int>(12),
+ res.getList<Int>(13),
+ res.getList<Int>(14),
+ res.getList<Int>(15)
+ )
+ } finally {
+ spark.close()
+ }
+ }
+
+ data class Result(
+ val totalRequestedBurst: List<Long>,
+ val totalGrantedBurst: List<Long>,
+ val totalOvercommittedBurst: List<Long>,
+ val totalInterferedBurst: List<Long>,
+ val meanCpuUsage: List<Double>,
+ val meanCpuDemand: List<Double>,
+ val meanNumDeployedImages: List<Double>,
+ val maxNumDeployedImages: List<Int>,
+ val totalPowerDraw: List<Long>,
+ val totalFailureSlices: List<Long>,
+ val totalFailureVmSlices: List<Long>,
+ val totalVmsSubmitted: List<Int>,
+ val totalVmsQueued: List<Int>,
+ val totalVmsFinished: List<Int>,
+ val totalVmsFailed: List<Int>
+ )
+
+ /**
+ * Perform aggregation of the experiment results.
+ */
+ private fun aggregate(hostMetrics: Dataset<Row>, provisionerMetrics: Dataset<Row>): Dataset<Row> {
+ // Extrapolate the duration of the entries to span the entire trace
+ val hostMetricsExtra = hostMetrics
+ .withColumn("slice_counts", floor(col("duration") / lit(sliceLength)))
+ .withColumn("power_draw", col("power_draw") * col("slice_counts"))
+ .withColumn("state_int", states[col("state")])
+ .withColumn("state_opposite_int", oppositeStates[col("state")])
+ .withColumn("cpu_usage", col("cpu_usage") * col("slice_counts") * col("state_opposite_int"))
+ .withColumn("cpu_demand", col("cpu_demand") * col("slice_counts"))
+ .withColumn("failure_slice_count", col("slice_counts") * col("state_int"))
+ .withColumn("failure_vm_slice_count", col("slice_counts") * col("state_int") * col("vm_count"))
+
+ // Process all data in a single run
+ val hostMetricsGrouped = hostMetricsExtra.groupBy("run_id")
+
+ // Aggregate the summed total metrics
+ val systemMetrics = hostMetricsGrouped.agg(
+ sum("requested_burst").alias("total_requested_burst"),
+ sum("granted_burst").alias("total_granted_burst"),
+ sum("overcommissioned_burst").alias("total_overcommitted_burst"),
+ sum("interfered_burst").alias("total_interfered_burst"),
+ sum("power_draw").alias("total_power_draw"),
+ sum("failure_slice_count").alias("total_failure_slices"),
+ sum("failure_vm_slice_count").alias("total_failure_vm_slices")
+ )
+
+ // Aggregate metrics per host
+ val hvMetrics = hostMetrics
+ .groupBy("run_id", "host_id")
+ .agg(
+ sum("cpu_usage").alias("mean_cpu_usage"),
+ sum("cpu_demand").alias("mean_cpu_demand"),
+ avg("vm_count").alias("mean_num_deployed_images"),
+ count(lit(1)).alias("num_rows")
+ )
+ .withColumn("mean_cpu_usage", col("mean_cpu_usage") / col("num_rows"))
+ .withColumn("mean_cpu_demand", col("mean_cpu_demand") / col("num_rows"))
+ .groupBy("run_id")
+ .agg(
+ avg("mean_cpu_usage").alias("mean_cpu_usage"),
+ avg("mean_cpu_demand").alias("mean_cpu_demand"),
+ avg("mean_num_deployed_images").alias("mean_num_deployed_images"),
+ max("mean_num_deployed_images").alias("max_num_deployed_images")
+ )
+
+ // Group the provisioner metrics per run
+ val provisionerMetricsGrouped = provisionerMetrics.groupBy("run_id")
+
+ // Aggregate the provisioner metrics
+ val provisionerMetricsAggregated = provisionerMetricsGrouped.agg(
+ max("vm_total_count").alias("total_vms_submitted"),
+ max("vm_waiting_count").alias("total_vms_queued"),
+ max("vm_active_count").alias("total_vms_running"),
+ max("vm_inactive_count").alias("total_vms_finished"),
+ max("vm_failed_count").alias("total_vms_failed")
+ )
+
+ // Join the results into a single data frame
+ return systemMetrics
+ .join(hvMetrics, "run_id")
+ .join(provisionerMetricsAggregated, "run_id")
+ .select(
+ col("total_requested_burst"),
+ col("total_granted_burst"),
+ col("total_overcommitted_burst"),
+ col("total_interfered_burst"),
+ col("mean_cpu_usage"),
+ col("mean_cpu_demand"),
+ col("mean_num_deployed_images"),
+ col("max_num_deployed_images"),
+ col("total_power_draw"),
+ col("total_failure_slices"),
+ col("total_failure_vm_slices"),
+ col("total_vms_submitted"),
+ col("total_vms_queued"),
+ col("total_vms_finished"),
+ col("total_vms_failed")
+ )
+ .groupBy(lit(1))
+ .agg(
+ // TODO Check if order of values is correct
+ collect_list(col("total_requested_burst")).alias("total_requested_burst"),
+ collect_list(col("total_granted_burst")).alias("total_granted_burst"),
+ collect_list(col("total_overcommitted_burst")).alias("total_overcommitted_burst"),
+ collect_list(col("total_interfered_burst")).alias("total_interfered_burst"),
+ collect_list(col("mean_cpu_usage")).alias("mean_cpu_usage"),
+ collect_list(col("mean_cpu_demand")).alias("mean_cpu_demand"),
+ collect_list(col("mean_num_deployed_images")).alias("mean_num_deployed_images"),
+ collect_list(col("max_num_deployed_images")).alias("max_num_deployed_images"),
+ collect_list(col("total_power_draw")).alias("total_power_draw"),
+ collect_list(col("total_failure_slices")).alias("total_failure_slices"),
+ collect_list(col("total_failure_vm_slices")).alias("total_failure_vm_slices"),
+ collect_list(col("total_vms_submitted")).alias("total_vms_submitted"),
+ collect_list(col("total_vms_queued")).alias("total_vms_queued"),
+ collect_list(col("total_vms_finished")).alias("total_vms_finished"),
+ collect_list(col("total_vms_failed")).alias("total_vms_failed")
+ )
+ }
+
+ // Spark helper functions
+ operator fun Column.times(other: Column): Column = `$times`(other)
+ operator fun Column.div(other: Column): Column = `$div`(other)
+ operator fun Column.get(other: Column): Column = this.apply(other)
+
+ val sliceLength = 5 * 60 * 1000
+ val states = map(
+ lit("ERROR"), lit(1),
+ lit("ACTIVE"), lit(0),
+ lit("SHUTOFF"), lit(0)
+ )
+ val oppositeStates = map(
+ lit("ERROR"), lit(0),
+ lit("ACTIVE"), lit(1),
+ lit("SHUTOFF"), lit(1)
+ )
+}
diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt
new file mode 100644
index 00000000..40ffd282
--- /dev/null
+++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt
@@ -0,0 +1,93 @@
+package com.atlarge.opendc.runner.web
+
+import com.mongodb.client.MongoCollection
+import com.mongodb.client.model.Filters
+import com.mongodb.client.model.Updates
+import java.time.Instant
+import org.bson.Document
+
+/**
+ * Manages the queue of scenarios that need to be processed.
+ */
+class ScenarioManager(private val collection: MongoCollection<Document>) {
+ /**
+ * Find the next scenario that the simulator needs to process.
+ */
+ fun findNext(): Document? {
+ return collection
+ .find(Filters.eq("simulation.state", "QUEUED"))
+ .first()
+ }
+
+ /**
+ * Claim the scenario in the database with the specified id.
+ */
+ fun claim(id: String): Boolean {
+ val res = collection.findOneAndUpdate(
+ Filters.and(
+ Filters.eq("_id", id),
+ Filters.eq("simulation.state", "QUEUED")
+ ),
+ Updates.combine(
+ Updates.set("simulation.state", "RUNNING"),
+ Updates.set("simulation.heartbeat", Instant.now())
+ )
+ )
+ return res != null
+ }
+
+ /**
+ * Update the heartbeat of the specified scenario.
+ */
+ fun heartbeat(id: String) {
+ collection.findOneAndUpdate(
+ Filters.and(
+ Filters.eq("_id", id),
+ Filters.eq("simulation.state", "RUNNING")
+ ),
+ Updates.set("simulation.heartbeat", Instant.now())
+ )
+ }
+
+ /**
+ * Mark the scenario as failed.
+ */
+ fun fail(id: String) {
+ collection.findOneAndUpdate(
+ Filters.eq("_id", id),
+ Updates.combine(
+ Updates.set("simulation.state", "FAILED"),
+ Updates.set("simulation.heartbeat", Instant.now())
+ )
+ )
+ }
+
+ /**
+ * Persist the specified results.
+ */
+ fun finish(id: String, result: ResultProcessor.Result) {
+ collection.findOneAndUpdate(
+ Filters.eq("_id", id),
+ Updates.combine(
+ Updates.set("simulation.state", "FINISHED"),
+ Updates.unset("simulation.time"),
+ Updates.set("results.total_requested_burst", result.totalRequestedBurst),
+ Updates.set("results.total_granted_burst", result.totalGrantedBurst),
+ Updates.set("results.total_overcommitted_burst", result.totalOvercommittedBurst),
+ Updates.set("results.total_interfered_burst", result.totalInterferedBurst),
+ Updates.set("results.mean_cpu_usage", result.meanCpuUsage),
+ Updates.set("results.mean_cpu_demand", result.meanCpuDemand),
+ Updates.set("results.mean_num_deployed_images", result.meanNumDeployedImages),
+ Updates.set("results.max_num_deployed_images", result.maxNumDeployedImages),
+                Updates.set("results.max_num_deployed_images", result.maxNumDeployedImages), // FIXME(review): exact duplicate of the line above — remove, or replace with the missing field (e.g. total_vms_running)
+ Updates.set("results.total_power_draw", result.totalPowerDraw),
+ Updates.set("results.total_failure_slices", result.totalFailureSlices),
+ Updates.set("results.total_failure_vm_slices", result.totalFailureVmSlices),
+ Updates.set("results.total_vms_submitted", result.totalVmsSubmitted),
+ Updates.set("results.total_vms_queued", result.totalVmsQueued),
+ Updates.set("results.total_vms_finished", result.totalVmsFinished),
+ Updates.set("results.total_vms_failed", result.totalVmsFailed)
+ )
+ )
+ }
+}
diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt
new file mode 100644
index 00000000..499585ec
--- /dev/null
+++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt
@@ -0,0 +1,127 @@
+package com.atlarge.opendc.runner.web
+
+import com.atlarge.odcsim.Domain
+import com.atlarge.opendc.compute.core.MemoryUnit
+import com.atlarge.opendc.compute.core.ProcessingNode
+import com.atlarge.opendc.compute.core.ProcessingUnit
+import com.atlarge.opendc.compute.metal.NODE_CLUSTER
+import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver
+import com.atlarge.opendc.compute.metal.power.LinearLoadPowerModel
+import com.atlarge.opendc.compute.metal.service.ProvisioningService
+import com.atlarge.opendc.compute.metal.service.SimpleProvisioningService
+import com.atlarge.opendc.core.Environment
+import com.atlarge.opendc.core.Platform
+import com.atlarge.opendc.core.Zone
+import com.atlarge.opendc.core.services.ServiceRegistry
+import com.atlarge.opendc.format.environment.EnvironmentReader
+import com.mongodb.client.AggregateIterable
+import com.mongodb.client.MongoCollection
+import com.mongodb.client.model.Aggregates
+import com.mongodb.client.model.Field
+import com.mongodb.client.model.Filters
+import com.mongodb.client.model.Projections
+import java.util.*
+import kotlinx.coroutines.launch
+import org.bson.Document
+
+/**
+ * A helper class that converts the MongoDB topology into an OpenDC environment.
+ */
+class TopologyParser(private val collection: MongoCollection<Document>, private val id: String) : EnvironmentReader {
+ /**
+ * Parse the topology with the specified [id].
+ */
+ override suspend fun construct(dom: Domain): Environment {
+ val nodes = mutableListOf<SimpleBareMetalDriver>()
+ val random = Random(0)
+
+ for (machine in fetchMachines(id)) {
+ val machineId = machine.getString("_id")
+ val clusterId = machine.getString("rack_id")
+ val position = machine.getInteger("position")
+
+ val processors = machine.getList("cpus", Document::class.java).flatMap { cpu ->
+ val cores = cpu.getInteger("numberOfCores")
+ val speed = cpu.get("clockRateMhz", Number::class.java).toDouble()
+ // TODO Remove hardcoding of vendor
+ val node = ProcessingNode("Intel", "amd64", cpu.getString("name"), cores)
+ List(cores) { coreId ->
+ ProcessingUnit(node, coreId, speed)
+ }
+ }
+ val memoryUnits = machine.getList("memories", Document::class.java).map { memory ->
+ MemoryUnit(
+ "Samsung",
+ memory.getString("name"),
+ memory.get("speedMbPerS", Number::class.java).toDouble(),
+ memory.get("sizeMb", Number::class.java).toLong()
+ )
+ }
+ nodes.add(
+ SimpleBareMetalDriver(
+ dom.newDomain(machineId),
+ UUID(random.nextLong(), random.nextLong()),
+ "node-$clusterId-$position",
+ mapOf(NODE_CLUSTER to clusterId),
+ processors,
+ memoryUnits,
+ // For now we assume a simple linear load model with an idle draw of ~200W and a maximum
+ // power draw of 350W.
+ // Source: https://stackoverflow.com/questions/6128960
+ LinearLoadPowerModel(200.0, 350.0)
+ )
+ )
+ }
+
+ val provisioningService = SimpleProvisioningService(dom.newDomain("provisioner"))
+ dom.launch {
+ for (node in nodes) {
+ provisioningService.create(node)
+ }
+ }
+
+ val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService)
+
+ val platform = Platform(
+ UUID.randomUUID(), "opendc-platform", listOf(
+ Zone(UUID.randomUUID(), "zone", serviceRegistry)
+ )
+ )
+
+ return Environment(fetchName(id), null, listOf(platform))
+ }
+
+ override fun close() {}
+
+ /**
+ * Fetch the metadata of the topology.
+ */
+ private fun fetchName(id: String): String {
+ return collection.aggregate(
+ listOf(
+ Aggregates.match(Filters.eq("_id", id)),
+ Aggregates.project(Projections.include("name"))
+ )
+ )
+ .first()!!
+ .getString("name")
+ }
+
+ /**
+ * Fetch a topology from the database with the specified [id].
+ */
+ private fun fetchMachines(id: String): AggregateIterable<Document> {
+ return collection.aggregate(
+ listOf(
+ Aggregates.match(Filters.eq("_id", id)),
+ Aggregates.project(Projections.fields(Document("racks", "\$rooms.tiles.rack"))),
+ Aggregates.unwind("\$racks"),
+ Aggregates.unwind("\$racks"),
+ Aggregates.replaceRoot("\$racks"),
+ Aggregates.addFields(Field("machines.rack_id", "\$_id")),
+ Aggregates.unwind("\$machines"),
+ Aggregates.replaceRoot("\$machines")
+ )
+ )
+ }
+}
diff --git a/simulator/opendc/opendc-runner-web/src/main/resources/log4j2.xml b/simulator/opendc/opendc-runner-web/src/main/resources/log4j2.xml
new file mode 100644
index 00000000..1d873554
--- /dev/null
+++ b/simulator/opendc/opendc-runner-web/src/main/resources/log4j2.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ MIT License
+ ~
+ ~ Copyright (c) 2020 atlarge-research
+ ~
+ ~ Permission is hereby granted, free of charge, to any person obtaining a copy
+ ~ of this software and associated documentation files (the "Software"), to deal
+ ~ in the Software without restriction, including without limitation the rights
+ ~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ ~ copies of the Software, and to permit persons to whom the Software is
+ ~ furnished to do so, subject to the following conditions:
+ ~
+ ~ The above copyright notice and this permission notice shall be included in all
+ ~ copies or substantial portions of the Software.
+ ~
+ ~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ ~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ ~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ ~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ ~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ ~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ ~ SOFTWARE.
+ -->
+
+<Configuration status="WARN">
+ <Appenders>
+ <Console name="Console" target="SYSTEM_OUT">
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} [%highlight{%-5level}] %logger{36} - %msg%n" disableAnsi="false" />
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Logger name="com.atlarge.odcsim" level="info" additivity="false">
+ <AppenderRef ref="Console"/>
+ </Logger>
+ <Logger name="com.atlarge.opendc" level="warn" additivity="false">
+ <AppenderRef ref="Console"/>
+ </Logger>
+ <Logger name="com.atlarge.opendc.runner" level="info" additivity="false">
+ <AppenderRef ref="Console"/>
+ </Logger>
+ <Logger name="org.apache.hadoop" level="warn" additivity="false">
+ <AppenderRef ref="Console"/>
+ </Logger>
+ <Logger name="org.apache.spark" level="info" additivity="false">
+ <AppenderRef ref="Console"/>
+ </Logger>
+ <Root level="error">
+ <AppenderRef ref="Console"/>
+ </Root>
+ </Loggers>
+</Configuration>
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
index 7c7990e2..1193f7b2 100644
--- a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
@@ -39,13 +39,13 @@ import com.atlarge.opendc.workflows.service.stage.resource.ResourceSelectionPoli
import com.atlarge.opendc.workflows.service.stage.task.TaskEligibilityPolicy
import com.atlarge.opendc.workflows.service.stage.task.TaskOrderPolicy
import com.atlarge.opendc.workflows.workload.Job
+import java.util.PriorityQueue
+import java.util.Queue
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
-import java.util.PriorityQueue
-import java.util.Queue
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt
index ad818dde..a60ba0e2 100644
--- a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt
@@ -26,8 +26,8 @@ package com.atlarge.opendc.workflows.service
import com.atlarge.opendc.core.services.AbstractServiceKey
import com.atlarge.opendc.workflows.workload.Job
-import kotlinx.coroutines.flow.Flow
import java.util.UUID
+import kotlinx.coroutines.flow.Flow
/**
* A service for cloud workflow management.
diff --git a/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
index 5ee6d5e6..5c129e37 100644
--- a/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
+++ b/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
@@ -35,6 +35,8 @@ import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelec
import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy
import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy
import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy
+import java.util.ServiceLoader
+import kotlin.math.max
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
@@ -44,8 +46,6 @@ import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
-import java.util.ServiceLoader
-import kotlin.math.max
/**
* Integration test suite for the [StageWorkflowService].
diff --git a/simulator/settings.gradle.kts b/simulator/settings.gradle.kts
index 677a9817..9411d882 100644
--- a/simulator/settings.gradle.kts
+++ b/simulator/settings.gradle.kts
@@ -31,3 +31,4 @@ include(":opendc:opendc-format")
include(":opendc:opendc-workflows")
include(":opendc:opendc-experiments-sc18")
include(":opendc:opendc-experiments-sc20")
+include(":opendc:opendc-runner-web")