-- Chat schema for 1:1 and group conversations (Supabase-friendly)
-- Tables: chat_conversations, chat_participants, chat_messages, chat_notifications,
--         chat_mqtt_downlinks, chat_gateway_nodes, chat_gateway_heartbeats
-- Views:  chat_mqtt_downlinks_pending, chat_gateway_status_latest
--
-- The script is written to be re-runnable: tables/indexes use IF NOT EXISTS,
-- functions/views use CREATE OR REPLACE, and triggers/policies are dropped
-- before being re-created.
--
-- External objects referenced but not defined here: public.ak_users(id),
-- auth.uid(), and the "authenticated" role.

-- ---------------------------------------------------------------------------
-- Core tables
-- ---------------------------------------------------------------------------

create table if not exists chat_conversations (
    id uuid primary key default gen_random_uuid(),
    title text,
    is_group boolean not null default false,
    owner_id uuid references public.ak_users(id) on delete set null,
    last_message_at timestamptz,  -- maintained by chat_on_message_insert()
    metadata jsonb,
    created_at timestamptz not null default now(),
    updated_at timestamptz not null default now()
);

create table if not exists chat_participants (
    id uuid primary key default gen_random_uuid(),
    conversation_id uuid not null references chat_conversations(id) on delete cascade,
    user_id uuid not null references public.ak_users(id) on delete cascade,
    role text not null default 'member' check (role in ('owner', 'admin', 'member')),
    joined_at timestamptz not null default now(),
    last_read_at timestamptz,
    is_muted boolean not null default false,
    settings jsonb,
    created_at timestamptz not null default now(),
    updated_at timestamptz not null default now(),
    -- a user appears at most once per conversation
    unique (conversation_id, user_id)
);

create table if not exists chat_messages (
    id uuid primary key default gen_random_uuid(),
    conversation_id uuid not null references chat_conversations(id) on delete cascade,
    sender_id uuid not null references public.ak_users(id) on delete cascade,
    content text not null,
    content_type text not null default 'text'
        check (content_type in ('text', 'markdown', 'json', 'image', 'file', 'audio')),
    -- deleting the replied-to message keeps the reply but clears the link
    reply_to uuid references chat_messages(id) on delete set null,
    metadata jsonb,
    created_at timestamptz not null default now(),
    updated_at timestamptz not null default now()
);

-- Created before its index below; the index statement fails on a fresh
-- database if the table does not exist yet.
create table if not exists chat_notifications (
    id uuid primary key default gen_random_uuid(),
    user_id uuid not null references public.ak_users(id) on delete cascade,
    conversation_id uuid references chat_conversations(id) on delete cascade,
    message_id uuid references chat_messages(id) on delete cascade,
    type text not null default 'message',
    is_read boolean not null default false,
    created_at timestamptz not null default now()
);

-- ---------------------------------------------------------------------------
-- Indexes
-- ---------------------------------------------------------------------------

create index if not exists idx_chat_messages_conv_time
    on chat_messages (conversation_id, created_at desc);

create index if not exists idx_chat_participants_conv
    on chat_participants (conversation_id);

create index if not exists idx_chat_participants_user
    on chat_participants (user_id);

create index if not exists idx_chat_notifications_user_time
    on chat_notifications (user_id, created_at desc);

-- ---------------------------------------------------------------------------
-- updated_at triggers
-- ---------------------------------------------------------------------------

-- Row-level BEFORE UPDATE trigger function: stamps updated_at with now().
create or replace function public.set_updated_at()
returns trigger
language plpgsql
as $$
begin
    new.updated_at = now();
    return new;
end
$$;

drop trigger if exists trg_chat_conversations_updated on chat_conversations;
create trigger trg_chat_conversations_updated
    before update on chat_conversations
    for each row execute function public.set_updated_at();

drop trigger if exists trg_chat_participants_updated on chat_participants;
create trigger trg_chat_participants_updated
    before update on chat_participants
    for each row execute function public.set_updated_at();

drop trigger if exists trg_chat_messages_updated on chat_messages;
create trigger trg_chat_messages_updated
    before update on chat_messages
    for each row execute function public.set_updated_at();

-- ---------------------------------------------------------------------------
-- On new message: update conversation last_message_at and create
-- notifications for all participants except the sender.
-- ---------------------------------------------------------------------------

