Press n or j to go to the next uncovered block, b, p or k for the previous block.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 | import { autoInjectable, inject, singleton } from "tsyringe";
import { tokens } from "../../registry/tokens.js";
import type { Logger } from "pino";
import { AppDataSourceService } from "./data-source.js";
import { JournalEvent } from "../domain/journal-events.js";
import { JournalEntity } from "../entity/journal.entity.js";
import { JournalEntryMapper } from "./journal-entry-mapper.js";
import { JournalEntryEntity } from "../entity/journal.entry.entity.js";
import { AmountMapper } from "../../commons/mappers/amount.mapper.js";
import { MetricLogDto } from "../dtos/journal-event.dto.js";
import { ProcessedEventEntity } from "../entity/processed.event.entity.js";
import { RedisJournalStore } from "../store/redis-journal.store.js";
import { Result } from "../../tr8-script/domain/utils/types.js";
/**
 * Converts a consumed {@link JournalEvent} into a {@link JournalEntity} and
 * stores it through the {@link RedisJournalStore}.
 *
 * Two duplicate guards run before any entity is built:
 *  1. the Redis entry for `(event.id, event.type)` already carries the same
 *     offset, or
 *  2. a {@link ProcessedEventEntity} row exists for `"<partition>#<offset>"`.
 *
 * In both cases the event is skipped and `{ ok: true, value: null }` is
 * returned. This class only reads the processed-event table; it does not
 * insert into it.
 */
@singleton()
@autoInjectable()
class JournalEventHandler {
  constructor(
    @inject(tokens.Logger) private readonly logger: Logger,
    private appDataSourceService: AppDataSourceService,
    private readonly journalEntryMapper: JournalEntryMapper,
    private readonly redisJournalStore: RedisJournalStore,
  ) {}

  /**
   * Handles a single journal event.
   *
   * @param event - The journal event to convert and store.
   * @param partition - Partition the event was read from; combined with
   *   `offset` to form the processed-event lookup key.
   * @param offset - Offset of the event within the partition.
   * @returns `ok` with the built `journalEntity` when the event was stored,
   *   `ok` with `null` when the event was recognised as a duplicate and
   *   skipped, or a failed result when the Redis read or write fails.
   */
  async handle(
    event: JournalEvent,
    partition: number,
    offset: string,
  ): Promise<Result<{ journalEntity: JournalEntity } | null, Error>> {
    const processedEventRepository =
      this.appDataSourceService.AppDataSource.getRepository(
        ProcessedEventEntity,
      );

    // Guard 1: the Redis entry for this script already holds this offset.
    const redisResult = await this.redisJournalStore.get(event.id, event.type);
    if (!redisResult.ok) return redisResult;
    const cached = redisResult.value;
    if (cached && cached.offset === offset) {
      this.logger.info(
        `[JournalEventHandler] Skip scriptId=${event.id}, offset already exists in Redis`,
      );
      return { ok: true, value: null };
    }

    // The JSON round-trip yields a plain, untyped copy of the metadata from
    // which `transaction_time` is read.
    const metadataString = JSON.stringify(event.metadata);
    const metadata = JSON.parse(metadataString);
    const transactionTime = new Date(metadata.transaction_time);

    // Guard 2: a processed-event row already exists for this partition/offset.
    const eventIds = `${partition}#${offset}`;
    const existingProcessedEvent = await processedEventRepository.findOne({
      where: { event_id: eventIds },
    });
    if (existingProcessedEvent) {
      this.logger.info(`Event ${eventIds} already processed, skipping.`);
      // Actually skip: without this return the event was rebuilt and the
      // Redis entry overwritten despite the log line saying otherwise.
      return { ok: true, value: null };
    }

    const journalEntity = new JournalEntity();
    journalEntity.scriptId = event.id;
    journalEntity.scriptType = event.type;
    journalEntity.metadata = event.metadata;
    journalEntity.transaction_time = transactionTime;
    journalEntity.received_at = event.receivedAt;
    journalEntity.executed_at = event.executedAt;
    journalEntity.partition = partition;
    journalEntity.offset = offset;
    journalEntity.entries = [];

    for (const journalEntry of this.journalEntryMapper.map(event.journals)) {
      const journalEntryEntity = new JournalEntryEntity();
      // NOTE(review): `journalEntity.id` is never assigned in this method, so
      // this copies whatever value a fresh JournalEntity carries - presumably
      // unset until the entity is persisted elsewhere; confirm.
      journalEntryEntity.journal_id = journalEntity.id;
      journalEntryEntity.book_type = journalEntry.book.type;
      journalEntryEntity.book_id = journalEntry.book.book;
      journalEntryEntity.account_id = journalEntry.account;
      journalEntryEntity.type = journalEntry.type;
      journalEntryEntity.amount = AmountMapper.toDTO(journalEntry.amount);
      journalEntryEntity.received_at = event.receivedAt;
      journalEntity.entries.push(journalEntryEntity);
    }

    // Convert metrics from MetricTxLog to MetricLogDto, keeping only the first
    // metric seen for each (book, book type, account) combination.
    // NOTE(review): `metrics` is built but not consumed anywhere in this
    // method - confirm whether it should be attached to the stored journal.
    const metrics: MetricLogDto[] = [];
    for (const metric of event.metrics) {
      const isDuplicateMetric = metrics.some(
        (existingMetric) =>
          existingMetric.book.book === metric.book.book &&
          existingMetric.book.type === metric.book.type &&
          existingMetric.account === metric.account,
      );
      if (!isDuplicateMetric) {
        metrics.push({
          ...metric,
          change: {
            ...metric.change,
            amount: metric.change.amount.toString(),
            scale: metric.change.scale.toString(),
          },
        });
      }
    }

    const saveResult = await this.redisJournalStore.set(
      event.id,
      event.type,
      journalEntity,
    );
    if (!saveResult.ok) return { ok: false, error: saveResult.error };

    return { ok: true, value: { journalEntity } };
  }
}
export { JournalEventHandler };
|