// > client.ts
import { replicateRxCollection } from 'rxdb/plugins/replication';
import { RxDatabase, RxReplicationWriteToMasterRow, RxReplicationPullStreamItem } from 'rxdb';
import { Subject, Observable } from 'rxjs';

// Live change feed for the `messages` replication. The subject is exposed
// as the `stream$` of the messages replication further down in this file.
const messagesPullStream$ = new Subject();
const eventSourceMessages = new EventSource(
    getAPIUrl('/messages/pullStream'),
    { withCredentials: true }
);

// Every server-sent event carries a JSON payload; only its `documents` and
// `checkpoint` fields are forwarded to the stream.
eventSourceMessages.onmessage = (event) => {
    const { documents, checkpoint } = JSON.parse(event.data);
    messagesPullStream$.next({ documents, checkpoint });
};

// On a connection error, emit the 'RESYNC' marker instead of a batch
// (in RxDB's pull-stream contract this asks the replication to catch up
// through the pull handler).
eventSourceMessages.onerror = () => {
    messagesPullStream$.next('RESYNC');
};


// Live change feed for the `environments` replication. The subject is
// exposed as the `stream$` of the environments replication further down in
// this file.
const environmentsPullStream$ = new Subject();
const eventSourceEnvironments = new EventSource(getAPIUrl('/environments/pullStream'), { withCredentials: true });

// Every server-sent event carries a JSON payload; only its `documents` and
// `checkpoint` fields are forwarded to the stream.
eventSourceEnvironments.onmessage = event => {
    const eventData = JSON.parse(event.data);
    environmentsPullStream$.next({
        documents: eventData.documents,
        checkpoint: eventData.checkpoint
    });
};

// Events sent while the connection was down are lost. Emitting 'RESYNC'
// (the same marker the messages stream emits on error) tells the
// replication to catch up via the pull handler instead of silently
// staying behind.
eventSourceEnvironments.onerror = () => environmentsPullStream$.next('RESYNC');


/**
 * Shape the pull handlers in this file cast RxDB's (untyped) checkpoint to.
 * Both fields are echoed back to the server as the `id` and `updatedAt`
 * query parameters of the next pull request.
 */
type checkpoint = {
    // Sent as the `id` query parameter; '' is used when there is no checkpoint yet.
    // NOTE(review): presumably the id of the last pulled document — confirm against the server.
    id: string;
    // Sent as the `updatedAt` query parameter; 0 is used when there is no checkpoint yet.
    // NOTE(review): unit (ms vs s) is not visible from this file — confirm against the server.
    updatedAt: number;
}

/**
 * Starts replicating the `environments` collection with the backend.
 *
 * - Push: only change rows whose new document state has `public === true`
 *   are POSTed to `/environments/push`; the parsed JSON response is returned
 *   to RxDB as the conflicts array.
 * - Pull: requests `/environments/pull` with the last checkpoint
 *   (`updatedAt`, `id`) and the batch size, and returns the server's
 *   `documents` and `checkpoint`.
 * - Live updates arrive through `environmentsPullStream$`.
 *
 * @param database - RxDB database that contains an `environments` collection.
 * @returns Whatever `replicateRxCollection` returns for this configuration.
 * @throws Inside the handlers, an `Error` when the server answers with a
 *         non-2xx status, so the failure is reported to RxDB's replication
 *         instead of an error payload being treated as conflicts/documents.
 */
export async function syncEnvironmentList(database: RxDatabase){
    return replicateRxCollection({
        collection: database.collections.environments,
        replicationIdentifier: 'environment-list',
        waitForLeadership: false,
        push: {
            async handler(changeRowsUntyped){
                const changeRows = changeRowsUntyped as RxReplicationWriteToMasterRow<Environment>[];
                // Non-public environments are never sent to the server.
                const relevantChangeRows = changeRows.filter(row => row.newDocumentState.public === true);

                const rawResponse = await fetch(getAPIUrl('/environments/push'), {
                    method: 'POST',
                    headers: {
                        'Accept': 'application/json',
                        'Content-Type': 'application/json'
                    },
                    body: JSON.stringify(relevantChangeRows)
                });
                // An error body must not be handed to RxDB as the conflicts array.
                if (!rawResponse.ok) {
                    throw new Error(`environments push failed: ${rawResponse.status} ${rawResponse.statusText}`);
                }
                const conflictsArray = await rawResponse.json();
                console.log('pushing environments', relevantChangeRows.length, relevantChangeRows);
                return conflictsArray;
            }
        },
        pull: {
            async handler(checkpointOrNullUntyped, batchSize) {
                const checkpointOrNull = checkpointOrNullUntyped as checkpoint | null;
                // No checkpoint yet means "start from the beginning".
                const updatedAt = checkpointOrNull?.updatedAt ?? 0;
                const id = checkpointOrNull?.id ?? '';
                // The id is free-form text, so it is encoded to keep the query string intact.
                const response = await fetch(getAPIUrl(`/environments/pull?updatedAt=${updatedAt}&id=${encodeURIComponent(id)}&limit=${batchSize}`));
                // Fail loudly rather than reading `documents` off an error payload.
                if (!response.ok) {
                    throw new Error(`environments pull failed: ${response.status} ${response.statusText}`);
                }
                const data = await response.json();
                console.log('pulling environments', data.documents.length, data.documents);
                return {
                    documents: data.documents,
                    checkpoint: data.checkpoint
                };
            },
            stream$: environmentsPullStream$.asObservable() as Observable<RxReplicationPullStreamItem<unknown, unknown>>
        }
    });

}



