Création de microservices orientés événements avec AWS Kinesis

Architecture orientée événements avec AWS Kinesis pour la scalabilité

Sommaire

AWS Kinesis est devenu un pilier pour la construction d’architectures de microservices modernes orientées événements, permettant le traitement de données en temps réel à grande échelle avec une surcharge opérationnelle minimale.

amazon-kinesis

Comprendre l’architecture de microservices orientée événements

L’architecture orientée événements (EDA) est un modèle de conception où les services communiquent via des événements plutôt que par des appels synchrones directs. Cette approche offre plusieurs avantages :

  • Couplage faible : Les services n’ont pas besoin de connaître l’existence des autres
  • Extensibilité : Chaque service peut être mis à l’échelle indépendamment en fonction de sa charge de travail
  • Résilience : Les défaillances d’un service ne se propagent pas aux autres
  • Flexibilité : De nouveaux services peuvent être ajoutés sans modifier les existants

AWS Kinesis fournit la base pour implémenter l’EDA en agissant comme un flux d’événements distribué et durable qui découple les producteurs des consommateurs.

Pour une perspective plus large sur les plateformes de streaming, consultez notre guide de démarrage rapide Apache Kafka pour une comparaison avec les alternatives auto-hébergées.

Aperçu d’AWS Kinesis

AWS propose plusieurs services Kinesis, chacun conçu pour des cas d’utilisation spécifiques. Lorsque vous évaluez des solutions de streaming, vous pourriez également vouloir envisager de comparer RabbitMQ sur EKS vs SQS pour différents modèles de messagerie et implications de coûts.

Kinesis Data Streams

Le service de streaming principal qui capture, stocke et traite les enregistrements de données en temps réel. Data Streams est idéal pour :

  • Les applications de traitement en temps réel personnalisées
  • La construction de pipelines de données avec une latence inférieure à la seconde
  • Le traitement de millions d’événements par seconde
  • La mise en œuvre de modèles d’origine événementielle (event sourcing)

Kinesis Data Firehose

Un service entièrement géré qui livre des données en streaming vers des destinations comme S3, Redshift, Elasticsearch ou des points de terminaison HTTP. Le mieux adapté pour :

  • Les pipelines ETL simples
  • L’agrégation et l’archivage des journaux
  • L’analyse quasi temps réel (latence minimale de 60 secondes)
  • Les scénarios où la logique de traitement personnalisée n’est pas nécessaire

Kinesis Data Analytics

Traite et analyse les données en streaming en utilisant SQL ou Apache Flink. Les cas d’utilisation incluent :

  • Tableaux de bord en temps réel
  • ETL en streaming
  • Détection d’anomalies en temps réel
  • Génération continue de métriques

Modèles d’architecture avec Kinesis

1. Modèle d’origine événementielle (Event Sourcing)

L’origine événementielle stocke tous les changements d’état de l’application sous forme de séquence d’événements. Kinesis est parfait pour cela. Si vous avez besoin d’un rappel sur les fondamentaux de Python, consultez notre Fiche mémo Python :

import boto3
import json
from datetime import datetime

kinesis = boto3.client('kinesis', region_name='us-east-1')

def publish_event(stream_name, event_type, payload):
    """Publie un événement vers le flux Kinesis"""
    event = {
        'eventId': generate_unique_id(),
        'eventType': event_type,
        'timestamp': datetime.utcnow().isoformat(),
        'payload': payload
    }
    
    response = kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event),
        PartitionKey=payload.get('userId', 'default')
    )
    
    return response['SequenceNumber']

# Exemple : événement d'inscription utilisateur
publish_event(
    stream_name='user-events',
    event_type='USER_REGISTERED',
    payload={
        'userId': '12345',
        'email': 'user@example.com',
        'registrationDate': '2025-10-30'
    }
)

2. CQRS (Command Query Responsibility Segregation)

Séparez les opérations de lecture et d’écriture en utilisant Kinesis comme bus d’événements :

package main

import (
    "encoding/json"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/kinesis"
)

type OrderCommand struct {
    CommandType string      `json:"commandType"`
    OrderID     string      `json:"orderId"`
    UserID      string      `json:"userId"`
    Items       []OrderItem `json:"items"`
}

