import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';

import { ReplaySubject, Subject } from 'rxjs';

import { Centrifuge } from 'centrifuge';
import { BehaviorSubject, Observable } from 'rxjs';
import { filter, map, take, tap } from 'rxjs/operators';
import { Subscription } from 'centrifuge/build/subscription';
import { environment } from '@env/environment';

interface CentrifugoConfig {
  token: string;
}

@Injectable({providedIn: 'root'})
export class CentrifugoService {
  private channels$: { [key: string]: Subject<any> } = {};
  private subscriptions: { [key: string]: Subscription } = {};
  private centrifuge: Centrifuge;
  private initializeEngineSubject: ReplaySubject<boolean>;
  private connectedSubject = new BehaviorSubject<boolean>(false);

  get connected$(): Observable<boolean> {
    return this.connectedSubject;
  }

  constructor(
    private httpClient: HttpClient,
  ) { }

  listen<T = any>(channel): Observable<T> {
    if (channel in this.channels$) {
      return this.channels$[channel];
    }

    this.channels$[channel] = new Subject<T>();
    this.connect().subscribe(connected => {
      this.subscriptions[channel] = this.addEngineSubscription(channel);
    });

    return this.channels$[channel];
  }

  unsubscribe(channel): void {
    if (this.subscriptions[channel]) {
      this.subscriptions[channel].unsubscribe();
      this.centrifuge.removeSubscription(this.subscriptions[channel]);
      delete this.subscriptions[channel];
    }
    if (this.channels$[channel]) {
      this.channels$[channel].complete();
      delete this.channels$[channel];
    }
  }

  connect(isWidget = false): Observable<boolean> {
    if (!this.initializeEngineSubject) {
      this.initializeEngineSubject = new ReplaySubject(1);

      this.httpClient.get<CentrifugoConfig>(`/api/v1/centrifugo_v2/`)
        .subscribe(config => {
          this.initializeEngine({
            url: environment.centrifugoUrl,
            token: config.token,
            transports: ['websocket']
          }, isWidget);
          this.centrifuge.on('connected', () => {
            this.initializeEngineSubject.next(true);
            this.connectedSubject.next(true);
          });
          this.centrifuge.on('disconnected', () => {
            this.connectedSubject.next(false);
            this.initializeEngineSubject.next(false);
          });
          this.centrifuge.connect();
        });
    }

    return this.initializeEngineSubject.pipe(filter(item => Boolean(item)), take(1));
  }

  publish(channel: string, message: any): Observable<any> {
    return this.connect().pipe(
      tap(() => this.centrifuge.publish(channel, message))
    );
  }

  disconnect(): void {
    this.centrifuge.disconnect();
    this.initializeEngineSubject = undefined;
    for (const channel of Object.keys(this.channels$)) {
      this.unsubscribe(channel);
    }
    this.channels$ = {};
    this.subscriptions = {};
  }

  private initializeEngine(config: {
    url: string,
    token: string,
    transports: string[]
  }, isWidget): void {
    this.centrifuge = new Centrifuge(config.url, {
      token: config.token,
      debug: true,
      maxServerPingDelay: 3000,
      timeout: 1000,
      ...(isWidget ? {
        maxReconnectDelay: 3000,
      } : {}),
    });
    // @ts-ignore
    this.centrifuge._debugEnabled = false;
  }

  private addEngineSubscription(channel: string): Subscription {
    let sub = this.centrifuge.getSubscription(channel);
    if (!sub) {
      sub = this.centrifuge.newSubscription(channel);
      sub.on('publication', ctx => {
        this.resolveMessage(channel, ctx.data);
      });
    }
    sub.subscribe();
    return sub;
  }

  private resolveMessage(channel: string, message: any): void {
    if (!this.channels$[channel]) {
      return;
    }

    this.channels$[channel].next(message);
  }
}
