Files
gps-backend/cron/TripsWorker.js
2025-10-08 16:21:44 +07:00

275 lines
11 KiB
JavaScript

const path = require("path")
require("dotenv").config({ path: path.resolve(__dirname, "../.env") })
const cron = require("node-cron")
const db = require("../config/dbConnCron")
const moment = require("moment")
// Seven hours expressed in seconds (25200). The jobs in this file subtract it
// from the unix timestamps of month boundaries.
// NOTE(review): presumably the UTC+7 offset of the fleet's local time — confirm
// against the timezone the worker process actually runs in.
const TIMEFIX = 7 * 60 * 60

// Cron expression -> task. node-cron accepts an optional leading seconds field,
// so the six-field expression fires at second 0 of minute 0 (once per hour),
// while the five-field expression fires every fifth minute.
const SCHEDULES = [
  ["0 0 * * * *", job],
  ["*/5 * * * *", tripGrouping],
]
for (const [expression, task] of SCHEDULES) {
  cron.schedule(expression, task)
}
// Uncomment to trigger a task once, immediately, when debugging:
// job()
// tripGrouping()
/**
 * Returns the monthly tracks table name (`tracks_YYMM`) for a moment instance.
 *
 * @param {object} month - moment instance anywhere inside the target month.
 * @returns {string} Table name built from the two-digit year and month.
 */
function tracksTableName(month) {
  return `tracks_${month.format("YY")}${month.format("MM")}`
}

/**
 * Builds the CREATE TABLE statement for one monthly tracks table.
 *
 * NOTE(review): the backfill inserts `SELECT *, null FROM t_gps_tracks`, so the
 * column list presumably mirrors t_gps_tracks with `daily_trip_id` appended as
 * the last column — confirm before changing the column order here.
 *
 * @param {string} tableName - Name of the table to create (interpolated as-is).
 * @returns {string} The DDL statement.
 */
