152 lines
4.9 KiB
PL/PgSQL
152 lines
4.9 KiB
PL/PgSQL
-- Gateway reporting tables for MQTT gateway nodes and periodic heartbeats
|
|
-- Requires Supabase/Postgres environment. Assumes helper is_admin(uid uuid) exists.
|
|
|
|
-- UUID generation
|
|
create extension if not exists pgcrypto;
|
|
|
|
-- 1) Gateway registry
|
|
create table if not exists public.gateway_nodes (
|
|
id uuid primary key default gen_random_uuid(),
|
|
name text,
|
|
mqtt_client_id text unique not null,
|
|
version text,
|
|
region text,
|
|
owner_user_id uuid references auth.users(id),
|
|
tags jsonb,
|
|
created_at timestamptz not null default now(),
|
|
updated_at timestamptz not null default now()
|
|
);
|
|
|
|
create index if not exists idx_gateway_nodes_owner on public.gateway_nodes(owner_user_id);
|
|
|
|
create or replace function public.set_updated_at()
|
|
returns trigger language plpgsql as $$
|
|
begin
|
|
new.updated_at = now();
|
|
return new;
|
|
end;$$;
|
|
|
|
drop trigger if exists trg_gateway_nodes_updated_at on public.gateway_nodes;
|
|
create trigger trg_gateway_nodes_updated_at
|
|
before update on public.gateway_nodes
|
|
for each row execute function public.set_updated_at();
|
|
|
|
alter table public.gateway_nodes enable row level security;
|
|
|
|
-- RLS: owners and admins can read; only admins can write via client. Service role bypasses RLS.
|
|
do $$
|
|
begin
|
|
if not exists (
|
|
select 1 from pg_policies where schemaname='public' and tablename='gateway_nodes' and policyname='gateway_nodes_select') then
|
|
create policy gateway_nodes_select on public.gateway_nodes
|
|
for select using (
|
|
auth.uid() = owner_user_id or coalesce(public.is_admin(auth.uid()), false)
|
|
);
|
|
end if;
|
|
if not exists (
|
|
select 1 from pg_policies where schemaname='public' and tablename='gateway_nodes' and policyname='gateway_nodes_update_owner') then
|
|
create policy gateway_nodes_update_owner on public.gateway_nodes
|
|
for update using (
|
|
auth.uid() = owner_user_id or coalesce(public.is_admin(auth.uid()), false)
|
|
) with check (
|
|
auth.uid() = owner_user_id or coalesce(public.is_admin(auth.uid()), false)
|
|
);
|
|
end if;
|
|
if not exists (
|
|
select 1 from pg_policies where schemaname='public' and tablename='gateway_nodes' and policyname='gateway_nodes_insert_owner') then
|
|
create policy gateway_nodes_insert_owner on public.gateway_nodes
|
|
for insert with check (
|
|
auth.uid() = owner_user_id or coalesce(public.is_admin(auth.uid()), false)
|
|
);
|
|
end if;
|
|
end$$;
|
|
|
|
-- 2) Heartbeats (periodic runtime metrics). Use service role to insert.
|
|
create table if not exists public.gateway_heartbeats (
|
|
id uuid primary key default gen_random_uuid(),
|
|
gateway_id uuid not null references public.gateway_nodes(id) on delete cascade,
|
|
ts timestamptz not null default now(),
|
|
uptime_sec integer,
|
|
mem_rss_mb integer,
|
|
heap_used_mb integer,
|
|
mqtt_connected boolean,
|
|
kafka_connected boolean,
|
|
redis_connected boolean,
|
|
msgs_in integer default 0,
|
|
msgs_out integer default 0,
|
|
msgs_dropped integer default 0,
|
|
errors integer default 0,
|
|
acl_denied integer default 0,
|
|
kafka_produced integer default 0,
|
|
extra jsonb
|
|
);
|
|
|
|
create index if not exists idx_gateway_heartbeats_gid_ts on public.gateway_heartbeats(gateway_id, ts desc);
|
|
|
|
alter table public.gateway_heartbeats enable row level security;
|
|
|
|
do $$
|
|
begin
|
|
if not exists (
|
|
select 1 from pg_policies where schemaname='public' and tablename='gateway_heartbeats' and policyname='gateway_heartbeats_select') then
|
|
create policy gateway_heartbeats_select on public.gateway_heartbeats
|
|
for select using (
|
|
exists (
|
|
select 1 from public.gateway_nodes g
|
|
where g.id = gateway_id
|
|
and (g.owner_user_id = auth.uid() or coalesce(public.is_admin(auth.uid()), false))
|
|
)
|
|
);
|
|
end if;
|
|
end$$;
|
|
|
|
-- 3) Latest status view per gateway
|
|
create or replace view public.gateway_status_latest as
|
|
select distinct on (g.id)
|
|
g.id as gateway_id,
|
|
g.name,
|
|
g.mqtt_client_id,
|
|
g.version,
|
|
g.region,
|
|
h.ts as last_ts,
|
|
h.uptime_sec,
|
|
h.mem_rss_mb,
|
|
h.heap_used_mb,
|
|
h.mqtt_connected,
|
|
h.kafka_connected,
|
|
h.redis_connected,
|
|
h.msgs_in,
|
|
h.msgs_out,
|
|
h.msgs_dropped,
|
|
h.errors,
|
|
h.acl_denied,
|
|
h.kafka_produced
|
|
from public.gateway_nodes g
|
|
left join public.gateway_heartbeats h
|
|
on h.gateway_id = g.id
|
|
order by g.id, h.ts desc;
|
|
|
|
-- 4) Daily rollup (materialized view). Refresh nightly via scheduler.
|
|
create materialized view if not exists public.gateway_daily_stats as
|
|
select
|
|
date_trunc('day', h.ts)::date as day,
|
|
h.gateway_id,
|
|
sum(h.msgs_in) as msgs_in,
|
|
sum(h.msgs_out) as msgs_out,
|
|
sum(h.msgs_dropped) as msgs_dropped,
|
|
sum(h.errors) as errors,
|
|
sum(h.acl_denied) as acl_denied,
|
|
sum(h.kafka_produced) as kafka_produced,
|
|
avg(h.mem_rss_mb) as avg_mem_rss_mb,
|
|
avg(h.heap_used_mb) as avg_heap_used_mb
|
|
from public.gateway_heartbeats h
|
|
group by 1, 2
|
|
with no data;
|
|
|
|
create index if not exists idx_gateway_daily_stats on public.gateway_daily_stats(day, gateway_id);
|
|
|
|
-- Note: schedule periodic
|
|
-- select cron.schedule('refresh_gateway_daily_stats', '0 3 * * *', $$
|
|
-- refresh materialized view concurrently public.gateway_daily_stats;
|
|
-- $$);
|