import {RxStomp} from '@stomp/rx-stomp';
import {filter, map, takeUntil, tap} from 'rxjs/operators';
import {
    VIEWER_WEBSOCKET_MESSAGE,
    ViewerWebsocketMessageType,
    ViewerWebsocketResetEvent,
    ViewerWebsocketUpdateMessage
} from '../../interfaces/ViewerWebsocket';
import {firstValueFrom, Observable, Subject} from 'rxjs';
import {keycloakService} from '@ndw/react-keycloak-authentication';

const STOMP_HEARTBEAT_MS = 30000;

type ConnectionURL = string;

class ViewerWebsocketService {
    private static readonly clients: Record<ConnectionURL, ViewerWebsocketService> = {};
    static getConnectedClient(url: ConnectionURL): ViewerWebsocketService {
        if (!(url in ViewerWebsocketService.clients)) {
            const service = new ViewerWebsocketService(url);
            service.connectToWebsocketServer();

            ViewerWebsocketService.clients[url] = service;
        }
        return ViewerWebsocketService.clients[url];
    }

    private readonly rxStomp: RxStomp;
    private readonly unsubscribeStreams$: Subject<string | null>;
    private readonly unsubscribeTopic$: Subject<boolean | null>;

    private constructor(private readonly connectionString: string) {
        this.rxStomp = new RxStomp();
        this.unsubscribeStreams$ = new Subject<string | null>();
        this.unsubscribeTopic$ = new Subject<boolean | null>();
    }

    private connectToWebsocketServer(): void {
        this.rxStomp.configure({
            brokerURL: this.connectionString,
            heartbeatIncoming: STOMP_HEARTBEAT_MS,
            heartbeatOutgoing: STOMP_HEARTBEAT_MS,
            beforeConnect: async (client) => {
                const token = await firstValueFrom(keycloakService.token());
                client.configure({
                    connectHeaders: {
                        'Authorization': `Bearer ${token}`
                    }
                });
            }
        });
        this.rxStomp.activate();
    }

    stopMessages(streamName: string): void {
        this.unsubscribeStreams$.next(streamName);
    }

    getMessages(streamName: string, timestampOffset: number): Observable<VIEWER_WEBSOCKET_MESSAGE> {
        const offsetInSecondsPrecision = Math.floor(new Date(timestampOffset).getTime() / 1000);

        this.unsubscribeStreams$.next(null);
        return this.rxStomp.watch(`/amq/queue/${streamName}`, {
            'prefetch-count': '1',
            'x-stream-offset': `timestamp=${offsetInSecondsPrecision}`,
            'ack': 'client-individual'
        })
            .pipe(
                takeUntil(this.unsubscribeStreams$.pipe(filter(name => name === streamName))),
                tap(message => message.ack()),
                map(message => JSON.parse(message.body) as VIEWER_WEBSOCKET_MESSAGE),
                filter(message => {
                    if (message.type === ViewerWebsocketMessageType.STREAMING_UPDATE_EVENT) {
                        // @ts-ignore
                        const updateEvent = message as ViewerWebsocketUpdateMessage<unknown>;
                        return updateEvent.timestamp > timestampOffset;
                    } else if (message.type === ViewerWebsocketMessageType.STREAMING_RESET_EVENT) {
                        const resetEvent = message as ViewerWebsocketResetEvent;
                        return resetEvent.timestamp > timestampOffset;
                    }

                    return true;
                })

            );
    }

    stopTopicMessages(): void {
        this.unsubscribeTopic$.next(true);
    }

    getTopicMessages(): Observable<VIEWER_WEBSOCKET_MESSAGE> {
        this.unsubscribeTopic$.next(null);
        return this.rxStomp.watch('/topic/message')
            .pipe(
                takeUntil(this.unsubscribeTopic$.pipe(filter(value => !!value))),
                map(message => JSON.parse(message.body) as VIEWER_WEBSOCKET_MESSAGE)
            );
    }
}

export default ViewerWebsocketService;
