93 lines
3.1 KiB
JavaScript
Executable File
93 lines
3.1 KiB
JavaScript
Executable File
const path = require('path');
|
|
const RedisConn = require('./LibRedisConn');
|
|
const QueueMQ = require('bullmq').Queue;
|
|
const QueueSchedulerMQ = require('bullmq').QueueScheduler;
|
|
const WorkerMQ = require('bullmq').Worker;
|
|
// const JobMQ = require('bullmq').Job;
|
|
const LibWinston = require('./LibWinston');
|
|
const EventEmitter = require('events');
|
|
|
|
const schedulerName = process.env.REDIS_SCHEDULER_DRV_UP_LOC_IDLE;
|
|
const theQueue = new QueueMQ(schedulerName, { connection: RedisConn });
|
|
const theQueueScheduler = new QueueSchedulerMQ(schedulerName, { connection: RedisConn });
|
|
|
|
const Logger = LibWinston.initialize(schedulerName);
|
|
|
|
class MyEmitter extends EventEmitter { };
|
|
const myEmitter = new MyEmitter();
|
|
|
|
/**
|
|
* bisa multiple queue tanpa new Queue(name) lagi, tinggal dibedain aja nama processornya ketika queue.add(nameProcessor)
|
|
* worker hanya memproses nama sesuai dengan nameProcessor bukan nama ketika new Queue(name)
|
|
* concurrency di worker adalah maksimal yang bisa diproses dalam satu waktu bersamaan
|
|
* attempts di queue adalah maksimal percobaan ketika gagal
|
|
*/
|
|
|
|
const LibSchedulerDrvUpLocIdle = {
|
|
name: schedulerName,
|
|
queue: theQueue,
|
|
runQueueScheduler: function (data) {
|
|
// * * * * * // every minute
|
|
// */3 * * * * // every 3 minute
|
|
// 0 */1 * * * // every hour
|
|
this.queue.add(schedulerName, data, {
|
|
removeOnComplete: false, // false just for debugging
|
|
repeat: {
|
|
cron: process.env.SCHEDULE_DRV_UP_LOC_IDLE_TIME,
|
|
},
|
|
attempts: 0,
|
|
});
|
|
},
|
|
addQueue: function (data) {
|
|
this.queue.add(schedulerName, data, {
|
|
removeOnComplete: false, // false just for debugging
|
|
});
|
|
},
|
|
pauseQueue: function () {
|
|
this.queue.pause();
|
|
},
|
|
resumeQueue: function () {
|
|
this.queue.resume();
|
|
},
|
|
getJobs: async function () {
|
|
return await this.queue.getJobs();
|
|
},
|
|
getRepeatableJobs: async function () {
|
|
return await this.queue.getRepeatableJobs();
|
|
},
|
|
setWorker: function () {
|
|
try {
|
|
const processorFile = path.join(__dirname, '../workers/DrvUpLocIdleWorker.js');
|
|
const schedulerWorker = new WorkerMQ(schedulerName, processorFile, {
|
|
connection: RedisConn,
|
|
concurrency: 1,
|
|
})
|
|
schedulerWorker.on('completed', async (job, returnValue) => {
|
|
Logger.log('info', `${schedulerName} completed: ${JSON.stringify(returnValue)}`);
|
|
})
|
|
schedulerWorker.on('failed', async (job, failedReason) => {
|
|
Logger.log('error', `${schedulerName} failed: ${JSON.stringify(failedReason)}`);
|
|
});
|
|
schedulerWorker.on('error', async (errMsg) => {
|
|
Logger.log('error', `${schedulerName} error: ${JSON.stringify(errMsg)}`);
|
|
});
|
|
myEmitter.on(schedulerName + 'ShutDown', () => {
|
|
// gracefull shutdown
|
|
schedulerWorker.close();
|
|
});
|
|
} catch (e) {
|
|
Logger.log('error', `${schedulerName} worker_error: ${JSON.stringify(e, Object.getOwnPropertyNames(e))}`);
|
|
}
|
|
},
|
|
drainQueue: function () {
|
|
this.queue.drain();
|
|
},
|
|
obliterateQueue: function () {
|
|
this.queue.obliterate();
|
|
},
|
|
workerShutDown: function () {
|
|
myEmitter.emit(schedulerName + 'ShutDown');
|
|
}
|
|
}
|
|
|
|
module.exports = LibSchedulerDrvUpLocIdle |