import { Inject, Injectable, NgZone } from '@angular/core';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { Observable, BehaviorSubject, EMPTY, firstValueFrom } from 'rxjs';
import { catchError, tap, switchAll, filter, map } from 'rxjs/operators';
import { LiveDataDto, LiveMessageDto } from '@ledsreact/data-models';
import { HttpClient } from '@angular/common/http';

/**
 * Client for the live-results websocket feed.
 *
 * Owns at most one `WebSocketSubject` at a time, exposes the incoming messages
 * as a stream that survives reconnects (`liveMessages$`), keeps the connection
 * alive with periodic pings and reconnects automatically after an abnormal
 * closure (close code 1006).
 */
@Injectable()
export class LiveResultsService {
  private _logger: (msg: string, context: unknown, sendToRum: boolean) => void = console.log;
  /** The socket currently owned by the service; `null` when there is none. */
  private socket$: WebSocketSubject<LiveMessageDto> | null = null;
  /** The socket whose open event was seen last and that has not reported a close yet. */
  private _openedSocket$: WebSocketSubject<LiveMessageDto> | null = null;
  private messagesSubject$ = new BehaviorSubject<Observable<LiveMessageDto>>(EMPTY);
  private pingInterval: ReturnType<typeof setInterval> | null = null;
  private pingIntervalTime = 30000; // every 30 seconds
  private readonly _isConnectedSubject$ = new BehaviorSubject<boolean>(false);
  /** Pending delayed `subscribe` calls, keyed by the uuid they will subscribe to. */
  private readonly _subscribeTimeoutByUuid = new Map<string, ReturnType<typeof setTimeout>>();
  private readonly MAX_RETRIES = 999999; // TODO handle this better

  /**
   * Messages of the most recently connected socket.
   * `switchAll` is used so that this stream is not closed when the websocket is reconnected.
   */
  public liveMessages$: Observable<LiveMessageDto> = this.messagesSubject$.pipe(switchAll());

  /** Emits `true` when a socket opens and `false` when it closes. */
  public isConnected$ = this._isConnectedSubject$.asObservable();

  constructor(
    protected http: HttpClient,
    @Inject('baseUrl') protected baseUrl: string,
    @Inject('baseUrl$') protected baseUrl$: Observable<string>,
    private ngZone: NgZone
  ) {
    // Keep `baseUrl` in sync with the latest value emitted by `baseUrl$`.
    // NOTE(review): this subscription is never released; confirm `baseUrl$` completes
    // or add teardown if the service can be destroyed before the app.
    this.baseUrl$.subscribe((baseUrl) => {
      this.baseUrl = baseUrl;
    });
  }

  // TODO provide LoggerService as dependency
  /**
   * Replaces the logger used by this service.
   * @param logger callback receiving the message, a context value and the RUM flag;
   *   a nullish value restores `console.log`.
   */
  registerLogger(logger: (msg: string, context: unknown, sendToRum: boolean) => void): void {
    this._logger = logger ?? console.log;
  }

  /**
   * Requests `${baseUrl}/live-results/connected-profiles`.
   * @returns a promise resolving with the first response value.
   */
  getConnectedProfiles(): Promise<{ [profileId: number]: string[] }> {
    return firstValueFrom(
      this.http.get<{ [profileId: number]: string[] }>(`${this.baseUrl}/live-results/connected-profiles`)
    );
  }

  /**
   * @returns `true` when the service owns a socket subject that is not closed.
   */
  isConnected(): boolean {
    return !!this.socket$ && !this.socket$.closed;
  }

  /**
   * Creates a new socket unless one that is not closed already exists (then this is a no-op).
   *
   * @param token value sent base64-encoded in the `t` query parameter.
   * @param url websocket endpoint.
   * @param uuid when given, a `subscribe` message for it is sent 3 seconds later.
   * @param retries number of reconnect attempts that led to this connection.
   * @param profileId when truthy, sent in the `profile` query parameter.
   */
  connect(token: string, url: string, uuid?: string, retries = 0, profileId: number | null = null): void {
    if (this.socket$ && !this.socket$.closed) {
      return;
    }

    const socket = this.getNewWebSocket(token, url, uuid, retries, profileId);
    this.socket$ = socket;

    if (uuid) {
      this.scheduleSubscribe(uuid);
    }

    const messages = socket.pipe(
      // ignore errors in this stream (= must be handled in the closeObserver)
      tap({
        error: (error) => this._logger('live-results websocket error', error, false),
      }),
      catchError(() => EMPTY)
    );
    this.messagesSubject$.next(messages);

    // Start ping interval
    this.startPingInterval();
  }