-- SECURITY DEFINER is required here: the trigger fires in the context of the
-- user inserting the message, and under RLS that user
--   * has no INSERT policy on chat_notifications (the insert would be rejected
--     and the whole message insert would fail), and
--   * can only UPDATE chat_conversations when owner/admin (the last_message_at
--     update would silently touch zero rows for ordinary members).
-- search_path is pinned so unqualified names cannot be hijacked.
-- Assumes the function owner owns the chat tables or otherwise bypasses RLS
-- (true for a standard Supabase migration role) - confirm for other setups.
create or replace function public.chat_on_message_insert()
returns trigger
language plpgsql
security definer
set search_path = public
as $$
begin
    -- update last_message_at
    update chat_conversations
    set last_message_at = new.created_at
    where id = new.conversation_id;

    -- insert notifications for all participants except sender
    insert into chat_notifications (user_id, conversation_id, message_id, type, is_read, created_at)
    select
        p.user_id,
        new.conversation_id,
        new.id,
        'message',
        false,
        now()
    from chat_participants as p
    where p.conversation_id = new.conversation_id
      and p.user_id <> new.sender_id;

    return new;
end
$$;

drop trigger if exists trg_chat_messages_after_insert on chat_messages;
create trigger trg_chat_messages_after_insert
    after insert on chat_messages
    for each row execute function public.chat_on_message_insert();

-- ---------------------------------------------------------------------------
-- RLS (simplified)
-- ---------------------------------------------------------------------------

alter table chat_conversations enable row level security;
alter table chat_participants enable row level security;
alter table chat_messages enable row level security;
alter table chat_notifications enable row level security;

-- Helper to check whether a user is owner/admin of a conversation without
-- triggering RLS recursion (SECURITY DEFINER reads chat_participants directly).
create or replace function public.chat_user_is_manager(p_conversation_id uuid, p_user_id uuid)
returns boolean
language sql
security definer
set search_path = public
as $$
    select exists (
        select 1
        from chat_participants
        where conversation_id = p_conversation_id
          and user_id = p_user_id
          and role in ('owner', 'admin')
    );
$$;

revoke all on function public.chat_user_is_manager(uuid, uuid) from public;
grant execute on function public.chat_user_is_manager(uuid, uuid) to authenticated;

-- Policies: users can access conversations they participate in.
drop policy if exists chat_conversations_select on chat_conversations;
create policy chat_conversations_select on chat_conversations
    for select to authenticated
    using (
        exists (
            select 1
            from chat_participants as p
            where p.conversation_id = chat_conversations.id
              and p.user_id = auth.uid()
        )
    );

-- Any authenticated user may create a conversation.
drop policy if exists chat_conversations_insert on chat_conversations;
create policy chat_conversations_insert on chat_conversations
    for insert to authenticated
    with check (true);

-- Only owners/admins of the conversation may update it.
drop policy if exists chat_conversations_update on chat_conversations;
create policy chat_conversations_update on chat_conversations
    for update to authenticated
    using (public.chat_user_is_manager(chat_conversations.id, auth.uid()))
    with check (public.chat_user_is_manager(chat_conversations.id, auth.uid()));

-- Split policies for clarity: SELECT/UPDATE/DELETE cover members (own row)
-- and admins; INSERT allows admins to add others.
-- NOTE(review): the "user_id = auth.uid()" branch of the INSERT and UPDATE
-- policies lets a user add themselves to any conversation and change their own
-- role; tighten if self-join/self-promotion is not intended.
drop policy if exists chat_participants_all on chat_participants;

drop policy if exists chat_participants_select on chat_participants;
create policy chat_participants_select on chat_participants
    for select to authenticated
    using (
        user_id = auth.uid()
        or public.chat_user_is_manager(chat_participants.conversation_id, auth.uid())
    );

drop policy if exists chat_participants_update on chat_participants;
create policy chat_participants_update on chat_participants
    for update to authenticated
    using (
        user_id = auth.uid()
        or public.chat_user_is_manager(chat_participants.conversation_id, auth.uid())
    )
    with check (
        user_id = auth.uid()
        or public.chat_user_is_manager(chat_participants.conversation_id, auth.uid())
    );

drop policy if exists chat_participants_delete on chat_participants;
create policy chat_participants_delete on chat_participants
    for delete to authenticated
    using (
        user_id = auth.uid()
        or public.chat_user_is_manager(chat_participants.conversation_id, auth.uid())
    );

drop policy if exists chat_participants_insert on chat_participants;
create policy chat_participants_insert on chat_participants
    for insert to authenticated
    with check (
        -- allow user to add self OR owners/admins to add others
        user_id = auth.uid()
        or public.chat_user_is_manager(chat_participants.conversation_id, auth.uid())
    );

