Het bouwen van gebeurtenisgedreven microservices met AWS Kinesis

Event-gedreven architectuur met AWS Kinesis voor schaalbaarheid

Inhoud

AWS Kinesis is een hoeksteen geworden voor het bouwen van moderne, gebeurtenisgedreven microservices-architecturen, waarbij het mogelijk maakt om data in realtime op schaal te verwerken met minimale operationele overhead.

amazon-kinesis

Begrip van Gebeurtenisgedreven Microservices-architectuur

Gebeurtenisgedreven architectuur (EDA) is een ontwerppatroon waarbij services communiceren via gebeurtenissen in plaats van directe synchrone oproepen. Deze aanbod biedt verschillende voordelen:

  • Losse koppeling: Services hoeven niet bekend te zijn met het bestaan van elkaar
  • Schaalbaarheid: Elke service schaalt onafhankelijk op basis van de eigen werklast
  • Resilience: Mislukkingen in één service verspreiden zich niet naar andere services
  • Flexibiliteit: Nieuwe services kunnen worden toegevoegd zonder bestaande services aan te passen

AWS Kinesis vormt het fundament voor het implementeren van EDA door te fungeren als een gedistribueerde, duurzame gebeurtenisstream die producenten van consumenten decoupleert.

Voor een breder perspectief op streaming-platforms, raadpleeg onze Apache Kafka Quickstart gids voor een vergelijking met zelf-gehoste alternatieven.

Overzicht van AWS Kinesis

AWS biedt verschillende Kinesis-services, elk ontworpen voor specifieke gebruiksscenario’s. Bij het evalueren van streaming-oplossingen wilt u mogelijk ook overwegen om RabbitMQ op EKS te vergelijken met SQS voor verschillende messaging-patronen en kostenimplicaties.

Kinesis Data Streams

De kernstreamingservice die datarecords in realtime vastlegt, opslaat en verwerkt. Data Streams is ideaal voor:

  • Aangepaste realtime verwerkingsapplicaties
  • Het bouwen van datapijplijnen met een latentie van minder dan een seconde
  • Het verwerken van miljoenen gebeurtenissen per seconde
  • Het implementeren van event sourcing-patronen

Kinesis Data Firehose

Een volledig beheerde service die streamingdata levert naar bestemmingen zoals S3, Redshift, Elasticsearch of HTTP-eindpunten. Het beste voor:

  • Eenvoudige ETL-pijplijnen
  • Log-aggregatie en archivering
  • Near-realtime analyse (minimale latentie van 60 seconden)
  • Scenario’s waarbij geen aangepaste verwerkingslogica nodig is

Kinesis Data Analytics

Verwerkt en analyseert streamingdata met behulp van SQL of Apache Flink. Gebruiksscenario’s omvatten:

  • Realtime dashboards
  • Streaming ETL
  • Realtime anomaliedetectie
  • Continue metriekegeneratie

Architectonische Patronen met Kinesis

1. Event Sourcing Patroon

Event sourcing slaat alle wijzigingen in de toestand van een applicatie op als een reeks gebeurtenissen. Kinesis is perfect hiervoor. Als je een opfrisser over Python-fundamenten nodig hebt, bekijk dan onze Python Cheatsheet:

import boto3
import json
from datetime import datetime

kinesis = boto3.client('kinesis', region_name='us-east-1')

def publish_event(stream_name, event_type, payload):
    """Publiceer een gebeurtenis naar de Kinesis-stream"""
    event = {
        'eventId': generate_unique_id(),
        'eventType': event_type,
        'timestamp': datetime.utcnow().isoformat(),
        'payload': payload
    }
    
    response = kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event),
        PartitionKey=payload.get('userId', 'default')
    )
    
    return response['SequenceNumber']

# Voorbeeld: Gebeurtenis voor gebruikersregistratie
publish_event(
    stream_name='user-events',
    event_type='USER_REGISTERED',
    payload={
        'userId': '12345',
        'email': 'user@example.com',
        'registrationDate': '2025-10-30'
    }
)

2. CQRS (Command Query Responsibility Segregation)

Scheid lees- en schrijfoperaties door Kinesis te gebruiken als de gebeurtenisbus:

package main

import (
    "encoding/json"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/kinesis"
)

