Updated for nself v0.4.8
React to database changes and external events with Hasura Event Triggers and webhook integrations for real-time data processing and external service notifications.
Webhooks in nself enable real-time reactions to database changes through Hasura Event Triggers, scheduled events, and external webhook integrations. Create webhook handlers using custom service templates like express-ts, nest-ts, or fastapi for automatic processing, notifications, and data synchronization.
Automatically trigger webhooks when database rows are inserted, updated, or deleted:
# Event triggers fire on:
# - INSERT operations (new records)
# - UPDATE operations (modified records)
# - DELETE operations (removed records)
# - Manual events (triggered via GraphQL)
# Common use cases:
# - Send welcome email when user registers
# - Update search index when content changes
# - Sync data with external systems
# - Generate notifications
# - Audit logging
# - Cache invalidation
Configure event triggers in Hasura Console:
Create a webhook handler using custom service templates:
# In .env file - create a webhook service
# Available templates: express-ts, nest-ts, fastapi, etc.
CS_1=webhooks:nest-ts:3001:/webhooks
# Rebuild to generate
nself build && nself start
// services/webhooks/src/webhooks/user.controller.ts
import { isDeepStrictEqual } from 'node:util';
import { Controller, Post, Body, Headers, Logger, UnauthorizedException } from '@nestjs/common';
import { EmailService } from '../services/email.service';
import { AnalyticsService } from '../services/analytics.service';
/**
 * Body that Hasura POSTs to an event-trigger webhook.
 */
interface EventTriggerPayload {
  event: {
    /** Session variables of the request that caused the change. */
    session_variables: Record<string, any>;
    /**
     * Operation that fired the trigger. 'MANUAL' is sent when an event is
     * invoked by hand rather than by a row change.
     */
    op: 'INSERT' | 'UPDATE' | 'DELETE' | 'MANUAL';
    data: {
      /** Row before the change; null when there is no previous row (e.g. INSERT). */
      old: any;
      /** Row after the change; null when the row no longer exists (e.g. DELETE). */
      new: any;
    };
  };
  /** Timestamp at which the event was created. */
  created_at: string;
  /** Unique id of this event. */
  id: string;
  delivery_info: {
    max_retries: number;
    current_retry: number;
  };
  trigger: {
    name: string;
  };
  table: {
    schema: string;
    name: string;
  };
}
/**
 * Receives Hasura event-trigger webhooks for user rows.
 *
 * Every endpoint requires the `x-hasura-admin-secret` header to match
 * HASURA_GRAPHQL_ADMIN_SECRET. Failures are reported with a non-2xx status so
 * that Hasura does not mark the event as delivered.
 */
@Controller('webhooks')
export class UserWebhookController {
  private readonly logger = new Logger(UserWebhookController.name);

  constructor(
    private emailService: EmailService,
    private analyticsService: AnalyticsService,
  ) {}

  /**
   * Handles the user-created trigger: sends the welcome email, tracks the
   * registration and creates the default profile.
   *
   * @param payload Event-trigger payload; `event.data.new` is the new user row.
   * @param adminSecret Value of the `x-hasura-admin-secret` header.
   * @throws UnauthorizedException when the secret is missing or wrong.
   * @throws Error when processing fails, so the delivery is retried.
   */
  @Post('user-created')
  async handleUserCreated(
    @Body() payload: EventTriggerPayload,
    @Headers('x-hasura-admin-secret') adminSecret: string,
  ) {
    // Verify webhook authenticity
    this.assertAuthorized(adminSecret);

    try {
      const { event } = payload;
      const newUser = event.data.new;
      this.logger.log(`Processing user created event for: ${newUser.email}`);

      // Send welcome email
      await this.emailService.sendWelcomeEmail({
        email: newUser.email,
        name: newUser.name,
        userId: newUser.id,
      });

      // Track user registration in analytics
      await this.analyticsService.trackEvent({
        event: 'user_registered',
        userId: newUser.id,
        properties: {
          email: newUser.email,
          name: newUser.name,
          registration_method: newUser.registration_method || 'email',
        },
      });

      // Create user profile
      await this.createDefaultUserProfile(newUser);

      this.logger.log(`Successfully processed user created event for: ${newUser.email}`);
      return {
        status: 'success',
        message: 'User created event processed successfully',
      };
    } catch (error) {
      this.logger.error('Error processing user created event:', error);
      // The caught value is not guaranteed to be an Error, so narrow before reading .message.
      const reason = error instanceof Error ? error.message : String(error);
      // Return error to trigger retry
      throw new Error(`Failed to process user created event: ${reason}`);
    }
  }

  /**
   * Handles the user-updated trigger: re-verifies a changed email address and
   * propagates a changed name.
   *
   * @param payload Event-trigger payload with both `old` and `new` rows.
   * @param adminSecret Value of the `x-hasura-admin-secret` header.
   * @throws UnauthorizedException when the secret is missing or wrong.
   */
  @Post('user-updated')
  async handleUserUpdated(
    @Body() payload: EventTriggerPayload,
    @Headers('x-hasura-admin-secret') adminSecret?: string,
  ) {
    // Same authenticity check as the user-created endpoint.
    this.assertAuthorized(adminSecret);

    const { event } = payload;
    const oldUser = event.data.old;
    const newUser = event.data.new;

    // A diff needs both rows; anything else is not an update and is acknowledged untouched.
    if (!oldUser || !newUser) {
      this.logger.warn(`Ignoring ${event.op} event without both old and new rows`);
      return { status: 'ignored' };
    }

    // Check what fields changed
    const changedFields = this.getChangedFields(oldUser, newUser);

    if (changedFields.includes('email')) {
      // Email changed - send verification email
      await this.emailService.sendEmailVerification(newUser.email, newUser.id);
    }

    if (changedFields.includes('name')) {
      // Update external services with new name
      await this.syncUserNameToExternalServices(newUser);
    }

    return { status: 'success' };
  }

  /**
   * Rejects the request unless the supplied secret matches the configured one.
   * Fails closed: an unset HASURA_GRAPHQL_ADMIN_SECRET never authorizes a
   * request, even when the header is absent too.
   */
  private assertAuthorized(adminSecret: string | undefined): void {
    const expectedSecret = process.env.HASURA_GRAPHQL_ADMIN_SECRET;
    if (!expectedSecret || adminSecret !== expectedSecret) {
      this.logger.warn('Invalid admin secret in webhook request');
      throw new UnauthorizedException('Invalid admin secret');
    }
  }

  private async createDefaultUserProfile(user: any) {
    // Create default user profile
    // This could be a GraphQL mutation or direct database insert
  }

  private async syncUserNameToExternalServices(user: any) {
    // Push the new name to external services
    // Placeholder: implement for the services you integrate with
  }

  /**
   * Lists the keys of `newData` whose value differs from `oldData`.
   * Values are compared structurally, so an unchanged JSON column (object or
   * array) is not reported as changed. A missing side is treated as empty.
   */
  private getChangedFields(oldData: any, newData: any): string[] {
    const before = oldData ?? {};
    const after = newData ?? {};
    return Object.keys(after).filter(
      (key) => !isDeepStrictEqual(before[key], after[key]),
    );
  }
}
# services/webhooks/order_processing.py
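# Create with: CS_2=order-webhooks:fastapi:3002:/order-webhooks
# NOTE: the handler below is written with Flask, not FastAPI - use a Flask-based
# template if one is available, or port the handler to FastAPI.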
from flask import Flask, request, jsonify
import os
import logging
from typing import Dict, Any
import requests
# Flask application object; the route decorator below registers its handler on it.
app = Flask(__name__)
# Configure logging at INFO level.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by the handlers in this file.
logger = logging.getLogger(__name__)
@app.route('/webhooks/order-completed', methods=['POST'])
def handle_order_completed():
    """Handle the order-completed event trigger.

    Verifies the ``x-hasura-admin-secret`` header, then hands the new order row
    to ``process_order_completion``.

    Returns:
        A JSON response: 200 on success, 401 when the secret is missing or
        wrong, 500 when processing fails (a non-2xx status triggers a retry).
    """
    # Local import: only needed for the constant-time comparison below.
    import hmac

    try:
        # Verify webhook authenticity
        expected_secret = os.getenv('HASURA_GRAPHQL_ADMIN_SECRET')
        admin_secret = request.headers.get('x-hasura-admin-secret')
        # Fail closed: with the variable unset and the header absent, a plain
        # `!=` compares None with None and would let the request through.
        if (
            not expected_secret
            or not admin_secret
            or not hmac.compare_digest(
                admin_secret.encode('utf-8'), expected_secret.encode('utf-8')
            )
        ):
            return jsonify({'status': 'unauthorized'}), 401

        payload = request.get_json()
        event = payload['event']
        new_order = event['data']['new']
        logger.info(f"Processing order completed: {new_order['id']}")

        # Process the order
        result = process_order_completion(new_order)
        if result['success']:
            return jsonify({
                'status': 'success',
                'message': 'Order processed successfully'
            })
        else:
            # Return error to trigger retry
            return jsonify({
                'status': 'error',
                'message': result['error']
            }), 500
    except Exception as e:
        logger.error(f"Error processing order webhook: {str(e)}")
        return jsonify({
            'status': 'error',
            'message': 'Failed to process order'
        }), 500
def process_order_completion(order: Dict[str, Any]) -> Dict[str, Any]:
    """Run every follow-up step for a completed order.

    Steps run in order: confirmation email, inventory update, shipment
    creation, fulfillment notification, analytics tracking.

    Returns:
        ``{'success': True}`` when all steps finish, otherwise
        ``{'success': False, 'error': <message>}`` for the first step that raised.
    """
    try:
        send_order_confirmation_email(order)
        update_inventory(order['items'])
        shipment = create_shipment(order)
        notify_fulfillment_service(order, shipment)
        track_order_completion(order)
    except Exception as e:
        # Any failing step is logged and reported to the caller instead of raised.
        logger.error(f"Order processing failed: {str(e)}")
        return dict(success=False, error=str(e))
    else:
        return dict(success=True)
def send_order_confirmation_email(order: Dict[str, Any]):
    """Placeholder: send the order confirmation using your email service."""
    return None
def update_inventory(items: list):
    """Placeholder: update product inventory for the given items."""
    return None
def create_shipment(order: Dict[str, Any]):
    """Placeholder: create a shipment record and return it.

    Currently returns a fixed sample record regardless of ``order``.
    """
    shipment = dict(id='ship_123', tracking_number='TRK123456')
    return shipment
if __name__ == '__main__':
app.run(host='0.0.0.0', port=3002)
Schedule webhooks to run at specific intervals:
# Create scheduled event in Hasura Console:
# Name: daily-report-generation
# Webhook: http://webhooks:3001/webhooks/scheduled/daily-report
# Schedule: 0 9 * * * (9 AM daily)
# Payload: {"report_type": "daily_summary"}
/**
 * Scheduled-event endpoint: generates the daily report, emails it and stores it.
 *
 * @param payload Body sent by the scheduled event. It is not read here.
 *   NOTE(review): Hasura appears to wrap the configured payload under a
 *   `payload` key (i.e. `payload.payload.report_type`) - confirm before using it.
 * @returns Status object including the id of the generated report.
 * @throws Error when any step fails.
 */
@Post('scheduled/daily-report')
async handleDailyReport(@Body() payload: any) {
  try {
    // Generate daily report
    const report = await this.reportService.generateDailyReport();

    // Send report to stakeholders
    await this.emailService.sendDailyReport(report);

    // Store report in database
    await this.reportService.saveReport(report);

    return {
      status: 'success',
      message: 'Daily report generated and sent',
      report_id: report.id
    };
  } catch (error) {
    // The caught value is not guaranteed to be an Error, so narrow before reading .message.
    const reason = error instanceof Error ? error.message : String(error);
    throw new Error(`Failed to generate daily report: ${reason}`);
  }
}
# Examples of scheduled webhooks:
# Daily cleanup (2 AM)
0 2 * * * -> /webhooks/scheduled/cleanup
- Delete expired sessions
- Archive old logs
- Clean temporary files
# Weekly reports (Monday 9 AM)
0 9 * * 1 -> /webhooks/scheduled/weekly-report
- Generate usage reports
- Send to administrators
- Update dashboards
# Monthly billing (1st of month, midnight)
0 0 1 * * -> /webhooks/scheduled/billing
- Calculate usage charges
- Generate invoices
- Process payments
# Hourly health check
0 * * * * -> /webhooks/scheduled/health-check
- Check external services
- Verify database connections
# - Send alerts if needed
Handle webhooks from external services like payment providers, email services, etc.:
// Handle Stripe webhook
/**
 * Verifies a Stripe webhook and routes it to the handler for its event type.
 * Event types without a handler are logged and acknowledged.
 *
 * NOTE(review): Stripe signs the raw request body; whether `verifyWebhook` can
 * validate an already-parsed body depends on its implementation - confirm.
 *
 * @throws BadRequestException when signature verification fails.
 */
@Post('external/stripe')
async handleStripeWebhook(
  @Body() payload: any,
  @Headers('stripe-signature') signature: string,
) {
  // Verify webhook signature
  if (!this.stripeService.verifyWebhook(payload, signature)) {
    throw new BadRequestException('Invalid webhook signature');
  }

  const stripeEvent = payload;
  const eventType = stripeEvent.type;

  if (eventType === 'payment_intent.succeeded') {
    await this.handlePaymentSuccess(stripeEvent.data.object);
  } else if (eventType === 'payment_intent.payment_failed') {
    await this.handlePaymentFailure(stripeEvent.data.object);
  } else if (eventType === 'customer.subscription.created') {
    await this.handleSubscriptionCreated(stripeEvent.data.object);
  } else if (eventType === 'customer.subscription.deleted') {
    await this.handleSubscriptionCancelled(stripeEvent.data.object);
  } else {
    this.logger.log(`Unhandled Stripe event: ${eventType}`);
  }

  return { status: 'success' };
}
// Verify webhook signatures
import crypto from 'crypto';
/**
 * Checks a hex-encoded HMAC-SHA256 signature against a payload.
 *
 * @param payload Exact text that was signed.
 * @param signature Hex digest supplied by the sender (without any prefix).
 * @param secret Shared secret used as the HMAC key.
 * @returns true only when the signature matches; false for a missing,
 *   wrong-length or otherwise incorrect signature.
 */
function verifyWebhookSignature(
  payload: string,
  signature: string,
  secret: string
): boolean {
  if (typeof signature !== 'string' || signature.length === 0) {
    return false;
  }

  const expectedSignature = crypto
    .createHmac('sha256', secret)
    .update(payload, 'utf8')
    .digest('hex');

  const provided = Buffer.from(signature);
  const expected = Buffer.from(expectedSignature);

  // timingSafeEqual throws on buffers of different length; a wrong-length
  // signature is simply invalid, so report it as a mismatch instead.
  if (provided.length !== expected.length) {
    return false;
  }

  return crypto.timingSafeEqual(provided, expected);
}
// GitHub webhook verification
/**
 * Verifies the `x-hub-signature-256` header of a GitHub webhook and, when it
 * is valid, passes the payload on for processing.
 *
 * @throws UnauthorizedException when the signature header is missing, the
 *   GITHUB_WEBHOOK_SECRET is not configured, or the signature does not match.
 */
@Post('external/github')
async handleGitHubWebhook(
  @Body() payload: any,
  @Headers('x-hub-signature-256') signature: string,
) {
  const secret = process.env.GITHUB_WEBHOOK_SECRET;

  // Fail closed: without a configured secret or a signature header there is
  // nothing to verify, so reject before calling string methods on undefined.
  if (!secret || !signature) {
    throw new UnauthorizedException('Invalid webhook signature');
  }

  // NOTE(review): the signature is checked against a re-serialization of the
  // parsed body. If that differs from the bytes GitHub sent (key order,
  // whitespace, escaping), valid deliveries will be rejected - consider
  // verifying against the raw request body instead.
  const isValid = verifyWebhookSignature(
    JSON.stringify(payload),
    signature.replace('sha256=', ''),
    secret
  );

  if (!isValid) {
    throw new UnauthorizedException('Invalid webhook signature');
  }

  // Process GitHub webhook
  await this.processGitHubEvent(payload);
}
# Event trigger configuration example:
{
"name": "user_created_trigger",
"definition": {
"enable_manual": false,
"insert": {
"columns": "*"
},
"retry_conf": {
"num_retries": 3,
"interval_sec": 10,
"timeout_sec": 60
},
"webhook": "http://webhooks:3001/webhooks/user-created",
"headers": [
{
"name": "Content-Type",
"value": "application/json"
},
{
"name": "X-Hasura-Admin-Secret",
"value_from_env": "HASURA_GRAPHQL_ADMIN_SECRET"
}
]
}
}
# Configure webhook headers:
# Content-Type: application/json
# X-Hasura-Admin-Secret: {{ADMIN_SECRET}}
# X-Request-ID: {{REQUEST_ID}}
# Authorization: Bearer {{API_TOKEN}}
# Custom headers for external services:
{
"name": "X-API-Key",
"value_from_env": "EXTERNAL_SERVICE_API_KEY"
}
# Session variables in headers:
{
"name": "X-User-ID",
"value": "{{SESSION_VARIABLES['x-hasura-user-id']}}"
}
# Hasura retry configuration:
{
"retry_conf": {
"num_retries": 3, # Number of retry attempts
"interval_sec": 10, # Delay between retries (seconds)
"timeout_sec": 60 # Timeout per request (seconds)
}
}
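# Retry schedule (fixed interval - interval_sec is the wait between retries, not an exponential backoff):
# Attempt 1: Immediate
# Retry 1: 10 seconds after the failed attempt
# Retry 2: 10 seconds after the previous failure
# Retry 3: 10 seconds after the previous failure
// Proper error responses for retries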
/**
 * Shows how to choose the response so that only retryable failures are retried.
 *
 * - Success and validation failures return normally (2xx), so no retry happens.
 * - Temporary and unknown failures throw InternalServerErrorException, which
 *   produces a 500 and triggers a retry.
 */
@Post('webhooks/user-created')
async handleUserCreated(@Body() payload: any) {
  try {
    await this.processUserCreation(payload);
  } catch (error) {
    // Validation failures will not succeed on a retry: acknowledge them.
    if (error instanceof ValidationError) {
      this.logger.warn('Validation error:', error.message);
      return {
        status: 'error',
        message: 'Validation failed',
        retry: false
      };
    }

    // Anything that is not a known temporary failure: log it and ask for a retry.
    if (!(error instanceof TemporaryError)) {
      this.logger.error('Unknown error:', error);
      throw new InternalServerErrorException('Processing failed');
    }

    // Temporary failure: surface the original message and ask for a retry.
    this.logger.error('Temporary error:', error.message);
    throw new InternalServerErrorException(error.message);
  }

  // Processing finished without an error.
  return { status: 'success' };
}
# Test webhook locally
curl -X POST http://localhost:3001/webhooks/user-created \
-H "Content-Type: application/json" \
-H "X-Hasura-Admin-Secret: your-admin-secret" \
-d '{
"event": {
"session_variables": {},
"op": "INSERT",
"data": {
"old": null,
"new": {
"id": "123e4567-e89b-12d3-a456-426614174000",
"email": "test@example.com",
"name": "Test User",
"created_at": "2024-01-01T00:00:00Z"
}
}
},
"created_at": "2024-01-01T00:00:00Z",
"id": "webhook-event-id",
"trigger": {"name": "user_created_trigger"},
"table": {"schema": "public", "name": "users"}
}'
# Monitor webhook performance
/**
 * Processes the user-created webhook and records how long it took, for both
 * the success and the failure path. Failures are re-thrown after recording.
 */
@Post('webhooks/user-created')
async handleUserCreated(@Body() payload: any) {
  const startedAt = Date.now();
  const { id: eventId } = payload;
  // Milliseconds elapsed since the handler started.
  const elapsedMs = (): number => Date.now() - startedAt;

  try {
    this.logger.log(`Processing webhook ${eventId}`);
    await this.processUserCreation(payload);

    const duration = elapsedMs();
    this.metricsService.recordWebhookSuccess('user_created', duration);
    return { status: 'success' };
  } catch (error) {
    const duration = elapsedMs();
    this.metricsService.recordWebhookError('user_created', duration, error);
    throw error;
  }
}
// Batch process multiple events
/**
 * Collects webhook events in memory and processes them in batches, either when
 * the queue reaches `batchSize` or on the five-second cron tick.
 *
 * Events of a batch that fails are logged and dropped, not re-queued.
 */
@Injectable()
export class WebhookBatchProcessor {
  private readonly logger = new Logger(WebhookBatchProcessor.name);
  private eventQueue: any[] = [];
  private batchSize = 10;
  // Documents the cron interval below; the code itself does not read it.
  private batchTimeout = 5000; // 5 seconds

  /** Queues an event and flushes immediately once a full batch is waiting. */
  async addEvent(event: any) {
    this.eventQueue.push(event);
    if (this.eventQueue.length >= this.batchSize) {
      await this.processBatch();
    }
  }

  /** Takes up to `batchSize` events off the queue and processes them together. */
  @Cron('*/5 * * * * *') // Every 5 seconds
  async processBatch() {
    if (this.eventQueue.length === 0) return;

    // splice removes the batch from the queue before any await happens.
    const batch = this.eventQueue.splice(0, this.batchSize);
    try {
      await this.processBulkEvents(batch);
    } catch (error) {
      // Handle batch processing error
      this.logger.error('Batch processing failed:', error);
      // Optionally re-queue events
    }
  }

  private async processBulkEvents(events: any[]): Promise<void> {
    // Process all events of the batch in one go
    // Placeholder: implement for your use case (e.g. a single bulk insert)
  }
}
// Transform webhook data before processing
/**
 * Normalizes an order-status-change event and forwards it, in the format each
 * consumer expects, to shipping, analytics and customer notification.
 * The three calls are started together and awaited as a group.
 */
@Post('webhooks/order-status-changed')
async handleOrderStatusChange(@Body() payload: any) {
  // Transform Hasura event to standard format
  const normalized = this.transformHasuraEvent(payload);

  // Start each notification with the payload shape its consumer expects.
  const shippingNotice = this.notifyShippingService(this.transformForShipping(normalized));
  const analyticsUpdate = this.updateAnalytics(this.transformForAnalytics(normalized));
  const customerNotice = this.sendCustomerNotification(this.transformForEmail(normalized));

  await Promise.all([shippingNotice, analyticsUpdate, customerNotice]);
}
/**
 * Maps a Hasura event payload for an order row to the internal
 * 'order.status.changed' event shape.
 * `oldStatus` is undefined when the payload carries no previous row.
 */
private transformHasuraEvent(hasuraPayload: any) {
  const { old: previousRow, new: currentRow } = hasuraPayload.event.data;

  return {
    eventType: 'order.status.changed',
    timestamp: hasuraPayload.created_at,
    orderId: currentRow.id,
    oldStatus: previousRow?.status,
    newStatus: currentRow.status,
    customerId: currentRow.customer_id
  };
}