Files
akmon/doc_chat/create_chat_tables.sql
2026-01-20 08:04:15 +08:00

319 lines
14 KiB
PL/PgSQL
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
-- Chat schema for 1:1 and group conversations (Supabase-friendly)
-- Tables: chat_conversations, chat_participants, chat_messages, chat_notifications,
--         chat_mqtt_downlinks, chat_gateway_nodes, chat_gateway_heartbeats
-- One row per conversation; is_group distinguishes group chats from 1:1 chats.
create table if not exists chat_conversations (
    id              uuid        primary key default gen_random_uuid(),
    title           text,
    is_group        boolean     not null default false,
    -- Deleting the owning user keeps the conversation and clears owner_id.
    owner_id        uuid        references public.ak_users(id) on delete set null,
    -- Set by the chat_messages after-insert trigger to the inserted message's created_at.
    last_message_at timestamptz,
    metadata        jsonb,
    created_at      timestamptz not null default now(),
    -- Refreshed on every update by the set_updated_at trigger.
    updated_at      timestamptz not null default now()
);
-- Membership of a user in a conversation, with a per-conversation role and
-- per-user read/mute state. A user can appear at most once per conversation.
create table if not exists chat_participants (
    id              uuid        primary key default gen_random_uuid(),
    -- Membership rows disappear together with their conversation or user.
    conversation_id uuid        not null references chat_conversations(id) on delete cascade,
    user_id         uuid        not null references public.ak_users(id) on delete cascade,
    -- 'owner' and 'admin' are the roles treated as managers by chat_user_is_manager().
    role            text        not null default 'member'
                                check (role in ('owner','admin','member')),
    joined_at       timestamptz not null default now(),
    last_read_at    timestamptz,
    is_muted        boolean     not null default false,
    settings        jsonb,
    created_at      timestamptz not null default now(),
    updated_at      timestamptz not null default now(),
    unique (conversation_id, user_id)
);
-- Messages posted into a conversation.
create table if not exists chat_messages (
    id              uuid        primary key default gen_random_uuid(),
    -- Messages are removed together with their conversation or sender.
    conversation_id uuid        not null references chat_conversations(id) on delete cascade,
    sender_id       uuid        not null references public.ak_users(id) on delete cascade,
    content         text        not null,
    content_type    text        not null default 'text'
                                check (content_type in ('text','markdown','json','image','file','audio')),
    -- Optional parent message; a deleted parent leaves the reply in place with reply_to cleared.
    reply_to        uuid        references chat_messages(id) on delete set null,
    metadata        jsonb,
    created_at      timestamptz not null default now(),
    updated_at      timestamptz not null default now()
);
-- Per-user notification rows; populated by the chat_messages after-insert trigger.
-- The table is created BEFORE its index: "create index ... on chat_notifications"
-- fails with "relation does not exist" on a fresh database if it runs first.
create table if not exists chat_notifications (
    id              uuid        primary key default gen_random_uuid(),
    -- Notifications are removed together with their user, conversation or message.
    user_id         uuid        not null references public.ak_users(id) on delete cascade,
    conversation_id uuid        references chat_conversations(id) on delete cascade,
    message_id      uuid        references chat_messages(id) on delete cascade,
    type            text        not null default 'message',
    is_read         boolean     not null default false,
    created_at      timestamptz not null default now()
);

-- Secondary indexes for the chat tables.
-- Newest-first message listing within a conversation.
create index if not exists idx_chat_messages_conv_time
    on chat_messages (conversation_id, created_at desc);
-- NOTE(review): unique(conversation_id, user_id) on chat_participants already yields an
-- index led by conversation_id, so this one is probably redundant; kept for compatibility.
create index if not exists idx_chat_participants_conv
    on chat_participants (conversation_id);
-- Lookup of all conversations a user participates in.
create index if not exists idx_chat_participants_user
    on chat_participants (user_id);
