import { Injectable } from "@angular/core";
import { PerformanceManager } from "@calaosoft/osapp-common/performance/PerformanceManager";
import { StoreHelper } from "@calaosoft/osapp-common/store/helpers/store-helper";
import { IStoreReplicationResponse } from "@calaosoft/osapp-common/store/models/IStoreReplicationResponse";
import { IStoreDocument } from "@calaosoft/osapp-common/store/models/istore-document";
import { EStoreReplicationResponseStatus } from "@calaosoft/osapp-common/store/models/replications/estore-replication-response-status";
import { OsappError } from "@calaosoft/osapp-common/utils/errors/OsappError";
import { ArrayHelper } from "@calaosoft/osapp-common/utils/helpers/arrayHelper";
import { MapHelper } from "@calaosoft/osapp-common/utils/helpers/mapHelper";
import PCancelable, { CancelError } from "p-cancelable";
import { Database } from "../../../../model/store/Database";
import { NetworkService } from "../../../../services/network.service";
import { Store } from "../../../../services/store.service";
import { CancelablePromiseTracker } from "../../../promises/models/cancelable-promise-tracker";
import { IChangeTrackerItem } from "../../change-tracking/models/ichange-tracker-item";
import { ILot } from "../../change-tracking/models/ilot";
import { ChangeTrackingService } from "../../change-tracking/services/change-tracking.service";
import { ReplicatorError } from "../models/errors/replicator-error";
import { IBulkGetResult } from "../models/ibulk-get-result";
import { IBulkGetResultError } from "../models/ibulk-get-result-error";
import { IBulkGetResultMeta } from "../models/ibulk-get-result-meta";
import { IOnProgressFunction } from "../models/ion-progress-function";
import { IReplicatorParamsBase } from "../models/ireplicator-params-base";
import { ReplicatorBase } from "../models/replicator-base";

/** PouchDB document combined with the metadata PouchDB attaches to a fetched document (`GetMeta`). */
type IDocRevsInfo = PouchDB.Core.Document<{}> & PouchDB.Core.GetMeta;
/** Store document that carries its revision identifier; used for the `ok` payload of a `bulkGet` result. */
type TBulkGetResultMetaDocument = IStoreDocument & PouchDB.Core.RevisionIdMeta;
/** Identifier/revision pair designating one precise revision of a document (sent to `bulkGet`). */
interface IDocIdRev {
	/** Document identifier. */
	id: string;
	/** Revision of the document to fetch. */
	rev: string;
}

/**
 * Replicator that pushes only the documents referenced by change trackers from a local
 * source database to a target database, lot by lot and batch by batch.
 */
@Injectable()
export class ChangeTrackingReplicatorService extends ReplicatorBase {
	//#region FIELDS

	/** Prefix prepended to every console message emitted by this service. */
	private static readonly C_LOG_ID = "CHANGETRACKREPLIC.S::";
	/** Number of tracker items sent per batch when the replication options do not provide `batch_size`. */
	private static readonly C_DEFAULT_BATCH_SIZE = 100;

	//#endregion

	//#region METHODS

	constructor(private readonly isvcChangeTracker: ChangeTrackingService, psvcNetwork: NetworkService) {
		super(psvcNetwork, "ChangeTracking");
	}

