const path = require('path');
const RedisConn = require('./LibRedisConn');
const QueueMQ = require('bullmq').Queue;
const QueueSchedulerMQ = require('bullmq').QueueScheduler;
const WorkerMQ = require('bullmq').Worker;
// const JobMQ = require('bullmq').Job;
const LibWinston = require('./LibWinston');
const EventEmitter = require('events');

// Queue name comes from the environment; it is also used as the job name,
// the logger label and the prefix of the shutdown event.
const queueName = process.env.REDIS_QUEUE_BLAST_ORDER;
const theQueue = new QueueMQ(queueName, { connection: RedisConn });
// Never referenced again; constructed for its side effects only.
// NOTE(review): presumably needed by the installed BullMQ version so that
// delayed jobs get promoted — confirm before removing.
const theQueueScheduler = new QueueSchedulerMQ(queueName, { connection: RedisConn });
const Logger = LibWinston.initialize(queueName);

class MyEmitter extends EventEmitter { };

// In-process event bus used to ask every worker started by setWorker() to close.
const myEmitter = new MyEmitter();
const shutDownEvent = queueName + 'ShutDown';

/**
 * Serializes a value for logging.
 *
 * Plain JSON.stringify(error) yields "{}" because `message` and `stack` are
 * non-enumerable on Error instances, so the own property names are passed
 * explicitly as the replacer allow-list.
 *
 * @param {*} err - Error instance or any other thrown/emitted value.
 * @returns {string|undefined} JSON text (undefined when err is undefined).
 */
function serializeError(err) {
    if (err !== null && typeof err === 'object') {
        return JSON.stringify(err, Object.getOwnPropertyNames(err));
    }
    return JSON.stringify(err);
}

/**
 * Notes carried over from the original author:
 * - Multiple logical queues are possible without calling new Queue(name) again;
 *   just use a different processor name when calling queue.add(nameProcessor).
 * - The worker only processes the name matching nameProcessor, not the name
 *   given to new Queue(name).
 *   NOTE(review): verify this against the BullMQ docs — workers appear to be
 *   bound to the queue name, not the job name.
 * - `concurrency` on the worker is the maximum number of jobs processed at the
 *   same time.
 * - `attempts` on the queue is the maximum number of tries when a job fails.
 */
const LibQueueBlastOrder = {
    name: queueName,
    queue: theQueue,

    /**
     * Adds a job to the queue, named after the queue itself.
     *
     * @param {Object} data - Job payload; `data.delay` is forwarded as the job delay.
     * @returns {Promise} Result of `queue.add`, so callers can await or handle failure.
     */
    addQueue(data) {
        return this.queue.add(queueName, data, {
            removeOnComplete: false, // false just for debugging
            delay: data.delay,
        });
    },

    /**
     * Pauses the queue.
     * @returns {Promise} Result of `queue.pause`.
     */
    pauseQueue() {
        return this.queue.pause();
    },

    /**
     * Resumes the queue.
     * @returns {Promise} Result of `queue.resume`.
     */
    resumeQueue() {
        return this.queue.resume();
    },

    /**
     * @returns {Promise} Result of `queue.getJobs()` called without filters.
     */
    getJobs() {
        return this.queue.getJobs();
    },

    /**
     * @returns {Promise} Result of `queue.getRepeatableJobs()`.
     */
    getRepeatableJobs() {
        return this.queue.getRepeatableJobs();
    },

    /**
     * Starts a worker for this queue using the processor file
     * ../workers/BlastOrderWorker.js with concurrency 1, and wires logging for
     * its completed / failed / error events. The worker is closed when
     * workerShutDown() is called.
     *
     * Construction errors are logged and swallowed (best effort), as before.
     */
    setWorker() {
        try {
            const processorFile = path.join(__dirname, '../workers/BlastOrderWorker.js');
            const schedulerWorker = new WorkerMQ(queueName, processorFile, {
                connection: RedisConn,
                concurrency: 1,
            });

            schedulerWorker.on('completed', (job, returnValue) => {
                Logger.log('info', `${queueName} completed: ${JSON.stringify(returnValue)}`);
            });

            schedulerWorker.on('failed', (job, failedReason) => {
                Logger.log('error', `${queueName} failed: ${serializeError(failedReason)}`);
            });

            schedulerWorker.on('error', (errMsg) => {
                Logger.log('error', `${queueName} error: ${serializeError(errMsg)}`);
            });

            // Graceful shutdown. `once` so the listener (and its reference to the
            // worker) is released after the worker has been asked to close.
            myEmitter.once(shutDownEvent, () => {
                Promise.resolve(schedulerWorker.close()).catch((e) => {
                    Logger.log('error', `${queueName} worker_error: ${serializeError(e)}`);
                });
            });
        } catch (e) {
            Logger.log('error', `${queueName} worker_error: ${serializeError(e)}`);
        }
    },

    /**
     * Drains the queue.
     * @returns {Promise} Result of `queue.drain`.
     */
    drainQueue() {
        return this.queue.drain();
    },

    /**
     * Obliterates the queue.
     * @returns {Promise} Result of `queue.obliterate`.
     */
    obliterateQueue() {
        return this.queue.obliterate();
    },

    /**
     * Signals every worker started through setWorker() to close.
     */
    workerShutDown() {
        myEmitter.emit(shutDownEvent);
    },
};

module.exports = LibQueueBlastOrder;