AWS Kinesis 를 활용한 이벤트 기반 마이크로서비스 구축

확장성을 위한 AWS Kinesis 기반 이벤트 기반 아키텍처

Page content

AWS Kinesis 는 최소한의 운영 오버헤드로 대규모 실시간 데이터 처리를 가능하게 하여 현대의 이벤트 기반 마이크로서비스 아키텍처 구축의 핵심 요소가 되었습니다.

amazon-kinesis

이벤트 기반 마이크로서비스 아키텍처 이해하기

이벤트 기반 아키텍처 (EDA) 는 서비스들이 직접적인 동기 호출 대신 이벤트를 통해 상호 소통하는 디자인 패턴입니다. 이 방식은 다음과 같은 여러 장점을 제공합니다:

  • 느슨한 결합 (Loose coupling): 서로의 존재를 알 필요 없이 서비스들이 독립적으로 작동
  • 확장성: 각 서비스는 자신의 부하에 따라 독립적으로 확장 가능
  • 복원력: 한 서비스의 장애가 다른 서비스로 연쇄적으로 전파되지 않음
  • 유연성: 기존 서비스 수정 없이 새로운 서비스를 추가 가능

AWS Kinesis 는 생산자와 소비자를 분리하는 분산되고 내구성이 있는 이벤트 스트림으로서 EDA 구현의 기반을 제공합니다.

스트리밍 플랫폼에 대한 더 넓은 시각을 위해, 자체 호스팅 대안과의 비교를 위해 Apache Kafka Quickstart 가이드 를 참고하세요.

AWS Kinesis 개요

AWS 는 특정 사용 사례에 맞춰 설계된 여러 Kinesis 서비스를 제공합니다. 스트리밍 솔루션을 평가할 때, 다른 메시징 패턴과 비용 영향에 따라 RabbitMQ on EKS vs SQS 비교 도 고려해 볼 만합니다.

Kinesis Data Streams

데이터 레코드를 실시간으로 캡처, 저장 및 처리하는 핵심 스트리밍 서비스입니다. Data Streams 는 다음과 같은 경우에 적합합니다:

  • 맞춤형 실시간 처리 애플리케이션
  • 초 단위 지연 시간을 갖는 데이터 파이프라인 구축
  • 초당 수백만 건의 이벤트 처리
  • 이벤트 소싱 패턴 구현

Kinesis Data Firehose

S3, Redshift, Elasticsearch 또는 HTTP 엔드포인트와 같은 목적지로 스트리밍 데이터를 전달하는 완전히 관리되는 서비스입니다. 다음에 가장 적합합니다:

  • 간단한 ETL 파이프라인
  • 로그 집계 및 보관
  • 준실시간 분석 (최소 60 초 지연 시간)
  • 맞춤형 처리 로직이 필요 없는 시나리오

Kinesis Data Analytics

SQL 또는 Apache Flink 를 사용하여 스트리밍 데이터를 처리하고 분석합니다. 사용 사례는 다음과 같습니다:

  • 실시간 대시보드
  • 스트리밍 ETL
  • 실시간 이상 탐지
  • 지속적인 메트릭 생성

Kinesis 를 활용한 아키텍처 패턴

1. 이벤트 소싱 (Event Sourcing) 패턴

이벤트 소싱은 애플리케이션 상태의 모든 변경 사항을 이벤트 시퀀스로 저장합니다. Kinesis 는 이에 매우 적합합니다. Python 기초를 복습해야 한다면, Python 요령 을 확인하세요:

import boto3
import json
from datetime import datetime

kinesis = boto3.client('kinesis', region_name='us-east-1')

def publish_event(stream_name, event_type, payload):
    """Kinesis 스트림에 이벤트 발행"""
    event = {
        'eventId': generate_unique_id(),
        'eventType': event_type,
        'timestamp': datetime.utcnow().isoformat(),
        'payload': payload
    }
    
    response = kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event),
        PartitionKey=payload.get('userId', 'default')
    )
    
    return response['SequenceNumber']

# 예제: 사용자 등록 이벤트
publish_event(
    stream_name='user-events',
    event_type='USER_REGISTERED',
    payload={
        'userId': '12345',
        'email': 'user@example.com',
        'registrationDate': '2025-10-30'
    }
)

2. CQRS (명령 및 쿼리 책임 분리)

Kinesis 를 이벤트 버스로서 사용하여 읽기 및 쓰기 작업 분리:

package main

