Aller au contenu principal

Guide : Intégration des Webhooks

Ce guide vous explique comment recevoir et traiter les webhooks SendAs.me dans votre application, avec des exemples dans plusieurs langages.

Principe général

Lorsqu'un événement se produit (e-mail envoyé, échec, OAuth expiré, etc.), SendAs.me effectue un POST HTTP vers l'URL que vous avez configurée, avec :

  • Un body JSON contenant le payload de l'événement
  • Un header X-Gateway-Signature pour vérifier l'authenticité
  • Un header Content-Type: application/json

Votre endpoint doit :

  1. Vérifier la signature HMAC avant tout traitement
  2. Répondre avec un code 2xx dans les 30 secondes
  3. Traiter le payload de façon asynchrone si nécessaire

Configurer un endpoint webhook

Étape 1 : Créer l'endpoint dans votre application

Créez un endpoint HTTP qui accepte les requêtes POST. Voici la structure minimale.

Étape 2 : Créer la configuration webhook dans SendAs.me

Via l'API :

curl -X POST https://api.sendas.me/apps/app_pk_AbCdEfGhIjKlMnOp/webhooks \
-H "X-Api-Secret: votre_api_secret" \
-H "Content-Type: application/json" \
-d '{
"url": "https://votre-app.com/webhooks/sendas",
"events": ["email_sent", "email_failed_perm", "oauth_expired"]
}'

Notez le secret retourné dans la réponse — vous en aurez besoin pour vérifier les signatures.

Exemples d'implémentation

Python — FastAPI

import hmac
import hashlib
import json
from fastapi import FastAPI, Request, HTTPException, Header
from typing import Optional

app = FastAPI()

WEBHOOK_SECRET = "whsec_votre_secret_sendas"

def verify_signature(body: bytes, signature: str, secret: str) -> bool:
"""Vérifie la signature HMAC-SHA256 du webhook SendAs.me"""
expected = hmac.new(
secret.encode("utf-8"),
body,
hashlib.sha256
).hexdigest()
received = signature.replace("sha256=", "")
return hmac.compare_digest(expected, received)

@app.post("/webhooks/sendas")
async def handle_webhook(
request: Request,
x_gateway_signature: Optional[str] = Header(None)
):
# Récupérer le body brut
body = await request.body()

# Vérifier la signature
if not x_gateway_signature:
raise HTTPException(status_code=401, detail="Signature manquante")

if not verify_signature(body, x_gateway_signature, WEBHOOK_SECRET):
raise HTTPException(status_code=401, detail="Signature invalide")

# Parser le payload
payload = json.loads(body)
event = payload.get("event")
app_id = payload.get("app_id")
data = payload.get("data", {})

# Traiter selon le type d'événement
if event == "email_sent":
await handle_email_sent(app_id, data)
elif event == "email_failed_perm":
await handle_email_failed(app_id, data)
elif event == "oauth_expired":
await handle_oauth_expired(app_id, data)

# Répondre 200 immédiatement
return {"status": "received"}

async def handle_email_sent(app_id: str, data: dict):
print(f"Email envoyé depuis {data['from_address']} vers {data['to_address']}")
# Votre logique métier ici

async def handle_email_failed(app_id: str, data: dict):
print(f"Échec permanent pour {data['to_address']}: {data['error']}")
# Notifier l'équipe, mettre à jour votre DB, etc.

async def handle_oauth_expired(app_id: str, data: dict):
print(f"Token OAuth expiré pour {data['email']} (plugin: {data['plugin']})")
# Alerter l'administrateur pour reconnecter le compte

Python — Flask

import hmac
import hashlib
import json
from flask import Flask, request, jsonify, abort

app = Flask(__name__)

WEBHOOK_SECRET = "whsec_votre_secret_sendas"

def verify_signature(body: bytes, signature: str) -> bool:
expected = hmac.new(
WEBHOOK_SECRET.encode("utf-8"),
body,
hashlib.sha256
).hexdigest()
received = signature.replace("sha256=", "")
return hmac.compare_digest(expected, received)