type OrderCommand struct {
    CommandType string      `json:"commandType"`
    OrderID     string      `json:"orderId"`
    UserID      string      `json:"userId"`
    Items       []OrderItem `json:"items"`
}

func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
    data, err := json.Marshal(command)
    if err != nil {
        return err
    }
    
    _, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
        StreamName:   aws.String("order-commands"),
        Data:         data,
        PartitionKey: aws.String(command.OrderID),
    })
    
    return err
}

3. Fan-Out Patroon met Lambda

Verwerk gebeurtenissen van een enkele stream met meerdere Lambda-functies. Voor TypeScript-implementaties met sterkere typesafety, verwijzen we naar onze TypeScript Cheatsheet:

// Lambda-consument voor e-mailnotificaties
exports.handler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );
        
        if (payload.eventType === 'ORDER_PLACED') {
            await sendOrderConfirmationEmail(payload);
        }
    }
};

// Een andere Lambda voor voorraupdate
exports.inventoryHandler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );
        
        if (payload.eventType === 'ORDER_PLACED') {
            await updateInventory(payload.items);
        }
    }
};

Best Practices voor Productie

1. Het Kies van het Juiste Aantal Shards

Bereken uw shard-behoeften op basis van:

  • Ingress: 1 MB/sec of 1.000 records/sec per shard
  • Egress: 2 MB/sec per shard (standaard consumenten) of 2 MB/sec per consument met enhanced fan-out
def calculate_shards(records_per_second, avg_record_size_kb):
    """Bereken het vereiste aantal shards"""
    # Ingress capaciteit
    ingress_shards = max(
        records_per_second / 1000,
        (records_per_second * avg_record_size_kb) / 1024
    )
    
    return int(ingress_shards) + 1  # Voeg buffer toe

2. Implementeer Correcte Foutafhandeling

from botocore.exceptions import ClientError
import time

def put_record_with_retry(kinesis_client, stream_name, data, partition_key, 
                          max_retries=3):
    """Plaats record met exponentiële backoff retry"""
    for attempt in range(max_retries):
        try:
            response = kinesis_client.put_record(
                StreamName=stream_name,
                Data=data,
                PartitionKey=partition_key
            )
            return response
        except ClientError as e:
            if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
                if attempt < max_retries - 1:
                    time.sleep(2 ** attempt)  # Exponentiële backoff
                    continue
            raise

3. Gebruik Enhanced Fan-Out voor Meerdere Consumenten

Enhanced fan-out biedt toegewezen doorvoer voor elke consument:

# Registreer een consument met enhanced fan-out
aws kinesis register-stream-consumer \
    --stream-arn arn:aws:kinesis:us-east-1:123456789:stream/my-stream \
    --consumer-name my-consumer-app

4. Monitoren van Belangrijke Metrieken

Essentiële CloudWatch-metrieken om te volgen:

  • IncomingRecords: Aantal succesvol geplaatste records
  • IncomingBytes: Bytesvolume van inkomende data
  • GetRecords.IteratorAgeMilliseconds: Hoe ver consumenten achterlopen
  • WriteProvisionedThroughputExceeded: Throttling-gebeurtenissen
  • ReadProvisionedThroughputExceeded: Consument throttling

5. Implementeer een Correcte Partition Key Strategie

import hashlib

def get_partition_key(user_id, shard_count=10):
    """Genereer partition key met gelijke verdeling"""
    # Gebruik consistente hashing voor gelijke verdeling
    hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
    shard_id = hash_value % shard_count
    return f"{user_id}#{shard_id}"

Voorbeeld van Implementatie in de Praktijk

Hier is een compleet voorbeeld van een microservices-architectuur voor orderverwerking:

import boto3
import json
from decimal import Decimal
from typing import Dict, List

