Files
akmon/create_gateway_reporting.sql
2026-01-20 08:04:15 +08:00

152 lines
4.9 KiB
PL/PgSQL

-- Gateway reporting tables for MQTT gateway nodes and periodic heartbeats
-- Requires Supabase/Postgres environment. Assumes helper is_admin(uid uuid) exists.
-- UUID generation
create extension if not exists pgcrypto;
-- 1) Gateway registry
create table if not exists public.gateway_nodes (
id uuid primary key default gen_random_uuid(),
name text,
mqtt_client_id text unique not null,
version text,
region text,
owner_user_id uuid references auth.users(id),
tags jsonb,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now()
);
create index if not exists idx_gateway_nodes_owner on public.gateway_nodes(owner_user_id);
create or replace function public.set_updated_at()
returns trigger language plpgsql as $$
begin
new.updated_at = now();
return new;
end;$$;
drop trigger if exists trg_gateway_nodes_updated_at on public.gateway_nodes;
create trigger trg_gateway_nodes_updated_at
before update on public.gateway_nodes
for each row execute function public.set_updated_at();
alter table public.gateway_nodes enable row level security;
-- RLS: owners and admins can read; only admins can write via client. Service role bypasses RLS.
do $$
begin
if not exists (
select 1 from pg_policies where schemaname='public' and tablename='gateway_nodes' and policyname='gateway_nodes_select') then
create policy gateway_nodes_select on public.gateway_nodes
for select using (
auth.uid() = owner_user_id or coalesce(public.is_admin(auth.uid()), false)
);
end if;
if not exists (
select 1 from pg_policies where schemaname='public' and tablename='gateway_nodes' and policyname='gateway_nodes_update_owner') then
create policy gateway_nodes_update_owner on public.gateway_nodes
for update using (
auth.uid() = owner_user_id or coalesce(public.is_admin(auth.uid()), false)
) with check (
auth.uid() = owner_user_id or coalesce(public.is_admin(auth.uid()), false)
);
end if;
if not exists (
select 1 from pg_policies where schemaname='public' and tablename='gateway_nodes' and policyname='gateway_nodes_insert_owner') then
create policy gateway_nodes_insert_owner on public.gateway_nodes
for insert with check (
auth.uid() = owner_user_id or coalesce(public.is_admin(auth.uid()), false)
);
end if;
end$$;
-- 2) Heartbeats (periodic runtime metrics). Use service role to insert.
create table if not exists public.gateway_heartbeats (
id uuid primary key default gen_random_uuid(),
gateway_id uuid not null references public.gateway_nodes(id) on delete cascade,
ts timestamptz not null default now(),
uptime_sec integer,
mem_rss_mb integer,
heap_used_mb integer,
mqtt_connected boolean,
kafka_connected boolean,
redis_connected boolean,
msgs_in integer default 0,
msgs_out integer default 0,
msgs_dropped integer default 0,
errors integer default 0,
acl_denied integer default 0,
kafka_produced integer default 0,
extra jsonb
);
create index if not exists idx_gateway_heartbeats_gid_ts on public.gateway_heartbeats(gateway_id, ts desc);
alter table public.gateway_heartbeats enable row level security;
do $$
begin
if not exists (
select 1 from pg_policies where schemaname='public' and tablename='gateway_heartbeats' and policyname='gateway_heartbeats_select') then
create policy gateway_heartbeats_select on public.gateway_heartbeats
for select using (
exists (
select 1 from public.gateway_nodes g
where g.id = gateway_id
and (g.owner_user_id = auth.uid() or coalesce(public.is_admin(auth.uid()), false))
)
);
end if;
end$$;
-- 3) Latest status view per gateway
create or replace view public.gateway_status_latest as
select distinct on (g.id)
g.id as gateway_id,
g.name,
g.mqtt_client_id,
g.version,
g.region,
h.ts as last_ts,
h.uptime_sec,
h.mem_rss_mb,
h.heap_used_mb,
h.mqtt_connected,
h.kafka_connected,
h.redis_connected,
h.msgs_in,
h.msgs_out,
h.msgs_dropped,
h.errors,
h.acl_denied,
h.kafka_produced
from public.gateway_nodes g
left join public.gateway_heartbeats h
on h.gateway_id = g.id
order by g.id, h.ts desc;
-- 4) Daily rollup (materialized view). Refresh nightly via scheduler.
create materialized view if not exists public.gateway_daily_stats as
select
date_trunc('day', h.ts)::date as day,
h.gateway_id,
sum(h.msgs_in) as msgs_in,
sum(h.msgs_out) as msgs_out,
sum(h.msgs_dropped) as msgs_dropped,
sum(h.errors) as errors,
sum(h.acl_denied) as acl_denied,
sum(h.kafka_produced) as kafka_produced,
avg(h.mem_rss_mb) as avg_mem_rss_mb,
avg(h.heap_used_mb) as avg_heap_used_mb
from public.gateway_heartbeats h
group by 1, 2
with no data;
create index if not exists idx_gateway_daily_stats on public.gateway_daily_stats(day, gateway_id);
-- Note: schedule periodic
-- select cron.schedule('refresh_gateway_daily_stats', '0 3 * * *', $$
-- refresh materialized view concurrently public.gateway_daily_stats;
-- $$);