import {RxStomp} from '@stomp/rx-stomp';
import {filter, map, tap} from 'rxjs/operators';
import {IMessage} from '@stomp/stompjs';
import {
    VIEWER_WEBSOCKET_MESSAGE,
    ViewerWebsocketMessageType,
    ViewerWebsocketResetEvent,
    ViewerWebsocketUpdateMessage
} from '../../interfaces/ViewerWebsocket';
import {Observable} from 'rxjs';
import {keycloakService} from '@ndw/react-keycloak-authentication';

class ViewerWebsocketService {

    rxStomp: RxStomp;

    mapSource: string;
    connectionString: string;

    constructor(connectionString: string, mapSource: string) {
        this.mapSource = mapSource;
        this.connectionString = connectionString;

        this.rxStomp = new RxStomp();

    }

    connectToWebsocketServer(): Promise<void> {
        return new Promise<void>(resolve => {
            keycloakService.token().subscribe(token => {
                this.rxStomp.configure({
                    brokerURL: this.connectionString,
                    connectHeaders: {
                        'Authorization': `Bearer ${token}`
                    }
                });
                this.rxStomp.activate();
                resolve();
            });
        });
    }

    closeWebsocketConnection(): Promise<void> {
        return this.rxStomp.deactivate();
    }

    getMessages(streamName: string, timestampOffset: number): Observable<VIEWER_WEBSOCKET_MESSAGE> {
        const offsetInSecondsPrecision = Math.floor(new Date(timestampOffset).getTime() / 1000);

        return this.rxStomp.watch(`/amq/queue/${streamName}`, {
            'prefetch-count': '1',
            'x-stream-offset': `timestamp=${offsetInSecondsPrecision}`,
            'ack': 'client-individual'
        })
            .pipe(
                tap(message => message.ack()),
                map<IMessage, VIEWER_WEBSOCKET_MESSAGE>((message) => (JSON.parse(message.body) as VIEWER_WEBSOCKET_MESSAGE)),
                filter(message => {
                    if (message.type === ViewerWebsocketMessageType.STREAMING_UPDATE_EVENT) {
                        // @ts-ignore
                        const updateEvent = message as ViewerWebsocketUpdateMessage<any>;
                        return updateEvent.timestamp > timestampOffset;
                    } else if (message.type === ViewerWebsocketMessageType.STREAMING_RESET_EVENT) {
                        const resetEvent = message as ViewerWebsocketResetEvent;
                        return resetEvent.timestamp > timestampOffset;
                    }

                    return true;
                })
            );
    }

    getTopicMessages(): Observable<VIEWER_WEBSOCKET_MESSAGE> {
        return this.rxStomp.watch('/topic/message')
            .pipe(
                map<IMessage, VIEWER_WEBSOCKET_MESSAGE>((message) => (JSON.parse(message.body) as VIEWER_WEBSOCKET_MESSAGE))
            );
    }
}

export default ViewerWebsocketService;
