AWS Kinesis 를 활용한 이벤트 기반 마이크로서비스 구축
확장성을 위한 AWS Kinesis 기반 이벤트 기반 아키텍처
AWS Kinesis 는 최소한의 운영 오버헤드로 대규모 실시간 데이터 처리를 가능하게 하여 현대의 이벤트 기반 마이크로서비스 아키텍처 구축의 핵심 요소가 되었습니다.

이벤트 기반 마이크로서비스 아키텍처 이해하기
이벤트 기반 아키텍처 (EDA) 는 서비스들이 직접적인 동기 호출 대신 이벤트를 통해 상호 소통하는 디자인 패턴입니다. 이 방식은 다음과 같은 여러 장점을 제공합니다:
- 느슨한 결합 (Loose coupling): 서로의 존재를 알 필요 없이 서비스들이 독립적으로 작동
- 확장성: 각 서비스는 자신의 부하에 따라 독립적으로 확장 가능
- 복원력: 한 서비스의 장애가 다른 서비스로 연쇄적으로 전파되지 않음
- 유연성: 기존 서비스 수정 없이 새로운 서비스를 추가 가능
AWS Kinesis 는 생산자와 소비자를 분리하는 분산되고 내구성이 있는 이벤트 스트림으로서 EDA 구현의 기반을 제공합니다.
스트리밍 플랫폼에 대한 더 넓은 시각을 위해, 자체 호스팅 대안과의 비교를 위해 Apache Kafka Quickstart 가이드 를 참고하세요.
AWS Kinesis 개요
AWS 는 특정 사용 사례에 맞춰 설계된 여러 Kinesis 서비스를 제공합니다. 스트리밍 솔루션을 평가할 때, 다른 메시징 패턴과 비용 영향에 따라 RabbitMQ on EKS vs SQS 비교 도 고려해 볼 만합니다.
Kinesis Data Streams
데이터 레코드를 실시간으로 캡처, 저장 및 처리하는 핵심 스트리밍 서비스입니다. Data Streams 는 다음과 같은 경우에 적합합니다:
- 맞춤형 실시간 처리 애플리케이션
- 초 단위 지연 시간을 갖는 데이터 파이프라인 구축
- 초당 수백만 건의 이벤트 처리
- 이벤트 소싱 패턴 구현
Kinesis Data Firehose
S3, Redshift, Elasticsearch 또는 HTTP 엔드포인트와 같은 목적지로 스트리밍 데이터를 전달하는 완전히 관리되는 서비스입니다. 다음에 가장 적합합니다:
- 간단한 ETL 파이프라인
- 로그 집계 및 보관
- 준실시간 분석 (최소 60 초 지연 시간)
- 맞춤형 처리 로직이 필요 없는 시나리오
Kinesis Data Analytics
SQL 또는 Apache Flink 를 사용하여 스트리밍 데이터를 처리하고 분석합니다. 사용 사례는 다음과 같습니다:
- 실시간 대시보드
- 스트리밍 ETL
- 실시간 이상 탐지
- 지속적인 메트릭 생성
Kinesis 를 활용한 아키텍처 패턴
1. 이벤트 소싱 (Event Sourcing) 패턴
이벤트 소싱은 애플리케이션 상태의 모든 변경 사항을 이벤트 시퀀스로 저장합니다. Kinesis 는 이에 매우 적합합니다. Python 기초를 복습해야 한다면, Python 요령 을 확인하세요:
import boto3
import json
from datetime import datetime
kinesis = boto3.client('kinesis', region_name='us-east-1')
def publish_event(stream_name, event_type, payload):
"""Kinesis 스트림에 이벤트 발행"""
event = {
'eventId': generate_unique_id(),
'eventType': event_type,
'timestamp': datetime.utcnow().isoformat(),
'payload': payload
}
response = kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(event),
PartitionKey=payload.get('userId', 'default')
)
return response['SequenceNumber']
# 예제: 사용자 등록 이벤트
publish_event(
stream_name='user-events',
event_type='USER_REGISTERED',
payload={
'userId': '12345',
'email': 'user@example.com',
'registrationDate': '2025-10-30'
}
)
2. CQRS (명령 및 쿼리 책임 분리)
Kinesis 를 이벤트 버스로서 사용하여 읽기 및 쓰기 작업 분리:
package main
import (
"encoding/json"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
)
type OrderCommand struct {
CommandType string `json:"commandType"`
OrderID string `json:"orderId"`
UserID string `json:"userId"`
Items []OrderItem `json:"items"`
}
func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
data, err := json.Marshal(command)
if err != nil {
return err
}
_, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
StreamName: aws.String("order-commands"),
Data: data,
PartitionKey: aws.String(command.OrderID),
})
return err
}
3. Lambda 와 함께한 팬아웃 (Fan-Out) 패턴
단일 스트림의 이벤트를 여러 Lambda 함수로 처리합니다. 더 강력한 타입 안전성을 위해 TypeScript 구현이 필요하면 TypeScript 요령 을 참조하세요:
// 이메일 알림을 위한 Lambda 소비자
exports.handler = async (event) => {
for (const record of event.Records) {
const payload = JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
);
if (payload.eventType === 'ORDER_PLACED') {
await sendOrderConfirmationEmail(payload);
}
}
};
// 재고 업데이트를 위한 또 다른 Lambda
exports.inventoryHandler = async (event) => {
for (const record of event.Records) {
const payload = JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
);
if (payload.eventType === 'ORDER_PLACED') {
await updateInventory(payload.items);
}
}
};
프로덕션 환경 모범 사례
1. 적절한 샤드 (Shard) 수 선택
다음 요소에 따라 샤드 요구 사항을 계산하세요:
- 인그레스 (Ingress): 샤드당 1 MB/초 또는 1,000 레코드/초
- 에그레스 (Egress): 샤드당 2 MB/초 (표준 소비자) 또는 향상된 팬아웃 사용 시 소비자당 2 MB/초
def calculate_shards(records_per_second, avg_record_size_kb):
"""필요한 샤드 수 계산"""
# 인그레스 용량
ingress_shards = max(
records_per_second / 1000,
(records_per_second * avg_record_size_kb) / 1024
)
return int(ingress_shards) + 1 # 버퍼 추가
2. 적절한 에러 핸들링 구현
from botocore.exceptions import ClientError
import time
def put_record_with_retry(kinesis_client, stream_name, data, partition_key,
max_retries=3):
"""지수 백오프 리트라이를 통한 레코드 저장"""
for attempt in range(max_retries):
try:
response = kinesis_client.put_record(
StreamName=stream_name,
Data=data,
PartitionKey=partition_key
)
return response
except ClientError as e:
if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # 지수 백오프
continue
raise
3. 여러 소비자를 위한 향상된 팬아웃 (Enhanced Fan-Out) 사용
향상된 팬아웃은 각 소비자에게 전용 처리량을 제공합니다:
# 향상된 팬아웃으로 소비자 등록
aws kinesis register-stream-consumer \
--stream-arn arn:aws:kinesis:us-east-1:123456789:stream/my-stream \
--consumer-name my-consumer-app
4. 핵심 메트릭 모니터링
추적해야 할 필수 CloudWatch 메트릭:
IncomingRecords: 성공적으로 저장된 레코드 수IncomingBytes: 들어오는 데이터의 바이트 양GetRecords.IteratorAgeMilliseconds: 소비자가 뒤처진 정도WriteProvisionedThroughputExceeded: 스로틀링 이벤트ReadProvisionedThroughputExceeded: 소비자 스로틀링
5. 적절한 파티션 키 전략 구현
import hashlib
def get_partition_key(user_id, shard_count=10):
"""균등 분포를 위한 파티션 키 생성"""
# 균등 분포를 위한 일관된 해싱 사용
hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
shard_id = hash_value % shard_count
return f"{user_id}#{shard_id}"
실전 구현 예제
주문 처리 마이크로서비스 아키텍처의 완전한 예제입니다:
import boto3
import json
from decimal import Decimal
from typing import Dict, List
class OrderProcessingService:
def __init__(self, stream_name: str):
self.kinesis = boto3.client('kinesis')
self.stream_name = stream_name
def create_order(self, user_id: str, items: List[Dict]) -> str:
"""주문 생성 및 이벤트 발행"""
order_id = self.generate_order_id()
# 주문 생성 이벤트 발행
self.publish_event('ORDER_CREATED', {
'orderId': order_id,
'userId': user_id,
'items': items,
'status': 'PENDING',
'total': self.calculate_total(items)
}, partition_key=user_id)
return order_id
def publish_event(self, event_type: str, payload: Dict,
partition_key: str):
"""Kinesis 스트림으로 이벤트 발행"""
event = {
'eventType': event_type,
'timestamp': datetime.utcnow().isoformat(),
'payload': payload
}
self.kinesis.put_record(
StreamName=self.stream_name,
Data=json.dumps(event, default=str),
PartitionKey=partition_key
)
class InventoryService:
"""주문 이벤트를 소비하고 재고를 업데이트"""
def process_records(self, records):
for record in records:
data = json.loads(record['kinesis']['data'])
if data['eventType'] == 'ORDER_CREATED':
self.reserve_inventory(data['payload'])
elif data['eventType'] == 'ORDER_CANCELLED':
self.release_inventory(data['payload'])
def reserve_inventory(self, order_data):
# 재고 데이터베이스 업데이트
for item in order_data['items']:
# 구현 내용
pass
모놀리식에서 마이크로서비스로의 마이그레이션 전략
1 단계: strangler fig 패턴
모놀리식을 유지하면서 특정 이벤트를 Kinesis 를 통해 라우팅하는 것으로 시작합니다:
- 모놀리식 내의 바운디드 컨텍스트 (Bounded Context) 식별
- 컨텍스트 간 이벤트를 위한 Kinesis 스트림 생성
- 이러한 스트림에서 소비하는 서비스를 점진적으로 추출
- 모놀리식과의 역상호 호환성 유지
2 단계: 병렬 처리
구체적이고 새로운 시스템을 병렬로 실행합니다:
def dual_write_pattern(legacy_db, kinesis_stream, data):
"""레거시 시스템과 이벤트 스트림에 모두 작성"""
try:
# 먼저 새로운 시스템에 작성
publish_to_kinesis(kinesis_stream, data)
# 그 다음 레거시 시스템 업데이트
legacy_db.update(data)
except Exception as e:
# 보상 로직 구현
rollback_kinesis_event(kinesis_stream, data)
raise
3 단계: 완전한 마이그레이션
신뢰가 구축되면 모든 트래픽을 이벤트 기반 아키텍처로 라우팅합니다.
비용 최적화 전략
객체 저장소 및 데이터베이스 아키텍처를 포함한 데이터 인프라 패턴에 대한 포괄적인 가이드는 AI 시스템을 위한 데이터 인프라: 객체 저장소, 데이터베이스, 검색 및 AI 데이터 아키텍처 를 참조하세요.
1. 가변 워크로드를 위한 온디맨드 (On-Demand) 모드 사용
2023 년에 도입된 온디맨드 모드는 트래픽에 따라 자동으로 확장됩니다:
# 온디맨드 모드로 스트림 생성
aws kinesis create-stream \
--stream-name my-stream \
--stream-mode-details StreamMode=ON_DEMAND
2. 데이터 집계 구현
PUT 페이로드 단위를 줄이기 위해 레코드를 배치합니다:
from aws_kinesis_agg.aggregator import RecordAggregator
def batch_put_records(kinesis_client, stream_name, records):
"""비용 절감을 위해 레코드 집계"""
aggregator = RecordAggregator()
for record in records:
aggregator.add_user_record(
partition_key=record['partition_key'],
data=record['data']
)
# 집계된 레코드 전송
kinesis_client.put_record(
StreamName=stream_name,
Data=aggregator.serialize(),
PartitionKey=records[0]['partition_key']
)
3. 데이터 보존 기간 최적화
기본 보존 기간은 24 시간입니다. 필요한 경우에만 연장하세요:
# 보존 기간을 7 일로 설정
aws kinesis increase-stream-retention-period \
--stream-name my-stream \
--retention-period-hours 168
보안 모범 사례
1. 저장 중 및 전송 중 암호화
# 암호화된 스트림 생성
kinesis.create_stream(
StreamName='secure-stream',
ShardCount=1,
StreamModeDetails={'StreamMode': 'PROVISIONED'}
)
# 암호화 활성화
kinesis.start_stream_encryption(
StreamName='secure-stream',
EncryptionType='KMS',
KeyId='alias/aws/kinesis'
)
2. 최소 권한을 위한 IAM 정책
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kinesis:PutRecord",
"kinesis:PutRecords"
],
"Resource": "arn:aws:kinesis:us-east-1:123456789:stream/orders"
}
]
}
3. VPC 엔드포인트
트래픽을 AWS 네트워크 내부에 유지합니다. AWS 인프라를 코드로 관리하려면 Terraform 을 사용하는 것을 고려하세요 - Terraform 요령:
aws ec2 create-vpc-endpoint \
--vpc-id vpc-12345678 \
--service-name com.amazonaws.us-east-1.kinesis-streams \
--route-table-ids rtb-12345678
관측 가능성 및 디버깅
X-Ray 를 활용한 분산 추적
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all
patch_all()
@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
xray_recorder.put_annotation('eventType', event_data['type'])
xray_recorder.put_metadata('payload', event_data['payload'])
kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(event_data),
PartitionKey=event_data['id']
)
CloudWatch Logs Insights 쿼리
-- 느린 처리 시간 찾기
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20
-- 에러율 추적
fields @timestamp, eventType
| filter error = true
| stats count() by eventType
고급 패턴
분산 트랜잭션을 위한 사가 (Saga) 패턴
마이크로서비스 간에 장기 실행 트랜잭션 구현:
class OrderSaga:
def __init__(self, kinesis_client, stream_name):
self.kinesis = kinesis_client
self.stream_name = stream_name
self.saga_id = str(uuid.uuid4())
def execute(self, order_data):
"""보상 로직을 포함한 사가 실행"""
try:
# 단계 1: 재고 예약
self.publish_command('RESERVE_INVENTORY', order_data)
# 단계 2: 결제 처리
self.publish_command('PROCESS_PAYMENT', order_data)
# 단계 3: 주문 발송
self.publish_command('SHIP_ORDER', order_data)
except SagaException as e:
# 역순으로 보상
self.compensate(e.failed_step)
def compensate(self, failed_step):
"""보상 트랜잭션 실행"""
compensation_steps = {
'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
}
for step in compensation_steps.get(failed_step, []):
self.publish_command(step, {'sagaId': self.saga_id})
테스트 전략
LocalStack 를 활용한 로컬 개발
# Kinesis 와 함께 LocalStack 시작
docker run -d \
-p 4566:4566 \
-e SERVICES=kinesis \
localstack/localstack
# 테스트 스트림 생성
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
--stream-name test-stream \
--shard-count 1
통합 테스트
import pytest
from moto import mock_kinesis
@mock_kinesis
def test_event_publishing():
"""모킹된 Kinesis 로 이벤트 발행 테스트"""
kinesis = boto3.client('kinesis', region_name='us-east-1')
kinesis.create_stream(StreamName='test-stream', ShardCount=1)
service = OrderProcessingService('test-stream')
order_id = service.create_order('user123', [
{'productId': 'prod1', 'quantity': 2}
])
assert order_id is not None
성능 튜닝
배치 크기 최적화
def optimize_batch_processing(records, batch_size=500):
"""최적화된 배치로 레코드 처리"""
for i in range(0, len(records), batch_size):
batch = records[i:i + batch_size]
process_batch(batch)
커넥션 풀링 사용
from botocore.config import Config
config = Config(
max_pool_connections=50,
retries={'max_attempts': 3, 'mode': 'adaptive'}
)
kinesis = boto3.client('kinesis', config=config)
유용한 링크
AWS Kinesis 리소스:
- AWS Kinesis 문서
- AWS Kinesis Data Streams 개발자 가이드
- Kinesis Client Library (KCL)
- AWS Kinesis 가격 계산기
- Kinesis Data Streams 쿼터 및 제한
- AWS 아키텍처 블로그 - 이벤트 기반 아키텍처
- AWS Samples - Kinesis 예제
관련 기사:
결론
AWS Kinesis 는 확장 가능하고 이벤트 기반 마이크로서비스 아키텍처를 구축하기 위한 견고한 기반을 제공합니다. 이러한 패턴과 모범 사례를 따르시면 복원력이 높고 확장 가능하며 유지보수가 쉬운 시스템을 구축할 수 있습니다. 단일 이벤트 스트림으로 작게 시작하여 아키텍처를 검증한 후, 시스템이 성장함에 따라 더 복잡한 패턴으로 점진적으로 확장해 보세요.
성공의 열쇠는 데이터 흐름 요구 사항을 이해하고, 사용 사례에 맞는 올바른 Kinesis 서비스를 선택하며, 처음부터 적절한 모니터링과 에러 핸들링을 구현하는 것입니다.