function buildTracksTableDdl(tableName) {
  return `
    CREATE TABLE ${tableName} (
      id bigint NOT NULL AUTO_INCREMENT,
      original_hex text,
      protocol enum('gt06','tk119','smartphone') DEFAULT NULL,
      action enum('login','heartbeat','location','alarm','other') DEFAULT NULL,
      device_id varchar(16) DEFAULT NULL,
      latitude double DEFAULT NULL,
      longitude double DEFAULT NULL,
      speed int DEFAULT NULL COMMENT 'km/h',
      orientation int NOT NULL DEFAULT '0' COMMENT 'in 360 degrees',
      ignition int NOT NULL DEFAULT '0' COMMENT 'pengapian. 1=>on, 2=>off, 3=>acc_low, 4=>acc_high',
      stts_engine int NOT NULL DEFAULT '0' COMMENT '1=>idling, 2=>moving, 3=>stopping',
      stts_gps int NOT NULL DEFAULT '0' COMMENT '1=>on, 2=>off',
      length_gps int NOT NULL DEFAULT '0' COMMENT 'length of GPS information,',
      pos_stlt_gps int NOT NULL DEFAULT '0' COMMENT 'quantity of positioning satellites',
      pos_type_gps int NOT NULL DEFAULT '0' COMMENT 'GPS real-time/differential positioning. 1=>diff_positioning,2=>realtime_positioning',
      is_pos_gps int NOT NULL DEFAULT '0' COMMENT 'GPS having been positioning or not. 1=>not,2=>positioning',
      stts_gsm int NOT NULL DEFAULT '0' COMMENT '1=>no signal, 2=>extremely weak signal 3=>very weak signal, 4=>good signal, 5=>strong signal',
      stts_oil_electricity int NOT NULL DEFAULT '0' COMMENT '1=>on, 2=>off',
      stts_alarm int NOT NULL DEFAULT '0' COMMENT '1=>normal,2=>shock,3=>power_cut,4=>low_battery,5=>sos',
      stts_charge int NOT NULL DEFAULT '0' COMMENT '1=>off,2=>on',
      stts_acc int NOT NULL DEFAULT '0' COMMENT '1=>low,2=>high',
      stts_volt int NOT NULL DEFAULT '0' COMMENT '1=>shutdown,2=>extreme_low_battery,3=>very_low_batter,4=>low_battery,5=>medium,6=>high,7=>very_high',
      stts_switch int NOT NULL DEFAULT '0' COMMENT '1=>off, 2=>on',
      stts_reverse_geo int NOT NULL DEFAULT '0' COMMENT '1=>success, 2=>not, 3=>error, 4=>lost',
      pre_milleage double NOT NULL DEFAULT '0' COMMENT 'in km. distance from prev to now',
      sum_milleage double NOT NULL DEFAULT '0' COMMENT 'in km. summary device milleage. calculated based on this device',
      vhc_milleage double NOT NULL DEFAULT '0' COMMENT 'in km. summary vhc milleage. calculated based on vhc, every change vhc this will get last milleage on that vhc',
      drv_milleage double NOT NULL DEFAULT '0' COMMENT 'in km. summary drv milleage. calculated based on drv, every change drv this will get last milleage on that drv',
      vhc_id int NOT NULL DEFAULT '0',
      drv_id int NOT NULL DEFAULT '0',
      source int NOT NULL DEFAULT '1' COMMENT '1=>gps_tracker, 2=>smartphone',
      crt int NOT NULL,
      crt_format datetime NOT NULL,
      crt_d int NOT NULL DEFAULT '0' COMMENT 'from device/terminal',
      crt_d_format datetime DEFAULT NULL,
      crt_s int NOT NULL DEFAULT '0' COMMENT 'from server (receive time)',
      crt_s_format datetime DEFAULT NULL,
      crt_device_raw bigint DEFAULT '0' COMMENT 'device raw time',
      daily_trip_id int unsigned DEFAULT NULL,
      PRIMARY KEY (id),
      KEY tracks_device_id (device_id),
      KEY tracks_vhc_id (vhc_id),
      KEY tracks_drv_id (drv_id),
      KEY idx_crt_d (crt_d),
      KEY idx_vhc_crt_d (vhc_id,crt_d),
      KEY idx_vhc_id_engine (vhc_id,stts_engine),
      KEY idx_vhc_lat_long_crt (vhc_id,latitude,longitude,crt),
      KEY idx_geo_stts_id (stts_reverse_geo,latitude,longitude,id),
      KEY idx_gps_valid_geo (latitude,longitude,stts_reverse_geo),
      KEY idx_gps_order (id),
      KEY t_gps_tracks_latitude_IDX (latitude,longitude,vhc_id,crt_d,action) USING BTREE,
      KEY t_gps_tracks_crt_s_IDX (crt_s,action) USING BTREE,
      KEY t_gps_tracks_stts_reverse_geo_IDX (stts_reverse_geo,action,id,latitude,longitude) USING BTREE
    ) ENGINE=MyISAM AUTO_INCREMENT=0 DEFAULT CHARSET=utf8mb4;
  `
}

/**
 * Creates the given monthly tracks table unless information_schema already
 * lists it for the configured schema.
 *
 * @param {string|undefined} databaseName - Schema name (from process.env.DATABASE).
 * @param {string} tableName - Monthly table to check/create.
 * @returns {Promise<void>}
 * @throws Propagates whatever db.query rejects with; the caller logs it.
 */
async function ensureTracksTable(databaseName, tableName) {
  console.log(`Checking existence of table '${tableName}'...`)
  const existing = await db.query(
    `
      SELECT TABLE_NAME
      FROM information_schema.TABLES
      WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?
    `,
    [databaseName, tableName]
  )
  if (existing.length > 0) {
    console.log(`Table '${tableName}' already exists.`)
    return
  }
  // The table name cannot be bound as a parameter, so the DDL takes no params.
  await db.query(buildTracksTableDdl(tableName), [])
  console.log(`Table '${tableName}' created successfully.`)
}

/**
 * Copies one month of 'location' rows from t_gps_tracks into its monthly
 * table, skipping ids that are already present there.
 *
 * @param {object} histMonth - moment instance inside the month to archive.
 * @param {string} histTableName - Destination monthly table.
 * @returns {Promise<void>}
 * @throws Propagates whatever db.query rejects with; the caller logs it.
 */
async function archiveTracksMonth(histMonth, histTableName) {
  const startOfMonth = histMonth.clone().startOf("month").unix() - TIMEFIX
  // endOf("month").unix() is the LAST second of the month, so the upper bound
  // must be inclusive; with a strict "<" the rows stamped at that second would
  // belong to no month at all (the next month starts one second later).
  const endOfMonth = histMonth.clone().endOf("month").unix() - TIMEFIX
  console.log(
    `Inserting data into history table '${histTableName}' for records from ${startOfMonth} to ${endOfMonth}`
  )
  const insertQuery = `
    INSERT INTO ${histTableName}
    SELECT *, null FROM t_gps_tracks a
    WHERE
      crt_d >= ? AND crt_d <= ?
      and action = 'location'
      and not exists (
        select 1 from ${histTableName} b
        where b.id = a.id
      )
  `
  const result = await db.query(insertQuery, [startOfMonth, endOfMonth])
  console.log(`Inserted ${result.affectedRows} rows into '${histTableName}'`)
}

