Files
akmon/utils/gatewayRealtimeService.uts
2026-01-20 08:04:15 +08:00

428 lines
12 KiB
Plaintext

import supa, { supaReady } from '@/components/supadb/aksupainstance.uts'
import AkSupaRealtime from '@/components/supadb/aksuparealtime.uts'
import { WS_URL, SUPA_KEY } from '@/ak/config.uts'
/**
 * A gateway node as exposed to UI code.
 *
 * Built by `toGatewayNode` from a `chat_gateway_nodes` row; rows lacking
 * `id`, `name`, `mqtt_client_id` or `updated_at` are rejected there, which is
 * why those four fields are non-nullable here.
 */
export type GatewayNode = {
id: string
name: string
mqtt_client_id: string
version: string | null
region: string | null
// JSON-stringified form of the row's `tags` value (intended for display).
tags: string | null
updated_at: string
}
/**
 * Callbacks a subscriber supplies to `GatewayRealtimeService.subscribeGateways`.
 */
export type GatewayRealtimeHandlers = {
// Receives the full node list after each successful initial fetch.
onInitial: (nodes: GatewayNode[]) => void
// Receives the new row of an INSERT change event.
onInsert: (node: GatewayNode) => void
// Receives the new row of an UPDATE change event.
onUpdate: (node: GatewayNode) => void
// Receives the `id` taken from the old row of a DELETE change event.
onDelete: (id: string) => void
// Receives fetch / subscribe failures; the value is whatever was thrown.
onError: (err: any) => void
}
/**
 * Handle returned by `subscribeGateways`; call `dispose` to stop receiving callbacks.
 */
export type GatewayRealtimeSubscription = {
dispose: () => void
}
/**
 * Mutable liveness flag shared between a subscription record and the closures
 * created for it; once `active` is false, callbacks for that subscription are skipped.
 */
type SubscriptionGuard = {
active: boolean
}
/**
 * Internal bookkeeping entry: a subscriber's handlers paired with its liveness guard.
 */
type ActiveSubscription = {
handlers: GatewayRealtimeHandlers
guard: SubscriptionGuard
}
/**
 * Waits for the shared Supabase client (`supaReady`) to settle.
 *
 * Best-effort: a rejection is logged and swallowed, so the returned promise
 * always resolves and callers proceed even if initialization failed.
 */
async function ensureSupaSession(): Promise<void> {
  try {
    await supaReady
  } catch (initError) {
    console.error('Supabase session initialization error', initError)
  }
}
/**
 * Converts a raw `chat_gateway_nodes` row into a `GatewayNode`.
 *
 * @param row Row object as delivered by the query / realtime payload.
 * @returns The mapped node, or `null` when any of `id`, `name`,
 *          `mqtt_client_id`, `updated_at` is missing or when reading the row throws
 *          (the error is logged).
 */
function toGatewayNode(row: UTSJSONObject): GatewayNode | null {
  try {
    const id = row.getString('id')
    const name = row.getString('name')
    const mqttClientId = row.getString('mqtt_client_id')
    const updatedAt = row.getString('updated_at')
    if (id == null || name == null || mqttClientId == null || updatedAt == null) {
      return null
    }
    // JSON.stringify(null) produces the literal string "null"; keep an absent
    // tags value as a real null so it matches the `string | null` field type.
    const rawTags = row.get('tags')
    const tags: string | null = rawTags == null ? null : JSON.stringify(rawTags) // Simple stringify for display
    return {
      id: id,
      name: name,
      mqtt_client_id: mqttClientId,
      version: row.getString('version'),
      region: row.getString('region'),
      tags: tags,
      updated_at: updatedAt
    }
  } catch (e) {
    console.error('toGatewayNode error', e)
    return null
  }
}
/**
 * Pulls the current row out of a realtime change payload.
 *
 * The payload is probed for a `new` entry first and a `record` entry second;
 * the first non-null one wins.
 *
 * @param payload Change payload handed to the realtime `onChange` callback.
 * @returns The row object, or `null` when the payload is null, carries neither
 *          key, or cannot be read (the error is logged).
 */