import (
    "encoding/json"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/kinesis"
)

type OrderCommand struct {
    CommandType string      `json:"commandType"`
    OrderID     string      `json:"orderId"`
    UserID      string      `json:"userId"`
    Items       []OrderItem `json:"items"`
}

func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
    data, err := json.Marshal(command)
    if err != nil {
        return err
    }
    
    _, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
        StreamName:   aws.String("order-commands"),
        Data:         data,
        PartitionKey: aws.String(command.OrderID),
    })
    
    return err
}

3. Lambda 와 함께한 팬아웃 (Fan-Out) 패턴

단일 스트림의 이벤트를 여러 Lambda 함수로 처리합니다. 더 강력한 타입 안전성을 위해 TypeScript 구현이 필요하면 TypeScript 요령 을 참조하세요:

// 이메일 알림을 위한 Lambda 소비자
exports.handler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );
        
        if (payload.eventType === 'ORDER_PLACED') {
            await sendOrderConfirmationEmail(payload);
        }
    }
};

// 재고 업데이트를 위한 또 다른 Lambda
exports.inventoryHandler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );
        
        if (payload.eventType === 'ORDER_PLACED') {
            await updateInventory(payload.items);
        }
    }
};

프로덕션 환경 모범 사례

1. 적절한 샤드 (Shard) 수 선택

다음 요소에 따라 샤드 요구 사항을 계산하세요:

  • 인그레스 (Ingress): 샤드당 1 MB/초 또는 1,000 레코드/초
  • 에그레스 (Egress): 샤드당 2 MB/초 (표준 소비자) 또는 향상된 팬아웃 사용 시 소비자당 2 MB/초
def calculate_shards(records_per_second, avg_record_size_kb):
    """필요한 샤드 수 계산"""
    # 인그레스 용량
    ingress_shards = max(
        records_per_second / 1000,
        (records_per_second * avg_record_size_kb) / 1024
    )
    
    return int(ingress_shards) + 1  # 버퍼 추가

2. 적절한 에러 핸들링 구현

from botocore.exceptions import ClientError
import time

def put_record_with_retry(kinesis_client, stream_name, data, partition_key, 
                          max_retries=3):
    """지수 백오프 리트라이를 통한 레코드 저장"""
    for attempt in range(max_retries):
        try:
            response = kinesis_client.put_record(
                StreamName=stream_name,
                Data=data,
                PartitionKey=partition_key
            )
            return response
        except ClientError as e:
            if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
                if attempt < max_retries - 1:
                    time.sleep(2 ** attempt)  # 지수 백오프
                    continue
            raise

3. 여러 소비자를 위한 향상된 팬아웃 (Enhanced Fan-Out) 사용

향상된 팬아웃은 각 소비자에게 전용 처리량을 제공합니다:

# 향상된 팬아웃으로 소비자 등록
aws kinesis register-stream-consumer \
    --stream-arn arn:aws:kinesis:us-east-1:123456789:stream/my-stream \
    --consumer-name my-consumer-app

4. 핵심 메트릭 모니터링

추적해야 할 필수 CloudWatch 메트릭:

  • IncomingRecords: 성공적으로 저장된 레코드 수
  • IncomingBytes: 들어오는 데이터의 바이트 양
  • GetRecords.IteratorAgeMilliseconds: 소비자가 뒤처진 정도
  • WriteProvisionedThroughputExceeded: 스로틀링 이벤트
  • ReadProvisionedThroughputExceeded: 소비자 스로틀링

5. 적절한 파티션 키 전략 구현

import hashlib

def get_partition_key(user_id, shard_count=10):
    """균등 분포를 위한 파티션 키 생성"""
    # 균등 분포를 위한 일관된 해싱 사용
    hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
    shard_id = hash_value % shard_count
    return f"{user_id}#{shard_id}"

실전 구현 예제

주문 처리 마이크로서비스 아키텍처의 완전한 예제입니다:

import boto3
import json
from decimal import Decimal
from typing import Dict, List

