Tworzenie mikroserwisów zorientowanych na zdarzenia przy użyciu AWS Kinesis

Architektura oparta na zdarzeniach z AWS Kinesis w celu skalowania.

Page content

AWS Kinesis stał się fundamentem budowania nowoczesnych architektur mikroserwisów opartych na zdarzeniach, umożliwiając przetwarzanie danych w czasie rzeczywistym w dużej skali przy minimalnym nakładzie operacyjnym.

amazon-kinesis

Zrozumienie architektury mikroserwisów opartych na zdarzeniach

Architektura oparta na zdarzeniach (EDA) to wzorzec projektowy, w którym usługi komunikują się poprzez zdarzenia zamiast bezpośrednich synchronicznych wywołań. To podejście oferuje kilka zalet:

  • Luźne powiązania: Usługi nie muszą znać istnienia innych usług
  • Skalowalność: Każda usługa skaluje się niezależnie w zależności od obciążenia
  • Odporność: Awaria jednej usługi nie kaskadowo wpływa na inne
  • Elastyczność: Nowe usługi można dodawać bez modyfikowania istniejących

AWS Kinesis stanowi szkieł do implementacji EDA, działając jako rozproszony, trwały strumień zdarzeń, który odłącza producentów od konsumentów.

W celu uzyskania szerszej perspektywy na platformy strumieniowe, zobacz nasz Skrócony przewodnik Apache Kafka w celu porównania z alternatywami hostowanymi samodzielnie.

Przegląd AWS Kinesis

AWS oferuje kilka usług Kinesis, każda zaprojektowana dla konkretnych przypadków użycia. Przy ocenie rozwiązań strumieniowych warto również rozważyć porównanie RabbitMQ na EKS vs SQS dla różnych wzorców komunikacyjnych i implikacji kosztowych.

Kinesis Data Streams

Podstawowa usługa strumieniowa, która rejestruje, przechowuje i przetwarza rekordy danych w czasie rzeczywistym. Data Streams jest idealna do:

  • Aplikacji z niestandardowym przetwarzaniem w czasie rzeczywistym
  • Budowania pipeline’ów danych z opóźnieniem poniżej sekundy
  • Przetwarzania milionów zdarzeń na sekundę
  • Implementowania wzorców Event Sourcing

Kinesis Data Firehose

W pełni zarządzana usługa, która dostarcza strumienie danych do destinacji takich jak S3, Redshift, Elasticsearch lub punkty końcowe HTTP. Najlepsza do:

  • Prosty pipeline’ów ETL
  • Agregacji i archiwizacji logów
  • Analizy w czasie prawie rzeczywistym (minimalne opóźnienie 60 sekund)
  • Scenariuszy, gdzie nie jest konieczna logika niestandardowego przetwarzania

Kinesis Data Analytics

Przetwarza i analizuje strumienie danych używając SQL lub Apache Flink. Przypadki użycia obejmują:

  • Dashbordów w czasie rzeczywistym
  • Strumieniowego ETL
  • Wykrywania anomalii w czasie rzeczywistym
  • Ciągłego generowania metryk

Wzorce architektoniczne z Kinesis

1. Wzorzec Event Sourcing

Event Sourcing przechowuje wszystkie zmiany stanu aplikacji jako sekwencję zdarzeń. Kinesis jest do tego idealny. Jeśli potrzebujesz odświeżenia podstaw Pythona, zobacz nasz Skrócony przewodnik Python:

import boto3
import json
from datetime import datetime

kinesis = boto3.client('kinesis', region_name='us-east-1')

def publish_event(stream_name, event_type, payload):
    """Opublikuj zdarzenie do strumienia Kinesis"""
    event = {
        'eventId': generate_unique_id(),
        'eventType': event_type,
        'timestamp': datetime.utcnow().isoformat(),
        'payload': payload
    }
    
    response = kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event),
        PartitionKey=payload.get('userId', 'default')
    )
    
    return response['SequenceNumber']

# Przykład: Zdarzenie rejestracji użytkownika
publish_event(
    stream_name='user-events',
    event_type='USER_REGISTERED',
    payload={
        'userId': '12345',
        'email': 'user@example.com',
        'registrationDate': '2025-10-30'
    }
)

2. CQRS (Command Query Responsibility Segregation)

Rozdziel operacje czytania i zapisu, używając Kinesis jako szyny zdarzeń:

package main

import (
    "encoding/json"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/kinesis"
)