-- Newest-first notification listing per user.
create index if not exists idx_chat_notifications_user_time
    on chat_notifications (user_id, created_at desc);
-- updated_at triggers
-- Row-level trigger function: stamps the row being written with the current
-- transaction time in its updated_at column and lets the write proceed.
-- Intended for BEFORE UPDATE triggers on tables that have an updated_at column.
create or replace function public.set_updated_at()
returns trigger
as $$
begin
    new.updated_at := now();
    return new;
end;
$$ language plpgsql;
-- Attach set_updated_at() to every chat table that carries an updated_at column.
-- Each trigger is dropped first so the script can be re-run.
drop trigger if exists trg_chat_conversations_updated on chat_conversations;
create trigger trg_chat_conversations_updated
    before update on chat_conversations
    for each row execute function public.set_updated_at();

drop trigger if exists trg_chat_participants_updated on chat_participants;
create trigger trg_chat_participants_updated
    before update on chat_participants
    for each row execute function public.set_updated_at();

drop trigger if exists trg_chat_messages_updated on chat_messages;
create trigger trg_chat_messages_updated
    before update on chat_messages
    for each row execute function public.set_updated_at();
-- On new message: update conversation last_message_at and create notifications
-- for participants except sender.
--
-- Runs as SECURITY DEFINER with a pinned search_path. As an invoker-rights function
-- it would execute under the sender's row level security, where:
--   * chat_notifications has no INSERT policy, so the notification insert is rejected
--     (which aborts the message insert itself);
--   * chat_conversations can only be updated by owners/admins, so last_message_at is
--     silently left untouched for ordinary members;
--   * ordinary members can only see their own chat_participants row, so no recipients
--     are found.
-- Trigger functions cannot be called directly from SQL, so the elevated rights are
-- only reachable through the trigger below.
create or replace function public.chat_on_message_insert()
returns trigger
language plpgsql
security definer
set search_path = public
as $$
begin
    -- update last_message_at
    update chat_conversations
       set last_message_at = new.created_at
     where id = new.conversation_id;

    -- insert notifications for all participants except sender
    insert into chat_notifications (user_id, conversation_id, message_id, type, is_read, created_at)
    select p.user_id, new.conversation_id, new.id, 'message', false, now()
      from chat_participants as p
     where p.conversation_id = new.conversation_id
       and p.user_id <> new.sender_id;

    -- Return value is ignored for AFTER triggers; new is returned by convention.
    return new;
end;
$$;

drop trigger if exists trg_chat_messages_after_insert on chat_messages;
create trigger trg_chat_messages_after_insert
    after insert on chat_messages
    for each row execute function public.chat_on_message_insert();
-- RLS (simplified)
-- Once enabled, non-owner roles can only touch rows that a policy explicitly admits;
-- a table with RLS enabled and no policy for a command denies that command.
alter table chat_notifications enable row level security;
alter table chat_messages      enable row level security;
alter table chat_participants  enable row level security;
alter table chat_conversations enable row level security;
-- Helper to check whether a user is owner/admin of a conversation without triggering
-- RLS recursion.
--   p_conversation_id: conversation to inspect
--   p_user_id:         user whose role is checked
--   returns:           true when a chat_participants row exists for that pair with
--                      role 'owner' or 'admin', false otherwise
-- SECURITY DEFINER lets policies on chat_participants call this function without
-- re-entering their own policies; search_path is pinned to public for that reason.
-- NOTE(review): volatility is left at the default on purpose here; marking the function
-- stable would change which same-statement rows it can see.
create or replace function public.chat_user_is_manager(p_conversation_id uuid, p_user_id uuid)
returns boolean
security definer
set search_path = public
language sql
as $$
    select exists (
        select 1
          from chat_participants as cp
         where cp.user_id = p_user_id
           and cp.conversation_id = p_conversation_id
           and cp.role in ('owner', 'admin')
    );
$$;

