diff options
Diffstat (limited to 'opendc-compute')
2 files changed, 45 insertions, 37 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/SmartScheduler.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/SmartScheduler.kt index bf8b7825..a4c4209c 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/SmartScheduler.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/SmartScheduler.kt @@ -48,13 +48,14 @@ public class SmartScheduler() : ComputeScheduler { // You need to specify how much time do you have to make the prediction between receiving a time and putting onto a host return SchedulingResult(SchedulingResultType.EMPTY) } + // Benefits of a digital twin: during operations you make sure what is happening in the real world. // The use-case is making split-second automated decisions before operators can make them. // Make a strong case for making a Digital Twin. override fun removeTask( task: ServiceTask, - host: HostView? + host: HostView?, ) { TODO("Not yet implemented") } -}
\ No newline at end of file +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/KafkaComputeMonitor.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/KafkaComputeMonitor.kt index d0be10db..5dfa21c5 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/KafkaComputeMonitor.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/KafkaComputeMonitor.kt @@ -1,50 +1,57 @@ -package org.opendc.compute.simulator.telemetry +/* + * Copyright (c) 2026 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ +package org.opendc.compute.simulator.telemetry -import com.fasterxml.jackson.databind.ObjectMapper -import org.opendc.common.utils.Config -import org.opendc.common.utils.Kafka import org.opendc.common.ProtobufMetrics - +import org.opendc.common.utils.Kafka import org.opendc.compute.simulator.telemetry.table.host.HostTableReader /** - * @author Mateusz * This class logs data from the simulator into Kafka. + * The data uses the Protobuf format. + * + * @author Mateusz Kwiatkowski + * + * @see <a href=https://protobuf.dev/getting-started/javatutorial/> + * https://protobuf.dev/getting-started/javatutorial/</a> */ -public class KafkaComputeMonitor: ComputeMonitor { - private val metrics : MonitoringMetrics = MonitoringMetrics() - private val kafka : Kafka? = Config.getKafkaInstance() +public class KafkaComputeMonitor : ComputeMonitor { + private val send = Kafka("postgres_topic").getSend() @Override override fun record(reader: HostTableReader) { - metrics.id += 1 - metrics.timestamp = reader.timestamp.toEpochMilli() - metrics.tasksActive = reader.tasksActive - metrics.clusterName = reader.hostInfo.clusterName - - try{ - val packet = ProtobufMetrics.ProtoExport.newBuilder() - .setId(metrics.id) - .setTasksactive(metrics.tasksActive) - .build() - kafka?.send(packet) - } - - catch(e: Exception){ + try { + val packet = + ProtobufMetrics.ProtoExport.newBuilder() + .setTimestamp(reader.timestamp.toEpochMilli().toString()) + .setHostId(reader.hostInfo.name) + .setTasksactive(reader.tasksActive) + .setCpuutilization(reader.cpuUtilization) + .setEnergyusage(reader.energyUsage) + .build() + this.send(packet) + } catch (e: Exception) { println("${e.message}") } } - -} - -/** - * @author Mateusz - * This serves as editable data class for ObjectMapper(). - */ -public class MonitoringMetrics { - public var id: Int = 0 - public var timestamp: Long = 0 - public var tasksActive : Int = 0 - public var clusterName: String = "" } |