/**
 * Starts replicating the `messages` collection for one environment.
 *
 * - Push: only change rows that belong to `envId` and whose `agentId` or
 *   `hostId` equals `localAgent` are POSTed to `/messages/push`; the parsed
 *   JSON response is returned to RxDB as the conflicts array.
 * - Pull: requests `/messages/pull` with the last checkpoint
 *   (`updatedAt`, `id`), the batch size and `envId`, and returns the
 *   server's `documents` and `checkpoint`.
 * - Live updates arrive through `messagesPullStream$`.
 *
 * @param database - RxDB database that contains a `messages` collection.
 * @param envId - Environment whose messages are pushed and pulled.
 * @param localAgent - Id compared against each row's `agentId` / `hostId`
 *                     to decide whether the row is pushed.
 * @returns Whatever `replicateRxCollection` returns for this configuration.
 * @throws Inside the handlers, an `Error` when the server answers with a
 *         non-2xx status, so the failure is reported to RxDB's replication
 *         instead of an error payload being treated as conflicts/documents.
 */
export async function syncEnvironment(database: RxDatabase, envId: string, localAgent: string){
    return replicateRxCollection({
        collection: database.collections.messages,
        // NOTE(review): the identifier does not include `envId`, so calls with
        // different environments would share one replication state — confirm
        // this is intended.
        replicationIdentifier: 'messages',
        waitForLeadership: false,
        push: {
            async handler(changeRowsUntyped){
                const changeRows = changeRowsUntyped as RxReplicationWriteToMasterRow<Message>[];
                // Keep only rows of this environment that involve the local agent.
                const relevantChangeRows = changeRows.filter(row =>
                    row.newDocumentState.environmentId === envId &&
                    (row.newDocumentState.agentId === localAgent || row.newDocumentState.hostId === localAgent)
                );
                const rawResponse = await fetch(getAPIUrl('/messages/push'), {
                    method: 'POST',
                    headers: {
                        'Accept': 'application/json',
                        'Content-Type': 'application/json'
                    },
                    body: JSON.stringify(relevantChangeRows)
                });
                // An error body must not be handed to RxDB as the conflicts array.
                if (!rawResponse.ok) {
                    throw new Error(`messages push failed: ${rawResponse.status} ${rawResponse.statusText}`);
                }
                const conflictsArray = await rawResponse.json();
                return conflictsArray;
            }
        },
        pull: {
            async handler(checkpointOrNullUntyped, batchSize) {
                const checkpointOrNull = checkpointOrNullUntyped as checkpoint | null;
                // No checkpoint yet means "start from the beginning".
                const updatedAt = checkpointOrNull?.updatedAt ?? 0;
                const id = checkpointOrNull?.id ?? '';
                // Both ids are free-form text, so they are encoded to keep the query string intact.
                const query = `updatedAt=${updatedAt}&id=${encodeURIComponent(id)}&limit=${batchSize}&envId=${encodeURIComponent(envId)}`;
                const response = await fetch(getAPIUrl(`/messages/pull?${query}`));
                // Fail loudly rather than returning an error payload as documents.
                if (!response.ok) {
                    throw new Error(`messages pull failed: ${response.status} ${response.statusText}`);
                }
                const data = await response.json();
                return {
                    documents: data.documents,
                    checkpoint: data.checkpoint
                };
            },
            stream$: messagesPullStream$.asObservable() as Observable<RxReplicationPullStreamItem<unknown, unknown>>
        }
    });
}

/**
 * Builds an absolute backend URL for the given path.
 *
 * @param path - Path (and optional query string) appended verbatim to the API origin.
 * @returns The production origin plus `path` when `NODE_ENV` is exactly
 *          'production'; otherwise the local development origin plus `path`.
 */
function getAPIUrl(path: string): string {
    let origin = 'http://localhost:3005';
    if (process.env.NODE_ENV === 'production') {
        origin = 'https://api.scarlet.chat';
    }
    return `${origin}${path}`;
}

export {};