Att bygga händelsestyra mikrotjänster med AWS Kinesis
Händelsestyrd arkitektur med AWS Kinesis för skalbarhet
AWS Kinesis har blivit en hörnsten för att bygga moderna händelsestyrda arkitekturer för mikrotjänster, vilket möjliggör realtidsbearbetning av data i stor skala med minimal driftsöverhåll.

Förståelse för händelsestyrda arkitekturer för mikrotjänster
Händelsestyrd arkitektur (EDA) är ett designmönster där tjänster kommunicerar genom händelser snarare än direkta synkrona anrop. Detta tillvägagångssätt erbjuder flera fördelar:
- Löst koppling: Tjänster behöver inte veta om varandras existens
- Skalbarhet: Varje tjänst kan skalas oberoende baserat på sin arbetsbelastning
- Resiliens: Fel i en tjänst sprider sig inte till andra
- Flexibilitet: Nya tjänster kan läggas till utan att modifiera befintliga
AWS Kinesis fungerar som ryggraden för att implementera EDA genom att agera som en distribuerad, hållbar händelseström som kopplar loss producenter från konsumenter.
För en bredare perspektiv på strömmande plattformar, se vår Apache Kafka Quickstart-guide för en jämförelse med självhysta alternativ.
Översikt över AWS Kinesis
AWS erbjuder flera Kinesis-tjänster, var och en designad för specifika användningsområden. När du utvärderar lösningar för strömmande data kan du också vilja överväga att jämföra RabbitMQ på EKS med SQS för olika meddelandemönster och kostnadsimplikationer.
Kinesis Data Streams
Huvudtjänsten för strömmande data som fångar, lagrar och bearbetar datarekor i realtid. Data Streams är idealisk för:
- Anpassade applikationer för realtidsbearbetning
- Att bygga datapipelines med latens under en sekund
- Att bearbeta miljontals händelser per sekund
- Att implementera händelseliknande mönster (event sourcing)
Kinesis Data Firehose
En helt hanterad tjänst som levererar strömmande data till destinationer som S3, Redshift, Elasticsearch eller HTTP-endpunkter. Bäst för:
- Enkla ETL-pipelines
- Samling och arkivering av loggar
- Nästan realtidsanalys (minsta latens på 60 sekunder)
- Scenarier där du inte behöver anpassad bearbetningslogik
Kinesis Data Analytics
Bearbetar och analyserar strömmande data med SQL eller Apache Flink. Användningsområden inkluderar:
- Realtidsdashboards
- Strömmande ETL
- Realtidsdetektering av avvikelser
- Kontinuerlig generering av metrik
Arkitektoniska mönster med Kinesis
1. Mönstret för händelseliknande (Event Sourcing)
Händelseliknande lagrar alla förändringar i applikationstillståndet som en sekvens av händelser. Kinesis är perfekt för detta. Om du behöver en repetition av Python-grunder, titta på vår Python-minneslista:
import boto3
import json
from datetime import datetime
kinesis = boto3.client('kinesis', region_name='us-east-1')
def publish_event(stream_name, event_type, payload):
"""Publicera en händelse till Kinesis-ström"""
event = {
'eventId': generate_unique_id(),
'eventType': event_type,
'timestamp': datetime.utcnow().isoformat(),
'payload': payload
}
response = kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(event),
PartitionKey=payload.get('userId', 'default')
)
return response['SequenceNumber']
# Exempel: Händelse för användarregistrering
publish_event(
stream_name='user-events',
event_type='USER_REGISTERED',
payload={
'userId': '12345',
'email': 'user@example.com',
'registrationDate': '2025-10-30'
}
)
2. CQRS (Command Query Responsibility Segregation)
Separera läs- och skrivoperationer genom att använda Kinesis som händelsebuss:
package main
import (
"encoding/json"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
)
type OrderCommand struct {
CommandType string `json:"commandType"`
OrderID string `json:"orderId"`
UserID string `json:"userId"`
Items []OrderItem `json:"items"`
}
func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
data, err := json.Marshal(command)
if err != nil {
return err
}
_, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
StreamName: aws.String("order-commands"),
Data: data,
PartitionKey: aws.String(command.OrderID),
})
return err
}
3. Fan-Out-mönster med Lambda
Bearbeta händelser från en enda ström med flera Lambda-funktioner. För TypeScript-implementeringar med starkare typsäkerhet, se vår TypeScript-minneslista:
// Lambda-konsument för e-postaviseringar
exports.handler = async (event) => {
for (const record of event.Records) {
const payload = JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
);
if (payload.eventType === 'ORDER_PLACED') {
await sendOrderConfirmationEmail(payload);
}
}
};
// En annan Lambda för lageruppdateringar
exports.inventoryHandler = async (event) => {
for (const record of event.Records) {
const payload = JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
);
if (payload.eventType === 'ORDER_PLACED') {
await updateInventory(payload.items);
}
}
};
Bästa praxis för produktion
1. Välja rätt antal shardar
Beräkna dina shardar-krav baserat på:
- Ingress: 1 MB/s eller 1 000 rekord/s per shard
- Egress: 2 MB/s per shard (standardkonsumenter) eller 2 MB/s per konsument med förbättrad fan-out
def calculate_shards(records_per_second, avg_record_size_kb):
"""Beräkna erforderligt antal shardar"""
# Ingress-kapacitet
ingress_shards = max(
records_per_second / 1000,
(records_per_second * avg_record_size_kb) / 1024
)
return int(ingress_shards) + 1 # Lägg till buffert
2. Implementera korrekt felhantering
from botocore.exceptions import ClientError
import time
def put_record_with_retry(kinesis_client, stream_name, data, partition_key,
max_retries=3):
"""Sätt rekord med exponentiell backoff-väntetid vid försök"""
for attempt in range(max_retries):
try:
response = kinesis_client.put_record(
StreamName=stream_name,
Data=data,
PartitionKey=partition_key
)
return response
except ClientError as e:
if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # Exponentiell backoff
continue
raise
3. Använd förbättrad fan-out för flera konsumenter
Förbättrad fan-out ger dedikerad genomströmning för varje konsument:
# Registrera en konsument med förbättrad fan-out
aws kinesis register-stream-consumer \
--stream-arn arn:aws:kinesis:us-east-1:123456789:stream/my-stream \
--consumer-name my-consumer-app
4. Övervaka viktiga metrik
Väsentliga CloudWatch-metrik att spåra:
IncomingRecords: Antal rekord som har lagts till korrektIncomingBytes: Byte-volym av inkommande dataGetRecords.IteratorAgeMilliseconds: Hur långt efter konsumenterna ärWriteProvisionedThroughputExceeded: Throttling-händelserReadProvisionedThroughputExceeded: Konsument-throttling
5. Implementera rätt strategi för partitionsnyckel
import hashlib
def get_partition_key(user_id, shard_count=10):
"""Generera partitionsnyckel med jämn fördelning"""
# Använd konsistent hashning för jämn fördelning
hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
shard_id = hash_value % shard_count
return f"{user_id}#{shard_id}"
Exempel på implementering i verkliga scenarier
Här är ett komplett exempel på en mikrotjänstarkitektur för orderhantering:
import boto3
import json
from decimal import Decimal
from typing import Dict, List
class OrderProcessingService:
def __init__(self, stream_name: str):
self.kinesis = boto3.client('kinesis')
self.stream_name = stream_name
def create_order(self, user_id: str, items: List[Dict]) -> str:
"""Skapa order och publicera händelser"""
order_id = self.generate_order_id()
# Publicera händelse för skapad order
self.publish_event('ORDER_CREATED', {
'orderId': order_id,
'userId': user_id,
'items': items,
'status': 'PENDING',
'total': self.calculate_total(items)
}, partition_key=user_id)
return order_id
def publish_event(self, event_type: str, payload: Dict,
partition_key: str):
"""Publicera händelse till Kinesis-ström"""
event = {
'eventType': event_type,
'timestamp': datetime.utcnow().isoformat(),
'payload': payload
}
self.kinesis.put_record(
StreamName=self.stream_name,
Data=json.dumps(event, default=str),
PartitionKey=partition_key
)
class InventoryService:
"""Konsumerar orderhändelser och uppdaterar lager"""
def process_records(self, records):
for record in records:
data = json.loads(record['kinesis']['data'])
if data['eventType'] == 'ORDER_CREATED':
self.reserve_inventory(data['payload'])
elif data['eventType'] == 'ORDER_CANCELLED':
self.release_inventory(data['payload'])
def reserve_inventory(self, order_data):
# Uppdatera lagerdatabas
for item in order_data['items']:
# Implementering här
pass
Migrationsstrategi från monolit till mikrotjänster
Fas 1: Strangler Fig-mönster
Börja med att dirigera specifika händelser genom Kinesis medan du behåller monoliten:
- Identifiera begränsade kontexter i din monolit
- Skapa Kinesis-strömmar för händelser som korsar kontexter
- Gradvis extrahera tjänster som konsumerar från dessa strömmar
- Upprätthåll bakåtkompatibilitet med monoliten
Fas 2: Parallell bearbetning
Kör både gamla och nya system parallellt:
def dual_write_pattern(legacy_db, kinesis_stream, data):
"""Skriv till både legacy-system och händelseström"""
try:
# Skriv till nya systemet först
publish_to_kinesis(kinesis_stream, data)
# Uppdatera sedan legacy-systemet
legacy_db.update(data)
except Exception as e:
# Implementera kompenserande logik
rollback_kinesis_event(kinesis_stream, data)
raise
Fas 3: Fullständig migration
När tillit har etablerats, dirigera all trafik genom den händelsestyrda arkitekturen.
Strategier för kostnoptimering
För omfattande vägledning om datapatrön för infrastruktur, inklusive objektlagring och databasarkitekturer, se Datainfrastruktur för AI-system: Objektlagring, databaser, sökning och AI-dataarkitektur.
1. Använd On-Demand-läge för varierande arbetsbelastningar
On-Demand-läget (infört 2023) skalar automatiskt baserat på trafik:
# Skapa ström med On-Demand-läge
aws kinesis create-stream \
--stream-name my-stream \
--stream-mode-details StreamMode=ON_DEMAND
2. Implementera dataaggregation
Minska PUT-betalningsenheter genom att samla rekord:
from aws_kinesis_agg.aggregator import RecordAggregator
def batch_put_records(kinesis_client, stream_name, records):
"""Aggregera rekord för att minska kostnader"""
aggregator = RecordAggregator()
for record in records:
aggregator.add_user_record(
partition_key=record['partition_key'],
data=record['data']
)
# Skicka aggregerat rekord
kinesis_client.put_record(
StreamName=stream_name,
Data=aggregator.serialize(),
PartitionKey=records[0]['partition_key']
)
3. Optimera databevarande
Standardbevarande är 24 timmar. Förläng det bara om det är nödvändigt:
# Sätt bevarande till 7 dagar
aws kinesis increase-stream-retention-period \
--stream-name my-stream \
--retention-period-hours 168
Säkerhetsbästa praxis
1. Kryptering i viloläge och under överföring
# Skapa krypterad ström
kinesis.create_stream(
StreamName='secure-stream',
ShardCount=1,
StreamModeDetails={'StreamMode': 'PROVISIONED'}
)
# Aktivera kryptering
kinesis.start_stream_encryption(
StreamName='secure-stream',
EncryptionType='KMS',
KeyId='alias/aws/kinesis'
)
2. IAM-policyer för minsta privilegium
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kinesis:PutRecord",
"kinesis:PutRecords"
],
"Resource": "arn:aws:kinesis:us-east-1:123456789:stream/orders"
}
]
}
3. VPC-endpunkter
Håll trafiken inom AWS-nätverket. För att hantera AWS-infrastruktur som kod, överväg att använda Terraform - se vår Terraform-minneslista:
aws ec2 create-vpc-endpoint \
--vpc-id vpc-12345678 \
--service-name com.amazonaws.us-east-1.kinesis-streams \
--route-table-ids rtb-12345678
Observabilitet och felsökning
Distribuerad spårning med X-Ray
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all
patch_all()
@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
xray_recorder.put_annotation('eventType', event_data['type'])
xray_recorder.put_metadata('payload', event_data['payload'])
kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(event_data),
PartitionKey=event_data['id']
)
CloudWatch Logs Insights-frågor
-- Hitta långsam bearbetningstid
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20
-- Spåra felfrekvenser
fields @timestamp, eventType
| filter error = true
| stats count() by eventType
Avancerade mönster
Saga-mönster för distribuerade transaktioner
Implementera långvariga transaktioner över mikrotjänster:
class OrderSaga:
def __init__(self, kinesis_client, stream_name):
self.kinesis = kinesis_client
self.stream_name = stream_name
self.saga_id = str(uuid.uuid4())
def execute(self, order_data):
"""Kör saga med kompenserande logik"""
try:
# Steg 1: Reservera lager
self.publish_command('RESERVE_INVENTORY', order_data)
# Steg 2: Bearbeta betalning
self.publish_command('PROCESS_PAYMENT', order_data)
# Steg 3: Skicka order
self.publish_command('SHIP_ORDER', order_data)
except SagaException as e:
# Kompensera i omvänd ordning
self.compensate(e.failed_step)
def compensate(self, failed_step):
"""Kör kompenserande transaktioner"""
compensation_steps = {
'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
}
for step in compensation_steps.get(failed_step, []):
self.publish_command(step, {'sagaId': self.saga_id})
Teststrategier
Lokal utveckling med LocalStack
# Starta LocalStack med Kinesis
docker run -d \
-p 4566:4566 \
-e SERVICES=kinesis \
localstack/localstack
# Skapa testström
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
--stream-name test-stream \
--shard-count 1
Integrationstestning
import pytest
from moto import mock_kinesis
@mock_kinesis
def test_event_publishing():
"""Testa händelsepublicering med mockad Kinesis"""
kinesis = boto3.client('kinesis', region_name='us-east-1')
kinesis.create_stream(StreamName='test-stream', ShardCount=1)
service = OrderProcessingService('test-stream')
order_id = service.create_order('user123', [
{'productId': 'prod1', 'quantity': 2}
])
assert order_id is not None
Prestandajusterning
Optimera batchstorlek
def optimize_batch_processing(records, batch_size=500):
"""Bearbeta rekord i optimerade batchar"""
for i in range(0, len(records), batch_size):
batch = records[i:i + batch_size]
process_batch(batch)
Använd anslutningspooling
from botocore.config import Config
config = Config(
max_pool_connections=50,
retries={'max_attempts': 3, 'mode': 'adaptive'}
)
kinesis = boto3.client('kinesis', config=config)
Användbara länkar
AWS Kinesis-resurser:
- AWS Kinesis-dokumentation
- AWS Kinesis Data Streams-utvecklarguide
- Kinesis Client Library (KCL)
- AWS Kinesis-priskalkylator
- Kinesis Data Streams kvoter och gränser
- AWS Arkitekturblogg - Händelsestyrda arkitekturer
- AWS-exempel - Kinesis-exempel
Relaterade artiklar:
- Jämförelse av värdkostnader för RabbitMQ på EKS och SQS
- TypeScript-minneslista: Grundläggande koncept och bästa praxis
- Python-minneslista
Slutsats
AWS Kinesis ger en robust grund för att bygga skalbara, händelsestyrda arkitekturer för mikrotjänster. Genom att följa dessa mönster och bästa praxis kan du skapa system som är resiliens, skalbara och underhållbara. Börja smått med en enda händelseström, validera din arkitektur och utöka gradvis till mer komplexa mönster när ditt system växer.
Nyckeln till framgång är att förstå dina krav på dataflöde, välja rätt Kinesis-tjänst för ditt användningsområde och implementera korrekt övervakning och felhantering från start.