/**
 * Maintains the rolling set of monthly `tracks_YYMM` tables.
 *
 * Steps, each failure being logged without aborting the remaining work:
 *  1. make sure a table exists for every month from 12 months ago up to
 *     2 months ahead (15 tables);
 *  2. drop the table for the month 13 months ago, i.e. the one that has just
 *     left that window (only this single table is dropped per run);
 *  3. backfill the tables from 12 months ago through the current month with
 *     the matching 'location' rows of t_gps_tracks.
 *
 * Relies on the module-level `db`, `moment` and `TIMEFIX`.
 *
 * @returns {Promise<void>}
 */
async function job() {
  console.log("Monthly table job executed:", moment().format("YYYY-MM-DD HH:mm:ss"))
  const currentDate = moment()
  const lastYearDate = currentDate.clone().subtract(1, "years")
  const databaseName = process.env.DATABASE

  // 1. Ensure tables exist: 12 months back .. 2 months ahead.
  for (let i = 0; i <= 14; i++) {
    const tableName = tracksTableName(lastYearDate.clone().add(i, "months"))
    try {
      await ensureTracksTable(databaseName, tableName)
    } catch (error) {
      console.error(`Error processing table '${tableName}':`, error.message)
    }
  }

  // 2. Drop the month that just fell out of the retained window.
  const dropTableName = tracksTableName(currentDate.clone().subtract(13, "months"))
  try {
    await db.query(`DROP TABLE IF EXISTS ${dropTableName}`, [])
    console.log(`Table '${dropTableName}' dropped successfully (if it existed).`)
  } catch (error) {
    console.error(`Error dropping table '${dropTableName}':`, error.message)
  }

  // 3. Backfill history: 12 months back .. current month.
  for (let i = 0; i <= 12; i++) {
    const histMonth = lastYearDate.clone().add(i, "months")
    const histTableName = tracksTableName(histMonth)
    console.log(`Processing history insertion for table '${histTableName}'...`)
    try {
      await archiveTracksMonth(histMonth, histTableName)
    } catch (error) {
      console.error(`Error inserting data into history table '${histTableName}':`, error.message)
    }
  }
  console.log("Monthly table job completed.")
}
/**
 * Builds the upsert that derives trips from one monthly tracks table.
 *
 * The statement takes two bound parameters: the inclusive lower and upper
 * `crt_d` bounds. Its CTEs, in order:
 *  - gaps:     flags rows whose previous row (per vehicle) is > 3600 s older;
 *  - trips:    flags a trip start. Note the condition has no parentheses, so
 *              SQL precedence reads it as
 *              (ignition = 4 AND previous ignition <> 4) OR previous isStop = 1;
 *  - numbered: keeps ignition = 4 / isStop = 0 rows and numbers the trips with
 *              a running sum of the start flags;
 *  - agg:      aggregates per vehicle and trip, keeping groups of > 1 row.
 *
 * @param {string} histTableName - Monthly tracks table to read (interpolated as-is).
 * @returns {string} The INSERT ... ON DUPLICATE KEY UPDATE statement.
 */