@app.route("/webhooks/sendas", methods=["POST"])
def handle_webhook():
# Vérifier la signature
signature = request.headers.get("X-Gateway-Signature", "")
if not verify_signature(request.data, signature):
abort(401)

payload = request.get_json()
event = payload.get("event")

handlers = {
"email_sent": handle_email_sent,
"email_failed_perm": handle_email_failed,
"email_failed_temp": handle_email_retry,
"oauth_connected": handle_oauth_connected,
"oauth_expired": handle_oauth_expired,
}

handler = handlers.get(event)
if handler:
handler(payload)

return jsonify({"status": "ok"}), 200

def handle_email_sent(payload):
data = payload["data"]
app.logger.info(f"Email envoyé: {data['subject']} -> {data['to_address']}")

def handle_email_failed(payload):
data = payload["data"]
app.logger.error(f"Échec définitif: {data['error']} pour {data['to_address']}")
# Envoyer une alerte, créer un ticket support, etc.

def handle_email_retry(payload):
data = payload["data"]
app.logger.warning(f"Retry #{data['retry_count']}: prochain essai à {data['next_retry_at']}")

def handle_oauth_connected(payload):
data = payload["data"]
app.logger.info(f"OAuth connecté: {data['email']} via {data['plugin']}")

def handle_oauth_expired(payload):
data = payload["data"]
app.logger.critical(f"TOKEN EXPIRÉ: {data['email']} - reconnexion nécessaire!")
# Envoyer un email d'alerte à l'administrateur

if __name__ == "__main__":
app.run(port=3000)

Node.js — Express

const express = require('express');
const crypto = require('crypto');

const app = express();
const WEBHOOK_SECRET = 'whsec_votre_secret_sendas';

// Important: utiliser express.raw() pour accéder au body brut
app.use('/webhooks/sendas', express.raw({ type: 'application/json' }));
app.use(express.json()); // pour les autres routes

function verifySignature(body, signatureHeader) {
if (!signatureHeader) return false;

const expected = crypto
.createHmac('sha256', WEBHOOK_SECRET)
.update(body)
.digest('hex');

const received = signatureHeader.replace('sha256=', '');

// Comparaison sécurisée contre les timing attacks
return crypto.timingSafeEqual(
Buffer.from(expected, 'hex'),
Buffer.from(received, 'hex')
);
}

app.post('/webhooks/sendas', (req, res) => {
const signature = req.headers['x-gateway-signature'];

// Vérifier la signature avec le body brut
if (!verifySignature(req.body, signature)) {
console.error('Signature webhook invalide');
return res.status(401).json({ error: 'Signature invalide' });
}

// Parser le payload (req.body est un Buffer)
const payload = JSON.parse(req.body.toString());
const { event, app_id, data } = payload;

console.log(`Webhook reçu: ${event} pour app ${app_id}`);

// Traitement asynchrone (ne pas bloquer la réponse)
setImmediate(() => processWebhook(event, app_id, data));

// Répondre immédiatement
res.status(200).json({ status: 'received' });
});

async function processWebhook(event, appId, data) {
switch (event) {
case 'email_sent':
console.log(`✓ Email envoyé: ${data.subject} -> ${data.to_address}`);
// Mettre à jour votre base de données, etc.
break;

case 'email_failed_perm':
console.error(`✗ Échec définitif: ${data.error}`);
// Notifier l'équipe, supprimer l'adresse invalide, etc.
await notifyTeam(data);
break;

case 'oauth_expired':
console.error(`! Token OAuth expiré: ${data.email}`);
// Envoyer une alerte à l'admin
await sendAdminAlert(appId, data);
break;
}
}

async function notifyTeam(data) {
// Votre logique de notification
}

async function sendAdminAlert(appId, data) {
// Votre logique d'alerte
}

app.listen(3000, () => console.log('Serveur webhook sur le port 3000'));

PHP

<?php

define('WEBHOOK_SECRET', 'whsec_votre_secret_sendas');

