tripGrouping
This commit is contained in:
@ -7,7 +7,9 @@ const moment = require("moment")
|
|||||||
const TIMEFIX = 25200
|
const TIMEFIX = 25200
|
||||||
|
|
||||||
cron.schedule("0 0 * * * *", job)
|
cron.schedule("0 0 * * * *", job)
|
||||||
|
cron.schedule("*/5 * * * *", tripGrouping)
|
||||||
// job()
|
// job()
|
||||||
|
// tripGrouping()
|
||||||
|
|
||||||
async function job() {
|
async function job() {
|
||||||
console.log("Monthly table job executed:", moment().format("YYYY-MM-DD HH:mm:ss"))
|
console.log("Monthly table job executed:", moment().format("YYYY-MM-DD HH:mm:ss"))
|
||||||
@ -115,7 +117,7 @@ async function job() {
|
|||||||
console.error(`Error dropping table '${dropTableName}':`, error.message)
|
console.error(`Error dropping table '${dropTableName}':`, error.message)
|
||||||
}
|
}
|
||||||
|
|
||||||
// insert prev data (3 months ago) to history table
|
// insert prev data
|
||||||
for (let i = 0; i <= 12; i++) {
|
for (let i = 0; i <= 12; i++) {
|
||||||
const histMonth = lasYearDate.clone().add(i, "months")
|
const histMonth = lasYearDate.clone().add(i, "months")
|
||||||
const histYy = histMonth.format("YY")
|
const histYy = histMonth.format("YY")
|
||||||
@ -160,6 +162,108 @@ async function job() {
|
|||||||
|
|
||||||
async function tripGrouping() {
|
async function tripGrouping() {
|
||||||
console.log("Trip grouping job executed:", moment().format("YYYY-MM-DD HH:mm:ss"))
|
console.log("Trip grouping job executed:", moment().format("YYYY-MM-DD HH:mm:ss"))
|
||||||
|
for (let i = 0; i <= 12; i++) {
|
||||||
|
const lasYearDate = moment().subtract(1, "years")
|
||||||
|
const histMonth = lasYearDate.clone().add(i, "months")
|
||||||
|
const histYy = histMonth.format("YY")
|
||||||
|
const histMm = histMonth.format("MM")
|
||||||
|
const histTableName = `tracks_${histYy}${histMm}`
|
||||||
|
console.log(`Processing history insertion for table '${histTableName}'...`)
|
||||||
|
|
||||||
|
try {
|
||||||
|
console.time(`Trip grouping for ${histTableName}`)
|
||||||
|
const startOfMonth = histMonth.clone().startOf("month").unix() - TIMEFIX
|
||||||
|
const endOfMonth = histMonth.clone().endOf("month").unix() - TIMEFIX
|
||||||
|
|
||||||
|
const q2 = `
|
||||||
|
insert into trips
|
||||||
|
(id,name,nopol1,vhc_id,mileage,start,finish,startMileage,finishMileage,startLoc,finishLoc,pool_code,dc_code,row_count)
|
||||||
|
WITH
|
||||||
|
gaps AS (
|
||||||
|
SELECT
|
||||||
|
-- previous gap since previous row > 1 hour (3600s)
|
||||||
|
CASE
|
||||||
|
WHEN (crt_d - LAG(crt_d, 1, NULL) OVER (PARTITION BY vhc_id ORDER BY crt_d)) > 3600
|
||||||
|
THEN 1 ELSE 0
|
||||||
|
END AS isStop,
|
||||||
|
t.*
|
||||||
|
FROM ${histTableName} t
|
||||||
|
WHERE
|
||||||
|
t.latitude IS NOT NULL
|
||||||
|
AND t.longitude IS NOT NULL
|
||||||
|
AND t.action = 'location'
|
||||||
|
AND t.crt_d BETWEEN ? AND ?
|
||||||
|
)
|
||||||
|
, trips AS (
|
||||||
|
SELECT
|
||||||
|
-- mark the start of a trip when ignition=4 and previous ignition <> 4
|
||||||
|
CASE
|
||||||
|
WHEN ignition = 4
|
||||||
|
AND LAG(ignition, 1, 0) OVER (PARTITION BY vhc_id ORDER BY crt_d) <> 4
|
||||||
|
or LAG(isStop, 1, 0) over (PARTITION BY vhc_id ORDER BY crt_d) = 1
|
||||||
|
THEN 1 ELSE 0
|
||||||
|
END AS trip_start,
|
||||||
|
g.*
|
||||||
|
FROM gaps g
|
||||||
|
)
|
||||||
|
, numbered AS (
|
||||||
|
SELECT
|
||||||
|
*,
|
||||||
|
-- assign a trip_id by cumulative sum of trip_start
|
||||||
|
SUM(trip_start) OVER (PARTITION BY vhc_id ORDER BY crt_d) AS trip_id
|
||||||
|
FROM trips
|
||||||
|
where
|
||||||
|
ignition = 4
|
||||||
|
and isStop = 0
|
||||||
|
),
|
||||||
|
agg AS (
|
||||||
|
SELECT
|
||||||
|
v.id,
|
||||||
|
v.name,
|
||||||
|
v.nopol1,
|
||||||
|
vhc_id,
|
||||||
|
ROW_NUMBER() OVER (PARTITION BY v.id ORDER BY MIN(a.crt_d)) AS trip_id,
|
||||||
|
SUM(pre_milleage) AS mileage,
|
||||||
|
MIN(a.crt_d) AS start,
|
||||||
|
MAX(a.crt_d) AS finish,
|
||||||
|
MIN(a.vhc_milleage) AS startMileage,
|
||||||
|
MAX(a.vhc_milleage) AS finishMileage,
|
||||||
|
(SELECT fulladdress FROM t_gps_tracks_address WHERE master_id = MIN(a.id) LIMIT 1) AS startLoc,
|
||||||
|
(SELECT fulladdress FROM t_gps_tracks_address WHERE master_id = MAX(a.id) LIMIT 1) AS finishLoc,
|
||||||
|
COUNT(*) AS row_count
|
||||||
|
FROM t_vehicles v
|
||||||
|
LEFT JOIN numbered a ON a.vhc_id = v.id
|
||||||
|
WHERE
|
||||||
|
v.dlt is null and trip_id != 0
|
||||||
|
GROUP BY v.id, a.trip_id
|
||||||
|
HAVING COUNT(*) > 1
|
||||||
|
)
|
||||||
|
SELECT
|
||||||
|
agg.id,name,nopol1,vhc_id,mileage,start,finish,startMileage,finishMileage,startLoc,finishLoc,
|
||||||
|
tvd.pool_code, tvd.dc_code,
|
||||||
|
row_count
|
||||||
|
FROM agg agg
|
||||||
|
join t_vehicles_detail tvd on tvd.vid = agg.id
|
||||||
|
ORDER BY agg.id, trip_id
|
||||||
|
ON DUPLICATE KEY UPDATE
|
||||||
|
mileage = values(mileage),
|
||||||
|
start = values(start),
|
||||||
|
finish = values(finish),
|
||||||
|
startMileage = values(startMileage),
|
||||||
|
finishMileage = values(finishMileage),
|
||||||
|
startLoc = values(startLoc),
|
||||||
|
finishLoc = values(finishLoc),
|
||||||
|
row_count = values(row_count)
|
||||||
|
`
|
||||||
|
const d2 = [startOfMonth, endOfMonth]
|
||||||
|
const r2 = await db.query(q2, d2)
|
||||||
|
console.log(`Inserted ${r2.affectedRows} rows into 'trips' table from '${histTableName}'`)
|
||||||
|
console.timeEnd(`Trip grouping for ${histTableName}`)
|
||||||
|
} catch (error) {
|
||||||
|
console.error(`Error inserting data into history table '${histTableName}':`, error.message)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
console.log("Trip grouping job completed.")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Keep the process running
|
// Keep the process running
|
||||||
|
|||||||
Reference in New Issue
Block a user