Aufbau ereignisgesteuerter Microservices mit AWS Kinesis

Ereignisgesteuerte Architektur mit AWS Kinesis für Skalierbarkeit

Inhaltsverzeichnis

AWS Kinesis hat sich zu einem Eckpfeiler für die Entwicklung moderner ereignisgestufter Microservices-Architekturen entwickelt und ermöglicht die Echtzeit-Datenverarbeitung in großem Umfang mit minimalem Betriebsaufwand.

amazon-kinesis

Verständnis ereignisgestufter Microservices-Architektur

Die ereignisgesteuerte Architektur (EDA) ist ein Entwurfsmuster, bei dem Dienste durch Ereignisse statt durch direkte synchrone Aufrufe kommunizieren. Dieser Ansatz bietet mehrere Vorteile:

  • Lockere Kopplung: Dienste müssen nicht voneinander wissen
  • Skalierbarkeit: Jeder Dienst kann unabhängig basierend auf seiner Arbeitslast skaliert werden
  • Resilienz: Ausfälle eines Dienstes breiten sich nicht auf andere aus
  • Flexibilität: Neue Dienste können hinzugefügt werden, ohne bestehende zu ändern

AWS Kinesis bietet das Rückgrat für die Implementierung von EDA, indem es als verteilter, dauerhafter Ereignisstrom fungiert, der Erzeuger von Konsumenten entkoppelt.

Für eine breitere Perspektive auf Streaming-Plattformen, sehen Sie unseren Apache Kafka Quickstart-Leitfaden zum Vergleich mit selbst gehosteten Alternativen.

Übersicht über AWS Kinesis

AWS bietet mehrere Kinesis-Dienste, die jeweils für spezifische Anwendungsfälle konzipiert sind. Bei der Bewertung von Streaming-Lösungen sollten Sie auch RabbitMQ auf EKS im Vergleich zu SQS für verschiedene Messaging-Muster und Kostenausswirkungen in Betracht ziehen.

Kinesis Data Streams

Der zentrale Streaming-Dienst, der Datensätze in Echtzeit erfasst, speichert und verarbeitet. Data Streams ist ideal für:

  • Benutzerdefinierte Echtzeit-Verarbeitungsanwendungen
  • Aufbau von Datenpipelines mit Latenzzeiten unter einer Sekunde
  • Verarbeitung von Millionen von Ereignissen pro Sekunde
  • Implementierung von Event-Sourcing-Mustern

Kinesis Data Firehose

Ein vollständig verwalteter Dienst, der Streaming-Daten an Ziele wie S3, Redshift, Elasticsearch oder HTTP-Endpunkte liefert. Am besten geeignet für:

  • Einfache ETL-Pipelines
  • Log-Aggregation und Archivierung
  • Nahezu Echtzeit-Analysen (mindestens 60 Sekunden Latenz)
  • Szenarien, in denen keine benutzerdefinierte Verarbeitungslogik benötigt wird

Kinesis Data Analytics

Verarbeitet und analysiert Streaming-Daten mit SQL oder Apache Flink. Anwendungsfälle umfassen:

  • Echtzeit-Dashboards
  • Streaming-ETL
  • Echtzeit-Anomalieerkennung
  • Kontinuierliche Metrikerstellung

Architekturmuster mit Kinesis

1. Event-Sourcing-Muster

Event Sourcing speichert alle Änderungen des Anwendungszustands als eine Folge von Ereignissen. Kinesis ist dafür perfekt. Falls Sie einen Auffrischungskurs zu Python-Grundlagen benötigen, schauen Sie sich unser Python-Cheat-Sheet an:

import boto3
import json
from datetime import datetime

kinesis = boto3.client('kinesis', region_name='us-east-1')

def publish_event(stream_name, event_type, payload):
    """Ein Ereignis an den Kinesis-Stream senden"""
    event = {
        'eventId': generate_unique_id(),
        'eventType': event_type,
        'timestamp': datetime.utcnow().isoformat(),
        'payload': payload
    }
    
    response = kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event),
        PartitionKey=payload.get('userId', 'default')
    )
    
    return response['SequenceNumber']

# Beispiel: Benutzerregistrierungsereignis
publish_event(
    stream_name='user-events',
    event_type='USER_REGISTERED',
    payload={
        'userId': '12345',
        'email': 'user@example.com',
        'registrationDate': '2025-10-30'
    }
)

2. CQRS (Command Query Responsibility Segregation)

Trennen Sie Lese- und Schreiboperationen unter Verwendung von Kinesis als Ereignisbus:

package main

import (
    "encoding/json"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/kinesis"
)

