Files
gps-backend/library/LibSchedulerDrvBlastNotif.js
2025-05-29 08:59:40 +00:00

93 lines
3.1 KiB
JavaScript
Executable File

const path = require('path');
const RedisConn = require('./LibRedisConn');
const QueueMQ = require('bullmq').Queue;
const QueueSchedulerMQ = require('bullmq').QueueScheduler;
const WorkerMQ = require('bullmq').Worker;
// const JobMQ = require('bullmq').Job;
const LibWinston = require('./LibWinston');
const EventEmitter = require('events');
const schedulerName = process.env.REDIS_SCHEDULER_DRV_BLAST_NOTIF;
const theQueue = new QueueMQ(schedulerName, { connection: RedisConn });
const theQueueScheduler = new QueueSchedulerMQ(schedulerName, { connection: RedisConn });
const Logger = LibWinston.initialize(schedulerName);
class MyEmitter extends EventEmitter { };
const myEmitter = new MyEmitter();
/**
* bisa multiple queue tanpa new Queue(name) lagi, tinggal dibedain aja nama processornya ketika queue.add(nameProcessor)
* worker hanya memproses nama sesuai dengan nameProcessor bukan nama ketika new Queue(name)
* concurrency di worker adalah maksimal yang bisa diproses dalam satu waktu bersamaan
* attempts di queue adalah maksimal percobaan ketika gagal
*/
const LibSchedulerDrvBlastNotif = {
name: schedulerName,
queue: theQueue,
runQueueScheduler: function (data) {
// * * * * * // every minute
// */3 * * * * // every 3 minute
// 0 */1 * * * // every hour
this.queue.add(schedulerName, data, {
removeOnComplete: false, // false just for debugging
repeat: {
cron: process.env.SCHEDULE_DRV_BLAST_NOTIF_TIME,
},
attempts: 0,
});
},
addQueue: function (data) {
this.queue.add(schedulerName, data, {
removeOnComplete: false, // false just for debugging
});
},
pauseQueue: function () {
this.queue.pause();
},
resumeQueue: function () {
this.queue.resume();
},
getJobs: async function () {
return await this.queue.getJobs();
},
getRepeatableJobs: async function () {
return await this.queue.getRepeatableJobs();
},
setWorker: function () {
try {
const processorFile = path.join(__dirname, '../workers/DrvBlastNotifWorker.js');
const schedulerWorker = new WorkerMQ(schedulerName, processorFile, {
connection: RedisConn,
concurrency: 1,
})
schedulerWorker.on('completed', async (job, returnValue) => {
Logger.log('info', `${schedulerName} completed: ${JSON.stringify(returnValue)}`);
})
schedulerWorker.on('failed', async (job, failedReason) => {
Logger.log('error', `${schedulerName} failed: ${JSON.stringify(failedReason)}`);
});
schedulerWorker.on('error', async (errMsg) => {
Logger.log('error', `${schedulerName} error: ${JSON.stringify(errMsg)}`);
});
myEmitter.on(schedulerName + 'ShutDown', () => {
// gracefull shutdown
schedulerWorker.close();
});
} catch (e) {
Logger.log('error', `${schedulerName} worker_error: ${JSON.stringify(e, Object.getOwnPropertyNames(e))}`);
}
},
drainQueue: function () {
this.queue.drain();
},
obliterateQueue: function () {
this.queue.obliterate();
},
workerShutDown: function () {
myEmitter.emit(schedulerName + 'ShutDown');
}
}
module.exports = LibSchedulerDrvBlastNotif