diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index f782a26d3d..c188aa5b70 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -10,6 +10,10 @@ services: JWT_SECRET: BOOKKEEPING-DEV GRPC_INTERNAL_ORIGIN: '[::]:4001' GRPC_AUTHENTICATED_ORIGIN: '[::]:4002' + GAQ_ENABLE_RECALCULATION: "True" + GAQ_RECALCULATION_PERIOD: 10000 + GAQ_RECALCULATION_MIN_BATCH_SIZE: 5 + GAQ_RECALCULATION_MAX_BATCH_SIZE: 50 ports: - "4000:4000" - "4001:4001" diff --git a/docker-compose.test.yml b/docker-compose.test.yml index b43a5c8ef9..a505b1022e 100644 --- a/docker-compose.test.yml +++ b/docker-compose.test.yml @@ -10,6 +10,10 @@ services: JWT_SECRET: BOOKKEEPING-TEST-SUITE PAGE_ITEMS_LIMIT: 100 CCDB_SYNCHRONIZATION_PERIOD: 3153600000000 # 100y in milliseconds, to be sure all the runs are included when testing sync + GAQ_ENABLE_RECALCULATION: "True" + GAQ_RECALCULATION_PERIOD: 1000 + GAQ_RECALCULATION_MIN_BATCH_SIZE: 1 + GAQ_RECALCULATION_MAX_BATCH_SIZE: 1 restart: "no" database: diff --git a/lib/application.js b/lib/application.js index 70a4ac6839..ff39c2ed25 100644 --- a/lib/application.js +++ b/lib/application.js @@ -15,7 +15,7 @@ const database = require('./database'); const { webUiServer } = require('./server'); const { GRPCConfig, ServicesConfig } = require('./config'); -const { userCertificate, monalisa: monalisaConfig, ccdb: ccdbConfig, enableHousekeeping } = ServicesConfig; +const { userCertificate, monalisa: monalisaConfig, ccdb: ccdbConfig, gaq: gaqConfig, enableHousekeeping } = ServicesConfig; const { handleLostRunsAndEnvironments } = require('./server/services/housekeeping/handleLostRunsAndEnvironments.js'); const { isInTestMode } = require('./utilities/env-utils.js'); const { ScheduledProcessesManager } = require('./server/services/ScheduledProcessesManager.js'); @@ -27,6 +27,7 @@ const { AliEcsSynchronizer } = require('./server/kafka/AliEcsSynchronizer.js'); const { environmentService } = require('./server/services/environment/EnvironmentService.js'); const { runService } = require('./server/services/run/RunService.js'); const { CcdbSynchronizer } = require('./server/externalServicesSynchronization/ccdb/CcdbSynchronizer.js'); +const { gaqWorker } = require('./server/services/gaq/GaqWorker.js'); const { promises: fs } = require('fs'); const { MonAlisaClient } = require('./server/externalServicesSynchronization/monalisa/MonAlisaClient.js'); const https = require('https'); @@ -131,6 +132,16 @@ class BookkeepingApplication { }, ); } + + if (gaqConfig.enableRecalculation) { + this.scheduledProcessesManager.schedule( + () => gaqWorker.recalculateGaqSummaries(gaqConfig.minBatchSize, gaqConfig.maxBatchSize), + { + wait: 10 * 1000, + every: gaqConfig.recalculationPeriod, + }, + ); + } } catch (error) { this._logger.errorMessage(`Error while starting: ${error}`); return this.stop(); diff --git a/lib/config/services.js b/lib/config/services.js index 9e535881a9..599cc1ad13 100644 --- a/lib/config/services.js +++ b/lib/config/services.js @@ -27,6 +27,25 @@ const { CCDB_RUN_INFO_URL, } = process.env ?? {}; +/** + * Parse a positive integer env var, falling back to the default and warning if the value is invalid + * + * @param {string|undefined} raw the raw env var value + * @param {number} defaultValue value to use when raw is unset or invalid + * @param {string} name env var name (for the warning message) + * @return {number} the parsed value or the default + */ +const parsePositiveInt = (raw, defaultValue, name) => { + if (raw === undefined) return defaultValue; + const parsed = Number(raw); + if (!Number.isInteger(parsed) || parsed <= 0) { + // eslint-disable-next-line no-console + console.warn(`Invalid ${name}=${JSON.stringify(raw)}; falling back to ${defaultValue}`); + return defaultValue; + } + return parsed; +}; + exports.services = { enableHousekeeping: process.env?.ENABLE_HOUSEKEEPING?.toLowerCase() === 'true', userCertificate: { @@ -68,4 +87,11 @@ exports.services = { synchronizationPeriod: Number(CCDB_SYNCHRONIZATION_PERIOD) || 24 * 60 * 60 * 1000, // 1d in milliseconds runInfoUrl: CCDB_RUN_INFO_URL, }, + + gaq: { + enableRecalculation: process.env?.GAQ_ENABLE_RECALCULATION?.toLowerCase() === 'true', + recalculationPeriod: parsePositiveInt(process.env?.GAQ_RECALCULATION_PERIOD, 30 * 1000, 'GAQ_RECALCULATION_PERIOD'), // 30s default + minBatchSize: parsePositiveInt(process.env?.GAQ_RECALCULATION_MIN_BATCH_SIZE, 1, 'GAQ_RECALCULATION_MIN_BATCH_SIZE'), + maxBatchSize: parsePositiveInt(process.env?.GAQ_RECALCULATION_MAX_BATCH_SIZE, 100, 'GAQ_RECALCULATION_MAX_BATCH_SIZE'), + }, }; diff --git a/lib/database/adapters/GaqSummaryAdapter.js b/lib/database/adapters/GaqSummaryAdapter.js new file mode 100644 index 0000000000..53b6baf61a --- /dev/null +++ b/lib/database/adapters/GaqSummaryAdapter.js @@ -0,0 +1,62 @@ +/** + * @license + * Copyright CERN and copyright holders of ALICE O2. This software is + * distributed under the terms of the GNU General Public License v3 (GPL + * Version 3), copied verbatim in the file "COPYING". + * + * See http://alice-o2.web.cern.ch/license for full licensing information. + * + * In applying this license CERN does not waive the privileges and immunities + * granted to it by virtue of its status as an Intergovernmental Organization + * or submit itself to any jurisdiction. + */ + +/** + * GaqSummaryAdapter + */ +class GaqSummaryAdapter { + /** + * Constructor + */ + constructor() { + this.toEntity = this.toEntity.bind(this); + } + + /** + * Converts the given database object to an entity object. + * + * @param {SequelizeGaqSummary} databaseObject Object to convert. + * @returns {GaqSummary} Converted entity object. + */ + toEntity(databaseObject) { + const { + dataPassId, + runNumber, + badRunCoverage, + explicitlyNotBadRunCoverage, + mcReproducibleCoverage, + missingVerificationsCount, + undefinedQualityPeriodsCount, + notComputable, + invalidatedAt, + createdAt, + updatedAt, + } = databaseObject; + + return { + dataPassId, + runNumber, + badRunCoverage, + explicitlyNotBadRunCoverage, + mcReproducibleCoverage, + missingVerificationsCount, + undefinedQualityPeriodsCount, + notComputable, + invalidatedAt, + createdAt, + updatedAt, + }; + } +} + +module.exports = { GaqSummaryAdapter }; diff --git a/lib/database/adapters/index.js b/lib/database/adapters/index.js index 5ff6404b3d..9c858e1bc2 100644 --- a/lib/database/adapters/index.js +++ b/lib/database/adapters/index.js @@ -27,6 +27,7 @@ const EorReasonAdapter = require('./EorReasonAdapter'); const FlpRoleAdapter = require('./FlpRoleAdapter'); const { HostAdapter } = require('./HostAdapter.js'); const { GaqDetectorAdapter } = require('./GaqDetectorAdapter.js'); +const { GaqSummaryAdapter } = require('./GaqSummaryAdapter.js'); const { LhcFillAdapter } = require('./LhcFillAdapter.js'); const { LhcFillStatisticsAdapter } = require('./LhcFillStatisticsAdapter.js'); const LhcPeriodAdapter = require('./LhcPeriodAdapter'); @@ -63,6 +64,7 @@ const environmentHistoryItemAdapter = new EnvironmentHistoryItemAdapter(); const eorReasonAdapter = new EorReasonAdapter(); const flpRoleAdapter = new FlpRoleAdapter(); const gaqDetectorAdapter = new GaqDetectorAdapter(); +const gaqSummaryAdapter = new GaqSummaryAdapter(); const hostAdapter = new HostAdapter(); const lhcFillAdapter = new LhcFillAdapter(); const lhcFillStatisticsAdapter = new LhcFillStatisticsAdapter(); @@ -159,6 +161,7 @@ module.exports = { eorReasonAdapter, flpRoleAdapter, gaqDetectorAdapter, + gaqSummaryAdapter, hostAdapter, lhcFillAdapter, lhcFillStatisticsAdapter, diff --git a/lib/database/migrations/v1/20260223120000-create-gaq-summary-tables.js b/lib/database/migrations/v1/20260223120000-create-gaq-summary-tables.js new file mode 100644 index 0000000000..639db07c5c --- /dev/null +++ b/lib/database/migrations/v1/20260223120000-create-gaq-summary-tables.js @@ -0,0 +1,71 @@ +'use strict'; + +/** @type {import('sequelize-cli').Migration} */ +module.exports = { + up: async (queryInterface, Sequelize) => queryInterface.sequelize.transaction(async (transaction) => { + await queryInterface.createTable('gaq_summaries', { + data_pass_id: { + type: Sequelize.INTEGER, + primaryKey: true, + allowNull: false, + references: { + model: 'data_passes', + key: 'id', + }, + }, + run_number: { + type: Sequelize.INTEGER, + primaryKey: true, + allowNull: false, + references: { + model: 'runs', + key: 'run_number', + }, + }, + bad_run_coverage: { + type: Sequelize.FLOAT, + }, + explicitly_not_bad_run_coverage: { + type: Sequelize.FLOAT, + }, + mc_reproducible_coverage: { + type: Sequelize.FLOAT, + }, + missing_verifications_count: { + type: Sequelize.INTEGER, + }, + undefined_quality_periods_count: { + type: Sequelize.INTEGER, + }, + not_computable: { + type: Sequelize.BOOLEAN, + allowNull: false, + defaultValue: false, + }, + invalidated_at: { + type: Sequelize.DATE(3), + allowNull: true, + defaultValue: null, + }, + created_at: { + type: Sequelize.DATE(3), + allowNull: false, + defaultValue: Sequelize.literal('CURRENT_TIMESTAMP(3)'), + }, + updated_at: { + type: Sequelize.DATE(3), + allowNull: false, + defaultValue: Sequelize.literal('CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3)'), + }, + }, { transaction }); + + await queryInterface.addIndex('gaq_summaries', { + name: 'gaq_summaries_invalidated_at_idx', + fields: ['invalidated_at'], + }, { transaction }); + }), + + down: async (queryInterface) => queryInterface.sequelize.transaction(async (transaction) => { + await queryInterface.dropTable('gaq_summaries', { transaction }); + }), +}; diff --git a/lib/database/models/gaqSummary.js b/lib/database/models/gaqSummary.js new file mode 100644 index 0000000000..ca2a1f69b8 --- /dev/null +++ b/lib/database/models/gaqSummary.js @@ -0,0 +1,59 @@ +/** + * @license + * Copyright CERN and copyright holders of ALICE O2. This software is + * distributed under the terms of the GNU General Public License v3 (GPL + * Version 3), copied verbatim in the file "COPYING". + * + * See http://alice-o2.web.cern.ch/license for full licensing information. + * + * In applying this license CERN does not waive the privileges and immunities + * granted to it by virtue of its status as an Intergovernmental Organization + * or submit itself to any jurisdiction. + */ + +module.exports = (sequelize) => { + const Sequelize = require('sequelize'); + + const GaqSummary = sequelize.define('GaqSummary', { + dataPassId: { + type: Sequelize.INTEGER, + primaryKey: true, + }, + runNumber: { + type: Sequelize.INTEGER, + primaryKey: true, + }, + badRunCoverage: { + type: Sequelize.FLOAT, + }, + explicitlyNotBadRunCoverage: { + type: Sequelize.FLOAT, + }, + mcReproducibleCoverage: { + type: Sequelize.FLOAT, + }, + missingVerificationsCount: { + type: Sequelize.INTEGER, + }, + undefinedQualityPeriodsCount: { + type: Sequelize.INTEGER, + }, + notComputable: { + type: Sequelize.BOOLEAN, + allowNull: false, + defaultValue: false, + }, + invalidatedAt: { + type: Sequelize.DATE(3), + }, + }, { tableName: 'gaq_summaries' }); + + GaqSummary.removeAttribute('id'); + + GaqSummary.associate = (models) => { + GaqSummary.belongsTo(models.Run, { foreignKey: 'runNumber', as: 'run' }); + GaqSummary.belongsTo(models.DataPass, { foreignKey: 'dataPassId', as: 'dataPass' }); + }; + + return GaqSummary; +}; diff --git a/lib/database/models/index.js b/lib/database/models/index.js index 87d793fac3..2549209c5b 100644 --- a/lib/database/models/index.js +++ b/lib/database/models/index.js @@ -27,6 +27,7 @@ const EorReason = require('./eorreason'); const EpnRoleSession = require('./epnrolesession'); const FlpRole = require('./flprole'); const GaqDetector = require('./gaqDetector.js'); +const GaqSummary = require('./gaqSummary.js'); const Host = require('./host.js'); const LhcFill = require('./lhcFill'); const LhcFillStatistics = require('./lhcFillStatistics.js'); @@ -66,6 +67,7 @@ module.exports = (sequelize) => { EpnRoleSessionkey: EpnRoleSession(sequelize), FlpRole: FlpRole(sequelize), GaqDetector: GaqDetector(sequelize), + GaqSummary: GaqSummary(sequelize), Host: Host(sequelize), LhcFill: LhcFill(sequelize), LhcFillStatistics: LhcFillStatistics(sequelize), diff --git a/lib/database/repositories/GaqSummaryRepository.js b/lib/database/repositories/GaqSummaryRepository.js new file mode 100644 index 0000000000..81463d31c3 --- /dev/null +++ b/lib/database/repositories/GaqSummaryRepository.js @@ -0,0 +1,55 @@ +/** + * @license + * Copyright CERN and copyright holders of ALICE O2. This software is + * distributed under the terms of the GNU General Public License v3 (GPL + * Version 3), copied verbatim in the file "COPYING". + * + * See http://alice-o2.web.cern.ch/license for full licensing information. + * + * In applying this license CERN does not waive the privileges and immunities + * granted to it by virtue of its status as an Intergovernmental Organization + * or submit itself to any jurisdiction. + */ + +const { + models: { + GaqSummary, + }, +} = require('..'); +const Repository = require('./Repository'); + +/** + * GaqSummary repository + */ +class GaqSummaryRepository extends Repository { + /** + * Creates a new `GaqSummaryRepository` instance. + */ + constructor() { + super(GaqSummary); + } + + /** + * Mark the summary for a given (dataPassId, runNumber) as invalidated, creating the row if it does not yet exist + * + * @param {number} dataPassId data pass id + * @param {number} runNumber run number + * @return {Promise} resolves once the summary is invalidated + */ + async invalidate(dataPassId, runNumber) { + await this.upsert({ dataPassId, runNumber, invalidatedAt: new Date() }); + } + + /** + * Mark a list of summaries as invalidated in parallel + * + * @param {{ dataPassId: number, runNumber: number }[]} pairs the (dataPassId, runNumber) pairs to invalidate + * @return {Promise} resolves once all summaries are invalidated + */ + async invalidateMany(pairs) { + const invalidatedAt = new Date(); + await Promise.all(pairs.map(({ dataPassId, runNumber }) => this.upsert({ dataPassId, runNumber, invalidatedAt }))); + } +} + +module.exports = new GaqSummaryRepository(); diff --git a/lib/database/repositories/index.js b/lib/database/repositories/index.js index 0c79279752..4be7600145 100644 --- a/lib/database/repositories/index.js +++ b/lib/database/repositories/index.js @@ -27,6 +27,7 @@ const EnvironmentRepository = require('./EnvironmentRepository'); const EorReasonRepository = require('./EorReasonRepository'); const FlpRoleRepository = require('./FlpRoleRepository'); const GaqDetectorRepository = require('./GaqDetectorRepository.js'); +const GaqSummaryRepository = require('./GaqSummaryRepository.js'); const HostRepository = require('./HostRepository.js'); const LhcFillRepository = require('./LhcFillRepository'); const LhcFillStatisticsRepository = require('./LhcFillStatisticsRepository.js'); @@ -70,6 +71,7 @@ module.exports = { EorReasonRepository, FlpRoleRepository, GaqDetectorRepository, + GaqSummaryRepository, HostRepository, LhcFillRepository, LhcFillStatisticsRepository, diff --git a/lib/server/controllers/qcFlag.controller.js b/lib/server/controllers/qcFlag.controller.js index 7a088a3eb3..b7e4cf15f3 100644 --- a/lib/server/controllers/qcFlag.controller.js +++ b/lib/server/controllers/qcFlag.controller.js @@ -21,7 +21,7 @@ const { PaginationDto } = require('../../domain/dtos'); const { ApiConfig } = require('../../config'); const { countedItemsToHttpView } = require('../utilities/countedItemsToHttpView'); const { qcFlagService } = require('../services/qualityControlFlag/QcFlagService.js'); -const { gaqService } = require('../services/qualityControlFlag/GaqService.js'); +const { gaqService } = require('../services/gaq/GaqService.js'); const { qcFlagSummaryService } = require('../services/qualityControlFlag/QcFlagSummaryService.js'); const qcFlagFilterDTO = Joi.object({ diff --git a/lib/server/services/gaq/GaqDetectorsService.js b/lib/server/services/gaq/GaqDetectorsService.js index 1ac2a0dc7e..4f6d466503 100644 --- a/lib/server/services/gaq/GaqDetectorsService.js +++ b/lib/server/services/gaq/GaqDetectorsService.js @@ -12,7 +12,7 @@ */ const { gaqDetectorAdapter, detectorAdapter } = require('../../../database/adapters'); -const { GaqDetectorRepository, RunRepository, DetectorRepository } = require('../../../database/repositories'); +const { GaqDetectorRepository, RunRepository, DetectorRepository, GaqSummaryRepository } = require('../../../database/repositories'); const { BadParameterError } = require('../../errors/BadParameterError'); const { dataSource } = require('../../../database/DataSource.js'); const { Op } = require('sequelize'); @@ -57,6 +57,9 @@ class GaqDetectorService { .flatMap((runNumber) => detectorIds .map((detectorId) => ({ dataPassId, runNumber, detectorId }))); const createdEntries = await GaqDetectorRepository.insertAll(gaqEntries); + + await GaqSummaryRepository.invalidateMany(runNumbers.map((runNumber) => ({ dataPassId, runNumber }))); + return createdEntries.map(gaqDetectorAdapter.toEntity); }); } @@ -101,6 +104,9 @@ class GaqDetectorService { .flatMap(({ runNumber, detectors }) => detectors .map(({ id: detectorId }) => ({ dataPassId, runNumber, detectorId }))); const createdEntries = await GaqDetectorRepository.insertAll(gaqEntries); + + await GaqSummaryRepository.invalidateMany(runNumbers.map((runNumber) => ({ dataPassId, runNumber }))); + return createdEntries.map(gaqDetectorAdapter.toEntity); }, { transaction }); } diff --git a/lib/server/services/qualityControlFlag/GaqService.js b/lib/server/services/gaq/GaqService.js similarity index 54% rename from lib/server/services/qualityControlFlag/GaqService.js rename to lib/server/services/gaq/GaqService.js index 422e48bcac..a2b145ce3b 100644 --- a/lib/server/services/qualityControlFlag/GaqService.js +++ b/lib/server/services/gaq/GaqService.js @@ -28,15 +28,23 @@ */ const { getOneDataPassOrFail } = require('../dataPasses/getOneDataPassOrFail.js'); -const { QcFlagRepository } = require('../../../database/repositories/index.js'); +const { QcFlagRepository, GaqSummaryRepository } = require('../../../database/repositories/index.js'); const { qcFlagAdapter } = require('../../../database/adapters/index.js'); const { Op } = require('sequelize'); const { QcSummarProperties } = require('../../../domain/enums/QcSummaryProperties.js'); +const { LogManager } = require('@aliceo2/web-ui'); /** * Globally aggregated quality (QC flags aggregated for a predefined list of detectors per runs) service */ class GaqService { + /** + * Constructor + */ + constructor() { + this._logger = LogManager.getLogger('GAQ_SERVICE'); + } + /** * Get GAQ summary * @@ -83,6 +91,37 @@ class GaqService { return Object.fromEntries(gaqSummary); } + /** + * Find GAQ summary for given data pass and run. Returns null if no summary can be computed (e.g. no QC flags in GAQ periods) + * @param {number} dataPassId id of data pass + * @param {number} runNumber run number + * @return {Promise} promise of GAQ summary or null if it can't be computed + */ + async _computeSummary(dataPassId, runNumber) { + const gaqCoverages = await QcFlagRepository.getGaqCoverages(dataPassId, runNumber); + const entry = gaqCoverages[runNumber]; + if (!entry) { + return null; + } + + const { + badCoverage, + mcReproducibleCoverage, + goodCoverage, + flagsIds, + verifiedFlagsIds, + undefinedQualityPeriodsCount, + } = entry; + + return { + badRunCoverage: badCoverage, + explicitlyNotBadRunCoverage: goodCoverage, + mcReproducibleCoverage, + missingVerificationsCount: flagsIds.length - verifiedFlagsIds.length, + undefinedQualityPeriodsCount, + }; + } + /** * Find QC flags in GAQ effective periods for given data pass and run * @@ -113,6 +152,67 @@ class GaqService { contributingFlags: contributingFlagIds.map((id) => idToFlag[id]), })); } + + /** + * Calculate and store GAQ summary for given data pass and run + * @param {number} dataPassId id of data pass + * @param {number} runNumber run number + * @param {object} [options] additional options + * @param {Date} [options.expectedInvalidatedAt] if provided, invalidatedAt will only be cleared if it is equal to the provided value + * @return {Promise} promise + */ + async calculateAndStoreGaqSummary(dataPassId, runNumber, { expectedInvalidatedAt } = {}) { + const summary = await this._computeSummary(dataPassId, runNumber); + + const fields = { + dataPassId, + runNumber, + badRunCoverage: summary?.badRunCoverage ?? null, + explicitlyNotBadRunCoverage: summary?.explicitlyNotBadRunCoverage ?? null, + mcReproducibleCoverage: summary?.mcReproducibleCoverage ?? null, + missingVerificationsCount: summary?.missingVerificationsCount ?? null, + undefinedQualityPeriodsCount: summary?.undefinedQualityPeriodsCount ?? null, + notComputable: summary === null, + }; + + if (expectedInvalidatedAt === undefined) { + // No expected invalidation time provided, just upsert the summary + await GaqSummaryRepository.upsert({ dataPassId, runNumber, ...fields, invalidatedAt: null }); + return; + }; + + // Only clear invalidatedAt if it hasn't been changed during compute + const [rows] = await GaqSummaryRepository.updateAll( + { ...fields, invalidatedAt: null }, + { where: { dataPassId, runNumber, invalidatedAt: expectedInvalidatedAt } }, + ); + + if (rows === 0) { + // Write fresh summary fields but leave invalidatedAt unchanged + await GaqSummaryRepository.updateAll( + { ...fields }, + { where: { dataPassId, runNumber } }, + ); + } + } + + /** + * Remove invalid GAQ summaries and recalculate them + * @param {number} batchSize maximum number of invalid summaries to process + * @return {Promise} promise + */ + async popNInvalidSummaryAndRecalculate(batchSize = 1) { + const { rows, count } = await GaqSummaryRepository.findAndCountAll({ + where: { invalidatedAt: { [Op.not]: null } }, + order: [['invalidatedAt', 'ASC']], + limit: batchSize, + }); + + await Promise.all(rows.map(({ dataPassId, runNumber, invalidatedAt }) => + this.calculateAndStoreGaqSummary(dataPassId, runNumber, { expectedInvalidatedAt: invalidatedAt }))); + + return { processedCount: rows.length, totalInvalidCount: count }; + } } exports.GaqService = GaqService; diff --git a/lib/server/services/gaq/GaqWorker.js b/lib/server/services/gaq/GaqWorker.js new file mode 100644 index 0000000000..5e4cbccf7c --- /dev/null +++ b/lib/server/services/gaq/GaqWorker.js @@ -0,0 +1,127 @@ +const { gaqService } = require('../../services/gaq/GaqService.js'); +const { LogManager } = require('@aliceo2/web-ui'); + +// Tolerate brief blips before warning so transient overshoots don't spam logs +const OVERFLOW_THRESHOLD_TICKS = 5; + +// While the overflow persists, re-warn at this cadence so operators see "still bad" without log floods +const OVERFLOW_REMINDER_EVERY_TICKS = 30; + +/** + * Worker responsible for processing pending GAQ summary invalidations + */ +class GaqWorker { + /** + * Constructor + */ + constructor() { + this._logger = LogManager.getLogger(GaqWorker.name); + this._isPaused = false; + this._currentRun = null; + + // Adaptive batch size for the next tick. Null on first tick → falls back to the passed-in min. + this._nextBatchSize = null; + + this._overflowConsecutiveTicks = 0; + } + + /** + * Pause the worker so it skips future scheduled calls, and await any in-flight call to finish + * so callers can safely mutate shared state (e.g. drop tables in tests) once this resolves + * @return {Promise} resolves once the worker is idle and paused + */ + async pause() { + if (!this._isPaused) { + this._logger.infoMessage('Worker paused'); + } + this._isPaused = true; + if (this._currentRun) { + try { + await this._currentRun; + } catch { + // Already logged inside _doRecalculate + } + } + } + + /** + * Resume the worker after a pause + * @return {void} + */ + resume() { + if (this._isPaused) { + this._logger.infoMessage('Worker resumed'); + } + this._isPaused = false; + } + + /** + * Process pending GAQ summary invalidations. Skips if a previous call is still in progress or if paused. + * The batch size for this tick is clamped between min and max and adapts to the observed backlog. + * @param {number} minBatchSize lower bound on rows to fetch per tick + * @param {number} maxBatchSize upper bound on rows to fetch per tick + * @return {Promise} promise + */ + async recalculateGaqSummaries(minBatchSize, maxBatchSize) { + if (this._isPaused || this._currentRun) { + return; + } + this._currentRun = this._doRecalculate(minBatchSize, maxBatchSize); + try { + await this._currentRun; + } finally { + this._currentRun = null; + } + } + + /** + * Run a single recalculation pass; errors are logged but not rethrown + * @param {number} minBatchSize lower bound on rows to fetch per tick + * @param {number} maxBatchSize upper bound on rows to fetch per tick + * @return {Promise} promise + */ + async _doRecalculate(minBatchSize, maxBatchSize) { + const clamp = (n) => Math.min(maxBatchSize, Math.max(minBatchSize, n)); + const batchSize = clamp(this._nextBatchSize ?? minBatchSize); + + try { + const { processedCount, totalInvalidCount } = await gaqService.popNInvalidSummaryAndRecalculate(batchSize); + + // Adapt next tick's batch size to the observed backlog (clamped to the bounds) + this._nextBatchSize = clamp(totalInvalidCount); + + if (processedCount > 0) { + this._logger.infoMessage(`Processed ${processedCount} out of ${totalInvalidCount} ` + + `invalidated GAQ summaries (batch size: ${batchSize})`); + } + + // Overflow: backlog still exceeds the max even at the largest batch we'll fetch + if (totalInvalidCount > maxBatchSize) { + this._overflowConsecutiveTicks += 1; + + const ticks = this._overflowConsecutiveTicks; + const firstWarning = ticks === OVERFLOW_THRESHOLD_TICKS; + const reminderDue = ticks > OVERFLOW_THRESHOLD_TICKS + && (ticks - OVERFLOW_THRESHOLD_TICKS) % OVERFLOW_REMINDER_EVERY_TICKS === 0; + + if (firstWarning || reminderDue) { + this._logger.warnMessage(`Invalidated GAQ summary backlog (${totalInvalidCount}) has exceeded ` + + `the max batch size (${maxBatchSize}) for ${ticks} consecutive ticks. ` + + 'Consider raising GAQ_RECALCULATION_MAX_BATCH_SIZE or shortening GAQ_RECALCULATION_PERIOD.'); + } + } else { + if (this._overflowConsecutiveTicks >= OVERFLOW_THRESHOLD_TICKS) { + this._logger.infoMessage(`GAQ summary backlog recovered after ${this._overflowConsecutiveTicks} ` + + `consecutive overflow ticks (current backlog: ${totalInvalidCount})`); + } + this._overflowConsecutiveTicks = 0; + } + } catch (error) { + this._logger.errorMessage(`Error recalculating GAQ summaries: ${error.message}\n${error.stack}`); + } + } +} + +const gaqWorker = new GaqWorker(); + +exports.gaqWorker = gaqWorker; diff --git a/lib/server/services/qualityControlFlag/QcFlagService.js b/lib/server/services/qualityControlFlag/QcFlagService.js index 3d34e1446b..e389375453 100644 --- a/lib/server/services/qualityControlFlag/QcFlagService.js +++ b/lib/server/services/qualityControlFlag/QcFlagService.js @@ -20,6 +20,7 @@ const { RunRepository, QcFlagVerificationRepository, QcFlagEffectivePeriodRepository, + GaqSummaryRepository, }, } = require('../../../database/index.js'); const { dataSource } = require('../../../database/DataSource.js'); @@ -209,6 +210,14 @@ class QcFlagService { ], }); + /** + * Invalidate GAQ summary for the dataPass and runNumber of the created flag. + * Skip when `verify` is true: verifyFlag() above already invalidated the same (dataPassId, runNumber). + */ + if (dataPass && !verify) { + await GaqSummaryRepository.invalidate(dataPass.id, runNumber); + } + createdFlags.push(qcFlagAdapter.toEntity(createdFlag)); } catch (error) { this._logger.warnMessage(`Failed to create QC flag with properties: ${JSON.stringify(qcFlag)}. Error: ${error}`); @@ -284,6 +293,12 @@ class QcFlagService { { const { id, from, to, origin, createdById, runNumber, dplDetectorId, flagTypeId, createdAt } = qcFlag; + + if (dataPassId) { + // Invalidate GAQ summary for the dataPass and runNumber of the deleted flag + await GaqSummaryRepository.invalidate(dataPassId, runNumber); + } + const qcFlagPropertiesToLog = { id, from, @@ -338,8 +353,9 @@ class QcFlagService { await QcFlagEffectivePeriodRepository.removeAll({ where: { id: { [Op.in]: effectivePeriodIds } } }); // Sequelize update requires a where and can't work only using association - const qcFlagIds = (await QcFlagRepository.findAll({ - attributes: ['id'], + const qcFlagIdsToRunNumbers = (await QcFlagRepository.findAll({ + attributes: ['id', 'runNumber'], + where: { deleted: false }, include: { association: 'dataPasses', attributes: [], @@ -349,13 +365,19 @@ class QcFlagService { }, }, raw: true, - })).map(({ id }) => id); + })).map(({ id, runNumber }) => ({ id, runNumber })); + + const qcFlagIds = qcFlagIdsToRunNumbers.map(({ id }) => id); + const runNumbers = new Set(qcFlagIdsToRunNumbers.map(({ runNumber }) => runNumber)); await QcFlagRepository.updateAll( { deleted: true }, { where: { id: qcFlagIds } }, ); + // Invalidate GAQ summary for the dataPass and all runNumbers of the deleted flags + await GaqSummaryRepository.invalidateMany(Array.from(runNumbers, (runNumber) => ({ dataPassId, runNumber }))); + return qcFlagIds.length; }); } @@ -389,7 +411,18 @@ class QcFlagService { createdById: user.id, }); - return await this.getOneOrFail(flagId); + const updatedQcFlag = await this.getOneOrFail(flagId); + + // Get data pass id for the flag (if exists) to invalidate only related GAQ summaries + const dataPassQcFlag = await DataPassQcFlagRepository.findOne({ where: { qualityControlFlagId: flagId } }); + const dataPassId = dataPassQcFlag?.dataPassId; + + // Invalidate GAQ summary if it's the first verification + if (dataPassId && (!qcFlag.verifications || qcFlag.verifications.length === 0)) { + await GaqSummaryRepository.invalidate(dataPassId, updatedQcFlag.runNumber); + } + + return updatedQcFlag; }, { transaction }); } diff --git a/lib/server/services/run/updateRun.js b/lib/server/services/run/updateRun.js index 2e8b50048c..f4b9f0e69a 100644 --- a/lib/server/services/run/updateRun.js +++ b/lib/server/services/run/updateRun.js @@ -12,6 +12,8 @@ */ const RunRepository = require('../../../database/repositories/RunRepository.js'); +const GaqSummaryRepository = require('../../../database/repositories/GaqSummaryRepository.js'); +const DataPassRunRepository = require('../../../database/repositories/DataPassRunRepository.js'); const { getRunOrFail } = require('./getRunOrFail.js'); const { utilities: { TransactionHelper } } = require('../../../database'); const { checkLhcFill } = require('../../../usecases/lhcFill/checkLhcFill.js'); @@ -35,6 +37,20 @@ const { logSpecificRunTag } = require('./logEntriesCreation/logSpecificRunTag.js */ const TAGS_TO_LOG = ['Not for physics']; +/** + * Run patch fields whose values feed the qc_time_start / qc_time_end virtual columns + * + * @type {string[]} + */ +const QC_TIME_SOURCE_FIELDS = [ + 'firstTfTimestamp', + 'lastTfTimestamp', + 'timeTrgStart', + 'timeTrgEnd', + 'timeO2Start', + 'timeO2End', +]; + /** * Update the given run * @@ -82,7 +98,12 @@ exports.updateRun = async (identifier, payload, transaction) => { } // Store the run quality to create a log if it changed - const previousRun = { runQuality: runModel?.runQuality, calibrationStatus: runModel?.calibrationStatus }; + const previousRun = { + runQuality: runModel?.runQuality, + calibrationStatus: runModel?.calibrationStatus, + qcTimeStart: runModel?.qcTimeStart, + qcTimeEnd: runModel?.qcTimeEnd, + }; runPatch.definition = runPatch.definition ?? getRunDefinition({ ...runModel.dataValues, @@ -204,6 +225,20 @@ exports.updateRun = async (identifier, payload, transaction) => { } } + // Only check for invalidation when the patch could have changed them + if (QC_TIME_SOURCE_FIELDS.some((field) => field in runPatch)) { + // Reload because qcTimeStart/qcTimeEnd are virtual columns not in the patch and stay stale after update + await runModel.reload(); + if (previousRun.qcTimeStart?.getTime() !== runModel.qcTimeStart?.getTime() + || previousRun.qcTimeEnd?.getTime() !== runModel.qcTimeEnd?.getTime()) { + const dataPassRuns = await DataPassRunRepository.findAll({ + attributes: ['dataPassId'], + where: { runNumber: runModel.runNumber }, + }); + await GaqSummaryRepository.invalidateMany(dataPassRuns.map(({ dataPassId }) => ({ dataPassId, runNumber: runModel.runNumber }))); + } + } + return runModel; }, { transaction }); }; diff --git a/lib/usecases/run/GetAllRunsUseCase.js b/lib/usecases/run/GetAllRunsUseCase.js index df1b5f7f5b..3fd43b8f76 100644 --- a/lib/usecases/run/GetAllRunsUseCase.js +++ b/lib/usecases/run/GetAllRunsUseCase.js @@ -20,7 +20,7 @@ const sequelize = require('sequelize'); const { EorReasonRepository } = require('../../database/repositories'); const { PhysicalConstant } = require('../../domain/enums/PhysicalConstant'); const { BadParameterError } = require('../../server/errors/BadParameterError'); -const { gaqService } = require('../../server/services/qualityControlFlag/GaqService.js'); +const { gaqService } = require('../../server/services/gaq/GaqService.js'); const { qcFlagSummaryService } = require('../../server/services/qualityControlFlag/QcFlagSummaryService.js'); const { DetectorType } = require('../../domain/enums/DetectorTypes.js'); const { unpackNumberRange } = require('../../utilities/rangeUtils.js'); diff --git a/test/api/qcFlags.test.js b/test/api/qcFlags.test.js index ee67961439..092df4d883 100644 --- a/test/api/qcFlags.test.js +++ b/test/api/qcFlags.test.js @@ -1110,7 +1110,7 @@ module.exports = () => { .delete(`/api/qcFlags/perDataPass?dataPassId=${dataPassId}&token=${BkpRoles.DPG_ASYNC_QC_ADMIN}`); expect(response.status).to.be.equal(200); - expect(response.body.data.deletedCount).to.equal(11); // 9 from seeders, 2 created in POST requests previously in this test + expect(response.body.data.deletedCount).to.equal(10); // 9 from seeders, 2 created in POST requests previously in this test, 1 already soft-deleted }); }); diff --git a/test/lib/server/services/gaq/GaqService.test.js b/test/lib/server/services/gaq/GaqService.test.js new file mode 100644 index 0000000000..b2a96c49e9 --- /dev/null +++ b/test/lib/server/services/gaq/GaqService.test.js @@ -0,0 +1,186 @@ +/** + * @license + * Copyright CERN and copyright holders of ALICE O2. This software is + * distributed under the terms of the GNU General Public License v3 (GPL + * Version 3), copied verbatim in the file "COPYING". + * + * See http://alice-o2.web.cern.ch/license for full licensing information. + * + * In applying this license CERN does not waive the privileges and immunities + * granted to it by virtue of its status as an Intergovernmental Organization + * or submit itself to any jurisdiction. + */ + +const { expect } = require('chai'); +const sinon = require('sinon'); +const { resetDatabaseContent } = require('../../../../utilities/resetDatabaseContent.js'); +const { repositories: { GaqSummaryRepository} } = require('../../../../../lib/database'); +const { gaqService } = require('../../../../../lib/server/services/gaq/GaqService.js'); +const { Op } = require('sequelize'); + +/** + * Find the GAQ summary row for a given data pass and run + * @param {number} dataPassId data pass id + * @param {number} runNumber run number + * @return {Promise} + */ +const findSummary = (dataPassId, runNumber) => GaqSummaryRepository.findOne({ where: { dataPassId, runNumber } }); + +/** + * Insert an invalidation entry + * @param {number} dataPassId data pass id + * @param {number} runNumber run number + * @param {string} createdAt ISO timestamp string + * @return {Promise} + */ +const insertInvalidation = (dataPassId, runNumber, createdAt) => + GaqSummaryRepository.upsert({ dataPassId, runNumber, invalidatedAt: new Date(createdAt) }); + +// Tests for GaqService are split between QcFlagService.test.js and GaqSummary.test.js +// GaqService.test.js (this file) focuses on the summary recalculation and invalidation processing logic + +module.exports = () => { + before(resetDatabaseContent); + + // Data pass 1 (LHC22b_apass1), run 107 has GAQ detectors CPV (1) and ACO (2) seeded + // and has seeded QC flags, so a summary can always be computed + const dataPassId = 1; + const runNumber = 107; + + describe('calculateAndStoreGaqSummary', () => { + afterEach(async () => { + await GaqSummaryRepository.removeAll({ where: { dataPassId, runNumber } }); + }); + + it('should compute and store a summary row with correct values', async () => { + await gaqService.calculateAndStoreGaqSummary(dataPassId, runNumber); + + const summary = await findSummary(dataPassId, runNumber); + expect(summary).to.not.be.null; + expect(summary.dataPassId).to.equal(dataPassId); + expect(summary.runNumber).to.equal(runNumber); + expect(summary.badRunCoverage).to.equal(0); + expect(summary.explicitlyNotBadRunCoverage).to.equal(0.759654); + expect(summary.mcReproducibleCoverage).to.equal(0.240346); + expect(summary.missingVerificationsCount).to.equal(3); + expect(summary.undefinedQualityPeriodsCount).to.equal(0); + }); + + it('should upsert when a summary already exists', async () => { + await gaqService.calculateAndStoreGaqSummary(dataPassId, runNumber); + await gaqService.calculateAndStoreGaqSummary(dataPassId, runNumber); + + const rows = await GaqSummaryRepository.findAll({ where: { dataPassId, runNumber } }); + expect(rows).to.have.lengthOf(1); + }); + + it('should store a summary with notComputable set to true when there is no coverage data for the run', async () => { + // Run 49 has no QC flags seeded for data pass 1 + await gaqService.calculateAndStoreGaqSummary(dataPassId, 49); + + const summary = await findSummary(dataPassId, 49); + expect(summary).to.not.be.null; + expect(summary.notComputable).to.be.true; + + await GaqSummaryRepository.removeAll({ where: { dataPassId, runNumber: 49 } }); + }); + + it('should clear stale coverage fields when a previously-computable row becomes notComputable', async () => { + // Seed a row that has values but will become notComputable after recalculation due to missing QC flags + const staleRunNumber = 49; + await GaqSummaryRepository.upsert({ + dataPassId, + runNumber: staleRunNumber, + badRunCoverage: 0.5, + explicitlyNotBadRunCoverage: 0.4, + mcReproducibleCoverage: 0.1, + missingVerificationsCount: 2, + undefinedQualityPeriodsCount: 1, + notComputable: false, + }); + + // Run 49 has no QC flags seeded for data pass 1, so _computeSummary returns null + await gaqService.calculateAndStoreGaqSummary(dataPassId, staleRunNumber); + + const summary = await findSummary(dataPassId, staleRunNumber); + expect(summary).to.not.be.null; + expect(summary.notComputable).to.be.true; + expect(summary.badRunCoverage).to.be.null; + expect(summary.explicitlyNotBadRunCoverage).to.be.null; + expect(summary.mcReproducibleCoverage).to.be.null; + expect(summary.missingVerificationsCount).to.be.null; + expect(summary.undefinedQualityPeriodsCount).to.be.null; + + await GaqSummaryRepository.removeAll({ where: { dataPassId, runNumber: staleRunNumber } }); + }); + + it('should not clear invalidatedAt if the row was re-invalidated during compute', async () => { + // Seed an initial invalidation so the row has a known invalidatedAt + await GaqSummaryRepository.invalidate(dataPassId, runNumber); + const initial = await findSummary(dataPassId, runNumber); + expect(initial.invalidatedAt).to.not.be.null; + + // Simulate a concurrent invalidate by mocking _computeSummary to invalidate the row again but still return a valid summary + sinon.stub(gaqService, '_computeSummary').callsFake(async () => { + await GaqSummaryRepository.invalidate(dataPassId, runNumber); + return { + badRunCoverage: 0, + explicitlyNotBadRunCoverage: 1, + mcReproducibleCoverage: 0, + missingVerificationsCount: 0, + undefinedQualityPeriodsCount: 0, + }; + }); + + try { + await gaqService.calculateAndStoreGaqSummary( + dataPassId, + runNumber, + { expectedInvalidatedAt: initial.invalidatedAt }, + ); + } finally { + sinon.restore(); + } + + const after = await findSummary(dataPassId, runNumber); + // InvalidatedAt should not have been cleared because the row was re-invalidated during compute + expect(after.invalidatedAt).to.not.be.null; + expect(after.invalidatedAt).to.be.greaterThan(initial.invalidatedAt); + }); + }); + + describe('popNInvalidSummaryAndRecalculate', () => { + beforeEach(async () => { + await GaqSummaryRepository.updateAll({ invalidatedAt: null }, { where: {} }); + }); + + it('should do nothing when the invalidation table is empty', async () => { + await gaqService.popNInvalidSummaryAndRecalculate(5); + // No error thrown and nothing written + const summary = await findSummary(dataPassId, runNumber); + expect(summary).to.be.null; + }); + + it('should process exactly N invalidations ordered by createdAt', async () => { + // Insert invalidations for runs 106 and 107 in data pass 1 with different timestamps + await insertInvalidation(dataPassId, 106, '2024-01-01 10:00:00'); + await insertInvalidation(dataPassId, 107, '2024-01-01 11:00:00'); + + // Process only 1 — should pick run 106 (oldest) + await gaqService.popNInvalidSummaryAndRecalculate(1); + + const remaining = await GaqSummaryRepository.findOne({ where: { dataPassId, runNumber: 107, invalidatedAt: { [Op.not]: null } } }); + expect(remaining).to.not.be.null; + }); + + it('should process all invalidations when batchSize covers them all', async () => { + await insertInvalidation(dataPassId, 106, '2024-01-01 10:00:00'); + await insertInvalidation(dataPassId, 107, '2024-01-01 11:00:00'); + + await gaqService.popNInvalidSummaryAndRecalculate(10); + + const count = await GaqSummaryRepository.count({ where: { dataPassId, invalidatedAt: { [Op.not]: null } } }); + expect(count).to.equal(0); + }); + }); +}; diff --git a/test/lib/server/services/gaq/GaqSummary.test.js b/test/lib/server/services/gaq/GaqSummary.test.js new file mode 100644 index 0000000000..fc15274711 --- /dev/null +++ b/test/lib/server/services/gaq/GaqSummary.test.js @@ -0,0 +1,260 @@ +/** + * @license + * Copyright CERN and copyright holders of ALICE O2. This software is + * distributed under the terms of the GNU General Public License v3 (GPL + * Version 3), copied verbatim in the file "COPYING". + * + * See http://alice-o2.web.cern.ch/license for full licensing information. + * + * In applying this license CERN does not waive the privileges and immunities + * granted to it by virtue of its status as an Intergovernmental Organization + * or submit itself to any jurisdiction. + */ + +const { expect } = require('chai'); +const sinon = require('sinon'); +const { resetDatabaseContent } = require('../../../../utilities/resetDatabaseContent.js'); +const { repositories: { GaqSummaryRepository } } = require('../../../../../lib/database'); +const { qcFlagService } = require('../../../../../lib/server/services/qualityControlFlag/QcFlagService.js'); +const { gaqDetectorService } = require('../../../../../lib/server/services/gaq/GaqDetectorsService.js'); +const { updateRun } = require('../../../../../lib/server/services/run/updateRun.js'); +const { gaqWorker } = require('../../../../../lib/server/services/gaq/GaqWorker.js'); +const { gaqService } = require('../../../../../lib/server/services/gaq/GaqService.js'); + +/** + * Wait for a given number of milliseconds + * @param {number} ms milliseconds to wait + * @return {Promise} + */ +const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms)); + +/** + * Check whether an invalidation entry exists for a given data pass and run + * + * @param {number} expectedDataPassId + * @param {number} expectedRunNumber + * @param {boolean} toBeNull + * + * @return {Promise} + */ +const expectInvalidation = async (expectedDataPassId, expectedRunNumber, toBeNull = false) => { + const summary = await GaqSummaryRepository.findOne({ + where: { dataPassId: expectedDataPassId, runNumber: expectedRunNumber }, + }); + if (toBeNull) { + expect(summary?.invalidatedAt, `Expected no invalidation for dataPassId=${expectedDataPassId} runNumber=${expectedRunNumber}`).to.be.null; + } else { + expect(summary?.invalidatedAt, `Expected invalidation for dataPassId=${expectedDataPassId} runNumber=${expectedRunNumber}`).to.not.be.null; + } +}; + +module.exports = () => { + before(async () => { + await resetDatabaseContent(); + }); + + const relations = { user: { roles: ['admin'], externalUserId: 1 } }; + const dataPassId = 4; // LHC22a_apass2 + const runNumber = 56; + + describe("GAQ Summary Invalidation", async () => { + before(() => gaqWorker.pause()); + after(() => gaqWorker.resume()); + + // Resetting the invalidated column between each case + afterEach(async () => { + await GaqSummaryRepository.updateAll({ invalidatedAt: null }, { where: {} }); + }); + + it('should invalidate GAQ summary when a QC flag is created for a data pass', async () => { + await qcFlagService.create( + [{ from: null, to: null, flagTypeId: 3 }], + { runNumber, detectorIdentifier: { detectorId: 7 }, dataPassIdentifier: { id: dataPassId } }, + relations, + ); + + await expectInvalidation(dataPassId, runNumber); + }); + + it('should invalidate GAQ summary when a QC flag is verified for the first time for a data pass', async () => { + const flagId = 8; // Seeded flag in data pass 4, run 100, with no verifications + + await qcFlagService.verifyFlag({ flagId }, relations); + + await expectInvalidation(dataPassId, 100); + + // Clear invalidation + await GaqSummaryRepository.updateAll({ invalidatedAt: null }, { where: { dataPassId, runNumber: 100 } }); + + // Verify again to check that no new invalidation is created when the flag is already verified + await qcFlagService.verifyFlag({ flagId }, relations); + await expectInvalidation(dataPassId, 100, true); + }); + + it('should invalidate GAQ summary when a QC flag is deleted for a data pass', async () => { + const flagId = 9; // Seeded flag in data pass 4, run 105, with no verifications (so deletion is allowed) + await qcFlagService.delete(flagId); + + await expectInvalidation(dataPassId, 105); + }); + + it('should invalidate GAQ summary for all runs when all QC flags are deleted for a data pass', async () => { + await qcFlagService.deleteAllForDataPass(dataPassId); + + await expectInvalidation(dataPassId, 100); + }); + + it('should invalidate GAQ summary when GAQ detectors are explicitly set for a data pass and run', async () => { + const gaqDataPassId = 3; // LHC22a_apass1 (has run 56 and detectors set up in GaqDetectorService tests) + const detectorIds = [4, 7]; + + await gaqDetectorService.setGaqDetectors(gaqDataPassId, [runNumber], detectorIds); + + await expectInvalidation(gaqDataPassId, runNumber); + }); + + it('should invalidate GAQ summary when default GAQ detectors are used for a data pass and run', async () => { + const gaqDataPassId = 3; + + await gaqDetectorService.useDefaultGaqDetectors(gaqDataPassId, [runNumber]); + + await expectInvalidation(gaqDataPassId, runNumber); + }); + + it('should invalidate GAQ summary when the QC time of a run changes', async () => { + await updateRun( + { runNumber }, + { runPatch: { timeTrgStart: new Date('2019-08-08 20:30:00') } }, + ); + + await expectInvalidation(dataPassId, runNumber); + }); + + it('should NOT invalidate GAQ summary when updateRun patch does not touch QC time source fields', async () => { + await updateRun( + { runNumber }, + { runPatch: { definition: 'PHYSICS' } }, + ); + + await expectInvalidation(dataPassId, runNumber, true); + }); + + it('should NOT invalidate GAQ summary when a QC time source field is patched but qcTimeStart/qcTimeEnd do not change', async () => { + // Run 56 has a non-null time_trg_start, so qc_time_start resolves to time_trg_start regardless of time_o2_start. + // Patching only time_o2_start moves a source field but does not change the virtual qcTimeStart / qcTimeEnd, so no invalidation should fire. + await updateRun( + { runNumber }, + { runPatch: { timeO2Start: new Date('2019-08-08 18:00:00') } }, + ); + + await expectInvalidation(dataPassId, runNumber, true); + }); + }); + + describe('GAQ Worker', () => { + beforeEach(async () => { + await resetDatabaseContent(); + }); + + after(() => gaqWorker.pause()); + + it('should process invalidations and update the summary', async () => { + const workerDataPassId = 1; + const workerRunNumber = 107; + + await qcFlagService.create( + [{ from: null, to: null, flagTypeId: 3 }], + { runNumber: workerRunNumber, detectorIdentifier: { detectorId: 1 }, dataPassIdentifier: { id: workerDataPassId } }, + relations, + ); + + // confirm that the invalidation is made + await expectInvalidation(workerDataPassId, workerRunNumber); + + // wait at least 2s, recalculation period is 1s in test env, for the worker to process the invalidation + await sleep(2000); + + await expectInvalidation(workerDataPassId, workerRunNumber, true); + const summary = await GaqSummaryRepository.findOne({ where: { dataPassId: workerDataPassId, runNumber: workerRunNumber } }); + expect(summary.badRunCoverage).to.not.be.null; + }); + + it('should only upsert an existing summary row rather than creating a duplicate', async () => { + const workerDataPassId = 1; + const workerRunNumber = 107; + + await gaqService.calculateAndStoreGaqSummary(workerDataPassId, workerRunNumber); + const firstSummary = await GaqSummaryRepository.findOne({ where: { dataPassId: workerDataPassId, runNumber: workerRunNumber } }); + expect(firstSummary).to.not.be.null; + + // Trigger an invalidation + await qcFlagService.create( + [{ from: null, to: null, flagTypeId: 3 }], + { runNumber: workerRunNumber, detectorIdentifier: { detectorId: 1 }, dataPassIdentifier: { id: workerDataPassId } }, + relations, + ); + await expectInvalidation(workerDataPassId, workerRunNumber); + + await sleep(2000); + + await expectInvalidation(workerDataPassId, workerRunNumber, true); + + // confirm only one summary row exists (upsert, not duplicate) + const summaries = await GaqSummaryRepository.findAll({ where: { dataPassId: workerDataPassId, runNumber: workerRunNumber } }); + expect(summaries).to.have.lengthOf(1); + }); + + it('should process multiple invalidations in a single batch', async () => { + // Create invalidations for two different runs in data pass 1 + await qcFlagService.create( + [{ from: null, to: null, flagTypeId: 3 }], + { runNumber: 106, detectorIdentifier: { detectorId: 1 }, dataPassIdentifier: { id: 1 } }, + relations, + ); + await qcFlagService.create( + [{ from: null, to: null, flagTypeId: 3 }], + { runNumber: 107, detectorIdentifier: { detectorId: 1 }, dataPassIdentifier: { id: 1 } }, + relations, + ); + + await expectInvalidation(1, 106); + await expectInvalidation(1, 107); + + // Manually call the worker with min/max batchSize=2 to process both in one go + await gaqWorker.recalculateGaqSummaries(2, 2); + + await expectInvalidation(1, 106, true); + await expectInvalidation(1, 107, true); + + const summary106 = await GaqSummaryRepository.findOne({ where: { dataPassId: 1, runNumber: 106 } }); + const summary107 = await GaqSummaryRepository.findOne({ where: { dataPassId: 1, runNumber: 107 } }); + expect(summary106).to.not.be.null; + expect(summary107).to.not.be.null; + }); + + it('should skip processing if a previous call is still in progress', async () => { + // Stub gaqService to be slow so the first call blocks + let resolveFirst; + const slowPromise = new Promise((resolve) => { resolveFirst = resolve; }); + const stub = sinon.stub(gaqService, 'popNInvalidSummaryAndRecalculate').returns(slowPromise); + + try { + // First call — will be held open by the slow stub + const firstCall = gaqWorker.recalculateGaqSummaries(1, 1); + + // Second call — should be skipped because a previous run is still in flight + await gaqWorker.recalculateGaqSummaries(1, 1); + + // Stub should only have been called once + expect(stub.callCount).to.equal(1); + + // Release the first call with the shape popNInvalidSummaryAndRecalculate normally returns + resolveFirst({ processedCount: 0, totalInvalidCount: 0 }); + await firstCall; + } finally { + sinon.restore(); + } + }); + + }); +}; diff --git a/test/lib/server/services/gaq/index.js b/test/lib/server/services/gaq/index.js index 985f80fe84..32200b17d0 100644 --- a/test/lib/server/services/gaq/index.js +++ b/test/lib/server/services/gaq/index.js @@ -12,7 +12,11 @@ */ const GaqDetectorServiceSuite = require('./GaqDetectorService.test.js'); +const GaqServiceSuite = require('./GaqService.test.js'); +const GaqSummarySuite = require('./GaqSummary.test.js'); module.exports = () => { describe('GaqDetectorService', GaqDetectorServiceSuite); + describe('GaqService', GaqServiceSuite); + describe('GaqSummary', GaqSummarySuite); }; diff --git a/test/lib/server/services/index.js b/test/lib/server/services/index.js index 01df33fcaf..38aabbf6dd 100644 --- a/test/lib/server/services/index.js +++ b/test/lib/server/services/index.js @@ -32,7 +32,7 @@ const UserSuite = require('./user/index.js'); const SimulationPassesSuite = require('./simulationPasses/index.js'); const QcFlagsSuite = require('./qualityControlFlag/index.js'); const CtpTriggerCountersSuite = require('./ctpTriggerCounters/index.js'); -const GaqDetectorSuite = require('./gaq'); +const GaqSuite = require('./gaq'); module.exports = () => { before(resetDatabaseContent); @@ -46,7 +46,7 @@ module.exports = () => { describe('Environment history item', EnvironmentHistoryItemSuite); describe('EOS report', EosReportSuite); describe('Flp role', FlpRoleSuite); - describe('GaqDetector', GaqDetectorSuite); + describe('GAQ', GaqSuite); describe('LHC fill suite', LhcFillSuite); describe('Logs', LogSuite); describe('RunType', RunTypeSuite); diff --git a/test/lib/server/services/qualityControlFlag/QcFlagService.test.js b/test/lib/server/services/qualityControlFlag/QcFlagService.test.js index f4c533444f..e62fd06b4e 100644 --- a/test/lib/server/services/qualityControlFlag/QcFlagService.test.js +++ b/test/lib/server/services/qualityControlFlag/QcFlagService.test.js @@ -21,7 +21,7 @@ const { Op } = require('sequelize'); const { qcFlagAdapter } = require('../../../../../lib/database/adapters'); const { runService } = require('../../../../../lib/server/services/run/RunService'); const { gaqDetectorService } = require('../../../../../lib/server/services/gaq/GaqDetectorsService'); -const { gaqService } = require('../../../../../lib/server/services/qualityControlFlag/GaqService.js'); +const { gaqService } = require('../../../../../lib/server/services/gaq/GaqService.js'); const { qcFlagSummaryService } = require('../../../../../lib/server/services/qualityControlFlag/QcFlagSummaryService.js'); const { dataPassService } = require('../../../../../lib/server/services/dataPasses/DataPassService.js'); diff --git a/test/utilities/resetDatabaseContent.js b/test/utilities/resetDatabaseContent.js index 9e04ebddab..79d9996f7d 100644 --- a/test/utilities/resetDatabaseContent.js +++ b/test/utilities/resetDatabaseContent.js @@ -12,9 +12,14 @@ */ const { database } = require('../../lib/application.js'); +const { gaqWorker } = require('../../lib/server/services/gaq/GaqWorker.js'); exports.resetDatabaseContent = async () => { + // Pause GAQ worker and await any in-flight call before dropping tables, otherwise a tick + // already past the guard would hit dropped tables and log a spurious ERROR + await gaqWorker.pause(); await database.dropAllTables(); await database.migrate(); await database.seed(); + gaqWorker.resume(); };