import { HttpClient, HttpParams } from '@angular/common/http';
import { Injectable, OnDestroy } from '@angular/core';
import { environment } from 'src/environments/environment';
import { AuthService } from '../auth-service/auth.service';
import { AzureApiBase } from '../azure-api/azure-api-base';
import { ProcessSession } from './process-session';
import { ProcessStartRequest } from './process-start-request';
import { ProcessStatus } from './process-status';
import { ProcessUpdate } from './process-update';
import { Process } from './process';
import { Observable, ReplaySubject } from 'rxjs';
import * as signalR from '@microsoft/signalr';
import { HubConnectionState } from '@microsoft/signalr';

@Injectable({
  providedIn: 'root'
})
export class ProcessingService extends AzureApiBase<ProcessSession> implements OnDestroy {

  constructor(http: HttpClient, authService: AuthService) {
    super(http, authService, environment.apiBaseUrl, '/process');
  }

  private _processes$ = new ReplaySubject<Process<unknown>>(1);

  /** Observable that will emit when when a process is added or removed  */
  public processes$ = this._processes$.asObservable();

  /** Collection of current processes */
  public processes: Process<unknown>[] = [];

  private hubConnection: signalR.HubConnection;
  public startConnection(): Promise<void> {
    if (this.hubConnection == null) {
      this.hubConnection = new signalR.HubConnectionBuilder()
        .withUrl(`${environment.serverEventsEndpoint}/processStatus`, {
          accessTokenFactory: () => { return this.authService.IdToken; },
          transport: signalR.HttpTransportType.WebSockets
        })
        .build();
    }

    if (this.hubConnection.state == HubConnectionState.Disconnected || this.hubConnection.state == HubConnectionState.Disconnecting) {
      try {
        return this.hubConnection
          .start();
      } catch (err) {
        console.log('Error while starting signalr connection: ' + err);
      }
    }
  }

  /** Add a new process and start it */
  async addNewProcess<T>(process: Process<T>): Promise<void> {
    this.processes.push(process);
    this._processes$.next(process);

    const request: ProcessStartRequest = {
      message: process.message,
      queue: process.queue,
      expanded: process.expanded || false
    };
    
    await this.startConnection();

    this.startProcess(request).subscribe((session) => {
      process.session = session;
      this.hubConnection.stream('GetStatusUpdates', session.statusSessionId, 'facesheet-export').subscribe({
        next: (status) => {
          process.status = status;
        },
        error: (error) => {
          this.closeConnection();
          process.status.isFailed = true;
          console.error(error);
        },
        complete: () => {
          this.closeConnection();

          if (process.onComplete != null) {
            process.onComplete();
          }
        }
      });
    });
  }

  /** Remove a process. Will cause a cancel event if the process has not completed. */
  removeProcess<T>(process: Process<T>): void {
    const index = this.processes.findIndex(x => x == process);
    this.processes.splice(index, 1);

    this._processes$.next(null);

    if (!process.status?.isComplete && !process.status?.isFailed) {
      if (process.onCancel != null) {
        process.onCancel();
      }

      this.updateProcess({
        isCancel: true,
        message: null,
        queue: process.queue,
        sessionId: process.session.processSessionId
      }).subscribe();
    }
  }

  public updateProcess(request: ProcessUpdate): Observable<void> {
    return this.post('/processUpdate', request);
  }

  private startProcess(request: ProcessStartRequest): Observable<ProcessSession> {
    return this.post('/processStart', request);
  }

  private getProcessStatus(session: ProcessSession, queue: string): Observable<ProcessStatus> {
    const params = new HttpParams().set('sessionId', session.statusSessionId).append('queue', queue);

    return this.get<ProcessStatus>('/processStatus', params);
  }

  private closeConnection(): void {
    const hasAnyRunningProcess = this.processes.some(x => !x.status.isCancelled && !x.status.isFailed && !x.status.isComplete);
    if (this.hubConnection && !hasAnyRunningProcess) {
      this.hubConnection.stop();
      this.hubConnection = undefined;
    }
  }

  ngOnDestroy(): void {
    this.closeConnection();
  }
}