type OrderCommand struct {
    CommandType string      `json:"commandType"`
    OrderID     string      `json:"orderId"`
    UserID      string      `json:"userId"`
    Items       []OrderItem `json:"items"`
}

func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
    data, err := json.Marshal(command)
    if err != nil {
        return err
    }
    
    _, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
        StreamName:   aws.String("order-commands"),
        Data:         data,
        PartitionKey: aws.String(command.OrderID),
    })
    
    return err
}

3. Fan-Out-Muster mit Lambda

Verarbeiten Sie Ereignisse aus einem einzelnen Stream mit mehreren Lambda-Funktionen. Für TypeScript-Implementierungen mit stärkerer Typsicherheit, beziehen Sie sich auf unser TypeScript-Cheat-Sheet:

// Lambda-Konsument für E-Mail-Benachrichtigungen
exports.handler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );
        
        if (payload.eventType === 'ORDER_PLACED') {
            await sendOrderConfirmationEmail(payload);
        }
    }
};

// Eine weitere Lambda für Bestandsaktualisierungen
exports.inventoryHandler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );
        
        if (payload.eventType === 'ORDER_PLACED') {
            await updateInventory(payload.items);
        }
    }
};

Best Practices für die Produktion

1. Die richtige Shard-Anzahl wählen

Berechnen Sie Ihre Shard-Anforderungen basierend auf:

  • Ingress: 1 MB/Sekunde oder 1.000 Datensätze/Sekunde pro Shard
  • Egress: 2 MB/Sekunde pro Shard (Standardkonsumenten) oder 2 MB/Sekunde pro Konsument mit erweitertem Fan-Out
def calculate_shards(records_per_second, avg_record_size_kb):
    """Erforderliche Anzahl an Shards berechnen"""
    # Ingress-Kapazität
    ingress_shards = max(
        records_per_second / 1000,
        (records_per_second * avg_record_size_kb) / 1024
    )
    
    return int(ingress_shards) + 1  # Puffer hinzufügen

2. Implementieren Sie eine korrekte Fehlerbehandlung

from botocore.exceptions import ClientError
import time

def put_record_with_retry(kinesis_client, stream_name, data, partition_key, 
                          max_retries=3):
    """Datensatz mit exponentieller Backoff-Wiederholung senden"""
    for attempt in range(max_retries):
        try:
            response = kinesis_client.put_record(
                StreamName=stream_name,
                Data=data,
                PartitionKey=partition_key
            )
            return response
        except ClientError as e:
            if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
                if attempt < max_retries - 1:
                    time.sleep(2 ** attempt)  # Exponentieller Backoff
                    continue
            raise

3. Verwenden Sie erweiterten Fan-Out für mehrere Konsumenten

Erweiterter Fan-Out bietet eine dedizierte Durchsatzrate für jeden Konsumenten:

# Konsumenten mit erweitertem Fan-Out registrieren
aws kinesis register-stream-consumer \
    --stream-arn arn:aws:kinesis:us-east-1:123456789:stream/my-stream \
    --consumer-name my-consumer-app

4. Überwachen Sie wichtige Metriken

Wesentliche CloudWatch-Metriken zur Verfolgung:

  • IncomingRecords: Anzahl erfolgreich gesendeter Datensätze
  • IncomingBytes: Datenvolumen der eingehenden Daten in Bytes
  • GetRecords.IteratorAgeMilliseconds: Wie weit Konsumenten im Rückstand sind
  • WriteProvisionedThroughputExceeded: Drosselungsereignisse beim Schreiben
  • ReadProvisionedThroughputExceeded: Drosselung der Konsumenten

5. Implementieren Sie eine korrekte Strategie für Partitionsschlüssel

import hashlib

def get_partition_key(user_id, shard_count=10):
    """Generieren Sie einen Partitionsschlüssel mit gleichmäßiger Verteilung"""
    # Verwenden Sie konsistentes Hashing für eine gleichmäßige Verteilung
    hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
    shard_id = hash_value % shard_count
    return f"{user_id}#{shard_id}"

Beispiel für eine Implementierung in der Praxis

Hier ist ein vollständiges Beispiel für eine Microservices-Architektur zur Bestellabwicklung:

import boto3
import json
from decimal import Decimal
from typing import Dict, List

