diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-10-25 20:57:51 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-10-25 20:57:51 +0200 |
| commit | a8e2d460a3b6803845687585ae0b34e67a9445a3 (patch) | |
| tree | 6249023f8f0d56392400c7ebb72238ee848f740a /opendc-compute | |
| parent | b4bf7268cbb6d22d3966f469a6b7721b04d91907 (diff) | |
| parent | 86c65e875b7dde8872dc81a37aa9dca72eee7782 (diff) | |
merge: Improve the OpenDC compute model (#37)
This pull request contains various improvements to the OpenDC compute simulation model.
- Support filtering hosts based on CPU capacity
- Do not allocate lambda in fast-path
- Redesign VM interference algorithm
- Report provisioning time of virtual machines
- Prevent allocations during collection cycle
- Use correct flow input capacity for counters
- Support running workloads without coroutines
**Breaking API Changes**
- `VirtualMachine` now requires `cpuCapacity` parameter.
- `VmInterferenceModel` needs to be constructed using `VmInterferenceModel.Builder` and can't be passed a list of groups anymore.
- Scheduling latency is not collected anymore. Instead, use the boot time and provisioning time to derive the scheduling latency.
- Telemetry data is recorded using `*TableReader` interfaces as opposed to the `*Data` classes. These classes are re-used per row and should not be shared with other threads, since the underlying data may change.
- `SimMachine` does not implement `AutoCloseable` anymore. Machines can be removed from a `SimHypervisor` using the `removeMachine` method.
- `SimMachine.run` is moved to an extension method called `runWorkload`. Users can now also choose to run a workload using the asynchronous `SimMachine.startWorkload`.
Diffstat (limited to 'opendc-compute')
23 files changed, 403 insertions, 212 deletions
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt index fc092a3f..f3b94e3d 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostModel.kt @@ -25,7 +25,12 @@ package org.opendc.compute.service.driver /** * Describes the static machine properties of the host. * + * @property cpuCapacity The total CPU capacity of the host in MHz. * @property cpuCount The number of logical processing cores available for this host. - * @property memorySize The amount of memory available for this host in MB. + * @property memoryCapacity The amount of memory available for this host in MB. */ -public data class HostModel(public val cpuCount: Int, public val memorySize: Long) +public data class HostModel( + public val cpuCapacity: Double, + public val cpuCount: Int, + public val memoryCapacity: Long +) diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index 57e70fcd..292feabe 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -26,6 +26,7 @@ import io.opentelemetry.api.common.AttributeKey import io.opentelemetry.api.common.Attributes import io.opentelemetry.api.metrics.Meter import io.opentelemetry.api.metrics.MeterProvider +import io.opentelemetry.api.metrics.ObservableLongMeasurement import kotlinx.coroutines.* import mu.KotlinLogging import org.opendc.compute.api.* @@ -173,6 +174,12 @@ internal class ComputeServiceImpl( result.observe(available, upState) result.observe(total - available, downState) } + + meter.gaugeBuilder("system.time.provision") + .setDescription("The most recent timestamp where the server entered a provisioned state") + .setUnit("1") + .ofLongs() + .buildWithCallback(::collectProvisionTime) } override fun newClient(): ComputeClient { @@ -298,7 +305,7 @@ internal class ComputeServiceImpl( val hv = HostView(host) maxCores = max(maxCores, host.model.cpuCount) - maxMemory = max(maxMemory, host.model.memorySize) + maxMemory = max(maxMemory, host.model.memoryCapacity) hostToView[host] = hv if (host.state == HostState.UP) { @@ -324,8 +331,10 @@ internal class ComputeServiceImpl( internal fun schedule(server: InternalServer): SchedulingRequest { logger.debug { "Enqueueing server ${server.uid} to be assigned to host." } + val now = clock.millis() + val request = SchedulingRequest(server, now) - val request = SchedulingRequest(server, clock.millis()) + server.lastProvisioningTimestamp = now queue.add(request) _serversPending.add(1) requestSchedulingCycle() @@ -501,4 +510,13 @@ internal class ComputeServiceImpl( requestSchedulingCycle() } } + + /** + * Collect the timestamp when each server entered its provisioning state most recently. + */ + private fun collectProvisionTime(result: ObservableLongMeasurement) { + for ((_, server) in servers) { + result.observe(server.lastProvisioningTimestamp, server.attributes) + } + } } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt index e2f33f11..0876209a 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/HostView.kt @@ -37,7 +37,7 @@ public class HostView(public val host: Host) { get() = host.uid public var instanceCount: Int = 0 - public var availableMemory: Long = host.model.memorySize + public var availableMemory: Long = host.model.memoryCapacity public var provisionedCores: Int = 0 override fun toString(): String = "HostView[host=$host]" diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt index 05a7e1bf..f1b92c66 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt @@ -55,7 +55,7 @@ internal class InternalServer( /** * The attributes of a server. */ - internal val attributes: Attributes = Attributes.builder() + @JvmField internal val attributes: Attributes = Attributes.builder() .put(ResourceAttributes.HOST_NAME, name) .put(ResourceAttributes.HOST_ID, uid.toString()) .put(ResourceAttributes.HOST_TYPE, flavor.name) @@ -70,7 +70,12 @@ internal class InternalServer( /** * The [Host] that has been assigned to host the server. */ - internal var host: Host? = null + @JvmField internal var host: Host? = null + + /** + * The most recent timestamp when the server entered a provisioning state. + */ + @JvmField internal var lastProvisioningTimestamp: Long = Long.MIN_VALUE /** * The current scheduling request. diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt index a470a453..8a7a646c 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/RamFilter.kt @@ -34,7 +34,7 @@ public class RamFilter(private val allocationRatio: Double) : HostFilter { override fun test(host: HostView, server: Server): Boolean { val requested = server.flavor.memorySize val available = host.availableMemory - val total = host.host.model.memorySize + val total = host.host.model.memoryCapacity // Do not allow an instance to overcommit against itself, only against // other instances. diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuCapacityFilter.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuCapacityFilter.kt new file mode 100644 index 00000000..791710c8 --- /dev/null +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/filters/VCpuCapacityFilter.kt @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.scheduler.filters + +import org.opendc.compute.api.Server +import org.opendc.compute.service.internal.HostView + +/** + * A [HostFilter] that filters hosts based on the vCPU speed requirements of a [Server] and the available + * capacity on the host. + */ +public class VCpuCapacityFilter : HostFilter { + override fun test(host: HostView, server: Server): Boolean { + val requiredCapacity = server.flavor.meta["cpu-capacity"] as? Double + val hostModel = host.host.model + val availableCapacity = hostModel.cpuCapacity / hostModel.cpuCount + + return requiredCapacity == null || availableCapacity >= (requiredCapacity / server.flavor.cpuCount) + } +} diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuCapacityWeigher.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuCapacityWeigher.kt new file mode 100644 index 00000000..a86226e2 --- /dev/null +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/scheduler/weights/VCpuCapacityWeigher.kt @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.service.scheduler.weights + +import org.opendc.compute.api.Server +import org.opendc.compute.service.internal.HostView + +/** + * A [HostWeigher] that weighs the hosts based on the difference required vCPU capacity and the available CPU capacity. + */ +public class VCpuCapacityWeigher(override val multiplier: Double = 1.0) : HostWeigher { + + override fun getWeight(host: HostView, server: Server): Double { + val model = host.host.model + val requiredCapacity = server.flavor.meta["cpu-capacity"] as? Double ?: 0.0 + return model.cpuCapacity / model.cpuCount - requiredCapacity / server.flavor.cpuCount + } + + override fun toString(): String = "VCpuWeigher" +} diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt index 564f9493..7b8d0fe2 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt @@ -125,7 +125,7 @@ internal class ComputeServiceTest { fun testAddHost() = scope.runBlockingSimulation { val host = mockk<Host>(relaxUnitFun = true) - every { host.model } returns HostModel(4, 2048) + every { host.model } returns HostModel(4 * 2600.0, 4, 2048) every { host.state } returns HostState.UP assertEquals(0, service.hostCount) @@ -147,7 +147,7 @@ internal class ComputeServiceTest { fun testAddHostDouble() = scope.runBlockingSimulation { val host = mockk<Host>(relaxUnitFun = true) - every { host.model } returns HostModel(4, 2048) + every { host.model } returns HostModel(4 * 2600.0, 4, 2048) every { host.state } returns HostState.DOWN assertEquals(0, service.hostCount) @@ -216,7 +216,7 @@ internal class ComputeServiceTest { fun testServerCannotFitOnHost() = scope.runBlockingSimulation { val host = mockk<Host>(relaxUnitFun = true) - every { host.model } returns HostModel(4, 2048) + every { host.model } returns HostModel(4 * 2600.0, 4, 2048) every { host.state } returns HostState.UP every { host.canFit(any()) } returns false @@ -241,7 +241,7 @@ internal class ComputeServiceTest { val listeners = mutableListOf<HostListener>() every { host.uid } returns UUID.randomUUID() - every { host.model } returns HostModel(4, 2048) + every { host.model } returns HostModel(4 * 2600.0, 4, 2048) every { host.state } returns HostState.DOWN every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) } every { host.canFit(any()) } returns false @@ -272,7 +272,7 @@ internal class ComputeServiceTest { val listeners = mutableListOf<HostListener>() every { host.uid } returns UUID.randomUUID() - every { host.model } returns HostModel(4, 2048) + every { host.model } returns HostModel(4 * 2600.0, 4, 2048) every { host.state } returns HostState.UP every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) } every { host.canFit(any()) } returns false @@ -303,7 +303,7 @@ internal class ComputeServiceTest { val listeners = mutableListOf<HostListener>() every { host.uid } returns UUID.randomUUID() - every { host.model } returns HostModel(4, 2048) + every { host.model } returns HostModel(4 * 2600.0, 4, 2048) every { host.state } returns HostState.UP every { host.canFit(any()) } returns true every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) } @@ -326,7 +326,7 @@ internal class ComputeServiceTest { val listeners = mutableListOf<HostListener>() every { host.uid } returns UUID.randomUUID() - every { host.model } returns HostModel(4, 2048) + every { host.model } returns HostModel(4 * 2600.0, 4, 2048) every { host.state } returns HostState.UP every { host.canFit(any()) } returns true every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) } @@ -369,7 +369,7 @@ internal class ComputeServiceTest { val listeners = mutableListOf<HostListener>() every { host.uid } returns UUID.randomUUID() - every { host.model } returns HostModel(4, 2048) + every { host.model } returns HostModel(4 * 2600.0, 4, 2048) every { host.state } returns HostState.UP every { host.canFit(any()) } returns true every { host.addListener(any()) } answers { listeners.add(it.invocation.args[0] as HostListener) } diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt index cafd4498..3f2ce43b 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/scheduler/FilterSchedulerTest.kt @@ -33,10 +33,7 @@ import org.opendc.compute.api.Server import org.opendc.compute.service.driver.HostModel import org.opendc.compute.service.driver.HostState import org.opendc.compute.service.internal.HostView -import org.opendc.compute.service.scheduler.filters.ComputeFilter -import org.opendc.compute.service.scheduler.filters.InstanceCountFilter -import org.opendc.compute.service.scheduler.filters.RamFilter -import org.opendc.compute.service.scheduler.filters.VCpuFilter +import org.opendc.compute.service.scheduler.filters.* import org.opendc.compute.service.scheduler.weights.CoreRamWeigher import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher import org.opendc.compute.service.scheduler.weights.RamWeigher @@ -183,12 +180,12 @@ internal class FilterSchedulerTest { val hostA = mockk<HostView>() every { hostA.host.state } returns HostState.UP - every { hostA.host.model } returns HostModel(4, 2048) + every { hostA.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { hostA.availableMemory } returns 512 val hostB = mockk<HostView>() every { hostB.host.state } returns HostState.UP - every { hostB.host.model } returns HostModel(4, 2048) + every { hostB.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { hostB.availableMemory } returns 2048 scheduler.addHost(hostA) @@ -210,7 +207,7 @@ internal class FilterSchedulerTest { val host = mockk<HostView>() every { host.host.state } returns HostState.UP - every { host.host.model } returns HostModel(4, 2048) + every { host.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { host.availableMemory } returns 2048 scheduler.addHost(host) @@ -231,12 +228,12 @@ internal class FilterSchedulerTest { val hostA = mockk<HostView>() every { hostA.host.state } returns HostState.UP - every { hostA.host.model } returns HostModel(4, 2048) + every { hostA.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { hostA.provisionedCores } returns 3 val hostB = mockk<HostView>() every { hostB.host.state } returns HostState.UP - every { hostB.host.model } returns HostModel(4, 2048) + every { hostB.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { hostB.provisionedCores } returns 0 scheduler.addHost(hostA) @@ -258,7 +255,7 @@ internal class FilterSchedulerTest { val host = mockk<HostView>() every { host.host.state } returns HostState.UP - every { host.host.model } returns HostModel(4, 2048) + every { host.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { host.provisionedCores } returns 0 scheduler.addHost(host) @@ -271,6 +268,34 @@ internal class FilterSchedulerTest { } @Test + fun testVCpuCapacityFilter() { + val scheduler = FilterScheduler( + filters = listOf(VCpuCapacityFilter()), + weighers = emptyList(), + ) + + val hostA = mockk<HostView>() + every { hostA.host.state } returns HostState.UP + every { hostA.host.model } returns HostModel(8 * 2600.0, 8, 2048) + every { hostA.availableMemory } returns 512 + scheduler.addHost(hostA) + + val hostB = mockk<HostView>() + every { hostB.host.state } returns HostState.UP + every { hostB.host.model } returns HostModel(4 * 3200.0, 4, 2048) + every { hostB.availableMemory } returns 512 + + scheduler.addHost(hostB) + + val server = mockk<Server>() + every { server.flavor.cpuCount } returns 2 + every { server.flavor.memorySize } returns 1024 + every { server.flavor.meta } returns mapOf("cpu-capacity" to 2 * 3200.0) + + assertEquals(hostB, scheduler.select(server)) + } + + @Test fun testInstanceCountFilter() { val scheduler = FilterScheduler( filters = listOf(InstanceCountFilter(limit = 2)), @@ -279,12 +304,12 @@ internal class FilterSchedulerTest { val hostA = mockk<HostView>() every { hostA.host.state } returns HostState.UP - every { hostA.host.model } returns HostModel(4, 2048) + every { hostA.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { hostA.instanceCount } returns 2 val hostB = mockk<HostView>() every { hostB.host.state } returns HostState.UP - every { hostB.host.model } returns HostModel(4, 2048) + every { hostB.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { hostB.instanceCount } returns 0 scheduler.addHost(hostA) @@ -306,12 +331,12 @@ internal class FilterSchedulerTest { val hostA = mockk<HostView>() every { hostA.host.state } returns HostState.UP - every { hostA.host.model } returns HostModel(4, 2048) + every { hostA.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { hostA.availableMemory } returns 1024 val hostB = mockk<HostView>() every { hostB.host.state } returns HostState.UP - every { hostB.host.model } returns HostModel(4, 2048) + every { hostB.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { hostB.availableMemory } returns 512 scheduler.addHost(hostA) @@ -333,12 +358,12 @@ internal class FilterSchedulerTest { val hostA = mockk<HostView>() every { hostA.host.state } returns HostState.UP - every { hostA.host.model } returns HostModel(12, 2048) + every { hostA.host.model } returns HostModel(12 * 2600.0, 12, 2048) every { hostA.availableMemory } returns 1024 val hostB = mockk<HostView>() every { hostB.host.state } returns HostState.UP - every { hostB.host.model } returns HostModel(4, 2048) + every { hostB.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { hostB.availableMemory } returns 512 scheduler.addHost(hostA) @@ -360,12 +385,12 @@ internal class FilterSchedulerTest { val hostA = mockk<HostView>() every { hostA.host.state } returns HostState.UP - every { hostA.host.model } returns HostModel(4, 2048) + every { hostA.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { hostA.provisionedCores } returns 2 val hostB = mockk<HostView>() every { hostB.host.state } returns HostState.UP - every { hostB.host.model } returns HostModel(4, 2048) + every { hostB.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { hostB.provisionedCores } returns 0 scheduler.addHost(hostA) @@ -387,12 +412,12 @@ internal class FilterSchedulerTest { val hostA = mockk<HostView>() every { hostA.host.state } returns HostState.UP - every { hostA.host.model } returns HostModel(4, 2048) + every { hostA.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { hostA.instanceCount } returns 2 val hostB = mockk<HostView>() every { hostB.host.state } returns HostState.UP - every { hostB.host.model } returns HostModel(4, 2048) + every { hostB.host.model } returns HostModel(4 * 2600.0, 4, 2048) every { hostB.instanceCount } returns 0 scheduler.addHost(hostA) diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index b9d02185..908a58e9 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -47,6 +47,7 @@ import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.power.PowerDriver import org.opendc.simulator.compute.power.SimplePowerDriver +import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.flow.FlowEngine import java.util.* import kotlin.coroutines.CoroutineContext @@ -121,7 +122,7 @@ public class SimHost( field = value } - override val model: HostModel = HostModel(model.cpus.size, model.memory.sumOf { it.size }) + override val model: HostModel = HostModel(model.cpus.sumOf { it.frequency }, model.cpus.size, model.memory.sumOf { it.size }) /** * The [GuestListener] that listens for guest events. @@ -136,11 +137,6 @@ public class SimHost( } } - /** - * The [Job] that represents the machine running the hypervisor. - */ - private var _job: Job? = null - init { launch() @@ -188,7 +184,7 @@ public class SimHost( } override fun canFit(server: Server): Boolean { - val sufficientMemory = model.memorySize >= server.flavor.memorySize + val sufficientMemory = model.memoryCapacity >= server.flavor.memorySize val enoughCpus = model.cpuCount >= server.flavor.cpuCount val canFit = hypervisor.canFit(server.flavor.toMachineModel()) @@ -199,11 +195,12 @@ public class SimHost( val guest = guests.computeIfAbsent(server) { key -> require(canFit(key)) { "Server does not fit" } - val machine = hypervisor.createMachine(key.flavor.toMachineModel(), key.name) + val machine = hypervisor.newMachine(key.flavor.toMachineModel(), key.name) val newGuest = Guest( scope.coroutineContext, clock, this, + hypervisor, mapper, guestListener, server, @@ -249,7 +246,7 @@ public class SimHost( override fun close() { reset() scope.cancel() - machine.close() + machine.cancel() } override fun toString(): String = "SimHost[uid=$uid,name=$name,model=$model]" @@ -276,26 +273,39 @@ public class SimHost( } /** + * The [Job] that represents the machine running the hypervisor. + */ + private var _ctx: SimMachineContext? = null + + /** * Launch the hypervisor. */ private fun launch() { - check(_job == null) { "Concurrent hypervisor running" } + check(_ctx == null) { "Concurrent hypervisor running" } // Launch hypervisor onto machine - _job = scope.launch { - try { - _bootTime = clock.millis() - _state = HostState.UP - machine.run(hypervisor, emptyMap()) - } catch (_: CancellationException) { - // Ignored - } catch (cause: Throwable) { - logger.error(cause) { "Host failed" } - throw cause - } finally { - _state = HostState.DOWN + _ctx = machine.startWorkload(object : SimWorkload { + override fun onStart(ctx: SimMachineContext) { + try { + _bootTime = clock.millis() + _state = HostState.UP + hypervisor.onStart(ctx) + } catch (cause: Throwable) { + _state = HostState.DOWN + _ctx = null + throw cause + } } - } + + override fun onStop(ctx: SimMachineContext) { + try { + hypervisor.onStop(ctx) + } finally { + _state = HostState.DOWN + _ctx = null + } + } + }) } /** @@ -305,12 +315,7 @@ public class SimHost( updateUptime() // Stop the hypervisor - val job = _job - if (job != null) { - job.cancel() - _job = null - } - + _ctx?.close() _state = HostState.DOWN } @@ -319,8 +324,9 @@ public class SimHost( */ private fun Flavor.toMachineModel(): MachineModel { val originalCpu = machine.model.cpus[0] + val cpuCapacity = (this.meta["cpu-capacity"] as? Double ?: Double.MAX_VALUE).coerceAtMost(originalCpu.frequency) val processingNode = originalCpu.node.copy(coreCount = cpuCount) - val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode) } + val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode, frequency = cpuCapacity) } val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize)) return MachineModel(processingUnits, memoryUnits).optimize() diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt index 5ea1860d..9f3122db 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt @@ -34,7 +34,9 @@ import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState import org.opendc.compute.simulator.SimHost import org.opendc.compute.simulator.SimWorkloadMapper +import org.opendc.simulator.compute.kernel.SimHypervisor import org.opendc.simulator.compute.kernel.SimVirtualMachine +import org.opendc.simulator.compute.runWorkload import org.opendc.simulator.compute.workload.SimWorkload import java.time.Clock import kotlin.coroutines.CoroutineContext @@ -46,6 +48,7 @@ internal class Guest( context: CoroutineContext, private val clock: Clock, val host: SimHost, + private val hypervisor: SimHypervisor, private val mapper: SimWorkloadMapper, private val listener: GuestListener, val server: Server, @@ -114,8 +117,7 @@ internal class Guest( stop() state = ServerState.DELETED - - machine.close() + hypervisor.removeMachine(machine) scope.cancel() } @@ -191,7 +193,7 @@ internal class Guest( */ private suspend fun runMachine(workload: SimWorkload) { delay(1) // TODO Introduce model for boot time - machine.run(workload, mapOf("driver" to host, "server" to server)) + machine.runWorkload(workload, mapOf("driver" to host, "server" to server)) } /** @@ -248,7 +250,7 @@ internal class Guest( */ fun collectBootTime(result: ObservableLongMeasurement) { if (_bootTime != Long.MIN_VALUE) { - result.observe(_bootTime) + result.observe(_bootTime, attributes) } } diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index a0ff9228..799a8cf0 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -46,8 +46,8 @@ import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.FlowEngine import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.HOST_ID -import org.opendc.telemetry.compute.table.HostData -import org.opendc.telemetry.compute.table.ServerData +import org.opendc.telemetry.compute.table.HostTableReader +import org.opendc.telemetry.compute.table.ServerTableReader import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import org.opendc.telemetry.sdk.toOtelClock import java.time.Duration @@ -140,10 +140,10 @@ internal class SimHostTest { val reader = CoroutineMetricReader( this, listOf(meterProvider as MetricProducer), object : ComputeMetricExporter() { - override fun record(data: HostData) { - activeTime += data.cpuActiveTime - idleTime += data.cpuIdleTime - stealTime += data.cpuStealTime + override fun record(reader: HostTableReader) { + activeTime += reader.cpuActiveTime + idleTime += reader.cpuIdleTime + stealTime += reader.cpuStealTime } }, exportInterval = Duration.ofSeconds(duration) @@ -236,16 +236,16 @@ internal class SimHostTest { val reader = CoroutineMetricReader( this, listOf(meterProvider as MetricProducer), object : ComputeMetricExporter() { - override fun record(data: HostData) { - activeTime += data.cpuActiveTime - idleTime += data.cpuIdleTime - uptime += data.uptime - downtime += data.downtime + override fun record(reader: HostTableReader) { + activeTime += reader.cpuActiveTime + idleTime += reader.cpuIdleTime + uptime += reader.uptime + downtime += reader.downtime } - override fun record(data: ServerData) { - guestUptime += data.uptime - guestDowntime += data.downtime + override fun record(reader: ServerTableReader) { + guestUptime += reader.uptime + guestDowntime += reader.downtime } }, exportInterval = Duration.ofSeconds(duration) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt index 1a6624f7..f23becda 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt @@ -92,7 +92,8 @@ public class ComputeWorkloadLoader(private val baseDir: File) { val idCol = reader.resolve(RESOURCE_ID) val startTimeCol = reader.resolve(RESOURCE_START_TIME) val stopTimeCol = reader.resolve(RESOURCE_STOP_TIME) - val coresCol = reader.resolve(RESOURCE_CPU_COUNT) + val cpuCountCol = reader.resolve(RESOURCE_CPU_COUNT) + val cpuCapacityCol = reader.resolve(RESOURCE_CPU_CAPACITY) val memCol = reader.resolve(RESOURCE_MEM_CAPACITY) var counter = 0 @@ -108,8 +109,9 @@ public class ComputeWorkloadLoader(private val baseDir: File) { val submissionTime = reader.get(startTimeCol) as Instant val endTime = reader.get(stopTimeCol) as Instant - val maxCores = reader.getInt(coresCol) - val requiredMemory = reader.getDouble(memCol) / 1000.0 // Convert from KB to MB + val cpuCount = reader.getInt(cpuCountCol) + val cpuCapacity = reader.getDouble(cpuCapacityCol) + val memCapacity = reader.getDouble(memCol) / 1000.0 // Convert from KB to MB val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray()) val builder = fragments.getValue(id) @@ -119,8 +121,9 @@ public class ComputeWorkloadLoader(private val baseDir: File) { VirtualMachine( uid, id, - maxCores, - requiredMemory.roundToLong(), + cpuCount, + cpuCapacity, + memCapacity.roundToLong(), totalLoad, submissionTime, endTime, diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt index 283f82fe..90ee56cb 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt @@ -128,7 +128,8 @@ public class ComputeWorkloadRunner( client.newFlavor( entry.name, entry.cpuCount, - entry.memCapacity + entry.memCapacity, + meta = if (entry.cpuCapacity > 0.0) mapOf("cpu-capacity" to entry.cpuCapacity) else emptyMap() ), meta = mapOf("workload" to workload) ) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt index 5dd239f6..88e80719 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt @@ -31,8 +31,9 @@ import java.util.* * * @param uid The unique identifier of the virtual machine. * @param name The name of the virtual machine. + * @param cpuCapacity The required CPU capacity for the VM in MHz. * @param cpuCount The number of vCPUs in the VM. - * @param memCapacity The provisioned memory for the VM. + * @param memCapacity The provisioned memory for the VM in MB. * @param startTime The start time of the VM. * @param stopTime The stop time of the VM. * @param trace The trace that belong to this VM. @@ -41,6 +42,7 @@ public data class VirtualMachine( val uid: UUID, val name: String, val cpuCount: Int, + val cpuCapacity: Double, val memCapacity: Long, val totalLoad: Double, val startTime: Instant, diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt index ad182d67..a46885f4 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt @@ -25,9 +25,9 @@ package org.opendc.compute.workload.export.parquet import io.opentelemetry.sdk.common.CompletableResultCode import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.ComputeMonitor -import org.opendc.telemetry.compute.table.HostData -import org.opendc.telemetry.compute.table.ServerData -import org.opendc.telemetry.compute.table.ServiceData +import org.opendc.telemetry.compute.table.HostTableReader +import org.opendc.telemetry.compute.table.ServerTableReader +import org.opendc.telemetry.compute.table.ServiceTableReader import java.io.File /** @@ -49,16 +49,16 @@ public class ParquetComputeMetricExporter(base: File, partition: String, bufferS bufferSize ) - override fun record(data: ServerData) { - serverWriter.write(data) + override fun record(reader: ServerTableReader) { + serverWriter.write(reader) } - override fun record(data: HostData) { - hostWriter.write(data) + override fun record(reader: HostTableReader) { + hostWriter.write(reader) } - override fun record(data: ServiceData) { - serviceWriter.write(data) + override fun record(reader: ServiceTableReader) { + serviceWriter.write(reader) } override fun shutdown(): CompletableResultCode { diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt index 4172d729..84387bbc 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt @@ -50,9 +50,9 @@ public abstract class ParquetDataWriter<in T>( private val logger = KotlinLogging.logger {} /** - * The queue of commands to process. + * The queue of records to process. */ - private val queue: BlockingQueue<T> = ArrayBlockingQueue(bufferSize) + private val queue: BlockingQueue<GenericData.Record> = ArrayBlockingQueue(bufferSize) /** * An exception to be propagated to the actual writer. @@ -72,20 +72,20 @@ public abstract class ParquetDataWriter<in T>( } val queue = queue - val buf = mutableListOf<T>() + val buf = mutableListOf<GenericData.Record>() var shouldStop = false try { while (!shouldStop) { try { - process(writer, queue.take()) + writer.write(queue.take()) } catch (e: InterruptedException) { shouldStop = true } if (queue.drainTo(buf) > 0) { for (data in buf) { - process(writer, data) + writer.write(data) } buf.clear() } @@ -119,7 +119,9 @@ public abstract class ParquetDataWriter<in T>( throw IllegalStateException("Writer thread failed", exception) } - queue.put(data) + val builder = GenericRecordBuilder(schema) + convert(builder, data) + queue.put(builder.build()) } /** @@ -133,13 +135,4 @@ public abstract class ParquetDataWriter<in T>( init { writerThread.start() } - - /** - * Process the specified [data] to be written to the Parquet file. - */ - private fun process(writer: ParquetWriter<GenericData.Record>, data: T) { - val builder = GenericRecordBuilder(schema) - convert(builder, data) - writer.write(builder.build()) - } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt index 98a0739e..2b7cac8f 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt @@ -28,17 +28,17 @@ import org.apache.avro.generic.GenericData import org.apache.avro.generic.GenericRecordBuilder import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.ParquetWriter -import org.opendc.telemetry.compute.table.HostData +import org.opendc.telemetry.compute.table.HostTableReader import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA import org.opendc.trace.util.parquet.UUID_SCHEMA import org.opendc.trace.util.parquet.optional import java.io.File /** - * A Parquet event writer for [HostData]s. + * A Parquet event writer for [HostTableReader]s. */ public class ParquetHostDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter<HostData>(path, SCHEMA, bufferSize) { + ParquetDataWriter<HostTableReader>(path, SCHEMA, bufferSize) { override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> { return builder @@ -46,7 +46,7 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) : .build() } - override fun convert(builder: GenericRecordBuilder, data: HostData) { + override fun convert(builder: GenericRecordBuilder, data: HostTableReader) { builder["timestamp"] = data.timestamp.toEpochMilli() builder["host_id"] = data.host.id diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt index 0d11ec23..144b6624 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt @@ -28,17 +28,17 @@ import org.apache.avro.generic.GenericData import org.apache.avro.generic.GenericRecordBuilder import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.ParquetWriter -import org.opendc.telemetry.compute.table.ServerData +import org.opendc.telemetry.compute.table.ServerTableReader import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA import org.opendc.trace.util.parquet.UUID_SCHEMA import org.opendc.trace.util.parquet.optional import java.io.File /** - * A Parquet event writer for [ServerData]s. + * A Parquet event writer for [ServerTableReader]s. */ public class ParquetServerDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter<ServerData>(path, SCHEMA, bufferSize) { + ParquetDataWriter<ServerTableReader>(path, SCHEMA, bufferSize) { override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> { return builder @@ -47,7 +47,7 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) : .build() } - override fun convert(builder: GenericRecordBuilder, data: ServerData) { + override fun convert(builder: GenericRecordBuilder, data: ServerTableReader) { builder["timestamp"] = data.timestamp.toEpochMilli() builder["server_id"] = data.server.id @@ -55,9 +55,8 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) : builder["uptime"] = data.uptime builder["downtime"] = data.downtime - val bootTime = data.bootTime - builder["boot_time"] = bootTime?.toEpochMilli() - builder["scheduling_latency"] = data.schedulingLatency + builder["boot_time"] = data.bootTime?.toEpochMilli() + builder["provision_time"] = data.provisionTime?.toEpochMilli() builder["cpu_count"] = data.server.cpuCount builder["cpu_limit"] = data.cpuLimit @@ -81,8 +80,8 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) : .name("host_id").type(UUID_SCHEMA.optional()).noDefault() .requiredLong("uptime") .requiredLong("downtime") + .name("provision_time").type(TIMESTAMP_SCHEMA.optional()).noDefault() .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault() - .requiredLong("scheduling_latency") .requiredInt("cpu_count") .requiredDouble("cpu_limit") .requiredLong("cpu_time_active") diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt index 47824b29..ec8a2b65 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt @@ -25,17 +25,17 @@ package org.opendc.compute.workload.export.parquet import org.apache.avro.Schema import org.apache.avro.SchemaBuilder import org.apache.avro.generic.GenericRecordBuilder -import org.opendc.telemetry.compute.table.ServiceData +import org.opendc.telemetry.compute.table.ServiceTableReader import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA import java.io.File /** - * A Parquet event writer for [ServiceData]s. + * A Parquet event writer for [ServiceTableReader]s. */ public class ParquetServiceDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter<ServiceData>(path, SCHEMA, bufferSize) { + ParquetDataWriter<ServiceTableReader>(path, SCHEMA, bufferSize) { - override fun convert(builder: GenericRecordBuilder, data: ServiceData) { + override fun convert(builder: GenericRecordBuilder, data: ServiceTableReader) { builder["timestamp"] = data.timestamp.toEpochMilli() builder["hosts_up"] = data.hostsUp builder["hosts_down"] = data.hostsDown diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt deleted file mode 100644 index 67f9626c..00000000 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReader.kt +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.workload.util - -import com.fasterxml.jackson.annotation.JsonProperty -import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper -import com.fasterxml.jackson.module.kotlin.readValue -import org.opendc.simulator.compute.kernel.interference.VmInterferenceGroup -import java.io.File -import java.io.InputStream - -/** - * A parser for the JSON performance interference setup files used for the TPDS article on Capelin. - */ -public class PerformanceInterferenceReader { - /** - * The [ObjectMapper] to use. - */ - private val mapper = jacksonObjectMapper() - - init { - mapper.addMixIn(VmInterferenceGroup::class.java, GroupMixin::class.java) - } - - /** - * Read the performance interface model from [file]. - */ - public fun read(file: File): List<VmInterferenceGroup> { - return mapper.readValue(file) - } - - /** - * Read the performance interface model from the input. - */ - public fun read(input: InputStream): List<VmInterferenceGroup> { - return mapper.readValue(input) - } - - private data class GroupMixin( - @JsonProperty("minServerLoad") - val targetLoad: Double, - @JsonProperty("performanceScore") - val score: Double, - @JsonProperty("vms") - val members: Set<String>, - ) -} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReader.kt new file mode 100644 index 00000000..e0fa8904 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReader.kt @@ -0,0 +1,128 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.util + +import com.fasterxml.jackson.annotation.JsonProperty +import com.fasterxml.jackson.core.JsonParseException +import com.fasterxml.jackson.core.JsonParser +import com.fasterxml.jackson.core.JsonToken +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel +import java.io.File +import java.io.InputStream + +/** + * A parser for the JSON performance interference setup files used for the TPDS article on Capelin. + */ +public class VmInterferenceModelReader { + /** + * The [ObjectMapper] to use. + */ + private val mapper = jacksonObjectMapper() + + /** + * Read the performance interface model from [file]. + */ + public fun read(file: File): VmInterferenceModel { + val builder = VmInterferenceModel.builder() + val parser = mapper.createParser(file) + parseGroups(parser, builder) + return builder.build() + } + + /** + * Read the performance interface model from the input. + */ + public fun read(input: InputStream): VmInterferenceModel { + val builder = VmInterferenceModel.builder() + val parser = mapper.createParser(input) + parseGroups(parser, builder) + return builder.build() + } + + /** + * Parse all groups in an interference JSON file. + */ + private fun parseGroups(parser: JsonParser, builder: VmInterferenceModel.Builder) { + parser.nextToken() + + if (!parser.isExpectedStartArrayToken) { + throw JsonParseException(parser, "Expected array at start, but got ${parser.currentToken()}") + } + + while (parser.nextToken() != JsonToken.END_ARRAY) { + parseGroup(parser, builder) + } + } + + /** + * Parse a group an interference JSON file. + */ + private fun parseGroup(parser: JsonParser, builder: VmInterferenceModel.Builder) { + var targetLoad = Double.POSITIVE_INFINITY + var score = 1.0 + val members = mutableSetOf<String>() + + if (!parser.isExpectedStartObjectToken) { + throw JsonParseException(parser, "Expected object, but got ${parser.currentToken()}") + } + + while (parser.nextValue() != JsonToken.END_OBJECT) { + when (parser.currentName) { + "vms" -> parseGroupMembers(parser, members) + "minServerLoad" -> targetLoad = parser.doubleValue + "performanceScore" -> score = parser.doubleValue + } + } + + builder.addGroup(members, targetLoad, score) + } + + /** + * Parse the members of a group. + */ + private fun parseGroupMembers(parser: JsonParser, members: MutableSet<String>) { + if (!parser.isExpectedStartArrayToken) { + throw JsonParseException(parser, "Expected array for group members, but got ${parser.currentToken()}") + } + + while (parser.nextValue() != JsonToken.END_ARRAY) { + if (parser.currentToken() != JsonToken.VALUE_STRING) { + throw JsonParseException(parser, "Expected string value for group member") + } + + val member = parser.text.removePrefix("vm__workload__").removeSuffix(".txt") + members.add(member) + } + } + + private data class Group( + @JsonProperty("minServerLoad") + val targetLoad: Double, + @JsonProperty("performanceScore") + val score: Double, + @JsonProperty("vms") + val members: Set<String>, + ) +} diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReaderTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReaderTest.kt index c79f0584..1c3e7149 100644 --- a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/PerformanceInterferenceReaderTest.kt +++ b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/util/VmInterferenceModelReaderTest.kt @@ -22,24 +22,16 @@ package org.opendc.compute.workload.util -import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertAll +import org.junit.jupiter.api.assertDoesNotThrow /** - * Test suite for the [PerformanceInterferenceReader] class. + * Test suite for the [VmInterferenceModelReader] class. */ -class PerformanceInterferenceReaderTest { +class VmInterferenceModelReaderTest { @Test fun testSmoke() { - val input = checkNotNull(PerformanceInterferenceReader::class.java.getResourceAsStream("/perf-interference.json")) - val result = PerformanceInterferenceReader().read(input) - - assertAll( - { assertEquals(2, result.size) }, - { assertEquals(setOf("vm_a", "vm_c", "vm_x", "vm_y"), result[0].members) }, - { assertEquals(0.0, result[0].targetLoad, 0.001) }, - { assertEquals(0.8830158730158756, result[0].score, 0.001) } - ) + val input = checkNotNull(VmInterferenceModelReader::class.java.getResourceAsStream("/perf-interference.json")) + assertDoesNotThrow { VmInterferenceModelReader().read(input) } } } |
