summaryrefslogtreecommitdiff
path: root/opendc-compute
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-10-25 20:57:51 +0200
committerGitHub <noreply@github.com>2021-10-25 20:57:51 +0200
commita8e2d460a3b6803845687585ae0b34e67a9445a3 (patch)
tree6249023f8f0d56392400c7ebb72238ee848f740a /opendc-compute
parentb4bf7268cbb6d22d3966f469a6b7721b04d91907 (diff)
parent86c65e875b7dde8872dc81a37aa9dca72eee7782 (diff)
merge: Improve the OpenDC compute model (#37)
This pull request contains various improvements to the OpenDC compute simulation model. - Support filtering hosts based on CPU capacity - Do not allocate lambda in fast-path - Redesign VM interference algorithm - Report provisioning time of virtual machines - Prevent allocations during collection cycle - Use correct flow input capacity for counters - Support running workloads without coroutines **Breaking API Changes** - `VirtualMachine` now requires `cpuCapacity` parameter. - `VmInterferenceModel` needs to be constructed using `VmInterferenceModel.Builder` and can't be passed a list of groups anymore. - Scheduling latency is not collected anymore. Instead, use the boot time and provisioning time to derive the scheduling latency. - Telemetry data is recorded using `*TableReader` interfaces as opposed to the `*Data` classes. These classes are re-used per row and should not be shared with other threads, since the underlying data may change. - `SimMachine` does not implement `AutoCloseable` anymore. Machines can be removed from a `SimHypervisor` using the `removeMachine` method. - `SimMachine.run` is moved to an extension method called `runWorkload`. Users can now also choose to run a workload using the asynchronous `SimMachine.startWorkload`.
Diffstat (limited to 'opendc-compute')
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt9
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt22
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt2
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt9
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt2
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuCapacityFilter.kt40
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuCapacityWeigher.kt40
-rw-r--r--opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt16
-rw-r--r--opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt65
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt66
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt10
-rw-r--r--opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt28
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt13
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt3
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt4
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt18
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt23
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt8
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt15
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt8
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt68
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReader.kt128
-rw-r--r--opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReaderTest.kt (renamed from opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReaderTest.kt)18
23 files changed, 403 insertions, 212 deletions
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt
index fc092a3f..f3b94e3d 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt
@@ -25,7 +25,12 @@ package org.opendc.compute.service.driver
/**
* Describes the static machine properties of the host.
*
+ * @property cpuCapacity The total CPU capacity of the host in MHz.
* @property cpuCount The number of logical processing cores available for this host.
- * @property memorySize The amount of memory available for this host in MB.
+ * @property memoryCapacity The amount of memory available for this host in MB.
*/
-public data class HostModel(public val cpuCount: Int, public val memorySize: Long)
+public data class HostModel(
+ public val cpuCapacity: Double,
+ public val cpuCount: Int,
+ public val memoryCapacity: Long
+)
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
index 57e70fcd..292feabe 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
@@ -26,6 +26,7 @@ import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.metrics.Meter
import io.opentelemetry.api.metrics.MeterProvider
+import io.opentelemetry.api.metrics.ObservableLongMeasurement
import kotlinx.coroutines.*
import mu.KotlinLogging
import org.opendc.compute.api.*
@@ -173,6 +174,12 @@ internal class ComputeServiceImpl(
result.observe(available, upState)
result.observe(total - available, downState)
}
+
+ meter.gaugeBuilder("system.time.provision")
+ .setDescription("The most recent timestamp where the server entered a provisioned state")
+ .setUnit("1")
+ .ofLongs()
+ .buildWithCallback(::collectProvisionTime)
}
override fun newClient(): ComputeClient {
@@ -298,7 +305,7 @@ internal class ComputeServiceImpl(
val hv = HostView(host)
maxCores = max(maxCores, host.model.cpuCount)
- maxMemory = max(maxMemory, host.model.memorySize)
+ maxMemory = max(maxMemory, host.model.memoryCapacity)
hostToView[host] = hv
if (host.state == HostState.UP) {
@@ -324,8 +331,10 @@ internal class ComputeServiceImpl(
internal fun schedule(server: InternalServer): SchedulingRequest {
logger.debug { "Enqueueing server ${server.uid} to be assigned to host." }
+ val now = clock.millis()
+ val request = SchedulingRequest(server, now)
- val request = SchedulingRequest(server, clock.millis())
+ server.lastProvisioningTimestamp = now
queue.add(request)
_serversPending.add(1)
requestSchedulingCycle()
@@ -501,4 +510,13 @@ internal class ComputeServiceImpl(
requestSchedulingCycle()
}
}
+
+ /**
+ * Collect the timestamp when each server entered its provisioning state most recently.
+ */
+ private fun collectProvisionTime(result: ObservableLongMeasurement) {
+ for ((_, server) in servers) {
+ result.observe(server.lastProvisioningTimestamp, server.attributes)
+ }
+ }
}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt
index e2f33f11..0876209a 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt
@@ -37,7 +37,7 @@ public class HostView(public val host: Host) {
get() = host.uid
public var instanceCount: Int = 0
- public var availableMemory: Long = host.model.memorySize
+ public var availableMemory: Long = host.model.memoryCapacity
public var provisionedCores: Int = 0
override fun toString(): String = "HostView[host=$host]"
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt
index 05a7e1bf..f1b92c66 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt
@@ -55,7 +55,7 @@ internal class InternalServer(
/**
* The attributes of a server.
*/
- internal val attributes: Attributes = Attributes.builder()
+ @JvmField internal val attributes: Attributes = Attributes.builder()
.put(ResourceAttributes.HOST_NAME, name)
.put(ResourceAttributes.HOST_ID, uid.toString())
.put(ResourceAttributes.HOST_TYPE, flavor.name)
@@ -70,7 +70,12 @@ internal class InternalServer(
/**
* The [Host] that has been assigned to host the server.
*/
- internal var host: Host? = null
+ @JvmField internal var host: Host? = null
+
+ /**
+ * The most recent timestamp when the server entered a provisioning state.
+ */
+ @JvmField internal var lastProvisioningTimestamp: Long = Long.MIN_VALUE
/**
* The current scheduling request.
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt
index a470a453..8a7a646c 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt
@@ -34,7 +34,7 @@ public class RamFilter(private val allocationRatio: Double) : HostFilter {
override fun test(host: HostView, server: Server): Boolean {
val requested = server.flavor.memorySize
val available = host.availableMemory
- val total = host.host.model.memorySize
+ val total = host.host.model.memoryCapacity
// Do not allow an instance to overcommit against itself, only against
// other instances.
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuCapacityFilter.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuCapacityFilter.kt
new file mode 100644
index 00000000..791710c8
--- /dev/null
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuCapacityFilter.kt
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.service.scheduler.filters
+
+import org.opendc.compute.api.Server
+import org.opendc.compute.service.internal.HostView
+
+/**
+ * A [HostFilter] that filters hosts based on the vCPU speed requirements of a [Server] and the available
+ * capacity on the host.
+ */
+public class VCpuCapacityFilter : HostFilter {
+ override fun test(host: HostView, server: Server): Boolean {
+ val requiredCapacity = server.flavor.meta["cpu-capacity"] as? Double
+ val hostModel = host.host.model
+ val availableCapacity = hostModel.cpuCapacity / hostModel.cpuCount
+
+ return requiredCapacity == null || availableCapacity >= (requiredCapacity / server.flavor.cpuCount)
+ }
+}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuCapacityWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuCapacityWeigher.kt
new file mode 100644
index 00000000..a86226e2
--- /dev/null
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuCapacityWeigher.kt
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.service.scheduler.weights
+
+import org.opendc.compute.api.Server
+import org.opendc.compute.service.internal.HostView
+
+/**
+ * A [HostWeigher] that weighs the hosts based on the difference between the required vCPU capacity and the available vCPU capacity.
+ */
+public class VCpuCapacityWeigher(override val multiplier: Double = 1.0) : HostWeigher {
+
+ override fun getWeight(host: HostView, server: Server): Double {
+ val model = host.host.model
+ val requiredCapacity = server.flavor.meta["cpu-capacity"] as? Double ?: 0.0
+ return model.cpuCapacity / model.cpuCount - requiredCapacity / server.flavor.cpuCount
+ }
+
+ override fun toString(): String = "VCpuWeigher"
+}
diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
index 564f9493..7b8d0fe2 100644
--- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
+++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
@@ -125,7 +125,7 @@ internal class ComputeServiceTest {
fun testAddHost() = scope.runBlockingSimulation {
val host = mockk<Host>(relaxUnitFun = true)
- every { host.model } returns HostModel(4, 2048)
+ every { host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { host.state } returns HostState.UP
assertEquals(0, service.hostCount)
@@ -147,7 +147,7 @@ internal class ComputeServiceTest {
fun testAddHostDouble() = scope.runBlockingSimulation {
val host = mockk<Host>(relaxUnitFun = true)
- every { host.model } returns HostModel(4, 2048)
+ every { host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { host.state } returns HostState.DOWN
assertEquals(0, service.hostCount)
@@ -216,7 +216,7 @@ internal class ComputeServiceTest {
fun testServerCannotFitOnHost() = scope.runBlockingSimulation {
val host = mockk<Host>(relaxUnitFun = true)
- every { host.model } returns HostModel(4, 2048)
+ every { host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { host.state } returns HostState.UP
every { host.canFit(any()) } returns false
@@ -241,7 +241,7 @@ internal class ComputeServiceTest {
val listeners = mutableListOf<HostListener>()
every { host.uid } returns UUID.randomUUID()
- every { host.model } returns HostModel(4, 2048)
+ every { host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { host.state } returns HostState.DOWN
every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) }
every { host.canFit(any()) } returns false
@@ -272,7 +272,7 @@ internal class ComputeServiceTest {
val listeners = mutableListOf<HostListener>()
every { host.uid } returns UUID.randomUUID()
- every { host.model } returns HostModel(4, 2048)
+ every { host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { host.state } returns HostState.UP
every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) }
every { host.canFit(any()) } returns false
@@ -303,7 +303,7 @@ internal class ComputeServiceTest {
val listeners = mutableListOf<HostListener>()
every { host.uid } returns UUID.randomUUID()
- every { host.model } returns HostModel(4, 2048)
+ every { host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { host.state } returns HostState.UP
every { host.canFit(any()) } returns true
every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) }
@@ -326,7 +326,7 @@ internal class ComputeServiceTest {
val listeners = mutableListOf<HostListener>()
every { host.uid } returns UUID.randomUUID()
- every { host.model } returns HostModel(4, 2048)
+ every { host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { host.state } returns HostState.UP
every { host.canFit(any()) } returns true
every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) }
@@ -369,7 +369,7 @@ internal class ComputeServiceTest {
val listeners = mutableListOf<HostListener>()
every { host.uid } returns UUID.randomUUID()
- every { host.model } returns HostModel(4, 2048)
+ every { host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { host.state } returns HostState.UP
every { host.canFit(any()) } returns true
every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) }
diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt
index cafd4498..3f2ce43b 100644
--- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt
+++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt
@@ -33,10 +33,7 @@ import org.opendc.compute.api.Server
import org.opendc.compute.service.driver.HostModel
import org.opendc.compute.service.driver.HostState
import org.opendc.compute.service.internal.HostView
-import org.opendc.compute.service.scheduler.filters.ComputeFilter
-import org.opendc.compute.service.scheduler.filters.InstanceCountFilter
-import org.opendc.compute.service.scheduler.filters.RamFilter
-import org.opendc.compute.service.scheduler.filters.VCpuFilter
+import org.opendc.compute.service.scheduler.filters.*
import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher
import org.opendc.compute.service.scheduler.weights.RamWeigher
@@ -183,12 +180,12 @@ internal class FilterSchedulerTest {
val hostA = mockk<HostView>()
every { hostA.host.state } returns HostState.UP
- every { hostA.host.model } returns HostModel(4, 2048)
+ every { hostA.host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { hostA.availableMemory } returns 512
val hostB = mockk<HostView>()
every { hostB.host.state } returns HostState.UP
- every { hostB.host.model } returns HostModel(4, 2048)
+ every { hostB.host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { hostB.availableMemory } returns 2048
scheduler.addHost(hostA)
@@ -210,7 +207,7 @@ internal class FilterSchedulerTest {
val host = mockk<HostView>()
every { host.host.state } returns HostState.UP
- every { host.host.model } returns HostModel(4, 2048)
+ every { host.host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { host.availableMemory } returns 2048
scheduler.addHost(host)
@@ -231,12 +228,12 @@ internal class FilterSchedulerTest {
val hostA = mockk<HostView>()
every { hostA.host.state } returns HostState.UP
- every { hostA.host.model } returns HostModel(4, 2048)
+ every { hostA.host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { hostA.provisionedCores } returns 3
val hostB = mockk<HostView>()
every { hostB.host.state } returns HostState.UP
- every { hostB.host.model } returns HostModel(4, 2048)
+ every { hostB.host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { hostB.provisionedCores } returns 0
scheduler.addHost(hostA)
@@ -258,7 +255,7 @@ internal class FilterSchedulerTest {
val host = mockk<HostView>()
every { host.host.state } returns HostState.UP
- every { host.host.model } returns HostModel(4, 2048)
+ every { host.host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { host.provisionedCores } returns 0
scheduler.addHost(host)
@@ -271,6 +268,34 @@ internal class FilterSchedulerTest {
}
@Test
+ fun testVCpuCapacityFilter() {
+ val scheduler = FilterScheduler(
+ filters = listOf(VCpuCapacityFilter()),
+ weighers = emptyList(),
+ )
+
+ val hostA = mockk<HostView>()
+ every { hostA.host.state } returns HostState.UP
+ every { hostA.host.model } returns HostModel(8 * 2600.0, 8, 2048)
+ every { hostA.availableMemory } returns 512
+ scheduler.addHost(hostA)
+
+ val hostB = mockk<HostView>()
+ every { hostB.host.state } returns HostState.UP
+ every { hostB.host.model } returns HostModel(4 * 3200.0, 4, 2048)
+ every { hostB.availableMemory } returns 512
+
+ scheduler.addHost(hostB)
+
+ val server = mockk<Server>()
+ every { server.flavor.cpuCount } returns 2
+ every { server.flavor.memorySize } returns 1024
+ every { server.flavor.meta } returns mapOf("cpu-capacity" to 2 * 3200.0)
+
+ assertEquals(hostB, scheduler.select(server))
+ }
+
+ @Test
fun testInstanceCountFilter() {
val scheduler = FilterScheduler(
filters = listOf(InstanceCountFilter(limit = 2)),
@@ -279,12 +304,12 @@ internal class FilterSchedulerTest {
val hostA = mockk<HostView>()
every { hostA.host.state } returns HostState.UP
- every { hostA.host.model } returns HostModel(4, 2048)
+ every { hostA.host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { hostA.instanceCount } returns 2
val hostB = mockk<HostView>()
every { hostB.host.state } returns HostState.UP
- every { hostB.host.model } returns HostModel(4, 2048)
+ every { hostB.host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { hostB.instanceCount } returns 0
scheduler.addHost(hostA)
@@ -306,12 +331,12 @@ internal class FilterSchedulerTest {
val hostA = mockk<HostView>()
every { hostA.host.state } returns HostState.UP
- every { hostA.host.model } returns HostModel(4, 2048)
+ every { hostA.host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { hostA.availableMemory } returns 1024
val hostB = mockk<HostView>()
every { hostB.host.state } returns HostState.UP
- every { hostB.host.model } returns HostModel(4, 2048)
+ every { hostB.host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { hostB.availableMemory } returns 512
scheduler.addHost(hostA)
@@ -333,12 +358,12 @@ internal class FilterSchedulerTest {
val hostA = mockk<HostView>()
every { hostA.host.state } returns HostState.UP
- every { hostA.host.model } returns HostModel(12, 2048)
+ every { hostA.host.model } returns HostModel(12 * 2600.0, 12, 2048)
every { hostA.availableMemory } returns 1024
val hostB = mockk<HostView>()
every { hostB.host.state } returns HostState.UP
- every { hostB.host.model } returns HostModel(4, 2048)
+ every { hostB.host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { hostB.availableMemory } returns 512
scheduler.addHost(hostA)
@@ -360,12 +385,12 @@ internal class FilterSchedulerTest {
val hostA = mockk<HostView>()
every { hostA.host.state } returns HostState.UP
- every { hostA.host.model } returns HostModel(4, 2048)
+ every { hostA.host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { hostA.provisionedCores } returns 2
val hostB = mockk<HostView>()
every { hostB.host.state } returns HostState.UP
- every { hostB.host.model } returns HostModel(4, 2048)
+ every { hostB.host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { hostB.provisionedCores } returns 0
scheduler.addHost(hostA)
@@ -387,12 +412,12 @@ internal class FilterSchedulerTest {
val hostA = mockk<HostView>()
every { hostA.host.state } returns HostState.UP
- every { hostA.host.model } returns HostModel(4, 2048)
+ every { hostA.host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { hostA.instanceCount } returns 2
val hostB = mockk<HostView>()
every { hostB.host.state } returns HostState.UP
- every { hostB.host.model } returns HostModel(4, 2048)
+ every { hostB.host.model } returns HostModel(4 * 2600.0, 4, 2048)
every { hostB.instanceCount } returns 0
scheduler.addHost(hostA)
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index b9d02185..908a58e9 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -47,6 +47,7 @@ import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.power.ConstantPowerModel
import org.opendc.simulator.compute.power.PowerDriver
import org.opendc.simulator.compute.power.SimplePowerDriver
+import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.flow.FlowEngine
import java.util.*
import kotlin.coroutines.CoroutineContext
@@ -121,7 +122,7 @@ public class SimHost(
field = value
}
- override val model: HostModel = HostModel(model.cpus.size, model.memory.sumOf { it.size })
+ override val model: HostModel = HostModel(model.cpus.sumOf { it.frequency }, model.cpus.size, model.memory.sumOf { it.size })
/**
* The [GuestListener] that listens for guest events.
@@ -136,11 +137,6 @@ public class SimHost(
}
}
- /**
- * The [Job] that represents the machine running the hypervisor.
- */
- private var _job: Job? = null
-
init {
launch()
@@ -188,7 +184,7 @@ public class SimHost(
}
override fun canFit(server: Server): Boolean {
- val sufficientMemory = model.memorySize >= server.flavor.memorySize
+ val sufficientMemory = model.memoryCapacity >= server.flavor.memorySize
val enoughCpus = model.cpuCount >= server.flavor.cpuCount
val canFit = hypervisor.canFit(server.flavor.toMachineModel())
@@ -199,11 +195,12 @@ public class SimHost(
val guest = guests.computeIfAbsent(server) { key ->
require(canFit(key)) { "Server does not fit" }
- val machine = hypervisor.createMachine(key.flavor.toMachineModel(), key.name)
+ val machine = hypervisor.newMachine(key.flavor.toMachineModel(), key.name)
val newGuest = Guest(
scope.coroutineContext,
clock,
this,
+ hypervisor,
mapper,
guestListener,
server,
@@ -249,7 +246,7 @@ public class SimHost(
override fun close() {
reset()
scope.cancel()
- machine.close()
+ machine.cancel()
}
override fun toString(): String = "SimHost[uid=$uid,name=$name,model=$model]"
@@ -276,26 +273,39 @@ public class SimHost(
}
/**
+ * The [SimMachineContext] of the workload that represents the machine running the hypervisor.
+ */
+ private var _ctx: SimMachineContext? = null
+
+ /**
* Launch the hypervisor.
*/
private fun launch() {
- check(_job == null) { "Concurrent hypervisor running" }
+ check(_ctx == null) { "Concurrent hypervisor running" }
// Launch hypervisor onto machine
- _job = scope.launch {
- try {
- _bootTime = clock.millis()
- _state = HostState.UP
- machine.run(hypervisor, emptyMap())
- } catch (_: CancellationException) {
- // Ignored
- } catch (cause: Throwable) {
- logger.error(cause) { "Host failed" }
- throw cause
- } finally {
- _state = HostState.DOWN
+ _ctx = machine.startWorkload(object : SimWorkload {
+ override fun onStart(ctx: SimMachineContext) {
+ try {
+ _bootTime = clock.millis()
+ _state = HostState.UP
+ hypervisor.onStart(ctx)
+ } catch (cause: Throwable) {
+ _state = HostState.DOWN
+ _ctx = null
+ throw cause
+ }
}
- }
+
+ override fun onStop(ctx: SimMachineContext) {
+ try {
+ hypervisor.onStop(ctx)
+ } finally {
+ _state = HostState.DOWN
+ _ctx = null
+ }
+ }
+ })
}
/**
@@ -305,12 +315,7 @@ public class SimHost(
updateUptime()
// Stop the hypervisor
- val job = _job
- if (job != null) {
- job.cancel()
- _job = null
- }
-
+ _ctx?.close()
_state = HostState.DOWN
}
@@ -319,8 +324,9 @@ public class SimHost(
*/
private fun Flavor.toMachineModel(): MachineModel {
val originalCpu = machine.model.cpus[0]
+ val cpuCapacity = (this.meta["cpu-capacity"] as? Double ?: Double.MAX_VALUE).coerceAtMost(originalCpu.frequency)
val processingNode = originalCpu.node.copy(coreCount = cpuCount)
- val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode) }
+ val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode, frequency = cpuCapacity) }
val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize))
return MachineModel(processingUnits, memoryUnits).optimize()
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
index 5ea1860d..9f3122db 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
@@ -34,7 +34,9 @@ import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
import org.opendc.compute.simulator.SimHost
import org.opendc.compute.simulator.SimWorkloadMapper
+import org.opendc.simulator.compute.kernel.SimHypervisor
import org.opendc.simulator.compute.kernel.SimVirtualMachine
+import org.opendc.simulator.compute.runWorkload
import org.opendc.simulator.compute.workload.SimWorkload
import java.time.Clock
import kotlin.coroutines.CoroutineContext
@@ -46,6 +48,7 @@ internal class Guest(
context: CoroutineContext,
private val clock: Clock,
val host: SimHost,
+ private val hypervisor: SimHypervisor,
private val mapper: SimWorkloadMapper,
private val listener: GuestListener,
val server: Server,
@@ -114,8 +117,7 @@ internal class Guest(
stop()
state = ServerState.DELETED
-
- machine.close()
+ hypervisor.removeMachine(machine)
scope.cancel()
}
@@ -191,7 +193,7 @@ internal class Guest(
*/
private suspend fun runMachine(workload: SimWorkload) {
delay(1) // TODO Introduce model for boot time
- machine.run(workload, mapOf("driver" to host, "server" to server))
+ machine.runWorkload(workload, mapOf("driver" to host, "server" to server))
}
/**
@@ -248,7 +250,7 @@ internal class Guest(
*/
fun collectBootTime(result: ObservableLongMeasurement) {
if (_bootTime != Long.MIN_VALUE) {
- result.observe(_bootTime)
+ result.observe(_bootTime, attributes)
}
}
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index a0ff9228..799a8cf0 100644
--- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -46,8 +46,8 @@ import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.flow.FlowEngine
import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.HOST_ID
-import org.opendc.telemetry.compute.table.HostData
-import org.opendc.telemetry.compute.table.ServerData
+import org.opendc.telemetry.compute.table.HostTableReader
+import org.opendc.telemetry.compute.table.ServerTableReader
import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import org.opendc.telemetry.sdk.toOtelClock
import java.time.Duration
@@ -140,10 +140,10 @@ internal class SimHostTest {
val reader = CoroutineMetricReader(
this, listOf(meterProvider as MetricProducer),
object : ComputeMetricExporter() {
- override fun record(data: HostData) {
- activeTime += data.cpuActiveTime
- idleTime += data.cpuIdleTime
- stealTime += data.cpuStealTime
+ override fun record(reader: HostTableReader) {
+ activeTime += reader.cpuActiveTime
+ idleTime += reader.cpuIdleTime
+ stealTime += reader.cpuStealTime
}
},
exportInterval = Duration.ofSeconds(duration)
@@ -236,16 +236,16 @@ internal class SimHostTest {
val reader = CoroutineMetricReader(
this, listOf(meterProvider as MetricProducer),
object : ComputeMetricExporter() {
- override fun record(data: HostData) {
- activeTime += data.cpuActiveTime
- idleTime += data.cpuIdleTime
- uptime += data.uptime
- downtime += data.downtime
+ override fun record(reader: HostTableReader) {
+ activeTime += reader.cpuActiveTime
+ idleTime += reader.cpuIdleTime
+ uptime += reader.uptime
+ downtime += reader.downtime
}
- override fun record(data: ServerData) {
- guestUptime += data.uptime
- guestDowntime += data.downtime
+ override fun record(reader: ServerTableReader) {
+ guestUptime += reader.uptime
+ guestDowntime += reader.downtime
}
},
exportInterval = Duration.ofSeconds(duration)
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
index 1a6624f7..f23becda 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
@@ -92,7 +92,8 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
val idCol = reader.resolve(RESOURCE_ID)
val startTimeCol = reader.resolve(RESOURCE_START_TIME)
val stopTimeCol = reader.resolve(RESOURCE_STOP_TIME)
- val coresCol = reader.resolve(RESOURCE_CPU_COUNT)
+ val cpuCountCol = reader.resolve(RESOURCE_CPU_COUNT)
+ val cpuCapacityCol = reader.resolve(RESOURCE_CPU_CAPACITY)
val memCol = reader.resolve(RESOURCE_MEM_CAPACITY)
var counter = 0
@@ -108,8 +109,9 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
val submissionTime = reader.get(startTimeCol) as Instant
val endTime = reader.get(stopTimeCol) as Instant
- val maxCores = reader.getInt(coresCol)
- val requiredMemory = reader.getDouble(memCol) / 1000.0 // Convert from KB to MB
+ val cpuCount = reader.getInt(cpuCountCol)
+ val cpuCapacity = reader.getDouble(cpuCapacityCol)
+ val memCapacity = reader.getDouble(memCol) / 1000.0 // Convert from KB to MB
val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray())
val builder = fragments.getValue(id)
@@ -119,8 +121,9 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
VirtualMachine(
uid,
id,
- maxCores,
- requiredMemory.roundToLong(),
+ cpuCount,
+ cpuCapacity,
+ memCapacity.roundToLong(),
totalLoad,
submissionTime,
endTime,
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt
index 283f82fe..90ee56cb 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt
@@ -128,7 +128,8 @@ public class ComputeWorkloadRunner(
client.newFlavor(
entry.name,
entry.cpuCount,
- entry.memCapacity
+ entry.memCapacity,
+ meta = if (entry.cpuCapacity > 0.0) mapOf("cpu-capacity" to entry.cpuCapacity) else emptyMap()
),
meta = mapOf("workload" to workload)
)
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt
index 5dd239f6..88e80719 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt
@@ -31,8 +31,9 @@ import java.util.*
*
* @param uid The unique identifier of the virtual machine.
* @param name The name of the virtual machine.
+ * @param cpuCapacity The required CPU capacity for the VM in MHz.
* @param cpuCount The number of vCPUs in the VM.
- * @param memCapacity The provisioned memory for the VM.
+ * @param memCapacity The provisioned memory for the VM in MB.
* @param startTime The start time of the VM.
* @param stopTime The stop time of the VM.
* @param trace The trace that belong to this VM.
@@ -41,6 +42,7 @@ public data class VirtualMachine(
val uid: UUID,
val name: String,
val cpuCount: Int,
+ val cpuCapacity: Double,
val memCapacity: Long,
val totalLoad: Double,
val startTime: Instant,
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt
index ad182d67..a46885f4 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt
@@ -25,9 +25,9 @@ package org.opendc.compute.workload.export.parquet
import io.opentelemetry.sdk.common.CompletableResultCode
import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.ComputeMonitor
-import org.opendc.telemetry.compute.table.HostData
-import org.opendc.telemetry.compute.table.ServerData
-import org.opendc.telemetry.compute.table.ServiceData
+import org.opendc.telemetry.compute.table.HostTableReader
+import org.opendc.telemetry.compute.table.ServerTableReader
+import org.opendc.telemetry.compute.table.ServiceTableReader
import java.io.File
/**
@@ -49,16 +49,16 @@ public class ParquetComputeMetricExporter(base: File, partition: String, bufferS
bufferSize
)
- override fun record(data: ServerData) {
- serverWriter.write(data)
+ override fun record(reader: ServerTableReader) {
+ serverWriter.write(reader)
}
- override fun record(data: HostData) {
- hostWriter.write(data)
+ override fun record(reader: HostTableReader) {
+ hostWriter.write(reader)
}
- override fun record(data: ServiceData) {
- serviceWriter.write(data)
+ override fun record(reader: ServiceTableReader) {
+ serviceWriter.write(reader)
}
override fun shutdown(): CompletableResultCode {
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt
index 4172d729..84387bbc 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt
@@ -50,9 +50,9 @@ public abstract class ParquetDataWriter<in T>(
private val logger = KotlinLogging.logger {}
/**
- * The queue of commands to process.
+ * The queue of records to process.
*/
- private val queue: BlockingQueue<T> = ArrayBlockingQueue(bufferSize)
+ private val queue: BlockingQueue<GenericData.Record> = ArrayBlockingQueue(bufferSize)
/**
* An exception to be propagated to the actual writer.
@@ -72,20 +72,20 @@ public abstract class ParquetDataWriter<in T>(
}
val queue = queue
- val buf = mutableListOf<T>()
+ val buf = mutableListOf<GenericData.Record>()
var shouldStop = false
try {
while (!shouldStop) {
try {
- process(writer, queue.take())
+ writer.write(queue.take())
} catch (e: InterruptedException) {
shouldStop = true
}
if (queue.drainTo(buf) > 0) {
for (data in buf) {
- process(writer, data)
+ writer.write(data)
}
buf.clear()
}
@@ -119,7 +119,9 @@ public abstract class ParquetDataWriter<in T>(
throw IllegalStateException("Writer thread failed", exception)
}
- queue.put(data)
+ val builder = GenericRecordBuilder(schema)
+ convert(builder, data)
+ queue.put(builder.build())
}
/**
@@ -133,13 +135,4 @@ public abstract class ParquetDataWriter<in T>(
init {
writerThread.start()
}
-
- /**
- * Process the specified [data] to be written to the Parquet file.
- */
- private fun process(writer: ParquetWriter<GenericData.Record>, data: T) {
- val builder = GenericRecordBuilder(schema)
- convert(builder, data)
- writer.write(builder.build())
- }
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt
index 98a0739e..2b7cac8f 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt
@@ -28,17 +28,17 @@ import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecordBuilder
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
-import org.opendc.telemetry.compute.table.HostData
+import org.opendc.telemetry.compute.table.HostTableReader
import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA
import org.opendc.trace.util.parquet.UUID_SCHEMA
import org.opendc.trace.util.parquet.optional
import java.io.File
/**
- * A Parquet event writer for [HostData]s.
+ * A Parquet event writer for [HostTableReader]s.
*/
public class ParquetHostDataWriter(path: File, bufferSize: Int) :
- ParquetDataWriter<HostData>(path, SCHEMA, bufferSize) {
+ ParquetDataWriter<HostTableReader>(path, SCHEMA, bufferSize) {
override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> {
return builder
@@ -46,7 +46,7 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) :
.build()
}
- override fun convert(builder: GenericRecordBuilder, data: HostData) {
+ override fun convert(builder: GenericRecordBuilder, data: HostTableReader) {
builder["timestamp"] = data.timestamp.toEpochMilli()
builder["host_id"] = data.host.id
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt
index 0d11ec23..144b6624 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt
@@ -28,17 +28,17 @@ import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecordBuilder
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
-import org.opendc.telemetry.compute.table.ServerData
+import org.opendc.telemetry.compute.table.ServerTableReader
import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA
import org.opendc.trace.util.parquet.UUID_SCHEMA
import org.opendc.trace.util.parquet.optional
import java.io.File
/**
- * A Parquet event writer for [ServerData]s.
+ * A Parquet event writer for [ServerTableReader]s.
*/
public class ParquetServerDataWriter(path: File, bufferSize: Int) :
- ParquetDataWriter<ServerData>(path, SCHEMA, bufferSize) {
+ ParquetDataWriter<ServerTableReader>(path, SCHEMA, bufferSize) {
override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> {
return builder
@@ -47,7 +47,7 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) :
.build()
}
- override fun convert(builder: GenericRecordBuilder, data: ServerData) {
+ override fun convert(builder: GenericRecordBuilder, data: ServerTableReader) {
builder["timestamp"] = data.timestamp.toEpochMilli()
builder["server_id"] = data.server.id
@@ -55,9 +55,8 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) :
builder["uptime"] = data.uptime
builder["downtime"] = data.downtime
- val bootTime = data.bootTime
- builder["boot_time"] = bootTime?.toEpochMilli()
- builder["scheduling_latency"] = data.schedulingLatency
+ builder["boot_time"] = data.bootTime?.toEpochMilli()
+ builder["provision_time"] = data.provisionTime?.toEpochMilli()
builder["cpu_count"] = data.server.cpuCount
builder["cpu_limit"] = data.cpuLimit
@@ -81,8 +80,8 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) :
.name("host_id").type(UUID_SCHEMA.optional()).noDefault()
.requiredLong("uptime")
.requiredLong("downtime")
+ .name("provision_time").type(TIMESTAMP_SCHEMA.optional()).noDefault()
.name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault()
- .requiredLong("scheduling_latency")
.requiredInt("cpu_count")
.requiredDouble("cpu_limit")
.requiredLong("cpu_time_active")
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt
index 47824b29..ec8a2b65 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt
@@ -25,17 +25,17 @@ package org.opendc.compute.workload.export.parquet
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericRecordBuilder
-import org.opendc.telemetry.compute.table.ServiceData
+import org.opendc.telemetry.compute.table.ServiceTableReader
import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA
import java.io.File
/**
- * A Parquet event writer for [ServiceData]s.
+ * A Parquet event writer for [ServiceTableReader]s.
*/
public class ParquetServiceDataWriter(path: File, bufferSize: Int) :
- ParquetDataWriter<ServiceData>(path, SCHEMA, bufferSize) {
+ ParquetDataWriter<ServiceTableReader>(path, SCHEMA, bufferSize) {
- override fun convert(builder: GenericRecordBuilder, data: ServiceData) {
+ override fun convert(builder: GenericRecordBuilder, data: ServiceTableReader) {
builder["timestamp"] = data.timestamp.toEpochMilli()
builder["hosts_up"] = data.hostsUp
builder["hosts_down"] = data.hostsDown
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt
deleted file mode 100644
index 67f9626c..00000000
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.workload.util
-
-import com.fasterxml.jackson.annotation.JsonProperty
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
-import com.fasterxml.jackson.module.kotlin.readValue
-import org.opendc.simulator.compute.kernel.interference.VmInterferenceGroup
-import java.io.File
-import java.io.InputStream
-
-/**
- * A parser for the JSON performance interference setup files used for the TPDS article on Capelin.
- */
-public class PerformanceInterferenceReader {
- /**
- * The [ObjectMapper] to use.
- */
- private val mapper = jacksonObjectMapper()
-
- init {
- mapper.addMixIn(VmInterferenceGroup::class.java, GroupMixin::class.java)
- }
-
- /**
- * Read the performance interface model from [file].
- */
- public fun read(file: File): List<VmInterferenceGroup> {
- return mapper.readValue(file)
- }
-
- /**
- * Read the performance interface model from the input.
- */
- public fun read(input: InputStream): List<VmInterferenceGroup> {
- return mapper.readValue(input)
- }
-
- private data class GroupMixin(
- @JsonProperty("minServerLoad")
- val targetLoad: Double,
- @JsonProperty("performanceScore")
- val score: Double,
- @JsonProperty("vms")
- val members: Set<String>,
- )
-}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReader.kt
new file mode 100644
index 00000000..e0fa8904
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReader.kt
@@ -0,0 +1,128 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.util
+
+import com.fasterxml.jackson.annotation.JsonProperty
+import com.fasterxml.jackson.core.JsonParseException
+import com.fasterxml.jackson.core.JsonParser
+import com.fasterxml.jackson.core.JsonToken
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
+import java.io.File
+import java.io.InputStream
+
+/**
+ * A parser for the JSON performance interference setup files used for the TPDS article on Capelin.
+ */
+public class VmInterferenceModelReader {
+ /**
+ * The [ObjectMapper] to use.
+ */
+ private val mapper = jacksonObjectMapper()
+
+ /**
+ * Read the performance interference model from [file].
+ */
+ public fun read(file: File): VmInterferenceModel {
+ val builder = VmInterferenceModel.builder()
+ val parser = mapper.createParser(file)
+ parseGroups(parser, builder)
+ return builder.build()
+ }
+
+ /**
+ * Read the performance interference model from the input.
+ */
+ public fun read(input: InputStream): VmInterferenceModel {
+ val builder = VmInterferenceModel.builder()
+ val parser = mapper.createParser(input)
+ parseGroups(parser, builder)
+ return builder.build()
+ }
+
+ /**
+ * Parse all groups in an interference JSON file.
+ */
+ private fun parseGroups(parser: JsonParser, builder: VmInterferenceModel.Builder) {
+ parser.nextToken()
+
+ if (!parser.isExpectedStartArrayToken) {
+ throw JsonParseException(parser, "Expected array at start, but got ${parser.currentToken()}")
+ }
+
+ while (parser.nextToken() != JsonToken.END_ARRAY) {
+ parseGroup(parser, builder)
+ }
+ }
+
+ /**
+ * Parse a group in an interference JSON file.
+ */
+ private fun parseGroup(parser: JsonParser, builder: VmInterferenceModel.Builder) {
+ var targetLoad = Double.POSITIVE_INFINITY
+ var score = 1.0
+ val members = mutableSetOf<String>()
+
+ if (!parser.isExpectedStartObjectToken) {
+ throw JsonParseException(parser, "Expected object, but got ${parser.currentToken()}")
+ }
+
+ while (parser.nextValue() != JsonToken.END_OBJECT) {
+ when (parser.currentName) {
+ "vms" -> parseGroupMembers(parser, members)
+ "minServerLoad" -> targetLoad = parser.doubleValue
+ "performanceScore" -> score = parser.doubleValue
+ }
+ }
+
+ builder.addGroup(members, targetLoad, score)
+ }
+
+ /**
+ * Parse the members of a group.
+ */
+ private fun parseGroupMembers(parser: JsonParser, members: MutableSet<String>) {
+ if (!parser.isExpectedStartArrayToken) {
+ throw JsonParseException(parser, "Expected array for group members, but got ${parser.currentToken()}")
+ }
+
+ while (parser.nextValue() != JsonToken.END_ARRAY) {
+ if (parser.currentToken() != JsonToken.VALUE_STRING) {
+ throw JsonParseException(parser, "Expected string value for group member")
+ }
+
+ val member = parser.text.removePrefix("vm__workload__").removeSuffix(".txt")
+ members.add(member)
+ }
+ }
+
+ private data class Group(
+ @JsonProperty("minServerLoad")
+ val targetLoad: Double,
+ @JsonProperty("performanceScore")
+ val score: Double,
+ @JsonProperty("vms")
+ val members: Set<String>,
+ )
+}
diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReaderTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReaderTest.kt
index c79f0584..1c3e7149 100644
--- a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReaderTest.kt
+++ b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReaderTest.kt
@@ -22,24 +22,16 @@
package org.opendc.compute.workload.util
-import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertAll
+import org.junit.jupiter.api.assertDoesNotThrow
/**
- * Test suite for the [PerformanceInterferenceReader] class.
+ * Test suite for the [VmInterferenceModelReader] class.
*/
-class PerformanceInterferenceReaderTest {
+class VmInterferenceModelReaderTest {
@Test
fun testSmoke() {
- val input = checkNotNull(PerformanceInterferenceReader::class.java.getResourceAsStream("/perf-interference.json"))
- val result = PerformanceInterferenceReader().read(input)
-
- assertAll(
- { assertEquals(2, result.size) },
- { assertEquals(setOf("vm_a", "vm_c", "vm_x", "vm_y"), result[0].members) },
- { assertEquals(0.0, result[0].targetLoad, 0.001) },
- { assertEquals(0.8830158730158756, result[0].score, 0.001) }
- )
+ val input = checkNotNull(VmInterferenceModelReader::class.java.getResourceAsStream("/perf-interference.json"))
+ assertDoesNotThrow { VmInterferenceModelReader().read(input) }
}
}