function extractRecord(payload: any): UTSJSONObject | null {
  if (payload == null) return null
  try {
    const envelope = payload as UTSJSONObject
    const candidateKeys = ['new', 'record']
    for (let i = 0; i < candidateKeys.length; i++) {
      const row = envelope.get(candidateKeys[i]) as UTSJSONObject | null
      if (row != null) return row
    }
  } catch (err) {
    console.error('extractRecord error', err)
  }
  return null
}
/**
 * Pulls the previous row out of a realtime change payload.
 *
 * The payload is probed for an `old` entry first and an `old_record` entry
 * second; the first non-null one wins.
 *
 * @param payload Change payload handed to the realtime `onChange` callback.
 * @returns The old row object, or `null` when the payload is null, carries
 *          neither key, or cannot be read (the error is logged).
 */
function extractOldRecord(payload: any): UTSJSONObject | null {
  if (payload == null) return null
  try {
    const envelope = payload as UTSJSONObject
    const candidateKeys = ['old', 'old_record']
    for (let i = 0; i < candidateKeys.length; i++) {
      const row = envelope.get(candidateKeys[i]) as UTSJSONObject | null
      if (row != null) return row
    }
  } catch (err) {
    console.error('extractOldRecord error', err)
  }
  return null
}
/**
 * Shares one realtime socket between all subscribers to the
 * `chat_gateway_nodes` table and re-opens it with backoff while at least one
 * subscription is still active.
 *
 * All state is static, so the class acts as a process-wide singleton.
 *
 * Note: disposing a subscription only flips its guard and removes the record;
 * the callback registered on the realtime client stays registered and is
 * simply ignored from then on.
 */
export class GatewayRealtimeService {
  /** Current realtime client, or null when none exists / the last one closed. */
  private static rt: AkSupaRealtime | null = null
  /** In-flight connection attempt that concurrent `ensureRealtime` callers await. */
  private static initializing: Promise<void> | null = null
  /** Set once the socket reports open; cleared on close. */
  private static initialized: boolean = false
  /** Every subscription that has not been disposed or failed. */
  private static subscriptions: Array<ActiveSubscription> = []
  /** Consecutive reconnect attempts; drives the backoff delay. */
  private static reconnectAttempts: number = 0
  /** Handle of the pending reconnect timer, if any. */
  private static reconnectTimer: number | null = null
  /** True while a reconnect runner is executing, to suppress nested scheduling. */
  private static reconnecting: boolean = false

  /**
   * Polls every 50 ms until the current socket reports open.
   *
   * @param timeoutMs Maximum time to wait.
   * @throws Error('Realtime socket not ready') when the timeout elapses.
   */
  private static async waitForRealtimeOpen(timeoutMs: number = 5000): Promise<void> {
    const start = Date.now()
    while (true) {
      if (this.rt != null && this.rt.isOpen == true) {
        return
      }
      if (Date.now() - start > timeoutMs) {
        throw new Error('Realtime socket not ready')
      }
      await new Promise<void>((resolve) => {
        setTimeout(() => resolve(), 50)
      })
    }
  }

  /** Cancels the pending reconnect timer, if there is one. */
  private static clearReconnectTimer(): void {
    const timer = this.reconnectTimer
    if (timer != null) {
      clearTimeout(timer)
      this.reconnectTimer = null
    }
  }

  /**
   * Reacts to the socket closing after it had been opened.
   *
   * Drops the client reference, and schedules a reconnect unless nobody is
   * subscribed any more or a reconnect runner is already in progress.
   *
   * @param origin Free-form label used only in the log line.
   */
  private static handleSocketClose(origin: string | null): void {
    this.initialized = false
    this.rt = null
    if (this.subscriptions.length == 0) {
      this.clearReconnectTimer()
      this.reconnectAttempts = 0
      return
    }
    if (this.reconnecting == true) {
      return
    }
    console.warn('[GatewayRealtimeService] realtime closed, scheduling reconnect', origin ?? 'unknown')
    this.scheduleReconnect()
  }