-- Only signed-in users may call the helper.
revoke all on function public.chat_user_is_manager(uuid, uuid) from public;
grant execute on function public.chat_user_is_manager(uuid, uuid) to authenticated;
-- Policies: users can access conversations they participate in.
-- Every policy is dropped before it is created: CREATE POLICY has no IF NOT EXISTS
-- form, so without the drop a second run of this script aborts with "policy already exists".

-- Read: only conversations in which the caller has a participant row.
drop policy if exists chat_conversations_select on chat_conversations;
create policy chat_conversations_select on chat_conversations
    for select to authenticated
    using (
        exists (
            select 1
              from chat_participants as p
             where p.conversation_id = chat_conversations.id
               and p.user_id = auth.uid()
        )
    );

-- Create: any signed-in user may insert a conversation.
-- NOTE(review): owner_id is not tied to auth.uid() here — confirm that is intended.
drop policy if exists chat_conversations_insert on chat_conversations;
create policy chat_conversations_insert on chat_conversations
    for insert to authenticated
    with check (true);

-- Modify: only owners/admins of the conversation, both before and after the change.
drop policy if exists chat_conversations_update on chat_conversations;
create policy chat_conversations_update on chat_conversations
    for update to authenticated
    using (public.chat_user_is_manager(chat_conversations.id, auth.uid()))
    with check (public.chat_user_is_manager(chat_conversations.id, auth.uid()));
-- Split policies for clarity: SELECT/UPDATE/DELETE cover members and admins;
-- INSERT allows admins to add others.
-- Every policy is dropped before it is created so the script can be re-run
-- (CREATE POLICY has no IF NOT EXISTS form).
-- chat_participants_all is dropped here; this script does not create it.
drop policy if exists chat_participants_all on chat_participants;

-- Read: own membership row, or any row of a conversation the caller manages.
drop policy if exists chat_participants_select on chat_participants;
create policy chat_participants_select on chat_participants
    for select to authenticated
    using (
        user_id = auth.uid()
        or public.chat_user_is_manager(chat_participants.conversation_id, auth.uid())
    );

-- Modify: same visibility rule, enforced on both the old and the new row.
-- NOTE(review): the check does not restrict which columns change, so a member can
-- update the role on their own row — confirm whether that is acceptable.
drop policy if exists chat_participants_update on chat_participants;
create policy chat_participants_update on chat_participants
    for update to authenticated
    using (
        user_id = auth.uid()
        or public.chat_user_is_manager(chat_participants.conversation_id, auth.uid())
    )
    with check (
        user_id = auth.uid()
        or public.chat_user_is_manager(chat_participants.conversation_id, auth.uid())
    );

-- Remove: a user may leave; owners/admins may remove anyone in their conversation.
drop policy if exists chat_participants_delete on chat_participants;
create policy chat_participants_delete on chat_participants
    for delete to authenticated
    using (
        user_id = auth.uid()
        or public.chat_user_is_manager(chat_participants.conversation_id, auth.uid())
    );

-- Add: allow user to add self OR owners/admins to add others.
-- NOTE(review): the self branch does not look at conversation_id or role, so any
-- signed-in user can add themselves to any conversation with any role — confirm.
drop policy if exists chat_participants_insert on chat_participants;
create policy chat_participants_insert on chat_participants
    for insert to authenticated
    with check (
        user_id = auth.uid()
        or public.chat_user_is_manager(chat_participants.conversation_id, auth.uid())
    );
-- Message policies. Each policy is dropped first so the script can be re-run
-- (CREATE POLICY has no IF NOT EXISTS form).

-- Read: messages of conversations in which the caller has a participant row.
drop policy if exists chat_messages_select on chat_messages;
create policy chat_messages_select on chat_messages
    for select to authenticated
    using (
        exists (
            select 1
              from chat_participants as p
             where p.conversation_id = chat_messages.conversation_id
               and p.user_id = auth.uid()
        )
    );