-- Messages: readable and writable by participants of the conversation.
drop policy if exists chat_messages_select on chat_messages;
create policy chat_messages_select on chat_messages
    for select to authenticated
    using (
        exists (
            select 1
            from chat_participants as p
            where p.conversation_id = chat_messages.conversation_id
              and p.user_id = auth.uid()
        )
    );

drop policy if exists chat_messages_insert on chat_messages;
create policy chat_messages_insert on chat_messages
    for insert to authenticated
    with check (
        exists (
            select 1
            from chat_participants as p
            where p.conversation_id = chat_messages.conversation_id
              and p.user_id = auth.uid()
        )
    );

-- Notifications: users read and update only their own rows. There is
-- deliberately no INSERT policy; rows are created by chat_on_message_insert().
drop policy if exists chat_notifications_select on chat_notifications;
create policy chat_notifications_select on chat_notifications
    for select to authenticated
    using (user_id = auth.uid());

drop policy if exists chat_notifications_update on chat_notifications;
create policy chat_notifications_update on chat_notifications
    for update to authenticated
    using (user_id = auth.uid())
    with check (user_id = auth.uid());

-- ---------------------------------------------------------------------------
-- Chat MQTT downlinks: queue for device/edge commands tied to conversations
-- ---------------------------------------------------------------------------

create table if not exists chat_mqtt_downlinks (
    id uuid primary key default gen_random_uuid(),
    conversation_id uuid not null references chat_conversations(id) on delete cascade,
    -- optional: specific participant user as target (used to map device/user)
    target_user_id uuid references public.ak_users(id) on delete set null,
    -- optional: explicit topic for the downlink; when omitted the gateway
    -- derives it by convention (e.g. device/{user_id}/down)
    topic text,
    -- payload and its encoding: utf8/base64/json
    payload text not null,
    payload_encoding text not null default 'utf8'
        check (payload_encoding in ('utf8', 'base64', 'json')),
    qos smallint not null default 1 check (qos in (0, 1, 2)),
    retain boolean not null default false,
    -- lifecycle
    status text not null default 'pending'
        check (status in ('pending', 'sent', 'acked', 'failed', 'expired', 'canceled')),
    scheduled_at timestamptz not null default now(),
    expires_at timestamptz,
    sent_at timestamptz,
    ack_at timestamptz,
    retry_count integer not null default 0,
    last_error text,
    metadata jsonb,
    -- correlation / audit
    correlation_id uuid,
    created_by uuid not null references public.ak_users(id) on delete cascade,
    created_at timestamptz not null default now(),
    updated_at timestamptz not null default now()
);

create index if not exists idx_chat_mqtt_downlinks_conv_status_time
    on chat_mqtt_downlinks (conversation_id, status, scheduled_at asc);

create index if not exists idx_chat_mqtt_downlinks_creator_time
    on chat_mqtt_downlinks (created_by, created_at desc);

create index if not exists idx_chat_mqtt_downlinks_topic
    on chat_mqtt_downlinks (topic);

drop trigger if exists trg_chat_mqtt_downlinks_updated on chat_mqtt_downlinks;
create trigger trg_chat_mqtt_downlinks_updated
    before update on chat_mqtt_downlinks
    for each row execute function public.set_updated_at();

alter table chat_mqtt_downlinks enable row level security;

-- Access control for the downlink table:
-- 1) participants of a conversation can read that conversation's downlinks;
-- 2) insert: caller must be a participant, and created_by = auth.uid();
-- 3) update/delete: the creator, or a conversation owner/admin
--    (used for retry, cancel, marking ACK, etc.).
drop policy if exists chat_mqtt_downlinks_select on chat_mqtt_downlinks;
create policy chat_mqtt_downlinks_select on chat_mqtt_downlinks
    for select to authenticated
    using (
        exists (
            select 1
            from chat_participants as p
            where p.conversation_id = chat_mqtt_downlinks.conversation_id
              and p.user_id = auth.uid()
        )
    );

drop policy if exists chat_mqtt_downlinks_insert on chat_mqtt_downlinks;
create policy chat_mqtt_downlinks_insert on chat_mqtt_downlinks
    for insert to authenticated
    with check (
        created_by = auth.uid()
        and exists (
            select 1
            from chat_participants as p
            where p.conversation_id = chat_mqtt_downlinks.conversation_id
              and p.user_id = auth.uid()
        )
    );

