Aufbau ereignisgesteuerter Microservices mit AWS Kinesis
Ereignisgesteuerte Architektur mit AWS Kinesis für Skalierbarkeit
AWS Kinesis hat sich zu einem Eckpfeiler für die Entwicklung moderner ereignisgestufter Microservices-Architekturen entwickelt und ermöglicht die Echtzeit-Datenverarbeitung in großem Umfang mit minimalem Betriebsaufwand.

Verständnis ereignisgestufter Microservices-Architektur
Die ereignisgesteuerte Architektur (EDA) ist ein Entwurfsmuster, bei dem Dienste durch Ereignisse statt durch direkte synchrone Aufrufe kommunizieren. Dieser Ansatz bietet mehrere Vorteile:
- Lockere Kopplung: Dienste müssen nicht voneinander wissen
- Skalierbarkeit: Jeder Dienst kann unabhängig basierend auf seiner Arbeitslast skaliert werden
- Resilienz: Ausfälle eines Dienstes breiten sich nicht auf andere aus
- Flexibilität: Neue Dienste können hinzugefügt werden, ohne bestehende zu ändern
AWS Kinesis bietet das Rückgrat für die Implementierung von EDA, indem es als verteilter, dauerhafter Ereignisstrom fungiert, der Erzeuger von Konsumenten entkoppelt.
Für eine breitere Perspektive auf Streaming-Plattformen, sehen Sie unseren Apache Kafka Quickstart-Leitfaden zum Vergleich mit selbst gehosteten Alternativen.
Übersicht über AWS Kinesis
AWS bietet mehrere Kinesis-Dienste, die jeweils für spezifische Anwendungsfälle konzipiert sind. Bei der Bewertung von Streaming-Lösungen sollten Sie auch RabbitMQ auf EKS im Vergleich zu SQS für verschiedene Messaging-Muster und Kostenausswirkungen in Betracht ziehen.
Kinesis Data Streams
Der zentrale Streaming-Dienst, der Datensätze in Echtzeit erfasst, speichert und verarbeitet. Data Streams ist ideal für:
- Benutzerdefinierte Echtzeit-Verarbeitungsanwendungen
- Aufbau von Datenpipelines mit Latenzzeiten unter einer Sekunde
- Verarbeitung von Millionen von Ereignissen pro Sekunde
- Implementierung von Event-Sourcing-Mustern
Kinesis Data Firehose
Ein vollständig verwalteter Dienst, der Streaming-Daten an Ziele wie S3, Redshift, Elasticsearch oder HTTP-Endpunkte liefert. Am besten geeignet für:
- Einfache ETL-Pipelines
- Log-Aggregation und Archivierung
- Nahezu Echtzeit-Analysen (mindestens 60 Sekunden Latenz)
- Szenarien, in denen keine benutzerdefinierte Verarbeitungslogik benötigt wird
Kinesis Data Analytics
Verarbeitet und analysiert Streaming-Daten mit SQL oder Apache Flink. Anwendungsfälle umfassen:
- Echtzeit-Dashboards
- Streaming-ETL
- Echtzeit-Anomalieerkennung
- Kontinuierliche Metrikerstellung
Architekturmuster mit Kinesis
1. Event-Sourcing-Muster
Event Sourcing speichert alle Änderungen des Anwendungszustands als eine Folge von Ereignissen. Kinesis ist dafür perfekt. Falls Sie einen Auffrischungskurs zu Python-Grundlagen benötigen, schauen Sie sich unser Python-Cheat-Sheet an:
import boto3
import json
from datetime import datetime
kinesis = boto3.client('kinesis', region_name='us-east-1')
def publish_event(stream_name, event_type, payload):
"""Ein Ereignis an den Kinesis-Stream senden"""
event = {
'eventId': generate_unique_id(),
'eventType': event_type,
'timestamp': datetime.utcnow().isoformat(),
'payload': payload
}
response = kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(event),
PartitionKey=payload.get('userId', 'default')
)
return response['SequenceNumber']
# Beispiel: Benutzerregistrierungsereignis
publish_event(
stream_name='user-events',
event_type='USER_REGISTERED',
payload={
'userId': '12345',
'email': 'user@example.com',
'registrationDate': '2025-10-30'
}
)
2. CQRS (Command Query Responsibility Segregation)
Trennen Sie Lese- und Schreiboperationen unter Verwendung von Kinesis als Ereignisbus:
package main
import (
"encoding/json"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
)
type OrderCommand struct {
CommandType string `json:"commandType"`
OrderID string `json:"orderId"`
UserID string `json:"userId"`
Items []OrderItem `json:"items"`
}
func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
data, err := json.Marshal(command)
if err != nil {
return err
}
_, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
StreamName: aws.String("order-commands"),
Data: data,
PartitionKey: aws.String(command.OrderID),
})
return err
}
3. Fan-Out-Muster mit Lambda
Verarbeiten Sie Ereignisse aus einem einzelnen Stream mit mehreren Lambda-Funktionen. Für TypeScript-Implementierungen mit stärkerer Typsicherheit, beziehen Sie sich auf unser TypeScript-Cheat-Sheet:
// Lambda-Konsument für E-Mail-Benachrichtigungen
exports.handler = async (event) => {
for (const record of event.Records) {
const payload = JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
);
if (payload.eventType === 'ORDER_PLACED') {
await sendOrderConfirmationEmail(payload);
}
}
};
// Eine weitere Lambda für Bestandsaktualisierungen
exports.inventoryHandler = async (event) => {
for (const record of event.Records) {
const payload = JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
);
if (payload.eventType === 'ORDER_PLACED') {
await updateInventory(payload.items);
}
}
};
Best Practices für die Produktion
1. Die richtige Shard-Anzahl wählen
Berechnen Sie Ihre Shard-Anforderungen basierend auf:
- Ingress: 1 MB/Sekunde oder 1.000 Datensätze/Sekunde pro Shard
- Egress: 2 MB/Sekunde pro Shard (Standardkonsumenten) oder 2 MB/Sekunde pro Konsument mit erweitertem Fan-Out
def calculate_shards(records_per_second, avg_record_size_kb):
"""Erforderliche Anzahl an Shards berechnen"""
# Ingress-Kapazität
ingress_shards = max(
records_per_second / 1000,
(records_per_second * avg_record_size_kb) / 1024
)
return int(ingress_shards) + 1 # Puffer hinzufügen
2. Implementieren Sie eine korrekte Fehlerbehandlung
from botocore.exceptions import ClientError
import time
def put_record_with_retry(kinesis_client, stream_name, data, partition_key,
max_retries=3):
"""Datensatz mit exponentieller Backoff-Wiederholung senden"""
for attempt in range(max_retries):
try:
response = kinesis_client.put_record(
StreamName=stream_name,
Data=data,
PartitionKey=partition_key
)
return response
except ClientError as e:
if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # Exponentieller Backoff
continue
raise
3. Verwenden Sie erweiterten Fan-Out für mehrere Konsumenten
Erweiterter Fan-Out bietet eine dedizierte Durchsatzrate für jeden Konsumenten:
# Konsumenten mit erweitertem Fan-Out registrieren
aws kinesis register-stream-consumer \
--stream-arn arn:aws:kinesis:us-east-1:123456789:stream/my-stream \
--consumer-name my-consumer-app
4. Überwachen Sie wichtige Metriken
Wesentliche CloudWatch-Metriken zur Verfolgung:
IncomingRecords: Anzahl erfolgreich gesendeter DatensätzeIncomingBytes: Datenvolumen der eingehenden Daten in BytesGetRecords.IteratorAgeMilliseconds: Wie weit Konsumenten im Rückstand sindWriteProvisionedThroughputExceeded: Drosselungsereignisse beim SchreibenReadProvisionedThroughputExceeded: Drosselung der Konsumenten
5. Implementieren Sie eine korrekte Strategie für Partitionsschlüssel
import hashlib
def get_partition_key(user_id, shard_count=10):
"""Generieren Sie einen Partitionsschlüssel mit gleichmäßiger Verteilung"""
# Verwenden Sie konsistentes Hashing für eine gleichmäßige Verteilung
hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
shard_id = hash_value % shard_count
return f"{user_id}#{shard_id}"
Beispiel für eine Implementierung in der Praxis
Hier ist ein vollständiges Beispiel für eine Microservices-Architektur zur Bestellabwicklung:
import boto3
import json
from decimal import Decimal
from typing import Dict, List
class OrderProcessingService:
def __init__(self, stream_name: str):
self.kinesis = boto3.client('kinesis')
self.stream_name = stream_name
def create_order(self, user_id: str, items: List[Dict]) -> str:
"""Bestellung erstellen und Ereignisse veröffentlichen"""
order_id = self.generate_order_id()
# "Bestellung erstellt"-Ereignis veröffentlichen
self.publish_event('ORDER_CREATED', {
'orderId': order_id,
'userId': user_id,
'items': items,
'status': 'PENDING',
'total': self.calculate_total(items)
}, partition_key=user_id)
return order_id
def publish_event(self, event_type: str, payload: Dict,
partition_key: str):
"""Ereignis an den Kinesis-Stream senden"""
event = {
'eventType': event_type,
'timestamp': datetime.utcnow().isoformat(),
'payload': payload
}
self.kinesis.put_record(
StreamName=self.stream_name,
Data=json.dumps(event, default=str),
PartitionKey=partition_key
)
class InventoryService:
"""Verbraucht Bestellereignisse und aktualisiert den Bestand"""
def process_records(self, records):
for record in records:
data = json.loads(record['kinesis']['data'])
if data['eventType'] == 'ORDER_CREATED':
self.reserve_inventory(data['payload'])
elif data['eventType'] == 'ORDER_CANCELLED':
self.release_inventory(data['payload'])
def reserve_inventory(self, order_data):
# Bestandsdatenbank aktualisieren
for item in order_data['items']:
# Implementierung hier
pass
Migrationsstrategie vom Monolith zu Microservices
Phase 1: Strangler-Fig-Muster
Beginnen Sie, spezifische Ereignisse über Kinesis weiterzuleiten, während der Monolith erhalten bleibt:
- Identifizieren Sie begrenzte Kontexte in Ihrem Monolithen
- Erstellen Sie Kinesis-Streams für ereignisse, die Kontexte überschreiten
- Extrahieren Sie schrittweise Dienste, die aus diesen Streams konsumieren
- Wahren Sie die Abwärtskompatibilität mit dem Monolithen
Phase 2: Parallele Verarbeitung
Führen Sie sowohl das alte als auch das neue System parallel:
def dual_write_pattern(legacy_db, kinesis_stream, data):
"""Schreiben Sie sowohl in das Legacy-System als auch in den Ereignisstrom"""
try:
# Zuerst in das neue System schreiben
publish_to_kinesis(kinesis_stream, data)
# Dann das Legacy-System aktualisieren
legacy_db.update(data)
except Exception as e:
# Kompensationslogik implementieren
rollback_kinesis_event(kinesis_stream, data)
raise
Phase 3: Vollständige Migration
Sobald das Vertrauen etabliert ist, leiten Sie den gesamten Verkehr über die ereignisgesteuerte Architektur um.
Strategien zur Kostenoptimierung
Für umfassende Leitlinien zu Mustern der Dateninfrastruktur, einschließlich Objektspeicher- und Datenbankarchitekturen, siehe Dateninfrastruktur für KI-Systeme: Objektspeicher, Datenbanken, Suche & KI-Datenarchitektur.
1. Verwenden Sie den On-Demand-Modus für variable Arbeitslasten
Der On-Demand-Modus (eingeführt 2023) skaliert automatisch basierend auf dem Verkehr:
# Stream mit On-Demand-Modus erstellen
aws kinesis create-stream \
--stream-name my-stream \
--stream-mode-details StreamMode=ON_DEMAND
2. Implementieren Sie Datenaggregation
Reduzieren Sie PUT-Payload-Einheiten durch das Bündeln von Datensätzen:
from aws_kinesis_agg.aggregator import RecordAggregator
def batch_put_records(kinesis_client, stream_name, records):
"""Aggregieren von Datensätzen zur Kostenreduzierung"""
aggregator = RecordAggregator()
for record in records:
aggregator.add_user_record(
partition_key=record['partition_key'],
data=record['data']
)
# Aggregierten Datensatz senden
kinesis_client.put_record(
StreamName=stream_name,
Data=aggregator.serialize(),
PartitionKey=records[0]['partition_key']
)
3. Optimieren Sie die Datenhaltung
Die Standard-Haltedauer beträgt 24 Stunden. Verlängern Sie diese nur bei Bedarf:
# Haltedauer auf 7 Tage setzen
aws kinesis increase-stream-retention-period \
--stream-name my-stream \
--retention-period-hours 168
Sicherheitsbest Practices
1. Verschlüsselung bei Ruhe und während der Übertragung
# Verschlüsselten Stream erstellen
kinesis.create_stream(
StreamName='secure-stream',
ShardCount=1,
StreamModeDetails={'StreamMode': 'PROVISIONED'}
)
# Verschlüsselung aktivieren
kinesis.start_stream_encryption(
StreamName='secure-stream',
EncryptionType='KMS',
KeyId='alias/aws/kinesis'
)
2. IAM-Richtlinien für Least Privilege
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kinesis:PutRecord",
"kinesis:PutRecords"
],
"Resource": "arn:aws:kinesis:us-east-1:123456789:stream/orders"
}
]
}
3. VPC-Endpunkte
Halten Sie den Verkehr innerhalb des AWS-Netzwerks. Für die Verwaltung der AWS-Infrastruktur als Code sollten Sie Terraform in Betracht ziehen – siehe unser Terraform-Cheat-Sheet:
aws ec2 create-vpc-endpoint \
--vpc-id vpc-12345678 \
--service-name com.amazonaws.us-east-1.kinesis-streams \
--route-table-ids rtb-12345678
Observability und Debugging
Verteiltes Tracing mit X-Ray
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all
patch_all()
@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
xray_recorder.put_annotation('eventType', event_data['type'])
xray_recorder.put_metadata('payload', event_data['payload'])
kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(event_data),
PartitionKey=event_data['id']
)
CloudWatch Logs Insights-Abfragen
-- Langsame Verarbeitungszeiten finden
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20
-- Fehlerraten verfolgen
fields @timestamp, eventType
| filter error = true
| stats count() by eventType
Fortgeschrittene Muster
Saga-Muster für verteilte Transaktionen
Implementieren Sie langlaufende Transaktionen über Microservices hinweg:
class OrderSaga:
def __init__(self, kinesis_client, stream_name):
self.kinesis = kinesis_client
self.stream_name = stream_name
self.saga_id = str(uuid.uuid4())
def execute(self, order_data):
"""Saga mit Kompensationslogik ausführen"""
try:
# Schritt 1: Bestand reservieren
self.publish_command('RESERVE_INVENTORY', order_data)
# Schritt 2: Zahlung verarbeiten
self.publish_command('PROCESS_PAYMENT', order_data)
# Schritt 3: Bestellung versenden
self.publish_command('SHIP_ORDER', order_data)
except SagaException as e:
# In umgekehrter Reihenfolge kompensieren
self.compensate(e.failed_step)
def compensate(self, failed_step):
"""Kompensationstransaktionen ausführen"""
compensation_steps = {
'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
}
for step in compensation_steps.get(failed_step, []):
self.publish_command(step, {'sagaId': self.saga_id})
Teststrategien
Lokale Entwicklung mit LocalStack
# LocalStack mit Kinesis starten
docker run -d \
-p 4566:4566 \
-e SERVICES=kinesis \
localstack/localstack
# Teststream erstellen
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
--stream-name test-stream \
--shard-count 1
Integrationstests
import pytest
from moto import mock_kinesis
@mock_kinesis
def test_event_publishing():
"""Testen der Ereignisveröffentlichung mit gemocktem Kinesis"""
kinesis = boto3.client('kinesis', region_name='us-east-1')
kinesis.create_stream(StreamName='test-stream', ShardCount=1)
service = OrderProcessingService('test-stream')
order_id = service.create_order('user123', [
{'productId': 'prod1', 'quantity': 2}
])
assert order_id is not None
Leistungsoptimierung
Optimierung der Batch-Größe
def optimize_batch_processing(records, batch_size=500):
"""Datensätze in optimierten Batches verarbeiten"""
for i in range(0, len(records), batch_size):
batch = records[i:i + batch_size]
process_batch(batch)
Verwendung von Connection Pooling
from botocore.config import Config
config = Config(
max_pool_connections=50,
retries={'max_attempts': 3, 'mode': 'adaptive'}
)
kinesis = boto3.client('kinesis', config=config)
Nützliche Links
AWS Kinesis-Ressourcen:
- AWS Kinesis-Dokumentation
- AWS Kinesis Data Streams-Entwicklerleitfaden
- Kinesis Client Library (KCL)
- AWS Kinesis Preisrechner
- Kinesis Data Streams-Quoten und Limits
- AWS Architekturblog – ereignisgesteuerte Architekturen
- AWS Samples – Kinesis-Beispiele
Verwandte Artikel:
- Vergleich der Hosting-Kosten von RabbitMQ auf EKS und SQS
- TypeScript-Cheat-Sheet: Kernkonzepte und bewährte Methoden
- Python-Cheat-Sheet
Fazit
AWS Kinesis bietet eine robuste Grundlage für den Aufbau skalierbarer, ereignisgesteuerter Microservices-Architekturen. Durch die Befolgung dieser Muster und bewährten Methoden können Sie Systeme erstellen, die resilient, skalierbar und wartbar sind. Beginnen Sie klein mit einem einzelnen Ereignisstrom, validieren Sie Ihre Architektur und erweitern Sie schrittweise auf komplexere Muster, während Ihr System wächst.
Der Schlüssel zum Erfolg liegt im Verständnis Ihrer Datenflussanforderungen, der Auswahl des richtigen Kinesis-Dienstes für Ihren Anwendungsfall und der Implementierung einer ordnungsgemäßen Überwachung und Fehlerbehandlung von Anfang an.