import {combineEpics, Epic} from 'redux-observable';
import {catchError, filter, ignoreElements, map, mergeMap, tap} from 'rxjs/operators';
import {isActionOf} from 'typesafe-actions';
import {
    viewerWebsocketServiceCloseWebsocketConnection,
    viewerWebsocketServiceCreateWebsocketConnection,
    viewerWebsocketServiceHeartbeatMessage,
    viewerWebsocketServiceMapSourceUpdateMessage,
    viewerWebsocketServiceReceivedWebsocketMessage,
    viewerWebsocketServiceResetEventMessage,
    viewerWebsocketServiceUnhandledMessage
} from '../../services/ViewerWebsocketService/actions';
import ViewerWebsocketService from '../../services/ViewerWebsocketService';
import {ViewerWebsocketMessageType} from '../../interfaces/ViewerWebsocket';
import {
    mapSceneDataDeactivateDataSource,
    mapSceneDataLoadDataSource,
    mapSceneDataUpdateBaseDataSource
} from '../../scenes/MapScene/actions/reducers/data';
import {EMPTY} from 'rxjs';

// Once a base snapshot has been received for a data source that reloads over websocket,
// request a websocket connection so increments following that snapshot can be received.
// The snapshot's identifier is forwarded together with the data source's id, websocket url
// and data streamer id.
const reconnectWebsocketOnBaseMapSourceReceived: Epic = (action$) => action$.pipe(
    filter(isActionOf(mapSceneDataUpdateBaseDataSource)),
    // Data sources without websocket reload never get a connection
    filter((action) => action.payload.dataSource.websocketReload),
    map((action) => {
        const {data, dataSource} = action.payload;
        return viewerWebsocketServiceCreateWebsocketConnection(
            dataSource.id,
            dataSource.websocketUrl,
            data.identifier,
            dataSource.dataStreamerId
        );
    })
);

// When new base data is requested for a data source that is already active (i.e. a reset),
// request closing of its running websocket subscription.
const closeWebsocketOnRequestingNewBase: Epic = (action$) => action$.pipe(
    filter(isActionOf(mapSceneDataLoadDataSource)),
    // Only websocket-reloading data sources that are currently active have something to close
    filter((action) => {
        const {websocketReload, isActive} = action.payload.dataSource;
        return websocketReload && isActive;
    }),
    map((action) => {
        const source = action.payload.dataSource;
        return viewerWebsocketServiceCloseWebsocketConnection(
            source.id,
            source.websocketUrl,
            source.dataStreamerId
        );
    })
);

// Side-effect only epic: on every close request, tell the connected client for the given
// websocket url to stop its messages. No action is emitted in response.
const closeWebsocketConnection: Epic = (action$) => action$.pipe(
    filter(isActionOf(viewerWebsocketServiceCloseWebsocketConnection)),
    tap((action) => {
        const {websocketUrl, dataStreamerId} = action.payload;
        const connectedClient = ViewerWebsocketService.getConnectedClient(websocketUrl);

        // Without a data streamer id the subscription is a topic subscription
        if (!dataStreamerId) {
            connectedClient.stopTopicMessages();
            return;
        }
        connectedClient.stopMessages(dataStreamerId);
    }),
    ignoreElements()
);

// On every create request, subscribe to the message stream of the connected client and wrap
// each incoming message in a viewerWebsocketServiceReceivedWebsocketMessage action.
// A stream error is logged and ends that stream only; the epic itself keeps running.
// NOTE(review): the inner subscription is not torn down here; presumably stopMessages /
// stopTopicMessages completes the stream - confirm against ViewerWebsocketService.
const createWebsocketConnection: Epic = (action$) => action$.pipe(
    filter(isActionOf(viewerWebsocketServiceCreateWebsocketConnection)),
    mergeMap((action) => {
        const {mapSource, websocketUrl, lastSnapshotUpdateTimestamp, dataStreamerId} = action.payload;
        const connectedClient = ViewerWebsocketService.getConnectedClient(websocketUrl);

        let messages$;
        if (!dataStreamerId) {
            // Keep temporary support of topic messages
            messages$ = connectedClient.getTopicMessages();
        } else if (lastSnapshotUpdateTimestamp == undefined) {
            // Streamer messages are requested relative to a snapshot timestamp; without one, emit nothing
            return EMPTY;
        } else {
            messages$ = connectedClient.getMessages(dataStreamerId, lastSnapshotUpdateTimestamp);
        }

        return messages$.pipe(
            map((message) => viewerWebsocketServiceReceivedWebsocketMessage(mapSource, message)),
            catchError((error) => {
                // eslint-disable-next-line no-console
                console.error(error);
                return EMPTY;
            })
        );
    })
);

// Translate a generic received-websocket-message action into the specific action for its
// message type: update, heartbeat, reset, or unhandled for anything else.
const processWebsocketMessageOnReceivedWebsocketMessage: Epic = (action$) => action$.pipe(
    filter(isActionOf(viewerWebsocketServiceReceivedWebsocketMessage)),
    map((action) => {
        const {mapSource, message} = action.payload;

        // All update flavours carry the message along to the map source
        if (
            message.type === ViewerWebsocketMessageType.UPDATE
            || message.type === ViewerWebsocketMessageType.UPDATE_EVENT
            || message.type === ViewerWebsocketMessageType.STREAMING_UPDATE_EVENT
        ) {
            return viewerWebsocketServiceMapSourceUpdateMessage(mapSource, message);
        }

        if (message.type === ViewerWebsocketMessageType.HEARTBEAT) {
            return viewerWebsocketServiceHeartbeatMessage();
        }

        // Both reset flavours only identify the map source
        if (
            message.type === ViewerWebsocketMessageType.RESET_EVENT
            || message.type === ViewerWebsocketMessageType.STREAMING_RESET_EVENT
        ) {
            return viewerWebsocketServiceResetEventMessage(mapSource);
        }

        return viewerWebsocketServiceUnhandledMessage();
    })
);

// A deactivated data source needs no further updates or resets: for data sources that
// reload over websocket, request closing of the websocket subscription.
const closeWebsocketConnectionOnDeactivateMapSourceWithWebsocket: Epic = (action$) => action$.pipe(
    filter(isActionOf(mapSceneDataDeactivateDataSource)),
    filter((action) => action.payload.dataSource.websocketReload),
    map((action) => {
        const {id, websocketUrl, dataStreamerId} = action.payload.dataSource;
        return viewerWebsocketServiceCloseWebsocketConnection(id, websocketUrl, dataStreamerId);
    })
);

// Every epic of the viewer websocket service, in the order they are handed to combineEpics
const websocketEpics: Epic[] = [
    createWebsocketConnection,
    closeWebsocketConnectionOnDeactivateMapSourceWithWebsocket,
    processWebsocketMessageOnReceivedWebsocketMessage,
    reconnectWebsocketOnBaseMapSourceReceived,
    closeWebsocketOnRequestingNewBase,
    closeWebsocketConnection
];

// Single root epic for this service
const viewerWebsocketServiceEpics = combineEpics(...websocketEpics);

export default viewerWebsocketServiceEpics;