func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
    data, err := json.Marshal(command)
    if err != nil {
        return err
    }
    
    _, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
        StreamName:   aws.String("order-commands"),
        Data:         data,
        PartitionKey: aws.String(command.OrderID),
    })
    
    return err
}

3. Modèle Fan-Out avec Lambda

Traitez des événements d’un seul flux avec plusieurs fonctions Lambda. Pour les implémentations TypeScript avec une sécurité de type plus forte, consultez notre Fiche mémo TypeScript :

// Consommateur Lambda pour les notifications par e-mail
exports.handler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );
        
        if (payload.eventType === 'ORDER_PLACED') {
            await sendOrderConfirmationEmail(payload);
        }
    }
};

// Un autre Lambda pour les mises à jour de stock
exports.inventoryHandler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );
        
        if (payload.eventType === 'ORDER_PLACED') {
            await updateInventory(payload.items);
        }
    }
};

Meilleures pratiques pour la production

1. Choisir le bon nombre de shards

Calculez vos besoins en shards en fonction de :

  • Ingress : 1 Mo/sec ou 1 000 enregistrements/sec par shard
  • Egress : 2 Mo/sec par shard (consommateurs standards) ou 2 Mo/sec par consommateur avec le fan-out amélioré
def calculate_shards(records_per_second, avg_record_size_kb):
    """Calcule le nombre requis de shards"""
    # Capacité d'ingress
    ingress_shards = max(
        records_per_second / 1000,
        (records_per_second * avg_record_size_kb) / 1024
    )
    
    return int(ingress_shards) + 1  # Ajout d'une marge

2. Implémenter une gestion d’erreur appropriée

from botocore.exceptions import ClientError
import time

def put_record_with_retry(kinesis_client, stream_name, data, partition_key, 
                          max_retries=3):
    """Met un enregistrement avec une stratégie de reprise à backoff exponentiel"""
    for attempt in range(max_retries):
        try:
            response = kinesis_client.put_record(
                StreamName=stream_name,
                Data=data,
                PartitionKey=partition_key
            )
            return response
        except ClientError as e:
            if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
                if attempt < max_retries - 1:
                    time.sleep(2 ** attempt)  # Backoff exponentiel
                    continue
            raise

3. Utiliser le fan-out amélioré pour plusieurs consommateurs

Le fan-out amélioré fournit un débit dédié pour chaque consommateur :

# Enregistrer un consommateur avec le fan-out amélioré
aws kinesis register-stream-consumer \
    --stream-arn arn:aws:kinesis:us-east-1:123456789:stream/my-stream \
    --consumer-name my-consumer-app

4. Surveiller les métriques clés

Métriques CloudWatch essentielles à suivre :

  • IncomingRecords : Nombre d’enregistrements mis avec succès
  • IncomingBytes : Volume en octets des données entrantes
  • GetRecords.IteratorAgeMilliseconds : Retard des consommateurs
  • WriteProvisionedThroughputExceeded : Événements de limitation (throttling)
  • ReadProvisionedThroughputExceeded : Limitation des consommateurs

5. Implémenter une stratégie de clé de partition appropriée

import hashlib

def get_partition_key(user_id, shard_count=10):
    """Génère une clé de partition avec une distribution uniforme"""
    # Utiliser le hachage cohérent pour une distribution uniforme
    hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
    shard_id = hash_value % shard_count
    return f"{user_id}#{shard_id}"

Exemple d’implémentation dans le monde réel

Voici un exemple complet d’architecture de microservices de traitement de commandes :

import boto3
import json
from decimal import Decimal
from typing import Dict, List

class OrderProcessingService:
    def __init__(self, stream_name: str):
        self.kinesis = boto3.client('kinesis')
        self.stream_name = stream_name
    
    def create_order(self, user_id: str, items: List[Dict]) -> str:
        """Crée une commande et publie des événements"""
        order_id = self.generate_order_id()
        
        # Publier l'événement de création de commande
        self.publish_event('ORDER_CREATED', {
            'orderId': order_id,
            'userId': user_id,
            'items': items,
            'status': 'PENDING',
            'total': self.calculate_total(items)
        }, partition_key=user_id)
        
        return order_id
    
    def publish_event(self, event_type: str, payload: Dict, 
                     partition_key: str):
        """Publie un événement vers le flux Kinesis"""
        event = {
            'eventType': event_type,
            'timestamp': datetime.utcnow().isoformat(),
            'payload': payload
        }
        
        self.kinesis.put_record(
            StreamName=self.stream_name,
            Data=json.dumps(event, default=str),
            PartitionKey=partition_key
        )