type OrderCommand struct {
    CommandType string      `json:"commandType"`
    OrderID     string      `json:"orderId"`
    UserID      string      `json:"userId"`
    Items       []OrderItem `json:"items"`
}

func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
    data, err := json.Marshal(command)
    if err != nil {
        return err
    }
    
    _, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
        StreamName:   aws.String("order-commands"),
        Data:         data,
        PartitionKey: aws.String(command.OrderID),
    })
    
    return err
}

3. Wzorzec Fan-Out z Lambda

Przetwarzaj zdarzenia z pojedynczego strumienia za pomocą wielu funkcji Lambda. Dla implementacji w TypeScript z silniejszą typowością, odwołaj się do naszego Skróconego przewodnika TypeScript:

// Lambda consumer dla powiadomień e-mail
exports.handler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );
        
        if (payload.eventType === 'ORDER_PLACED') {
            await sendOrderConfirmationEmail(payload);
        }
    }
};

// Inna Lambda do aktualizacji stanów magazynowych
exports.inventoryHandler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );
        
        if (payload.eventType === 'ORDER_PLACED') {
            await updateInventory(payload.items);
        }
    }
};

Najlepsze praktyki dla produkcji

1. Wybór odpowiedniej liczby shardów

Oblicz wymagania dotyczące shardów w oparciu o:

  • Wejścia (Ingress): 1 MB/s lub 1000 rekordów/s na shard
  • Wyjścia (Egress): 2 MB/s na shard (standardowi konsumenci) lub 2 MB/s na konsumenta przy użyciu rozszerzonego Fan-Out
def calculate_shards(records_per_second, avg_record_size_kb):
    """Oblicz wymaganą liczbę shardów"""
    # Pojemność wejściowa
    ingress_shards = max(
        records_per_second / 1000,
        (records_per_second * avg_record_size_kb) / 1024
    )
    
    return int(ingress_shards) + 1  # Dodaj bufor

2. Wdrożenie odpowiedniego obsługi błędów

from botocore.exceptions import ClientError
import time

def put_record_with_retry(kinesis_client, stream_name, data, partition_key, 
                          max_retries=3):
    """Wstaw rekord z wykładniczym cofaniem przy ponowieniu"""
    for attempt in range(max_retries):
        try:
            response = kinesis_client.put_record(
                StreamName=stream_name,
                Data=data,
                PartitionKey=partition_key
            )
            return response
        except ClientError as e:
            if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
                if attempt < max_retries - 1:
                    time.sleep(2 ** attempt)  # Wykładnicze cofanie
                    continue
            raise

3. Użyj rozszerzonego Fan-Out dla wielu konsumentów

Rozszerzony Fan-Out zapewnia dedykowaną przepustowość dla każdego konsumenta:

# Zarejestruj konsumenta z rozszerzonym Fan-Out
aws kinesis register-stream-consumer \
    --stream-arn arn:aws:kinesis:us-east-1:123456789:stream/my-stream \
    --consumer-name my-consumer-app

4. Monitoruj kluczowe metryki

Niezbędne metryki CloudWatch do śledzenia:

  • IncomingRecords: Liczba pomyślnie wprowadzonych rekordów
  • IncomingBytes: Objętość bajtów przychodzących danych
  • GetRecords.IteratorAgeMilliseconds: Jak daleko konsumenci są w tyle
  • WriteProvisionedThroughputExceeded: Zdarzenia dławienia (throttling)
  • ReadProvisionedThroughputExceeded: Dławienie konsumentów

5. Wdrożenie odpowiedniej strategii kluczy partycji

import hashlib

def get_partition_key(user_id, shard_count=10):
    """Generuj klucz partycji z równomiernym rozkładem"""
    # Użyj spójnego hashowania dla równomiernego rozkładu
    hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
    shard_id = hash_value % shard_count
    return f"{user_id}#{shard_id}"

Przykład wdrożenia w świecie rzeczywistym

Oto kompletny przykład architektury mikroserwisów do przetwarzania zamówień:

import boto3
import json
from decimal import Decimal
from typing import Dict, List