	/**
	 * Replicates every tracked document of `poParams.database` from the source instance to the target instance.
	 * @param poParams Replication parameters (database, source and target instances, options).
	 * @param pfOnProgress Optional callback invoked once before the first batch, then after each batch.
	 * @returns A cancelable promise resolved with the replication summary; any failure (including a remote
	 * source database) rejects it with a `ReplicatorError` carrying the documents already replicated.
	 */
	public override replicateAsync(
		poParams: IReplicatorParamsBase,
		pfOnProgress?: IOnProgressFunction
	): PCancelable<IStoreReplicationResponse> {
		const loPromise = new PCancelable<IStoreReplicationResponse>(async (pfResolve, pfReject, pfOnCancel) => {
			const loReplicatePerformanceManager = new PerformanceManager().markStart();
			const lsStartTime: string = new Date(loReplicatePerformanceManager.startTime).toISOString();
			const laReplicatedDocs: IStoreDocument[] = [];
			const loCancelablePromiseTracker = new CancelablePromiseTracker();
			let lnSeqNumber = 0;
			let lnDocs = 0;

			// Everything that can fail lives inside the `try`: the value returned by this async executor is ignored,
			// so an error escaping it would leave the returned promise pending forever.
			try {
				if (StoreHelper.isRemoteDatabase(poParams.sourceInstance.name))
					throw new OsappError("La base source doit être une base locale.");

				lnSeqNumber = (await poParams.sourceInstance.info()).update_seq as number;

				console.debug(`${ChangeTrackingReplicatorService.C_LOG_ID}Starting replication for '${poParams.database.id}'.`);

				await this.handleNetworkAsync(poParams.database);

				pfOnCancel(() => loCancelablePromiseTracker.cancelAll());

				// The lots are updated and fetched first, so that trackers created from now on go to a new lot.
				const lnCurrentSeq: number = (await poParams.sourceInstance.info()).update_seq as number;
				const laLots: ILot[] = await loCancelablePromiseTracker.track(
					this.isvcChangeTracker.getAndUpdateLastLotAsync(poParams.database.id, lnCurrentSeq)
				);
				const loLastLot: ILot | undefined = ArrayHelper.getLastElement(laLots);
				const laChangeTrackerItems: IChangeTrackerItem[] = await loCancelablePromiseTracker.track(
					this.isvcChangeTracker.getTrackedAsync(poParams.database.id, loLastLot)
				);
				// Trackers are de-duplicated by document, then grouped by lot identifier.
				const loChangeTrackerItemsGroupedByLot: Map<number, IChangeTrackerItem[]> =
					this.groupChangeTrackerItemsByLot(laChangeTrackerItems);
				const laLotEntries: [number, IChangeTrackerItem[]][] = Array.from(loChangeTrackerItemsGroupedByLot);
				const loLotsById: Map<number, ILot> = ArrayHelper.groupByUnique(laLots, (poLot: ILot) => poLot.id);
				// Batches hold 100 items unless the replication options say otherwise.
				const lnBatchSize: number =
					poParams.replicateOptions?.batch_size ?? ChangeTrackingReplicatorService.C_DEFAULT_BATCH_SIZE;
				// The total is computed on the de-duplicated items (and before the arrays are consumed below),
				// so that `loaded` can actually reach `total`.
				const lnTotal: number = laLotEntries.reduce(
					(pnSum: number, [, paItems]: [number, IChangeTrackerItem[]]) => pnSum + paItems.length,
					0
				);
				const lsLotsDescription: string = laLots.map((loLot: ILot) => `${loLot.id} (${loLot.since})`).join(", ");

				console.debug(`${ChangeTrackingReplicatorService.C_LOG_ID}Lots to bulk: ${lsLotsDescription}.`);

				if (pfOnProgress)
					// A first progress event signals that the replication has started.
					pfOnProgress({ total: lnTotal, loaded: lnDocs });

				// For each lot.
				for (const [lnLotId, laLotChangeTrackerItems] of laLotEntries) {
					if (loPromise.isCanceled) break;

					// As long as the lot still has items to send.
					while (ArrayHelper.hasElements(laLotChangeTrackerItems)) {
						const laBatchItems: IChangeTrackerItem[] = laLotChangeTrackerItems.splice(0, lnBatchSize);
						const lsBatchIds: string = laBatchItems.map((poItem: IChangeTrackerItem) => poItem.id).join(", ");

						console.debug(
							`${ChangeTrackingReplicatorService.C_LOG_ID}Bulking doc: ${lnLotId}, docs: ${lsBatchIds}.`
						);

						const laBatchDocs: IStoreDocument[] = await this.execReplicateAsync(
							loCancelablePromiseTracker,
							poParams,
							laBatchItems,
							loLotsById,
							lnLotId
						);
						laReplicatedDocs.push(...laBatchDocs);

						// Counted whether or not a progress callback exists, otherwise the final response would report 0 documents.
						lnDocs += laBatchItems.length;

						if (pfOnProgress) {
							pfOnProgress(
								{ total: lnTotal, loaded: lnDocs },
								this.createReplicationResponse(lnDocs, lnSeqNumber, lsStartTime, laReplicatedDocs, true)
							);
						}
					}
				}

				console.debug(
					`${ChangeTrackingReplicatorService.C_LOG_ID}Replication time for database '${
						poParams.database.id
					}' : ${loReplicatePerformanceManager.markEnd().measure()}ms.`,
					laReplicatedDocs
				);

				pfResolve(this.createReplicationResponse(lnDocs, lnSeqNumber, lsStartTime, laReplicatedDocs));
			} catch (poError) {
				// NOTE(review): with p-cancelable's default `shouldReject`, the promise is presumably already rejected
				// with a `CancelError` when this branch runs, making this resolve a no-op — confirm the intended behavior.
				if (poError instanceof CancelError)
					pfResolve(this.createReplicationResponse(lnDocs, lnSeqNumber, lsStartTime, laReplicatedDocs));
				else pfReject(new ReplicatorError(poError, laReplicatedDocs));
			}
		});

		return loPromise;
	}

