Tworzenie mikroserwisów zorientowanych na zdarzenia przy użyciu AWS Kinesis
Architektura oparta na zdarzeniach z AWS Kinesis w celu skalowania.
AWS Kinesis stał się fundamentem budowania nowoczesnych architektur mikroserwisów opartych na zdarzeniach, umożliwiając przetwarzanie danych w czasie rzeczywistym w dużej skali przy minimalnym nakładzie operacyjnym.

Zrozumienie architektury mikroserwisów opartych na zdarzeniach
Architektura oparta na zdarzeniach (EDA) to wzorzec projektowy, w którym usługi komunikują się poprzez zdarzenia zamiast bezpośrednich synchronicznych wywołań. To podejście oferuje kilka zalet:
- Luźne powiązania: Usługi nie muszą znać istnienia innych usług
- Skalowalność: Każda usługa skaluje się niezależnie w zależności od obciążenia
- Odporność: Awaria jednej usługi nie kaskadowo wpływa na inne
- Elastyczność: Nowe usługi można dodawać bez modyfikowania istniejących
AWS Kinesis stanowi szkieł do implementacji EDA, działając jako rozproszony, trwały strumień zdarzeń, który odłącza producentów od konsumentów.
W celu uzyskania szerszej perspektywy na platformy strumieniowe, zobacz nasz Skrócony przewodnik Apache Kafka w celu porównania z alternatywami hostowanymi samodzielnie.
Przegląd AWS Kinesis
AWS oferuje kilka usług Kinesis, każda zaprojektowana dla konkretnych przypadków użycia. Przy ocenie rozwiązań strumieniowych warto również rozważyć porównanie RabbitMQ na EKS vs SQS dla różnych wzorców komunikacyjnych i implikacji kosztowych.
Kinesis Data Streams
Podstawowa usługa strumieniowa, która rejestruje, przechowuje i przetwarza rekordy danych w czasie rzeczywistym. Data Streams jest idealna do:
- Aplikacji z niestandardowym przetwarzaniem w czasie rzeczywistym
- Budowania pipeline’ów danych z opóźnieniem poniżej sekundy
- Przetwarzania milionów zdarzeń na sekundę
- Implementowania wzorców Event Sourcing
Kinesis Data Firehose
W pełni zarządzana usługa, która dostarcza strumienie danych do destinacji takich jak S3, Redshift, Elasticsearch lub punkty końcowe HTTP. Najlepsza do:
- Prosty pipeline’ów ETL
- Agregacji i archiwizacji logów
- Analizy w czasie prawie rzeczywistym (minimalne opóźnienie 60 sekund)
- Scenariuszy, gdzie nie jest konieczna logika niestandardowego przetwarzania
Kinesis Data Analytics
Przetwarza i analizuje strumienie danych używając SQL lub Apache Flink. Przypadki użycia obejmują:
- Dashbordów w czasie rzeczywistym
- Strumieniowego ETL
- Wykrywania anomalii w czasie rzeczywistym
- Ciągłego generowania metryk
Wzorce architektoniczne z Kinesis
1. Wzorzec Event Sourcing
Event Sourcing przechowuje wszystkie zmiany stanu aplikacji jako sekwencję zdarzeń. Kinesis jest do tego idealny. Jeśli potrzebujesz odświeżenia podstaw Pythona, zobacz nasz Skrócony przewodnik Python:
import boto3
import json
from datetime import datetime
kinesis = boto3.client('kinesis', region_name='us-east-1')
def publish_event(stream_name, event_type, payload):
"""Opublikuj zdarzenie do strumienia Kinesis"""
event = {
'eventId': generate_unique_id(),
'eventType': event_type,
'timestamp': datetime.utcnow().isoformat(),
'payload': payload
}
response = kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(event),
PartitionKey=payload.get('userId', 'default')
)
return response['SequenceNumber']
# Przykład: Zdarzenie rejestracji użytkownika
publish_event(
stream_name='user-events',
event_type='USER_REGISTERED',
payload={
'userId': '12345',
'email': 'user@example.com',
'registrationDate': '2025-10-30'
}
)
2. CQRS (Command Query Responsibility Segregation)
Rozdziel operacje czytania i zapisu, używając Kinesis jako szyny zdarzeń:
package main
import (
"encoding/json"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
)
type OrderCommand struct {
CommandType string `json:"commandType"`
OrderID string `json:"orderId"`
UserID string `json:"userId"`
Items []OrderItem `json:"items"`
}
func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
data, err := json.Marshal(command)
if err != nil {
return err
}
_, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
StreamName: aws.String("order-commands"),
Data: data,
PartitionKey: aws.String(command.OrderID),
})
return err
}
3. Wzorzec Fan-Out z Lambda
Przetwarzaj zdarzenia z pojedynczego strumienia za pomocą wielu funkcji Lambda. Dla implementacji w TypeScript z silniejszą typowością, odwołaj się do naszego Skróconego przewodnika TypeScript:
// Lambda consumer dla powiadomień e-mail
exports.handler = async (event) => {
for (const record of event.Records) {
const payload = JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
);
if (payload.eventType === 'ORDER_PLACED') {
await sendOrderConfirmationEmail(payload);
}
}
};
// Inna Lambda do aktualizacji stanów magazynowych
exports.inventoryHandler = async (event) => {
for (const record of event.Records) {
const payload = JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
);
if (payload.eventType === 'ORDER_PLACED') {
await updateInventory(payload.items);
}
}
};
Najlepsze praktyki dla produkcji
1. Wybór odpowiedniej liczby shardów
Oblicz wymagania dotyczące shardów w oparciu o:
- Wejścia (Ingress): 1 MB/s lub 1000 rekordów/s na shard
- Wyjścia (Egress): 2 MB/s na shard (standardowi konsumenci) lub 2 MB/s na konsumenta przy użyciu rozszerzonego Fan-Out
def calculate_shards(records_per_second, avg_record_size_kb):
"""Oblicz wymaganą liczbę shardów"""
# Pojemność wejściowa
ingress_shards = max(
records_per_second / 1000,
(records_per_second * avg_record_size_kb) / 1024
)
return int(ingress_shards) + 1 # Dodaj bufor
2. Wdrożenie odpowiedniego obsługi błędów
from botocore.exceptions import ClientError
import time
def put_record_with_retry(kinesis_client, stream_name, data, partition_key,
max_retries=3):
"""Wstaw rekord z wykładniczym cofaniem przy ponowieniu"""
for attempt in range(max_retries):
try:
response = kinesis_client.put_record(
StreamName=stream_name,
Data=data,
PartitionKey=partition_key
)
return response
except ClientError as e:
if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # Wykładnicze cofanie
continue
raise
3. Użyj rozszerzonego Fan-Out dla wielu konsumentów
Rozszerzony Fan-Out zapewnia dedykowaną przepustowość dla każdego konsumenta:
# Zarejestruj konsumenta z rozszerzonym Fan-Out
aws kinesis register-stream-consumer \
--stream-arn arn:aws:kinesis:us-east-1:123456789:stream/my-stream \
--consumer-name my-consumer-app
4. Monitoruj kluczowe metryki
Niezbędne metryki CloudWatch do śledzenia:
IncomingRecords: Liczba pomyślnie wprowadzonych rekordówIncomingBytes: Objętość bajtów przychodzących danychGetRecords.IteratorAgeMilliseconds: Jak daleko konsumenci są w tyleWriteProvisionedThroughputExceeded: Zdarzenia dławienia (throttling)ReadProvisionedThroughputExceeded: Dławienie konsumentów
5. Wdrożenie odpowiedniej strategii kluczy partycji
import hashlib
def get_partition_key(user_id, shard_count=10):
"""Generuj klucz partycji z równomiernym rozkładem"""
# Użyj spójnego hashowania dla równomiernego rozkładu
hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
shard_id = hash_value % shard_count
return f"{user_id}#{shard_id}"
Przykład wdrożenia w świecie rzeczywistym
Oto kompletny przykład architektury mikroserwisów do przetwarzania zamówień:
import boto3
import json
from decimal import Decimal
from typing import Dict, List
class OrderProcessingService:
def __init__(self, stream_name: str):
self.kinesis = boto3.client('kinesis')
self.stream_name = stream_name
def create_order(self, user_id: str, items: List[Dict]) -> str:
"""Stwórz zamówienie i opublikuj zdarzenia"""
order_id = self.generate_order_id()
# Opublikuj zdarzenie utworzenia zamówienia
self.publish_event('ORDER_CREATED', {
'orderId': order_id,
'userId': user_id,
'items': items,
'status': 'PENDING',
'total': self.calculate_total(items)
}, partition_key=user_id)
return order_id
def publish_event(self, event_type: str, payload: Dict,
partition_key: str):
"""Opublikuj zdarzenie do strumienia Kinesis"""
event = {
'eventType': event_type,
'timestamp': datetime.utcnow().isoformat(),
'payload': payload
}
self.kinesis.put_record(
StreamName=self.stream_name,
Data=json.dumps(event, default=str),
PartitionKey=partition_key
)
class InventoryService:
"""Konsumuje zdarzenia zamówień i aktualizuje stan magazynowy"""
def process_records(self, records):
for record in records:
data = json.loads(record['kinesis']['data'])
if data['eventType'] == 'ORDER_CREATED':
self.reserve_inventory(data['payload'])
elif data['eventType'] == 'ORDER_CANCELLED':
self.release_inventory(data['payload'])
def reserve_inventory(self, order_data):
# Aktualizuj bazę danych stanów magazynowych
for item in order_data['items']:
# Implementacja tutaj
pass
Strategia migracji z monolitu do mikroserwisów
Faza 1: Wzorzec Strangler Fig
Zacznij od kierowania konkretnych zdarzeń przez Kinesis, jednocześnie utrzymując monolit:
- Zidentyfikuj ograniczone konteksty w monolicie
- Stwórz strumienie Kinesis dla zdarzeń międzykontekstowych
- Stopniowo wyodrębnij usługi konsumujące z tych strumieni
- Utrzymuj kompatybilność wstecz z monolitem
Faza 2: Przetwarzanie równoległe
Uruchom stare i nowe systemy równolegle:
def dual_write_pattern(legacy_db, kinesis_stream, data):
"""Zapisz do starego systemu i strumienia zdarzeń"""
try:
# Najpierw zapisz do nowego systemu
publish_to_kinesis(kinesis_stream, data)
# Następnie zaktualizuj stary system
legacy_db.update(data)
except Exception as e:
# Wdroż logikę kompensacyjną
rollback_kinesis_event(kinesis_stream, data)
raise
Faza 3: Pełna migracja
Po uzyskaniu pewności, kieruj cały ruch przez architekturę opartą na zdarzeniach.
Strategie optymalizacji kosztów
Dla kompleksowego przewodnika po wzorcach infrastruktury danych, w tym magazynowaniu obiektowym i architekturach baz danych, zobacz Infrastruktura danych dla systemów AI: Magazynowanie obiektowe, bazy danych, wyszukiwanie i architektura danych AI.
1. Użyj trybu On-Demand dla zmiennych obciążeń
Tryb On-Demand (wprowadzony w 2023) automatycznie skaluje się w zależności od ruchu:
# Stwórz strumień w trybie On-Demand
aws kinesis create-stream \
--stream-name my-stream \
--stream-mode-details StreamMode=ON_DEMAND
2. Wdrożenie agregacji danych
Zmniejsz jednostki ładunku PUT przez grupowanie rekordów:
from aws_kinesis_agg.aggregator import RecordAggregator
def batch_put_records(kinesis_client, stream_name, records):
"""Zgrupuj rekordy, aby zmniejszyć koszty"""
aggregator = RecordAggregator()
for record in records:
aggregator.add_user_record(
partition_key=record['partition_key'],
data=record['data']
)
# Wyślij zgrupowany rekord
kinesis_client.put_record(
StreamName=stream_name,
Data=aggregator.serialize(),
PartitionKey=records[0]['partition_key']
)
3. Zoptymalizuj retencję danych
Domyślna retencja to 24 godziny. Rozszerz ją tylko w razie potrzeby:
# Ustaw retencję na 7 dni
aws kinesis increase-stream-retention-period \
--stream-name my-stream \
--retention-period-hours 168
Najlepsze praktyki bezpieczeństwa
1. Szyfrowanie w spoczynku i w przesyłaniu
# Stwórz zaszyfrowany strumień
kinesis.create_stream(
StreamName='secure-stream',
ShardCount=1,
StreamModeDetails={'StreamMode': 'PROVISIONED'}
)
# Włącz szyfrowanie
kinesis.start_stream_encryption(
StreamName='secure-stream',
EncryptionType='KMS',
KeyId='alias/aws/kinesis'
)
2. Polityki IAM dla zasady najmniejszych przywilejów
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kinesis:PutRecord",
"kinesis:PutRecords"
],
"Resource": "arn:aws:kinesis:us-east-1:123456789:stream/orders"
}
]
}
3. Punkty końcowe VPC
Utrzymuj ruch w sieci AWS. Do zarządzania infrastrukturą AWS jako kodem, rozważ użycie Terraform - zobacz nasz Skrócony przewodnik Terraform:
aws ec2 create-vpc-endpoint \
--vpc-id vpc-12345678 \
--service-name com.amazonaws.us-east-1.kinesis-streams \
--route-table-ids rtb-12345678
Obserwowalność i debugowanie
Rozproszony ślad z X-Ray
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all
patch_all()
@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
xray_recorder.put_annotation('eventType', event_data['type'])
xray_recorder.put_metadata('payload', event_data['payload'])
kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(event_data),
PartitionKey=event_data['id']
)
Zapytania CloudWatch Logs Insights
-- Znajdź wolne czasy przetwarzania
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20
-- Śledź wskaźniki błędów
fields @timestamp, eventType
| filter error = true
| stats count() by eventType
Zaawansowane wzorce
Wzorzec Saga dla rozproszonych transakcji
Zaimplementuj długotrwałe transakcje w mikroserwisach:
class OrderSaga:
def __init__(self, kinesis_client, stream_name):
self.kinesis = kinesis_client
self.stream_name = stream_name
self.saga_id = str(uuid.uuid4())
def execute(self, order_data):
"""Wykonaj sagę z logiką kompensacyjną"""
try:
# Krok 1: Rezerwuj stan magazynowy
self.publish_command('RESERVE_INVENTORY', order_data)
# Krok 2: Przetwórz płatność
self.publish_command('PROCESS_PAYMENT', order_data)
# Krok 3: Wysył zamówienie
self.publish_command('SHIP_ORDER', order_data)
except SagaException as e:
# Kompensuj w odwrotnej kolejności
self.compensate(e.failed_step)
def compensate(self, failed_step):
"""Wykonaj transakcje kompensacyjne"""
compensation_steps = {
'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
}
for step in compensation_steps.get(failed_step, []):
self.publish_command(step, {'sagaId': self.saga_id})
Strategie testowania
Rozwój lokalny z LocalStack
# Uruchom LocalStack z Kinesis
docker run -d \
-p 4566:4566 \
-e SERVICES=kinesis \
localstack/localstack
# Stwórz testowy strumień
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
--stream-name test-stream \
--shard-count 1
Testy integracyjne
import pytest
from moto import mock_kinesis
@mock_kinesis
def test_event_publishing():
"""Test publikacji zdarzeń z zmockowanym Kinesis"""
kinesis = boto3.client('kinesis', region_name='us-east-1')
kinesis.create_stream(StreamName='test-stream', ShardCount=1)
service = OrderProcessingService('test-stream')
order_id = service.create_order('user123', [
{'productId': 'prod1', 'quantity': 2}
])
assert order_id is not None
Tunowanie wydajności
Optymalizacja rozmiaru partii
def optimize_batch_processing(records, batch_size=500):
"""Przetwarzaj rekordy w zoptymalizowanych partiach"""
for i in range(0, len(records), batch_size):
batch = records[i:i + batch_size]
process_batch(batch)
Użyj puli połączeń
from botocore.config import Config
config = Config(
max_pool_connections=50,
retries={'max_attempts': 3, 'mode': 'adaptive'}
)
kinesis = boto3.client('kinesis', config=config)
Przydatne linki
Zasoby AWS Kinesis:
- Dokumentacja AWS Kinesis
- Przewodnik dewelopera AWS Kinesis Data Streams
- Biblioteka klienta Kinesis (KCL)
- Kalkulator cen AWS Kinesis
- Kwoty i limity Kinesis Data Streams
- Blog architektury AWS - Architektury oparte na zdarzeniach
- Przykłady AWS - Kinesis
Powiązane artykuły:
- Porównanie kosztów hostowania RabbitMQ na Eks vs Sqs
- Skrócony przewodnik TypeScript: Podstawowe koncepcje i najlepsze praktyki
- Skrócony przewodnik Python
Podsumowanie
AWS Kinesis zapewnia solidne fundamenty do budowania skalowalnych architektur mikroserwisów opartych na zdarzeniach. Postępując zgodnie z tymi wzorcami i najlepszymi praktykami, możesz tworzyć systemy odporne, skalowalne i łatwe w utrzymaniu. Zacznij mało z pojedynczym strumieniem zdarzeń, zwaliduj swoją architekturę i stopniowo rozszerzaj ją do bardziej złożonych wzorców, gdy Twój system rośnie.
Kluczem do sukcesu jest zrozumienie wymagań dotyczących przepływu danych, wybranie odpowiedniej usługi Kinesis dla danego przypadku użycia oraz wdrożenie odpowiedniego monitorowania i obsługi błędów od samego początku.