diff options
| author | Dante Niewenhuis <d.niewenhuis@hotmail.com> | 2024-09-12 15:32:47 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-09-12 15:32:47 +0200 |
| commit | 5047e4a25a0814f96852882f02c4017e1d5f81e7 (patch) | |
| tree | 348f064fd8e03a2a64fc5b30406e992586b4aac0 /opendc-compute/opendc-compute-telemetry/src | |
| parent | ad8051faa1f0a6e7f78384e9e0607e847848c033 (diff) | |
Added max number of failures (#254)
* Added a max failure for tasks. If tasks fail more times, they get cancelled
* Added maxNumFailures to the frontend
* Updated tests
Diffstat (limited to 'opendc-compute/opendc-compute-telemetry/src')
3 files changed, 35 insertions, 14 deletions
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMetricReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMetricReader.kt index 5bd237fd..a98a1a6d 100644 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMetricReader.kt +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMetricReader.kt @@ -30,6 +30,7 @@ import mu.KotlinLogging import org.opendc.common.Dispatcher import org.opendc.common.asCoroutineDispatcher import org.opendc.compute.api.Task +import org.opendc.compute.api.TaskState import org.opendc.compute.carbon.CarbonTrace import org.opendc.compute.service.ComputeService import org.opendc.compute.service.driver.Host @@ -422,11 +423,11 @@ public class ComputeMetricReader( */ private class TaskTableReaderImpl( private val service: ComputeService, - task: Task, + private val task: Task, private val startTime: Duration = Duration.ofMillis(0), ) : TaskTableReader { override fun copy(): TaskTableReader { - val newTaskTable = TaskTableReaderImpl(service, _task) + val newTaskTable = TaskTableReaderImpl(service, task) newTaskTable.setValues(this) return newTaskTable @@ -448,14 +449,14 @@ public class ComputeMetricReader( _provisionTime = table.provisionTime _bootTime = table.bootTime _bootTimeAbsolute = table.bootTimeAbsolute - } - private val _task = task + _taskState = table.taskState + } /** * The static information about this task. */ - override val task = + override val taskInfo = TaskInfo( task.uid.toString(), task.name, @@ -527,18 +528,22 @@ public class ComputeMetricReader( get() = _bootTimeAbsolute private var _bootTimeAbsolute: Instant? = null + override val taskState: TaskState? + get() = _taskState + private var _taskState: TaskState? = null + /** * Record the next cycle. */ fun record(now: Instant) { - val newHost = service.lookupHost(_task) + val newHost = service.lookupHost(task) if (newHost != null && newHost.uid != _host?.uid) { _host = newHost host = HostInfo(newHost.uid.toString(), newHost.name, "x86", newHost.model.cpuCount, newHost.model.memoryCapacity) } - val cpuStats = _host?.getCpuStats(_task) - val sysStats = _host?.getSystemStats(_task) + val cpuStats = _host?.getCpuStats(task) + val sysStats = _host?.getSystemStats(task) _timestamp = now _timestampAbsolute = now + startTime @@ -550,9 +555,11 @@ public class ComputeMetricReader( _cpuLostTime = cpuStats?.lostTime ?: 0 _uptime = sysStats?.uptime?.toMillis() ?: 0 _downtime = sysStats?.downtime?.toMillis() ?: 0 - _provisionTime = _task.launchedAt + _provisionTime = task.launchedAt _bootTime = sysStats?.bootTime + _taskState = task.state + if (sysStats != null) { _bootTimeAbsolute = sysStats.bootTime + startTime } else { diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltTaskExportColumns.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltTaskExportColumns.kt index 5bb7dd1f..9e86e1a3 100644 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltTaskExportColumns.kt +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/DfltTaskExportColumns.kt @@ -64,7 +64,7 @@ public object DfltTaskExportColumns { Types.required(BINARY) .`as`(LogicalTypeAnnotation.stringType()) .named("task_id"), - ) { Binary.fromString(it.task.id) } + ) { Binary.fromString(it.taskInfo.id) } public val HOST_ID: ExportColumn<TaskTableReader> = ExportColumn( @@ -80,17 +80,17 @@ public object DfltTaskExportColumns { Types.required(BINARY) .`as`(LogicalTypeAnnotation.stringType()) .named("task_name"), - ) { Binary.fromString(it.task.name) } + ) { Binary.fromString(it.taskInfo.name) } public val CPU_COUNT: ExportColumn<TaskTableReader> = ExportColumn( field = Types.required(INT32).named("cpu_count"), - ) { it.task.cpuCount } + ) { it.taskInfo.cpuCount } public val MEM_CAPACITY: ExportColumn<TaskTableReader> = ExportColumn( field = Types.required(INT64).named("mem_capacity"), - ) { it.task.memCapacity } + ) { it.taskInfo.memCapacity } public val CPU_LIMIT: ExportColumn<TaskTableReader> = ExportColumn( @@ -142,6 +142,14 @@ public object DfltTaskExportColumns { field = Types.optional(INT64).named("boot_time_absolute"), ) { it.bootTimeAbsolute?.toEpochMilli() } + public val TASK_STATE: ExportColumn<TaskTableReader> = + ExportColumn( + field = + Types.optional(BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("task_state"), + ) { Binary.fromString(it.taskState?.name) } + /** * The columns that are always included in the output file. */ diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/TaskTableReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/TaskTableReader.kt index 1e38d5eb..ae7f7a49 100644 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/TaskTableReader.kt +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/TaskTableReader.kt @@ -22,6 +22,7 @@ package org.opendc.compute.telemetry.table +import org.opendc.compute.api.TaskState import org.opendc.compute.telemetry.export.parquet.DfltTaskExportColumns import org.opendc.trace.util.parquet.exporter.Exportable import java.time.Instant @@ -47,7 +48,7 @@ public interface TaskTableReader : Exportable { /** * The [TaskInfo] of the task to which the row belongs to. */ - public val task: TaskInfo + public val taskInfo: TaskInfo /** * The [HostInfo] of the host on which the task is hosted or `null` if it has no host. @@ -103,6 +104,11 @@ public interface TaskTableReader : Exportable { * The duration (in seconds) of CPU time that was lost due to interference. */ public val cpuLostTime: Long + + /** + * The state of the task + */ + public val taskState: TaskState? } // Loads the default export fields for deserialization whenever this file is loaded. |
