summaryrefslogtreecommitdiff
path: root/opendc-compute/opendc-compute-telemetry/src
diff options
context:
space:
mode:
authorDante Niewenhuis <d.niewenhuis@hotmail.com>2024-09-12 15:32:47 +0200
committerGitHub <noreply@github.com>2024-09-12 15:32:47 +0200
commit5047e4a25a0814f96852882f02c4017e1d5f81e7 (patch)
tree348f064fd8e03a2a64fc5b30406e992586b4aac0 /opendc-compute/opendc-compute-telemetry/src
parentad8051faa1f0a6e7f78384e9e0607e847848c033 (diff)
Added max number of failures (#254)
* Added a max failure for tasks. If tasks fail more times, they get cancelled * Added maxNumFailures to the frontend * Updated tests
Diffstat (limited to 'opendc-compute/opendc-compute-telemetry/src')
-rw-r--r--opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMetricReader.kt25
-rw-r--r--opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltTaskExportColumns.kt16
-rw-r--r--opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/TaskTableReader.kt8
3 files changed, 35 insertions, 14 deletions
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMetricReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMetricReader.kt
index 5bd237fd..a98a1a6d 100644
--- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMetricReader.kt
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMetricReader.kt
@@ -30,6 +30,7 @@ import mu.KotlinLogging
import org.opendc.common.Dispatcher
import org.opendc.common.asCoroutineDispatcher
import org.opendc.compute.api.Task
+import org.opendc.compute.api.TaskState
import org.opendc.compute.carbon.CarbonTrace
import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.driver.Host
@@ -422,11 +423,11 @@ public class ComputeMetricReader(
*/
private class TaskTableReaderImpl(
private val service: ComputeService,
- task: Task,
+ private val task: Task,
private val startTime: Duration = Duration.ofMillis(0),
) : TaskTableReader {
override fun copy(): TaskTableReader {
- val newTaskTable = TaskTableReaderImpl(service, _task)
+ val newTaskTable = TaskTableReaderImpl(service, task)
newTaskTable.setValues(this)
return newTaskTable
@@ -448,14 +449,14 @@ public class ComputeMetricReader(
_provisionTime = table.provisionTime
_bootTime = table.bootTime
_bootTimeAbsolute = table.bootTimeAbsolute
- }
- private val _task = task
+ _taskState = table.taskState
+ }
/**
* The static information about this task.
*/
- override val task =
+ override val taskInfo =
TaskInfo(
task.uid.toString(),
task.name,
@@ -527,18 +528,22 @@ public class ComputeMetricReader(
get() = _bootTimeAbsolute
private var _bootTimeAbsolute: Instant? = null
+ override val taskState: TaskState?
+ get() = _taskState
+ private var _taskState: TaskState? = null
+
/**
* Record the next cycle.
*/
fun record(now: Instant) {
- val newHost = service.lookupHost(_task)
+ val newHost = service.lookupHost(task)
if (newHost != null && newHost.uid != _host?.uid) {
_host = newHost
host = HostInfo(newHost.uid.toString(), newHost.name, "x86", newHost.model.cpuCount, newHost.model.memoryCapacity)
}
- val cpuStats = _host?.getCpuStats(_task)
- val sysStats = _host?.getSystemStats(_task)
+ val cpuStats = _host?.getCpuStats(task)
+ val sysStats = _host?.getSystemStats(task)
_timestamp = now
_timestampAbsolute = now + startTime
@@ -550,9 +555,11 @@ public class ComputeMetricReader(
_cpuLostTime = cpuStats?.lostTime ?: 0
_uptime = sysStats?.uptime?.toMillis() ?: 0
_downtime = sysStats?.downtime?.toMillis() ?: 0
- _provisionTime = _task.launchedAt
+ _provisionTime = task.launchedAt
_bootTime = sysStats?.bootTime
+ _taskState = task.state
+
if (sysStats != null) {
_bootTimeAbsolute = sysStats.bootTime + startTime
} else {
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltTaskExportColumns.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltTaskExportColumns.kt
index 5bb7dd1f..9e86e1a3 100644
--- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltTaskExportColumns.kt
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltTaskExportColumns.kt
@@ -64,7 +64,7 @@ public object DfltTaskExportColumns {
Types.required(BINARY)
.`as`(LogicalTypeAnnotation.stringType())
.named("task_id"),
- ) { Binary.fromString(it.task.id) }
+ ) { Binary.fromString(it.taskInfo.id) }
public val HOST_ID: ExportColumn<TaskTableReader> =
ExportColumn(
@@ -80,17 +80,17 @@ public object DfltTaskExportColumns {
Types.required(BINARY)
.`as`(LogicalTypeAnnotation.stringType())
.named("task_name"),
- ) { Binary.fromString(it.task.name) }
+ ) { Binary.fromString(it.taskInfo.name) }
public val CPU_COUNT: ExportColumn<TaskTableReader> =
ExportColumn(
field = Types.required(INT32).named("cpu_count"),
- ) { it.task.cpuCount }
+ ) { it.taskInfo.cpuCount }
public val MEM_CAPACITY: ExportColumn<TaskTableReader> =
ExportColumn(
field = Types.required(INT64).named("mem_capacity"),
- ) { it.task.memCapacity }
+ ) { it.taskInfo.memCapacity }
public val CPU_LIMIT: ExportColumn<TaskTableReader> =
ExportColumn(
@@ -142,6 +142,14 @@ public object DfltTaskExportColumns {
field = Types.optional(INT64).named("boot_time_absolute"),
) { it.bootTimeAbsolute?.toEpochMilli() }
+ public val TASK_STATE: ExportColumn<TaskTableReader> =
+ ExportColumn(
+ field =
+ Types.optional(BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("task_state"),
+ ) { Binary.fromString(it.taskState?.name) }
+
/**
* The columns that are always included in the output file.
*/
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/TaskTableReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/TaskTableReader.kt
index 1e38d5eb..ae7f7a49 100644
--- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/TaskTableReader.kt
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/TaskTableReader.kt
@@ -22,6 +22,7 @@
package org.opendc.compute.telemetry.table
+import org.opendc.compute.api.TaskState
import org.opendc.compute.telemetry.export.parquet.DfltTaskExportColumns
import org.opendc.trace.util.parquet.exporter.Exportable
import java.time.Instant
@@ -47,7 +48,7 @@ public interface TaskTableReader : Exportable {
/**
* The [TaskInfo] of the task to which the row belongs to.
*/
- public val task: TaskInfo
+ public val taskInfo: TaskInfo
/**
* The [HostInfo] of the host on which the task is hosted or `null` if it has no host.
@@ -103,6 +104,11 @@ public interface TaskTableReader : Exportable {
* The duration (in seconds) of CPU time that was lost due to interference.
*/
public val cpuLostTime: Long
+
+ /**
+ * The state of the task
+ */
+ public val taskState: TaskState?
}
// Loads the default export fields for deserialization whenever this file is loaded.