import supa, { supaReady } from '@/components/supadb/aksupainstance.uts' import AkSupaRealtime from '@/components/supadb/aksuparealtime.uts' import { WS_URL, SUPA_KEY } from '@/ak/config.uts' import type { AkReqResponse } from '@/uni_modules/ak-req/index.uts' import MediaCacheService from '@/utils/mediaCacheService.uts' export type AudioErrorEvent = { errCode ?: number errMsg ?: string } export type ChatAudioContext = { src : string autoplay : boolean play ?: () => void pause ?: () => void stop ?: () => void destroy ?: () => void onError ?: (callback : (res : AudioErrorEvent) => void) => void } // Helper describing the subset of inner audio context api we use export type ViewMessage = { id : string conversation_id : string sender_id : string content : string content_type : string reply_to : string | null metadata : UTSJSONObject | null created_at : string updated_at : string ingress_type ?: string | null durationSeconds ?: number; locationTitle ?: string; videoSrc : string; timeText : string; avatar : string; senderName ?: string | null; isMe : boolean; bubbleExtraClass : string; rowExtraClass : string; anchorId : string; bubbleInlineStyle : string; ingressType ?: string | null; ingressLabel : string; }; export type MemberProfile = { id : string, name : string, avatar ?: string | null, initials : string } export type ChatConversationView = { id : string title : string | null is_group : boolean owner_id : string | null last_message_at : string | null metadata : UTSJSONObject | null created_at : string updated_at : string memberNames ?: string; members ?: Array; avatarMembers ?: Array; unreadCount ?: number; } export type ChatConversation = { id : string title : string | null is_group : boolean owner_id : string | null last_message_at : string | null metadata : UTSJSONObject | null created_at : string updated_at : string } export type SendDeviceTextDownlinkParams = { conversationId : string createdBy : string text : string targetUserId ?: string | null qos ?: number | null retain ?: boolean | null } export type ChatMessage = { id : string conversation_id : string sender_id : string content : string content_type : string reply_to : string | null metadata : UTSJSONObject | null created_at : string updated_at : string ingress_type ?: string | null sender_profile ?: UserOption | null } export type ChatNotification = { id : string user_id : string conversation_id : string | null message_id : string | null type : string is_read : boolean created_at : string } export type ChatParticipant = { id : string conversation_id : string user_id : string role : string joined_at : string last_read_at : string | null is_muted : boolean settings : UTSJSONObject | null created_at : string updated_at : string } export type SendAudioMessageParams = { conversationId : string senderId : string s3Url : string durationMs ?: number | null mime ?: string | null sizeBytes ?: number | null } export type ChatParticipantWithProfile = { id : string conversation_id : string user_id : string role : string joined_at : string last_read_at : string | null is_muted : boolean settings : UTSJSONObject | null created_at : string updated_at : string user ?: UserOption | null } export type SubscriptionDisposer = { dispose : () => void } export type UserOption = { id : string username ?: string | null nickname ?: string | null real_name ?: string | null email ?: string | null avatar_url ?: string | null phone ?: string | null } export type CreateConversationPayload = { title ?: string | null isGroup ?: boolean memberIds ?: Array } async function ensureSupaSession() { try { const ok = await supaReady if (!ok) { console.warn('Supabase session not established; subsequent requests may fail') } } catch (err) { console.error('Supabase session initialization error', err) } } function extractRealtimeRecord(payload : any) : UTSJSONObject | null { try { if (payload == null) return null const wrapper = payload as UTSJSONObject const directNew = wrapper.get('new') as UTSJSONObject | null if (directNew != null) return directNew const directRecord = wrapper.get('record') as UTSJSONObject | null if (directRecord != null) return directRecord const dataSection = wrapper.get('data') as UTSJSONObject | null if (dataSection != null) { const dataRecord = dataSection.get('record') as UTSJSONObject | null if (dataRecord != null) return dataRecord const dataNew = dataSection.get('new') as UTSJSONObject | null if (dataNew != null) return dataNew } } catch (_) { } return null } function toChatMessage(row : UTSJSONObject, senderProfile : UserOption | null) : ChatMessage | null { try { const id = row.getString('id') as string | null const conversationId = row.getString('conversation_id') as string | null const senderId = row.getString('sender_id') as string | null const content = row.getString('content') as string | null const contentType = row.getString('content_type') as string | null const createdAt = row.getString('created_at') as string | null const updatedAt = row.getString('updated_at') as string | null if (id == null || conversationId == null || senderId == null || content == null || contentType == null || createdAt == null || updatedAt == null) { return null } const replyToRaw = row.get('reply_to') const replyTo = typeof replyToRaw == 'string' ? replyToRaw : null const metadata = row.get('metadata') as UTSJSONObject | null const ingressRaw = row.get('ingress_type') const ingressType = typeof ingressRaw == 'string' ? ingressRaw : null return { id: id, conversation_id: conversationId, sender_id: senderId, content: content, content_type: contentType, reply_to: replyTo, metadata: metadata ?? null, created_at: createdAt, updated_at: updatedAt, ingress_type: ingressType, sender_profile: senderProfile ?? null } } catch (_) { return null } } function toChatNotification(row : UTSJSONObject) : ChatNotification | null { try { const id = row.getString('id') as string | null const userId = row.getString('user_id') as string | null const typeValue = row.getString('type') as string | null const createdAt = row.getString('created_at') as string | null if (id == null || userId == null || typeValue == null || createdAt == null) { return null } const conversationIdRaw = row.get('conversation_id') const conversationId = typeof conversationIdRaw == 'string' ? conversationIdRaw : null const messageIdRaw = row.get('message_id') const messageId = typeof messageIdRaw == 'string' ? messageIdRaw : null const isReadRaw = row.get('is_read') const isRead = typeof isReadRaw == 'boolean' ? isReadRaw : false return { id: id, user_id: userId, conversation_id: conversationId, message_id: messageId, type: typeValue, is_read: isRead, created_at: createdAt } } catch (_) { return null } } export class ChatDataService { private static rt : AkSupaRealtime | null = null private static initialized = false private static realtimeReady : Promise | null = null private static lastRealtimeErrorToastAt = 0 private static senderProfileCache = new Map() private static notifyRealtimeError(err : any) { try { console.error('Realtime connection error', err) const now = Date.now() if (now - this.lastRealtimeErrorToastAt > 5000) { this.lastRealtimeErrorToastAt = now try { uni.showToast({ title: '实时通道连接失败', icon: 'none' }) } catch (_) { } } } catch (_) { } } private static getCachedSenderProfile(userId : string | null) : UserOption | null { if (typeof userId !== 'string' || userId == '') return null return this.senderProfileCache.get(userId) ?? null } private static async ensureSenderProfiles(userIds : Array) : Promise { const ids = Array.from(new Set(userIds.filter((id) => typeof id == 'string' && id !== ''))) if (ids.length == 0) return const missing : Array = [] for (let i = 0; i < ids.length; i++) { const id = ids[i] as string if (!this.senderProfileCache.has(id)) { missing.push(id) } } if (missing.length == 0) return await ensureSupaSession() try { const res = await supa .from('ak_users') .select('id, username, nickname, real_name, email, avatar_url, phone', {}) .in('id', missing as any[]) .executeAs() if (!(res.status >= 200 && res.status < 300) || res.data == null) return const arr = Array.isArray(res.data) ? (res.data as any[]) : [res.data] for (let i = 0; i < arr.length; i++) { const item = arr[i] as UTSJSONObject | null if (item == null) continue const id = item['id'] as string | null if (typeof id !== 'string' || id == '') continue const profile : UserOption = { id: id as string, username: item['username'] as string | null ?? null, nickname: item['nickname'] as string | null ?? null, real_name: item['real_name'] as string | null ?? null, email: item['email'] as string | null ?? null, avatar_url: item['avatar_url'] as string | null ?? null, phone: item['phone'] as string | null ?? null } this.senderProfileCache.set(id as string, profile) } } catch (err) { console.error('ensureSenderProfiles error', err) } } private static async ensureRealtime() { if (this.rt != null && this.rt.isOpen == true) { this.initialized = true return } if (this.realtimeReady != null) { await this.realtimeReady return } await ensureSupaSession() let token : string | null = null try { const session = supa.getSession() if (session != null) { token = session.session?.access_token } } catch (_) { } let resolveReady : (() => void) | null = null let rejectReady : ((reason : any) => void) | null = null const readyPromise = new Promise((resolve, reject) => { resolveReady = () => { resolve() } rejectReady = reject }) this.realtimeReady = readyPromise this.rt = new AkSupaRealtime({ url: WS_URL, channel: 'realtime:*', apikey: SUPA_KEY, token: token ?? null, onMessage: (_data : any) => { }, onOpen: (_res : any) => { this.initialized = true const resolver = resolveReady resolveReady = null rejectReady = null this.realtimeReady = null resolver?.() }, onError: (err : any) => { if (resolveReady != null) { const rejector = rejectReady resolveReady = null rejectReady = null this.realtimeReady = null rejector?.(err) } }, onClose: (_res : any) => { this.initialized = false this.rt = null if (resolveReady != null) { const rejector = rejectReady resolveReady = null rejectReady = null this.realtimeReady = null rejector?.(new Error('Realtime connection closed before ready')) } } }) this.rt.connect() const timeoutMs = 8000 const timeoutHandle = setTimeout(() => { if (resolveReady != null) { const rejector = rejectReady resolveReady = null rejectReady = null this.realtimeReady = null rejector?.(new Error('Realtime connection timeout')) } }, timeoutMs) try { await readyPromise } finally { clearTimeout(timeoutHandle) } } static async subscribeMessages(conversationId : string, onInsert : (msg : ChatMessage) => void) : Promise { try { await this.ensureRealtime() } catch (err) { this.notifyRealtimeError(err) this.closeRealtime() return { dispose: () => { } } } console.log('subscribeMessages',conversationId) const guard = { active: true } this.rt?.subscribePostgresChanges({ event: 'INSERT', schema: 'public', table: 'chat_messages', onChange: (payload) => { console.log('realtime payload',payload) try { if (guard.active != true || payload == null) return const newRow = extractRealtimeRecord(payload) if (newRow == null) { return } const convId = newRow.getString('conversation_id') as string | null if (convId != null && convId == conversationId) { const senderId = newRow.getString('sender_id') as string | null const emitMessage = (profile : UserOption | null) => { if (guard.active != true) return const message = toChatMessage(newRow, profile) if (message != null) onInsert(message) } if (senderId != null && senderId !== '') { const cachedProfile = this.getCachedSenderProfile(senderId) if (cachedProfile == null) { this.ensureSenderProfiles([senderId as string]).then(() => { if (guard.active != true) return const updatedProfile = this.getCachedSenderProfile(senderId) emitMessage(updatedProfile ?? null) }) return } emitMessage(cachedProfile) } else { emitMessage(null) } } } catch (err) { console.error('realtime payload handling error', err) } } }) return { dispose: () => { guard.active = false } } } static async subscribeNotifications(userId : string, onInsert : (n : ChatNotification) => void) : Promise { try { await this.ensureRealtime() } catch (err) { this.notifyRealtimeError(err) this.closeRealtime() return { dispose: () => { } } } const guard = { active: true } this.rt?.subscribePostgresChanges({ event: 'INSERT', schema: 'public', table: 'chat_notifications', onChange: (payload : any) => { try { const rec = extractRealtimeRecord(payload) if (guard.active != true || rec == null) return const userIdStr = rec.getString('user_id') as string | null if (userIdStr != null && userIdStr == userId) { const notification = toChatNotification(rec) if (notification != null) onInsert(notification) } } catch (_) { } } }) return { dispose: () => { guard.active = false } } } static closeRealtime() { try { this.rt?.close({}) } catch (_) { } this.rt = null this.initialized = false this.realtimeReady = null } static async listMyConversations(userId : string) : Promise>> { await ensureSupaSession() const res = await supa .from('chat_conversations') .select('*', {}) .order('last_message_at', { ascending: false }) .executeAs() return res as AkReqResponse> } static async getConversation(conversationId : string) : Promise> { await ensureSupaSession() const res = await supa .from('chat_conversations') .select('*', {}) .eq('id', conversationId) .single() .executeAs() return res as AkReqResponse } static async listMessages(conversationId : string, limit = 100) : Promise>> { await ensureSupaSession() console.log('listMessages') const res = await supa .from('chat_messages') .select('*', {}) .eq('conversation_id', conversationId) .order('created_at', { ascending: true }) .limit(limit) .executeAs() try { const raw = res.data const list : Array = [] if (Array.isArray(raw)) { for (const item of raw as any[]) { if (item != null) { list.push(item as UTSJSONObject) } } } else if (raw != null) { list.push(raw as UTSJSONObject) } const senderIds : Array = [] for (let i = 0; i < list.length; i++) { const item = list[i] let senderId : string | null = null if (item != null) { senderId = item.getString('sender_id') as string | null if (senderId == null) { senderId = item['sender_id'] as string | null } } if (typeof senderId == 'string' && senderId !== '') { senderIds.push(senderId!!) } } if (senderIds.length > 0) { await this.ensureSenderProfiles(senderIds) for (let j = 0; j < list.length; j++) { const item = list[j] if (item != null) { let senderId : string | null = item.getString('sender_id') as string | null if (senderId == null) { senderId = item['sender_id'] as string | null } if (typeof senderId == 'string' && senderId !== '') { const profile = this.getCachedSenderProfile(senderId) if (profile != null) { item['sender_profile'] = profile } } } } } } catch (err) { console.error('listMessages profile hydrate error', err) } console.log('listMessages ',res,conversationId) return res as AkReqResponse> } static async sendMessage(conversationId : string, senderId : string, content : string) : Promise> { await ensureSupaSession() const payload = { conversation_id: conversationId, sender_id: senderId, content, content_type: 'text', ingress_type: 'manual', created_at: new Date().toISOString(), updated_at: new Date().toISOString() } as UTSJSONObject const res = await supa .from('chat_messages') .insert(payload) .single() .executeAs() return res as AkReqResponse } static async listNotifications(userId : string, limit = 50) : Promise>> { await ensureSupaSession() const res = await supa .from('chat_notifications') .select('*', {}) .eq('user_id', userId) .order('created_at', { ascending: false }) .limit(limit) .executeAs() return res as AkReqResponse> } static async markConversationNotificationsRead(userId : string, conversationId : string) : Promise>> { await ensureSupaSession() const res = await supa .from('chat_notifications') .update({ is_read: true }) .eq('user_id', userId) .eq('conversation_id', conversationId) .eq('type', 'message') .eq('is_read', false) .select('id', {}) .executeAs() return res as AkReqResponse> } static async markNotificationRead(notificationId : string) : Promise>> { await ensureSupaSession() const res = await supa .from('chat_notifications') .update({ is_read: true }) .eq('id', notificationId) .eq('is_read', false) .select('id', {}) .executeAs() return res as AkReqResponse> } static async searchUsers(keyword : string, limit = 20) : Promise>> { await ensureSupaSession() let query = supa .from('ak_users') .select('id, username, nickname, email', {}) .limit(limit) const k = keyword.trim() if (k !== '') { query = query.or(`username.ilike.%${k}%,nickname.ilike.%${k}%,email.ilike.%${k}%`) } const res = await query.executeAs() return res as AkReqResponse> } // Create a conversation and add participants. The creator becomes owner. static async createConversation(ownerId : string, payload : CreateConversationPayload) : Promise> { await ensureSupaSession() const title = payload.title ?? null const isGroup = payload.isGroup ?? ((payload.memberIds?.length ?? 0) > 1) const memberIds = (payload.memberIds ?? []).filter((id) => id !== ownerId) // 1) create conversation const convRes = await supa .from('chat_conversations') .insert({ title, is_group: isGroup, owner_id: ownerId } as UTSJSONObject) .single() .executeAs() if (convRes.status != null && (convRes.status < 200 || convRes.status >= 300 || convRes.data == null)) { return convRes as AkReqResponse } const conv = convRes.data as ChatConversation // 2) insert owner participant first const ownerRow = { conversation_id: conv.id, user_id: ownerId, role: 'owner' } const ownerIns = await supa .from('chat_participants') .insert(ownerRow) .single() .executeAs() if (!(ownerIns.status >= 200 && ownerIns.status < 300)) { // return conversation as created but warn via status return convRes as AkReqResponse } // 3) insert other members (if any) in bulk if (memberIds.length > 0) { const rows = memberIds.map((uid) => ({ conversation_id: conv.id, user_id: uid, role: 'member' })) await supa.from('chat_participants').insert(rows as any[] as UTSJSONObject[]).executeAs() } return convRes as AkReqResponse } // Insert a downlink row to chat_mqtt_downlinks; gateway will publish to MQTT static async sendDeviceTextDownlink(params : SendDeviceTextDownlinkParams) : Promise> { await ensureSupaSession() const row = { conversation_id: params.conversationId, target_user_id: params.targetUserId ?? null, topic: null, // let gateway derive topic by target_user_id payload: params.text, payload_encoding: 'utf8', qos: (params.qos ?? 1), retain: (params.retain ?? false), status: 'pending', scheduled_at: new Date().toISOString(), created_by: params.createdBy } as UTSJSONObject; const res = await supa .from('chat_mqtt_downlinks') .insert(row) .single() .executeAs() return res } // For 1:1 conversation, derive the peer id (the other participant) static async getPeerId(conversationId : string, myId : string) : Promise { await ensureSupaSession() const res = await supa .from('chat_participants') .select('*', {}) .eq('conversation_id', conversationId) .order('joined_at', { ascending: true }) .limit(10) .executeAs() if (!(res.status >= 200 && res.status < 300) || res.data == null) return null const arr = res.data as any[] if (arr.length == 2) { const a = arr[0] as UTSJSONObject; const b = arr[1] as UTSJSONObject const aId = a['user_id'] as string; const bId = b['user_id'] as string return aId == myId ? bId : (bId == myId ? aId : null) } // not strict 1:1 return null } // Invite additional members to an existing conversation (owner/admin only) static async inviteMembers(conversationId : string, memberIds : Array) : Promise>> { await ensureSupaSession() const ids = Array.from(new Set(memberIds)) if (ids.length == 0) { // Simulate a 200 empty insert response return { status: 200, data: [] as ChatParticipant[], error: null, headers: {} } as AkReqResponse> } const rows = ids.map((uid) => ({ conversation_id: conversationId, user_id: uid, role: 'member' })) const res = await supa .from('chat_participants') .insert(rows as any[] as UTSJSONObject[]) .executeAs() return res as AkReqResponse> } static async listParticipants(conversationId : string) : Promise>> { await ensureSupaSession() const res = await supa .from('chat_participants') .select('*', {}) .eq('conversation_id', conversationId) .order('joined_at', { ascending: true }) .executeAs() return res as AkReqResponse> } static async listParticipantsWithProfile(conversationId : string) : Promise>> { await ensureSupaSession() const res = await supa .from('chat_participants') .select('id, conversation_id, user_id, role, joined_at, last_read_at, is_muted, settings, created_at, updated_at, user:ak_users(id, nickname, username, email, real_name, avatar_url,phone)', {}) .eq('conversation_id', conversationId) .order('joined_at', { ascending: true }) .executeAs() return res as AkReqResponse> } static async sendAudioMessage(params : SendAudioMessageParams) : Promise> { await ensureSupaSession() const metadata = { duration_ms: params.durationMs ?? null, mime: params.mime ?? 'audio/mpeg', size: params.sizeBytes ?? null } as UTSJSONObject const payload = { conversation_id: params.conversationId, sender_id: params.senderId, content: params.s3Url, content_type: 'audio', metadata, ingress_type: 'manual', created_at: new Date().toISOString(), updated_at: new Date().toISOString() } const res = await supa .from('chat_messages') .insert(payload) .single() .executeAs() return res as AkReqResponse } // Simple audio playback helper using inner audio context (uni-app API) static async playAudio(url : string) : Promise { try { const cached = await MediaCacheService.getCachedPath(url) console.log(cached,url) const audio = uni.createInnerAudioContext() audio.src = url audio.autoplay = true audio.onError((e:ICreateInnerAudioContextFail) => { console.log('audio error', e.errCode, e.errMsg) }) return audio } catch (e) { console.log('audio not supported', e) return null } } // Video caching helper: return a playable local path for