  /**
   * Closes the current connection and opens a new one with the given parameters.
   * The returned promise resolves 3 seconds after the new connection was requested.
   */
  async reconnect(
    token: string,
    url: string,
    uuid?: string,
    retries = 0,
    profileId: number | null = null
  ): Promise<void> {
    this._logger(`🚀 Reconnecting WS connection for LPRO live-results`, { uuid, retries }, false);
    this.closeConnection();
    this.connect(token, url, uuid, retries, profileId);
    await new Promise((resolve) => setTimeout(resolve, 3000)); // give the new connection 3 seconds before returning
  }

  /** Sends a `subscribe` message for `uuid`. */
  subscribe(uuid?: string): void {
    this._logger(`Subscribing to LPRO live-results for uuid ${uuid}`, null, false);
    this.sendMessage({ event: 'subscribe', uuid });
  }

  /** Sends an `unsubscribe` message for `uuid`. */
  unsubscribe(uuid?: string): void {
    this._logger(`Unsubscribing from LPRO live-results for uuid ${uuid}`, null, false);
    this.sendMessage({ event: 'unsubscribe', uuid });
  }

  /**
   * Closes the current socket, if any, and releases everything tied to it:
   * pending delayed subscribes, the ping interval and the socket reference.
   * Failures while closing are logged, not thrown.
   */
  closeConnection(): void {
    const socket = this.socket$;
    try {
      this._logger('🚪 Closing live-results websocket connection', null, true);
      if (socket) {
        if (this.isConnected()) {
          this.unsubscribe();
        }
        // completing the subject updates _isConnectedSubject$ via the closeObserver
        socket.complete();
      }
    } catch (e: unknown) {
      this._logger('Failed to close live-results websocket connection cleanly', e, false);
    } finally {
      // Always release local state, even when sending the unsubscribe or completing failed.
      this.clearSubscribeTimeouts();
      this.stopPingInterval();
      this.socket$ = null;
    }
  }

  /**
   * Sends `msg` over the current socket. Without a socket the message is retried
   * once after 3 seconds and dropped if there is still no socket.
   *
   * @param retry `true` when this call is the retry itself (no further retry is scheduled).
   */
  sendMessage(msg: LiveMessageDto, retry = false): void {
    if (this.socket$) {
      this.socket$.next(msg);
    } else {
      this._logger('No live-results websocket connection to send message to', null, false);
      if (!retry) {
        // Try again in 3 seconds
        setTimeout(() => {
          this.sendMessage(msg, true);
        }, 3000);
      }
    }
  }

  /**
   * Stream of the payloads of `lr` and `error` messages that carry data.
   * `lr` payloads are returned with an added `status: 'received'` (overridable by the payload),
   * `error` payloads are returned as they are.
   */
  getLiveData$(): Observable<LiveDataDto> {
    return this.liveMessages$.pipe(
      tap((r) => this._logger('RECEIVED LIVE RESULTS', r, false)),
      filter((msg) => (msg.event === 'lr' || msg.event === 'error') && !!msg.data),
      map((msg: LiveMessageDto) => {
        if (msg.event === 'lr') {
          return {
            status: 'received',
            ...msg.data,
          };
        } else {
          return msg.data;
        }
      })
    ) as Observable<LiveDataDto>;
  }

