import supa, { supaReady } from '@/components/supadb/aksupainstance.uts' import AkSupaRealtime from '@/components/supadb/aksuparealtime.uts' import { WS_URL, SUPA_KEY } from '@/ak/config.uts' export type GatewayNode = { id: string name: string mqtt_client_id: string version: string | null region: string | null tags: string | null updated_at: string } export type GatewayRealtimeHandlers = { onInitial: (nodes: GatewayNode[]) => void onInsert: (node: GatewayNode) => void onUpdate: (node: GatewayNode) => void onDelete: (id: string) => void onError: (err: any) => void } export type GatewayRealtimeSubscription = { dispose: () => void } type SubscriptionGuard = { active: boolean } type ActiveSubscription = { handlers: GatewayRealtimeHandlers guard: SubscriptionGuard } async function ensureSupaSession(): Promise { try { await supaReady } catch (err) { console.error('Supabase session initialization error', err) } } function toGatewayNode(row: UTSJSONObject): GatewayNode | null { try { const id = row.getString('id') const name = row.getString('name') const mqttClientId = row.getString('mqtt_client_id') const updatedAt = row.getString('updated_at') if (id == null || name == null || mqttClientId == null || updatedAt == null) { return null } return { id: id, name: name, mqtt_client_id: mqttClientId, version: row.getString('version'), region: row.getString('region'), tags: JSON.stringify(row.get('tags')), // Simple stringify for display updated_at: updatedAt } } catch (e) { console.error('toGatewayNode error', e) return null } } function extractRecord(payload: any): UTSJSONObject | null { try { if (payload == null) return null const wrapper = payload as UTSJSONObject const directNew = wrapper.get('new') as UTSJSONObject | null if (directNew != null) return directNew const record = wrapper.get('record') as UTSJSONObject | null if (record != null) return record return null } catch (err) { console.error('extractRecord error', err) } return null } function extractOldRecord(payload: any): UTSJSONObject | null { try { if (payload == null) return null const wrapper = payload as UTSJSONObject const directOld = wrapper.get('old') as UTSJSONObject | null if (directOld != null) return directOld const oldRecord = wrapper.get('old_record') as UTSJSONObject | null if (oldRecord != null) return oldRecord return null } catch (err) { console.error('extractOldRecord error', err) } return null } export class GatewayRealtimeService { private static rt: AkSupaRealtime | null = null private static initializing: Promise | null = null private static initialized: boolean = false private static subscriptions: Array = [] private static reconnectAttempts: number = 0 private static reconnectTimer: number | null = null private static reconnecting: boolean = false private static async waitForRealtimeOpen(timeoutMs: number = 5000): Promise { const start = Date.now() while (true) { if (this.rt != null && this.rt.isOpen == true) { return } if (Date.now() - start > timeoutMs) { throw new Error('Realtime socket not ready') } await new Promise((resolve) => { setTimeout(() => resolve(), 50) }) } } private static clearReconnectTimer(): void { const timer = this.reconnectTimer if (timer != null) { clearTimeout(timer) this.reconnectTimer = null } } private static handleSocketClose(origin: string | null): void { this.initialized = false this.rt = null if (this.subscriptions.length == 0) { this.clearReconnectTimer() this.reconnectAttempts = 0 return } if (this.reconnecting == true) { return } console.warn('[GatewayRealtimeService] realtime closed, scheduling reconnect', origin ?? 'unknown') this.scheduleReconnect() } private static async scheduleReconnect(): Promise { if (this.reconnectTimer != null) return if (this.subscriptions.every((sub) => sub.guard.active !== true)) { this.subscriptions = [] this.reconnectAttempts = 0 return } const attempt = this.reconnectAttempts + 1 this.reconnectAttempts = attempt const baseDelay = Math.min(Math.pow(2, attempt - 1) * 1000, 10000) const jitter = Math.floor(Math.random() * 200) const delay = baseDelay + jitter console.log('[GatewayRealtimeService] reconnect scheduled in', delay, 'ms (attempt', attempt, ')') this.reconnectTimer = setTimeout(() => { const runner = async () => { this.reconnectTimer = null if (this.subscriptions.every((sub) => sub.guard.active !== true)) { this.subscriptions = [] this.reconnectAttempts = 0 return } this.reconnecting = true try { await this.ensureRealtime() await this.waitForRealtimeOpen(3000) this.reconnectAttempts = 0 await this.resubscribeAll() } catch (err) { console.error('reconnect attempt failed', err) this.scheduleReconnect() } finally { this.reconnecting = false } } runner().catch((err) => { console.error('reconnect timer runner error', err) }) }, delay) } private static async resubscribeAll(): Promise { let hadFailure = false for (let i = 0; i < this.subscriptions.length; i++) { const record = this.subscriptions[i] if (record.guard.active !== true) continue try { await this.performSubscription(record) } catch (err) { console.error('resubscribe error', err) record.handlers?.onError?.(err) hadFailure = true } } if (hadFailure) { this.scheduleReconnect() } } private static removeSubscription(record: ActiveSubscription): void { const index = this.subscriptions.indexOf(record) if (index >= 0) { this.subscriptions.splice(index, 1) } if (this.subscriptions.length == 0) { this.clearReconnectTimer() this.reconnectAttempts = 0 } } private static async performSubscription(record: ActiveSubscription): Promise { if (record.guard.active !== true) return const handlers = record.handlers const maxAttempts = 2 let attempt = 0 // 1. Fetch Initial Data try { await ensureSupaSession() const { data, error } = await supa.from('chat_gateway_nodes').select('*').order('name', { ascending: true }) if (error != null) { throw new Error(error.message) } const nodes: GatewayNode[] = [] if (data != null) { const list = data as UTSArray for (let i = 0; i < list.length; i++) { const node = toGatewayNode(list[i]) if (node != null) { nodes.push(node) } } } if (record.guard.active) { handlers.onInitial(nodes) } } catch (fetchErr) { console.error('Initial fetch failed', fetchErr) handlers.onError(fetchErr) } // 2. Subscribe to Realtime while (attempt < maxAttempts) { attempt += 1 if (record.guard.active !== true) return try { if (this.rt == null || this.rt.isOpen !== true) { await this.ensureRealtime() await this.waitForRealtimeOpen(2000) } const realtime = this.rt if (realtime == null) { throw new Error('Realtime client not initialized') } console.log('[GatewayRealtimeService] subscribing to chat_gateway_nodes') realtime.subscribePostgresChanges({ event: '*', schema: 'public', table: 'chat_gateway_nodes', filter: 'mqtt_client_id=neq.null', topic: 'realtime:gateway_nodes', onChange: (payload: any) => { if (record.guard.active !== true) return const wrapper = payload as UTSJSONObject let eventType = wrapper.getString('type') if (eventType == null) { eventType = wrapper.getString('eventType') } if (eventType == 'INSERT') { const row = extractRecord(payload) if (row != null) { const node = toGatewayNode(row) if (node != null) handlers.onInsert(node) } } else if (eventType == 'UPDATE') { const row = extractRecord(payload) if (row != null) { const node = toGatewayNode(row) if (node != null) handlers.onUpdate(node) } } else if (eventType == 'DELETE') { const oldRow = extractOldRecord(payload) if (oldRow != null) { const id = oldRow.getString('id') if (id != null) handlers.onDelete(id) } } } }) console.log('[GatewayRealtimeService] subscription ready') return } catch (subscribeErr) { console.error('performSubscription error', subscribeErr, 'attempt', attempt) if (attempt >= maxAttempts) { throw subscribeErr } await new Promise((resolve) => { setTimeout(() => resolve(), 200) }) } } } private static async ensureRealtime(): Promise { if (this.rt != null && this.rt.isOpen == true) { this.initialized = true return } if (this.initializing != null) { await this.initializing return } await ensureSupaSession() let token: string | null = null try { const session = supa.getSession() if (session != null) { token = session.session?.access_token ?? null } } catch (_) { } let resolveReady: ((value: void) => void) | null = null let rejectReady: ((reason: any) => void) | null = null const readyPromise = new Promise((resolve, reject) => { resolveReady = resolve as (value: void) => void rejectReady = reject }) this.initializing = readyPromise const realtime = new AkSupaRealtime({ url: WS_URL, channel: 'realtime:gateway_nodes', apikey: SUPA_KEY, token: token, onMessage: (_data: any) => { }, onOpen: (_res: any) => { this.initialized = true this.reconnectAttempts = 0 this.clearReconnectTimer() this.reconnecting = false const resolver = resolveReady resolveReady = null rejectReady = null this.initializing = null resolver?.() }, onError: (err: any) => { if (resolveReady != null) { const rejector = rejectReady resolveReady = null rejectReady = null this.initializing = null rejector?.(err) } }, onClose: (_res: any) => { if (resolveReady != null) { const rejector = rejectReady resolveReady = null rejectReady = null this.initializing = null rejector?.(new Error('Realtime connection closed before ready')) } else { this.handleSocketClose('close') } } }) this.rt = realtime this.rt?.connect() try { await readyPromise } finally { this.initializing = null } } static async subscribeGateways(handlers: GatewayRealtimeHandlers): Promise { const guard: SubscriptionGuard = { active: true } const record: ActiveSubscription = { handlers: handlers, guard: guard } this.subscriptions.push(record) try { await this.performSubscription(record) } catch (err) { console.error('subscribeGateways failed', err) guard.active = false this.removeSubscription(record) handlers.onError(err) return { dispose: () => { } } } return { dispose: () => { if (guard.active !== true) return guard.active = false this.removeSubscription(record) } } } static closeRealtime(): void { try { if (this.rt != null) { this.rt.close({ code: 1000, reason: 'manual close' }) } } catch (err) { console.error('closeRealtime error', err) } finally { this.rt = null this.initialized = false this.initializing = null this.reconnecting = false this.clearReconnectTimer() this.reconnectAttempts = 0 } } } export default GatewayRealtimeService