class OrderProcessingService:
    def __init__(self, stream_name: str):
        self.kinesis = boto3.client('kinesis')
        self.stream_name = stream_name
    
    def create_order(self, user_id: str, items: List[Dict]) -> str:
        """주문 생성 및 이벤트 발행"""
        order_id = self.generate_order_id()
        
        # 주문 생성 이벤트 발행
        self.publish_event('ORDER_CREATED', {
            'orderId': order_id,
            'userId': user_id,
            'items': items,
            'status': 'PENDING',
            'total': self.calculate_total(items)
        }, partition_key=user_id)
        
        return order_id
    
    def publish_event(self, event_type: str, payload: Dict, 
                     partition_key: str):
        """Kinesis 스트림으로 이벤트 발행"""
        event = {
            'eventType': event_type,
            'timestamp': datetime.utcnow().isoformat(),
            'payload': payload
        }
        
        self.kinesis.put_record(
            StreamName=self.stream_name,
            Data=json.dumps(event, default=str),
            PartitionKey=partition_key
        )

class InventoryService:
    """주문 이벤트를 소비하고 재고를 업데이트"""
    
    def process_records(self, records):
        for record in records:
            data = json.loads(record['kinesis']['data'])
            
            if data['eventType'] == 'ORDER_CREATED':
                self.reserve_inventory(data['payload'])
            elif data['eventType'] == 'ORDER_CANCELLED':
                self.release_inventory(data['payload'])
    
    def reserve_inventory(self, order_data):
        # 재고 데이터베이스 업데이트
        for item in order_data['items']:
            # 구현 내용
            pass

모놀리식에서 마이크로서비스로의 마이그레이션 전략

1 단계: strangler fig 패턴

모놀리식을 유지하면서 특정 이벤트를 Kinesis 를 통해 라우팅하는 것으로 시작합니다:

  1. 모놀리식 내의 바운디드 컨텍스트 (Bounded Context) 식별
  2. 컨텍스트 간 이벤트를 위한 Kinesis 스트림 생성
  3. 이러한 스트림에서 소비하는 서비스를 점진적으로 추출
  4. 모놀리식과의 역상호 호환성 유지

2 단계: 병렬 처리

구체적이고 새로운 시스템을 병렬로 실행합니다:

def dual_write_pattern(legacy_db, kinesis_stream, data):
    """레거시 시스템과 이벤트 스트림에 모두 작성"""
    try:
        # 먼저 새로운 시스템에 작성
        publish_to_kinesis(kinesis_stream, data)
        
        # 그 다음 레거시 시스템 업데이트
        legacy_db.update(data)
    except Exception as e:
        # 보상 로직 구현
        rollback_kinesis_event(kinesis_stream, data)
        raise

3 단계: 완전한 마이그레이션

신뢰가 구축되면 모든 트래픽을 이벤트 기반 아키텍처로 라우팅합니다.

비용 최적화 전략

객체 저장소 및 데이터베이스 아키텍처를 포함한 데이터 인프라 패턴에 대한 포괄적인 가이드는 AI 시스템을 위한 데이터 인프라: 객체 저장소, 데이터베이스, 검색 및 AI 데이터 아키텍처 를 참조하세요.

1. 가변 워크로드를 위한 온디맨드 (On-Demand) 모드 사용

2023 년에 도입된 온디맨드 모드는 트래픽에 따라 자동으로 확장됩니다:

# 온디맨드 모드로 스트림 생성
aws kinesis create-stream \
    --stream-name my-stream \
    --stream-mode-details StreamMode=ON_DEMAND

2. 데이터 집계 구현

PUT 페이로드 단위를 줄이기 위해 레코드를 배치합니다:

from aws_kinesis_agg.aggregator import RecordAggregator

def batch_put_records(kinesis_client, stream_name, records):
    """비용 절감을 위해 레코드 집계"""
    aggregator = RecordAggregator()
    
    for record in records:
        aggregator.add_user_record(
            partition_key=record['partition_key'],
            data=record['data']
        )
    
    # 집계된 레코드 전송
    kinesis_client.put_record(
        StreamName=stream_name,
        Data=aggregator.serialize(),
        PartitionKey=records[0]['partition_key']
    )

3. 데이터 보존 기간 최적화

기본 보존 기간은 24 시간입니다. 필요한 경우에만 연장하세요:

# 보존 기간을 7 일로 설정
aws kinesis increase-stream-retention-period \
    --stream-name my-stream \
    --retention-period-hours 168

보안 모범 사례

1. 저장 중 및 전송 중 암호화

# 암호화된 스트림 생성
kinesis.create_stream(
    StreamName='secure-stream',
    ShardCount=1,
    StreamModeDetails={'StreamMode': 'PROVISIONED'}
)

# 암호화 활성화
kinesis.start_stream_encryption(
    StreamName='secure-stream',
    EncryptionType='KMS',
    KeyId='alias/aws/kinesis'
)