class OrderProcessingService:
    def __init__(self, stream_name: str):
        self.kinesis = boto3.client('kinesis')
        self.stream_name = stream_name
    
    def create_order(self, user_id: str, items: List[Dict]) -> str:
        """Bestellung erstellen und Ereignisse veröffentlichen"""
        order_id = self.generate_order_id()
        
        # "Bestellung erstellt"-Ereignis veröffentlichen
        self.publish_event('ORDER_CREATED', {
            'orderId': order_id,
            'userId': user_id,
            'items': items,
            'status': 'PENDING',
            'total': self.calculate_total(items)
        }, partition_key=user_id)
        
        return order_id
    
    def publish_event(self, event_type: str, payload: Dict, 
                     partition_key: str):
        """Ereignis an den Kinesis-Stream senden"""
        event = {
            'eventType': event_type,
            'timestamp': datetime.utcnow().isoformat(),
            'payload': payload
        }
        
        self.kinesis.put_record(
            StreamName=self.stream_name,
            Data=json.dumps(event, default=str),
            PartitionKey=partition_key
        )

class InventoryService:
    """Verbraucht Bestellereignisse und aktualisiert den Bestand"""
    
    def process_records(self, records):
        for record in records:
            data = json.loads(record['kinesis']['data'])
            
            if data['eventType'] == 'ORDER_CREATED':
                self.reserve_inventory(data['payload'])
            elif data['eventType'] == 'ORDER_CANCELLED':
                self.release_inventory(data['payload'])
    
    def reserve_inventory(self, order_data):
        # Bestandsdatenbank aktualisieren
        for item in order_data['items']:
            # Implementierung hier
            pass

Migrationsstrategie vom Monolith zu Microservices

Phase 1: Strangler-Fig-Muster

Beginnen Sie, spezifische Ereignisse über Kinesis weiterzuleiten, während der Monolith erhalten bleibt:

  1. Identifizieren Sie begrenzte Kontexte in Ihrem Monolithen
  2. Erstellen Sie Kinesis-Streams für ereignisse, die Kontexte überschreiten
  3. Extrahieren Sie schrittweise Dienste, die aus diesen Streams konsumieren
  4. Wahren Sie die Abwärtskompatibilität mit dem Monolithen

Phase 2: Parallele Verarbeitung

Führen Sie sowohl das alte als auch das neue System parallel:

def dual_write_pattern(legacy_db, kinesis_stream, data):
    """Schreiben Sie sowohl in das Legacy-System als auch in den Ereignisstrom"""
    try:
        # Zuerst in das neue System schreiben
        publish_to_kinesis(kinesis_stream, data)
        
        # Dann das Legacy-System aktualisieren
        legacy_db.update(data)
    except Exception as e:
        # Kompensationslogik implementieren
        rollback_kinesis_event(kinesis_stream, data)
        raise

Phase 3: Vollständige Migration

Sobald das Vertrauen etabliert ist, leiten Sie den gesamten Verkehr über die ereignisgesteuerte Architektur um.

Strategien zur Kostenoptimierung

Für umfassende Leitlinien zu Mustern der Dateninfrastruktur, einschließlich Objektspeicher- und Datenbankarchitekturen, siehe Dateninfrastruktur für KI-Systeme: Objektspeicher, Datenbanken, Suche & KI-Datenarchitektur.

1. Verwenden Sie den On-Demand-Modus für variable Arbeitslasten

Der On-Demand-Modus (eingeführt 2023) skaliert automatisch basierend auf dem Verkehr:

# Stream mit On-Demand-Modus erstellen
aws kinesis create-stream \
    --stream-name my-stream \
    --stream-mode-details StreamMode=ON_DEMAND

2. Implementieren Sie Datenaggregation

Reduzieren Sie PUT-Payload-Einheiten durch das Bündeln von Datensätzen:

from aws_kinesis_agg.aggregator import RecordAggregator

def batch_put_records(kinesis_client, stream_name, records):
    """Aggregieren von Datensätzen zur Kostenreduzierung"""
    aggregator = RecordAggregator()
    
    for record in records:
        aggregator.add_user_record(
            partition_key=record['partition_key'],
            data=record['data']
        )
    
    # Aggregierten Datensatz senden
    kinesis_client.put_record(
        StreamName=stream_name,
        Data=aggregator.serialize(),
        PartitionKey=records[0]['partition_key']
    )

3. Optimieren Sie die Datenhaltung

Die Standard-Haltedauer beträgt 24 Stunden. Verlängern Sie diese nur bei Bedarf:

# Haltedauer auf 7 Tage setzen
aws kinesis increase-stream-retention-period \
    --stream-name my-stream \
    --retention-period-hours 168

Sicherheitsbest Practices

1. Verschlüsselung bei Ruhe und während der Übertragung

# Verschlüsselten Stream erstellen
kinesis.create_stream(
    StreamName='secure-stream',
    ShardCount=1,
    StreamModeDetails={'StreamMode': 'PROVISIONED'}
)

