summaryrefslogtreecommitdiff
path: root/opendc-compute/opendc-compute-workload
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-09-30 20:57:16 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2022-10-03 20:47:12 +0200
commit448b4cafe3c757812138a8ca7580975191ac2f9c (patch)
treec3d25329908f718f1f7eae1dc944ef532f1849f1 /opendc-compute/opendc-compute-workload
parent07743e75891e8b3ebcefe4771f92af8003ef0b1f (diff)
refactor(exp/compute): Integrate compute workload classes
This change integrates the classes from the old `opendc-compute-workload` module into the `opendc-experiments-compute` module. This new module contains helper classes for setting up experiments with the OpenDC compute service.
Diffstat (limited to 'opendc-compute/opendc-compute-workload')
-rw-r--r--opendc-compute/opendc-compute-workload/build.gradle.kts41
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeSchedulers.kt86
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt126
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt35
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt255
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt62
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModel.kt39
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModels.kt70
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/TraceHelpers.kt117
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt54
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMonitor.kt67
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt132
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt210
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt198
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt128
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/Utils.kt38
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt66
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt143
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt61
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt37
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt427
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMonitor.kt47
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/HostInfo.kt28
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/HostTableReader.kt125
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServerInfo.kt37
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServerTableReader.kt90
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServiceData.kt46
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServiceTableReader.kt70
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt47
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/Topology.kt33
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/TopologyHelpers.kt36
-rw-r--r--opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/HostDataWriterTest.kt79
-rw-r--r--opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServerDataWriterTest.kt73
-rw-r--r--opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServiceDataWriterTest.kt67
34 files changed, 0 insertions, 3170 deletions
diff --git a/opendc-compute/opendc-compute-workload/build.gradle.kts b/opendc-compute/opendc-compute-workload/build.gradle.kts
deleted file mode 100644
index 7b5fe6c1..00000000
--- a/opendc-compute/opendc-compute-workload/build.gradle.kts
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-description = "Support library for simulating VM-based workloads with OpenDC"
-
-/* Build configuration */
-plugins {
- `kotlin-library-conventions`
-}
-
-dependencies {
- api(projects.opendcCompute.opendcComputeSimulator)
-
- implementation(projects.opendcTrace.opendcTraceApi)
- implementation(projects.opendcTrace.opendcTraceParquet)
- implementation(projects.opendcSimulator.opendcSimulatorCore)
- implementation(projects.opendcSimulator.opendcSimulatorCompute)
-
- implementation(libs.kotlin.logging)
-
- testImplementation(libs.slf4j.simple)
-}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeSchedulers.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeSchedulers.kt
deleted file mode 100644
index c94f30e4..00000000
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeSchedulers.kt
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-@file:JvmName("ComputeSchedulers")
-package org.opendc.compute.workload
-
-import org.opendc.compute.service.scheduler.ComputeScheduler
-import org.opendc.compute.service.scheduler.FilterScheduler
-import org.opendc.compute.service.scheduler.ReplayScheduler
-import org.opendc.compute.service.scheduler.filters.ComputeFilter
-import org.opendc.compute.service.scheduler.filters.RamFilter
-import org.opendc.compute.service.scheduler.filters.VCpuFilter
-import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
-import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher
-import org.opendc.compute.service.scheduler.weights.RamWeigher
-import org.opendc.compute.service.scheduler.weights.VCpuWeigher
-import java.util.*
-
-/**
- * Create a [ComputeScheduler] for the experiment.
- */
-public fun createComputeScheduler(name: String, seeder: Random, placements: Map<String, String> = emptyMap()): ComputeScheduler {
- val cpuAllocationRatio = 16.0
- val ramAllocationRatio = 1.5
- return when (name) {
- "mem" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = listOf(RamWeigher(multiplier = 1.0))
- )
- "mem-inv" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = listOf(RamWeigher(multiplier = -1.0))
- )
- "core-mem" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = listOf(CoreRamWeigher(multiplier = 1.0))
- )
- "core-mem-inv" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = listOf(CoreRamWeigher(multiplier = -1.0))
- )
- "active-servers" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = listOf(InstanceCountWeigher(multiplier = -1.0))
- )
- "active-servers-inv" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = listOf(InstanceCountWeigher(multiplier = 1.0))
- )
- "provisioned-cores" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = 1.0))
- )
- "provisioned-cores-inv" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = -1.0))
- )
- "random" -> FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
- weighers = emptyList(),
- subsetSize = Int.MAX_VALUE,
- random = Random(seeder.nextLong())
- )
- "replay" -> ReplayScheduler(placements)
- else -> throw IllegalArgumentException("Unknown policy $name")
- }
-}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt
deleted file mode 100644
index e86456fe..00000000
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.workload
-
-import org.opendc.compute.service.ComputeService
-import org.opendc.compute.service.scheduler.ComputeScheduler
-import org.opendc.compute.simulator.SimHost
-import org.opendc.compute.workload.topology.HostSpec
-import org.opendc.simulator.compute.SimBareMetalMachine
-import org.opendc.simulator.compute.kernel.SimHypervisor
-import org.opendc.simulator.flow.FlowEngine
-import java.time.Clock
-import java.time.Duration
-import java.util.*
-import kotlin.coroutines.CoroutineContext
-
-/**
- * Helper class to simulate VM-based workloads in OpenDC.
- *
- * @param context [CoroutineContext] to run the simulation in.
- * @param clock [Clock] instance tracking simulation time.
- * @param scheduler [ComputeScheduler] implementation to use for the service.
- * @param schedulingQuantum The scheduling quantum of the scheduler.
- */
-public class ComputeServiceHelper(
- private val context: CoroutineContext,
- private val clock: Clock,
- scheduler: ComputeScheduler,
- seed: Long,
- schedulingQuantum: Duration = Duration.ofMinutes(5)
-) : AutoCloseable {
- /**
- * The [ComputeService] that has been configured by the manager.
- */
- public val service: ComputeService = ComputeService(context, clock, scheduler, schedulingQuantum)
-
- /**
- * The [FlowEngine] to simulate the hosts.
- */
- private val engine = FlowEngine(context, clock)
-
- /**
- * The hosts that belong to this class.
- */
- private val hosts = mutableSetOf<SimHost>()
-
- /**
- * The source of randomness.
- */
- private val random = SplittableRandom(seed)
-
- /**
- * Run a simulation of the [ComputeService] by replaying the workload trace given by [trace].
- *
- * @param trace The trace to simulate.
- * @param submitImmediately A flag to indicate that the servers are scheduled immediately (so not at their start time).
- * @param failureModel A failure model to use for injecting failures.
- * @param interference A flag to indicate that VM interference needs to be enabled.
- */
- public suspend fun run(
- trace: List<VirtualMachine>,
- submitImmediately: Boolean = false,
- failureModel: FailureModel? = null,
- interference: Boolean = false,
- ) {
- service.replay(clock, trace, random.nextLong(), submitImmediately, failureModel, interference)
- }
-
- /**
- * Register a host for this simulation.
- *
- * @param spec The definition of the host.
- * @param optimize Merge the CPU resources of the host into a single CPU resource.
- * @return The [SimHost] that has been constructed by the runner.
- */
- public fun registerHost(spec: HostSpec, optimize: Boolean = false): SimHost {
- val machine = SimBareMetalMachine(engine, spec.model, spec.powerDriver)
- val hypervisor = SimHypervisor(engine, spec.multiplexerFactory, random)
-
- val host = SimHost(
- spec.uid,
- spec.name,
- spec.meta,
- context,
- clock,
- machine,
- hypervisor,
- optimize = optimize
- )
-
- require(hosts.add(host)) { "Host with uid ${spec.uid} already exists" }
- service.addHost(host)
-
- return host
- }
-
- override fun close() {
- service.close()
-
- for (host in hosts) {
- host.close()
- }
-
- hosts.clear()
- }
-}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt
deleted file mode 100644
index 78002c2f..00000000
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.workload
-
-import java.util.*
-
-/**
- * An interface that describes how a workload is resolved.
- */
-public interface ComputeWorkload {
- /**
- * Resolve the workload into a list of [VirtualMachine]s to simulate.
- */
- public fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine>
-}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
deleted file mode 100644
index 387a3ec2..00000000
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.workload
-
-import mu.KotlinLogging
-import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
-import org.opendc.simulator.compute.workload.SimTrace
-import org.opendc.trace.*
-import org.opendc.trace.conv.*
-import java.io.File
-import java.lang.ref.SoftReference
-import java.util.*
-import java.util.concurrent.ConcurrentHashMap
-import kotlin.math.max
-import kotlin.math.roundToLong
-
-/**
- * A helper class for loading compute workload traces into memory.
- *
- * @param baseDir The directory containing the traces.
- */
-public class ComputeWorkloadLoader(private val baseDir: File) {
- /**
- * The logger for this instance.
- */
- private val logger = KotlinLogging.logger {}
-
- /**
- * The cache of workloads.
- */
- private val cache = ConcurrentHashMap<String, SoftReference<List<VirtualMachine>>>()
-
- /**
- * Read the fragments into memory.
- */
- private fun parseFragments(trace: Trace): Map<String, Builder> {
- val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
-
- val idCol = reader.resolve(RESOURCE_ID)
- val timestampCol = reader.resolve(RESOURCE_STATE_TIMESTAMP)
- val durationCol = reader.resolve(RESOURCE_STATE_DURATION)
- val coresCol = reader.resolve(RESOURCE_CPU_COUNT)
- val usageCol = reader.resolve(RESOURCE_STATE_CPU_USAGE)
-
- val fragments = mutableMapOf<String, Builder>()
-
- return try {
- while (reader.nextRow()) {
- val id = reader.getString(idCol)!!
- val time = reader.getInstant(timestampCol)!!
- val duration = reader.getDuration(durationCol)!!
- val cores = reader.getInt(coresCol)
- val cpuUsage = reader.getDouble(usageCol)
-
- val deadlineMs = time.toEpochMilli()
- val timeMs = (time - duration).toEpochMilli()
- val builder = fragments.computeIfAbsent(id) { Builder() }
- builder.add(timeMs, deadlineMs, cpuUsage, cores)
- }
-
- fragments
- } finally {
- reader.close()
- }
- }
-
- /**
- * Read the metadata into a workload.
- */
- private fun parseMeta(trace: Trace, fragments: Map<String, Builder>, interferenceModel: VmInterferenceModel): List<VirtualMachine> {
- val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader()
-
- val idCol = reader.resolve(RESOURCE_ID)
- val startTimeCol = reader.resolve(RESOURCE_START_TIME)
- val stopTimeCol = reader.resolve(RESOURCE_STOP_TIME)
- val cpuCountCol = reader.resolve(RESOURCE_CPU_COUNT)
- val cpuCapacityCol = reader.resolve(RESOURCE_CPU_CAPACITY)
- val memCol = reader.resolve(RESOURCE_MEM_CAPACITY)
-
- var counter = 0
- val entries = mutableListOf<VirtualMachine>()
-
- return try {
- while (reader.nextRow()) {
-
- val id = reader.getString(idCol)!!
- if (!fragments.containsKey(id)) {
- continue
- }
-
- val submissionTime = reader.getInstant(startTimeCol)!!
- val endTime = reader.getInstant(stopTimeCol)!!
- val cpuCount = reader.getInt(cpuCountCol)
- val cpuCapacity = reader.getDouble(cpuCapacityCol)
- val memCapacity = reader.getDouble(memCol) / 1000.0 // Convert from KB to MB
- val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray())
-
- val builder = fragments.getValue(id)
- val totalLoad = builder.totalLoad
-
- entries.add(
- VirtualMachine(
- uid,
- id,
- cpuCount,
- cpuCapacity,
- memCapacity.roundToLong(),
- totalLoad,
- submissionTime,
- endTime,
- builder.build(),
- interferenceModel.getProfile(id)
- )
- )
- }
-
- // Make sure the virtual machines are ordered by start time
- entries.sortBy { it.startTime }
-
- entries
- } catch (e: Exception) {
- e.printStackTrace()
- throw e
- } finally {
- reader.close()
- }
- }
-
- /**
- * Read the interference model associated with the specified [trace].
- */
- private fun parseInterferenceModel(trace: Trace): VmInterferenceModel {
- val reader = checkNotNull(trace.getTable(TABLE_INTERFERENCE_GROUPS)).newReader()
-
- return try {
- val membersCol = reader.resolve(INTERFERENCE_GROUP_MEMBERS)
- val targetCol = reader.resolve(INTERFERENCE_GROUP_TARGET)
- val scoreCol = reader.resolve(INTERFERENCE_GROUP_SCORE)
-
- val modelBuilder = VmInterferenceModel.builder()
-
- while (reader.nextRow()) {
- val members = reader.getSet(membersCol, String::class.java)!!
- val target = reader.getDouble(targetCol)
- val score = reader.getDouble(scoreCol)
-
- modelBuilder
- .addGroup(members, target, score)
- }
-
- modelBuilder.build()
- } finally {
- reader.close()
- }
- }
-
- /**
- * Load the trace with the specified [name] and [format].
- */
- public fun get(name: String, format: String): List<VirtualMachine> {
- val ref = cache.compute(name) { key, oldVal ->
- val inst = oldVal?.get()
- if (inst == null) {
-
- val path = baseDir.resolve(key)
-
- logger.info { "Loading trace $key at $path" }
-
- val trace = Trace.open(path, format)
- val fragments = parseFragments(trace)
- val interferenceModel = parseInterferenceModel(trace)
- val vms = parseMeta(trace, fragments, interferenceModel)
-
- SoftReference(vms)
- } else {
- oldVal
- }
- }
-
- return checkNotNull(ref?.get()) { "Memory pressure" }
- }
-
- /**
- * Clear the workload cache.
- */
- public fun reset() {
- cache.clear()
- }
-
- /**
- * A builder for a VM trace.
- */
- private class Builder {
- /**
- * The total load of the trace.
- */
- @JvmField var totalLoad: Double = 0.0
-
- /**
- * The internal builder for the trace.
- */
- private val builder = SimTrace.builder()
-
- /**
- * The deadline of the previous fragment.
- */
- private var previousDeadline = Long.MIN_VALUE
-
- /**
- * Add a fragment to the trace.
- *
- * @param timestamp Timestamp at which the fragment starts (in epoch millis).
- * @param deadline Timestamp at which the fragment ends (in epoch millis).
- * @param usage CPU usage of this fragment.
- * @param cores Number of cores used.
- */
- fun add(timestamp: Long, deadline: Long, usage: Double, cores: Int) {
- val duration = max(0, deadline - timestamp)
- totalLoad += (usage * duration) / 1000.0 // avg MHz * duration = MFLOPs
-
- if (timestamp != previousDeadline) {
- // There is a gap between the previous and current fragment; fill the gap
- builder.add(timestamp, 0.0, cores)
- }
-
- builder.add(deadline, usage, cores)
- previousDeadline = deadline
- }
-
- /**
- * Build the trace.
- */
- fun build(): SimTrace = builder.build()
- }
-}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt
deleted file mode 100644
index 2f4935ca..00000000
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloads.kt
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-@file:JvmName("ComputeWorkloads")
-package org.opendc.compute.workload
-
-import org.opendc.compute.workload.internal.CompositeComputeWorkload
-import org.opendc.compute.workload.internal.HpcSampledComputeWorkload
-import org.opendc.compute.workload.internal.LoadSampledComputeWorkload
-import org.opendc.compute.workload.internal.TraceComputeWorkload
-
-/**
- * Construct a workload from a trace.
- */
-public fun trace(name: String, format: String = "opendc-vm"): ComputeWorkload = TraceComputeWorkload(name, format)
-
-/**
- * Construct a composite workload with the specified fractions.
- */
-public fun composite(vararg pairs: Pair<ComputeWorkload, Double>): ComputeWorkload {
- return CompositeComputeWorkload(pairs.toMap())
-}
-
-/**
- * Sample a workload by a [fraction] of the total load.
- */
-public fun ComputeWorkload.sampleByLoad(fraction: Double): ComputeWorkload {
- return LoadSampledComputeWorkload(this, fraction)
-}
-
-/**
- * Sample a workload by a [fraction] of the HPC VMs (count)
- */
-public fun ComputeWorkload.sampleByHpc(fraction: Double): ComputeWorkload {
- return HpcSampledComputeWorkload(this, fraction)
-}
-
-/**
- * Sample a workload by a [fraction] of the HPC load
- */
-public fun ComputeWorkload.sampleByHpcLoad(fraction: Double): ComputeWorkload {
- return HpcSampledComputeWorkload(this, fraction, sampleLoad = true)
-}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModel.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModel.kt
deleted file mode 100644
index 4d9ef15d..00000000
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModel.kt
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.workload
-
-import org.opendc.compute.service.ComputeService
-import org.opendc.compute.simulator.failure.HostFaultInjector
-import java.time.Clock
-import java.util.*
-import kotlin.coroutines.CoroutineContext
-
-/**
- * Factory interface for constructing [HostFaultInjector] for modeling failures of compute service hosts.
- */
-public interface FailureModel {
- /**
- * Construct a [HostFaultInjector] for the specified [service].
- */
- public fun createInjector(context: CoroutineContext, clock: Clock, service: ComputeService, random: Random): HostFaultInjector
-}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModels.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModels.kt
deleted file mode 100644
index be7120b9..00000000
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/FailureModels.kt
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-@file:JvmName("FailureModels")
-package org.opendc.compute.workload
-
-import org.apache.commons.math3.distribution.LogNormalDistribution
-import org.apache.commons.math3.random.Well19937c
-import org.opendc.compute.service.ComputeService
-import org.opendc.compute.simulator.SimHost
-import org.opendc.compute.simulator.failure.HostFaultInjector
-import org.opendc.compute.simulator.failure.StartStopHostFault
-import org.opendc.compute.simulator.failure.StochasticVictimSelector
-import java.time.Clock
-import java.time.Duration
-import java.util.*
-import kotlin.coroutines.CoroutineContext
-import kotlin.math.ln
-
-/**
- * Obtain a [FailureModel] based on the GRID'5000 failure trace.
- *
- * This fault injector uses parameters from the GRID'5000 failure trace as described in
- * "A Framework for the Study of Grid Inter-Operation Mechanisms", A. Iosup, 2009.
- */
-public fun grid5000(failureInterval: Duration): FailureModel {
- return object : FailureModel {
- override fun createInjector(
- context: CoroutineContext,
- clock: Clock,
- service: ComputeService,
- random: Random
- ): HostFaultInjector {
- val rng = Well19937c(random.nextLong())
- val hosts = service.hosts.map { it as SimHost }.toSet()
-
- // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
- // GRID'5000
- return HostFaultInjector(
- context,
- clock,
- hosts,
- iat = LogNormalDistribution(rng, ln(failureInterval.toHours().toDouble()), 1.03),
- selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), random),
- fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71))
- )
- }
-
- override fun toString(): String = "Grid5000FailureModel"
- }
-}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/TraceHelpers.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/TraceHelpers.kt
deleted file mode 100644
index dc8713dc..00000000
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/TraceHelpers.kt
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-@file:JvmName("TraceHelpers")
-package org.opendc.compute.workload
-
-import kotlinx.coroutines.*
-import org.opendc.compute.service.ComputeService
-import org.opendc.simulator.compute.workload.SimTraceWorkload
-import java.time.Clock
-import java.util.*
-import kotlin.coroutines.coroutineContext
-import kotlin.math.max
-
-/**
- * Helper method to replay the specified list of [VirtualMachine] and suspend execution util all VMs have finished.
- *
- * @param clock The simulation clock.
- * @param trace The trace to simulate.
- * @param seed The seed to use for randomness.
- * @param submitImmediately A flag to indicate that the servers are scheduled immediately (so not at their start time).
- * @param failureModel A failure model to use for injecting failures.
- * @param interference A flag to indicate that VM interference needs to be enabled.
- */
-public suspend fun ComputeService.replay(
- clock: Clock,
- trace: List<VirtualMachine>,
- seed: Long,
- submitImmediately: Boolean = false,
- failureModel: FailureModel? = null,
- interference: Boolean = false
-) {
- val injector = failureModel?.createInjector(coroutineContext, clock, this, Random(seed))
- val client = newClient()
-
- // Create new image for the virtual machine
- val image = client.newImage("vm-image")
-
- try {
- coroutineScope {
- // Start the fault injector
- injector?.start()
-
- var offset = Long.MIN_VALUE
-
- for (entry in trace.sortedBy { it.startTime }) {
- val now = clock.millis()
- val start = entry.startTime.toEpochMilli()
-
- if (offset < 0) {
- offset = start - now
- }
-
- // Make sure the trace entries are ordered by submission time
- assert(start - offset >= 0) { "Invalid trace order" }
-
- if (!submitImmediately) {
- delay(max(0, (start - offset) - now))
- }
-
- val workloadOffset = -offset + 300001
- val workload = SimTraceWorkload(entry.trace, workloadOffset)
- val meta = mutableMapOf<String, Any>("workload" to workload)
-
- val interferenceProfile = entry.interferenceProfile
- if (interference && interferenceProfile != null) {
- meta["interference-profile"] = interferenceProfile
- }
-
- launch {
- val server = client.newServer(
- entry.name,
- image,
- client.newFlavor(
- entry.name,
- entry.cpuCount,
- entry.memCapacity,
- meta = if (entry.cpuCapacity > 0.0) mapOf("cpu-capacity" to entry.cpuCapacity) else emptyMap()
- ),
- meta = meta
- )
-
- // Wait for the server reach its end time
- val endTime = entry.stopTime.toEpochMilli()
- delay(endTime + workloadOffset - clock.millis() + 5 * 60 * 1000)
-
- // Stop the server after reaching the end-time of the virtual machine
- server.stop()
- }
- }
- }
-
- yield()
- } finally {
- injector?.close()
- client.close()
- }
-}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt
deleted file mode 100644
index 8560b537..00000000
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.workload
-
-import org.opendc.simulator.compute.kernel.interference.VmInterferenceProfile
-import org.opendc.simulator.compute.workload.SimTrace
-import java.time.Instant
-import java.util.*
-
-/**
- * A virtual machine workload.
- *
- * @param uid The unique identifier of the virtual machine.
- * @param name The name of the virtual machine.
- * @param cpuCapacity The required CPU capacity for the VM in MHz.
- * @param cpuCount The number of vCPUs in the VM.
- * @param memCapacity The provisioned memory for the VM in MB.
- * @param startTime The start time of the VM.
- * @param stopTime The stop time of the VM.
- * @param trace The trace that belong to this VM.
- * @param interferenceProfile The interference profile of this virtual machine.
- */
-public data class VirtualMachine(
- val uid: UUID,
- val name: String,
- val cpuCount: Int,
- val cpuCapacity: Double,
- val memCapacity: Long,
- val totalLoad: Double,
- val startTime: Instant,
- val stopTime: Instant,
- val trace: SimTrace,
- val interferenceProfile: VmInterferenceProfile?
-)
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMonitor.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMonitor.kt
deleted file mode 100644
index af4dad44..00000000
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMonitor.kt
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.workload.export.parquet
-
-import org.opendc.compute.workload.telemetry.ComputeMonitor
-import org.opendc.compute.workload.telemetry.table.HostTableReader
-import org.opendc.compute.workload.telemetry.table.ServerTableReader
-import org.opendc.compute.workload.telemetry.table.ServiceTableReader
-import java.io.File
-
-/**
- * A [ComputeMonitor] that logs the events to a Parquet file.
- */
-public class ParquetComputeMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable {
- private val serverWriter = ParquetServerDataWriter(
- File(base, "server/$partition/data.parquet").also { it.parentFile.mkdirs() },
- bufferSize
- )
-
- private val hostWriter = ParquetHostDataWriter(
- File(base, "host/$partition/data.parquet").also { it.parentFile.mkdirs() },
- bufferSize
- )
-
- private val serviceWriter = ParquetServiceDataWriter(
- File(base, "service/$partition/data.parquet").also { it.parentFile.mkdirs() },
- bufferSize
- )
-
- override fun record(reader: ServerTableReader) {
- serverWriter.write(reader)
- }
-
- override fun record(reader: HostTableReader) {
- hostWriter.write(reader)
- }
-
- override fun record(reader: ServiceTableReader) {
- serviceWriter.write(reader)
- }
-
- override fun close() {
- hostWriter.close()
- serviceWriter.close()
- serverWriter.close()
- }
-}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt
deleted file mode 100644
index c854d874..00000000
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.workload.export.parquet
-
-import mu.KotlinLogging
-import org.apache.parquet.column.ParquetProperties
-import org.apache.parquet.hadoop.ParquetFileWriter
-import org.apache.parquet.hadoop.ParquetWriter
-import org.apache.parquet.hadoop.api.WriteSupport
-import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import org.opendc.trace.util.parquet.LocalParquetWriter
-import java.io.File
-import java.util.concurrent.ArrayBlockingQueue
-import java.util.concurrent.BlockingQueue
-import kotlin.concurrent.thread
-
-/**
- * A writer that writes data in Parquet format.
- *
- * @param path The path to the file to write the data to.
- * @param writeSupport The [WriteSupport] implementation for converting the records to Parquet format.
- */
-public abstract class ParquetDataWriter<in T>(
- path: File,
- private val writeSupport: WriteSupport<T>,
- bufferSize: Int = 4096
-) : AutoCloseable {
- /**
- * The logging instance to use.
- */
- private val logger = KotlinLogging.logger {}
-
- /**
- * The queue of records to process.
- */
- private val queue: BlockingQueue<T> = ArrayBlockingQueue(bufferSize)
-
- /**
- * An exception to be propagated to the actual writer.
- */
- private var exception: Throwable? = null
-
- /**
- * The thread that is responsible for writing the Parquet records.
- */
- private val writerThread = thread(start = false, name = this.toString()) {
- val writer = let {
- val builder = LocalParquetWriter.builder(path.toPath(), writeSupport)
- .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
- .withCompressionCodec(CompressionCodecName.ZSTD)
- .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
- buildWriter(builder)
- }
-
- val queue = queue
- val buf = mutableListOf<T>()
- var shouldStop = false
-
- try {
- while (!shouldStop) {
- try {
- writer.write(queue.take())
- } catch (e: InterruptedException) {
- shouldStop = true
- }
-
- if (queue.drainTo(buf) > 0) {
- for (data in buf) {
- writer.write(data)
- }
- buf.clear()
- }
- }
- } catch (e: Throwable) {
- logger.error(e) { "Failure in Parquet data writer" }
- exception = e
- } finally {
- writer.close()
- }
- }
-
- /**
- * Build the [ParquetWriter] used to write the Parquet files.
- */
- protected open fun buildWriter(builder: LocalParquetWriter.Builder<@UnsafeVariance T>): ParquetWriter<@UnsafeVariance T> {
- return builder.build()
- }
-
- /**
- * Write the specified metrics to the database.
- */
- public fun write(data: T) {
- val exception = exception
- if (exception != null) {
- throw IllegalStateException("Writer thread failed", exception)
- }
-
- queue.put(data)
- }
-
- /**
- * Signal the writer to stop.
- */
- override fun close() {
- writerThread.interrupt()
- writerThread.join()
- }
-
- init {
- writerThread.start()
- }
-}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt
deleted file mode 100644
index e6e7e42d..00000000
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.workload.export.parquet
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.parquet.hadoop.ParquetWriter
-import org.apache.parquet.hadoop.api.WriteSupport
-import org.apache.parquet.io.api.RecordConsumer
-import org.apache.parquet.schema.*
-import org.opendc.compute.workload.telemetry.table.HostTableReader
-import org.opendc.trace.util.parquet.LocalParquetWriter
-import java.io.File
-import java.util.*
-
-/**
- * A Parquet event writer for [HostTableReader]s.
- */
-public class ParquetHostDataWriter(path: File, bufferSize: Int) :
- ParquetDataWriter<HostTableReader>(path, HostDataWriteSupport(), bufferSize) {
-
- override fun buildWriter(builder: LocalParquetWriter.Builder<HostTableReader>): ParquetWriter<HostTableReader> {
- return builder
- .withDictionaryEncoding("host_id", true)
- .build()
- }
-
- override fun toString(): String = "host-writer"
-
- /**
- * A [WriteSupport] implementation for a [HostTableReader].
- */
- private class HostDataWriteSupport : WriteSupport<HostTableReader>() {
- lateinit var recordConsumer: RecordConsumer
-
- override fun init(configuration: Configuration): WriteContext {
- return WriteContext(SCHEMA, emptyMap())
- }
-
- override fun prepareForWrite(recordConsumer: RecordConsumer) {
- this.recordConsumer = recordConsumer
- }
-
- override fun write(record: HostTableReader) {
- write(recordConsumer, record)
- }
-
- private fun write(consumer: RecordConsumer, data: HostTableReader) {
- consumer.startMessage()
-
- consumer.startField("timestamp", 0)
- consumer.addLong(data.timestamp.toEpochMilli())
- consumer.endField("timestamp", 0)
-
- consumer.startField("host_id", 1)
- consumer.addBinary(UUID.fromString(data.host.id).toBinary())
- consumer.endField("host_id", 1)
-
- consumer.startField("uptime", 2)
- consumer.addLong(data.uptime)
- consumer.endField("uptime", 2)
-
- consumer.startField("downtime", 3)
- consumer.addLong(data.downtime)
- consumer.endField("downtime", 3)
-
- val bootTime = data.bootTime
- if (bootTime != null) {
- consumer.startField("boot_time", 4)
- consumer.addLong(bootTime.toEpochMilli())
- consumer.endField("boot_time", 4)
- }
-
- consumer.startField("cpu_count", 5)
- consumer.addInteger(data.host.cpuCount)
- consumer.endField("cpu_count", 5)
-
- consumer.startField("cpu_limit", 6)
- consumer.addDouble(data.cpuLimit)
- consumer.endField("cpu_limit", 6)
-
- consumer.startField("cpu_time_active", 7)
- consumer.addLong(data.cpuActiveTime)
- consumer.endField("cpu_time_active", 7)
-
- consumer.startField("cpu_time_idle", 8)
- consumer.addLong(data.cpuIdleTime)
- consumer.endField("cpu_time_idle", 8)
-
- consumer.startField("cpu_time_steal", 9)
- consumer.addLong(data.cpuStealTime)
- consumer.endField("cpu_time_steal", 9)
-
- consumer.startField("cpu_time_lost", 10)
- consumer.addLong(data.cpuLostTime)
- consumer.endField("cpu_time_lost", 10)
-
- consumer.startField("mem_limit", 11)
- consumer.addLong(data.host.memCapacity)
- consumer.endField("mem_limit", 11)
-
- consumer.startField("power_total", 12)
- consumer.addDouble(data.powerTotal)
- consumer.endField("power_total", 12)
-
- consumer.startField("guests_terminated", 13)
- consumer.addInteger(data.guestsTerminated)
- consumer.endField("guests_terminated", 13)
-
- consumer.startField("guests_running", 14)
- consumer.addInteger(data.guestsRunning)
- consumer.endField("guests_running", 14)
-
- consumer.startField("guests_error", 15)
- consumer.addInteger(data.guestsError)
- consumer.endField("guests_error", 15)
-
- consumer.startField("guests_invalid", 16)
- consumer.addInteger(data.guestsInvalid)
- consumer.endField("guests_invalid", 16)
-
- consumer.endMessage()
- }
- }
-
- private companion object {
- /**
- * The schema of the host data.
- */
- val SCHEMA: MessageType = Types
- .buildMessage()
- .addFields(
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("timestamp"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)
- .length(16)
- .`as`(LogicalTypeAnnotation.uuidType())
- .named("host_id"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("uptime"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("downtime"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("boot_time"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("cpu_count"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("cpu_limit"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("cpu_time_active"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("cpu_time_idle"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("cpu_time_steal"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("cpu_time_lost"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("mem_limit"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("power_total"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("guests_terminated"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("guests_running"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("guests_error"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("guests_invalid"),
- )
- .named("host")
- }
-}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt
deleted file mode 100644
index 082c7c88..00000000
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.workload.export.parquet
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.parquet.hadoop.ParquetWriter
-import org.apache.parquet.hadoop.api.WriteSupport
-import org.apache.parquet.io.api.RecordConsumer
-import org.apache.parquet.schema.*
-import org.opendc.compute.workload.telemetry.table.ServerTableReader
-import org.opendc.trace.util.parquet.LocalParquetWriter
-import java.io.File
-import java.util.*
-
-/**
- * A Parquet event writer for [ServerTableReader]s.
- */
-public class ParquetServerDataWriter(path: File, bufferSize: Int) :
- ParquetDataWriter<ServerTableReader>(path, ServerDataWriteSupport(), bufferSize) {
-
- override fun buildWriter(builder: LocalParquetWriter.Builder<ServerTableReader>): ParquetWriter<ServerTableReader> {
- return builder
- .withDictionaryEncoding("server_id", true)
- .withDictionaryEncoding("host_id", true)
- .build()
- }
-
- override fun toString(): String = "server-writer"
-
- /**
- * A [WriteSupport] implementation for a [ServerTableReader].
- */
- private class ServerDataWriteSupport : WriteSupport<ServerTableReader>() {
- lateinit var recordConsumer: RecordConsumer
-
- override fun init(configuration: Configuration): WriteContext {
- return WriteContext(SCHEMA, emptyMap())
- }
-
- override fun prepareForWrite(recordConsumer: RecordConsumer) {
- this.recordConsumer = recordConsumer
- }
-
- override fun write(record: ServerTableReader) {
- write(recordConsumer, record)
- }
-
- private fun write(consumer: RecordConsumer, data: ServerTableReader) {
- consumer.startMessage()
-
- consumer.startField("timestamp", 0)
- consumer.addLong(data.timestamp.toEpochMilli())
- consumer.endField("timestamp", 0)
-
- consumer.startField("server_id", 1)
- consumer.addBinary(UUID.fromString(data.server.id).toBinary())
- consumer.endField("server_id", 1)
-
- val hostId = data.host?.id
- if (hostId != null) {
- consumer.startField("host_id", 2)
- consumer.addBinary(UUID.fromString(hostId).toBinary())
- consumer.endField("host_id", 2)
- }
-
- consumer.startField("uptime", 3)
- consumer.addLong(data.uptime)
- consumer.endField("uptime", 3)
-
- consumer.startField("downtime", 4)
- consumer.addLong(data.downtime)
- consumer.endField("downtime", 4)
-
- val bootTime = data.bootTime
- if (bootTime != null) {
- consumer.startField("boot_time", 5)
- consumer.addLong(bootTime.toEpochMilli())
- consumer.endField("boot_time", 5)
- }
-
- val provisionTime = data.provisionTime
- if (provisionTime != null) {
- consumer.startField("provision_time", 6)
- consumer.addLong(provisionTime.toEpochMilli())
- consumer.endField("provision_time", 6)
- }
-
- consumer.startField("cpu_count", 7)
- consumer.addInteger(data.server.cpuCount)
- consumer.endField("cpu_count", 7)
-
- consumer.startField("cpu_limit", 8)
- consumer.addDouble(data.cpuLimit)
- consumer.endField("cpu_limit", 8)
-
- consumer.startField("cpu_time_active", 9)
- consumer.addLong(data.cpuActiveTime)
- consumer.endField("cpu_time_active", 9)
-
- consumer.startField("cpu_time_idle", 10)
- consumer.addLong(data.cpuIdleTime)
- consumer.endField("cpu_time_idle", 10)
-
- consumer.startField("cpu_time_steal", 11)
- consumer.addLong(data.cpuStealTime)
- consumer.endField("cpu_time_steal", 11)
-
- consumer.startField("cpu_time_lost", 12)
- consumer.addLong(data.cpuLostTime)
- consumer.endField("cpu_time_lost", 12)
-
- consumer.startField("mem_limit", 13)
- consumer.addLong(data.server.memCapacity)
- consumer.endField("mem_limit", 13)
-
- consumer.endMessage()
- }
- }
-
- private companion object {
- /**
- * The schema of the server data.
- */
- val SCHEMA: MessageType = Types.buildMessage()
- .addFields(
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("timestamp"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)
- .length(16)
- .`as`(LogicalTypeAnnotation.uuidType())
- .named("server_id"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)
- .length(16)
- .`as`(LogicalTypeAnnotation.uuidType())
- .named("host_id"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("uptime"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("downtime"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("provision_time"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("boot_time"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("cpu_count"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("cpu_limit"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("cpu_time_active"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("cpu_time_idle"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("cpu_time_steal"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("cpu_time_lost"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("mem_limit")
- )
- .named("server")
- }
-}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt
deleted file mode 100644
index 2a0fdca1..00000000
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.workload.export.parquet
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.parquet.hadoop.api.WriteSupport
-import org.apache.parquet.io.api.RecordConsumer
-import org.apache.parquet.schema.*
-import org.opendc.compute.workload.telemetry.table.ServiceTableReader
-import java.io.File
-
-/**
- * A Parquet event writer for [ServiceTableReader]s.
- */
-public class ParquetServiceDataWriter(path: File, bufferSize: Int) :
- ParquetDataWriter<ServiceTableReader>(path, ServiceDataWriteSupport(), bufferSize) {
-
- override fun toString(): String = "service-writer"
-
- /**
- * A [WriteSupport] implementation for a [ServiceTableReader].
- */
- private class ServiceDataWriteSupport : WriteSupport<ServiceTableReader>() {
- lateinit var recordConsumer: RecordConsumer
-
- override fun init(configuration: Configuration): WriteContext {
- return WriteContext(SCHEMA, emptyMap())
- }
-
- override fun prepareForWrite(recordConsumer: RecordConsumer) {
- this.recordConsumer = recordConsumer
- }
-
- override fun write(record: ServiceTableReader) {
- write(recordConsumer, record)
- }
-
- private fun write(consumer: RecordConsumer, data: ServiceTableReader) {
- consumer.startMessage()
-
- consumer.startField("timestamp", 0)
- consumer.addLong(data.timestamp.toEpochMilli())
- consumer.endField("timestamp", 0)
-
- consumer.startField("hosts_up", 1)
- consumer.addInteger(data.hostsUp)
- consumer.endField("hosts_up", 1)
-
- consumer.startField("hosts_down", 2)
- consumer.addInteger(data.hostsDown)
- consumer.endField("hosts_down", 2)
-
- consumer.startField("servers_pending", 3)
- consumer.addInteger(data.serversPending)
- consumer.endField("servers_pending", 3)
-
- consumer.startField("servers_active", 4)
- consumer.addInteger(data.serversActive)
- consumer.endField("servers_active", 4)
-
- consumer.startField("attempts_success", 5)
- consumer.addInteger(data.attemptsSuccess)
- consumer.endField("attempts_pending", 5)
-
- consumer.startField("attempts_failure", 6)
- consumer.addInteger(data.attemptsFailure)
- consumer.endField("attempts_failure", 6)
-
- consumer.startField("attempts_error", 7)
- consumer.addInteger(data.attemptsError)
- consumer.endField("attempts_error", 7)
-
- consumer.endMessage()
- }
- }
-
- private companion object {
- private val SCHEMA: MessageType = Types.buildMessage()
- .addFields(
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("timestamp"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("hosts_up"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("hosts_down"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("servers_pending"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("servers_active"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("attempts_success"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("attempts_failure"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("attempts_error"),
- )
- .named("service")
- }
-}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/Utils.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/Utils.kt
deleted file mode 100644
index 050e0f0a..00000000
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/Utils.kt
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.workload.export.parquet
-
-import org.apache.parquet.io.api.Binary
-import java.nio.ByteBuffer
-import java.util.UUID
-
-/**
- * Helper method to convert a [UUID] into a [Binary] object consumed by Parquet.
- */
-internal fun UUID.toBinary(): Binary {
- val bb = ByteBuffer.allocate(16)
- bb.putLong(mostSignificantBits)
- bb.putLong(leastSignificantBits)
- bb.rewind()
- return Binary.fromConstantByteBuffer(bb)
-}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt
deleted file mode 100644
index 9b2bec55..00000000
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.workload.internal
-
-import mu.KotlinLogging
-import org.opendc.compute.workload.ComputeWorkload
-import org.opendc.compute.workload.ComputeWorkloadLoader
-import org.opendc.compute.workload.VirtualMachine
-import java.util.*
-
-/**
- * A [ComputeWorkload] that samples multiple workloads based on the total load of all workloads.
- */
-internal class CompositeComputeWorkload(val sources: Map<ComputeWorkload, Double>) : ComputeWorkload {
- /**
- * The logging instance of this class.
- */
- private val logger = KotlinLogging.logger {}
-
- override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
- val traces = sources.map { (source, fraction) -> fraction to source.resolve(loader, random) }
-
- val totalLoad = traces.sumOf { (_, vms) -> vms.sumOf { it.totalLoad } }
-
- val res = mutableListOf<VirtualMachine>()
-
- for ((fraction, vms) in traces) {
- var currentLoad = 0.0
-
- for (entry in vms) {
- val entryLoad = entry.totalLoad
- if ((currentLoad + entryLoad) / totalLoad > fraction) {
- break
- }
-
- currentLoad += entryLoad
- res += entry
- }
- }
-
- val vmCount = traces.sumOf { (_, vms) -> vms.size }
- logger.info { "Sampled $vmCount VMs into subset of ${res.size} VMs" }
-
- return res
- }
-}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt
deleted file mode 100644
index 52f4c672..00000000
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.workload.internal
-
-import mu.KotlinLogging
-import org.opendc.compute.workload.ComputeWorkload
-import org.opendc.compute.workload.ComputeWorkloadLoader
-import org.opendc.compute.workload.VirtualMachine
-import java.util.*
-
-/**
- * A [ComputeWorkload] that samples HPC VMs in the workload.
- *
- * @param fraction The fraction of load/virtual machines to sample
- * @param sampleLoad A flag to indicate that the sampling should be based on the total load or on the number of VMs.
- */
-internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fraction: Double, val sampleLoad: Boolean = false) : ComputeWorkload {
- /**
- * The logging instance of this class.
- */
- private val logger = KotlinLogging.logger {}
-
- /**
- * The pattern to match compute nodes in the workload.
- */
- private val pattern = Regex("^(ComputeNode|cn).*")
-
- override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
- val vms = source.resolve(loader, random)
-
- val (hpc, nonHpc) = vms.partition { entry ->
- val name = entry.name
- name.matches(pattern)
- }
-
- val hpcSequence = generateSequence(0) { it + 1 }
- .map { index ->
- val res = mutableListOf<VirtualMachine>()
- hpc.mapTo(res) { sample(it, index) }
- res.shuffle(random)
- res
- }
- .flatten()
-
- val nonHpcSequence = generateSequence(0) { it + 1 }
- .map { index ->
- val res = mutableListOf<VirtualMachine>()
- nonHpc.mapTo(res) { sample(it, index) }
- res.shuffle(random)
- res
- }
- .flatten()
-
- logger.debug { "Found ${hpc.size} HPC workloads and ${nonHpc.size} non-HPC workloads" }
-
- val totalLoad = vms.sumOf { it.totalLoad }
-
- logger.debug { "Total trace load: $totalLoad" }
- var hpcCount = 0
- var hpcLoad = 0.0
- var nonHpcCount = 0
- var nonHpcLoad = 0.0
-
- val res = mutableListOf<VirtualMachine>()
-
- if (sampleLoad) {
- var currentLoad = 0.0
- for (entry in hpcSequence) {
- val entryLoad = entry.totalLoad
- if ((currentLoad + entryLoad) / totalLoad > fraction) {
- break
- }
-
- hpcLoad += entryLoad
- hpcCount += 1
- currentLoad += entryLoad
- res += entry
- }
-
- for (entry in nonHpcSequence) {
- val entryLoad = entry.totalLoad
- if ((currentLoad + entryLoad) / totalLoad > 1) {
- break
- }
-
- nonHpcLoad += entryLoad
- nonHpcCount += 1
- currentLoad += entryLoad
- res += entry
- }
- } else {
- hpcSequence
- .take((fraction * vms.size).toInt())
- .forEach { entry ->
- hpcLoad += entry.totalLoad
- hpcCount += 1
- res.add(entry)
- }
-
- nonHpcSequence
- .take(((1 - fraction) * vms.size).toInt())
- .forEach { entry ->
- nonHpcLoad += entry.totalLoad
- nonHpcCount += 1
- res.add(entry)
- }
- }
-
- logger.debug { "HPC $hpcCount (load $hpcLoad) and non-HPC $nonHpcCount (load $nonHpcLoad)" }
- logger.debug { "Total sampled load: ${hpcLoad + nonHpcLoad}" }
- logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
-
- return res
- }
-
- /**
- * Sample a random trace entry.
- */
- private fun sample(entry: VirtualMachine, i: Int): VirtualMachine {
- val uid = UUID.nameUUIDFromBytes("${entry.uid}-$i".toByteArray())
- return entry.copy(uid = uid)
- }
-}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt
deleted file mode 100644
index ef6de729..00000000
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.workload.internal
-
-import mu.KotlinLogging
-import org.opendc.compute.workload.ComputeWorkload
-import org.opendc.compute.workload.ComputeWorkloadLoader
-import org.opendc.compute.workload.VirtualMachine
-import java.util.*
-
-/**
- * A [ComputeWorkload] that is sampled based on total load.
- */
-internal class LoadSampledComputeWorkload(val source: ComputeWorkload, val fraction: Double) : ComputeWorkload {
- /**
- * The logging instance of this class.
- */
- private val logger = KotlinLogging.logger {}
-
- override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
- val vms = source.resolve(loader, random)
- val res = mutableListOf<VirtualMachine>()
-
- val totalLoad = vms.sumOf { it.totalLoad }
- var currentLoad = 0.0
-
- for (entry in vms) {
- val entryLoad = entry.totalLoad
- if ((currentLoad + entryLoad) / totalLoad > fraction) {
- break
- }
-
- currentLoad += entryLoad
- res += entry
- }
-
- logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }
-
- return res
- }
-}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt
deleted file mode 100644
index c20cb8f3..00000000
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.workload.internal
-
-import org.opendc.compute.workload.ComputeWorkload
-import org.opendc.compute.workload.ComputeWorkloadLoader
-import org.opendc.compute.workload.VirtualMachine
-import java.util.*
-
-/**
- * A [ComputeWorkload] from a trace.
- */
-internal class TraceComputeWorkload(val name: String, val format: String) : ComputeWorkload {
- override fun resolve(loader: ComputeWorkloadLoader, random: Random): List<VirtualMachine> {
- return loader.get(name, format)
- }
-}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt
deleted file mode 100644
index a0ec4bd6..00000000
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt
+++ /dev/null
@@ -1,427 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.workload.telemetry
-
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.isActive
-import kotlinx.coroutines.launch
-import mu.KotlinLogging
-import org.opendc.compute.api.Server
-import org.opendc.compute.service.ComputeService
-import org.opendc.compute.service.driver.Host
-import org.opendc.compute.workload.telemetry.table.*
-import java.time.Clock
-import java.time.Duration
-import java.time.Instant
-
-/**
- * A helper class to collect metrics from a [ComputeService] instance and automatically export the metrics every
- * export interval.
- *
- * @param scope The [CoroutineScope] to run the reader in.
- * @param clock The virtual clock.
- * @param service The [ComputeService] to monitor.
- * @param monitor The monitor to export the metrics to.
- * @param exportInterval The export interval.
- */
-public class ComputeMetricReader(
- scope: CoroutineScope,
- clock: Clock,
- private val service: ComputeService,
- private val monitor: ComputeMonitor,
- private val exportInterval: Duration = Duration.ofMinutes(5)
-) : AutoCloseable {
- private val logger = KotlinLogging.logger {}
-
- /**
- * Aggregator for service metrics.
- */
- private val serviceTableReader = ServiceTableReaderImpl(service)
-
- /**
- * Mapping from [Host] instances to [HostTableReaderImpl]
- */
- private val hostTableReaders = mutableMapOf<Host, HostTableReaderImpl>()
-
- /**
- * Mapping from [Server] instances to [ServerTableReaderImpl]
- */
- private val serverTableReaders = mutableMapOf<Server, ServerTableReaderImpl>()
-
- /**
- * The background job that is responsible for collecting the metrics every cycle.
- */
- private val job = scope.launch {
- val intervalMs = exportInterval.toMillis()
- val service = service
- val monitor = monitor
- val hostTableReaders = hostTableReaders
- val serverTableReaders = serverTableReaders
- val serviceTableReader = serviceTableReader
-
- try {
- while (isActive) {
- delay(intervalMs)
-
- try {
- val now = clock.instant()
-
- for (host in service.hosts) {
- val reader = hostTableReaders.computeIfAbsent(host) { HostTableReaderImpl(it) }
- reader.record(now)
- monitor.record(reader)
- reader.reset()
- }
-
- for (server in service.servers) {
- val reader = serverTableReaders.computeIfAbsent(server) { ServerTableReaderImpl(service, it) }
- reader.record(now)
- monitor.record(reader)
- reader.reset()
- }
-
- serviceTableReader.record(now)
- monitor.record(serviceTableReader)
- } catch (cause: Throwable) {
- logger.warn(cause) { "Exporter threw an Exception" }
- }
- }
- } finally {
- if (monitor is AutoCloseable) {
- monitor.close()
- }
- }
- }
-
- override fun close() {
- job.cancel()
- }
-
- /**
- * An aggregator for service metrics before they are reported.
- */
- private class ServiceTableReaderImpl(private val service: ComputeService) : ServiceTableReader {
- private var _timestamp: Instant = Instant.MIN
- override val timestamp: Instant
- get() = _timestamp
-
- override val hostsUp: Int
- get() = _hostsUp
- private var _hostsUp = 0
-
- override val hostsDown: Int
- get() = _hostsDown
- private var _hostsDown = 0
-
- override val serversPending: Int
- get() = _serversPending
- private var _serversPending = 0
-
- override val serversActive: Int
- get() = _serversActive
- private var _serversActive = 0
-
- override val attemptsSuccess: Int
- get() = _attemptsSuccess
- private var _attemptsSuccess = 0
-
- override val attemptsFailure: Int
- get() = _attemptsFailure
- private var _attemptsFailure = 0
-
- override val attemptsError: Int
- get() = _attemptsError
- private var _attemptsError = 0
-
- /**
- * Record the next cycle.
- */
- fun record(now: Instant) {
- _timestamp = now
-
- val stats = service.getSchedulerStats()
- _hostsUp = stats.hostsAvailable
- _hostsDown = stats.hostsUnavailable
- _serversPending = stats.serversPending
- _serversActive = stats.serversActive
- _attemptsSuccess = stats.attemptsSuccess.toInt()
- _attemptsFailure = stats.attemptsFailure.toInt()
- _attemptsError = stats.attemptsError.toInt()
- }
- }
-
- /**
- * An aggregator for host metrics before they are reported.
- */
- private class HostTableReaderImpl(host: Host) : HostTableReader {
- private val _host = host
-
- override val host: HostInfo = HostInfo(host.uid.toString(), host.name, "x86", host.model.cpuCount, host.model.memoryCapacity)
-
- override val timestamp: Instant
- get() = _timestamp
- private var _timestamp = Instant.MIN
-
- override val guestsTerminated: Int
- get() = _guestsTerminated
- private var _guestsTerminated = 0
-
- override val guestsRunning: Int
- get() = _guestsRunning
- private var _guestsRunning = 0
-
- override val guestsError: Int
- get() = _guestsError
- private var _guestsError = 0
-
- override val guestsInvalid: Int
- get() = _guestsInvalid
- private var _guestsInvalid = 0
-
- override val cpuLimit: Double
- get() = _cpuLimit
- private var _cpuLimit = 0.0
-
- override val cpuUsage: Double
- get() = _cpuUsage
- private var _cpuUsage = 0.0
-
- override val cpuDemand: Double
- get() = _cpuDemand
- private var _cpuDemand = 0.0
-
- override val cpuUtilization: Double
- get() = _cpuUtilization
- private var _cpuUtilization = 0.0
-
- override val cpuActiveTime: Long
- get() = _cpuActiveTime - previousCpuActiveTime
- private var _cpuActiveTime = 0L
- private var previousCpuActiveTime = 0L
-
- override val cpuIdleTime: Long
- get() = _cpuIdleTime - previousCpuIdleTime
- private var _cpuIdleTime = 0L
- private var previousCpuIdleTime = 0L
-
- override val cpuStealTime: Long
- get() = _cpuStealTime - previousCpuStealTime
- private var _cpuStealTime = 0L
- private var previousCpuStealTime = 0L
-
- override val cpuLostTime: Long
- get() = _cpuLostTime - previousCpuLostTime
- private var _cpuLostTime = 0L
- private var previousCpuLostTime = 0L
-
- override val powerUsage: Double
- get() = _powerUsage
- private var _powerUsage = 0.0
-
- override val powerTotal: Double
- get() = _powerTotal - previousPowerTotal
- private var _powerTotal = 0.0
- private var previousPowerTotal = 0.0
-
- override val uptime: Long
- get() = _uptime - previousUptime
- private var _uptime = 0L
- private var previousUptime = 0L
-
- override val downtime: Long
- get() = _downtime - previousDowntime
- private var _downtime = 0L
- private var previousDowntime = 0L
-
- override val bootTime: Instant?
- get() = _bootTime
- private var _bootTime: Instant? = null
-
- /**
- * Record the next cycle.
- */
- fun record(now: Instant) {
- val hostCpuStats = _host.getCpuStats()
- val hostSysStats = _host.getSystemStats()
-
- _timestamp = now
- _guestsTerminated = hostSysStats.guestsTerminated
- _guestsRunning = hostSysStats.guestsRunning
- _guestsError = hostSysStats.guestsError
- _guestsInvalid = hostSysStats.guestsInvalid
- _cpuLimit = hostCpuStats.capacity
- _cpuDemand = hostCpuStats.demand
- _cpuUsage = hostCpuStats.usage
- _cpuUtilization = hostCpuStats.utilization
- _cpuActiveTime = hostCpuStats.activeTime
- _cpuIdleTime = hostCpuStats.idleTime
- _cpuStealTime = hostCpuStats.stealTime
- _cpuLostTime = hostCpuStats.lostTime
- _powerUsage = hostSysStats.powerUsage
- _powerTotal = hostSysStats.energyUsage
- _uptime = hostSysStats.uptime.toMillis()
- _downtime = hostSysStats.downtime.toMillis()
- _bootTime = hostSysStats.bootTime
- }
-
- /**
- * Finish the aggregation for this cycle.
- */
- fun reset() {
- // Reset intermediate state for next aggregation
- previousCpuActiveTime = _cpuActiveTime
- previousCpuIdleTime = _cpuIdleTime
- previousCpuStealTime = _cpuStealTime
- previousCpuLostTime = _cpuLostTime
- previousPowerTotal = _powerTotal
- previousUptime = _uptime
- previousDowntime = _downtime
-
- _guestsTerminated = 0
- _guestsRunning = 0
- _guestsError = 0
- _guestsInvalid = 0
-
- _cpuLimit = 0.0
- _cpuUsage = 0.0
- _cpuDemand = 0.0
- _cpuUtilization = 0.0
-
- _powerUsage = 0.0
- }
- }
-
- /**
- * An aggregator for server metrics before they are reported.
- */
- private class ServerTableReaderImpl(private val service: ComputeService, server: Server) : ServerTableReader {
- private val _server = server
-
- /**
- * The static information about this server.
- */
- override val server = ServerInfo(
- server.uid.toString(),
- server.name,
- "vm",
- "x86",
- server.image.uid.toString(),
- server.image.name,
- server.flavor.cpuCount,
- server.flavor.memorySize
- )
-
- /**
- * The [HostInfo] of the host on which the server is hosted.
- */
- override var host: HostInfo? = null
- private var _host: Host? = null
-
- private var _timestamp = Instant.MIN
- override val timestamp: Instant
- get() = _timestamp
-
- override val uptime: Long
- get() = _uptime - previousUptime
- private var _uptime: Long = 0
- private var previousUptime = 0L
-
- override val downtime: Long
- get() = _downtime - previousDowntime
- private var _downtime: Long = 0
- private var previousDowntime = 0L
-
- override val provisionTime: Instant?
- get() = _provisionTime
- private var _provisionTime: Instant? = null
-
- override val bootTime: Instant?
- get() = _bootTime
- private var _bootTime: Instant? = null
-
- override val cpuLimit: Double
- get() = _cpuLimit
- private var _cpuLimit = 0.0
-
- override val cpuActiveTime: Long
- get() = _cpuActiveTime - previousCpuActiveTime
- private var _cpuActiveTime = 0L
- private var previousCpuActiveTime = 0L
-
- override val cpuIdleTime: Long
- get() = _cpuIdleTime - previousCpuIdleTime
- private var _cpuIdleTime = 0L
- private var previousCpuIdleTime = 0L
-
- override val cpuStealTime: Long
- get() = _cpuStealTime - previousCpuStealTime
- private var _cpuStealTime = 0L
- private var previousCpuStealTime = 0L
-
- override val cpuLostTime: Long
- get() = _cpuLostTime - previousCpuLostTime
- private var _cpuLostTime = 0L
- private var previousCpuLostTime = 0L
-
- /**
- * Record the next cycle.
- */
- fun record(now: Instant) {
- val newHost = service.lookupHost(_server)
- if (newHost != null && newHost.uid != _host?.uid) {
- _host = newHost
- host = HostInfo(newHost.uid.toString(), newHost.name, "x86", newHost.model.cpuCount, newHost.model.memoryCapacity)
- }
-
- val cpuStats = _host?.getCpuStats(_server)
- val sysStats = _host?.getSystemStats(_server)
-
- _timestamp = now
- _cpuLimit = cpuStats?.capacity ?: 0.0
- _cpuActiveTime = cpuStats?.activeTime ?: 0
- _cpuIdleTime = cpuStats?.idleTime ?: 0
- _cpuStealTime = cpuStats?.stealTime ?: 0
- _cpuLostTime = cpuStats?.lostTime ?: 0
- _uptime = sysStats?.uptime?.toMillis() ?: 0
- _downtime = sysStats?.downtime?.toMillis() ?: 0
- _provisionTime = _server.launchedAt
- _bootTime = sysStats?.bootTime
- }
-
- /**
- * Finish the aggregation for this cycle.
- */
- fun reset() {
- previousUptime = _uptime
- previousDowntime = _downtime
- previousCpuActiveTime = _cpuActiveTime
- previousCpuIdleTime = _cpuIdleTime
- previousCpuStealTime = _cpuStealTime
- previousCpuLostTime = _cpuLostTime
-
- _host = null
- _cpuLimit = 0.0
- }
- }
-}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMonitor.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMonitor.kt
deleted file mode 100644
index 36a2079a..00000000
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMonitor.kt
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.workload.telemetry
-
-import org.opendc.compute.workload.telemetry.table.HostTableReader
-import org.opendc.compute.workload.telemetry.table.ServerTableReader
-import org.opendc.compute.workload.telemetry.table.ServiceTableReader
-
-/**
- * A monitor that tracks the metrics and events of the OpenDC Compute service.
- */
-public interface ComputeMonitor {
- /**
- * Record an entry with the specified [reader].
- */
- public fun record(reader: ServerTableReader) {}
-
- /**
- * Record an entry with the specified [reader].
- */
- public fun record(reader: HostTableReader) {}
-
- /**
- * Record an entry with the specified [reader].
- */
- public fun record(reader: ServiceTableReader) {}
-}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/HostInfo.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/HostInfo.kt
deleted file mode 100644
index 5d383e40..00000000
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/HostInfo.kt
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.workload.telemetry.table
-
-/**
- * Information about a host exposed to the telemetry service.
- */
-public data class HostInfo(val id: String, val name: String, val arch: String, val cpuCount: Int, val memCapacity: Long)
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/HostTableReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/HostTableReader.kt
deleted file mode 100644
index 8f6f0d01..00000000
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/HostTableReader.kt
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.workload.telemetry.table
-
-import java.time.Instant
-
-/**
- * An interface that is used to read a row of a host trace entry.
- */
-public interface HostTableReader {
- /**
- * The timestamp of the current entry of the reader.
- */
- public val timestamp: Instant
-
- /**
- * The [HostInfo] of the host to which the row belongs to.
- */
- public val host: HostInfo
-
- /**
- * The number of guests that are in a terminated state.
- */
- public val guestsTerminated: Int
-
- /**
- * The number of guests that are in a running state.
- */
- public val guestsRunning: Int
-
- /**
- * The number of guests that are in an error state.
- */
- public val guestsError: Int
-
- /**
- * The number of guests that are in an unknown state.
- */
- public val guestsInvalid: Int
-
- /**
- * The capacity of the CPUs in the host (in MHz).
- */
- public val cpuLimit: Double
-
- /**
- * The usage of all CPUs in the host (in MHz).
- */
- public val cpuUsage: Double
-
- /**
- * The demand of all vCPUs of the guests (in MHz)
- */
- public val cpuDemand: Double
-
- /**
- * The CPU utilization of the host.
- */
- public val cpuUtilization: Double
-
- /**
- * The duration (in seconds) that a CPU was active in the host.
- */
- public val cpuActiveTime: Long
-
- /**
- * The duration (in seconds) that a CPU was idle in the host.
- */
- public val cpuIdleTime: Long
-
- /**
- * The duration (in seconds) that a vCPU wanted to run, but no capacity was available.
- */
- public val cpuStealTime: Long
-
- /**
- * The duration (in seconds) of CPU time that was lost due to interference.
- */
- public val cpuLostTime: Long
-
- /**
- * The current power usage of the host in W.
- */
- public val powerUsage: Double
-
- /**
- * The total power consumption of the host since last time in J.
- */
- public val powerTotal: Double
-
- /**
- * The uptime of the host since last time in ms.
- */
- public val uptime: Long
-
- /**
- * The downtime of the host since last time in ms.
- */
- public val downtime: Long
-
- /**
- * The [Instant] at which the host booted.
- */
- public val bootTime: Instant?
-}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServerInfo.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServerInfo.kt
deleted file mode 100644
index 111135b7..00000000
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServerInfo.kt
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.workload.telemetry.table
-
-/**
- * Static information about a server exposed to the telemetry service.
- */
-public data class ServerInfo(
- val id: String,
- val name: String,
- val type: String,
- val arch: String,
- val imageId: String,
- val imageName: String,
- val cpuCount: Int,
- val memCapacity: Long
-)
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServerTableReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServerTableReader.kt
deleted file mode 100644
index bccccd01..00000000
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServerTableReader.kt
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.workload.telemetry.table
-
-import java.time.Instant
-
-/**
- * An interface that is used to read a row of a server trace entry.
- */
-public interface ServerTableReader {
- /**
- * The timestamp of the current entry of the reader.
- */
- public val timestamp: Instant
-
- /**
- * The [ServerInfo] of the server to which the row belongs to.
- */
- public val server: ServerInfo
-
- /**
- * The [HostInfo] of the host on which the server is hosted or `null` if it has no host.
- */
- public val host: HostInfo?
-
- /**
- * The uptime of the host since last time in ms.
- */
- public val uptime: Long
-
- /**
- * The downtime of the host since last time in ms.
- */
- public val downtime: Long
-
- /**
- * The [Instant] at which the server was enqueued for the scheduler.
- */
- public val provisionTime: Instant?
-
- /**
- * The [Instant] at which the server booted.
- */
- public val bootTime: Instant?
-
- /**
- * The capacity of the CPUs of the servers (in MHz).
- */
- public val cpuLimit: Double
-
- /**
- * The duration (in seconds) that a CPU was active in the server.
- */
- public val cpuActiveTime: Long
-
- /**
- * The duration (in seconds) that a CPU was idle in the server.
- */
- public val cpuIdleTime: Long
-
- /**
- * The duration (in seconds) that a vCPU wanted to run, but no capacity was available.
- */
- public val cpuStealTime: Long
-
- /**
- * The duration (in seconds) of CPU time that was lost due to interference.
- */
- public val cpuLostTime: Long
-}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServiceData.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServiceData.kt
deleted file mode 100644
index a1df6ea7..00000000
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServiceData.kt
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.workload.telemetry.table
-
-import java.time.Instant
-
-/**
- * A trace entry for the compute service.
- */
-public data class ServiceData(
- val timestamp: Instant,
- val hostsUp: Int,
- val hostsDown: Int,
- val serversPending: Int,
- val serversActive: Int,
- val attemptsSuccess: Int,
- val attemptsFailure: Int,
- val attemptsError: Int
-)
-
-/**
- * Convert a [ServiceTableReader] into a persistent object.
- */
-public fun ServiceTableReader.toServiceData(): ServiceData {
- return ServiceData(timestamp, hostsUp, hostsDown, serversPending, serversActive, attemptsSuccess, attemptsFailure, attemptsError)
-}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServiceTableReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServiceTableReader.kt
deleted file mode 100644
index 4211ab15..00000000
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServiceTableReader.kt
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.workload.telemetry.table
-
-import java.time.Instant
-
-/**
- * An interface that is used to read a row of a service trace entry.
- */
-public interface ServiceTableReader {
- /**
- * The timestamp of the current entry of the reader.
- */
- public val timestamp: Instant
-
- /**
- * The number of hosts that are up at this instant.
- */
- public val hostsUp: Int
-
- /**
- * The number of hosts that are down at this instant.
- */
- public val hostsDown: Int
-
- /**
- * The number of servers that are pending to be scheduled.
- */
- public val serversPending: Int
-
- /**
- * The number of servers that are currently active.
- */
- public val serversActive: Int
-
- /**
- * The scheduling attempts that were successful.
- */
- public val attemptsSuccess: Int
-
- /**
- * The scheduling attempts that were unsuccessful due to client error.
- */
- public val attemptsFailure: Int
-
- /**
- * The scheduling attempts that were unsuccessful due to scheduler error.
- */
- public val attemptsError: Int
-}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt
deleted file mode 100644
index 87530f5a..00000000
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.workload.topology
-
-import org.opendc.simulator.compute.model.MachineModel
-import org.opendc.simulator.compute.power.PowerDriver
-import org.opendc.simulator.flow.mux.FlowMultiplexerFactory
-import java.util.*
-
-/**
- * Description of a physical host that will be simulated by OpenDC and host the virtual machines.
- *
- * @param uid Unique identifier of the host.
- * @param name The name of the host.
- * @param meta The metadata of the host.
- * @param model The physical model of the machine.
- * @param powerDriver The [PowerDriver] to model the power consumption of the machine.
- * @param multiplexerFactory The [FlowMultiplexerFactory] that is used to multiplex the virtual machines over the host.
- */
-public data class HostSpec(
- val uid: UUID,
- val name: String,
- val meta: Map<String, Any>,
- val model: MachineModel,
- val powerDriver: PowerDriver,
- val multiplexerFactory: FlowMultiplexerFactory = FlowMultiplexerFactory.maxMinMultiplexer()
-)
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/Topology.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/Topology.kt
deleted file mode 100644
index 3b8dc918..00000000
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/Topology.kt
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.workload.topology
-
-/**
- * Representation of the environment of the compute service, describing the physical details of every host.
- */
-public interface Topology {
- /**
- * Resolve the [Topology] into a list of [HostSpec]s.
- */
- public fun resolve(): List<HostSpec>
-}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/TopologyHelpers.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/TopologyHelpers.kt
deleted file mode 100644
index de4300c7..00000000
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/TopologyHelpers.kt
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-@file:JvmName("TopologyHelpers")
-package org.opendc.compute.workload.topology
-
-import org.opendc.compute.workload.ComputeServiceHelper
-
-/**
- * Apply the specified [topology] to the given [ComputeServiceHelper].
- */
-public fun ComputeServiceHelper.apply(topology: Topology, optimize: Boolean = false) {
- val hosts = topology.resolve()
- for (spec in hosts) {
- registerHost(spec, optimize)
- }
-}
diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/HostDataWriterTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/HostDataWriterTest.kt
deleted file mode 100644
index 4344bb08..00000000
--- a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/HostDataWriterTest.kt
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.workload.export.parquet
-
-import org.junit.jupiter.api.AfterEach
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertDoesNotThrow
-import org.opendc.compute.workload.telemetry.table.HostInfo
-import org.opendc.compute.workload.telemetry.table.HostTableReader
-import java.nio.file.Files
-import java.time.Instant
-
-/**
- * Test suite for [ParquetHostDataWriter]
- */
-class HostDataWriterTest {
- /**
- * The path to write the data file to.
- */
- private val path = Files.createTempFile("opendc", "parquet")
-
- /**
- * The writer used to write the data.
- */
- private val writer = ParquetHostDataWriter(path.toFile(), bufferSize = 4096)
-
- @AfterEach
- fun tearDown() {
- writer.close()
- Files.deleteIfExists(path)
- }
-
- @Test
- fun testSmoke() {
- assertDoesNotThrow {
- writer.write(object : HostTableReader {
- override val timestamp: Instant = Instant.now()
- override val host: HostInfo = HostInfo("id", "test", "x86", 4, 4096)
- override val guestsTerminated: Int = 0
- override val guestsRunning: Int = 0
- override val guestsError: Int = 0
- override val guestsInvalid: Int = 0
- override val cpuLimit: Double = 4096.0
- override val cpuUsage: Double = 1.0
- override val cpuDemand: Double = 1.0
- override val cpuUtilization: Double = 0.0
- override val cpuActiveTime: Long = 1
- override val cpuIdleTime: Long = 1
- override val cpuStealTime: Long = 1
- override val cpuLostTime: Long = 1
- override val powerUsage: Double = 1.0
- override val powerTotal: Double = 1.0
- override val uptime: Long = 1
- override val downtime: Long = 1
- override val bootTime: Instant? = null
- })
- }
- }
-}
diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServerDataWriterTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServerDataWriterTest.kt
deleted file mode 100644
index 8465871d..00000000
--- a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServerDataWriterTest.kt
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.workload.export.parquet
-
-import org.junit.jupiter.api.AfterEach
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertDoesNotThrow
-import org.opendc.compute.workload.telemetry.table.HostInfo
-import org.opendc.compute.workload.telemetry.table.ServerInfo
-import org.opendc.compute.workload.telemetry.table.ServerTableReader
-import java.nio.file.Files
-import java.time.Instant
-
-/**
- * Test suite for [ParquetServerDataWriter]
- */
-class ServerDataWriterTest {
- /**
- * The path to write the data file to.
- */
- private val path = Files.createTempFile("opendc", "parquet")
-
- /**
- * The writer used to write the data.
- */
- private val writer = ParquetServerDataWriter(path.toFile(), bufferSize = 4096)
-
- @AfterEach
- fun tearDown() {
- writer.close()
- Files.deleteIfExists(path)
- }
-
- @Test
- fun testSmoke() {
- assertDoesNotThrow {
- writer.write(object : ServerTableReader {
- override val timestamp: Instant = Instant.now()
- override val server: ServerInfo = ServerInfo("id", "test", "vm", "x86", "test", "test", 2, 4096)
- override val host: HostInfo = HostInfo("id", "test", "x86", 4, 4096)
- override val cpuLimit: Double = 4096.0
- override val cpuActiveTime: Long = 1
- override val cpuIdleTime: Long = 1
- override val cpuStealTime: Long = 1
- override val cpuLostTime: Long = 1
- override val uptime: Long = 1
- override val downtime: Long = 1
- override val provisionTime: Instant = timestamp
- override val bootTime: Instant? = null
- })
- }
- }
-}
diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServiceDataWriterTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServiceDataWriterTest.kt
deleted file mode 100644
index d91982bc..00000000
--- a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServiceDataWriterTest.kt
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.compute.workload.export.parquet
-
-import org.junit.jupiter.api.AfterEach
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertDoesNotThrow
-import org.opendc.compute.workload.telemetry.table.ServiceTableReader
-import java.nio.file.Files
-import java.time.Instant
-
-/**
- * Test suite for [ParquetServiceDataWriter]
- */
-class ServiceDataWriterTest {
- /**
- * The path to write the data file to.
- */
- private val path = Files.createTempFile("opendc", "parquet")
-
- /**
- * The writer used to write the data.
- */
- private val writer = ParquetServiceDataWriter(path.toFile(), bufferSize = 4096)
-
- @AfterEach
- fun tearDown() {
- writer.close()
- Files.deleteIfExists(path)
- }
-
- @Test
- fun testSmoke() {
- assertDoesNotThrow {
- writer.write(object : ServiceTableReader {
- override val timestamp: Instant = Instant.now()
- override val hostsUp: Int = 1
- override val hostsDown: Int = 0
- override val serversPending: Int = 1
- override val serversActive: Int = 1
- override val attemptsSuccess: Int = 1
- override val attemptsFailure: Int = 0
- override val attemptsError: Int = 0
- })
- }
- }
-}