const mysql = require('mysql2/promise');
const path = require('path');
const fs = require('fs');

const DATA_DIR = path.join(__dirname, '../../web/data');

// Ensure data directory exists (for export.json)
if (!fs.existsSync(DATA_DIR)) fs.mkdirSync(DATA_DIR, { recursive: true });

const paths = {
  export: path.join(DATA_DIR, 'export.json')
};

// NOTE(review): connection settings (including credentials) are hard-coded;
// consider moving them to configuration/environment — confirm with deployment.
const pool = mysql.createPool({
  host: '127.0.0.1',
  port: 3306,
  user: 'cc-ticket-machine',
  password: 'cc-ticket-machine',
  database: 'cc-ticket-machine',
  waitForConnections: true,
  connectionLimit: 10,
  queueLimit: 0
});

const DEFAULT_LUA_VERSIONS = { ticketmachine: 'v1.5.8', gate: 'v1.5.8' };

// Maximum number of rows sent in a single multi-row INSERT statement.
// Keeps every statement small regardless of how large the saved list is.
const INSERT_BATCH_SIZE = 100;

// Schema bootstrap statements, executed in order by DataService.init().
const TABLE_DEFINITIONS = [
  'CREATE TABLE IF NOT EXISTS kv_store (k VARCHAR(255) PRIMARY KEY, v JSON)',
  'CREATE TABLE IF NOT EXISTS stations (id INT AUTO_INCREMENT PRIMARY KEY, data JSON)',
  'CREATE TABLE IF NOT EXISTS `lines` (id INT AUTO_INCREMENT PRIMARY KEY, data JSON)',
  'CREATE TABLE IF NOT EXISTS fares (id INT AUTO_INCREMENT PRIMARY KEY, data JSON)',
  'CREATE TABLE IF NOT EXISTS orders (id INT AUTO_INCREMENT PRIMARY KEY, data JSON)',
  'CREATE TABLE IF NOT EXISTS order_index (order_code VARCHAR(255) PRIMARY KEY, data JSON)',
  'CREATE TABLE IF NOT EXISTS ticket_index (ticket_id VARCHAR(255) PRIMARY KEY, data JSON, last_update_ts BIGINT)',
  'CREATE TABLE IF NOT EXISTS ic_cards (card_id VARCHAR(255) PRIMARY KEY, data JSON, last_update_ts BIGINT)',
  'CREATE TABLE IF NOT EXISTS ic_card_events (id INT AUTO_INCREMENT PRIMARY KEY, data JSON)',
  'CREATE TABLE IF NOT EXISTS logs (id INT AUTO_INCREMENT PRIMARY KEY, data JSON)',
  'CREATE TABLE IF NOT EXISTS ticket_events (id INT AUTO_INCREMENT PRIMARY KEY, data JSON)',
  'CREATE TABLE IF NOT EXISTS stats_ticket (id INT AUTO_INCREMENT PRIMARY KEY, data JSON)',
  'CREATE TABLE IF NOT EXISTS stats_gate (id INT AUTO_INCREMENT PRIMARY KEY, data JSON)'
];

/**
 * Returns a `{ ticketmachine, gate }` object of version strings, filling any
 * missing/falsy entry from DEFAULT_LUA_VERSIONS.
 *
 * @param {*} input - Candidate versions object; non-objects are treated as empty.
 * @returns {{ticketmachine: string, gate: string}}
 */
function normalizeLuaVersions(input) {
  const src = (input && typeof input === 'object') ? input : {};
  return {
    ticketmachine: String(src.ticketmachine || DEFAULT_LUA_VERSIONS.ticketmachine),
    gate: String(src.gate || DEFAULT_LUA_VERSIONS.gate)
  };
}

/**
 * Returns a copy of `input` with the known config fields coerced to their
 * expected types and defaulted when absent. Unknown keys are passed through.
 *
 * @param {*} input - Candidate config; non-objects are treated as empty.
 * @returns {object} Normalized config.
 */