class OrderProcessingService:
    def __init__(self, stream_name: str):
        self.kinesis = boto3.client('kinesis')
        self.stream_name = stream_name
    
    def create_order(self, user_id: str, items: List[Dict]) -> str:
        """Stwórz zamówienie i opublikuj zdarzenia"""
        order_id = self.generate_order_id()
        
        # Opublikuj zdarzenie utworzenia zamówienia
        self.publish_event('ORDER_CREATED', {
            'orderId': order_id,
            'userId': user_id,
            'items': items,
            'status': 'PENDING',
            'total': self.calculate_total(items)
        }, partition_key=user_id)
        
        return order_id
    
    def publish_event(self, event_type: str, payload: Dict, 
                     partition_key: str):
        """Opublikuj zdarzenie do strumienia Kinesis"""
        event = {
            'eventType': event_type,
            'timestamp': datetime.utcnow().isoformat(),
            'payload': payload
        }
        
        self.kinesis.put_record(
            StreamName=self.stream_name,
            Data=json.dumps(event, default=str),
            PartitionKey=partition_key
        )

class InventoryService:
    """Konsumuje zdarzenia zamówień i aktualizuje stan magazynowy"""
    
    def process_records(self, records):
        for record in records:
            data = json.loads(record['kinesis']['data'])
            
            if data['eventType'] == 'ORDER_CREATED':
                self.reserve_inventory(data['payload'])
            elif data['eventType'] == 'ORDER_CANCELLED':
                self.release_inventory(data['payload'])
    
    def reserve_inventory(self, order_data):
        # Aktualizuj bazę danych stanów magazynowych
        for item in order_data['items']:
            # Implementacja tutaj
            pass

Strategia migracji z monolitu do mikroserwisów

Faza 1: Wzorzec Strangler Fig

Zacznij od kierowania konkretnych zdarzeń przez Kinesis, jednocześnie utrzymując monolit:

  1. Zidentyfikuj ograniczone konteksty w monolicie
  2. Stwórz strumienie Kinesis dla zdarzeń międzykontekstowych
  3. Stopniowo wyodrębnij usługi konsumujące z tych strumieni
  4. Utrzymuj kompatybilność wstecz z monolitem

Faza 2: Przetwarzanie równoległe

Uruchom stare i nowe systemy równolegle:

def dual_write_pattern(legacy_db, kinesis_stream, data):
    """Zapisz do starego systemu i strumienia zdarzeń"""
    try:
        # Najpierw zapisz do nowego systemu
        publish_to_kinesis(kinesis_stream, data)
        
        # Następnie zaktualizuj stary system
        legacy_db.update(data)
    except Exception as e:
        # Wdroż logikę kompensacyjną
        rollback_kinesis_event(kinesis_stream, data)
        raise

Faza 3: Pełna migracja

Po uzyskaniu pewności, kieruj cały ruch przez architekturę opartą na zdarzeniach.

Strategie optymalizacji kosztów

Dla kompleksowego przewodnika po wzorcach infrastruktury danych, w tym magazynowaniu obiektowym i architekturach baz danych, zobacz Infrastruktura danych dla systemów AI: Magazynowanie obiektowe, bazy danych, wyszukiwanie i architektura danych AI.

1. Użyj trybu On-Demand dla zmiennych obciążeń

Tryb On-Demand (wprowadzony w 2023) automatycznie skaluje się w zależności od ruchu:

# Stwórz strumień w trybie On-Demand
aws kinesis create-stream \
    --stream-name my-stream \
    --stream-mode-details StreamMode=ON_DEMAND

2. Wdrożenie agregacji danych

Zmniejsz jednostki ładunku PUT przez grupowanie rekordów:

from aws_kinesis_agg.aggregator import RecordAggregator

def batch_put_records(kinesis_client, stream_name, records):
    """Zgrupuj rekordy, aby zmniejszyć koszty"""
    aggregator = RecordAggregator()
    
    for record in records:
        aggregator.add_user_record(
            partition_key=record['partition_key'],
            data=record['data']
        )
    
    # Wyślij zgrupowany rekord
    kinesis_client.put_record(
        StreamName=stream_name,
        Data=aggregator.serialize(),
        PartitionKey=records[0]['partition_key']
    )

3. Zoptymalizuj retencję danych

Domyślna retencja to 24 godziny. Rozszerz ją tylko w razie potrzeby:

# Ustaw retencję na 7 dni
aws kinesis increase-stream-retention-period \
    --stream-name my-stream \
    --retention-period-hours 168

Najlepsze praktyki bezpieczeństwa

1. Szyfrowanie w spoczynku i w przesyłaniu

# Stwórz zaszyfrowany strumień
kinesis.create_stream(
    StreamName='secure-stream',
    ShardCount=1,
    StreamModeDetails={'StreamMode': 'PROVISIONED'}
)

# Włącz szyfrowanie
kinesis.start_stream_encryption(
    StreamName='secure-stream',
    EncryptionType='KMS',
    KeyId='alias/aws/kinesis'
)