class OrderProcessingService:
    def __init__(self, stream_name: str):
        self.kinesis = boto3.client('kinesis')
        self.stream_name = stream_name
    
    def create_order(self, user_id: str, items: List[Dict]) -> str:
        """Maak een order aan en publiceer gebeurtenissen"""
        order_id = self.generate_order_id()
        
        # Publiceer order-creatie gebeurtenis
        self.publish_event('ORDER_CREATED', {
            'orderId': order_id,
            'userId': user_id,
            'items': items,
            'status': 'PENDING',
            'total': self.calculate_total(items)
        }, partition_key=user_id)
        
        return order_id
    
    def publish_event(self, event_type: str, payload: Dict, 
                     partition_key: str):
        """Publiceer gebeurtenis naar Kinesis-stream"""
        event = {
            'eventType': event_type,
            'timestamp': datetime.utcnow().isoformat(),
            'payload': payload
        }
        
        self.kinesis.put_record(
            StreamName=self.stream_name,
            Data=json.dumps(event, default=str),
            PartitionKey=partition_key
        )

class InventoryService:
    """Consumente order-gebeurtenissen en update voorraad"""
    
    def process_records(self, records):
        for record in records:
            data = json.loads(record['kinesis']['data'])
            
            if data['eventType'] == 'ORDER_CREATED':
                self.reserve_inventory(data['payload'])
            elif data['eventType'] == 'ORDER_CANCELLED':
                self.release_inventory(data['payload'])
    
    def reserve_inventory(self, order_data):
        # Update voorraaddatabase
        for item in order_data['items']:
            # Implementatie hier
            pass

Migratiestrategie van Monolith naar Microservices

Fase 1: Strangler Fig Patroon

Begin met het routeren van specifieke gebeurtenissen via Kinesis terwijl de monolith behouden blijft:

  1. Identificeer bounded contexts in uw monolith
  2. Maak Kinesis-streams aan voor cross-context gebeurtenissen
  3. Haal services geleidelijk uit die consumeren van deze streams
  4. Behoud backward compatibiliteit met de monolith

Fase 2: Parallelle Verwerking

Voer zowel het oude als het nieuwe systeem parallel uit:

def dual_write_pattern(legacy_db, kinesis_stream, data):
    """Schrijf naar zowel legacy systeem als gebeurtenisstream"""
    try:
        # Eerst schrijven naar nieuwe systeem
        publish_to_kinesis(kinesis_stream, data)
        
        # Vervolgens legacy systeem updaten
        legacy_db.update(data)
    except Exception as e:
        # Implementeer compensatielogica
        rollback_kinesis_event(kinesis_stream, data)
        raise

Fase 3: Volledige Migratie

Zodra vertrouwen is opgebouwd, routeer alle verkeer via de gebeurtenisgedreven architectuur.

Strategieën voor Kostenoptimalisatie

Voor uitgebreide richtlijnen over data-infrastructuurpatronen, inclusief objectopslag en database-architecturen, verwijzen we naar Data Infrastructuur voor AI-systemen: Objectopslag, Databases, Zoeken & AI Data Architectuur.

1. Gebruik On-Demand Mode voor Variabele Werklasten

On-demand mode (ingevoerd in 2023) schaalt automatisch op basis van verkeer:

# Maak stream aan met on-demand mode
aws kinesis create-stream \
    --stream-name my-stream \
    --stream-mode-details StreamMode=ON_DEMAND

2. Implementeer Data-aggregatie

Verhoog PUT-payload eenheden door records te batchen:

from aws_kinesis_agg.aggregator import RecordAggregator

def batch_put_records(kinesis_client, stream_name, records):
    """Aggregateer records om kosten te verminderen"""
    aggregator = RecordAggregator()
    
    for record in records:
        aggregator.add_user_record(
            partition_key=record['partition_key'],
            data=record['data']
        )
    
    # Stuur geaggregeerd record
    kinesis_client.put_record(
        StreamName=stream_name,
        Data=aggregator.serialize(),
        PartitionKey=records[0]['partition_key']
    )

3. Optimaliseer Data-retentie

Standaard retentie is 24 uur. Verleng alleen indien nodig:

# Stel retentie in op 7 dagen
aws kinesis increase-stream-retention-period \
    --stream-name my-stream \
    --retention-period-hours 168

Veiligheidsbest Practices

1. Versleuteling Rust en in Transit

# Maak versleutelde stream aan
kinesis.create_stream(
    StreamName='secure-stream',
    ShardCount=1,
    StreamModeDetails={'StreamMode': 'PROVISIONED'}
)

# Schakel versleuteling in
kinesis.start_stream_encryption(
    StreamName='secure-stream',
    EncryptionType='KMS',
    KeyId='alias/aws/kinesis'
)