class InventoryService:
    """Consomme les événements de commande et met à jour le stock"""
    
    def process_records(self, records):
        for record in records:
            data = json.loads(record['kinesis']['data'])
            
            if data['eventType'] == 'ORDER_CREATED':
                self.reserve_inventory(data['payload'])
            elif data['eventType'] == 'ORDER_CANCELLED':
                self.release_inventory(data['payload'])
    
    def reserve_inventory(self, order_data):
        # Mettre à jour la base de données de stock
        for item in order_data['items']:
            # Implémentation ici
            pass

Stratégie de migration du monolithe aux microservices

Phase 1 : Modèle du Figuier Étrangleur

Commencez par router des événements spécifiques via Kinesis tout en conservant le monolithe :

  1. Identifiez les contextes délimités dans votre monolithe
  2. Créez des flux Kinesis pour les événements transversaux
  3. Extrait progressivement les services qui consomment ces flux
  4. Maintenez la compatibilité ascendante avec le monolithe

Phase 2 : Traitement parallèle

Exécutez les systèmes anciens et nouveaux en parallèle :

def dual_write_pattern(legacy_db, kinesis_stream, data):
    """Écrit à la fois dans le système legacy et dans le flux d'événements"""
    try:
        # Écrire d'abord dans le nouveau système
        publish_to_kinesis(kinesis_stream, data)
        
        # Puis mettre à jour le système legacy
        legacy_db.update(data)
    except Exception as e:
        # Implémenter la logique de compensation
        rollback_kinesis_event(kinesis_stream, data)
        raise

Phase 3 : Migration complète

Une fois la confiance établie, routez tout le trafic via l’architecture orientée événements.

Stratégies d’optimisation des coûts

Pour des conseils complets sur les modèles d’infrastructure de données, y compris le stockage d’objets et les architectures de bases de données, consultez Infrastructure de données pour les systèmes IA : Stockage d’objets, bases de données, recherche et architecture de données IA.

1. Utiliser le mode à la demande pour les charges de travail variables

Le mode à la demande (introduit en 2023) ajuste automatiquement l’échelle en fonction du trafic :

# Créer un flux en mode à la demande
aws kinesis create-stream \
    --stream-name my-stream \
    --stream-mode-details StreamMode=ON_DEMAND

2. Implémenter l’agrégation de données

Réduisez les unités de charge PUT en regroupant les enregistrements :

from aws_kinesis_agg.aggregator import RecordAggregator

def batch_put_records(kinesis_client, stream_name, records):
    """Agrège les enregistrements pour réduire les coûts"""
    aggregator = RecordAggregator()
    
    for record in records:
        aggregator.add_user_record(
            partition_key=record['partition_key'],
            data=record['data']
        )
    
    # Envoyer l'enregistrement agrégé
    kinesis_client.put_record(
        StreamName=stream_name,
        Data=aggregator.serialize(),
        PartitionKey=records[0]['partition_key']
    )

3. Optimiser la rétention des données

La rétention par défaut est de 24 heures. Ne l’étendez que si nécessaire :

# Définir la rétention sur 7 jours
aws kinesis increase-stream-retention-period \
    --stream-name my-stream \
    --retention-period-hours 168

Meilleures pratiques de sécurité

1. Chiffrement au repos et en transit

# Créer un flux chiffré
kinesis.create_stream(
    StreamName='secure-stream',
    ShardCount=1,
    StreamModeDetails={'StreamMode': 'PROVISIONED'}
)

# Activer le chiffrement
kinesis.start_stream_encryption(
    StreamName='secure-stream',
    EncryptionType='KMS',
    KeyId='alias/aws/kinesis'
)