	/**
	 * Processes one batch of tracker items and registers the resulting promise in the cancelable tracker.
	 * @param poCancelablePromiseTracker Tracker used to cancel the batch if the replication is canceled.
	 * @param poParams Replication parameters (database, source and target instances).
	 * @param paBatchItems Tracker items of the batch.
	 * @param poLotsById Lots indexed by identifier.
	 * @param pnLotId Identifier of the lot the batch belongs to.
	 * @returns The identifier/revision of each document revision sent to the target.
	 */
	private execReplicateAsync(
		poCancelablePromiseTracker: CancelablePromiseTracker,
		poParams: IReplicatorParamsBase,
		paBatchItems: IChangeTrackerItem[],
		poLotsById: Map<number, ILot>,
		pnLotId: number
	): Promise<IStoreDocument[]> {
		const loBatchPromise: PCancelable<IStoreDocument[]> = this.processBatchAsync(
			poParams.database,
			paBatchItems,
			poParams.sourceInstance,
			poLotsById,
			pnLotId,
			poParams.targetInstance
		);

		return poCancelablePromiseTracker.track(loBatchPromise);
	}

	/**
	 * Keeps a single tracker per document, then groups the remaining trackers by lot identifier.
	 * @param paChangeTrackerItems Tracker items to group; the array itself is left untouched.
	 * @returns Tracker items indexed by lot identifier.
	 */
	private groupChangeTrackerItemsByLot(paChangeTrackerItems: IChangeTrackerItem[]): Map<number, IChangeTrackerItem[]> {
		// Working on a reversed copy puts the most recent tracker of each document first, which is the one
		// presumably kept by `ArrayHelper.unique` (first occurrence) — the original array is not modified.
		const laMostRecentFirst: IChangeTrackerItem[] = [...paChangeTrackerItems].reverse();
		const laUniqueChangeTrackerItems: IChangeTrackerItem[] = ArrayHelper.unique(
			laMostRecentFirst,
			(poChangeTrackerItem: IChangeTrackerItem) => poChangeTrackerItem.id
		).reverse(); // Reversed again to restore the natural order, i.e. ordered by lot number.

		return ArrayHelper.groupBy(
			laUniqueChangeTrackerItems,
			(poChangeTrackerItem: IChangeTrackerItem) => poChangeTrackerItem.lotId
		);
	}

	/**
	 * Builds the replication summary handed to callers and progress listeners.
	 * @param pnDocs Number of documents processed so far (reported as both read and written).
	 * @param pnSeqNumber Sequence number reported as `last_seq`.
	 * @param psStartTime ISO date of the beginning of the replication.
	 * @param paResponses Documents replicated so far.
	 * @param pbRunning `true` for an intermediate (running) response, `false` (default) for a complete one.
	 * @returns The response; `end_time` is the instant this method is called.
	 */
	private createReplicationResponse(
		pnDocs: number,
		pnSeqNumber: number,
		psStartTime: string,
		paResponses: IStoreDocument[],
		pbRunning = false
	): IStoreReplicationResponse {
		return {
			doc_write_failures: 0,
			docs_read: pnDocs,
			docs_written: pnDocs,
			errors: [],
			last_seq: pnSeqNumber,
			ok: true,
			start_time: psStartTime,
			end_time: new Date().toISOString(),
			status: pbRunning ? EStoreReplicationResponseStatus.running : EStoreReplicationResponseStatus.complete,
			docs: paResponses,
			replicationMode: this.name
		};
	}