  /**
   * Builds the socket subject for one connection, including its open/close handling.
   * The parameters are captured so that an abnormal closure can reconnect with the same settings.
   */
  private getNewWebSocket(
    token: string,
    url: string,
    uuid?: string,
    retries = 0,
    profileId: number | null = null
  ): WebSocketSubject<LiveMessageDto> {
    // NOTE(review): the base64 value is not URL-encoded ('+', '/' and '=' may appear) —
    // confirm the server expects it in this raw form before changing it.
    let webSocketUrl = url + '?t=' + btoa(token);
    if (profileId) {
      webSocketUrl += `&profile=${profileId}`;
    }

    // Reconnect attempts seen by this socket; starts from the count it was created with.
    let attempts = retries;

    const socket: WebSocketSubject<LiveMessageDto> = webSocket<LiveMessageDto>({
      url: webSocketUrl,
      openObserver: {
        next: () => {
          this._logger('🚀 WS connection opened for LPRO live-results', { uuid }, true);
          this._openedSocket$ = socket;
          this._isConnectedSubject$.next(true);
        },
      },
      closeObserver: {
        next: async (closeEvent: CloseEvent) => {
          if (this.socket$ && this.socket$ !== socket) {
            // A newer socket has taken over: this late close event must neither close
            // nor reconnect it. Only report "disconnected" if the newer one is not open yet.
            if (this._openedSocket$ === socket) {
              this._openedSocket$ = null;
              this._isConnectedSubject$.next(false);
            }
            this._logger(
              'Ignoring close event of a superseded live-results websocket',
              { uuid, code: closeEvent?.code },
              false
            );
            return;
          }

          // `socket$` was already cleared when the close was requested through closeConnection().
          const closedByClient = this.socket$ !== socket;
          if (this._openedSocket$ === socket) {
            this._openedSocket$ = null;
          }
          this._isConnectedSubject$.next(false);

          if (attempts === 0) {
            // avoid spamming the logs with closeEvent
            this._logger(
              `🚀 WS connection closed for LPRO live-results`,
              { uuid, code: closeEvent?.code, retries: attempts },
              true
            );
            this._logger(`closeEvent`, closeEvent, false);
          }

          if (closeEvent?.code === 1006 && !closedByClient) {
            // Abnormal closure of the socket we still own: try to reconnect.
            attempts += 1;
            if (attempts >= this.MAX_RETRIES) {
              this._logger(
                `🚀 Max live-results retries reached, not reconnecting`,
                { uuid, code: closeEvent?.code, retries: attempts },
                true
              );
              return;
            }
            if (attempts > 1) {
              // wait a bit before reconnecting
              await new Promise((resolve) => setTimeout(resolve, 3000));
              if (this.socket$ !== socket) {
                // Closed or replaced while waiting; nothing left to reconnect.
                return;
              }
            }
            try {
              // reconnect() logs the attempt itself; keep the profile of the original connection
              await this.reconnect(token, url, uuid, attempts, profileId);
            } catch (e: unknown) {
              this._logger('Failed to reconnect live-results websocket', e, false);
            }
          } else {
            this._logger(
              `🚪 WS connection closed for LPRO live-results`,
              { uuid, code: closeEvent?.code, retries: attempts },
              false
            );
            this.closeConnection();
          }
        },
      },
    });

    return socket;
  }

  /** Schedules the `subscribe` message for `uuid` in 3 seconds, replacing a pending one. */
  private scheduleSubscribe(uuid: string): void {
    const pending = this._subscribeTimeoutByUuid.get(uuid);
    if (pending !== undefined) {
      clearTimeout(pending);
    }
    this._subscribeTimeoutByUuid.set(
      uuid,
      setTimeout(() => {
        // wait for the connection to be opened
        this._subscribeTimeoutByUuid.delete(uuid);
        this.subscribe(uuid);
      }, 3000)
    );
  }

  /** Cancels every pending delayed `subscribe`. */
  private clearSubscribeTimeouts(): void {
    this._subscribeTimeoutByUuid.forEach((timeout) => clearTimeout(timeout));
    this._subscribeTimeoutByUuid.clear();
  }

  /** (Re)starts the ping timer outside the Angular zone; a running timer is stopped first. */
  private startPingInterval(): void {
    this.stopPingInterval();
    this.ngZone.runOutsideAngular(() => {
      this.pingInterval = setInterval(() => {
        this.sendPing();
      }, this.pingIntervalTime);
    });
  }

  /** Stops the ping timer if it is running. */
  private stopPingInterval(): void {
    if (this.pingInterval) {
      clearInterval(this.pingInterval);
      this.pingInterval = null;
    }
  }

  /** Sends a `ping` message when a socket is available. */
  private sendPing(): void {
    if (this.isConnected()) {
      this.sendMessage({ event: 'ping' });
    }
  }
}