-- The owner/admin test goes through chat_user_is_manager(), the same helper
-- the other policies in this file use.
drop policy if exists chat_mqtt_downlinks_update on chat_mqtt_downlinks;
create policy chat_mqtt_downlinks_update on chat_mqtt_downlinks
    for update to authenticated
    using (
        created_by = auth.uid()
        or public.chat_user_is_manager(chat_mqtt_downlinks.conversation_id, auth.uid())
    )
    with check (
        created_by = auth.uid()
        or public.chat_user_is_manager(chat_mqtt_downlinks.conversation_id, auth.uid())
    );

drop policy if exists chat_mqtt_downlinks_delete on chat_mqtt_downlinks;
create policy chat_mqtt_downlinks_delete on chat_mqtt_downlinks
    for delete to authenticated
    using (
        created_by = auth.uid()
        or public.chat_user_is_manager(chat_mqtt_downlinks.conversation_id, auth.uid())
    );

-- Optional: view of downlinks that are due to be sent, for quick querying by
-- back-office tools / reports.
-- NOTE(review): a plain view is evaluated with its owner's privileges, so RLS
-- on chat_mqtt_downlinks may not apply to readers of this view. On
-- PostgreSQL 15+ consider "with (security_invoker = true)", or restrict
-- grants on the view - confirm against the deployment's Postgres version.
create or replace view chat_mqtt_downlinks_pending as
select *
from chat_mqtt_downlinks
where status = 'pending'
  and scheduled_at <= now();

-- ---------------------------------------------------------------------------
-- Gateway reporting (chat_gateway_nodes + chat_gateway_heartbeats)
-- ---------------------------------------------------------------------------

-- Nodes registry: one row per running gateway clientId
create table if not exists chat_gateway_nodes (
    id uuid primary key default gen_random_uuid(),
    name text not null,
    mqtt_client_id text not null unique,
    version text,
    region text,
    tags jsonb,
    created_at timestamptz not null default now(),
    updated_at timestamptz not null default now()
);

drop trigger if exists trg_chat_gateway_nodes_updated on chat_gateway_nodes;
create trigger trg_chat_gateway_nodes_updated
    before update on chat_gateway_nodes
    for each row execute function public.set_updated_at();

alter table chat_gateway_nodes enable row level security;

-- Read-all for authenticated (dashboard/ops)
drop policy if exists chat_gateway_nodes_select on chat_gateway_nodes;
create policy chat_gateway_nodes_select on chat_gateway_nodes
    for select to authenticated
    using (true);

-- Heartbeats: periodic metrics from gateway
create table if not exists chat_gateway_heartbeats (
    id uuid primary key default gen_random_uuid(),
    gateway_id uuid not null references chat_gateway_nodes(id) on delete cascade,
    created_at timestamptz not null default now(),
    uptime_sec integer,
    mem_rss_mb integer,
    heap_used_mb integer,
    mqtt_connected boolean,
    kafka_connected boolean,
    redis_connected boolean,
    msgs_in integer,
    msgs_out integer,
    msgs_dropped integer,
    errors integer,
    acl_denied integer,
    kafka_produced integer,
    extra jsonb
);

create index if not exists idx_chat_gateway_heartbeats_gateway_time
    on chat_gateway_heartbeats (gateway_id, created_at desc);

alter table chat_gateway_heartbeats enable row level security;

-- Read-all for authenticated (dashboard/ops)
drop policy if exists chat_gateway_heartbeats_select on chat_gateway_heartbeats;
create policy chat_gateway_heartbeats_select on chat_gateway_heartbeats
    for select to authenticated
    using (true);

-- Latest status per gateway (view): the most recent heartbeat of each gateway
-- joined with its registry row. Gateways with no heartbeat are not listed.
create or replace view chat_gateway_status_latest as
with last as (
    -- DISTINCT ON + ORDER BY keeps the newest heartbeat per gateway_id
    select distinct on (gateway_id)
        gateway_id,
        id as heartbeat_id,
        created_at,
        uptime_sec,
        mem_rss_mb,
        heap_used_mb,
        mqtt_connected,
        kafka_connected,
        redis_connected,
        msgs_in,
        msgs_out,
        msgs_dropped,
        errors,
        acl_denied,
        kafka_produced,
        extra
    from chat_gateway_heartbeats
    order by gateway_id, created_at desc
)
select
    l.*,
    n.name,
    n.mqtt_client_id,
    n.version,
    n.region,
    n.tags,
    n.updated_at as node_updated_at
from last as l
inner join chat_gateway_nodes as n
    on n.id = l.gateway_id;