	/**
	 * Sends one batch to the target: reads every revision (winning and conflicting) of the tracked documents
	 * from the source, writes them to the target, then drops the matching trackers.
	 * @param poDatabase Database whose trackers are being processed.
	 * @param paBatchItems Tracker items of the batch.
	 * @param poSource Source PouchDB instance.
	 * @param poLotsById Lots indexed by identifier.
	 * @param pnLotId Identifier of the lot the batch belongs to.
	 * @param poTarget Target PouchDB instance.
	 * @returns A cancelable promise resolved with the identifier/revision of each revision written.
	 */
	private processBatchAsync(
		poDatabase: Database,
		paBatchItems: IChangeTrackerItem[],
		poSource: PouchDB.Database<{}>,
		poLotsById: Map<number, ILot>,
		pnLotId: number,
		poTarget: PouchDB.Database<{}>
	): PCancelable<IStoreDocument[]> {
		const loPromise = new PCancelable<IStoreDocument[]>(async (pfResolve, pfReject, pfOnCancel) => {
			try {
				const laLotChangeTrackerItemsGroupedById: Map<string, IChangeTrackerItem> = ArrayHelper.groupByUnique(
					paBatchItems,
					(poChangeTrackerItem: IChangeTrackerItem) => poChangeTrackerItem.id
				);
				const laDocsRevsInfo: IDocRevsInfo[] = await this.getDocumentsAsync(
					poSource,
					laLotChangeTrackerItemsGroupedById,
					poLotsById.get(pnLotId)
				);
				const loPerformanceManager = new PerformanceManager().markStart();

				// Nothing is settled here on cancellation: canceling the promise is what settles it.
				if (loPromise.isCanceled) return;

				if (ArrayHelper.hasElements(laDocsRevsInfo)) {
					// With 'new_edits: false', 'bulkDocs' does not return the saved documents.
					await poTarget.bulkDocs(laDocsRevsInfo, { new_edits: false });
					console.debug(
						`${ChangeTrackingReplicatorService.C_LOG_ID}Bulk ${laDocsRevsInfo.length} docs time: ${loPerformanceManager
							.markEnd()
							.measure()} ms.`
					);
				}

				if (loPromise.isCanceled) return;

				// One entry per revision sent, so a conflicted document appears several times in both lists.
				const laIdsToDrop: string[] = [];
				const laDocsToReturn: IStoreDocument[] = [];

				for (const loDocRevInfo of laDocsRevsInfo) {
					laIdsToDrop.push(loDocRevInfo._id);
					laDocsToReturn.push({ _id: loDocRevInfo._id, _rev: loDocRevInfo._rev });
				}

				const loDropPromise: PCancelable<void> = this.isvcChangeTracker.dropTrackedAsync(
					poDatabase.id,
					pnLotId,
					laIdsToDrop
				);

				// Canceling the batch also cancels the pending tracker removal.
				pfOnCancel(() => loDropPromise.cancel());

				await loDropPromise;
				console.debug(
					`${ChangeTrackingReplicatorService.C_LOG_ID}Lot "${pnLotId}" dropped, ids: ${laIdsToDrop.join(", ")}.`
				);

				pfResolve(laDocsToReturn);
			} catch (poError) {
				pfReject(poError);
			}
		});

		return loPromise;
	}

	/**
	 * Retrieves the revisions, conflicts and bodies of the documents to synchronize.
	 * @param poSource Source PouchDB instance.
	 * @param poChangeTrackerItemsById Tracker items indexed by document identifier.
	 * @param poLot Lot being processed; its `since` bounds the changes query when provided.
	 * @returns Every revision (winning and conflicting) that could be read for the tracked documents.
	 */
	private async getDocumentsAsync(
		poSource: PouchDB.Database<{}>,
		poChangeTrackerItemsById: Map<string, IChangeTrackerItem>,
		poLot?: ILot
	): Promise<IDocRevsInfo[]> {
		const loPerformanceManager = new PerformanceManager().markStart();
		const laKeys: string[] = MapHelper.keysToArray(poChangeTrackerItemsById);
		const [laConflictedRevsByIds, laDocsIdRev]: [Map<string, string[]>, IDocIdRev[]] =
			await this.getDocumentsIdRevAsync(laKeys, poSource, poLot);
		const laRevsInfo: IDocRevsInfo[] = await this.getDocumentsBodyAsync(laDocsIdRev, poSource, laConflictedRevsByIds);

		console.debug(
			`${ChangeTrackingReplicatorService.C_LOG_ID}Get docs revs info time: ${loPerformanceManager
				.markEnd()
				.measure()} ms.`,
			laRevsInfo
		);

		return laRevsInfo;
	}

