1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
/*
* Copyright (c) 2024 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package org.opendc.compute.failure.models
import kotlinx.coroutines.delay
import org.opendc.compute.simulator.service.ComputeService
import org.opendc.trace.Trace
import org.opendc.trace.conv.FAILURE_DURATION
import org.opendc.trace.conv.FAILURE_INTENSITY
import org.opendc.trace.conv.FAILURE_INTERVAL
import org.opendc.trace.conv.TABLE_FAILURES
import java.io.File
import java.time.InstantSource
import java.util.random.RandomGenerator
import kotlin.coroutines.CoroutineContext
import kotlin.time.times
/**
 * A single failure event, as stored in a failure trace.
 *
 * All values are validated on construction.
 *
 * @property failureInterval The time between this and the previous failure in ms; must not be negative.
 * @property failureDuration The duration of the failure in ms; must not be negative (a duration of 0 is accepted).
 * @property failureIntensity The ratio of hosts affected by the failure; must lie in the range (0.0, 1.0].
 * @constructor Creates a [Failure] from the given values.
 * @throws IllegalArgumentException if any value lies outside its valid range.
 */
public data class Failure(
    val failureInterval: Long,
    val failureDuration: Long,
    val failureIntensity: Double,
) {
    init {
        require(failureInterval >= 0L) { "A failure cannot start at a negative time" }
        // A zero-length failure is accepted; only negative durations are rejected.
        require(failureDuration >= 0L) { "A failure can not have a negative duration" }
        // A NaN intensity fails both comparisons and is therefore rejected as well.
        require(failureIntensity > 0.0 && failureIntensity <= 1.0) {
            "The intensity of a failure has to be in the range (0.0, 1.0]"
        }
    }
}
/**
 * A [FailureModel] based on a provided parquet file.
 *
 * The file provides a list of [Failure] objects, which is loaded once when the model is constructed.
 * The injector then walks that list in order: for each entry it waits [Failure.failureInterval],
 * selects victims using [Failure.failureIntensity] and applies the fault for [Failure.failureDuration].
 *
 * @param context Passed through to [FailureModel]
 * @param clock Passed through to [FailureModel]
 * @param service Passed through to [FailureModel]
 * @param random Passed through to [FailureModel]
 * @param pathToTrace The path to the parquet file as a [String]
 * @param startPoint The fraction of the trace at which replay starts. The loaded list is rotated so that
 * the entry at index `(size * startPoint).toInt()` comes first. The value is not range-checked here; a
 * value that yields an index outside `0..size` makes the underlying `subList` call throw.
 * @property repeat When `true`, the list is replayed from its first entry again after the last one, indefinitely.
 */
public class TraceBasedFailureModel(
    context: CoroutineContext,
    clock: InstantSource,
    service: ComputeService,
    random: RandomGenerator,
    pathToTrace: String,
    startPoint: Double,
    private val repeat: Boolean = true,
) : FailureModel(context, clock, service, random) {
    private val failureList: List<Failure> = loadTrace(pathToTrace, startPoint)

    /**
     * Replays the loaded failures in order, once or repeatedly depending on [repeat].
     *
     * Returns immediately when the trace contains no failures.
     */
    override suspend fun runInjector() {
        // With an empty trace and repeat = true the loop below would spin forever without ever
        // reaching a suspension point, so there is nothing useful to do.
        if (failureList.isEmpty()) return

        do {
            for (failure in failureList) {
                delay(failure.failureInterval)
                val victims = victimSelector.select(hosts, failure.failureIntensity)
                fault.apply(victims, failure.failureDuration)
            }
        } while (repeat)
    }

    /**
     * Load a list of [Failure] objects from the provided [pathToFile].
     *
     * @param pathToFile The path of the trace to open
     * @param startPoint The fraction of the loaded list at which the returned list starts; entries before
     * that position are appended at the end
     * @return The loaded failures, rotated according to [startPoint]
     */
    private fun loadTrace(
        pathToFile: String,
        startPoint: Double,
    ): List<Failure> {
        val trace = Trace.open(File(pathToFile), "failure")
        val reader =
            checkNotNull(trace.getTable(TABLE_FAILURES)) {
                "Trace at $pathToFile does not contain a failures table"
            }.newReader()

        // Everything that touches the reader lives inside the try so it is closed on every exit path,
        // including a failure while resolving the columns.
        try {
            val intervalCol = reader.resolve(FAILURE_INTERVAL)
            val durationCol = reader.resolve(FAILURE_DURATION)
            val intensityCol = reader.resolve(FAILURE_INTENSITY)

            val entries = mutableListOf<Failure>()
            while (reader.nextRow()) {
                val failureInterval = reader.getLong(intervalCol)
                val failureDuration = reader.getLong(durationCol)
                val failureIntensity = reader.getDouble(intensityCol)
                entries.add(Failure(failureInterval, failureDuration, failureIntensity))
            }

            // Rotate the list: entries from startIndex onwards come first, followed by the skipped prefix.
            val startIndex = (entries.size * startPoint).toInt()
            return entries.subList(startIndex, entries.size) + entries.subList(0, startIndex)
        } finally {
            reader.close()
        }
    }
}
|