2. Politiques IAM pour le principe du moindre privilège

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": "arn:aws:kinesis:us-east-1:123456789:stream/orders"
    }
  ]
}

3. Points de terminaison VPC

Gardez le trafic au sein du réseau AWS. Pour la gestion de l’infrastructure AWS en tant que code, envisagez d’utiliser Terraform - consultez notre fiche mémo Terraform :

aws ec2 create-vpc-endpoint \
    --vpc-id vpc-12345678 \
    --service-name com.amazonaws.us-east-1.kinesis-streams \
    --route-table-ids rtb-12345678

Observabilité et débogage

Traçage distribué avec X-Ray

from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all

patch_all()

@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
    xray_recorder.put_annotation('eventType', event_data['type'])
    xray_recorder.put_metadata('payload', event_data['payload'])
    
    kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event_data),
        PartitionKey=event_data['id']
    )

Requêtes CloudWatch Logs Insights

-- Trouver les temps de traitement lents
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20

-- Suivre les taux d'erreur
fields @timestamp, eventType
| filter error = true
| stats count() by eventType

Modèles avancés

Modèle Saga pour les transactions distribuées

Implémentez des transactions longue durée entre microservices :

class OrderSaga:
    def __init__(self, kinesis_client, stream_name):
        self.kinesis = kinesis_client
        self.stream_name = stream_name
        self.saga_id = str(uuid.uuid4())
    
    def execute(self, order_data):
        """Exécute la saga avec une logique de compensation"""
        try:
            # Étape 1 : Réserver le stock
            self.publish_command('RESERVE_INVENTORY', order_data)
            
            # Étape 2 : Traiter le paiement
            self.publish_command('PROCESS_PAYMENT', order_data)
            
            # Étape 3 : Expédier la commande
            self.publish_command('SHIP_ORDER', order_data)
            
        except SagaException as e:
            # Compenser dans l'ordre inverse
            self.compensate(e.failed_step)
    
    def compensate(self, failed_step):
        """Exécute les transactions de compensation"""
        compensation_steps = {
            'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
            'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
        }
        
        for step in compensation_steps.get(failed_step, []):
            self.publish_command(step, {'sagaId': self.saga_id})

Stratégies de test

Développement local avec LocalStack

# Démarrer LocalStack avec Kinesis
docker run -d \
  -p 4566:4566 \
  -e SERVICES=kinesis \
  localstack/localstack

# Créer un flux de test
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
    --stream-name test-stream \
    --shard-count 1

Tests d’intégration

import pytest
from moto import mock_kinesis

@mock_kinesis
def test_event_publishing():
    """Test de publication d'événements avec Kinesis mocké"""
    kinesis = boto3.client('kinesis', region_name='us-east-1')
    kinesis.create_stream(StreamName='test-stream', ShardCount=1)
    
    service = OrderProcessingService('test-stream')
    order_id = service.create_order('user123', [
        {'productId': 'prod1', 'quantity': 2}
    ])
    
    assert order_id is not None

Réglage des performances

Optimiser la taille du lot

def optimize_batch_processing(records, batch_size=500):
    """Traite les enregistrements par lots optimisés"""
    for i in range(0, len(records), batch_size):
        batch = records[i:i + batch_size]
        process_batch(batch)

Utiliser la mise en pool de connexions

from botocore.config import Config

config = Config(
    max_pool_connections=50,
    retries={'max_attempts': 3, 'mode': 'adaptive'}
)

kinesis = boto3.client('kinesis', config=config)

Liens utiles

Ressources AWS Kinesis :

Articles connexes :

Conclusion

AWS Kinesis fournit une base robuste pour la construction d’architectures de microservices orientées événements évolutives. En suivant ces modèles et bonnes pratiques, vous pouvez créer des systèmes résilients, évolutifs et maintenables. Commencez petit avec un seul flux d’événements, validez votre architecture, et étendez progressivement vers des modèles plus complexes à mesure que votre système grandit.

La clé du succès réside dans la compréhension de vos besoins en flux de données, le choix du service Kinesis approprié pour votre cas d’utilisation et la mise en œuvre d’une surveillance et d’une gestion des erreurs appropriées dès le départ.