	/**
	 * Queries the changes feed of the source to list every leaf revision of the requested documents.
	 * @param paKeys Identifiers of the documents to look up; nothing is queried when empty.
	 * @param poSource Source PouchDB instance.
	 * @param poLot Lot being processed; its `since` is used as the lower bound of the changes query.
	 * @returns A tuple: conflicting revisions indexed by document identifier, and the identifier/revision
	 * pairs of every revision to fetch (winning and conflicting).
	 */
	private async getDocumentsIdRevAsync(
		paKeys: string[],
		poSource: PouchDB.Database<{}>,
		poLot?: ILot
	): Promise<[Map<string, string[]>, IDocIdRev[]]> {
		const laConflictedRevsByIds = new Map<string, string[]>();
		const laDocs: IDocIdRev[] = [];

		if (paKeys.length > 0) {
			// Document info is fetched without bodies, to learn which documents exist and which have conflicts,
			// and to build the id/rev list used to request every revision to send in a single query.
			const loChangesResponse: PouchDB.Core.ChangesResponse<{}> = await poSource.changes({
				doc_ids: paKeys,
				style: "all_docs",
				since: poLot?.since,
				batch_size: Store.C_DEFAULT_BATCH_SIZE
			});

			for (const loRow of loChangesResponse.results) {
				// The first revision appears to always be the winning one; the response is read without being mutated.
				const [loWinningChange, ...laConflictChanges] = loRow.changes;

				// A row without any revision has nothing to fetch.
				if (!loWinningChange) continue;

				laDocs.push({ id: loRow.id, rev: loWinningChange.rev });

				if (laConflictChanges.length > 0) {
					// Any additional revision is a conflict.
					const laConflictedRevs: string[] = laConflictChanges.map((poChange: { rev: string }) => poChange.rev);

					for (const lsConflictedRev of laConflictedRevs) laDocs.push({ id: loRow.id, rev: lsConflictedRev });

					laConflictedRevsByIds.set(loRow.id, laConflictedRevs);
				}
			}
		}

		return [laConflictedRevsByIds, laDocs];
	}

	/**
	 * Fetches the body and revision history of the requested document revisions.
	 * @param paDocs Identifier/revision pairs to fetch; nothing is queried when empty.
	 * @param poSource Source PouchDB instance.
	 * @param paConflictedRevsByIds Conflicting revisions indexed by document identifier.
	 * @returns The revisions successfully read; results without an `ok` payload are skipped.
	 */
	private async getDocumentsBodyAsync(
		paDocs: IDocIdRev[],
		poSource: PouchDB.Database<{}>,
		paConflictedRevsByIds: Map<string, string[]>
	): Promise<IDocRevsInfo[]> {
		const laRevsInfo: IDocRevsInfo[] = [];

		if (paDocs.length > 0) {
			// The content of every revision (winning and conflicting) of every requested document is fetched at once.
			const loBulkGetResponse: PouchDB.Core.BulkGetResponse<{}> = await poSource.bulkGet({ docs: paDocs, revs: true });

			loBulkGetResponse.results.forEach((poResult: IBulkGetResult) => {
				const laConflictRevs: string[] | undefined = paConflictedRevsByIds.get(poResult.id);

				poResult.docs.forEach((poDoc: IBulkGetResultError | IBulkGetResultMeta) => {
					// The "ok" field holds the 'IStoreDocument'; it is absent on error entries.
					const loOkDoc: TBulkGetResultMetaDocument = (poDoc as IBulkGetResultMeta).ok as TBulkGetResultMetaDocument;

					if (loOkDoc) {
						if (!laConflictRevs?.includes(loOkDoc._rev))
							// Winning revision: the list of conflicting revisions (if any) is attached to it.
							loOkDoc._conflicts = laConflictRevs;

						laRevsInfo.push(loOkDoc);
					}
				});
			});
		}

		return laRevsInfo;
	}

	//#endregion
}
