Diffstat (limited to 'opendc')
-rw-r--r--  opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/ReplayAllocationPolicy.kt | 13
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt | 9
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt | 2
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt | 55
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt | 26
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Workload.kt | 8
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt | 9
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/HostEvent.kt | 3
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt | 2
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt | 18
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt | 10
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt | 297
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt | 32
-rw-r--r--  opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt | 6
14 files changed, 412 insertions, 78 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/ReplayAllocationPolicy.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/ReplayAllocationPolicy.kt
index f88eaed8..59acfce2 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/ReplayAllocationPolicy.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/ReplayAllocationPolicy.kt
@@ -2,9 +2,12 @@ package com.atlarge.opendc.compute.virt.service.allocation
import com.atlarge.opendc.compute.virt.service.HypervisorView
import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService
+import mu.KotlinLogging
+
+private val logger = KotlinLogging.logger {}
/**
- * Policy replaying VM-cluster assignnment.
+ * Policy replaying VM-cluster assignment.
*
* Within each cluster, the active servers on each node determine which node gets
* assigned the VM image.
@@ -18,8 +21,14 @@ class ReplayAllocationPolicy(val vmPlacements: Map<String, String>) : Allocation
val clusterName = vmPlacements[image.name]
?: throw IllegalStateException("Could not find placement data in VM placement file for VM ${image.name}")
val machinesInCluster = hypervisors.filter { it.server.name.contains(clusterName) }
+
+ if (machinesInCluster.isEmpty()) {
+ logger.info { "Could not find any machines belonging to cluster $clusterName for image ${image.name}, assigning randomly." }
+ return hypervisors.maxBy { it.availableMemory }
+ }
+
return machinesInCluster.maxBy { it.availableMemory }
- ?: throw IllegalStateException("Cloud not find any machines belonging to cluster $clusterName for image ${image.name}")
+ ?: throw IllegalStateException("Cloud not find any machine and could not randomly assign")
}
}
}
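
The fallback above replaces a hard failure: when a VM's recorded cluster has no matching hypervisors, the policy now places it on the host with the most available memory instead of aborting the replay. A minimal sketch of that selection logic in isolation, with a simplified stand-in for HypervisorView:

```kotlin
// Simplified stand-in for HypervisorView; not the real OpenDC type.
data class Host(val name: String, val availableMemory: Long)

// Prefer hosts in the recorded cluster; otherwise fall back to the
// host with the most available memory overall.
fun select(hosts: List<Host>, cluster: String): Host? {
    val inCluster = hosts.filter { it.name.contains(cluster) }
    val candidates = if (inCluster.isEmpty()) hosts else inCluster
    return candidates.maxByOrNull { it.availableMemory }
}
```
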
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt
index 14de52b8..677af381 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt
@@ -24,12 +24,13 @@
package com.atlarge.opendc.experiments.sc20
+import com.atlarge.opendc.experiments.sc20.experiment.CompositeWorkloadPortfolio
import com.atlarge.opendc.experiments.sc20.experiment.Experiment
import com.atlarge.opendc.experiments.sc20.experiment.HorVerPortfolio
-import com.atlarge.opendc.experiments.sc20.experiment.MoreHpcPortfolio
import com.atlarge.opendc.experiments.sc20.experiment.MoreVelocityPortfolio
import com.atlarge.opendc.experiments.sc20.experiment.OperationalPhenomenaPortfolio
import com.atlarge.opendc.experiments.sc20.experiment.Portfolio
+import com.atlarge.opendc.experiments.sc20.experiment.ReplayPortfolio
import com.atlarge.opendc.experiments.sc20.experiment.TestPortfolio
import com.atlarge.opendc.experiments.sc20.reporter.ConsoleExperimentReporter
import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor
@@ -96,10 +97,12 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") {
*/
private val portfolios by option("--portfolio")
.choice(
- "hor-ver" to { experiment: Experiment, i: Int -> HorVerPortfolio(experiment, i) } as (Experiment, Int) -> Portfolio,
+ "hor-ver" to { experiment: Experiment, i: Int -> HorVerPortfolio(experiment, i) }
+ as (Experiment, Int) -> Portfolio,
"more-velocity" to { experiment, i -> MoreVelocityPortfolio(experiment, i) },
- "more-hpc" to { experiment, i -> MoreHpcPortfolio(experiment, i) },
+ "composite-workload" to { experiment, i -> CompositeWorkloadPortfolio(experiment, i) },
"operational-phenomena" to { experiment, i -> OperationalPhenomenaPortfolio(experiment, i) },
+ "replay" to { experiment, i -> ReplayPortfolio(experiment, i) },
"test" to { experiment, i -> TestPortfolio(experiment, i) },
ignoreCase = true
)
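
The portfolio table maps each `--portfolio` value to a factory lambda. A self-contained sketch of that lookup pattern (the types below are stand-ins; the real wiring goes through Clikt's `choice()` as shown above):

```kotlin
// Stand-in types; the real factories construct Portfolio subclasses
// such as ReplayPortfolio and CompositeWorkloadPortfolio.
class Experiment
open class Portfolio(val experiment: Experiment, val id: Int)
class ReplayPortfolio(e: Experiment, i: Int) : Portfolio(e, i)

val factories: Map<String, (Experiment, Int) -> Portfolio> = mapOf(
    "replay" to { e, i -> ReplayPortfolio(e, i) }
)

// Mirrors choice(..., ignoreCase = true): keys match case-insensitively.
fun create(name: String, e: Experiment, i: Int): Portfolio =
    factories[name.toLowerCase()]?.invoke(e, i)
        ?: throw IllegalArgumentException("Unknown portfolio $name")
```
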
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
index 9d2b0247..be40e3ca 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
@@ -216,7 +216,7 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP
while (reader.hasNext()) {
val (time, workload) = reader.next()
- if (vmPlacements.isNotEmpty()) {
+ if (vmPlacements.isNotEmpty() && workload.name.contains(".txt")) {
val vmId = workload.name.replace("VM Workload ", "")
// Check if VM in topology
val clusterName = vmPlacements[vmId]
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt
index 362144ae..89012036 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt
@@ -24,6 +24,7 @@
package com.atlarge.opendc.experiments.sc20.experiment
+import com.atlarge.opendc.experiments.sc20.experiment.model.CompositeWorkload
import com.atlarge.opendc.experiments.sc20.experiment.model.OperationalPhenomena
import com.atlarge.opendc.experiments.sc20.experiment.model.Topology
import com.atlarge.opendc.experiments.sc20.experiment.model.Workload
@@ -82,7 +83,9 @@ public class MoreVelocityPortfolio(parent: Experiment, id: Int) : Portfolio(pare
)
}
-public class MoreHpcPortfolio(parent: Experiment, id: Int) : Portfolio(parent, id, "more_hpc") {
+public class CompositeWorkloadPortfolio(parent: Experiment, id: Int) : Portfolio(parent, id, "composite-workload") {
+ private val totalSampleLoad = 3425709788935.9976
+
override val topologies = listOf(
Topology("base"),
Topology("exp-vol-hor-hom"),
@@ -91,14 +94,35 @@ public class MoreHpcPortfolio(parent: Experiment, id: Int) : Portfolio(parent, i
)
override val workloads = listOf(
- Workload("solvinity", 0.1),
- Workload("solvinity", 0.25),
- Workload("solvinity", 0.5),
- Workload("solvinity", 1.0)
+ CompositeWorkload(
+ "all-azure",
+ listOf(Workload("solvinity", 0.0), Workload("azure", 1.0)),
+ totalSampleLoad
+ ),
+ CompositeWorkload(
+ "solvinity-25-azure-75",
+ listOf(Workload("solvinity", 0.25), Workload("azure", 0.75)),
+ totalSampleLoad
+ ),
+ CompositeWorkload(
+ "solvinity-50-azure-50",
+ listOf(Workload("solvinity", 0.5), Workload("azure", 0.5)),
+ totalSampleLoad
+ ),
+ CompositeWorkload(
+ "solvinity-75-azure-25",
+ listOf(Workload("solvinity", 0.75), Workload("azure", 0.25)),
+ totalSampleLoad
+ ),
+ CompositeWorkload(
+ "all-solvinity",
+ listOf(Workload("solvinity", 1.0), Workload("azure", 0.0)),
+ totalSampleLoad
+ )
)
override val operationalPhenomenas = listOf(
- OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true)
+ OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = false)
)
override val allocationPolicies = listOf(
@@ -136,6 +160,25 @@ public class OperationalPhenomenaPortfolio(parent: Experiment, id: Int) : Portfo
)
}
+public class ReplayPortfolio(parent: Experiment, id: Int) : Portfolio(parent, id, "replay") {
+ override val topologies = listOf(
+ Topology("base")
+ )
+
+ override val workloads = listOf(
+ Workload("solvinity", 1.0)
+ )
+
+ override val operationalPhenomenas = listOf(
+ OperationalPhenomena(failureFrequency = 0.0, hasInterference = false)
+ )
+
+ override val allocationPolicies = listOf(
+ "replay",
+ "active-servers"
+ )
+}
+
public class TestPortfolio(parent: Experiment, id: Int) : Portfolio(parent, id, "test") {
override val repetitions: Int = 1
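
Assuming the Portfolio base class (not shown in this diff) enumerates the Cartesian product of its dimensions, CompositeWorkloadPortfolio spans 4 topologies × 5 workload mixes × 1 operational phenomena × its allocation policies. A sketch of that enumeration under this assumption:

```kotlin
// Hedged sketch: the base Portfolio class is not part of this diff,
// so the cross-product enumeration below is an assumption.
data class Scenario(val topology: String, val workload: String, val policy: String)

fun enumerate(
    topologies: List<String>,
    workloads: List<String>,
    policies: List<String>
): List<Scenario> =
    topologies.flatMap { t ->
        workloads.flatMap { w ->
            policies.map { p -> Scenario(t, w, p) }
        }
    }
```
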
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt
index fd3e29c8..5d1c29e2 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt
@@ -31,6 +31,7 @@ import com.atlarge.opendc.compute.virt.service.allocation.NumberOfActiveServersA
import com.atlarge.opendc.compute.virt.service.allocation.ProvisionedCoresAllocationPolicy
import com.atlarge.opendc.compute.virt.service.allocation.RandomAllocationPolicy
import com.atlarge.opendc.compute.virt.service.allocation.ReplayAllocationPolicy
+import com.atlarge.opendc.experiments.sc20.experiment.model.CompositeWorkload
import com.atlarge.opendc.experiments.sc20.experiment.monitor.ParquetExperimentMonitor
import com.atlarge.opendc.experiments.sc20.runner.TrialExperimentDescriptor
import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionContext
@@ -78,23 +79,32 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int)
"provisioned-cores" -> ProvisionedCoresAllocationPolicy()
"provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true)
"random" -> RandomAllocationPolicy(Random(seeder.nextInt()))
- "replay" -> ReplayAllocationPolicy(emptyMap())
+ "replay" -> ReplayAllocationPolicy(experiment.vmPlacements)
else -> throw IllegalArgumentException("Unknown policy ${parent.allocationPolicy}")
}
@Suppress("UNCHECKED_CAST")
- val rawTraceReaders = context.cache.computeIfAbsent("raw-trace-readers") { mutableMapOf<String, Sc20RawParquetTraceReader>() } as MutableMap<String, Sc20RawParquetTraceReader>
- val raw = synchronized(rawTraceReaders) {
- val name = parent.workload.name
- rawTraceReaders.computeIfAbsent(name) {
- logger.info { "Loading trace $name" }
- Sc20RawParquetTraceReader(File(experiment.traces, name))
+ val rawTraceReaders =
+ context.cache.computeIfAbsent("raw-trace-readers") { mutableMapOf<String, Sc20RawParquetTraceReader>() } as MutableMap<String, Sc20RawParquetTraceReader>
+ val rawReaders = synchronized(rawTraceReaders) {
+ val workloadNames = if (parent.workload is CompositeWorkload) {
+ parent.workload.workloads.map { it.name }
+ } else {
+ listOf(parent.workload.name)
+ }
+
+ workloadNames.map { workloadName ->
+ rawTraceReaders.computeIfAbsent(workloadName) {
+ logger.info { "Loading trace $workloadName" }
+ Sc20RawParquetTraceReader(File(experiment.traces, workloadName))
+ }
}
}
+
val performanceInterferenceModel = experiment.performanceInterferenceModel
?.takeIf { parent.operationalPhenomena.hasInterference }
?.construct(seeder) ?: emptyMap()
- val trace = Sc20ParquetTraceReader(raw, performanceInterferenceModel, parent.workload, seed)
+ val trace = Sc20ParquetTraceReader(rawReaders, performanceInterferenceModel, parent.workload, seed)
val monitor = ParquetExperimentMonitor(this)
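
The run now resolves one raw reader per sub-workload and memoizes them in the shared experiment cache, so each Parquet trace is parsed once per experiment rather than once per run. A minimal sketch of that memoization, with a stand-in for the reader type:

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Stand-in for Sc20RawParquetTraceReader; assume construction is
// expensive because it parses the whole Parquet trace.
class RawReader(val name: String)

private val readerCache = ConcurrentHashMap<String, RawReader>()

fun readersFor(workloadNames: List<String>): List<RawReader> =
    workloadNames.map { name ->
        // computeIfAbsent loads each trace at most once, even when
        // several runs request the same trace concurrently.
        readerCache.computeIfAbsent(name) { RawReader(it) }
    }
```
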
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Workload.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Workload.kt
index 2dbdf570..cc3c448a 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Workload.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Workload.kt
@@ -27,4 +27,10 @@ package com.atlarge.opendc.experiments.sc20.experiment.model
/**
* A workload that is considered for a scenario.
*/
-public class Workload(val name: String, val fraction: Double)
+public open class Workload(open val name: String, val fraction: Double)
+
+/**
+ * A workload that is composed of multiple workloads.
+ */
+public class CompositeWorkload(override val name: String, val workloads: List<Workload>, val totalLoad: Double) :
+ Workload(name, -1.0)
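
A CompositeWorkload carries the target load for the whole mix and delegates per-trace fractions to its children; its own fraction is a -1.0 sentinel that samplers should never read. For example, the 50/50 mix from the portfolio above:

```kotlin
// Example construction, mirroring CompositeWorkloadPortfolio above.
val mix = CompositeWorkload(
    "solvinity-50-azure-50",
    listOf(Workload("solvinity", 0.5), Workload("azure", 0.5)),
    totalLoad = 3425709788935.9976
)
```
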
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt
index 7f71eb3e..be60e5b7 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt
@@ -131,7 +131,8 @@ class ParquetExperimentMonitor(val run: Run) : ExperimentMonitor {
interferedBurst,
cpuUsage,
cpuDemand,
- lastPowerConsumption[hostServer] ?: 200.0
+ lastPowerConsumption[hostServer] ?: 200.0,
+ hostServer.flavor.cpuCount
)
currentHostEvent[hostServer] = event
@@ -148,7 +149,8 @@ class ParquetExperimentMonitor(val run: Run) : ExperimentMonitor {
interferedBurst,
cpuUsage,
cpuDemand,
- lastPowerConsumption[hostServer] ?: 200.0
+ lastPowerConsumption[hostServer] ?: 200.0,
+ hostServer.flavor.cpuCount
)
currentHostEvent[hostServer] = event
@@ -167,7 +169,8 @@ class ParquetExperimentMonitor(val run: Run) : ExperimentMonitor {
interferedBurst,
cpuUsage,
cpuDemand,
- lastPowerConsumption[hostServer] ?: 200.0
+ lastPowerConsumption[hostServer] ?: 200.0,
+ hostServer.flavor.cpuCount
)
currentHostEvent[hostServer] = event
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/HostEvent.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/HostEvent.kt
index 8e91bca2..b9030172 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/HostEvent.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/HostEvent.kt
@@ -40,5 +40,6 @@ data class HostEvent(
val interferedBurst: Long,
val cpuUsage: Double,
val cpuDemand: Double,
- val powerDraw: Double
+ val powerDraw: Double,
+ val cores: Int
) : Event("host-metrics")
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt
index 523897b0..3bc09435 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt
@@ -55,6 +55,7 @@ public class ParquetHostEventWriter(path: File, bufferSize: Int) :
record.put("cpu_usage", event.cpuUsage)
record.put("cpu_demand", event.cpuDemand)
record.put("power_draw", event.powerDraw * (1.0 / 12))
+ record.put("cores", event.cores)
}
val schema: Schema = SchemaBuilder
@@ -76,6 +77,7 @@ public class ParquetHostEventWriter(path: File, bufferSize: Int) :
.name("cpu_usage").type().doubleType().noDefault()
.name("cpu_demand").type().doubleType().noDefault()
.name("power_draw").type().doubleType().noDefault()
+ .name("cores").type().intType().noDefault()
.endRecord()
}
}
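
With the extra `cores` column in the schema, downstream analysis can read the value back per record. A hedged sketch using the standard parquet-avro reader (the file path is a placeholder):

```kotlin
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetReader

// Hedged sketch: iterate a host-metrics file and print the new
// "cores" column (the path is hypothetical).
fun printCores(path: String) {
    AvroParquetReader.builder<GenericRecord>(Path(path)).build().use { reader ->
        var record = reader.read()
        while (record != null) {
            println("cores=${record.get("cores")}")
            record = reader.read()
        }
    }
}
```
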
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt
index ad50bf18..06bececf 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt
@@ -28,6 +28,7 @@ import com.atlarge.opendc.compute.core.image.VmImage
import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
import com.atlarge.opendc.compute.core.workload.VmWorkload
+import com.atlarge.opendc.experiments.sc20.experiment.model.CompositeWorkload
import com.atlarge.opendc.experiments.sc20.experiment.model.Workload
import com.atlarge.opendc.format.trace.TraceEntry
import com.atlarge.opendc.format.trace.TraceReader
@@ -42,7 +43,7 @@ import java.util.TreeSet
*/
@OptIn(ExperimentalStdlibApi::class)
class Sc20ParquetTraceReader(
- raw: Sc20RawParquetTraceReader,
+ rawReaders: List<Sc20RawParquetTraceReader>,
performanceInterferenceModel: Map<String, PerformanceInterferenceModel>,
workload: Workload,
seed: Int
@@ -51,8 +52,17 @@ class Sc20ParquetTraceReader(
* The iterator over the actual trace.
*/
private val iterator: Iterator<TraceEntry<VmWorkload>> =
- raw.read()
- .run { sampleWorkload(this, workload, seed) }
+ rawReaders
+ .map { it.read() }
+ .run {
+ if (workload is CompositeWorkload) {
+ this.zip(workload.workloads)
+ } else {
+ this.zip(listOf(workload))
+ }
+ }
+ .map { sampleWorkload(it.first, workload, it.second, seed) }
+ .flatten()
.run {
// Apply performance interference model
if (performanceInterferenceModel.isEmpty())
@@ -62,7 +72,7 @@ class Sc20ParquetTraceReader(
val image = entry.workload.image
val id = image.name
val relevantPerformanceInterferenceModelItems =
- performanceInterferenceModel[id] ?: PerformanceInterferenceModel(TreeSet())
+ performanceInterferenceModel[id] ?: PerformanceInterferenceModel(TreeSet())
val newImage =
VmImage(
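
The reader now zips each raw trace with its corresponding sub-workload, samples every pair independently, and flattens the samples into a single entry stream. The same composition on simplified types:

```kotlin
// Simplified sketch of the zip-sample-flatten composition above;
// sampling is reduced to take() purely for illustration.
fun <T> composite(traces: List<List<T>>, fractions: List<Double>): List<T> =
    traces.zip(fractions)
        .map { (trace, fraction) -> trace.take((trace.size * fraction).toInt()) }
        .flatten()
```
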
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt
index 3b480d33..652f7746 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt
@@ -98,15 +98,18 @@ class Sc20RawParquetTraceReader(private val path: File) {
return try {
while (true) {
val record = metaReader.read() ?: break
+
val id = record["id"].toString()
+ if (!fragments.containsKey(id)) {
+ continue
+ }
+
val submissionTime = record["submissionTime"] as Long
val endTime = record["endTime"] as Long
val maxCores = record["maxCores"] as Int
val requiredMemory = record["requiredMemory"] as Long
val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray())
- logger.info { "VM $id" }
-
val vmFragments = fragments.getValue(id).asSequence()
val totalLoad = vmFragments.sumByDouble { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs
val vmWorkload = VmWorkload(
@@ -129,6 +132,9 @@ class Sc20RawParquetTraceReader(private val path: File) {
}
entries
+ } catch (e: Exception) {
+ e.printStackTrace()
+ throw e
} finally {
metaReader.close()
}
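
The new guard skips metadata rows whose VM has no recorded fragments (for example, VMs filtered out of the trace), so the `fragments.getValue(id)` call below it can no longer throw. The guard, reduced to its essence:

```kotlin
// Sketch: drop metadata entries without fragments instead of letting
// getValue(id) throw NoSuchElementException.
fun <K, V> withFragments(metaIds: List<K>, fragments: Map<K, V>): List<Pair<K, V>> =
    metaIds.mapNotNull { id -> fragments[id]?.let { id to it } }
```
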
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
index 04cdd302..32a188b5 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
@@ -24,14 +24,18 @@
package com.atlarge.opendc.experiments.sc20.trace
+import me.tongfei.progressbar.ProgressBar
+import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import java.io.BufferedReader
import java.io.File
import java.io.FileReader
+import java.util.Random
import kotlin.math.max
import kotlin.math.min
@@ -39,8 +43,8 @@ import kotlin.math.min
* A script to convert a trace in text format into a Parquet trace.
*/
fun main(args: Array<String>) {
- if (args.size < 2) {
- println("error: expected <INPUT> <OUTPUT>")
+ if (args.size < 3) {
+ println("error: expected <OUTPUT> <INPUT> <TRACE-TYPE> [<SEED>]")
return
}
@@ -66,31 +70,117 @@ fun main(args: Array<String>) {
.name("flops").type().longType().noDefault()
.endRecord()
- val timestampCol = 0
- val cpuUsageCol = 1
- val coreCol = 12
- val vmIdCol = 19
- val provisionedMemoryCol = 20
- val traceInterval = 5 * 60 * 1000L
-
val dest = File(args[0])
val traceDirectory = File(args[1])
- val vms =
- traceDirectory.walk()
- .filterNot { it.isDirectory }
- .filter { it.extension == "csv" || it.extension == "txt" }
- .toList()
+ val metaParquet = File(dest.absolutePath, "meta.parquet")
+ val traceParquet = File(dest.absolutePath, "trace.parquet")
+
+ if (metaParquet.exists()) {
+ metaParquet.delete()
+ }
+ if (traceParquet.exists()) {
+ traceParquet.delete()
+ }
- val metaWriter = AvroParquetWriter.builder<GenericData.Record>(Path(dest.absolutePath, "meta.parquet"))
+ val metaWriter = AvroParquetWriter.builder<GenericData.Record>(Path(metaParquet.toURI()))
.withSchema(metaSchema)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.withPageSize(4 * 1024 * 1024) // For compression
.withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
.build()
+ val writer = AvroParquetWriter.builder<GenericData.Record>(Path(traceParquet.toURI()))
+ .withSchema(schema)
+ .withCompressionCodec(CompressionCodecName.SNAPPY)
+ .withPageSize(4 * 1024 * 1024) // For compression
+ .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
+ .build()
+
+ val traceType = args[2]
+ val allFragments = if (traceType == "solvinity") {
+ readSolvinityTrace(traceDirectory, metaSchema, metaWriter)
+ } else {
+ val seed = args[3].toLong()
+ readAzureTrace(traceDirectory, metaSchema, metaWriter, seed)
+ }
+ allFragments.sortWith(compareBy<Fragment> { it.tick }.thenBy { it.id })
+
+ for (fragment in allFragments) {
+ val record = GenericData.Record(schema)
+ record.put("id", fragment.id)
+ record.put("time", fragment.tick)
+ record.put("duration", fragment.duration)
+ record.put("cores", fragment.cores)
+ record.put("cpuUsage", fragment.usage)
+ record.put("flops", fragment.flops)
+
+ writer.write(record)
+ }
+
+ writer.close()
+ metaWriter.close()
+}
+
+data class Fragment(
+ val id: String,
+ val tick: Long,
+ val flops: Long,
+ val duration: Long,
+ val usage: Double,
+ val cores: Int
+)
+
+/**
+ * Reads the confidential Solvinity trace.
+ */
+fun readSolvinityTrace(
+ traceDirectory: File,
+ metaSchema: Schema,
+ metaWriter: ParquetWriter<GenericData.Record>
+): MutableList<Fragment> {
+ val timestampCol = 0
+ val cpuUsageCol = 1
+ val coreCol = 12
+ val provisionedMemoryCol = 20
+ val traceInterval = 5 * 60 * 1000L
+
+ // Identify start time of the entire trace
+ var minTimestamp = Long.MAX_VALUE
+ traceDirectory.walk()
+ .filterNot { it.isDirectory }
+ .filter { it.extension == "csv" || it.extension == "txt" }
+ .toList()
+ .forEach { vmFile ->
+ BufferedReader(FileReader(vmFile)).use { reader ->
+ reader.lineSequence()
+ .chunked(128)
+ .forEachIndexed { idx, lines ->
+ for (line in lines) {
+ // Ignore comments in the trace
+ if (line.startsWith("#") || line.isBlank()) {
+ continue
+ }
+
+ val values = line.split(" ")
+ val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
+
+ if (timestamp < minTimestamp) {
+ minTimestamp = timestamp
+ }
+ return@forEach
+ }
+ }
+ }
+ }
+
+ println("Start of trace at $minTimestamp")
+
val allFragments = mutableListOf<Fragment>()
- vms
+ traceDirectory.walk()
+ .filterNot { it.isDirectory }
+ .filter { it.extension == "csv" || it.extension == "txt" }
+ .toList()
.forEachIndexed { idx, vmFile ->
println(vmFile)
@@ -116,7 +206,7 @@ fun main(args: Array<String>) {
val values = line.split(" ")
vmId = vmFile.name
- val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
+ val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - minTimestamp
cores = values[coreCol].trim().toInt()
requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
maxCores = max(maxCores, cores)
@@ -176,29 +266,164 @@ fun main(args: Array<String>) {
metaWriter.write(metaRecord)
}
- val writer = AvroParquetWriter.builder<GenericData.Record>(Path(dest.absolutePath, "trace.parquet"))
- .withSchema(schema)
- .withCompressionCodec(CompressionCodecName.SNAPPY)
- .withPageSize(4 * 1024 * 1024) // For compression
- .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
- .build()
+ return allFragments
+}
- allFragments.sortWith(compareBy<Fragment> { it.tick }.thenBy { it.id })
+/**
+ * Reads the Azure cloud trace.
+ *
+ * See https://github.com/Azure/AzurePublicDataset/ for a definition of the trace.
+ */
+fun readAzureTrace(
+ traceDirectory: File,
+ metaSchema: Schema,
+ metaWriter: ParquetWriter<GenericData.Record>,
+ seed: Long
+): MutableList<Fragment> {
+ val random = Random(seed)
+ val fraction = 0.01
- for (fragment in allFragments) {
- val record = GenericData.Record(schema)
- record.put("id", fragment.id)
- record.put("time", fragment.tick)
- record.put("duration", fragment.duration)
- record.put("cores", fragment.cores)
- record.put("cpuUsage", fragment.usage)
- record.put("flops", fragment.flops)
+ // Read VM table
+ val vmIdTableCol = 0
+ val coreTableCol = 9
+ val provisionedMemoryTableCol = 10
- writer.write(record)
+ var vmId: String
+ var cores: Int
+ var requiredMemory: Long
+
+ val vmIds = mutableSetOf<String>()
+ val vmIdToMetadata = mutableMapOf<String, VmInfo>()
+
+ BufferedReader(FileReader(File(traceDirectory, "vmtable.csv"))).use { reader ->
+ reader.lineSequence()
+ .chunked(1024)
+ .forEach { lines ->
+ for (line in lines) {
+ // Ignore comments in the trace
+ if (line.startsWith("#") || line.isBlank()) {
+ continue
+ }
+ // Sample only a fraction of the VMs
+ if (random.nextDouble() > fraction) {
+ continue
+ }
+
+ val values = line.split(",")
+
+ // Exclude VMs with a large number of cores (not specified exactly)
+ if (values[coreTableCol].contains(">")) {
+ continue
+ }
+
+ vmId = values[vmIdTableCol].trim()
+ cores = values[coreTableCol].trim().toInt()
+ requiredMemory = values[provisionedMemoryTableCol].trim().toInt() * 1_000L // GB -> MB
+
+ vmIds.add(vmId)
+ vmIdToMetadata[vmId] = VmInfo(cores, requiredMemory, Long.MAX_VALUE, -1L)
+ }
+ }
}
- writer.close()
- metaWriter.close()
+ // Read VM metric reading files
+ val timestampCol = 0
+ val vmIdCol = 1
+ val cpuUsageCol = 4
+ val traceInterval = 5 * 60 * 1000L
+
+ val vmIdToFragments = mutableMapOf<String, MutableList<Fragment>>()
+ val vmIdToLastFragment = mutableMapOf<String, Fragment?>()
+ val allFragments = mutableListOf<Fragment>()
+
+ for (i in ProgressBar.wrap((1..195).toList(), "Reading Trace")) {
+ val readingsFile = File(File(traceDirectory, "readings"), "readings-$i.csv")
+ var timestamp: Long
+ var cpuUsage: Double
+
+ BufferedReader(FileReader(readingsFile)).use { reader ->
+ reader.lineSequence()
+ .chunked(128)
+ .forEach { lines ->
+ for (line in lines) {
+ // Ignore comments in the trace
+ if (line.startsWith("#") || line.isBlank()) {
+ continue
+ }
+
+ val values = line.split(",")
+ vmId = values[vmIdCol].trim()
+
+ // Ignore readings for VMs not in the sample
+ if (!vmIds.contains(vmId)) {
+ continue
+ }
+
+ timestamp = values[timestampCol].trim().toLong() * 1000L
+ vmIdToMetadata[vmId]!!.minTime = min(vmIdToMetadata[vmId]!!.minTime, timestamp)
+ cpuUsage = values[cpuUsageCol].trim().toDouble() * 3_000 // MHz
+ vmIdToMetadata[vmId]!!.maxTime = max(vmIdToMetadata[vmId]!!.maxTime, timestamp)
+
+ val flops: Long = (cpuUsage * 5 * 60).toLong()
+ val lastFragment = vmIdToLastFragment[vmId]
+
+ vmIdToLastFragment[vmId] =
+ if (lastFragment != null && lastFragment.flops == 0L && flops == 0L) {
+ Fragment(
+ vmId,
+ lastFragment.tick,
+ lastFragment.flops + flops,
+ lastFragment.duration + traceInterval,
+ cpuUsage,
+ vmIdToMetadata[vmId]!!.cores
+ )
+ } else {
+ val fragment =
+ Fragment(
+ vmId,
+ timestamp,
+ flops,
+ traceInterval,
+ cpuUsage,
+ vmIdToMetadata[vmId]!!.cores
+ )
+ if (lastFragment != null) {
+ if (vmIdToFragments[vmId] == null) {
+ vmIdToFragments[vmId] = mutableListOf()
+ }
+ vmIdToFragments[vmId]!!.add(lastFragment)
+ allFragments.add(lastFragment)
+ }
+ fragment
+ }
+ }
+ }
+ }
+ }
+
+ for (entry in vmIdToLastFragment) {
+ if (entry.value != null) {
+ if (vmIdToFragments[entry.key] == null) {
+ vmIdToFragments[entry.key] = mutableListOf()
+ }
+ vmIdToFragments[entry.key]!!.add(entry.value!!)
+ }
+ }
+
+ println("Read ${vmIdToLastFragment.size} VMs")
+
+ for (entry in vmIdToMetadata) {
+ val metaRecord = GenericData.Record(metaSchema)
+ metaRecord.put("id", entry.key)
+ metaRecord.put("submissionTime", entry.value.minTime)
+ metaRecord.put("endTime", entry.value.maxTime)
+ println("${entry.value.minTime} - ${entry.value.maxTime}")
+ metaRecord.put("maxCores", entry.value.cores)
+ metaRecord.put("requiredMemory", entry.value.requiredMemory)
+ metaWriter.write(metaRecord)
+ }
+
+ return allFragments
}
-data class Fragment(val id: String, val tick: Long, val flops: Long, val duration: Long, val usage: Double, val cores: Int)
+class VmInfo(val cores: Int, val requiredMemory: Long, var minTime: Long, var maxTime: Long)
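
After the refactoring, the converter takes the output directory first, then the input directory and a trace type that selects the parser; the seed is consulted only on the Azure path, where it drives the random 1% VM sample. Hypothetical invocations (paths are placeholders):

```kotlin
// Hypothetical invocations of the converter entry point above.
fun convertExamples() {
    // Solvinity: no seed needed, the full trace is converted.
    main(arrayOf("/data/parquet/solvinity", "/data/raw/solvinity", "solvinity"))
    // Azure: the seed drives the random 1% VM sample.
    main(arrayOf("/data/parquet/azure", "/data/raw/azure", "azure", "42"))
}
```
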
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt
index a8580686..dd70d4f1 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt
@@ -25,6 +25,7 @@
package com.atlarge.opendc.experiments.sc20.trace
import com.atlarge.opendc.compute.core.workload.VmWorkload
+import com.atlarge.opendc.experiments.sc20.experiment.model.CompositeWorkload
import com.atlarge.opendc.experiments.sc20.experiment.model.Workload
import com.atlarge.opendc.format.trace.TraceEntry
import mu.KotlinLogging
@@ -35,22 +36,37 @@ private val logger = KotlinLogging.logger {}
/**
* Sample the workload for the specified [run].
*/
-fun sampleWorkload(trace: List<TraceEntry<VmWorkload>>, workload: Workload, seed: Int): List<TraceEntry<VmWorkload>> {
- return sampleRegularWorkload(trace, workload, seed)
+fun sampleWorkload(
+ trace: List<TraceEntry<VmWorkload>>,
+ workload: Workload,
+ subWorkload: Workload,
+ seed: Int
+): List<TraceEntry<VmWorkload>> {
+ return if (workload is CompositeWorkload) {
+ sampleRegularWorkload(trace, workload, subWorkload, seed)
+ } else {
+ sampleRegularWorkload(trace, workload, workload, seed)
+ }
}
/**
* Sample a regular (non-HPC) workload.
*/
-fun sampleRegularWorkload(trace: List<TraceEntry<VmWorkload>>, workload: Workload, seed: Int): List<TraceEntry<VmWorkload>> {
- val fraction = workload.fraction
- if (fraction >= 1) {
- return trace
- }
+fun sampleRegularWorkload(
+ trace: List<TraceEntry<VmWorkload>>,
+ workload: Workload,
+ subWorkload: Workload,
+ seed: Int
+): List<TraceEntry<VmWorkload>> {
+ val fraction = subWorkload.fraction
val shuffled = trace.shuffled(Random(seed))
val res = mutableListOf<TraceEntry<VmWorkload>>()
- val totalLoad = shuffled.sumByDouble { it.workload.image.tags.getValue("total-load") as Double }
+ val totalLoad = if (workload is CompositeWorkload) {
+ workload.totalLoad
+ } else {
+ shuffled.sumByDouble { it.workload.image.tags.getValue("total-load") as Double }
+ }
var currentLoad = 0.0
for (entry in shuffled) {
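
The loop that follows (cut off in this hunk) accumulates shuffled entries until the sampled load reaches `fraction * totalLoad`. A self-contained sketch of that accumulation; the exact stopping condition is not shown in the hunk, so this is an assumption:

```kotlin
// Hedged sketch of load-based sampling: keep shuffled entries until
// the accumulated load reaches the requested fraction of the total.
fun <T> sampleByLoad(
    shuffled: List<T>,
    loadOf: (T) -> Double,
    totalLoad: Double,
    fraction: Double
): List<T> {
    val res = mutableListOf<T>()
    var currentLoad = 0.0
    for (entry in shuffled) {
        if (currentLoad >= fraction * totalLoad) break
        currentLoad += loadOf(entry)
        res += entry
    }
    return res
}
```
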
diff --git a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
index 68c2cbc5..5ecf7605 100644
--- a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
+++ b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
@@ -145,8 +145,8 @@ class Sc20IntegrationTest {
assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs")
assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run")
assertEquals(207379117949, monitor.totalRequestedBurst)
- assertEquals(207102919834, monitor.totalGrantedBurst)
- assertEquals(276198896, monitor.totalOvercommissionedBurst)
+ assertEquals(203388071813, monitor.totalGrantedBurst)
+ assertEquals(3991046136, monitor.totalOvercommissionedBurst)
assertEquals(0, monitor.totalInterferedBurst)
}
@@ -204,7 +204,7 @@ class Sc20IntegrationTest {
*/
private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<VmWorkload> {
return Sc20ParquetTraceReader(
- Sc20RawParquetTraceReader(File("src/test/resources/trace")),
+ listOf(Sc20RawParquetTraceReader(File("src/test/resources/trace"))),
emptyMap(),
Workload("test", fraction),
seed