import { BehaviorSubject, EMPTY, Observable, ObservableInput, Subject, defer, merge, of, throwError, timer } from "rxjs";
import { catchError, concatMap, filter, finalize, mergeMap, takeUntil, tap } from "rxjs/operators";
import { afterSubscribe } from "../../rxjs/operators/after-subscribe";
import { bufferUntil } from "../../rxjs/operators/buffer-until";
import { IQueuerParams } from "./iqueuer-params";
import { NumberHelper } from "../../utils/helpers/numberHelper";
import { ArrayHelper } from "../../utils/helpers/arrayHelper";

/** A single execution request travelling through a `Queue`. */
interface IExecutionParams<T, P> {
	/** Arguments that are spread into `thingToQueue` when the request is executed. */
	params: P;
	/** Per-request notification channels; only provided for requests created through `exec$` (plain `exec` leaves it unset). */
	subjects?: IExecutionSubjects<T>;
}

/** Subjects used to relay the outcome of one queued execution back to the caller of `exec$`. */
interface IExecutionSubjects<T> {
	/** Receives every value emitted by the execution. */
	responseSubject: Subject<T>;
	/** Receives the error raised by the execution, if any. */
	errorSubject: Subject<any>;
	/** Signals that the request is over: either the execution completed, or the request was dropped without being executed. */
	completeSubject: Subject<void>;
}

/** Parameters of a single `Queue`: the queuer parameters minus `idBuilder`, which is only used by `Queuer` to select a queue. */
interface IQueueParams<P extends any[], T> extends Omit<IQueuerParams<P, T>, "idBuilder"> { }

/**
 * Queue that runs `moParams.thingToQueue` one execution at a time.
 *
 * Requests pushed with `exec()` / `exec$()` go through a pending stage (filtering, buffering,
 * optional reduction to a single request) before being forwarded to the execution pipeline,
 * where they are executed sequentially with `concatMap`.
 */
class Queue<T, P extends any[] = []> {
	//#region FIELDS

	/** Receives every request pushed through `exec()` / `exec$()`. */
	private moQueueSubject?: Subject<IExecutionParams<T, P>>;
	/** Number of requests handed to the execution pipeline whose end event has not been sent yet. */
	private moActionRunningCounterSubject?: BehaviorSubject<number>;
	/** Receives the requests that have been selected for actual execution. */
	private moExecSubject?: Subject<IExecutionParams<T, P>>;

	//#endregion

	//#region METHODS

	constructor(private readonly moParams: IQueueParams<P, T>) { }

	/**
	 * Starts the queue.
	 * (Re)creates the internal subjects, wires the pending stage and returns the execution pipeline.
	 * @returns Stream of the values emitted by the executions. Executions only run while this stream is subscribed.
	 */
	public start(): Observable<T> {
		this.initSubjects();

		this.moQueueSubject.asObservable()
			.pipe(
				filter((poValue: IExecutionParams<T, P>) => this.acceptRequest(poValue)),
				// NOTE(review): `bufferUntil` is a project operator; it presumably buffers requests until the notifier
				// emits, i.e. until the running counter is back to 0 - confirm against its implementation.
				bufferUntil(() => this.moActionRunningCounterSubject.asObservable().pipe(filter((pnRunningCounter: number) => pnRunningCounter === 0))),
				tap((paValues: IExecutionParams<T, P>[]) => this.dispatchBuffer(paValues))
			)
			.subscribe();

		return this.execQueue();
	}

	/**
	 * Tells whether an incoming request may enter the pending buffer.
	 * When `excludePendings` is set, a request arriving while an execution is running is rejected
	 * and its caller is notified through the complete subject.
	 * @param poValue Incoming request.
	 * @returns `true` if the request must be kept.
	 */
	private acceptRequest(poValue: IExecutionParams<T, P>): boolean {
		const lbAccepted: boolean = !this.moParams.excludePendings || this.moActionRunningCounterSubject.value === 0;

		if (!lbAccepted)
			poValue.subjects?.completeSubject.next(undefined);

		return lbAccepted;
	}

