Files
akmon/components/supadb/aksuparealtime.uts
2026-01-20 08:04:15 +08:00

282 lines
9.3 KiB
Plaintext
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Postgres 变更订阅参数类型(强类型导出,便于 UTS Android 复用)
export type PostgresChangesSubscribeParams = {
event : string;
schema : string;
table : string;
filter ?: string;
topic ?: string;
onChange : (payload : any) => void;
};
type PostgresChangeListener = {
topic : string;
event : string;
schema : string;
table : string;
filter : string | null;
onChange : (payload : any) => void;
};
export type AkSupaRealtimeOptions = {
url : string; // ws/wss 地址
channel : string; // 订阅频道
token ?: string; // 可选鉴权token
apikey ?: string; // 可选supabase apikey
onMessage : (data : UTSJSONObject) => void;
onOpen ?: (res : any) => void;
onClose ?: (res : any) => void;
onError ?: (err : any) => void;
};
export class AkSupaRealtime {
ws : SocketTask | null = null;
options : AkSupaRealtimeOptions | null = null;
isOpen : boolean = false;
heartbeatTimer : any = 0;
joinedTopics : Set<string> = new Set<string>();
listeners : Array<PostgresChangeListener> = [];
constructor(options : AkSupaRealtimeOptions) {
this.options = options;
}
connect() {
const opts = this.options;
if (opts == null) return;
// 拼接 apikey 和 vsn=1.0.0 到 ws url
let wsUrl = opts.url;
// apikey 兼容 query 已有参数和无参数两种情况
if (opts.apikey != null && opts.apikey !== "") {
const hasQuery = wsUrl.indexOf('?') != -1;
// 移除已有 apikey 参数,避免重复
wsUrl = wsUrl.replace(/([&?])apikey=[^&]*/g, '$1').replace(/[?&]$/, '');
wsUrl += (hasQuery ? '&' : '?') + 'apikey=' + encodeURIComponent('' + opts.apikey);
}
if (wsUrl.indexOf('vsn=') == -1) {
wsUrl += (wsUrl.indexOf('?') == -1 ? '?' : '&') + 'vsn=1.0.0';
}
this.ws = uni.connectSocket({
url: wsUrl,
success: (res) => { console.log(res); },
fail: (err) => { if (opts.onError != null) opts.onError?.(err); }
});
if (this.ws != null) {
const wsTask = this.ws;
wsTask?.onOpen((result : OnSocketOpenCallbackResult) => {
this.isOpen = true;
console.log('onopen', result)
if (opts.onOpen != null) opts.onOpen?.(result);
// 启动 heartbeat 定时器
this.startHeartbeat();
});
wsTask?.onMessage((msg) => {
console.log(msg)
let data : UTSJSONObject | null = null;
try {
const msgData = (typeof msg == 'object' && msg.data !== null) ? msg.data : msg;
data = typeof msgData == 'string' ? JSON.parse(msgData) as UTSJSONObject : msgData as UTSJSONObject;
} catch (e) { }
// 处理 pong
if (
data != null &&
data.event == 'phx_reply' &&
typeof data.payload == 'object' &&
data.payload != null &&
(data.payload as UTSJSONObject).status != null &&
(data.payload as UTSJSONObject).status == 'ok' &&
(data.payload as UTSJSONObject).response != null &&
(data.payload as UTSJSONObject).response == 'heartbeat'
) {
// 收到 pong可用于续约
// 可选:重置定时器
}
console.log(data)
if (data != null) this.dispatchPostgresChange(data);
if (opts?.onMessage != null) opts.onMessage?.(data ?? ({} as UTSJSONObject));
});
wsTask?.onClose((res) => {
console.log('onclose', res)
this.isOpen = false;
this.joinedTopics.clear();
this.listeners = [];
if (opts.onClose != null) opts.onClose?.(res);
this.stopHeartbeat();
});
wsTask?.onError((err) => {
console.log(err)
if (opts.onError != null) opts.onError?.(err);
this.stopHeartbeat();
});
}
}
send(options : SendSocketMessageOptions) {
const wsTask = this.ws;
if (wsTask != null && this.isOpen) {
console.log('send:', options)
// 兼容 uni-app-x send API支持 success/fail 回调
// 只允许 SendSocketMessageOptions 类型,避免 UTSJSONObject 混用
let sendData : any = options.data;
// 若 data 不是字符串,自动序列化
if (typeof sendData !== 'string') {
sendData = JSON.stringify(sendData);
}
options.success ?? ((res) => {
if (typeof options.success == 'function') options.success?.(res)
})
options.fail ?? ((err : any) => {
console.log(err)
const opts = this.options;
if (opts != null && opts.onError != null) opts.onError?.(err);
})
wsTask.send(options);
}
}
close(options : CloseSocketOptions) {
this.ws?.close(options);
}
/**
* 订阅 Postgres 变更事件(如 INSERT/UPDATE/DELETE
* @param params { event: 'INSERT'|'UPDATE'|'DELETE', schema: string, table: string, onChange: (payload: any) => void }
*/
/**
* 订阅 Postgres 变更事件(如 INSERT/UPDATE/DELETE
* @param params { event: 'INSERT'|'UPDATE'|'DELETE', schema: string, table: string, onChange: (payload: any) => void }
*/
/**
* 订阅 Postgres 变更事件(如 INSERT/UPDATE/DELETE
* @param params { event: 'INSERT'|'UPDATE'|'DELETE', schema: string, table: string, onChange: (payload: any) => void }
*/
/**
* 订阅 Postgres 变更事件(如 INSERT/UPDATE/DELETE
* @param params { event: 'INSERT'|'UPDATE'|'DELETE', schema: string, table: string, onChange: (payload: any) => void }
*/
/**
* 订阅 Postgres 变更事件(如 INSERT/UPDATE/DELETE
* @param params { event: 'INSERT'|'UPDATE'|'DELETE', schema: string, table: string, onChange: (payload: any) => void }
*/
public subscribePostgresChanges(params : PostgresChangesSubscribeParams) : void {
const opts = this.options;
if (this.isOpen !== true || opts == null) {
throw new Error('WebSocket 未连接');
}
const topic = params.topic != null && params.topic !== '' ? params.topic : `realtime:${params.schema}:${params.table}`;
this.joinTopicIfNeeded(topic, params);
this.listeners.push({
topic: topic,
event: params.event,
schema: params.schema,
table: params.table,
filter: params.filter != null ? params.filter : null,
onChange: params.onChange
});
}
startHeartbeat() {
this.stopHeartbeat();
console.log('make heartbeat')
// 每 30 秒发送一次 heartbeat官方建议
this.heartbeatTimer = setInterval(() => {
console.log('should startHeartbeat')
if (this.isOpen && this.ws != null) {
const heartbeatMsg = {
topic: 'phoenix',
event: 'heartbeat',
payload: {},
ref: Date.now().toString()
};
this.send({ data: JSON.stringify(heartbeatMsg) });
}
}, 30000);
}
stopHeartbeat() {
console.log('stop heartbeat')
if (typeof this.heartbeatTimer == 'number' && this.heartbeatTimer > 0) {
clearInterval(this.heartbeatTimer as number);
this.heartbeatTimer = 0;
}
}
private joinTopicIfNeeded(topic : string, params : PostgresChangesSubscribeParams) {
if (topic == null || topic == '') return;
if (this.joinedTopics.has(topic)) return;
let changeConfig : any = null;
if (params.filter != null && params.filter !== '') {
changeConfig = {
event: params.event,
schema: params.schema,
table: params.table,
filter: params.filter
};
} else {
changeConfig = {
event: params.event,
schema: params.schema,
table: params.table
};
}
const joinMsg = {
event: 'phx_join',
payload: {
config: {
broadcast: { self: false, ack: false },
postgres_changes: [changeConfig],
presence: { key: '', enabled: false },
private: false
},
access_token: this.options != null && this.options.token != null ? this.options.token : null
},
ref: Date.now().toString(),
topic: topic
};
this.send({ data: JSON.stringify(joinMsg) });
this.joinedTopics.add(topic);
}
private dispatchPostgresChange(data : UTSJSONObject) : void {
if (data.event !== 'postgres_changes') return;
const topic = typeof data.topic == 'string' ? data.topic : '';
const payload = data.payload as UTSJSONObject | null;
if (payload == null) return;
const dataSection = payload.get('data') as UTSJSONObject | null;
let payloadEvent = payload.getString('event') as string | null;
if ((payloadEvent == null || payloadEvent == '') && dataSection != null) {
const typeValue = dataSection.getString('type') as string | null;
if (typeValue != null && typeValue !== '') payloadEvent = typeValue;
}
let schemaName = payload.getString('schema') as string | null;
if ((schemaName == null || schemaName == '') && dataSection != null) {
const dataSchema = dataSection.getString('schema') as string | null;
if (dataSchema != null && dataSchema !== '') schemaName = dataSchema;
}
let tableName = payload.getString('table') as string | null;
if ((tableName == null || tableName == '') && dataSection != null) {
const dataTable = dataSection.getString('table') as string | null;
if (dataTable != null && dataTable !== '') tableName = dataTable;
}
const filterValue = payload.getString('filter') as string | null;
for (let i = 0; i < this.listeners.length; i++) {
const listener = this.listeners[i];
if (listener.topic !== topic) continue;
if (listener.event !== '*' && payloadEvent != null && listener.event !== payloadEvent) continue;
if (schemaName != null && listener.schema !== schemaName) continue;
if (tableName != null && listener.table !== tableName) continue;
if (
listener.filter != null && listener.filter !== '' &&
filterValue != null && listener.filter !== filterValue
) continue;
if (typeof listener.onChange == 'function') {
const changeData = dataSection != null ? dataSection : payload;
listener.onChange(changeData);
}
}
}
}
export default AkSupaRealtime;