/**
* Vérifie la signature HMAC-SHA256 du webhook SendAs.me
*/
function verifySignature(string $body, string $signatureHeader): bool
{
if (empty($signatureHeader)) {
return false;
}

$expected = hash_hmac('sha256', $body, WEBHOOK_SECRET);
$received = str_replace('sha256=', '', $signatureHeader);

// hash_equals est résistant aux timing attacks
return hash_equals($expected, $received);
}

// Récupérer le body brut
$body = file_get_contents('php://input');
$signature = $_SERVER['HTTP_X_GATEWAY_SIGNATURE'] ?? '';

// Vérifier la signature
if (!verifySignature($body, $signature)) {
http_response_code(401);
echo json_encode(['error' => 'Signature invalide']);
exit;
}

// Parser le payload
$payload = json_decode($body, true);
if (!$payload) {
http_response_code(400);
echo json_encode(['error' => 'Payload invalide']);
exit;
}

$event = $payload['event'] ?? '';
$appId = $payload['app_id'] ?? '';
$data = $payload['data'] ?? [];

// Répondre 200 immédiatement
http_response_code(200);
echo json_encode(['status' => 'received']);

// Traitement (après la réponse avec fastcgi_finish_request si disponible)
if (function_exists('fastcgi_finish_request')) {
fastcgi_finish_request();
}

switch ($event) {
case 'email_sent':
error_log("Email envoyé: {$data['subject']} -> {$data['to_address']}");
break;

case 'email_failed_perm':
error_log("ERREUR: Échec définitif pour {$data['to_address']}: {$data['error']}");
// Notifier par email, logger en DB, etc.
break;

case 'oauth_expired':
error_log("ALERTE: Token OAuth expiré pour {$data['email']}");
// Envoyer une alerte à l'administrateur
break;
}

Meilleures pratiques

1. Toujours vérifier la signature

Ne traitez jamais un webhook sans vérifier sa signature HMAC. N'importe qui pourrait envoyer des requêtes POST vers votre endpoint.

2. Répondre rapidement

Répondez 200 OK dans les 30 secondes, même si votre traitement prend plus de temps. Utilisez des queues ou des jobs asynchrones pour le traitement long.

3. Gérer l'idempotence

SendAs.me peut livrer le même webhook plusieurs fois (en cas de retry). Votre code doit gérer les doublons :

# Exemple Python : vérifier si le webhook a déjà été traité
async def handle_email_sent(log_id: str, data: dict):
# Vérifier si ce log_id a déjà été traité
if await db.webhook_processed(log_id):
return # Déjà traité, ignorer

await db.mark_webhook_processed(log_id)
# ... traitement ...

4. Logger les webhooks reçus

Conservez un historique des webhooks reçus pour le débogage :

@app.post("/webhooks/sendas")
async def handle_webhook(request: Request, ...):
payload = ...
await db.save_webhook_log(payload) # Logger avant traitement
# ... traitement ...

5. Surveiller les oauth_expired

Les événements oauth_expired sont critiques — ils signifient que les envois d'e-mails vont échouer. Mettez en place une alerte immédiate (email, Slack, PagerDuty) lorsque cet événement est reçu.

async def handle_oauth_expired(app_id: str, data: dict):
await send_slack_alert(
channel="#alerts-critical",
message=f"URGENT: Token OAuth expiré pour {data['email']} sur l'app {app_id}. "
f"Reconnexion requise immédiatement sur {PORTAL_URL}"
)

Tester votre endpoint

Utilisez l'endpoint de test de l'API SendAs.me pour envoyer un webhook de test :

curl -X POST https://api.sendas.me/apps/app_pk_AbCdEfGhIjKlMnOp/webhooks/wconf_abc123/test \
-H "X-Api-Secret: votre_api_secret"

Ou testez localement avec un tunnel ngrok :

# Exposer votre serveur local
ngrok http 3000

# Copier l'URL ngrok (ex: https://abc123.ngrok.io)
# Et la configurer comme URL webhook dans SendAs.me