  /**
   * Arms a one-shot timer that reconnects and resubscribes.
   *
   * Delay is 1 s, 2 s, 4 s, 8 s, then capped at 10 s, plus up to 200 ms of
   * jitter. Does nothing when a timer is already pending; clears the
   * subscription list instead when no subscription is active any more.
   */
  private static async scheduleReconnect(): Promise<void> {
    if (this.reconnectTimer != null) return
    if (this.subscriptions.every((sub) => sub.guard.active !== true)) {
      this.subscriptions = []
      this.reconnectAttempts = 0
      return
    }
    const attempt = this.reconnectAttempts + 1
    this.reconnectAttempts = attempt
    const baseDelay = Math.min(Math.pow(2, attempt - 1) * 1000, 10000)
    const jitter = Math.floor(Math.random() * 200)
    const delay = baseDelay + jitter
    console.log('[GatewayRealtimeService] reconnect scheduled in', delay, 'ms (attempt', attempt, ')')
    this.reconnectTimer = setTimeout(() => {
      const runner = async () => {
        this.reconnectTimer = null
        // Subscribers may all have gone away while the timer was pending.
        if (this.subscriptions.every((sub) => sub.guard.active !== true)) {
          this.subscriptions = []
          this.reconnectAttempts = 0
          return
        }
        this.reconnecting = true
        try {
          await this.ensureRealtime()
          await this.waitForRealtimeOpen(3000)
          this.reconnectAttempts = 0
          await this.resubscribeAll()
        } catch (err) {
          console.error('reconnect attempt failed', err)
          this.scheduleReconnect()
        } finally {
          this.reconnecting = false
        }
      }
      runner().catch((err) => {
        console.error('reconnect timer runner error', err)
      })
    }, delay)
  }

  /**
   * Re-runs `performSubscription` for every active subscription.
   *
   * Failures are reported to the affected subscriber's `onError` and cause one
   * more reconnect to be scheduled.
   */
  private static async resubscribeAll(): Promise<void> {
    let hadFailure = false
    // Iterate a snapshot: a dispose() during one of the awaits below splices
    // the live array and would otherwise shift indices and skip entries.
    const snapshot = this.subscriptions.slice()
    for (let i = 0; i < snapshot.length; i++) {
      const record = snapshot[i]
      if (record.guard.active !== true) continue
      try {
        await this.performSubscription(record)
      } catch (err) {
        console.error('resubscribe error', err)
        record.handlers?.onError?.(err)
        hadFailure = true
      }
    }
    if (hadFailure) {
      this.scheduleReconnect()
    }
  }

  /**
   * Removes a record from the subscription list and, when the list becomes
   * empty, cancels any pending reconnect.
   */
  private static removeSubscription(record: ActiveSubscription): void {
    const index = this.subscriptions.indexOf(record)
    if (index >= 0) {
      this.subscriptions.splice(index, 1)
    }
    if (this.subscriptions.length == 0) {
      this.clearReconnectTimer()
      this.reconnectAttempts = 0
    }
  }

