1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
import { QueryObserver, MutationObserver } from 'react-query'
import { normalize, denormalize } from 'normalizr'
import { select, put, take, race, getContext, call } from 'redux-saga/effects'
import { eventChannel } from 'redux-saga'
import { Topology } from '../../util/topology-schema'
import { storeTopology, OPEN_TOPOLOGY } from '../actions/topology'
/**
* Update the topology on the server.
*/
export function* updateServer() {
const queryClient = yield getContext('queryClient')
const mutationObserver = new MutationObserver(queryClient, { mutationKey: 'updateTopology' })
while (true) {
yield take(
(action) =>
action.type.startsWith('EDIT') || action.type.startsWith('ADD') || action.type.startsWith('DELETE')
)
const topology = yield select((state) => state.topology)
if (!topology.root) {
continue
}
const denormalizedTopology = denormalize(topology.root, Topology, topology)
yield call([mutationObserver, mutationObserver.mutate], denormalizedTopology)
}
}
/**
* Watch the topology on the server for changes.
*/
export function* watchServer() {
let { id } = yield take(OPEN_TOPOLOGY)
while (true) {
const channel = yield queryObserver(id)
while (true) {
const [action, response] = yield race([take(OPEN_TOPOLOGY), take(channel)])
if (action) {
id = action.id
break
}
const { isFetched, data } = response
// Only update the topology on the client-side when a new topology was fetched
if (isFetched) {
const { result: topologyId, entities } = normalize(data, Topology)
yield put(storeTopology(entities.topologies[topologyId], entities))
}
}
}
}
/**
* Observe changes for the topology with the specified identifier.
*/
function* queryObserver(id) {
const queryClient = yield getContext('queryClient')
const observer = new QueryObserver(queryClient, { queryKey: ['topologies', id] })
return eventChannel((emitter) => {
const unsubscribe = observer.subscribe((result) => {
emitter(result)
})
// Update result to make sure we did not miss any query updates
// between creating the observer and subscribing to it.
observer.updateResult()
return unsubscribe
})
}
|