-- Write: the caller must be a participant AND must be the recorded sender.
-- Without the sender_id check a participant could post messages attributed to
-- another user; this mirrors the created_by = auth.uid() rule on chat_mqtt_downlinks.
drop policy if exists chat_messages_insert on chat_messages;
create policy chat_messages_insert on chat_messages
    for insert to authenticated
    with check (
        sender_id = auth.uid()
        and exists (
            select 1
              from chat_participants as p
             where p.conversation_id = chat_messages.conversation_id
               and p.user_id = auth.uid()
        )
    );
-- Notification policies: a user can read and update only their own notifications.
-- There is no INSERT or DELETE policy for authenticated, so those commands are denied
-- to that role. Each policy is dropped first so the script can be re-run.
drop policy if exists chat_notifications_select on chat_notifications;
create policy chat_notifications_select on chat_notifications
    for select to authenticated
    using (user_id = auth.uid());

drop policy if exists chat_notifications_update on chat_notifications;
create policy chat_notifications_update on chat_notifications
    for update to authenticated
    using (user_id = auth.uid())
    with check (user_id = auth.uid());
-- ---------------------------------------------------------------------------
-- Chat MQTT downlinks: queue for device/edge commands tied to conversations
-- ---------------------------------------------------------------------------
create table if not exists chat_mqtt_downlinks (
    id               uuid        primary key default gen_random_uuid(),
    conversation_id  uuid        not null references chat_conversations(id) on delete cascade,

    -- Optional: specific participant user as target (used to map device/user).
    target_user_id   uuid        references public.ak_users(id) on delete set null,
    -- Optional: explicit topic for the downlink. When omitted, the gateway derives it
    -- by convention, e.g. device/{user_id}/down.
    topic            text,

    -- Payload and its encoding (utf8/base64/json).
    payload          text        not null,
    payload_encoding text        not null default 'utf8'
                                 check (payload_encoding in ('utf8','base64','json')),
    qos              smallint    not null default 1 check (qos in (0,1,2)),
    retain           boolean     not null default false,

    -- Lifecycle.
    status           text        not null default 'pending'
                                 check (status in ('pending','sent','acked','failed','expired','canceled')),
    scheduled_at     timestamptz not null default now(),
    expires_at       timestamptz,
    sent_at          timestamptz,
    ack_at           timestamptz,
    retry_count      integer     not null default 0,
    last_error       text,
    metadata         jsonb,

    -- Correlation / audit.
    correlation_id   uuid,
    -- Deleting the creating user also deletes the downlinks they created.
    created_by       uuid        not null references public.ak_users(id) on delete cascade,
    created_at       timestamptz not null default now(),
    updated_at       timestamptz not null default now()
);
-- Indexes for the downlink queue.
-- Per-conversation queue scan by status, oldest scheduled first.
create index if not exists idx_chat_mqtt_downlinks_conv_status_time
    on chat_mqtt_downlinks (conversation_id, status, scheduled_at asc);
-- Newest-first listing of the downlinks a user created.
create index if not exists idx_chat_mqtt_downlinks_creator_time
    on chat_mqtt_downlinks (created_by, created_at desc);
-- Lookup by MQTT topic.
create index if not exists idx_chat_mqtt_downlinks_topic
    on chat_mqtt_downlinks (topic);

-- Keep updated_at current on every update; dropped first so the script can be re-run.
drop trigger if exists trg_chat_mqtt_downlinks_updated on chat_mqtt_downlinks;
create trigger trg_chat_mqtt_downlinks_updated
    before update on chat_mqtt_downlinks
    for each row execute function public.set_updated_at();