  /**
   * Loads the current node list for a subscriber and then registers its
   * change listener on the realtime client.
   *
   * A failed initial fetch is reported through `onError` but does not prevent
   * the realtime registration. Registration is tried up to two times, 200 ms
   * apart.
   *
   * @throws The last registration error when both attempts fail.
   */
  private static async performSubscription(record: ActiveSubscription): Promise<void> {
    if (record.guard.active !== true) return
    const handlers = record.handlers
    const maxAttempts = 2
    let attempt = 0
    // 1. Fetch Initial Data
    try {
      await ensureSupaSession()
      const { data, error } = await supa.from('chat_gateway_nodes').select('*').order('name', { ascending: true })
      if (error != null) {
        throw new Error(error.message)
      }
      const nodes: GatewayNode[] = []
      if (data != null) {
        const list = data as UTSArray<UTSJSONObject>
        for (let i = 0; i < list.length; i++) {
          const node = toGatewayNode(list[i])
          if (node != null) {
            nodes.push(node)
          }
        }
      }
      // The subscriber may have disposed while the query was in flight.
      if (record.guard.active) {
        handlers.onInitial(nodes)
      }
    } catch (fetchErr) {
      console.error('Initial fetch failed', fetchErr)
      handlers.onError(fetchErr)
    }
    // 2. Subscribe to Realtime
    while (attempt < maxAttempts) {
      attempt += 1
      if (record.guard.active !== true) return
      try {
        if (this.rt == null || this.rt.isOpen !== true) {
          await this.ensureRealtime()
          await this.waitForRealtimeOpen(2000)
        }
        const realtime = this.rt
        if (realtime == null) {
          throw new Error('Realtime client not initialized')
        }
        console.log('[GatewayRealtimeService] subscribing to chat_gateway_nodes')
        realtime.subscribePostgresChanges({
          event: '*',
          schema: 'public',
          table: 'chat_gateway_nodes',
          filter: 'mqtt_client_id=neq.null',
          topic: 'realtime:gateway_nodes',
          onChange: (payload: any) => {
            // Disposed subscribers stay registered on the client; drop their events here.
            if (record.guard.active !== true) return
            const wrapper = payload as UTSJSONObject
            // The event name is looked up under `type` first, then `eventType`.
            let eventType = wrapper.getString('type')
            if (eventType == null) {
              eventType = wrapper.getString('eventType')
            }
            if (eventType == 'INSERT') {
              const row = extractRecord(payload)
              if (row != null) {
                const node = toGatewayNode(row)
                if (node != null) handlers.onInsert(node)
              }
            } else if (eventType == 'UPDATE') {
              const row = extractRecord(payload)
              if (row != null) {
                const node = toGatewayNode(row)
                if (node != null) handlers.onUpdate(node)
              }
            } else if (eventType == 'DELETE') {
              const oldRow = extractOldRecord(payload)
              if (oldRow != null) {
                const id = oldRow.getString('id')
                if (id != null) handlers.onDelete(id)
              }
            }
          }
        })
        console.log('[GatewayRealtimeService] subscription ready')
        return
      } catch (subscribeErr) {
        console.error('performSubscription error', subscribeErr, 'attempt', attempt)
        if (attempt >= maxAttempts) {
          throw subscribeErr
        }
        await new Promise<void>((resolve) => {
          setTimeout(() => resolve(), 200)
        })
      }
    }
  }

