Het bouwen van gebeurtenisgedreven microservices met AWS Kinesis
Event-gedreven architectuur met AWS Kinesis voor schaalbaarheid
AWS Kinesis is een hoeksteen geworden voor het bouwen van moderne, gebeurtenisgedreven microservices-architecturen, waarbij het mogelijk maakt om data in realtime op schaal te verwerken met minimale operationele overhead.

Begrip van Gebeurtenisgedreven Microservices-architectuur
Gebeurtenisgedreven architectuur (EDA) is een ontwerppatroon waarbij services communiceren via gebeurtenissen in plaats van directe synchrone oproepen. Deze aanbod biedt verschillende voordelen:
- Losse koppeling: Services hoeven niet bekend te zijn met het bestaan van elkaar
- Schaalbaarheid: Elke service schaalt onafhankelijk op basis van de eigen werklast
- Resilience: Mislukkingen in één service verspreiden zich niet naar andere services
- Flexibiliteit: Nieuwe services kunnen worden toegevoegd zonder bestaande services aan te passen
AWS Kinesis vormt het fundament voor het implementeren van EDA door te fungeren als een gedistribueerde, duurzame gebeurtenisstream die producenten van consumenten decoupleert.
Voor een breder perspectief op streaming-platforms, raadpleeg onze Apache Kafka Quickstart gids voor een vergelijking met zelf-gehoste alternatieven.
Overzicht van AWS Kinesis
AWS biedt verschillende Kinesis-services, elk ontworpen voor specifieke gebruiksscenario’s. Bij het evalueren van streaming-oplossingen wilt u mogelijk ook overwegen om RabbitMQ op EKS te vergelijken met SQS voor verschillende messaging-patronen en kostenimplicaties.
Kinesis Data Streams
De kernstreamingservice die datarecords in realtime vastlegt, opslaat en verwerkt. Data Streams is ideaal voor:
- Aangepaste realtime verwerkingsapplicaties
- Het bouwen van datapijplijnen met een latentie van minder dan een seconde
- Het verwerken van miljoenen gebeurtenissen per seconde
- Het implementeren van event sourcing-patronen
Kinesis Data Firehose
Een volledig beheerde service die streamingdata levert naar bestemmingen zoals S3, Redshift, Elasticsearch of HTTP-eindpunten. Het beste voor:
- Eenvoudige ETL-pijplijnen
- Log-aggregatie en archivering
- Near-realtime analyse (minimale latentie van 60 seconden)
- Scenario’s waarbij geen aangepaste verwerkingslogica nodig is
Kinesis Data Analytics
Verwerkt en analyseert streamingdata met behulp van SQL of Apache Flink. Gebruiksscenario’s omvatten:
- Realtime dashboards
- Streaming ETL
- Realtime anomaliedetectie
- Continue metriekegeneratie
Architectonische Patronen met Kinesis
1. Event Sourcing Patroon
Event sourcing slaat alle wijzigingen in de toestand van een applicatie op als een reeks gebeurtenissen. Kinesis is perfect hiervoor. Als je een opfrisser over Python-fundamenten nodig hebt, bekijk dan onze Python Cheatsheet:
import boto3
import json
from datetime import datetime
kinesis = boto3.client('kinesis', region_name='us-east-1')
def publish_event(stream_name, event_type, payload):
"""Publiceer een gebeurtenis naar de Kinesis-stream"""
event = {
'eventId': generate_unique_id(),
'eventType': event_type,
'timestamp': datetime.utcnow().isoformat(),
'payload': payload
}
response = kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(event),
PartitionKey=payload.get('userId', 'default')
)
return response['SequenceNumber']
# Voorbeeld: Gebeurtenis voor gebruikersregistratie
publish_event(
stream_name='user-events',
event_type='USER_REGISTERED',
payload={
'userId': '12345',
'email': 'user@example.com',
'registrationDate': '2025-10-30'
}
)
2. CQRS (Command Query Responsibility Segregation)
Scheid lees- en schrijfoperaties door Kinesis te gebruiken als de gebeurtenisbus:
package main
import (
"encoding/json"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
)
type OrderCommand struct {
CommandType string `json:"commandType"`
OrderID string `json:"orderId"`
UserID string `json:"userId"`
Items []OrderItem `json:"items"`
}
func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
data, err := json.Marshal(command)
if err != nil {
return err
}
_, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
StreamName: aws.String("order-commands"),
Data: data,
PartitionKey: aws.String(command.OrderID),
})
return err
}
3. Fan-Out Patroon met Lambda
Verwerk gebeurtenissen van een enkele stream met meerdere Lambda-functies. Voor TypeScript-implementaties met sterkere typesafety, verwijzen we naar onze TypeScript Cheatsheet:
// Lambda-consument voor e-mailnotificaties
exports.handler = async (event) => {
for (const record of event.Records) {
const payload = JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
);
if (payload.eventType === 'ORDER_PLACED') {
await sendOrderConfirmationEmail(payload);
}
}
};
// Een andere Lambda voor voorraupdate
exports.inventoryHandler = async (event) => {
for (const record of event.Records) {
const payload = JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
);
if (payload.eventType === 'ORDER_PLACED') {
await updateInventory(payload.items);
}
}
};
Best Practices voor Productie
1. Het Kies van het Juiste Aantal Shards
Bereken uw shard-behoeften op basis van:
- Ingress: 1 MB/sec of 1.000 records/sec per shard
- Egress: 2 MB/sec per shard (standaard consumenten) of 2 MB/sec per consument met enhanced fan-out
def calculate_shards(records_per_second, avg_record_size_kb):
"""Bereken het vereiste aantal shards"""
# Ingress capaciteit
ingress_shards = max(
records_per_second / 1000,
(records_per_second * avg_record_size_kb) / 1024
)
return int(ingress_shards) + 1 # Voeg buffer toe
2. Implementeer Correcte Foutafhandeling
from botocore.exceptions import ClientError
import time
def put_record_with_retry(kinesis_client, stream_name, data, partition_key,
max_retries=3):
"""Plaats record met exponentiële backoff retry"""
for attempt in range(max_retries):
try:
response = kinesis_client.put_record(
StreamName=stream_name,
Data=data,
PartitionKey=partition_key
)
return response
except ClientError as e:
if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # Exponentiële backoff
continue
raise
3. Gebruik Enhanced Fan-Out voor Meerdere Consumenten
Enhanced fan-out biedt toegewezen doorvoer voor elke consument:
# Registreer een consument met enhanced fan-out
aws kinesis register-stream-consumer \
--stream-arn arn:aws:kinesis:us-east-1:123456789:stream/my-stream \
--consumer-name my-consumer-app
4. Monitoren van Belangrijke Metrieken
Essentiële CloudWatch-metrieken om te volgen:
IncomingRecords: Aantal succesvol geplaatste recordsIncomingBytes: Bytesvolume van inkomende dataGetRecords.IteratorAgeMilliseconds: Hoe ver consumenten achterlopenWriteProvisionedThroughputExceeded: Throttling-gebeurtenissenReadProvisionedThroughputExceeded: Consument throttling
5. Implementeer een Correcte Partition Key Strategie
import hashlib
def get_partition_key(user_id, shard_count=10):
"""Genereer partition key met gelijke verdeling"""
# Gebruik consistente hashing voor gelijke verdeling
hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
shard_id = hash_value % shard_count
return f"{user_id}#{shard_id}"
Voorbeeld van Implementatie in de Praktijk
Hier is een compleet voorbeeld van een microservices-architectuur voor orderverwerking:
import boto3
import json
from decimal import Decimal
from typing import Dict, List
class OrderProcessingService:
def __init__(self, stream_name: str):
self.kinesis = boto3.client('kinesis')
self.stream_name = stream_name
def create_order(self, user_id: str, items: List[Dict]) -> str:
"""Maak een order aan en publiceer gebeurtenissen"""
order_id = self.generate_order_id()
# Publiceer order-creatie gebeurtenis
self.publish_event('ORDER_CREATED', {
'orderId': order_id,
'userId': user_id,
'items': items,
'status': 'PENDING',
'total': self.calculate_total(items)
}, partition_key=user_id)
return order_id
def publish_event(self, event_type: str, payload: Dict,
partition_key: str):
"""Publiceer gebeurtenis naar Kinesis-stream"""
event = {
'eventType': event_type,
'timestamp': datetime.utcnow().isoformat(),
'payload': payload
}
self.kinesis.put_record(
StreamName=self.stream_name,
Data=json.dumps(event, default=str),
PartitionKey=partition_key
)
class InventoryService:
"""Consumente order-gebeurtenissen en update voorraad"""
def process_records(self, records):
for record in records:
data = json.loads(record['kinesis']['data'])
if data['eventType'] == 'ORDER_CREATED':
self.reserve_inventory(data['payload'])
elif data['eventType'] == 'ORDER_CANCELLED':
self.release_inventory(data['payload'])
def reserve_inventory(self, order_data):
# Update voorraaddatabase
for item in order_data['items']:
# Implementatie hier
pass
Migratiestrategie van Monolith naar Microservices
Fase 1: Strangler Fig Patroon
Begin met het routeren van specifieke gebeurtenissen via Kinesis terwijl de monolith behouden blijft:
- Identificeer bounded contexts in uw monolith
- Maak Kinesis-streams aan voor cross-context gebeurtenissen
- Haal services geleidelijk uit die consumeren van deze streams
- Behoud backward compatibiliteit met de monolith
Fase 2: Parallelle Verwerking
Voer zowel het oude als het nieuwe systeem parallel uit:
def dual_write_pattern(legacy_db, kinesis_stream, data):
"""Schrijf naar zowel legacy systeem als gebeurtenisstream"""
try:
# Eerst schrijven naar nieuwe systeem
publish_to_kinesis(kinesis_stream, data)
# Vervolgens legacy systeem updaten
legacy_db.update(data)
except Exception as e:
# Implementeer compensatielogica
rollback_kinesis_event(kinesis_stream, data)
raise
Fase 3: Volledige Migratie
Zodra vertrouwen is opgebouwd, routeer alle verkeer via de gebeurtenisgedreven architectuur.
Strategieën voor Kostenoptimalisatie
Voor uitgebreide richtlijnen over data-infrastructuurpatronen, inclusief objectopslag en database-architecturen, verwijzen we naar Data Infrastructuur voor AI-systemen: Objectopslag, Databases, Zoeken & AI Data Architectuur.
1. Gebruik On-Demand Mode voor Variabele Werklasten
On-demand mode (ingevoerd in 2023) schaalt automatisch op basis van verkeer:
# Maak stream aan met on-demand mode
aws kinesis create-stream \
--stream-name my-stream \
--stream-mode-details StreamMode=ON_DEMAND
2. Implementeer Data-aggregatie
Verhoog PUT-payload eenheden door records te batchen:
from aws_kinesis_agg.aggregator import RecordAggregator
def batch_put_records(kinesis_client, stream_name, records):
"""Aggregateer records om kosten te verminderen"""
aggregator = RecordAggregator()
for record in records:
aggregator.add_user_record(
partition_key=record['partition_key'],
data=record['data']
)
# Stuur geaggregeerd record
kinesis_client.put_record(
StreamName=stream_name,
Data=aggregator.serialize(),
PartitionKey=records[0]['partition_key']
)
3. Optimaliseer Data-retentie
Standaard retentie is 24 uur. Verleng alleen indien nodig:
# Stel retentie in op 7 dagen
aws kinesis increase-stream-retention-period \
--stream-name my-stream \
--retention-period-hours 168
Veiligheidsbest Practices
1. Versleuteling Rust en in Transit
# Maak versleutelde stream aan
kinesis.create_stream(
StreamName='secure-stream',
ShardCount=1,
StreamModeDetails={'StreamMode': 'PROVISIONED'}
)
# Schakel versleuteling in
kinesis.start_stream_encryption(
StreamName='secure-stream',
EncryptionType='KMS',
KeyId='alias/aws/kinesis'
)
2. IAM-beleidsregels voor Minimale Rechten
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kinesis:PutRecord",
"kinesis:PutRecords"
],
"Resource": "arn:aws:kinesis:us-east-1:123456789:stream/orders"
}
]
}
3. VPC Endpoints
Houd verkeer binnen het AWS-netwerk. Voor het beheren van AWS-infrastructuur als code, overweeg het gebruik van Terraform - zie onze Terraform cheatsheet:
aws ec2 create-vpc-endpoint \
--vpc-id vpc-12345678 \
--service-name com.amazonaws.us-east-1.kinesis-streams \
--route-table-ids rtb-12345678
Observabiliteit en Debugging
Gedistribueerde Tracing met X-Ray
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all
patch_all()
@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
xray_recorder.put_annotation('eventType', event_data['type'])
xray_recorder.put_metadata('payload', event_data['payload'])
kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(event_data),
PartitionKey=event_data['id']
)
CloudWatch Logs Insights Queries
-- Zoek langzame verwerkingstijden
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20
-- Track foutniveaus
fields @timestamp, eventType
| filter error = true
| stats count() by eventType
Geavanceerde Patronen
Saga Patroon voor Gedistribueerde Transacties
Implementeer langlopende transacties over microservices heen:
class OrderSaga:
def __init__(self, kinesis_client, stream_name):
self.kinesis = kinesis_client
self.stream_name = stream_name
self.saga_id = str(uuid.uuid4())
def execute(self, order_data):
"""Voer saga uit met compensatielogica"""
try:
# Stap 1: Reserveer voorraad
self.publish_command('RESERVE_INVENTORY', order_data)
# Stap 2: Verwerk betaling
self.publish_command('PROCESS_PAYMENT', order_data)
# Stap 3: Verstuur order
self.publish_command('SHIP_ORDER', order_data)
except SagaException as e:
# Compenseer in omgekeerde volgorde
self.compensate(e.failed_step)
def compensate(self, failed_step):
"""Voer compensatietransacties uit"""
compensation_steps = {
'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
}
for step in compensation_steps.get(failed_step, []):
self.publish_command(step, {'sagaId': self.saga_id})
Teststrategieën
Lokale Ontwikkeling met LocalStack
# Start LocalStack met Kinesis
docker run -d \
-p 4566:4566 \
-e SERVICES=kinesis \
localstack/localstack
# Maak teststream aan
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
--stream-name test-stream \
--shard-count 1
Integratietesten
import pytest
from moto import mock_kinesis
@mock_kinesis
def test_event_publishing():
"""Test gebeurtenispublicatie met gemockte Kinesis"""
kinesis = boto3.client('kinesis', region_name='us-east-1')
kinesis.create_stream(StreamName='test-stream', ShardCount=1)
service = OrderProcessingService('test-stream')
order_id = service.create_order('user123', [
{'productId': 'prod1', 'quantity': 2}
])
assert order_id is not None
Prestatie-tuning
Optimaliseer Batchgrootte
def optimize_batch_processing(records, batch_size=500):
"""Verwerk records in geoptimaliseerde batches"""
for i in range(0, len(records), batch_size):
batch = records[i:i + batch_size]
process_batch(batch)
Gebruik Connection Pooling
from botocore.config import Config
config = Config(
max_pool_connections=50,
retries={'max_attempts': 3, 'mode': 'adaptive'}
)
kinesis = boto3.client('kinesis', config=config)
Nuttige Links
AWS Kinesis Bronnen:
- AWS Kinesis Documentatie
- AWS Kinesis Data Streams Developer Gids
- Kinesis Client Library (KCL)
- AWS Kinesis Prijsrekenmachine
- Kinesis Data Streams Quota’s en Limieten
- AWS Architectuur Blog - Gebeurtenisgedreven Architecturen
- AWS Samples - Kinesis Voorbeelden
Gerelateerde Artikelen:
- Rabbitmq op Eks vs Sqs hosting kosten vergelijking
- TypeScript Cheatsheet: Kernconcepten & Best Practices
- Python Cheatsheet
Conclusie
AWS Kinesis biedt een robuuste basis voor het bouwen van schaalbare, gebeurtenisgedreven microservices-architecturen. Door deze patronen en best practices te volgen, kunt u systemen creëren die resilient, schaalbaar en onderhoudbaar zijn. Begin klein met een enkele gebeurtenisstream, valideer uw architectuur en breid geleidelijk uit naar complexere patronen naarmate uw systeem groeit.
De sleutel tot succes is het begrijpen van uw dataflow-eisen, het kiezen van de juiste Kinesis-service voor uw gebruiksscenario en het implementeren van correct monitoring en foutafhandeling vanaf het begin.