All files / journal-processor/domain journal-event-handler.ts

0% Statements 0/51
0% Branches 0/15
0% Functions 0/3
0% Lines 0/48

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116                                                                                                                                                                                                                                       
import { autoInjectable, inject, singleton } from "tsyringe";
 
import { tokens } from "../../registry/tokens.js";
 
import type { Logger } from "pino";
import { AppDataSourceService } from "./data-source.js";
import { JournalEvent } from "../domain/journal-events.js";
import { JournalEntity } from "../entity/journal.entity.js";
import { JournalEntryMapper } from "./journal-entry-mapper.js";
import { JournalEntryEntity } from "../entity/journal.entry.entity.js";
import { AmountMapper } from "../../commons/mappers/amount.mapper.js";
import { MetricLogDto } from "../dtos/journal-event.dto.js";
import { ProcessedEventEntity } from "../entity/processed.event.entity.js";
import { RedisJournalStore } from "../store/redis-journal.store.js";
import { Result } from "../../tr8-script/domain/utils/types.js";
 
@singleton()
@autoInjectable()
class JournalEventHandler {
  constructor(
    @inject(tokens.Logger) private readonly logger: Logger,
    private appDataSourceService: AppDataSourceService,
    private readonly journalEntryMapper: JournalEntryMapper,
    private readonly redisJournalStore: RedisJournalStore,
  ) {}
 
  async handle(
    event: JournalEvent,
    partition: number,
    offset: string,
  ): Promise<Result<{ journalEntity: JournalEntity } | null, Error>> {
    const processedEventRepository =
      this.appDataSourceService.AppDataSource.getRepository(
        ProcessedEventEntity,
      );
 
    const redisResult = await this.redisJournalStore.get(event.id, event.type);
    if (!redisResult.ok) return redisResult;
    const cached = redisResult.value;
 
    if (cached && cached.offset === offset) {
      this.logger.info(
        `[JournalEventHandler] Skip scriptId=${event.id}, offset already exists in Redis`,
      );
      return { ok: true, value: null };
    }
 
    const metadataString = JSON.stringify(event.metadata);
    const metadata = JSON.parse(metadataString);
    const transactionTime = new Date(metadata.transaction_time);
    const eventIds = `${partition}#${offset}`;
    const existingProcessedEvent = await processedEventRepository.findOne({
      where: { event_id: eventIds },
    });
    if (existingProcessedEvent) {
      this.logger.info(`Event ${eventIds} already processed, skipping.`);
    }
 
    const journalEntity = new JournalEntity();
    journalEntity.scriptId = event.id;
    journalEntity.scriptType = event.type;
    journalEntity.metadata = event.metadata;
    journalEntity.transaction_time = transactionTime;
    journalEntity.received_at = event.receivedAt;
    journalEntity.executed_at = event.executedAt;
    journalEntity.partition = partition;
    journalEntity.offset = offset;
    journalEntity.entries = [];
 
    for (const journalEntry of this.journalEntryMapper.map(event.journals)) {
      const journalEntryEntity = new JournalEntryEntity();
      journalEntryEntity.journal_id = journalEntity.id;
      journalEntryEntity.book_type = journalEntry.book.type;
      journalEntryEntity.book_id = journalEntry.book.book;
      journalEntryEntity.account_id = journalEntry.account;
      journalEntryEntity.type = journalEntry.type;
      journalEntryEntity.amount = AmountMapper.toDTO(journalEntry.amount);
      journalEntryEntity.received_at = event.receivedAt;
      journalEntity.entries.push(journalEntryEntity);
    }
 
    // Convert metrics from MetricTxLog to MetricLogDto
    const metrics: MetricLogDto[] = [];
    for (const metric of event.metrics) {
      const isDuplicateMetric = metrics.some(
        (existingMetric) =>
          existingMetric.book.book === metric.book.book &&
          existingMetric.book.type === metric.book.type &&
          existingMetric.account === metric.account,
      );
 
      if (!isDuplicateMetric) {
        metrics.push({
          ...metric,
          change: {
            ...metric.change,
            amount: metric.change.amount.toString(),
            scale: metric.change.scale.toString(),
          },
        });
      }
    }
 
    const saveResult = await this.redisJournalStore.set(
      event.id,
      event.type,
      journalEntity,
    );
    if (!saveResult.ok) return { ok: false, error: saveResult.error };
 
    return { ok: true, value: { journalEntity } };
  }
}
 
export { JournalEventHandler };