-- Access to downlinks is governed by row level security policies.
alter table chat_mqtt_downlinks enable row level security;
-- Access control for the downlink table:
-- 1) Participants of a conversation can read that conversation's downlink records.
-- 2) Insert: the caller must be a participant of the conversation, and created_by = auth.uid().
-- 3) Update/delete: the record's creator, or an owner/admin of the conversation
--    (used for retries, cancellation, marking ACK, etc.).
-- Every policy is dropped before it is created so the script can be re-run
-- (CREATE POLICY has no IF NOT EXISTS form). The owner/admin test goes through
-- public.chat_user_is_manager(), the same helper the other chat policies use,
-- instead of repeating the role subquery in each policy.
drop policy if exists chat_mqtt_downlinks_select on chat_mqtt_downlinks;
create policy chat_mqtt_downlinks_select on chat_mqtt_downlinks
    for select to authenticated
    using (
        exists (
            select 1
              from chat_participants as p
             where p.conversation_id = chat_mqtt_downlinks.conversation_id
               and p.user_id = auth.uid()
        )
    );

drop policy if exists chat_mqtt_downlinks_insert on chat_mqtt_downlinks;
create policy chat_mqtt_downlinks_insert on chat_mqtt_downlinks
    for insert to authenticated
    with check (
        created_by = auth.uid()
        and exists (
            select 1
              from chat_participants as p
             where p.conversation_id = chat_mqtt_downlinks.conversation_id
               and p.user_id = auth.uid()
        )
    );

drop policy if exists chat_mqtt_downlinks_update on chat_mqtt_downlinks;
create policy chat_mqtt_downlinks_update on chat_mqtt_downlinks
    for update to authenticated
    using (
        created_by = auth.uid()
        or public.chat_user_is_manager(chat_mqtt_downlinks.conversation_id, auth.uid())
    )
    with check (
        created_by = auth.uid()
        or public.chat_user_is_manager(chat_mqtt_downlinks.conversation_id, auth.uid())
    );

drop policy if exists chat_mqtt_downlinks_delete on chat_mqtt_downlinks;
create policy chat_mqtt_downlinks_delete on chat_mqtt_downlinks
    for delete to authenticated
    using (
        created_by = auth.uid()
        or public.chat_user_is_manager(chat_mqtt_downlinks.conversation_id, auth.uid())
    );
-- Optional: view of downlinks that are due for sending, for quick queries from
-- back-office tools and reports. A row is due when it is still 'pending' and its
-- scheduled_at is not in the future.
-- security_invoker (PostgreSQL 15+) makes the view evaluate the row level security
-- policies of chat_mqtt_downlinks as the querying user; without it the view runs with
-- its owner's rights and exposes every conversation's pending downlinks.
-- NOTE(review): expires_at is not considered, so pending rows past their expiry are
-- still listed — confirm whether the consumer handles expiry itself.
create or replace view chat_mqtt_downlinks_pending
with (security_invoker = true) as
select
    id,
    conversation_id,
    target_user_id,
    topic,
    payload,
    payload_encoding,
    qos,
    retain,
    status,
    scheduled_at,
    expires_at,
    sent_at,
    ack_at,
    retry_count,
    last_error,
    metadata,
    correlation_id,
    created_by,
    created_at,
    updated_at
from chat_mqtt_downlinks
where status = 'pending'
  and scheduled_at <= now();
-- ---------------------------------------------------------------------------
-- Gateway reporting (chat_gateway_nodes + chat_gateway_heartbeats)
-- ---------------------------------------------------------------------------
-- Nodes registry: one row per running gateway clientId
create table if not exists chat_gateway_nodes (
    id             uuid        primary key default gen_random_uuid(),
    name           text        not null,
    -- Unique per node, so a gateway client id maps to exactly one registry row.
    mqtt_client_id text        not null unique,
    version        text,
    region         text,
    tags           jsonb,
    created_at     timestamptz not null default now(),
    updated_at     timestamptz not null default now()
);

