Files
gps-backend/library/LibQueueBlastOrder.js
2025-05-29 08:59:40 +00:00

82 lines
2.7 KiB
JavaScript
Executable File

const path = require('path');
const RedisConn = require('./LibRedisConn');
const QueueMQ = require('bullmq').Queue;
const QueueSchedulerMQ = require('bullmq').QueueScheduler;
const WorkerMQ = require('bullmq').Worker;
// const JobMQ = require('bullmq').Job;
const LibWinston = require('./LibWinston');
const EventEmitter = require('events');
// Queue name is read from the environment. Below it is reused as the BullMQ
// queue name, the job name passed to queue.add(), the logger label and the
// prefix of the shutdown event name.
// NOTE(review): there is no fallback here — if REDIS_QUEUE_BLAST_ORDER is unset
// this is undefined; confirm the deployment always provides it.
const queueName = process.env.REDIS_QUEUE_BLAST_ORDER;
// Producer-side handle, created once at module load and shared via the export.
const theQueue = new QueueMQ(queueName, { connection: RedisConn });
// Not referenced again in this file; instantiated only for its side effects.
// NOTE(review): presumably needed so delayed jobs get promoted on older BullMQ
// releases — QueueScheduler is deprecated/removed in BullMQ 2.x, so verify
// against the installed version.
const theQueueScheduler = new QueueSchedulerMQ(queueName, { connection: RedisConn });
// Logger instance initialised with the queue name.
const Logger = LibWinston.initialize(queueName);
// Module-private emitter used to deliver the "<queueName>ShutDown" signal from
// workerShutDown() to the worker created in setWorker().
class MyEmitter extends EventEmitter { };
const myEmitter = new MyEmitter();
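/**
 * Several kinds of job can share this queue without calling new Queue(name)
 * again: just give each one a different processor name in queue.add(nameProcessor).
 * The worker only processes jobs whose name matches nameProcessor, not the name
 * passed to new Queue(name).
 * NOTE(review): in BullMQ a Worker appears to be bound to the queue name and to
 * receive every job in that queue regardless of job name — verify the claim
 * above against the BullMQ version in use.
 * `concurrency` on the worker is the maximum number of jobs processed at the same time.
 * `attempts` on the queue is the maximum number of tries when a job fails.
 */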
/**
 * Serialise a value for a log line.
 *
 * Error instances have non-enumerable `message`/`stack`, so a plain
 * JSON.stringify(err) yields "{}". For errors the own property names are
 * passed as the replacer so those fields are included.
 *
 * @param {*} value - Anything handed to a worker event listener or caught.
 * @returns {string|undefined} JSON text, or String(value) when the value
 *   cannot be serialised (e.g. circular references).
 */
function stringifyForLog(value) {
  try {
    if (value instanceof Error) {
      return JSON.stringify(value, Object.getOwnPropertyNames(value));
    }
    return JSON.stringify(value);
  } catch (stringifyError) {
    // Logging must never throw from inside an event listener; fall back to
    // the plain string form instead.
    return String(value);
  }
}

/**
 * Thin facade over the blast-order BullMQ queue and its worker.
 *
 * Every method that delegates to the queue returns the underlying promise so
 * callers can await completion and handle rejections; callers that ignore
 * the return value behave as before.
 */
const LibQueueBlastOrder = {
  name: queueName,
  queue: theQueue,

  /**
   * Enqueue one job, using the queue name as the job name.
   *
   * @param {object} data - Job payload; `data.delay` is forwarded as the job delay.
   * @returns {Promise<*>} Whatever `queue.add` resolves to.
   */
  addQueue(data) {
    return this.queue.add(queueName, data, {
      removeOnComplete: false, // false just for debugging
      delay: data.delay,
    });
  },

  /**
   * Pause the queue.
   * @returns {Promise<*>} Result of `queue.pause()`.
   */
  pauseQueue() {
    return this.queue.pause();
  },

  /**
   * Resume the queue.
   * @returns {Promise<*>} Result of `queue.resume()`.
   */
  resumeQueue() {
    return this.queue.resume();
  },

  /**
   * List jobs using the queue's default `getJobs()` arguments.
   * @returns {Promise<*>} Result of `queue.getJobs()`.
   */
  async getJobs() {
    return this.queue.getJobs();
  },

  /**
   * List repeatable job definitions.
   * @returns {Promise<*>} Result of `queue.getRepeatableJobs()`.
   */
  async getRepeatableJobs() {
    return this.queue.getRepeatableJobs();
  },

  /**
   * Start a worker that runs ../workers/BlastOrderWorker.js for this queue
   * with concurrency 1, and wire up logging plus the shutdown signal.
   * Construction errors are logged, not thrown.
   *
   * @returns {void}
   */
  setWorker() {
    try {
      const processorFile = path.join(__dirname, '../workers/BlastOrderWorker.js');
      const blastWorker = new WorkerMQ(queueName, processorFile, {
        connection: RedisConn,
        concurrency: 1,
      });

      blastWorker.on('completed', (job, returnValue) => {
        Logger.log('info', `${queueName} completed: ${stringifyForLog(returnValue)}`);
      });
      blastWorker.on('failed', (job, failedReason) => {
        // failedReason is an Error; stringifyForLog keeps its message/stack.
        Logger.log('error', `${queueName} failed: ${stringifyForLog(failedReason)}`);
      });
      blastWorker.on('error', (errMsg) => {
        Logger.log('error', `${queueName} error: ${stringifyForLog(errMsg)}`);
      });

      // Graceful shutdown: close this worker the first time the signal
      // fires. `once` drops the listener afterwards so a closed worker is
      // not kept referenced by the emitter.
      myEmitter.once(`${queueName}ShutDown`, () => {
        Promise.resolve(blastWorker.close()).catch((closeError) => {
          Logger.log('error', `${queueName} worker_close_error: ${stringifyForLog(closeError)}`);
        });
      });
    } catch (e) {
      Logger.log('error', `${queueName} worker_error: ${stringifyForLog(e)}`);
    }
  },

  /**
   * Drain the queue.
   * @returns {Promise<*>} Result of `queue.drain()`.
   */
  drainQueue() {
    return this.queue.drain();
  },

  /**
   * Obliterate the queue.
   * @returns {Promise<*>} Result of `queue.obliterate()`.
   */
  obliterateQueue() {
    return this.queue.obliterate();
  },

  /**
   * Signal every worker created by setWorker() to close.
   * @returns {void}
   */
  workerShutDown() {
    myEmitter.emit(`${queueName}ShutDown`);
  },
};
module.exports = LibQueueBlastOrder