function normalizeConfig(input) {
  const src = (input && typeof input === 'object') ? input : {};
  // A non-finite discount would be serialized by JSON.stringify as `null`
  // and come back as the default (1) on the next load; use the default
  // right away so the in-memory value matches what gets persisted.
  const discount = Number(src?.promotion?.discount ?? 1);
  return {
    ...src,
    api_base: String(src.api_base || 'http://127.0.0.1:23333/api'),
    current_station: (src.current_station && typeof src.current_station === 'object')
      ? src.current_station
      : { name: 'Station1', code: '01-01' },
    transfers: Array.isArray(src.transfers) ? src.transfers : [],
    promotion: {
      name: String(src?.promotion?.name || ''),
      discount: Number.isFinite(discount) ? discount : 1
    },
    lua_versions: normalizeLuaVersions(src.lua_versions)
  };
}

// In-memory cache for synchronous read access
const cache = {
  config: normalizeConfig({}),
  stations: [],
  lines: [],
  fares: [],
  orders: [],
  orderIndex: {},
  ticketIndex: {},
  icCards: [],
  icCardIndex: {},
  icCardEvents: [],
  statsTicket: [],
  statsGate: []
};

// Comparator: most recently updated record first (missing timestamps count as 0).
const byLastUpdateDesc = (a, b) => Number(b.last_update_ts || 0) - Number(a.last_update_ts || 0);

// Rebuilds the sorted `cache.icCards` list from `cache.icCardIndex`.
function refreshIcCardList() {
  cache.icCards = Object.values(cache.icCardIndex).sort(byLastUpdateDesc);
}

// Runs `work(conn)` inside a transaction on a pooled connection: commits on
// success; on failure rolls back, logs `"<label> error"` and rethrows the
// original error. The connection is always released.
async function withTransaction(label, work) {
  const conn = await pool.getConnection();
  try {
    await conn.beginTransaction();
    const result = await work(conn);
    await conn.commit();
    return result;
  } catch (e) {
    try {
      await conn.rollback();
    } catch (rollbackErr) {
      // Do not let a failed rollback hide the error that caused it.
      console.error(`${label} rollback error`, rollbackErr);
    }
    console.error(`${label} error`, e);
    throw e;
  } finally {
    conn.release();
  }
}

// Executes `<insertPrefix> VALUES (...),(...)` for `rows` (an array of
// per-row parameter arrays) in chunks of INSERT_BATCH_SIZE rows.
async function insertInBatches(conn, insertPrefix, rows) {
  for (let i = 0; i < rows.length; i += INSERT_BATCH_SIZE) {
    const batch = rows.slice(i, i + INSERT_BATCH_SIZE);
    const rowPlaceholder = `(${batch[0].map(() => '?').join(',')})`;
    const placeholders = batch.map(() => rowPlaceholder).join(',');
    await conn.query(`${insertPrefix} VALUES ${placeholders}`, batch.flat());
  }
}

// Atomically replaces every row of a `(id, data JSON)` table with `list`.
// `table` must be a trusted identifier; it is interpolated into the SQL.
async function replaceJsonRows(table, list, label) {
  if (!Array.isArray(list)) {
    // Refuse before touching the DB: a non-array would otherwise wipe the table.
    throw new TypeError(`${label}: list must be an array`);
  }
  const rows = list.map((item) => [JSON.stringify(item)]);
  await withTransaction(label, async (conn) => {
    await conn.query(`DELETE FROM \`${table}\``);
    await insertInBatches(conn, `INSERT INTO \`${table}\` (data)`, rows);
  });
}

// Converts a `since`/`until` filter value to a numeric timestamp:
// finite numbers and numeric strings are used as-is, other strings go
// through Date.parse; returns null when no usable value is present.
function toTimestamp(v) {
  if (v == null) return null;
  if (typeof v === 'number' && Number.isFinite(v)) return v;
  const s = String(v).trim();
  if (!s) return null;
  const n = Number(s);
  if (Number.isFinite(n)) return n;
  const d = Date.parse(s);
  return Number.isFinite(d) ? d : null;
}