  /**
   * Makes sure an open realtime socket exists, creating one if necessary.
   *
   * Concurrent callers share a single connection attempt through
   * `initializing`. Resolves when the socket reports open.
   *
   * @throws Whatever the socket passes to `onError`, or
   *         Error('Realtime connection closed before ready'), when the socket
   *         fails before opening.
   */
  private static async ensureRealtime(): Promise<void> {
    if (this.rt != null && this.rt.isOpen == true) {
      this.initialized = true
      return
    }
    if (this.initializing != null) {
      await this.initializing
      return
    }
    await ensureSupaSession()
    // Re-check after the await: another caller may have opened the socket or
    // started a connection attempt in the meantime. Without this, two callers
    // that both passed the guards above would each create their own socket.
    // Everything from here to the `initializing` assignment is synchronous.
    if (this.rt != null && this.rt.isOpen == true) {
      this.initialized = true
      return
    }
    if (this.initializing != null) {
      await this.initializing
      return
    }
    let token: string | null = null
    try {
      const session = supa.getSession()
      if (session != null) {
        token = session.session?.access_token ?? null
      }
    } catch (_) { }
    // resolveReady doubles as the "still connecting" marker: it is nulled as
    // soon as the attempt settles, so later socket events take the
    // post-connect paths.
    let resolveReady: ((value: void) => void) | null = null
    let rejectReady: ((reason: any) => void) | null = null
    const readyPromise = new Promise<void>((resolve, reject) => {
      resolveReady = resolve as (value: void) => void
      rejectReady = reject
    })
    this.initializing = readyPromise
    // Filled in right after construction so onClose can tell whether the
    // closing socket is still the current one.
    let ownSocket: AkSupaRealtime | null = null
    const realtime = new AkSupaRealtime({
      url: WS_URL,
      channel: 'realtime:gateway_nodes',
      apikey: SUPA_KEY,
      token: token,
      onMessage: (_data: any) => { },
      onOpen: (_res: any) => {
        this.initialized = true
        this.reconnectAttempts = 0
        this.clearReconnectTimer()
        this.reconnecting = false
        const resolver = resolveReady
        resolveReady = null
        rejectReady = null
        this.initializing = null
        resolver?.()
      },
      onError: (err: any) => {
        // Errors after the socket opened are ignored here; only a failure
        // during the connection attempt rejects the pending promise.
        if (resolveReady != null) {
          const rejector = rejectReady
          resolveReady = null
          rejectReady = null
          this.initializing = null
          rejector?.(err)
        }
      },
      onClose: (_res: any) => {
        if (resolveReady != null) {
          const rejector = rejectReady
          resolveReady = null
          rejectReady = null
          this.initializing = null
          rejector?.(new Error('Realtime connection closed before ready'))
        } else {
          // A socket that has already been replaced must not tear down its
          // successor: handleSocketClose nulls `rt`, which would drop the
          // newer live client.
          const current = this.rt
          if (current != null && current !== ownSocket) {
            return
          }
          this.handleSocketClose('close')
        }
      }
    })
    ownSocket = realtime
    this.rt = realtime
    this.rt?.connect()
    try {
      await readyPromise
    } finally {
      this.initializing = null
    }
  }

  /**
   * Registers a subscriber: delivers the current node list through
   * `onInitial`, then forwards insert / update / delete events.
   *
   * Never rejects because of a subscription failure; on failure `onError` is
   * called and a no-op handle is returned.
   *
   * @param handlers Callbacks to invoke.
   * @returns A handle whose `dispose` stops further callbacks (idempotent).
   */
  static async subscribeGateways(handlers: GatewayRealtimeHandlers): Promise<GatewayRealtimeSubscription> {
    const guard: SubscriptionGuard = { active: true }
    const record: ActiveSubscription = {
      handlers: handlers,
      guard: guard
    }
    // Registered before subscribing so a socket close during setup still
    // sees an active subscriber and schedules a reconnect.
    this.subscriptions.push(record)
    try {
      await this.performSubscription(record)
    } catch (err) {
      console.error('subscribeGateways failed', err)
      guard.active = false
      this.removeSubscription(record)
      handlers.onError(err)
      return { dispose: () => { } }
    }
    return {
      dispose: () => {
        if (guard.active !== true) return
        guard.active = false
        this.removeSubscription(record)
      }
    }
  }

  /**
   * Closes the socket with code 1000 and resets all connection state.
   * The subscription list itself is left untouched.
   */
  static closeRealtime(): void {
    try {
      if (this.rt != null) {
        this.rt.close({ code: 1000, reason: 'manual close' })
      }
    } catch (err) {
      console.error('closeRealtime error', err)
    } finally {
      this.rt = null
      this.initialized = false
      this.initializing = null
      this.reconnecting = false
      this.clearReconnectTimer()
      this.reconnectAttempts = 0
    }
  }
}
// Also exposed as the module's default export, in addition to the named export.
export default GatewayRealtimeService