	/**
	 * Forwards a buffer of pending requests to the execution pipeline.
	 * - `keepOnlyLastPending`: only the last request is executed, the others are completed without running.
	 * - `paramsReducer`: the requests are merged into a single one, which keeps the subjects of the last request.
	 * - otherwise: every request is executed in order.
	 * @param paValues Buffered requests.
	 */
	private dispatchBuffer(paValues: IExecutionParams<T, P>[]): void {
		// Defensive guard: `pop()` would yield `undefined` and `reduce()` without an initial value would throw on an empty buffer.
		if (paValues.length === 0)
			return;

		if (this.moParams.keepOnlyLastPending) {
			const loExecutedValue: IExecutionParams<T, P> = paValues.pop();
			paValues.forEach((poValue: IExecutionParams<T, P>) => poValue.subjects?.completeSubject.next(undefined));

			this.sendExecRequest(loExecutedValue);
		}
		else if (this.moParams.paramsReducer) {
			this.sendExecRequest(
				paValues.reduce((poAccumulator: IExecutionParams<T, P>, poActual: IExecutionParams<T, P>) => {
					if (poAccumulator.subjects)
						poAccumulator.subjects.completeSubject.next(undefined); // The previous request is closed because it will not be executed on its own.

					return { params: this.moParams.paramsReducer(poAccumulator.params, poActual.params), subjects: poActual.subjects };
				})
			);
		}
		else
			paValues.forEach((poParams: IExecutionParams<T, P>) => this.sendExecRequest(poParams));
	}

	/** Creates fresh subjects; the running counter starts at 0. */
	private initSubjects(): void {
		this.moQueueSubject = new Subject<IExecutionParams<T, P>>();
		this.moActionRunningCounterSubject = new BehaviorSubject<number>(0);
		this.moExecSubject = new Subject<IExecutionParams<T, P>>();
	}

	/** Pushes a request into the execution pipeline. */
	private sendExecRequest(poParams: IExecutionParams<T, P>): void {
		this.moExecSubject.next(poParams);
	}

	/**
	 * Builds the execution pipeline: each request increments the running counter, then is executed
	 * sequentially (`concatMap`). Outcomes are mirrored to the request subjects when present.
	 * @returns Stream of the values emitted by the executions; execution errors are not propagated on it.
	 */
	private execQueue(): Observable<T> {
		return this.moExecSubject.asObservable()
			.pipe(
				tap(() => this.moActionRunningCounterSubject.next(this.moActionRunningCounterSubject.value + 1)),
				concatMap((poParams: IExecutionParams<T, P>) =>
					defer(() => this.moParams.thingToQueue(...poParams.params)).pipe(
						// Observer form of `tap`: the positional (next, error, complete) signature is deprecated.
						tap({
							next: (poResponse: T) => poParams.subjects?.responseSubject.next(poResponse),
							error: (poError: any) => poParams.subjects?.errorSubject.next(poError),
							complete: () => poParams.subjects?.completeSubject.next(undefined)
						}),
						catchError(() => EMPTY), // The error has already been forwarded above; it must not stop the queue.
						finalize(() => this.waitBeforeEndEvent())
					)
				)
			);
	}

	/**
	 * Sends the end event of an execution by decrementing the running counter.
	 * If `minimumGapMs` is a valid number, the decrement is delayed by that many milliseconds
	 * so that a minimum gap is kept between two executions.
	 */
	private waitBeforeEndEvent(): void {
		defer(() => NumberHelper.isValid(this.moParams.minimumGapMs) ? timer(this.moParams.minimumGapMs) : of(null))
			.pipe(finalize(() => this.moActionRunningCounterSubject.next(this.moActionRunningCounterSubject.value - 1)))
			.subscribe();
	}

	/**
	 * Adds an execution to the queue without tracking its outcome.
	 * Does nothing if the queue has not been started.
	 * @param poParams Arguments for `thingToQueue`.
	 */
	public exec(poParams: P): void {
		this.moQueueSubject?.next({ params: poParams });
	}

	/**
	 * Adds an execution to the queue and exposes its outcome.
	 * The request is pushed through `afterSubscribe` (project operator, presumably invoked once the
	 * returned observable has been subscribed - confirm against its implementation).
	 * If the queue has not been started the request is not queued, so nothing is ever emitted.
	 * @param poParams Arguments for `thingToQueue`.
	 * @returns Stream that emits the execution values, errors if the execution fails and completes
	 * when the execution completes or when the request is dropped.
	 */
	public exec$(poParams: P): Observable<T> {
		const loSubjects: IExecutionSubjects<T> = {
			responseSubject: new Subject<T>(),
			errorSubject: new Subject<any>(),
			completeSubject: new Subject<void>()
		};

		return merge(
			loSubjects.responseSubject.asObservable(),
			loSubjects.errorSubject.asObservable().pipe(mergeMap((poError: any) => throwError(() => poError)))
		)
			.pipe(
				afterSubscribe(() => this.moQueueSubject?.next({ params: poParams, subjects: loSubjects })),
				takeUntil(loSubjects.completeSubject.asObservable()),
				// Release the per-request subjects once the caller's stream is over.
				finalize(() => {
					loSubjects.responseSubject.complete();
					loSubjects.errorSubject.complete();
					loSubjects.completeSubject.complete();
				})
			);
	}