2. 최소 권한을 위한 IAM 정책

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": "arn:aws:kinesis:us-east-1:123456789:stream/orders"
    }
  ]
}

3. VPC 엔드포인트

트래픽을 AWS 네트워크 내부에 유지합니다. AWS 인프라를 코드로 관리하려면 Terraform 을 사용하는 것을 고려하세요 - Terraform 요령:

aws ec2 create-vpc-endpoint \
    --vpc-id vpc-12345678 \
    --service-name com.amazonaws.us-east-1.kinesis-streams \
    --route-table-ids rtb-12345678

관측 가능성 및 디버깅

X-Ray 를 활용한 분산 추적

from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all

patch_all()

@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
    xray_recorder.put_annotation('eventType', event_data['type'])
    xray_recorder.put_metadata('payload', event_data['payload'])
    
    kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event_data),
        PartitionKey=event_data['id']
    )

CloudWatch Logs Insights 쿼리

-- 느린 처리 시간 찾기
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20

-- 에러율 추적
fields @timestamp, eventType
| filter error = true
| stats count() by eventType

고급 패턴

분산 트랜잭션을 위한 사가 (Saga) 패턴

마이크로서비스 간에 장기 실행 트랜잭션 구현:

class OrderSaga:
    def __init__(self, kinesis_client, stream_name):
        self.kinesis = kinesis_client
        self.stream_name = stream_name
        self.saga_id = str(uuid.uuid4())
    
    def execute(self, order_data):
        """보상 로직을 포함한 사가 실행"""
        try:
            # 단계 1: 재고 예약
            self.publish_command('RESERVE_INVENTORY', order_data)
            
            # 단계 2: 결제 처리
            self.publish_command('PROCESS_PAYMENT', order_data)
            
            # 단계 3: 주문 발송
            self.publish_command('SHIP_ORDER', order_data)
            
        except SagaException as e:
            # 역순으로 보상
            self.compensate(e.failed_step)
    
    def compensate(self, failed_step):
        """보상 트랜잭션 실행"""
        compensation_steps = {
            'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
            'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
        }
        
        for step in compensation_steps.get(failed_step, []):
            self.publish_command(step, {'sagaId': self.saga_id})

테스트 전략

LocalStack 를 활용한 로컬 개발

# Kinesis 와 함께 LocalStack 시작
docker run -d \
  -p 4566:4566 \
  -e SERVICES=kinesis \
  localstack/localstack

# 테스트 스트림 생성
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
    --stream-name test-stream \
    --shard-count 1

통합 테스트

import pytest
from moto import mock_kinesis

@mock_kinesis
def test_event_publishing():
    """모킹된 Kinesis 로 이벤트 발행 테스트"""
    kinesis = boto3.client('kinesis', region_name='us-east-1')
    kinesis.create_stream(StreamName='test-stream', ShardCount=1)
    
    service = OrderProcessingService('test-stream')
    order_id = service.create_order('user123', [
        {'productId': 'prod1', 'quantity': 2}
    ])
    
    assert order_id is not None

성능 튜닝

배치 크기 최적화

def optimize_batch_processing(records, batch_size=500):
    """최적화된 배치로 레코드 처리"""
    for i in range(0, len(records), batch_size):
        batch = records[i:i + batch_size]
        process_batch(batch)

커넥션 풀링 사용

from botocore.config import Config

config = Config(
    max_pool_connections=50,
    retries={'max_attempts': 3, 'mode': 'adaptive'}
)

kinesis = boto3.client('kinesis', config=config)

유용한 링크

AWS Kinesis 리소스:

관련 기사:

결론

AWS Kinesis 는 확장 가능하고 이벤트 기반 마이크로서비스 아키텍처를 구축하기 위한 견고한 기반을 제공합니다. 이러한 패턴과 모범 사례를 따르시면 복원력이 높고 확장 가능하며 유지보수가 쉬운 시스템을 구축할 수 있습니다. 단일 이벤트 스트림으로 작게 시작하여 아키텍처를 검증한 후, 시스템이 성장함에 따라 더 복잡한 패턴으로 점진적으로 확장해 보세요.

성공의 열쇠는 데이터 흐름 요구 사항을 이해하고, 사용 사례에 맞는 올바른 Kinesis 서비스를 선택하며, 처음부터 적절한 모니터링과 에러 핸들링을 구현하는 것입니다.