/**
 * MySQL-backed data service with an in-memory cache.
 *
 * The synchronous `get*` accessors return the cached values; the async
 * `save*` / `append*` / `upsert*` methods write to MySQL and update the cache.
 */
const DataService = {
  paths, // Kept for compatibility if anything accesses paths.export

  /**
   * Creates the tables if they do not exist and loads the cache from MySQL.
   * Failures are logged and NOT rethrown, so the service keeps running with
   * whatever the cache contained at that point.
   *
   * @returns {Promise<void>}
   */
  init: async () => {
    try {
      const conn = await pool.getConnection();
      try {
        // Create Tables
        for (const ddl of TABLE_DEFINITIONS) {
          await conn.query(ddl);
        }

        const loadData = async (sql) => {
          const [rows] = await conn.query(sql);
          return rows.map((r) => r.data);
        };
        // Builds a fresh object each time so a repeated init() cannot keep
        // entries that have since been removed from the table.
        const loadIndex = async (sql, keyColumn) => {
          const [rows] = await conn.query(sql);
          const index = {};
          for (const r of rows) index[r[keyColumn]] = r.data;
          return index;
        };

        // Load Cache
        const [configs] = await conn.query('SELECT v FROM kv_store WHERE k = ?', ['config']);
        if (configs.length > 0) {
          cache.config = normalizeConfig(configs[0].v);
        } else {
          // First run: persist the default config.
          await conn.query('INSERT INTO kv_store (k, v) VALUES (?, ?)', ['config', JSON.stringify(cache.config)]);
        }

        cache.stations = await loadData('SELECT data FROM stations');
        cache.lines = await loadData('SELECT data FROM `lines`');
        cache.fares = await loadData('SELECT data FROM fares');
        cache.orders = await loadData('SELECT data FROM orders');

        cache.orderIndex = await loadIndex('SELECT order_code, data FROM order_index', 'order_code');
        cache.ticketIndex = await loadIndex('SELECT ticket_id, data FROM ticket_index', 'ticket_id');
        cache.icCardIndex = await loadIndex('SELECT card_id, data FROM ic_cards', 'card_id');
        refreshIcCardList();

        cache.icCardEvents = await loadData('SELECT data FROM ic_card_events ORDER BY id ASC');
        cache.statsTicket = await loadData('SELECT data FROM stats_ticket');
        cache.statsGate = await loadData('SELECT data FROM stats_gate');

        console.log('DataService initialized with MySQL');
      } finally {
        conn.release();
      }
    } catch (e) {
      console.error('Failed to initialize DataService:', e);
      // Fallback or exit? For now, we continue but cache might be empty
    }
  },

  // Config

  /** @returns {object} The cached, normalized config. */
  getConfig: () => cache.config,

  /**
   * Normalizes `cfg`, stores it in the cache and upserts it into `kv_store`.
   * The cache is updated before the write; DB errors propagate to the caller.
   *
   * @param {*} cfg - New config (normalized via normalizeConfig).
   * @returns {Promise<void>}
   */
  saveConfig: async (cfg) => {
    cache.config = normalizeConfig(cfg);
    const serialized = JSON.stringify(cache.config);
    await pool.query(
      'INSERT INTO kv_store (k, v) VALUES (?, ?) ON DUPLICATE KEY UPDATE v = ?',
      ['config', serialized, serialized]
    );
  },

  // Stations

  /** @returns {Array} The cached station list. */
  getStations: () => cache.stations,

  /**
   * Replaces all rows of `stations` with `list` in one transaction, then
   * updates the cache. Rejects (cache untouched) if the transaction fails.
   *
   * @param {Array} list
   * @returns {Promise<void>}
   * @throws {TypeError} If `list` is not an array.
   */
  saveStations: async (list) => {
    await replaceJsonRows('stations', list, 'saveStations');
    cache.stations = list;
  },

  // Lines

  /** @returns {Array} The cached line list. */
  getLines: () => cache.lines,

  /**
   * Replaces all rows of `lines` with `list` in one transaction, then
   * updates the cache. Rejects (cache untouched) if the transaction fails.
   *
   * @param {Array} list
   * @returns {Promise<void>}
   * @throws {TypeError} If `list` is not an array.
   */
  saveLines: async (list) => {
    await replaceJsonRows('lines', list, 'saveLines');
    cache.lines = list;
  },

  // Fares

  /** @returns {Array} The cached fare list. */
  getFares: () => cache.fares,

  /**
   * Replaces all rows of `fares` with `list` in one transaction, then
   * updates the cache. Rejects (cache untouched) if the transaction fails.
   *
   * @param {Array} list
   * @returns {Promise<void>}
   * @throws {TypeError} If `list` is not an array.
   */
  saveFares: async (list) => {
    await replaceJsonRows('fares', list, 'saveFares');
    cache.fares = list;
  },

  // Orders

  /** @returns {Array} The cached order list. */
  getOrders: () => cache.orders,

  /**
   * Replaces all rows of `orders` with `list` in one transaction, then
   * updates the cache.
   *
   * The whole list is rewritten on every call (mirroring the previous
   * file-overwrite behavior) because the API hands over the complete list
   * rather than the appended record. Rows are inserted in batches so a large
   * list does not produce one oversized statement.
   *
   * @param {Array} list
   * @returns {Promise<void>}
   * @throws {TypeError} If `list` is not an array.
   */
  saveOrders: async (list) => {
    await replaceJsonRows('orders', list, 'saveOrders');
    cache.orders = list;
  },

  /** @returns {object} The cached order index, keyed by order code. */
  getOrderIndex: () => cache.orderIndex,

  /**
   * Replaces all rows of `order_index` with the entries of `idx` in one
   * transaction, then updates the cache.
   *
   * This is a full rewrite: the caller passes the whole index object, so the
   * changed entry cannot be identified without diffing. A single-entry upsert
   * would be cheaper but requires changing the callers.
   *
   * @param {object} idx - Map of order code to order record.
   * @returns {Promise<void>}
   */
  saveOrderIndex: async (idx) => {
    const rows = Object.entries(idx).map(([code, rec]) => [code, JSON.stringify(rec)]);
    await withTransaction('saveOrderIndex', async (conn) => {
      await conn.query('DELETE FROM order_index');
      await insertInBatches(conn, 'INSERT INTO order_index (order_code, data)', rows);
    });
    cache.orderIndex = idx;
  },

  // IC Cards

  /** @returns {Array} Cached IC cards, most recently updated first. */
  getIcCards: () => cache.icCards,

  /** @returns {object} Cached IC cards keyed by card id. */
  getIcCardIndex: () => cache.icCardIndex,

  /**
   * Replaces all rows of `ic_cards` with `list` in one transaction, then
   * rebuilds the IC card cache.
   *
   * Each item is normalized first (trimmed string `card_id`, numeric
   * `last_update_ts` defaulting to now); items without a card id are dropped.
   * The cache stores the same normalized records that are written to the
   * database, so both views agree.
   *
   * @param {Array} list
   * @returns {Promise<void>}
   * @throws {TypeError} If `list` is not an array.
   */
  saveIcCards: async (list) => {
    if (!Array.isArray(list)) {
      throw new TypeError('saveIcCards: list must be an array');
    }
    const cards = list
      .map((item) => ({
        ...item,
        card_id: String(item?.card_id || '').trim(),
        last_update_ts: Number(item?.last_update_ts || Date.now())
      }))
      .filter((item) => item.card_id);
    const rows = cards.map((item) => [item.card_id, JSON.stringify(item), item.last_update_ts]);

    await withTransaction('saveIcCards', async (conn) => {
      await conn.query('DELETE FROM ic_cards');
      await insertInBatches(conn, 'INSERT INTO ic_cards (card_id, data, last_update_ts)', rows);
    });

    cache.icCardIndex = {};
    for (const item of cards) cache.icCardIndex[item.card_id] = item;
    refreshIcCardList();
  },

  /**
   * Merges `update` into the cached card with the same id (creating it if
   * absent), stamps `last_update_ts` with the current time and upserts the
   * row. The cache is updated first; a DB failure is logged, not thrown.
   *
   * @param {object} update - Partial card; must carry a non-empty `card_id`.
   * @returns {Promise<object|null>} The merged card, or null without a card id.
   */
  upsertIcCard: async (update) => {
    const id = String(update?.card_id || '').trim();
    if (!id) return null;
    const cur = cache.icCardIndex[id] || {};
    const merged = { ...cur, ...update, card_id: id, last_update_ts: Date.now() };
    cache.icCardIndex[id] = merged;
    refreshIcCardList();
    try {
      const serialized = JSON.stringify(merged);
      await pool.query(
        'INSERT INTO ic_cards (card_id, data, last_update_ts) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE data = ?, last_update_ts = ?',
        [id, serialized, merged.last_update_ts, serialized, merged.last_update_ts]
      );
    } catch (e) {
      console.error('upsertIcCard error', e);
    }
    return merged;
  },

  /**
   * Removes a card from the cache and from `ic_cards`. No-op for an empty id.
   * A DB failure is logged, not thrown.
   *
   * @param {*} cardId
   * @returns {Promise<void>}
   */
  deleteIcCard: async (cardId) => {
    const id = String(cardId || '').trim();
    if (!id) return;
    delete cache.icCardIndex[id];
    refreshIcCardList();
    try {
      await pool.query('DELETE FROM ic_cards WHERE card_id = ?', [id]);
    } catch (e) {
      console.error('deleteIcCard error', e);
    }
  },

  /**
   * Appends an IC card event to the cache and to `ic_card_events`.
   * A DB failure is logged, not thrown.
   *
   * @param {object} entry
   * @returns {Promise<void>}
   */
  appendIcCardEvent: async (entry) => {
    cache.icCardEvents.push(entry);
    try {
      await pool.query('INSERT INTO ic_card_events (data) VALUES (?)', [JSON.stringify(entry)]);
    } catch (e) {
      console.error('appendIcCardEvent error', e);
    }
  },

  /**
   * Returns the cached events whose trimmed `card_id` equals the trimmed
   * `cardId`. Served from the cache; no query is issued.
   *
   * @param {*} cardId
   * @returns {Promise<Array>}
   */
  getIcCardEvents: async (cardId) => {
    const id = String(cardId || '').trim();
    try {
      return cache.icCardEvents.filter((item) => String(item?.card_id || '').trim() === id);
    } catch (e) {
      console.error('getIcCardEvents error', e);
      return [];
    }
  },

  // Logs (Async Read, Append Write)

  /**
   * Inserts a log entry into `logs`. A DB failure is logged, not thrown.
   *
   * @param {object} entry
   * @returns {Promise<void>}
   */
  appendLog: async (entry) => {
    try {
      await pool.query('INSERT INTO logs (data) VALUES (?)', [JSON.stringify(entry)]);
    } catch (e) {
      console.error('appendLog error', e);
    }
  },

  /**
   * Reads recent log entries, newest first, filtered in application code.
   *
   * Up to `10 * max` rows (capped at 20000) are fetched so the filters have
   * candidates to work with; at most `max` entries (clamped to 1..5000) are
   * returned. On any error the error is logged and `[]` is returned.
   *
   * @param {object} [options]
   * @param {number} [options.max=200] - Maximum number of entries to return.
   * @param {string} [options.category] - Case-insensitive exact match on `category`.
   * @param {string} [options.type] - Comma-separated, case-insensitive `type` values.
   * @param {string} [options.q] - Case-insensitive substring of the serialized entry.
   * @param {number|string} [options.since] - Lower bound (timestamp or date string) on `ts`.
   * @param {number|string} [options.until] - Upper bound (timestamp or date string) on `ts`.
   * @returns {Promise<Array>}
   */
  readLogs: async ({ max = 200, category, type, q, since, until } = {}) => {
    try {
      const limit = Math.min(5000, Math.max(1, Number(max) || 200));
      const fetchLimit = Math.min(20000, Math.max(limit, limit * 10));
      const [rows] = await pool.query('SELECT data FROM logs ORDER BY id DESC LIMIT ?', [fetchLimit]);
      let list = rows.map((r) => r.data);

      const cat = (category == null) ? '' : String(category).trim().toLowerCase();
      if (cat) {
        list = list.filter((x) => String(x?.category || '').trim().toLowerCase() === cat);
      }

      const typeRaw = (type == null) ? '' : String(type).trim();
      if (typeRaw) {
        const typeList = typeRaw.split(',').map((s) => s.trim()).filter(Boolean).map((s) => s.toLowerCase());
        list = list.filter((x) => typeList.includes(String(x?.type || '').trim().toLowerCase()));
      }

      const qRaw = (q == null) ? '' : String(q).trim().toLowerCase();
      if (qRaw) {
        list = list.filter((x) => {
          try {
            return JSON.stringify(x || {}).toLowerCase().includes(qRaw);
          } catch (e) {
            // Entry cannot be serialized, so it cannot match a text query.
            return false;
          }
        });
      }

      const sinceTs = toTimestamp(since);
      const untilTs = toTimestamp(until);
      if (sinceTs != null || untilTs != null) {
        list = list.filter((x) => {
          const d = Date.parse(String(x?.ts || ''));
          // Entries without a parseable `ts` are excluded from time-bounded reads.
          if (!Number.isFinite(d)) return false;
          if (sinceTs != null && d < sinceTs) return false;
          if (untilTs != null && d > untilTs) return false;
          return true;
        });
      }

      return list.slice(0, limit);
    } catch (e) {
      console.error('readLogs error', e);
      return [];
    }
  },

  /**
   * Shorthand for `readLogs({ max })`. Does not depend on `this`, so it can
   * be destructured or passed around as a callback.
   *
   * @param {number} [max=200]
   * @returns {Promise<Array>}
   */
  readLastLogs: async function (max = 200) {
    return DataService.readLogs({ max });
  },

  // Ticket Events (Async Read, Append Write)

  /**
   * Inserts a ticket event into `ticket_events`. A DB failure is logged,
   * not thrown.
   *
   * @param {object} ev
   * @returns {Promise<void>}
   */
  appendTicketEvent: async (ev) => {
    try {
      await pool.query('INSERT INTO ticket_events (data) VALUES (?)', [JSON.stringify(ev)]);
    } catch (e) {
      console.error('appendTicketEvent error', e);
    }
  },

  /**
   * Reads every ticket event in insertion order. Async: callers must await.
   * On error the error is logged and `[]` is returned.
   *
   * @returns {Promise<Array>}
   */
  readAllTicketEvents: async () => {
    try {
      const [rows] = await pool.query('SELECT data FROM ticket_events ORDER BY id ASC');
      return rows.map((r) => r.data);
    } catch (e) {
      console.error('readAllTicketEvents error', e);
      return [];
    }
  },

  /**
   * Reads the events of one ticket in insertion order, filtering on the
   * `ticket_id` field of the JSON document inside the query. There is no
   * generated column/index for that field, so this is meant for small tables.
   * On error the error is logged and `[]` is returned.
   *
   * @param {string} ticketId
   * @returns {Promise<Array>}
   */
  getTicketEvents: async (ticketId) => {
    try {
      const [rows] = await pool.query(
        'SELECT data FROM ticket_events WHERE data->"$.ticket_id" = ? ORDER BY id ASC',
        [ticketId]
      );
      return rows.map((r) => r.data);
    } catch (e) {
      console.error('getTicketEvents error', e);
      return [];
    }
  },

  // Ticket Index

  /** @returns {object} The cached ticket index, keyed by ticket id. */
  getTicketIndex: () => cache.ticketIndex,

  /**
   * Replaces all rows of `ticket_index` with the entries of `idx` in one
   * transaction, then updates the cache. Full rewrite, like saveOrderIndex.
   *
   * @param {object} idx - Map of ticket id to ticket record.
   * @returns {Promise<void>}
   */
  saveTicketIndex: async (idx) => {
    const rows = Object.entries(idx).map(([id, rec]) => [id, JSON.stringify(rec), rec.last_update_ts || 0]);
    await withTransaction('saveTicketIndex', async (conn) => {
      await conn.query('DELETE FROM ticket_index');
      await insertInBatches(conn, 'INSERT INTO ticket_index (ticket_id, data, last_update_ts)', rows);
    });
    cache.ticketIndex = idx;
  },

  /**
   * Merges `update` into the cached ticket with the same id (creating it if
   * absent), stamps `last_update_ts` with the current time and upserts that
   * single row. The cache is updated first; a DB failure is logged, not thrown.
   *
   * @param {object} update - Partial ticket; must carry a non-empty `ticket_id`.
   * @returns {Promise<object|undefined>} The merged ticket, or undefined without a ticket id.
   */
  upsertTicketIndex: async (update) => {
    const id = String(update?.ticket_id || '').trim();
    if (!id) return;
    const cur = cache.ticketIndex[id] || {};
    const merged = { ...cur, ...update, last_update_ts: Date.now() };
    cache.ticketIndex[id] = merged;
    // Efficient single update
    try {
      const serialized = JSON.stringify(merged);
      await pool.query(
        'INSERT INTO ticket_index (ticket_id, data, last_update_ts) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE data = ?, last_update_ts = ?',
        [id, serialized, merged.last_update_ts, serialized, merged.last_update_ts]
      );
    } catch (e) {
      console.error('upsertTicketIndex error', e);
    }
    return merged;
  },

  // Stats

  /**
   * Appends a ticket statistic to the cache and to `stats_ticket`.
   * A DB failure is logged, not thrown.
   *
   * @param {object} item
   * @returns {Promise<void>}
   */
  appendStatTicket: async (item) => {
    cache.statsTicket.push(item);
    try {
      await pool.query('INSERT INTO stats_ticket (data) VALUES (?)', [JSON.stringify(item)]);
    } catch (e) {
      console.error('appendStatTicket error', e);
    }
  },

  /** @returns {Array} The cached ticket statistics. */
  getStatsTicket: () => cache.statsTicket,

  /**
   * Appends a gate statistic to the cache and to `stats_gate`.
   * A DB failure is logged, not thrown.
   *
   * @param {object} item
   * @returns {Promise<void>}
   */
  appendStatGate: async (item) => {
    cache.statsGate.push(item);
    try {
      await pool.query('INSERT INTO stats_gate (data) VALUES (?)', [JSON.stringify(item)]);
    } catch (e) {
      console.error('appendStatGate error', e);
    }
  },

  /** @returns {Array} The cached gate statistics. */
  getStatsGate: () => cache.statsGate,

  // Export

  /**
   * Builds the export document from the cached config, stations, lines,
   * fares and both statistics lists.
   *
   * @returns {object}
   */
  buildExportPayload: () => ({
    config: DataService.getConfig(),
    stations: DataService.getStations(),
    lines: DataService.getLines(),
    fares: DataService.getFares(),
    stats_ticket: DataService.getStatsTicket(),
    stats_gate: DataService.getStatsGate(),
  }),

  /**
   * Synchronously writes the export payload to `paths.export` as
   * pretty-printed JSON. Best-effort: a failure is logged, not thrown.
   */
  saveExport: () => {
    try {
      fs.writeFileSync(paths.export, JSON.stringify(DataService.buildExportPayload(), null, 2));
    } catch (e) {
      console.error('saveExport error', e);
    }
  }
};

module.exports = DataService;