import { Injectable } from '@nestjs/common'; import { Insertable, Kysely, sql } from 'kysely'; import { InjectKysely } from 'nestjs-kysely'; import { DB, SessionSyncCheckpoints } from 'src/db'; import { SyncEntityType } from 'src/enum'; import { SyncAck } from 'src/types'; @Injectable() export class SyncRepository { constructor(@InjectKysely() private db: Kysely) {} getCheckpoints(sessionId: string) { return this.db .selectFrom('session_sync_checkpoints') .select(['type', 'ack']) .where('sessionId', '=', sessionId) .execute(); } upsertCheckpoints(items: Insertable[]) { return this.db .insertInto('session_sync_checkpoints') .values(items) .onConflict((oc) => oc.columns(['sessionId', 'type']).doUpdateSet((eb) => ({ ack: eb.ref('excluded.ack'), })), ) .execute(); } deleteCheckpoints(sessionId: string, types?: SyncEntityType[]) { return this.db .deleteFrom('session_sync_checkpoints') .where('sessionId', '=', sessionId) .$if(!!types, (qb) => qb.where('type', 'in', types!)) .execute(); } getUserUpserts(ack?: SyncAck) { return this.db .selectFrom('users') .select(['id', 'name', 'email', 'deletedAt', 'updateId']) .$if(!!ack, (qb) => qb.where('updateId', '>', ack!.updateId)) .where('updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) .orderBy(['updateId asc']) .stream(); } getUserDeletes(ack?: SyncAck) { return this.db .selectFrom('users_audit') .select(['id', 'userId']) .$if(!!ack, (qb) => qb.where('id', '>', ack!.updateId)) .where('deletedAt', '<', sql.raw("now() - interval '1 millisecond'")) .orderBy(['id asc']) .stream(); } }