	/** Ends the queue by completing its internal subjects. */
	public end(): void {
		this.moQueueSubject?.complete();
		this.moExecSubject?.complete();
		this.moActionRunningCounterSubject?.complete();
	}

	//#endregion
}

/**
 * Dispatches executions to internal queues selected by `moParams.idBuilder` and merges
 * the values emitted by every queue into a single response stream.
 */
export class Queuer<T, P extends any[] = []> {

	//#region FIELDS

	/** Shared response stream; it is only defined while the queuer is started. */
	private moResponseSubject?: Subject<T>;
	/** Queues already created, indexed by the id returned by `idBuilder` (`undefined` when there is no builder). */
	private readonly moQueuesById = new Map<any, Queue<T, P>>();

	//#endregion

	//#region METHODS

	constructor(private readonly moParams: IQueuerParams<P, T>) { }

	/**
	 * Starts the queuer.
	 * @returns Stream of the values emitted by the executions of every queue.
	 */
	public start(): Observable<T> {
		if (!this.moResponseSubject)
			this.moResponseSubject = new Subject<T>();

		return this.moResponseSubject.asObservable();
	}

	/**
	 * Adds an execution to the queue without tracking its outcome.
	 * Does nothing if the queuer has not been started.
	 * @param poParams Arguments for `thingToQueue`.
	 */
	public exec(...poParams: P): void {
		this.getQueue(poParams)?.exec(poParams);
	}

	/**
	 * Adds an execution to the queue and exposes its outcome.
	 * @param poParams Arguments for `thingToQueue`.
	 * @returns Outcome stream of the execution, or `EMPTY` when no queue is available (queuer not started).
	 */
	public exec$(...poParams: P): Observable<T> {
		return this.getQueue(poParams)?.exec$(poParams) ?? EMPTY;
	}

	/**
	 * Resolves the queue that must receive an execution, creating it when needed.
	 * @param poParams Arguments of the execution, passed to `idBuilder` to compute the queue id.
	 * @returns The target queue, or `undefined` if the queuer has not been started.
	 */
	private getQueue(poParams: P): Queue<T, P> | undefined {
		if (!this.moResponseSubject) // No response subject means the queuer has not been started.
			return undefined;

		const loId: any = this.moParams.idBuilder ? this.moParams.idBuilder(...poParams) : undefined;

		if (loId instanceof Array) {
			// An array id starts one queue per id.
			// NOTE(review): only the queue returned by `ArrayHelper.getFirstElement` (presumably the first one) receives the execution - confirm this is intended.
			const laQueues: Queue<T, P>[] = loId.map((poId: any) => this.startQueue(poId));

			return ArrayHelper.getFirstElement(laQueues);
		}

		return this.startQueue(loId);
	}

	/**
	 * Returns the queue registered for an id, creating and starting it on first use.
	 * Values emitted by the queue are forwarded to the response stream; when the queue stream
	 * finalizes, the response stream is completed and the queuer goes back to the "not started" state.
	 * @param poId Queue id.
	 */
	private startQueue(poId: any): Queue<T, P> {
		let loQueue: Queue<T, P> | undefined = this.moQueuesById.get(poId);
		if (!loQueue) {
			loQueue = new Queue(this.moParams);

			loQueue.start()
				.pipe(
					finalize(() => {
						if (this.moResponseSubject) {
							this.moResponseSubject.complete();
							this.moResponseSubject = undefined;
						}
					})
				)
				.subscribe((poResponse: T) => {
					if (this.moResponseSubject)
						this.moResponseSubject.next(poResponse);
				});

			this.moQueuesById.set(poId, loQueue);
		}

		return loQueue;
	}

	/**
	 * Ends the queuer: every queue is ended and unregistered.
	 * The response stream is completed by the finalization of the queues; when no queue was ever
	 * created it is completed here, so that subscribers of `start()` are not left waiting forever.
	 */
	public end(): void {
		const lbHadQueues: boolean = this.moQueuesById.size > 0;

		this.moQueuesById.forEach((poQueue: Queue<T, P>, poId: any) => {
			poQueue.end();
			this.moQueuesById.delete(poId);
		});

		// Without any queue there is no `finalize` to close the response stream, so close it explicitly.
		if (!lbHadQueues && this.moResponseSubject) {
			this.moResponseSubject.complete();
			this.moResponseSubject = undefined;
		}
	}

	/**
	 * Replaces the action executed by the queues with a new one.
	 * @param pfNewValue Action to execute instead of the previous one.
	 */
	public replaceActionQueue(pfNewValue: (...poParams: P) => ObservableInput<T>): void {
		this.moParams.thingToQueue = pfNewValue;
	}

	//#endregion

}