import {
    RSocketClient,
    MESSAGE_RSOCKET_ROUTING,
    APPLICATION_JSON
} from 'rsocket-core';
import RSocketWebsocketClient from 'rsocket-websocket-client';
import {AbstractWebSocket} from "./AbstractWebSocket";
import {Flowable} from "rsocket-flowable";

/**
 * RSocket-over-WebSocket implementation of {@link AbstractWebSocket}.
 *
 * Opens a single RSocket connection and exposes routed channel subscriptions
 * (`onLot`, `onBid`, `onNotification`) built on top of the generic `on()`.
 */
export class RSocketWebSocket extends AbstractWebSocket {
    constructor() {
        super();
        this.url = null;
        this.client = null;
        this.rsocket = null;
    }

    /**
     * Opens an RSocket connection to `url`.
     *
     * If a connection is already established it is closed first, so that
     * reconnecting does not leak the previous socket.
     *
     * @param {string} url - WebSocket URL of the RSocket server.
     * @returns {Promise<void>} Resolves once the socket is ready; rejects with
     *     the error reported by the RSocket client if the connection fails.
     */
    async connect(url) {
        if (this.rsocket) {
            this.client.close();
            this.rsocket = null;
        }

        this.url = url;

        const transport = new RSocketWebsocketClient({
            url,
            debug: true
        });

        const client = new RSocketClient({
            setup: {
                // Both values are in milliseconds (rsocket-js setup config).
                keepAlive: 60000,
                lifetime: 180000,
                dataMimeType: APPLICATION_JSON.string,
                metadataMimeType: MESSAGE_RSOCKET_ROUTING.string
            },
            transport
        });

        this.client = client;

        return new Promise((resolve, reject) => {
            client.connect().subscribe({
                onComplete: (socket) => {
                    this.rsocket = socket;
                    resolve();
                },
                onError: (error) => {
                    // Do not keep a reference to a client that never connected.
                    if (this.client === client) {
                        this.client = null;
                    }
                    reject(error);
                }
            });
        });
    }

    /**
     * Opens a request-channel on `route` and forwards every inbound message
     * to `onMessage`, tagged with `type`.
     *
     * @param {Object} options
     * @param {string} options.route - Routing key; ASCII, at most 127 chars.
     * @param {*} [options.data=null] - Request data, sent JSON-serialized.
     * @param {Function} options.onMessage - Called with `{...msg, type}` for
     *     each inbound payload.
     * @param {string} [options.type] - Tag added to every forwarded message.
     * @param {Function} [options.onError] - Called with the channel error.
     * @param {Function} [options.onComplete] - Called when the channel completes.
     * @returns {Function} Idempotent function that cancels the subscription.
     * @throws {Error} If `connect()` has not completed successfully.
     * @throws {TypeError|RangeError} If `route` cannot be encoded.
     */
    on(
        {
            route,
            data = null,
            onMessage,
            type,
            onError = () => {
            },
            onComplete = () => {
            }
        }
    ) {
        if (!this.rsocket) {
            throw new Error('RSocketWebSocket is not connected; call connect() first');
        }

        // Largest request-n the RSocket protocol allows (2^31 - 1).
        const maxRequestN = 0x7fffffff;

        const payload = {
            data: JSON.stringify(data),
            metadata: encodeRouteMetadata(route)
        };

        let subscription = null;
        let cancelled = false;
        let requestSent = false;

        const stream = new Flowable((subscriber) => {
            subscriber.onSubscribe({
                cancel: () => {
                    /* no-op */
                },
                request: () => {
                    // The responder may call request(n) repeatedly; the
                    // request payload must only be emitted once.
                    if (requestSent) {
                        return;
                    }
                    requestSent = true;
                    subscriber.onNext(payload);
                }
            });
        });

        this.rsocket.requestChannel(stream).subscribe({
            onSubscribe: (sub) => {
                subscription = sub;
                // Covers a cancel issued before the subscription arrived.
                if (cancelled) {
                    sub.cancel();
                    return;
                }
                sub.request(maxRequestN);
            },
            onNext: (msg) => {
                onMessage({...msg, type});
            },
            onComplete: () => {
                onComplete();
            },
            onError: (e) => {
                onError(e);
            }
        });

        return () => {
            if (cancelled) {
                return;
            }
            cancelled = true;
            if (subscription) {
                subscription.cancel();
            }
        };
    }

    /**
     * Subscribes to the `api.v1.lot` route.
     *
     * @param {*} auctionId - Sent as the request data.
     * @param {Function} onMessage - Receives messages tagged `type: "lot"`.
     * @param {Function} [onError]
     * @param {Function} [onComplete]
     * @returns {Function} Function that cancels the subscription.
     */
    onLot(
        auctionId,
        onMessage,
        onError = () => {
        },
        onComplete = () => {
        }
    ) {
        return this.on({
            route: 'api.v1.lot',
            data: auctionId,
            type: "lot",
            onMessage,
            onError,
            onComplete
        });
    }

    /**
     * Subscribes to the `api.v1.bid` route.
     *
     * @param {*} auctionId - Sent as the request data.
     * @param {Function} onMessage - Receives messages tagged `type: "bid"`.
     * @param {Function} [onError]
     * @param {Function} [onComplete]
     * @returns {Function} Function that cancels the subscription.
     */
    onBid(
        auctionId,
        onMessage,
        onError = () => {
        },
        onComplete = () => {
        }
    ) {
        return this.on({
            route: 'api.v1.bid',
            data: auctionId,
            type: "bid",
            onMessage,
            onError,
            onComplete
        });
    }

    /**
     * Subscribes to the `api.v1.notification` route.
     *
     * @param {*} userId - Sent as the request data.
     * @param {Function} onMessage - Receives messages tagged
     *     `type: "notification"`.
     * @param {Function} [onError]
     * @param {Function} [onComplete]
     * @returns {Function} Function that cancels the subscription.
     */
    onNotification(
        userId,
        onMessage,
        onError = () => {
        },
        onComplete = () => {
        }
    ) {
        return this.on({
            route: 'api.v1.notification',
            data: userId,
            type: "notification",
            onMessage,
            onError,
            onComplete
        });
    }

    /**
     * Closes the connection if one is established, then invokes `callback`.
     * Does nothing (and does not invoke `callback`) when not connected.
     *
     * @param {Function} [callback] - Called after the client has been closed.
     */
    disconnect(callback = () => {
    }) {
        if (this.rsocket) {
            this.client.close();
            // Drop stale references so later on() calls fail fast instead of
            // using a closed socket.
            this.rsocket = null;
            this.client = null;
            callback();
        }
    }
}

/**
 * Builds routing metadata for `route`: one length character followed by the
 * route itself.
 *
 * The metadata is passed to the transport as a string. Assuming the
 * transport's default UTF-8 string encoding (no custom encoders are
 * configured in this file), the length prefix and the route only map 1:1 to
 * bytes for ASCII routes of at most 127 characters, so anything else is
 * rejected instead of producing a corrupt frame.
 *
 * @param {string} route
 * @returns {string} Encoded routing metadata.
 * @throws {TypeError} If `route` is not a string.
 * @throws {RangeError} If `route` is too long or contains non-ASCII characters.
 */
function encodeRouteMetadata(route) {
    const maxRouteLength = 127;

    if (typeof route !== 'string') {
        throw new TypeError('route must be a string');
    }
    if (route.length > maxRouteLength || !/^[\x00-\x7f]*$/.test(route)) {
        throw new RangeError(
            `route must be ASCII and at most ${maxRouteLength} characters long`
        );
    }

    return String.fromCharCode(route.length) + route;
}