feat(server): job metrics (#8255)

* metric repo

* add metric repo

* remove unused import

* formatting

* fix

* try disabling job metrics for e2e

* import otel in test module
This commit is contained in:
Mert
2024-03-24 23:02:04 -04:00
committed by GitHub
parent 1855aaea99
commit c58a70ac8f
7 changed files with 83 additions and 6 deletions

View File

@@ -1,4 +1,5 @@
import { BadRequestException, Inject, Injectable } from '@nestjs/common';
import { snakeCase } from 'lodash';
import { FeatureFlag, SystemConfigCore } from 'src/cores/system-config.core';
import { mapAsset } from 'src/dtos/asset-response.dto';
import { AllJobStatusResponseDto, JobCommandDto, JobStatusDto } from 'src/dtos/job.dto';
@@ -16,8 +17,10 @@ import {
QueueCleanType,
QueueName,
} from 'src/interfaces/job.interface';
import { IMetricRepository } from 'src/interfaces/metric.interface';
import { IPersonRepository } from 'src/interfaces/person.interface';
import { ISystemConfigRepository } from 'src/interfaces/system-config.interface';
import { jobMetrics } from 'src/utils/instrumentation';
import { ImmichLogger } from 'src/utils/logger';
@Injectable()
@@ -31,6 +34,7 @@ export class JobService {
@Inject(IJobRepository) private jobRepository: IJobRepository,
@Inject(ISystemConfigRepository) configRepository: ISystemConfigRepository,
@Inject(IPersonRepository) private personRepository: IPersonRepository,
@Inject(IMetricRepository) private metricRepository: IMetricRepository,
) {
this.configCore = SystemConfigCore.create(configRepository);
}
@@ -92,6 +96,8 @@ export class JobService {
throw new BadRequestException(`Job is already running`);
}
this.metricRepository.addToCounter(`immich.queues.${snakeCase(name)}.started`, 1), { enabled: jobMetrics };
switch (name) {
case QueueName.VIDEO_CONVERSION: {
return this.jobRepository.queue({ name: JobName.QUEUE_VIDEO_CONVERSION, data: { force } });
@@ -156,14 +162,21 @@ export class JobService {
this.jobRepository.addHandler(queueName, concurrency, async (item: JobItem): Promise<void> => {
const { name, data } = item;
const queueMetric = `immich.queues.${snakeCase(queueName)}.active`;
this.metricRepository.updateGauge(queueMetric, 1, { enabled: jobMetrics });
try {
const handler = jobHandlers[name];
const status = await handler(data);
const jobMetric = `immich.jobs.${name.replaceAll('-', '_')}.${status}`;
this.metricRepository.addToCounter(jobMetric, 1, { enabled: jobMetrics });
if (status === JobStatus.SUCCESS || status == JobStatus.SKIPPED) {
await this.onDone(item);
}
} catch (error: Error | any) {
this.logger.error(`Unable to run job handler (${queueName}/${name}): ${error}`, error?.stack, data);
} finally {
this.metricRepository.updateGauge(queueMetric, -1, { enabled: jobMetrics });
}
});
}