From 2f16cb0f48eca4453e3e894b3d45a3aa09e6dcc0 Mon Sep 17 00:00:00 2001 From: mjkwiatkowski Date: Mon, 16 Feb 2026 15:18:21 +0100 Subject: feat: opendc -> kafka -> postgresql works; added protobuf encoding --- opendc-web/opendc-web-ui/src/redux/sagas/index.js | 7 -- .../opendc-web-ui/src/redux/sagas/topology.js | 76 ---------------------- 2 files changed, 83 deletions(-) delete mode 100644 opendc-web/opendc-web-ui/src/redux/sagas/index.js delete mode 100644 opendc-web/opendc-web-ui/src/redux/sagas/topology.js (limited to 'opendc-web/opendc-web-ui/src/redux/sagas') diff --git a/opendc-web/opendc-web-ui/src/redux/sagas/index.js b/opendc-web/opendc-web-ui/src/redux/sagas/index.js deleted file mode 100644 index 0fabdb6d..00000000 --- a/opendc-web/opendc-web-ui/src/redux/sagas/index.js +++ /dev/null @@ -1,7 +0,0 @@ -import { fork } from 'redux-saga/effects' -import { watchServer, updateServer } from './topology' - -export default function* rootSaga() { - yield fork(watchServer) - yield fork(updateServer) -} diff --git a/opendc-web/opendc-web-ui/src/redux/sagas/topology.js b/opendc-web/opendc-web-ui/src/redux/sagas/topology.js deleted file mode 100644 index 15147bcf..00000000 --- a/opendc-web/opendc-web-ui/src/redux/sagas/topology.js +++ /dev/null @@ -1,76 +0,0 @@ -import { QueryObserver, MutationObserver } from 'react-query' -import { normalize, denormalize } from 'normalizr' -import { select, put, take, race, getContext, call } from 'redux-saga/effects' -import { eventChannel } from 'redux-saga' -import { Topology } from '../../util/topology-schema' -import { storeTopology, OPEN_TOPOLOGY } from '../actions/topology' - -/** - * Update the topology on the server. - */ -export function* updateServer() { - const queryClient = yield getContext('queryClient') - const mutationObserver = new MutationObserver(queryClient, { mutationKey: 'updateTopology' }) - - while (true) { - yield take( - (action) => - action.type.startsWith('EDIT') || action.type.startsWith('ADD') || action.type.startsWith('DELETE') - ) - const topology = yield select((state) => state.topology) - - if (!topology.root) { - continue - } - - const denormalizedTopology = denormalize(topology.root, Topology, topology) - yield call([mutationObserver, mutationObserver.mutate], denormalizedTopology) - } -} - -/** - * Watch the topology on the server for changes. - */ -export function* watchServer() { - let { projectId, id } = yield take(OPEN_TOPOLOGY) - while (true) { - const channel = yield queryObserver(projectId, id) - - while (true) { - const [action, response] = yield race([take(OPEN_TOPOLOGY), take(channel)]) - - if (action) { - projectId = action.projectId - id = action.id - break - } - - const { isFetched, data } = response - // Only update the topology on the client-side when a new topology was fetched - if (isFetched) { - const { result: topologyId, entities } = normalize(data, Topology) - yield put(storeTopology(entities.topologies[topologyId], entities)) - } - } - } -} - -/** - * Observe changes for the topology with the specified identifier. - */ -function* queryObserver(projectId, id) { - const queryClient = yield getContext('queryClient') - const observer = new QueryObserver(queryClient, { queryKey: ['topologies', projectId, id] }) - - return eventChannel((emitter) => { - const unsubscribe = observer.subscribe((result) => { - emitter(result) - }) - - // Update result to make sure we did not miss any query updates - // between creating the observer and subscribing to it. - observer.updateResult() - - return unsubscribe - }) -} -- cgit v1.2.3