Webhooks & Event Triggers

v0.4.8

Updated for nself v0.4.8

React to database changes and external events with Hasura Event Triggers and webhook integrations for real-time data processing and external service notifications.

Overview

Webhooks in nself enable real-time reactions to database changes through Hasura Event Triggers, scheduled events, and external webhook integrations. Create webhook handlers using custom service templates like express-ts, nest-ts, or fastapi for automatic processing, notifications, and data synchronization.

Event Triggers

Database Event Triggers

Automatically trigger webhooks when database rows are inserted, updated, or deleted:

# Event triggers fire on:
# - INSERT operations (new records)
# - UPDATE operations (modified records)  
# - DELETE operations (removed records)
# - Manual events (invoked from the Hasura Console or API when "enable_manual" is on)

# Common use cases:
# - Send welcome email when user registers
# - Update search index when content changes
# - Sync data with external systems
# - Generate notifications
# - Audit logging
# - Cache invalidation

Setting Up Event Triggers

Configure event triggers in Hasura Console:

  1. Go to "Events" tab in Hasura Console
  2. Click "Create Event Trigger"
  3. Select table and operations (INSERT/UPDATE/DELETE)
  4. Configure webhook URL
  5. Set retry logic and headers

Webhook Implementation

Creating a Webhook Handler Service

Create a webhook handler using custom service templates:

# In .env file - create a webhook service
# Available templates: express-ts, nest-ts, fastapi, etc.
CS_1=webhooks:nest-ts:3001:/webhooks

# Rebuild to generate the service from the template, then start it
nself build && nself start

NestJS Webhook Handler

// services/webhooks/src/webhooks/user.controller.ts
import {
  Body,
  Controller,
  Headers,
  Logger,
  Post,
  UnauthorizedException,
} from '@nestjs/common';
import { EmailService } from '../services/email.service';
import { AnalyticsService } from '../services/analytics.service';

/**
 * Body that Hasura POSTs to an event-trigger webhook.
 *
 * `event.data.old` / `event.data.new` hold the row before and after the
 * change; either side can be `null` (e.g. `old` is `null` for an INSERT,
 * as in the curl example for this endpoint).
 */
interface EventTriggerPayload {
  event: {
    /** Hasura session variables of the request that caused the change. */
    session_variables: Record<string, any>;
    /** Operation that fired the trigger; MANUAL covers manually invoked events. */
    op: 'INSERT' | 'UPDATE' | 'DELETE' | 'MANUAL';
    data: {
      old: any;
      new: any;
    };
  };
  /** Timestamp at which the event was created. */
  created_at: string;
  /** Unique id of this event. */
  id: string;
  delivery_info: {
    max_retries: number;
    current_retry: number;
  };
  trigger: {
    name: string;
  };
  table: {
    schema: string;
    name: string;
  };
}

/**
 * Handles Hasura event-trigger deliveries for user rows.
 *
 * Every route requires the `x-hasura-admin-secret` header to match
 * `HASURA_GRAPHQL_ADMIN_SECRET`. Failures are surfaced as non-2xx responses
 * so that Hasura's retry configuration applies.
 */
@Controller('webhooks')
export class UserWebhookController {
  private readonly logger = new Logger(UserWebhookController.name);

  constructor(
    private emailService: EmailService,
    private analyticsService: AnalyticsService,
  ) {}

  /**
   * Processes a "user created" event: sends the welcome email, tracks the
   * registration and creates the default profile.
   *
   * @param payload Event-trigger body; the new row is read from `event.data.new`.
   * @param adminSecret Value of the `x-hasura-admin-secret` request header.
   * @returns A success status object once all steps have completed.
   * @throws UnauthorizedException when the admin secret is missing or wrong.
   * @throws Error when any processing step fails, so the delivery is retried.
   */
  @Post('user-created')
  async handleUserCreated(
    @Body() payload: EventTriggerPayload,
    @Headers('x-hasura-admin-secret') adminSecret: string,
  ) {
    // Verify webhook authenticity
    this.assertAdminSecret(adminSecret);

    try {
      const { event } = payload;
      const newUser = event.data.new;

      this.logger.log(`Processing user created event for: ${newUser.email}`);

      // Send welcome email
      await this.emailService.sendWelcomeEmail({
        email: newUser.email,
        name: newUser.name,
        userId: newUser.id,
      });

      // Track user registration in analytics
      await this.analyticsService.trackEvent({
        event: 'user_registered',
        userId: newUser.id,
        properties: {
          email: newUser.email,
          name: newUser.name,
          registration_method: newUser.registration_method || 'email',
        },
      });

      // Create user profile
      await this.createDefaultUserProfile(newUser);

      this.logger.log(`Successfully processed user created event for: ${newUser.email}`);

      return {
        status: 'success',
        message: 'User created event processed successfully',
      };
    } catch (error) {
      this.logger.error('Error processing user created event:', error);

      // Anything can be thrown, so narrow before reading `.message`.
      const reason = error instanceof Error ? error.message : String(error);

      // Return error to trigger retry
      throw new Error(`Failed to process user created event: ${reason}`);
    }
  }

  /**
   * Processes a "user updated" event by reacting to the fields that changed.
   *
   * @param payload Event-trigger body; old/new rows are read from `event.data`.
   * @param adminSecret Value of the `x-hasura-admin-secret` request header.
   * @returns A success status object.
   * @throws UnauthorizedException when the admin secret is missing or wrong.
   */
  @Post('user-updated')
  async handleUserUpdated(
    @Body() payload: EventTriggerPayload,
    @Headers('x-hasura-admin-secret') adminSecret?: string,
  ) {
    // Same authenticity check as user-created: this route sends email too.
    this.assertAdminSecret(adminSecret);

    const { event } = payload;
    const oldUser = event.data.old;
    const newUser = event.data.new;

    // Check what fields changed
    const changedFields = this.getChangedFields(oldUser, newUser);

    if (changedFields.includes('email')) {
      // Email changed - send verification email
      await this.emailService.sendEmailVerification(newUser.email, newUser.id);
    }

    if (changedFields.includes('name')) {
      // Update external services with new name
      await this.syncUserNameToExternalServices(newUser);
    }

    return { status: 'success' };
  }

  /**
   * Rejects the request unless the supplied secret matches the configured one.
   *
   * An unset or empty `HASURA_GRAPHQL_ADMIN_SECRET` always rejects; otherwise
   * a request without the header would compare `undefined` to `undefined`
   * and be accepted.
   */
  private assertAdminSecret(adminSecret: string | undefined): void {
    const expectedSecret = process.env.HASURA_GRAPHQL_ADMIN_SECRET;
    if (!expectedSecret || adminSecret !== expectedSecret) {
      this.logger.warn('Invalid admin secret in webhook request');
      throw new UnauthorizedException('Invalid admin secret');
    }
  }

  private async createDefaultUserProfile(user: any) {
    // Create default user profile
    // This could be a GraphQL mutation or direct database insert
  }

  private async syncUserNameToExternalServices(user: any) {
    // Placeholder: push the updated name to the external systems you integrate with
  }

  /**
   * Lists the keys of `newData` whose value differs from `oldData`.
   *
   * Values are compared with `!==`, so nested objects or arrays are compared
   * by reference. A missing (`null`/`undefined`) side is treated as an empty
   * row, which makes every key on the other side count as changed.
   */
  private getChangedFields(oldData: any, newData: any): string[] {
    const before = oldData ?? {};
    const after = newData ?? {};
    return Object.keys(after).filter((key) => before[key] !== after[key]);
  }
}

Python Webhook Handler

# services/webhooks/order_processing.py
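# Create with: CS_2=order-webhooks:fastapi:3002:/order-webhooks
# NOTE: the handler below is written with Flask; if you scaffold from the
# fastapi template, either port the route to FastAPI or add Flask to the service.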
from flask import Flask, request, jsonify
import os
import logging
from typing import Dict, Any
import requests

# Log at INFO level and use a module-scoped logger for the handlers below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Flask application that the webhook routes are registered on.
app = Flask(__name__)

@app.route('/webhooks/order-completed', methods=['POST'])
def handle_order_completed():
    """Handle the "order completed" event delivered by Hasura.

    Returns:
        401 when the ``x-hasura-admin-secret`` header is missing or does not
        match ``HASURA_GRAPHQL_ADMIN_SECRET`` (an unset secret always rejects),
        200 when the order was processed, and 500 on any failure so that the
        delivery is retried.
    """
    # Local import keeps this handler self-contained.
    import hmac

    try:
        # Verify webhook authenticity
        expected_secret = os.getenv('HASURA_GRAPHQL_ADMIN_SECRET')
        admin_secret = request.headers.get('x-hasura-admin-secret')
        # Without the emptiness checks, a missing header and an unset env var
        # would both be None and compare equal, letting the request through.
        # compare_digest is given bytes so non-ASCII input cannot raise.
        if (
            not expected_secret
            or not admin_secret
            or not hmac.compare_digest(
                admin_secret.encode('utf-8'),
                expected_secret.encode('utf-8'),
            )
        ):
            return jsonify({'status': 'unauthorized'}), 401

        payload = request.get_json()
        event = payload['event']
        new_order = event['data']['new']
        
        logger.info(f"Processing order completed: {new_order['id']}")

        # Process the order
        result = process_order_completion(new_order)
        
        if result['success']:
            return jsonify({
                'status': 'success',
                'message': 'Order processed successfully'
            })
        else:
            # Return error to trigger retry
            return jsonify({
                'status': 'error',
                'message': result['error']
            }), 500

    except Exception as e:
        logger.error(f"Error processing order webhook: {str(e)}")
        return jsonify({
            'status': 'error',
            'message': 'Failed to process order'
        }), 500

def process_order_completion(order: Dict[str, Any]) -> Dict[str, Any]:
    """Run every follow-up step for a completed order.

    Steps run in order: confirmation email, inventory update (reads
    ``order['items']``), shipment creation, fulfillment notification and
    analytics tracking.

    Returns:
        ``{'success': True}`` when all steps finish, otherwise
        ``{'success': False, 'error': <message>}``; exceptions are logged and
        converted into the error result rather than propagated.
    """
    try:
        # Send order confirmation email
        send_order_confirmation_email(order)
        
        # Update inventory
        update_inventory(order['items'])
        
        # Create shipment
        shipment = create_shipment(order)
        
        # Send to fulfillment service
        notify_fulfillment_service(order, shipment)
        
        # Update analytics
        track_order_completion(order)
        
        return {'success': True}
        
    except Exception as e:
        logger.error(f"Order processing failed: {str(e)}")
        return {'success': False, 'error': str(e)}

def notify_fulfillment_service(order: Dict[str, Any], shipment: Dict[str, Any]):
    # Hand the order and its shipment over to your fulfillment service
    pass

def track_order_completion(order: Dict[str, Any]):
    # Record the completed order in your analytics system
    pass

def send_order_confirmation_email(order: Dict[str, Any]):
    """Placeholder: send the confirmation email using your email service."""
    return None

def update_inventory(items: list):
    """Placeholder: update product inventory for the given items."""
    return None

def create_shipment(order: Dict[str, Any]):
    """Placeholder: create a shipment record; returns fixed sample values."""
    shipment = dict(id='ship_123', tracking_number='TRK123456')
    return shipment

# Start the Flask server only when this file is executed directly.
if __name__ == '__main__':
    app.run(port=3002, host='0.0.0.0')

Scheduled Events

Cron-based Webhooks

Schedule webhooks to run at specific intervals:

# Create scheduled event in Hasura Console:
# Name: daily-report-generation
# Webhook: http://webhooks:3001/webhooks/scheduled/daily-report
# Schedule: 0 9 * * * (9 AM daily)
# Payload: {"report_type": "daily_summary"}

/**
 * Generates, emails and stores the daily report when the scheduled event fires.
 *
 * @param payload Request body of the scheduled event. It is not read here
 *   because `generateDailyReport()` takes no arguments. NOTE(review): Hasura
 *   cron triggers nest the configured payload under `payload.payload`, so
 *   read `report_type` from there if it is needed later; confirm against
 *   your Hasura version.
 * @returns Status object that includes the id of the generated report.
 * @throws Error when any step fails, so the scheduled event is retried.
 */
@Post('scheduled/daily-report')
async handleDailyReport(@Body() payload: any) {
  try {
    // Generate daily report
    const report = await this.reportService.generateDailyReport();
    
    // Send report to stakeholders
    await this.emailService.sendDailyReport(report);
    
    // Store report in database
    await this.reportService.saveReport(report);
    
    return {
      status: 'success',
      message: 'Daily report generated and sent',
      report_id: report.id
    };
  } catch (error) {
    // Anything can be thrown, so narrow before reading `.message`.
    const reason = error instanceof Error ? error.message : String(error);
    throw new Error(`Failed to generate daily report: ${reason}`);
  }
}

Common Scheduled Events

# Examples of scheduled webhooks:

# Daily cleanup (2 AM)
0 2 * * * -> /webhooks/scheduled/cleanup
- Delete expired sessions
- Archive old logs  
- Clean temporary files

# Weekly reports (Monday 9 AM)
0 9 * * 1 -> /webhooks/scheduled/weekly-report
- Generate usage reports
- Send to administrators
- Update dashboards

# Monthly billing (1st of month, midnight)
0 0 1 * * -> /webhooks/scheduled/billing
- Calculate usage charges
- Generate invoices
- Process payments

# Hourly health check
0 * * * * -> /webhooks/scheduled/health-check
- Check external services
- Verify database connections
- Send alerts if needed

External Webhooks

Receiving External Webhooks

Handle webhooks from external services such as payment providers and email services:

// Handle Stripe webhook
/**
 * Receives Stripe webhook events and routes each one to its handler by type.
 *
 * NOTE(review): Stripe signs the raw request body; confirm that
 * `stripeService.verifyWebhook` receives what it needs when handed the
 * parsed body.
 *
 * @param payload Parsed Stripe event.
 * @param signature Value of the `stripe-signature` request header.
 * @returns A success status object, including for unhandled event types.
 * @throws BadRequestException when signature verification fails.
 */
@Post('external/stripe')
async handleStripeWebhook(
  @Body() payload: any,
  @Headers('stripe-signature') signature: string,
) {
  // Refuse anything whose signature does not verify.
  if (!this.stripeService.verifyWebhook(payload, signature)) {
    throw new BadRequestException('Invalid webhook signature');
  }

  const stripeEvent = payload;
  const eventType = stripeEvent.type;

  if (eventType === 'payment_intent.succeeded') {
    await this.handlePaymentSuccess(stripeEvent.data.object);
  } else if (eventType === 'payment_intent.payment_failed') {
    await this.handlePaymentFailure(stripeEvent.data.object);
  } else if (eventType === 'customer.subscription.created') {
    await this.handleSubscriptionCreated(stripeEvent.data.object);
  } else if (eventType === 'customer.subscription.deleted') {
    await this.handleSubscriptionCancelled(stripeEvent.data.object);
  } else {
    // Unknown types are acknowledged so the sender does not keep retrying.
    this.logger.log(`Unhandled Stripe event: ${eventType}`);
  }

  return { status: 'success' };
}

Webhook Security

// Verify webhook signatures
import crypto from 'crypto';

/**
 * Checks a hex-encoded HMAC-SHA256 signature against a payload.
 *
 * @param payload Exact string that was signed.
 * @param signature Hex digest supplied by the sender.
 * @param secret Shared secret used as the HMAC key.
 * @returns `true` only when `signature` equals the expected digest; a
 *   missing or wrong-length signature yields `false` instead of throwing.
 */
function verifyWebhookSignature(
  payload: string,
  signature: string,
  secret: string
): boolean {
  if (typeof signature !== 'string') {
    return false;
  }

  const expectedSignature = crypto
    .createHmac('sha256', secret)
    .update(payload, 'utf8')
    .digest('hex');

  const provided = Buffer.from(signature);
  const expected = Buffer.from(expectedSignature);

  // timingSafeEqual throws a RangeError for inputs of different length, so
  // reject those up front. The digest length is not secret.
  if (provided.length !== expected.length) {
    return false;
  }

  return crypto.timingSafeEqual(provided, expected);
}

// GitHub webhook verification
/**
 * Verifies and processes a GitHub webhook delivery.
 *
 * NOTE(review): the HMAC is computed over `JSON.stringify(payload)`, which is
 * only valid if it reproduces the bytes GitHub signed; verifying against the
 * raw request body is safer. Confirm before relying on this in production.
 *
 * @param payload Parsed webhook body.
 * @param signature Value of the `x-hub-signature-256` header (`sha256=<hex>`).
 * @throws Error when `GITHUB_WEBHOOK_SECRET` is not configured.
 * @throws UnauthorizedException when the signature is missing, malformed or wrong.
 */
@Post('external/github')
async handleGitHubWebhook(
  @Body() payload: any,
  @Headers('x-hub-signature-256') signature: string,
) {
  const secret = process.env.GITHUB_WEBHOOK_SECRET;
  if (!secret) {
    // Fail loudly on misconfiguration instead of crashing inside the HMAC call.
    throw new Error('GITHUB_WEBHOOK_SECRET is not configured');
  }

  // A missing header would otherwise crash on `.replace`.
  const providedHex = (signature ?? '').replace('sha256=', '');

  // A SHA-256 hex digest is exactly 64 hex characters. Rejecting other shapes
  // here keeps malformed input from throwing inside the comparison.
  if (!/^[0-9a-f]{64}$/i.test(providedHex)) {
    throw new UnauthorizedException('Invalid webhook signature');
  }

  const isValid = verifyWebhookSignature(
    JSON.stringify(payload),
    providedHex,
    secret
  );
  
  if (!isValid) {
    throw new UnauthorizedException('Invalid webhook signature');
  }
  
  // Process GitHub webhook
  await this.processGitHubEvent(payload);
}

Webhook Configuration

Hasura Event Trigger Configuration

# Event trigger configuration example:
{
  "name": "user_created_trigger",
  "definition": {
    "enable_manual": false,
    "insert": {
      "columns": "*"
    },
    "retry_conf": {
      "num_retries": 3,
      "interval_sec": 10,
      "timeout_sec": 60
    },
    "webhook": "http://webhooks:3001/webhooks/user-created",
    "headers": [
      {
        "name": "Content-Type",
        "value": "application/json"
      },
      {
        "name": "X-Hasura-Admin-Secret",
        "value_from_env": "HASURA_GRAPHQL_ADMIN_SECRET"
      }
    ]
  }
}

Webhook Headers and Authentication

# Configure webhook headers:
# Content-Type: application/json
# X-Hasura-Admin-Secret: {{ADMIN_SECRET}}
# X-Request-ID: {{REQUEST_ID}}
# Authorization: Bearer {{API_TOKEN}}

# Custom headers for external services:
{
  "name": "X-API-Key",
  "value_from_env": "EXTERNAL_SERVICE_API_KEY"
}

# Session variables in headers:
{
  "name": "X-User-ID", 
  "value": "{{SESSION_VARIABLES['x-hasura-user-id']}}"
}

Error Handling and Retries

Retry Configuration

# Hasura retry configuration:
{
  "retry_conf": {
    "num_retries": 3,      # Number of retry attempts
    "interval_sec": 10,    # Delay between retries (seconds)
    "timeout_sec": 60      # Timeout per request (seconds)
  }
}

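# Retry schedule (interval_sec is a fixed delay, not an exponential backoff):
# Attempt 1: Immediate
# Attempt 2: 10 seconds after attempt 1 fails
# Attempt 3: 10 seconds after attempt 2 fails
# Attempt 4: 10 seconds after attempt 3 fails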

Error Response Handling

// Proper error responses for retries
/**
 * Shows how to map failures to responses so that only retryable errors
 * cause a redelivery.
 *
 * - Validation errors are logged and answered with a normal (non-error)
 *   response carrying `retry: false`, so they are not retried.
 * - Temporary and unknown errors are rethrown as
 *   `InternalServerErrorException`, which produces an error response.
 */
@Post('webhooks/user-created')
async handleUserCreated(@Body() payload: any) {
  try {
    await this.processUserCreation(payload);
  } catch (error) {
    // Permanent failure: retrying cannot help, so answer without an error status.
    if (error instanceof ValidationError) {
      this.logger.warn('Validation error:', error.message);
      return {
        status: 'error',
        message: 'Validation failed',
        retry: false,
      };
    }

    // Anything that is not a known temporary error is logged in full and retried.
    if (!(error instanceof TemporaryError)) {
      this.logger.error('Unknown error:', error);
      throw new InternalServerErrorException('Processing failed');
    }

    // Known temporary failure: surface the message and let the sender retry.
    this.logger.error('Temporary error:', error.message);
    throw new InternalServerErrorException(error.message);
  }

  // Reached only when processing completed without throwing.
  return { status: 'success' };
}

Webhook Testing and Debugging

Local Testing

# Test webhook locally
curl -X POST http://localhost:3001/webhooks/user-created \
  -H "Content-Type: application/json" \
  -H "X-Hasura-Admin-Secret: your-admin-secret" \
  -d '{
    "event": {
      "session_variables": {},
      "op": "INSERT",
      "data": {
        "old": null,
        "new": {
          "id": "123e4567-e89b-12d3-a456-426614174000",
          "email": "test@example.com",
          "name": "Test User",
          "created_at": "2024-01-01T00:00:00Z"
        }
      }
    },
    "created_at": "2024-01-01T00:00:00Z",
    "id": "webhook-event-id",
    "trigger": {"name": "user_created_trigger"},
    "table": {"schema": "public", "name": "users"}
  }'

Webhook Monitoring

// Monitor webhook performance
/**
 * Processes the event while recording how long it took, reporting the
 * duration to the metrics service on both success and failure. Errors are
 * rethrown unchanged after being recorded.
 */
@Post('webhooks/user-created')
async handleUserCreated(@Body() payload: any) {
  const startedAt = Date.now();
  const { id: eventId } = payload;
  const elapsedMs = () => Date.now() - startedAt;

  try {
    this.logger.log(`Processing webhook ${eventId}`);

    await this.processUserCreation(payload);

    const tookMs = elapsedMs();
    this.metricsService.recordWebhookSuccess('user_created', tookMs);

    return { status: 'success' };
  } catch (error) {
    const tookMs = elapsedMs();
    this.metricsService.recordWebhookError('user_created', tookMs, error);

    throw error;
  }
}

Advanced Webhook Patterns

Webhook Aggregation

// Batch process multiple events
/**
 * Collects incoming events in memory and processes them in batches.
 *
 * A batch is flushed as soon as the queue reaches `batchSize`, and also on
 * the cron schedule below. The queue lives in process memory only, and a
 * batch that fails is logged and dropped.
 */
@Injectable()
export class WebhookBatchProcessor {
  private readonly logger = new Logger(WebhookBatchProcessor.name);
  private eventQueue: any[] = [];
  private batchSize = 10;
  // Informational only: the flush interval is set by the @Cron expression below.
  private batchTimeout = 5000; // 5 seconds
  
  /**
   * Queues an event and flushes immediately once a full batch is available.
   */
  async addEvent(event: any) {
    this.eventQueue.push(event);
    
    if (this.eventQueue.length >= this.batchSize) {
      await this.processBatch();
    }
  }
  
  /**
   * Removes up to `batchSize` events from the front of the queue and
   * processes them together. Does nothing when the queue is empty.
   */
  @Cron('*/5 * * * * *') // Every 5 seconds
  async processBatch() {
    if (this.eventQueue.length === 0) return;
    
    const batch = this.eventQueue.splice(0, this.batchSize);
    
    try {
      await this.processBulkEvents(batch);
    } catch (error) {
      // Handle batch processing error
      this.logger.error('Batch processing failed:', error);
      // Optionally re-queue events
    }
  }

  private async processBulkEvents(batch: any[]): Promise<void> {
    // Placeholder: replace with the real bulk handling for your events
  }
}

Webhook Transformation

// Transform webhook data before processing
/**
 * Normalizes an order status change event and fans it out to the shipping,
 * analytics and customer-notification integrations concurrently, each in
 * its own format.
 */
@Post('webhooks/order-status-changed')
async handleOrderStatusChange(@Body() payload: any) {
  // Convert the Hasura payload into the shared event shape first.
  const normalized = this.transformHasuraEvent(payload);

  // Start all three deliveries, then wait for every one of them.
  const shippingCall = this.notifyShippingService(this.transformForShipping(normalized));
  const analyticsCall = this.updateAnalytics(this.transformForAnalytics(normalized));
  const emailCall = this.sendCustomerNotification(this.transformForEmail(normalized));

  await Promise.all([shippingCall, analyticsCall, emailCall]);
}

/**
 * Maps a Hasura event-trigger payload onto the internal
 * `order.status.changed` event shape. `oldStatus` is `undefined` when the
 * payload carries no old row.
 */
private transformHasuraEvent(hasuraPayload: any) {
  const { old: previousRow, new: currentRow } = hasuraPayload.event.data;

  return {
    eventType: 'order.status.changed',
    timestamp: hasuraPayload.created_at,
    orderId: currentRow.id,
    oldStatus: previousRow?.status,
    newStatus: currentRow.status,
    customerId: currentRow.customer_id
  };
}

Best Practices

Next Steps