-- erp-suite/apps/shared-libs/core/database/policies/rls-policies.sql
--
-- 515 lines
-- 17 KiB
-- PL/PgSQL

-- ============================================================================
-- SHARED RLS POLICIES - ERP-Suite Core Library
-- ============================================================================
-- Purpose: Centralized Row-Level Security policies for multi-tenant isolation
-- Location: apps/shared-libs/core/database/policies/rls-policies.sql
-- Usage: Applied dynamically via apply-rls.ts functions
-- ============================================================================
-- ============================================================================
-- HELPER FUNCTIONS FOR RLS
-- ============================================================================
-- Function: Get current tenant ID from session context
-- Reads the session setting app.current_tenant_id and casts it to UUID.
-- Returns NULL when the setting is missing, empty, or not a valid UUID.
-- search_path is pinned because the function is SECURITY DEFINER: without it,
-- name resolution inside the body would follow the caller's search_path.
CREATE OR REPLACE FUNCTION get_current_tenant_id()
RETURNS UUID AS $$
BEGIN
    -- missing_ok = true yields NULL for an unknown setting instead of raising;
    -- NULLIF maps the empty string to NULL before the UUID cast.
    RETURN NULLIF(current_setting('app.current_tenant_id', true), '')::UUID;
EXCEPTION
    -- A malformed UUID is the only failure expected from the cast; anything
    -- else should surface instead of being silently turned into NULL.
    WHEN invalid_text_representation THEN
        RETURN NULL;
END;
$$ LANGUAGE plpgsql STABLE SECURITY DEFINER
SET search_path = pg_catalog, pg_temp;
COMMENT ON FUNCTION get_current_tenant_id() IS
'Retrieves the tenant_id from the current session context for RLS policies.
Returns NULL if not set. Used by all tenant isolation policies.';
-- Function: Get current user ID from session context
-- Reads the session setting app.current_user_id and casts it to UUID.
-- Returns NULL when the setting is missing, empty, or not a valid UUID.
-- search_path is pinned because the function is SECURITY DEFINER: without it,
-- name resolution inside the body would follow the caller's search_path.
CREATE OR REPLACE FUNCTION get_current_user_id()
RETURNS UUID AS $$
BEGIN
    -- missing_ok = true yields NULL for an unknown setting instead of raising;
    -- NULLIF maps the empty string to NULL before the UUID cast.
    RETURN NULLIF(current_setting('app.current_user_id', true), '')::UUID;
EXCEPTION
    -- A malformed UUID is the only failure expected from the cast; anything
    -- else should surface instead of being silently turned into NULL.
    WHEN invalid_text_representation THEN
        RETURN NULL;
END;
$$ LANGUAGE plpgsql STABLE SECURITY DEFINER
SET search_path = pg_catalog, pg_temp;
COMMENT ON FUNCTION get_current_user_id() IS
'Retrieves the user_id from the current session context.
Used for user-specific RLS policies (read/write own data).';
-- Function: Get current user role from session context
-- Reads the session setting app.current_user_role.
-- Returns 'guest' when the setting is missing or empty, as the function
-- comment promises. current_setting(..., true) returns NULL for a missing
-- setting rather than raising, so the default is applied with COALESCE and
-- not with an exception handler.
-- search_path is pinned because the function is SECURITY DEFINER.
CREATE OR REPLACE FUNCTION get_current_user_role()
RETURNS TEXT AS $$
BEGIN
    RETURN COALESCE(
        NULLIF(current_setting('app.current_user_role', true), ''),
        'guest'
    );
END;
$$ LANGUAGE plpgsql STABLE SECURITY DEFINER
SET search_path = pg_catalog, pg_temp;
COMMENT ON FUNCTION get_current_user_role() IS
'Retrieves the user role from the current session context.
Used for role-based access control in RLS policies. Defaults to "guest".';
-- Function: Check if current user is admin
-- Returns TRUE when get_current_user_role() is one of 'admin', 'super_admin',
-- or 'system_admin'; FALSE otherwise, including when the role is NULL, so
-- callers always receive a definite boolean.
-- The function is SECURITY DEFINER and calls get_current_user_role() without
-- schema qualification, so search_path is pinned to the value in effect when
-- this statement runs (SET ... FROM CURRENT). The helper is then resolved
-- through that path rather than through the caller's.
CREATE OR REPLACE FUNCTION is_current_user_admin()
RETURNS BOOLEAN AS $$
BEGIN
    RETURN COALESCE(
        get_current_user_role() IN ('admin', 'super_admin', 'system_admin'),
        FALSE
    );
END;
$$ LANGUAGE plpgsql STABLE SECURITY DEFINER
SET search_path FROM CURRENT;
COMMENT ON FUNCTION is_current_user_admin() IS
'Returns TRUE if the current user has an admin role.
Used for admin bypass policies.';
-- ============================================================================
-- GENERIC RLS POLICY TEMPLATES
-- ============================================================================
-- POLICY 1: TENANT_ISOLATION_POLICY
-- Purpose: Ensures users can only access data from their own tenant
-- Usage: Apply to all tables with tenant_id column
-- ============================================================================
/*
TEMPLATE FOR TENANT_ISOLATION_POLICY:
ALTER TABLE {schema}.{table} ENABLE ROW LEVEL SECURITY;
CREATE POLICY tenant_isolation_{table}
ON {schema}.{table}
FOR ALL
TO authenticated
USING (tenant_id = get_current_tenant_id())
WITH CHECK (tenant_id = get_current_tenant_id());
COMMENT ON POLICY tenant_isolation_{table} ON {schema}.{table} IS
'Multi-tenant isolation: Users can only access records from their own tenant.
Applied to all operations (SELECT, INSERT, UPDATE, DELETE).';
*/
-- ============================================================================
-- POLICY 2: USER_DATA_POLICY
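-- Purpose: Restricts access to data created by, assigned to, or owned by the current user
-- Usage: Apply to tables with created_by, assigned_to, and owner_id columns
--        (trim the template to the columns the table actually has)
-- Note: CREATE POLICY is PERMISSIVE by default and permissive policies are OR-ed,
--       so this policy only narrows access when no broader permissive policy
--       (e.g. tenant_isolation_{table}) exists on the same table.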
-- ============================================================================
/*
TEMPLATE FOR USER_DATA_POLICY:
ALTER TABLE {schema}.{table} ENABLE ROW LEVEL SECURITY;
CREATE POLICY user_data_{table}
ON {schema}.{table}
FOR ALL
TO authenticated
USING (
tenant_id = get_current_tenant_id()
AND (
created_by = get_current_user_id()
OR assigned_to = get_current_user_id()
OR owner_id = get_current_user_id()
)
)
WITH CHECK (
tenant_id = get_current_tenant_id()
AND (
created_by = get_current_user_id()
OR assigned_to = get_current_user_id()
OR owner_id = get_current_user_id()
)
);
COMMENT ON POLICY user_data_{table} ON {schema}.{table} IS
'User-level isolation: Users can only access their own records.
Checks: created_by, assigned_to, or owner_id matches current user.';
*/
-- ============================================================================
-- POLICY 3: READ_OWN_DATA_POLICY
-- Purpose: Allows users to read only their own data (more permissive for SELECT)
-- Usage: Apply when users need read access to own data but restricted write
-- ============================================================================
/*
TEMPLATE FOR READ_OWN_DATA_POLICY:
ALTER TABLE {schema}.{table} ENABLE ROW LEVEL SECURITY;
CREATE POLICY read_own_data_{table}
ON {schema}.{table}
FOR SELECT
TO authenticated
USING (
tenant_id = get_current_tenant_id()
AND (
created_by = get_current_user_id()
OR assigned_to = get_current_user_id()
OR owner_id = get_current_user_id()
)
);
COMMENT ON POLICY read_own_data_{table} ON {schema}.{table} IS
'Read access: Users can view records they created, are assigned to, or own.
SELECT only - write operations controlled by separate policies.';
*/
-- ============================================================================
-- POLICY 4: WRITE_OWN_DATA_POLICY
-- Purpose: Allows users to insert/update/delete only their own data
-- Usage: Companion to READ_OWN_DATA_POLICY for write operations
-- ============================================================================
/*
TEMPLATE FOR WRITE_OWN_DATA_POLICY:
ALTER TABLE {schema}.{table} ENABLE ROW LEVEL SECURITY;
-- INSERT policy
CREATE POLICY write_own_data_insert_{table}
ON {schema}.{table}
FOR INSERT
TO authenticated
WITH CHECK (
tenant_id = get_current_tenant_id()
AND created_by = get_current_user_id()
);
-- UPDATE policy
CREATE POLICY write_own_data_update_{table}
ON {schema}.{table}
FOR UPDATE
TO authenticated
USING (
tenant_id = get_current_tenant_id()
AND (
created_by = get_current_user_id()
OR owner_id = get_current_user_id()
)
)
WITH CHECK (
tenant_id = get_current_tenant_id()
AND (
created_by = get_current_user_id()
OR owner_id = get_current_user_id()
)
);
-- DELETE policy
CREATE POLICY write_own_data_delete_{table}
ON {schema}.{table}
FOR DELETE
TO authenticated
USING (
tenant_id = get_current_tenant_id()
AND (
created_by = get_current_user_id()
OR owner_id = get_current_user_id()
)
);
COMMENT ON POLICY write_own_data_insert_{table} ON {schema}.{table} IS
'Write access (INSERT): Users can only create records for themselves.';
COMMENT ON POLICY write_own_data_update_{table} ON {schema}.{table} IS
'Write access (UPDATE): Users can only update their own records.';
COMMENT ON POLICY write_own_data_delete_{table} ON {schema}.{table} IS
'Write access (DELETE): Users can only delete their own records.';
*/
-- ============================================================================
-- POLICY 5: ADMIN_BYPASS_POLICY
-- Purpose: Allows admin users to bypass RLS restrictions for support/management
-- Usage: Apply as permissive policy to allow admin full access
-- ============================================================================
/*
TEMPLATE FOR ADMIN_BYPASS_POLICY:
ALTER TABLE {schema}.{table} ENABLE ROW LEVEL SECURITY;
CREATE POLICY admin_bypass_{table}
ON {schema}.{table}
FOR ALL
TO authenticated
USING (is_current_user_admin())
WITH CHECK (is_current_user_admin());
COMMENT ON POLICY admin_bypass_{table} ON {schema}.{table} IS
'Admin bypass: Admin users (admin, super_admin, system_admin) have full access.
Use for support, troubleshooting, and system management.
Security: Only assign admin roles to trusted users.';
*/
-- ============================================================================
-- UTILITY FUNCTION: Apply RLS Policies Dynamically
-- ============================================================================
-- Function: Apply tenant isolation policy to a table
-- Enables RLS on p_schema.p_table and (re)creates the policy
-- tenant_isolation_<p_table>, which compares p_tenant_column with
-- get_current_tenant_id() for both USING and WITH CHECK.
-- Each statement is built with format() (%I for identifiers, %L for the
-- comment literal) and executed right away, in the order
-- ENABLE -> DROP IF EXISTS -> CREATE -> COMMENT.
CREATE OR REPLACE FUNCTION apply_tenant_isolation_policy(
    p_schema TEXT,
    p_table TEXT,
    p_tenant_column TEXT DEFAULT 'tenant_id'
)
RETURNS VOID AS $$
DECLARE
    v_policy TEXT := 'tenant_isolation_' || p_table;
    v_sql TEXT;
BEGIN
    -- Turn row-level security on for the table
    v_sql := format('ALTER TABLE %I.%I ENABLE ROW LEVEL SECURITY', p_schema, p_table);
    EXECUTE v_sql;

    -- Remove any earlier version of the policy so the call can be repeated
    v_sql := format(
        'DROP POLICY IF EXISTS %I ON %I.%I',
        v_policy, p_schema, p_table
    );
    EXECUTE v_sql;

    -- Recreate the policy for every command type
    v_sql := format(
        'CREATE POLICY %I ON %I.%I FOR ALL TO authenticated USING (%I = get_current_tenant_id()) WITH CHECK (%I = get_current_tenant_id())',
        v_policy, p_schema, p_table, p_tenant_column, p_tenant_column
    );
    EXECUTE v_sql;

    -- Attach a description to the policy
    v_sql := format(
        'COMMENT ON POLICY %I ON %I.%I IS %L',
        v_policy, p_schema, p_table,
        'Multi-tenant isolation: Users can only access records from their own tenant.'
    );
    EXECUTE v_sql;

    RAISE NOTICE 'Applied tenant_isolation_policy to %.%', p_schema, p_table;
END;
$$ LANGUAGE plpgsql;
COMMENT ON FUNCTION apply_tenant_isolation_policy IS
'Applies tenant isolation RLS policy to a table.
Parameters:
- p_schema: Schema name
- p_table: Table name
- p_tenant_column: Column name for tenant isolation (default: tenant_id)';
-- Function: Apply admin bypass policy to a table
-- Enables RLS on p_schema.p_table and (re)creates the policy
-- admin_bypass_<p_table>, whose USING and WITH CHECK expressions are both
-- is_current_user_admin().
-- Each statement is built with format() and executed right away, in the order
-- ENABLE -> DROP IF EXISTS -> CREATE -> COMMENT.
CREATE OR REPLACE FUNCTION apply_admin_bypass_policy(
    p_schema TEXT,
    p_table TEXT
)
RETURNS VOID AS $$
DECLARE
    v_policy TEXT := 'admin_bypass_' || p_table;
    v_sql TEXT;
BEGIN
    -- Turn row-level security on (harmless if it is already enabled)
    v_sql := format('ALTER TABLE %I.%I ENABLE ROW LEVEL SECURITY', p_schema, p_table);
    EXECUTE v_sql;

    -- Remove any earlier version of the policy so the call can be repeated
    v_sql := format(
        'DROP POLICY IF EXISTS %I ON %I.%I',
        v_policy, p_schema, p_table
    );
    EXECUTE v_sql;

    -- Recreate the policy for every command type
    v_sql := format(
        'CREATE POLICY %I ON %I.%I FOR ALL TO authenticated USING (is_current_user_admin()) WITH CHECK (is_current_user_admin())',
        v_policy, p_schema, p_table
    );
    EXECUTE v_sql;

    -- Attach a description to the policy
    v_sql := format(
        'COMMENT ON POLICY %I ON %I.%I IS %L',
        v_policy, p_schema, p_table,
        'Admin bypass: Admin users have full access for support and management.'
    );
    EXECUTE v_sql;

    RAISE NOTICE 'Applied admin_bypass_policy to %.%', p_schema, p_table;
END;
$$ LANGUAGE plpgsql;
COMMENT ON FUNCTION apply_admin_bypass_policy IS
'Applies admin bypass RLS policy to a table.
Admins can access all records regardless of tenant or ownership.
Parameters:
- p_schema: Schema name
- p_table: Table name';
-- Function: Apply user data policy to a table
-- Enables RLS on p_schema.p_table and (re)creates the policy
-- user_data_<p_table>. Its USING and WITH CHECK expression is
--   tenant_id = get_current_tenant_id() AND (<col1> = get_current_user_id() OR ...)
-- with one OR-term per entry of p_user_columns, each quoted via format('%I').
-- Raises invalid_parameter_value when p_user_columns is NULL or empty, before
-- any DDL runs; previously an empty array produced "AND ()" and failed with a
-- syntax error only at CREATE POLICY time.
-- NOTE(review): the policy is created without AS RESTRICTIVE, i.e. permissive.
-- PostgreSQL OR-combines permissive policies, so a broader permissive policy
-- on the same table (e.g. tenant_isolation_<table>) makes this one ineffective
-- as a restriction. Confirm which policies are meant to coexist.
CREATE OR REPLACE FUNCTION apply_user_data_policy(
    p_schema TEXT,
    p_table TEXT,
    p_user_columns TEXT[] DEFAULT ARRAY['created_by', 'assigned_to', 'owner_id']::TEXT[]
)
RETURNS VOID AS $$
DECLARE
    v_policy_name TEXT;
    v_using_clause TEXT;
    v_column TEXT;
    v_conditions TEXT[] := ARRAY[]::TEXT[];
BEGIN
    -- Reject input that cannot yield a valid ownership predicate
    IF p_user_columns IS NULL OR p_user_columns = ARRAY[]::TEXT[] THEN
        RAISE EXCEPTION
            'apply_user_data_policy: p_user_columns must contain at least one column name (table %.%)',
            p_schema, p_table
            USING ERRCODE = 'invalid_parameter_value';
    END IF;

    v_policy_name := 'user_data_' || p_table;

    -- Build one "<column> = get_current_user_id()" term per user column
    FOREACH v_column IN ARRAY p_user_columns
    LOOP
        v_conditions := array_append(v_conditions, format('%I = get_current_user_id()', v_column));
    END LOOP;
    v_using_clause := 'tenant_id = get_current_tenant_id() AND (' || array_to_string(v_conditions, ' OR ') || ')';

    -- Enable RLS
    EXECUTE format('ALTER TABLE %I.%I ENABLE ROW LEVEL SECURITY', p_schema, p_table);

    -- Drop policy if exists so the call can be repeated
    EXECUTE format(
        'DROP POLICY IF EXISTS %I ON %I.%I',
        v_policy_name, p_schema, p_table
    );

    -- Create policy; %s is safe here because every identifier inside
    -- v_using_clause was already quoted with %I above
    EXECUTE format(
        'CREATE POLICY %I ON %I.%I FOR ALL TO authenticated USING (%s) WITH CHECK (%s)',
        v_policy_name, p_schema, p_table, v_using_clause, v_using_clause
    );

    -- Add comment
    EXECUTE format(
        'COMMENT ON POLICY %I ON %I.%I IS %L',
        v_policy_name, p_schema, p_table,
        'User-level isolation: Users can only access their own records based on: ' || array_to_string(p_user_columns, ', ')
    );

    RAISE NOTICE 'Applied user_data_policy to %.% using columns: %', p_schema, p_table, array_to_string(p_user_columns, ', ');
END;
$$ LANGUAGE plpgsql;
COMMENT ON FUNCTION apply_user_data_policy IS
'Applies user data RLS policy to a table.
Users can only access records they created, are assigned to, or own.
Parameters:
- p_schema: Schema name
- p_table: Table name
- p_user_columns: Array of column names to check (default: created_by, assigned_to, owner_id)';
-- Function: Apply complete RLS policies to a table (tenant + admin)
-- Always calls apply_tenant_isolation_policy for the table, then calls
-- apply_admin_bypass_policy only when p_include_admin_bypass is TRUE.
-- A FALSE or NULL flag skips the admin step; nothing is dropped in that case.
CREATE OR REPLACE FUNCTION apply_complete_rls_policies(
    p_schema TEXT,
    p_table TEXT,
    p_tenant_column TEXT DEFAULT 'tenant_id',
    p_include_admin_bypass BOOLEAN DEFAULT TRUE
)
RETURNS VOID AS $$
BEGIN
    -- Step 1: tenant isolation is unconditional
    PERFORM apply_tenant_isolation_policy(p_schema, p_table, p_tenant_column);

    -- Step 2: optional admin bypass
    IF p_include_admin_bypass IS TRUE THEN
        PERFORM apply_admin_bypass_policy(p_schema, p_table);
    END IF;

    RAISE NOTICE 'Applied complete RLS policies to %.%', p_schema, p_table;
END;
$$ LANGUAGE plpgsql;
COMMENT ON FUNCTION apply_complete_rls_policies IS
'Applies a complete set of RLS policies (tenant isolation + optional admin bypass).
Parameters:
- p_schema: Schema name
- p_table: Table name
- p_tenant_column: Column name for tenant isolation (default: tenant_id)
- p_include_admin_bypass: Whether to include admin bypass policy (default: TRUE)';
-- ============================================================================
-- EXAMPLE USAGE
-- ============================================================================
/*
-- Example 1: Apply tenant isolation to a single table
SELECT apply_tenant_isolation_policy('core', 'partners');
-- Example 2: Apply complete policies (tenant + admin) to a table
SELECT apply_complete_rls_policies('inventory', 'products');
-- Example 3: Apply user data policy
SELECT apply_user_data_policy('projects', 'tasks', ARRAY['created_by', 'assigned_to']::TEXT[]);
-- Example 4: Apply admin bypass only
SELECT apply_admin_bypass_policy('financial', 'invoices');
-- Example 5: Apply to multiple tables at once
DO $$
DECLARE
r RECORD;
BEGIN
FOR r IN
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'core'
AND table_name IN ('partners', 'addresses', 'notes', 'attachments')
LOOP
PERFORM apply_complete_rls_policies('core', r.table_name);
END LOOP;
END $$;
*/
-- ============================================================================
-- MIGRATION HELPERS
-- ============================================================================
-- Function: Check if RLS is enabled on a table
-- Looks up p_schema.p_table in pg_class / pg_namespace and reports its
-- relrowsecurity flag. A relation that is not found yields FALSE.
CREATE OR REPLACE FUNCTION is_rls_enabled(p_schema TEXT, p_table TEXT)
RETURNS BOOLEAN AS $$
BEGIN
    -- TRUE only if the relation exists and has row security switched on
    RETURN EXISTS (
        SELECT 1
        FROM pg_namespace ns
        JOIN pg_class rel ON rel.relnamespace = ns.oid
        WHERE ns.nspname = p_schema
          AND rel.relname = p_table
          AND rel.relrowsecurity
    );
END;
$$ LANGUAGE plpgsql STABLE;
COMMENT ON FUNCTION is_rls_enabled IS
'Check if RLS is enabled on a specific table.
Returns TRUE if enabled, FALSE otherwise.';
-- Function: List all RLS policies on a table
-- Returns one row per pg_policy entry attached to p_schema.p_table:
--   policy_name  - policy name (pg_policy.polname)
--   policy_cmd   - polcmd decoded to SELECT / INSERT / UPDATE / DELETE / ALL
--                  (NULL for any other code)
--   policy_using - USING expression rendered by pg_get_expr
--   policy_check - WITH CHECK expression rendered by pg_get_expr
-- Output columns are matched to the RETURNS TABLE list by position.
CREATE OR REPLACE FUNCTION list_rls_policies(p_schema TEXT, p_table TEXT)
RETURNS TABLE(policy_name NAME, policy_cmd TEXT, policy_using TEXT, policy_check TEXT) AS $$
BEGIN
    RETURN QUERY
    SELECT
        pl.polname::NAME,
        CASE
            WHEN pl.polcmd = 'r' THEN 'SELECT'
            WHEN pl.polcmd = 'a' THEN 'INSERT'
            WHEN pl.polcmd = 'w' THEN 'UPDATE'
            WHEN pl.polcmd = 'd' THEN 'DELETE'
            WHEN pl.polcmd = '*' THEN 'ALL'
        END,
        pg_get_expr(pl.polqual, pl.polrelid),
        pg_get_expr(pl.polwithcheck, pl.polrelid)
    FROM pg_namespace ns
    JOIN pg_class rel ON rel.relnamespace = ns.oid
    JOIN pg_policy pl ON pl.polrelid = rel.oid
    WHERE ns.nspname = p_schema
      AND rel.relname = p_table;
END;
$$ LANGUAGE plpgsql STABLE;
COMMENT ON FUNCTION list_rls_policies IS
'List all RLS policies configured on a specific table.
Returns: policy_name, policy_cmd, policy_using, policy_check';
-- ============================================================================
-- END OF RLS POLICIES
-- ============================================================================