-- Keep updated_at current on every update; dropped first so the script can be re-run.
drop trigger if exists trg_chat_gateway_nodes_updated on chat_gateway_nodes;
create trigger trg_chat_gateway_nodes_updated
    before update on chat_gateway_nodes
    for each row execute function public.set_updated_at();

alter table chat_gateway_nodes enable row level security;

-- Read-all for authenticated (dashboard/ops). No write policy is defined for that role.
-- The policy is dropped first because CREATE POLICY has no IF NOT EXISTS form and
-- would otherwise abort a second run of this script.
drop policy if exists chat_gateway_nodes_select on chat_gateway_nodes;
create policy chat_gateway_nodes_select on chat_gateway_nodes
    for select to authenticated
    using (true);
-- Heartbeats: periodic metrics from gateway
create table if not exists chat_gateway_heartbeats (
    id              uuid        primary key default gen_random_uuid(),
    -- Heartbeats are removed together with their gateway node.
    gateway_id      uuid        not null references chat_gateway_nodes(id) on delete cascade,
    created_at      timestamptz not null default now(),
    -- Process metrics (units as implied by the column suffixes).
    uptime_sec      integer,
    mem_rss_mb      integer,
    heap_used_mb    integer,
    -- Connectivity flags.
    mqtt_connected  boolean,
    kafka_connected boolean,
    redis_connected boolean,
    -- Counters. NOTE(review): cumulative vs. per-interval is not visible here — confirm with the gateway.
    msgs_in         integer,
    msgs_out        integer,
    msgs_dropped    integer,
    errors          integer,
    acl_denied      integer,
    kafka_produced  integer,
    extra           jsonb
);

-- Newest-first heartbeat lookup per gateway.
create index if not exists idx_chat_gateway_heartbeats_gateway_time
    on chat_gateway_heartbeats (gateway_id, created_at desc);

alter table chat_gateway_heartbeats enable row level security;

-- Read-all for authenticated (dashboard/ops). No write policy is defined for that role.
-- The policy is dropped first because CREATE POLICY has no IF NOT EXISTS form and
-- would otherwise abort a second run of this script.
drop policy if exists chat_gateway_heartbeats_select on chat_gateway_heartbeats;
create policy chat_gateway_heartbeats_select on chat_gateway_heartbeats
    for select to authenticated
    using (true);
-- Latest status per gateway (view): the most recent heartbeat of each gateway joined
-- with its registry row. Gateways that have never sent a heartbeat do not appear.
-- security_invoker (PostgreSQL 15+) makes the view honour the row level security
-- policies of the underlying tables (read access for authenticated only) instead of
-- running with the view owner's rights.
create or replace view chat_gateway_status_latest
with (security_invoker = true) as
with latest_heartbeat as (
    -- distinct on keeps the first row per gateway_id in the order below; id is a
    -- tie-breaker so heartbeats with identical created_at resolve deterministically.
    select distinct on (gateway_id)
        gateway_id,
        id as heartbeat_id,
        created_at,
        uptime_sec,
        mem_rss_mb,
        heap_used_mb,
        mqtt_connected,
        kafka_connected,
        redis_connected,
        msgs_in,
        msgs_out,
        msgs_dropped,
        errors,
        acl_denied,
        kafka_produced,
        extra
    from chat_gateway_heartbeats
    order by gateway_id, created_at desc, id desc
)
select
    h.gateway_id,
    h.heartbeat_id,
    h.created_at,
    h.uptime_sec,
    h.mem_rss_mb,
    h.heap_used_mb,
    h.mqtt_connected,
    h.kafka_connected,
    h.redis_connected,
    h.msgs_in,
    h.msgs_out,
    h.msgs_dropped,
    h.errors,
    h.acl_denied,
    h.kafka_produced,
    h.extra,
    n.name,
    n.mqtt_client_id,
    n.version,
    n.region,
    n.tags,
    n.updated_at as node_updated_at
from latest_heartbeat as h
inner join chat_gateway_nodes as n
    on n.id = h.gateway_id;