# Verschlüsselung aktivieren
kinesis.start_stream_encryption(
    StreamName='secure-stream',
    EncryptionType='KMS',
    KeyId='alias/aws/kinesis'
)

2. IAM-Richtlinien für Least Privilege

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": "arn:aws:kinesis:us-east-1:123456789:stream/orders"
    }
  ]
}

3. VPC-Endpunkte

Halten Sie den Verkehr innerhalb des AWS-Netzwerks. Für die Verwaltung der AWS-Infrastruktur als Code sollten Sie Terraform in Betracht ziehen – siehe unser Terraform-Cheat-Sheet:

aws ec2 create-vpc-endpoint \
    --vpc-id vpc-12345678 \
    --service-name com.amazonaws.us-east-1.kinesis-streams \
    --route-table-ids rtb-12345678

Observability und Debugging

Verteiltes Tracing mit X-Ray

from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all

patch_all()

@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
    xray_recorder.put_annotation('eventType', event_data['type'])
    xray_recorder.put_metadata('payload', event_data['payload'])
    
    kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event_data),
        PartitionKey=event_data['id']
    )

CloudWatch Logs Insights-Abfragen

-- Langsame Verarbeitungszeiten finden
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20

-- Fehlerraten verfolgen
fields @timestamp, eventType
| filter error = true
| stats count() by eventType

Fortgeschrittene Muster

Saga-Muster für verteilte Transaktionen

Implementieren Sie langlaufende Transaktionen über Microservices hinweg:

class OrderSaga:
    def __init__(self, kinesis_client, stream_name):
        self.kinesis = kinesis_client
        self.stream_name = stream_name
        self.saga_id = str(uuid.uuid4())
    
    def execute(self, order_data):
        """Saga mit Kompensationslogik ausführen"""
        try:
            # Schritt 1: Bestand reservieren
            self.publish_command('RESERVE_INVENTORY', order_data)
            
            # Schritt 2: Zahlung verarbeiten
            self.publish_command('PROCESS_PAYMENT', order_data)
            
            # Schritt 3: Bestellung versenden
            self.publish_command('SHIP_ORDER', order_data)
            
        except SagaException as e:
            # In umgekehrter Reihenfolge kompensieren
            self.compensate(e.failed_step)
    
    def compensate(self, failed_step):
        """Kompensationstransaktionen ausführen"""
        compensation_steps = {
            'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
            'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
        }
        
        for step in compensation_steps.get(failed_step, []):
            self.publish_command(step, {'sagaId': self.saga_id})

Teststrategien

Lokale Entwicklung mit LocalStack

# LocalStack mit Kinesis starten
docker run -d \
  -p 4566:4566 \
  -e SERVICES=kinesis \
  localstack/localstack

# Teststream erstellen
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
    --stream-name test-stream \
    --shard-count 1

Integrationstests

import pytest
from moto import mock_kinesis

@mock_kinesis
def test_event_publishing():
    """Testen der Ereignisveröffentlichung mit gemocktem Kinesis"""
    kinesis = boto3.client('kinesis', region_name='us-east-1')
    kinesis.create_stream(StreamName='test-stream', ShardCount=1)
    
    service = OrderProcessingService('test-stream')
    order_id = service.create_order('user123', [
        {'productId': 'prod1', 'quantity': 2}
    ])
    
    assert order_id is not None

Leistungsoptimierung

Optimierung der Batch-Größe

def optimize_batch_processing(records, batch_size=500):
    """Datensätze in optimierten Batches verarbeiten"""
    for i in range(0, len(records), batch_size):
        batch = records[i:i + batch_size]
        process_batch(batch)

Verwendung von Connection Pooling

from botocore.config import Config

config = Config(
    max_pool_connections=50,
    retries={'max_attempts': 3, 'mode': 'adaptive'}
)

kinesis = boto3.client('kinesis', config=config)

AWS Kinesis-Ressourcen:

Verwandte Artikel:

Fazit

AWS Kinesis bietet eine robuste Grundlage für den Aufbau skalierbarer, ereignisgesteuerter Microservices-Architekturen. Durch die Befolgung dieser Muster und bewährten Methoden können Sie Systeme erstellen, die resilient, skalierbar und wartbar sind. Beginnen Sie klein mit einem einzelnen Ereignisstrom, validieren Sie Ihre Architektur und erweitern Sie schrittweise auf komplexere Muster, während Ihr System wächst.

Der Schlüssel zum Erfolg liegt im Verständnis Ihrer Datenflussanforderungen, der Auswahl des richtigen Kinesis-Dienstes für Ihren Anwendungsfall und der Implementierung einer ordnungsgemäßen Überwachung und Fehlerbehandlung von Anfang an.