2. Polityki IAM dla zasady najmniejszych przywilejów

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": "arn:aws:kinesis:us-east-1:123456789:stream/orders"
    }
  ]
}

3. Punkty końcowe VPC

Utrzymuj ruch w sieci AWS. Do zarządzania infrastrukturą AWS jako kodem, rozważ użycie Terraform - zobacz nasz Skrócony przewodnik Terraform:

aws ec2 create-vpc-endpoint \
    --vpc-id vpc-12345678 \
    --service-name com.amazonaws.us-east-1.kinesis-streams \
    --route-table-ids rtb-12345678

Obserwowalność i debugowanie

Rozproszony ślad z X-Ray

from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all

patch_all()

@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
    xray_recorder.put_annotation('eventType', event_data['type'])
    xray_recorder.put_metadata('payload', event_data['payload'])
    
    kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event_data),
        PartitionKey=event_data['id']
    )

Zapytania CloudWatch Logs Insights

-- Znajdź wolne czasy przetwarzania
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20

-- Śledź wskaźniki błędów
fields @timestamp, eventType
| filter error = true
| stats count() by eventType

Zaawansowane wzorce

Wzorzec Saga dla rozproszonych transakcji

Zaimplementuj długotrwałe transakcje w mikroserwisach:

class OrderSaga:
    def __init__(self, kinesis_client, stream_name):
        self.kinesis = kinesis_client
        self.stream_name = stream_name
        self.saga_id = str(uuid.uuid4())
    
    def execute(self, order_data):
        """Wykonaj sagę z logiką kompensacyjną"""
        try:
            # Krok 1: Rezerwuj stan magazynowy
            self.publish_command('RESERVE_INVENTORY', order_data)
            
            # Krok 2: Przetwórz płatność
            self.publish_command('PROCESS_PAYMENT', order_data)
            
            # Krok 3: Wysył zamówienie
            self.publish_command('SHIP_ORDER', order_data)
            
        except SagaException as e:
            # Kompensuj w odwrotnej kolejności
            self.compensate(e.failed_step)
    
    def compensate(self, failed_step):
        """Wykonaj transakcje kompensacyjne"""
        compensation_steps = {
            'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
            'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
        }
        
        for step in compensation_steps.get(failed_step, []):
            self.publish_command(step, {'sagaId': self.saga_id})

Strategie testowania

Rozwój lokalny z LocalStack

# Uruchom LocalStack z Kinesis
docker run -d \
  -p 4566:4566 \
  -e SERVICES=kinesis \
  localstack/localstack

# Stwórz testowy strumień
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
    --stream-name test-stream \
    --shard-count 1

Testy integracyjne

import pytest
from moto import mock_kinesis

@mock_kinesis
def test_event_publishing():
    """Test publikacji zdarzeń z zmockowanym Kinesis"""
    kinesis = boto3.client('kinesis', region_name='us-east-1')
    kinesis.create_stream(StreamName='test-stream', ShardCount=1)
    
    service = OrderProcessingService('test-stream')
    order_id = service.create_order('user123', [
        {'productId': 'prod1', 'quantity': 2}
    ])
    
    assert order_id is not None

Tunowanie wydajności

Optymalizacja rozmiaru partii

def optimize_batch_processing(records, batch_size=500):
    """Przetwarzaj rekordy w zoptymalizowanych partiach"""
    for i in range(0, len(records), batch_size):
        batch = records[i:i + batch_size]
        process_batch(batch)

Użyj puli połączeń

from botocore.config import Config

config = Config(
    max_pool_connections=50,
    retries={'max_attempts': 3, 'mode': 'adaptive'}
)

kinesis = boto3.client('kinesis', config=config)

Przydatne linki

Zasoby AWS Kinesis:

Powiązane artykuły:

Podsumowanie

AWS Kinesis zapewnia solidne fundamenty do budowania skalowalnych architektur mikroserwisów opartych na zdarzeniach. Postępując zgodnie z tymi wzorcami i najlepszymi praktykami, możesz tworzyć systemy odporne, skalowalne i łatwe w utrzymaniu. Zacznij mało z pojedynczym strumieniem zdarzeń, zwaliduj swoją architekturę i stopniowo rozszerzaj ją do bardziej złożonych wzorców, gdy Twój system rośnie.

Kluczem do sukcesu jest zrozumienie wymagań dotyczących przepływu danych, wybranie odpowiedniej usługi Kinesis dla danego przypadku użycia oraz wdrożenie odpowiedniego monitorowania i obsługi błędów od samego początku.