2. IAM-beleidsregels voor Minimale Rechten

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": "arn:aws:kinesis:us-east-1:123456789:stream/orders"
    }
  ]
}

3. VPC Endpoints

Houd verkeer binnen het AWS-netwerk. Voor het beheren van AWS-infrastructuur als code, overweeg het gebruik van Terraform - zie onze Terraform cheatsheet:

aws ec2 create-vpc-endpoint \
    --vpc-id vpc-12345678 \
    --service-name com.amazonaws.us-east-1.kinesis-streams \
    --route-table-ids rtb-12345678

Observabiliteit en Debugging

Gedistribueerde Tracing met X-Ray

from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all

patch_all()

@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
    xray_recorder.put_annotation('eventType', event_data['type'])
    xray_recorder.put_metadata('payload', event_data['payload'])
    
    kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event_data),
        PartitionKey=event_data['id']
    )

CloudWatch Logs Insights Queries

-- Zoek langzame verwerkingstijden
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20

-- Track foutniveaus
fields @timestamp, eventType
| filter error = true
| stats count() by eventType

Geavanceerde Patronen

Saga Patroon voor Gedistribueerde Transacties

Implementeer langlopende transacties over microservices heen:

class OrderSaga:
    def __init__(self, kinesis_client, stream_name):
        self.kinesis = kinesis_client
        self.stream_name = stream_name
        self.saga_id = str(uuid.uuid4())
    
    def execute(self, order_data):
        """Voer saga uit met compensatielogica"""
        try:
            # Stap 1: Reserveer voorraad
            self.publish_command('RESERVE_INVENTORY', order_data)
            
            # Stap 2: Verwerk betaling
            self.publish_command('PROCESS_PAYMENT', order_data)
            
            # Stap 3: Verstuur order
            self.publish_command('SHIP_ORDER', order_data)
            
        except SagaException as e:
            # Compenseer in omgekeerde volgorde
            self.compensate(e.failed_step)
    
    def compensate(self, failed_step):
        """Voer compensatietransacties uit"""
        compensation_steps = {
            'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
            'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
        }
        
        for step in compensation_steps.get(failed_step, []):
            self.publish_command(step, {'sagaId': self.saga_id})

Teststrategieën

Lokale Ontwikkeling met LocalStack

# Start LocalStack met Kinesis
docker run -d \
  -p 4566:4566 \
  -e SERVICES=kinesis \
  localstack/localstack

# Maak teststream aan
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
    --stream-name test-stream \
    --shard-count 1

Integratietesten

import pytest
from moto import mock_kinesis

@mock_kinesis
def test_event_publishing():
    """Test gebeurtenispublicatie met gemockte Kinesis"""
    kinesis = boto3.client('kinesis', region_name='us-east-1')
    kinesis.create_stream(StreamName='test-stream', ShardCount=1)
    
    service = OrderProcessingService('test-stream')
    order_id = service.create_order('user123', [
        {'productId': 'prod1', 'quantity': 2}
    ])
    
    assert order_id is not None

Prestatie-tuning

Optimaliseer Batchgrootte

def optimize_batch_processing(records, batch_size=500):
    """Verwerk records in geoptimaliseerde batches"""
    for i in range(0, len(records), batch_size):
        batch = records[i:i + batch_size]
        process_batch(batch)

Gebruik Connection Pooling

from botocore.config import Config

config = Config(
    max_pool_connections=50,
    retries={'max_attempts': 3, 'mode': 'adaptive'}
)

kinesis = boto3.client('kinesis', config=config)

AWS Kinesis Bronnen:

Gerelateerde Artikelen:

Conclusie

AWS Kinesis biedt een robuuste basis voor het bouwen van schaalbare, gebeurtenisgedreven microservices-architecturen. Door deze patronen en best practices te volgen, kunt u systemen creëren die resilient, schaalbaar en onderhoudbaar zijn. Begin klein met een enkele gebeurtenisstream, valideer uw architectuur en breid geleidelijk uit naar complexere patronen naarmate uw systeem groeit.

De sleutel tot succes is het begrijpen van uw dataflow-eisen, het kiezen van de juiste Kinesis-service voor uw gebruiksscenario en het implementeren van correct monitoring en foutafhandeling vanaf het begin.