function buildTripGroupingQuery(histTableName) {
  return `
    insert into trips
    (id,name,nopol1,vhc_id,mileage,start,finish,startMileage,finishMileage,startLoc,finishLoc,pool_code,dc_code,row_count)
    WITH
    gaps AS (
      SELECT
        -- previous gap since previous row > 1 hour (3600s)
        CASE
          WHEN (crt_d - LAG(crt_d, 1, NULL) OVER (PARTITION BY vhc_id ORDER BY crt_d)) > 3600
          THEN 1 ELSE 0
        END AS isStop,
        t.*
      FROM ${histTableName} t
      WHERE
        t.latitude IS NOT NULL
        AND t.longitude IS NOT NULL
        AND t.action = 'location'
        AND t.crt_d BETWEEN ? AND ?
    )
    , trips AS (
      SELECT
        -- mark the start of a trip when ignition=4 and previous ignition <> 4
        CASE
          WHEN ignition = 4
          AND LAG(ignition, 1, 0) OVER (PARTITION BY vhc_id ORDER BY crt_d) <> 4
          or LAG(isStop, 1, 0) over (PARTITION BY vhc_id ORDER BY crt_d) = 1
          THEN 1 ELSE 0
        END AS trip_start,
        g.*
      FROM gaps g
    )
    , numbered AS (
      SELECT
        *,
        -- assign a trip_id by cumulative sum of trip_start
        SUM(trip_start) OVER (PARTITION BY vhc_id ORDER BY crt_d) AS trip_id
      FROM trips
      where
        ignition = 4
        and isStop = 0
    ),
    agg AS (
      SELECT
        v.id,
        v.name,
        v.nopol1,
        vhc_id,
        ROW_NUMBER() OVER (PARTITION BY v.id ORDER BY MIN(a.crt_d)) AS trip_id,
        SUM(pre_milleage) AS mileage,
        MIN(a.crt_d) AS start,
        MAX(a.crt_d) AS finish,
        MIN(a.vhc_milleage) AS startMileage,
        MAX(a.vhc_milleage) AS finishMileage,
        (SELECT fulladdress FROM t_gps_tracks_address WHERE master_id = MIN(a.id) LIMIT 1) AS startLoc,
        (SELECT fulladdress FROM t_gps_tracks_address WHERE master_id = MAX(a.id) LIMIT 1) AS finishLoc,
        COUNT(*) AS row_count
      FROM t_vehicles v
      LEFT JOIN numbered a ON a.vhc_id = v.id
      WHERE
        v.dlt is null and trip_id != 0
      GROUP BY v.id, a.trip_id
      HAVING COUNT(*) > 1
    )
    SELECT
      agg.id,name,nopol1,vhc_id,mileage,start,finish,startMileage,finishMileage,startLoc,finishLoc,
      tvd.pool_code, tvd.dc_code,
      row_count
    FROM agg agg
    join t_vehicles_detail tvd on tvd.vid = agg.id
    ORDER BY agg.id, trip_id
    ON DUPLICATE KEY UPDATE
      mileage = values(mileage),
      start = values(start),
      finish = values(finish),
      startMileage = values(startMileage),
      finishMileage = values(finishMileage),
      startLoc = values(startLoc),
      finishLoc = values(finishLoc),
      row_count = values(row_count)
  `
}

/**
 * Rebuilds the `trips` rows for the previous and the current month from their
 * monthly `tracks_YYMM` tables.
 *
 * Each month is processed independently: a failing query is logged and the
 * next month still runs. The per-month timer is always stopped (in `finally`),
 * so a failed run does not leave its label registered and make the next run's
 * console.time() emit a "label already exists" warning.
 *
 * Relies on the module-level `db`, `moment` and `TIMEFIX`.
 *
 * @returns {Promise<void>}
 */
async function tripGrouping() {
  console.log("Trip grouping job executed:", moment().format("YYYY-MM-DD HH:mm:ss"))
  // Computed once so both iterations are anchored to the same instant.
  const lastMonth = moment().subtract(1, "months")
  for (let i = 0; i <= 1; i++) {
    const histMonth = lastMonth.clone().add(i, "months")
    const histTableName = `tracks_${histMonth.format("YY")}${histMonth.format("MM")}`
    const timerLabel = `Trip grouping for ${histTableName}`
    console.log(`Processing trip grouping for table '${histTableName}'...`)
    console.time(timerLabel)
    try {
      // Both bounds are inclusive (BETWEEN); endOf("month") is the month's last second.
      const startOfMonth = histMonth.clone().startOf("month").unix() - TIMEFIX
      const endOfMonth = histMonth.clone().endOf("month").unix() - TIMEFIX
      const result = await db.query(buildTripGroupingQuery(histTableName), [startOfMonth, endOfMonth])
      console.log(`Inserted ${result.affectedRows} rows into 'trips' table from '${histTableName}'`)
    } catch (error) {
      console.error(`Error grouping trips from table '${histTableName}':`, error.message)
    } finally {
      console.timeEnd(timerLabel)
    }
  }
  console.log("Trip grouping job completed.")
}
// On SIGINT, log a shutdown message and terminate with exit status 0.
function handleInterrupt() {
  console.log("Stopping cron scheduler...")
  process.exit(0)
}
process.on("SIGINT", handleInterrupt)