diff --git a/cron/TripsWorker.js b/cron/TripsWorker.js index df61db0..e65c16e 100644 --- a/cron/TripsWorker.js +++ b/cron/TripsWorker.js @@ -7,7 +7,9 @@ const moment = require("moment") const TIMEFIX = 25200 cron.schedule("0 0 * * * *", job) +cron.schedule("*/5 * * * *", tripGrouping) // job() +// tripGrouping() async function job() { console.log("Monthly table job executed:", moment().format("YYYY-MM-DD HH:mm:ss")) @@ -115,7 +117,7 @@ async function job() { console.error(`Error dropping table '${dropTableName}':`, error.message) } - // insert prev data (3 months ago) to history table + // insert prev data for (let i = 0; i <= 12; i++) { const histMonth = lasYearDate.clone().add(i, "months") const histYy = histMonth.format("YY") @@ -160,6 +162,108 @@ async function job() { async function tripGrouping() { console.log("Trip grouping job executed:", moment().format("YYYY-MM-DD HH:mm:ss")) + for (let i = 0; i <= 12; i++) { + const lasYearDate = moment().subtract(1, "years") + const histMonth = lasYearDate.clone().add(i, "months") + const histYy = histMonth.format("YY") + const histMm = histMonth.format("MM") + const histTableName = `tracks_${histYy}${histMm}` + console.log(`Processing history insertion for table '${histTableName}'...`) + + try { + console.time(`Trip grouping for ${histTableName}`) + const startOfMonth = histMonth.clone().startOf("month").unix() - TIMEFIX + const endOfMonth = histMonth.clone().endOf("month").unix() - TIMEFIX + + const q2 = ` + insert into trips + (id,name,nopol1,vhc_id,mileage,start,finish,startMileage,finishMileage,startLoc,finishLoc,pool_code,dc_code,row_count) + WITH + gaps AS ( + SELECT + -- previous gap since previous row > 1 hour (3600s) + CASE + WHEN (crt_d - LAG(crt_d, 1, NULL) OVER (PARTITION BY vhc_id ORDER BY crt_d)) > 3600 + THEN 1 ELSE 0 + END AS isStop, + t.* + FROM ${histTableName} t + WHERE + t.latitude IS NOT NULL + AND t.longitude IS NOT NULL + AND t.action = 'location' + AND t.crt_d BETWEEN ? AND ? + ) + , trips AS ( + SELECT + -- mark the start of a trip when ignition=4 and previous ignition <> 4 + CASE + WHEN ignition = 4 + AND LAG(ignition, 1, 0) OVER (PARTITION BY vhc_id ORDER BY crt_d) <> 4 + or LAG(isStop, 1, 0) over (PARTITION BY vhc_id ORDER BY crt_d) = 1 + THEN 1 ELSE 0 + END AS trip_start, + g.* + FROM gaps g + ) + , numbered AS ( + SELECT + *, + -- assign a trip_id by cumulative sum of trip_start + SUM(trip_start) OVER (PARTITION BY vhc_id ORDER BY crt_d) AS trip_id + FROM trips + where + ignition = 4 + and isStop = 0 + ), + agg AS ( + SELECT + v.id, + v.name, + v.nopol1, + vhc_id, + ROW_NUMBER() OVER (PARTITION BY v.id ORDER BY MIN(a.crt_d)) AS trip_id, + SUM(pre_milleage) AS mileage, + MIN(a.crt_d) AS start, + MAX(a.crt_d) AS finish, + MIN(a.vhc_milleage) AS startMileage, + MAX(a.vhc_milleage) AS finishMileage, + (SELECT fulladdress FROM t_gps_tracks_address WHERE master_id = MIN(a.id) LIMIT 1) AS startLoc, + (SELECT fulladdress FROM t_gps_tracks_address WHERE master_id = MAX(a.id) LIMIT 1) AS finishLoc, + COUNT(*) AS row_count + FROM t_vehicles v + LEFT JOIN numbered a ON a.vhc_id = v.id + WHERE + v.dlt is null and trip_id != 0 + GROUP BY v.id, a.trip_id + HAVING COUNT(*) > 1 + ) + SELECT + agg.id,name,nopol1,vhc_id,mileage,start,finish,startMileage,finishMileage,startLoc,finishLoc, + tvd.pool_code, tvd.dc_code, + row_count + FROM agg agg + join t_vehicles_detail tvd on tvd.vid = agg.id + ORDER BY agg.id, trip_id + ON DUPLICATE KEY UPDATE + mileage = values(mileage), + start = values(start), + finish = values(finish), + startMileage = values(startMileage), + finishMileage = values(finishMileage), + startLoc = values(startLoc), + finishLoc = values(finishLoc), + row_count = values(row_count) + ` + const d2 = [startOfMonth, endOfMonth] + const r2 = await db.query(q2, d2) + console.log(`Inserted ${r2.affectedRows} rows into 'trips' table from '${histTableName}'`) + console.timeEnd(`Trip grouping for ${histTableName}`) + } catch (error) { + console.error(`Error inserting data into history table '${histTableName}':`, error.message) + } + } + console.log("Trip grouping job completed.") } // Keep the process running