디커플링
동기 통신 (Tight Coupling) - 문제점
[서비스 A] --HTTP--> [서비스 B]
↓
B가 다운되면 A도 영향받음
트래픽 급증 시 B가 과부하
비동기 통신 (Loose Coupling) - 해결책
[서비스 A] --메시지--> [Queue] ---> [서비스 B]
↓
A와 B가 독립적으로 작동
B가 다운되어도 메시지는 큐에 대기
Amazon SQS (Simple Queue Service)
완전 관리형 메시지 큐 서비스
Producer → [SQS Queue] → Consumer
(메시지 저장)
특징:
- 무제한 처리량
- 메시지 보관: 기본 4일, 최대 14일
- 메시지 크기: 최대 256KB
- 낮은 지연시간 (< 10ms)
표준 큐 (Standard Queue)
특징:
- 무제한 처리량
- 최소 1회 전달 (중복 가능)
- 순서 보장 안 됨
Producer가 전송: [1, 2, 3, 4, 5]
Consumer가 수신: [1, 3, 2, 5, 4, 2] (순서 뒤바뀜, 2 중복)
사용 사례:
- 순서가 중요하지 않은 작업
- 높은 처리량 필요
- 중복 처리 가능한 워크로드
FIFO 큐
특징:
- 정확히 1회 전달 (중복 없음)
- 순서 보장
- 처리량 제한: 초당 300 TPS (배치 시 3,000 TPS)
Producer가 전송: [1, 2, 3, 4, 5]
Consumer가 수신: [1, 2, 3, 4, 5] (순서 보장)
사용 사례:
- 금융 거래
- 주문 처리
- 순서가 중요한 작업
큐 이름:
.fifo로 끝나야 함메시지 생산 (Producer)
import boto3
sqs = boto3.client('sqs')
queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue'
# 메시지 전송
response = sqs.send_message(
QueueUrl=queue_url,
MessageBody='Hello SQS!',
MessageAttributes={
'Priority': {'StringValue': 'High', 'DataType': 'String'},
'Timestamp': {'StringValue': '1700000000', 'DataType': 'Number'}
}
)
print(f"Message ID: {response['MessageId']}")
# 배치 전송 (최대 10개)
sqs.send_message_batch(
QueueUrl=queue_url,
Entries=[
{'Id': '1', 'MessageBody': 'Message 1'},
{'Id': '2', 'MessageBody': 'Message 2'},
{'Id': '3', 'MessageBody': 'Message 3'}
]
)
메시지 소비 (Consumer)
# 메시지 수신 (롱 폴링)
response = sqs.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=10, # 최대 10개
WaitTimeSeconds=20, # 롱 폴링 (0-20초)
MessageAttributeNames=['All']
)
for message in response.get('Messages', []):
print(f"Body: {message['Body']}")
# 처리 후 삭제 (중요!)
sqs.delete_message(
QueueUrl=queue_url,
ReceiptHandle=message['ReceiptHandle']
)
→ 메시지는 처리 후 명시적으로 삭제해야 한다
Visibility Timeout (가시성 타임아웃)
1. Consumer가 메시지 수신
2. 메시지가 다른 Consumer에게 보이지 않음 (기본 30초)
3. 30초 내 처리 완료 → 삭제
4. 30초 초과 → 다시 큐에 나타남
# 타임아웃 연장 (처리 시간이 더 필요할 때)
sqs.change_message_visibility(
QueueUrl=queue_url,
ReceiptHandle=receipt_handle,
VisibilityTimeout=60 # 60초로 연장
)
Long Polling (롱 폴링)
Short Polling (기본):
- 즉시 반환 (메시지 없어도)
- API 호출 많음
- 비용 증가
Long Polling (권장):
- 메시지가 올 때까지 대기 (최대 20초)
- API 호출 감소
- 비용 절감
# 롱 폴링 활성화
response = sqs.receive_message(
QueueUrl=queue_url,
WaitTimeSeconds=20 # 20초 대기
)
Dead Letter Queue (DLQ)
실패한 메시지를 별도 큐로 이동
[Main Queue] → 처리 실패 (3회) → [DLQ]
↓
에러 분석 및 재처리
# DLQ 설정
sqs.set_queue_attributes(
QueueUrl=queue_url,
Attributes={
'RedrivePolicy': json.dumps({
'deadLetterTargetArn': 'arn:aws:sqs:region:account:DLQ',
'maxReceiveCount': '3' # 3회 실패 시 DLQ로
})
}
)
Amazon SNS (Simple Notification Service)
개념
Pub/Sub (발행-구독) 모델의 메시징 서비스
Publisher → [SNS Topic] → Subscribers (여러 개)
├─ SQS
├─ Lambda
├─ HTTP/S
├─ Email
└─ SMS
SQS와의 차이:
- SQS: 1:1 (Producer → Consumer)
- SNS: 1:N (Publisher → Multiple Subscribers)
Topic 생성 및 구독
import boto3
sns = boto3.client('sns')
# Topic 생성
response = sns.create_topic(Name='MyTopic')
topic_arn = response['TopicArn']
# 구독 추가
# 1. SQS
sns.subscribe(
TopicArn=topic_arn,
Protocol='sqs',
Endpoint='arn:aws:sqs:region:account:MyQueue'
)
# 2. Lambda
sns.subscribe(
TopicArn=topic_arn,
Protocol='lambda',
Endpoint='arn:aws:lambda:region:account:function:MyFunction'
)
# 3. Email
sns.subscribe(
TopicArn=topic_arn,
Protocol='email',
Endpoint='user@example.com'
)
# 4. HTTP/S
sns.subscribe(
TopicArn=topic_arn,
Protocol='https',
Endpoint='https://api.example.com/webhook'
)
메시지 발행
# 모든 구독자에게 전송
sns.publish(
TopicArn=topic_arn,
Subject='Order Update',
Message='Order #12345 has been shipped'
)
# 메시지 속성으로 필터링
sns.publish(
TopicArn=topic_arn,
Message='New order received',
MessageAttributes={
'order_type': {'DataType': 'String', 'StringValue': 'premium'},
'price': {'DataType': 'Number', 'StringValue': '100'}
}
)
구독 필터 정책
특정 조건의 메시지만 수신하도록 필터링
# "premium" 주문만 수신
filter_policy = {
'order_type': ['premium']
}
sns.set_subscription_attributes(
SubscriptionArn=subscription_arn,
AttributeName='FilterPolicy',
AttributeValue=json.dumps(filter_policy)
)
Publisher가 발행:
├─ order_type: "premium" → Subscriber A (필터 통과)
├─ order_type: "standard" → 차단
└─ order_type: "premium" → Subscriber A (수신)
SNS FIFO Topics
순서를 보장하는 SNS
# FIFO Topic 생성
response = sns.create_topic(
Name='MyTopic.fifo',
Attributes={
'FifoTopic': 'true',
'ContentBasedDeduplication': 'true'
}
)
# 메시지 발행 (Message Group ID 필요)
sns.publish(
TopicArn=topic_arn,
Message='Order processed',
MessageGroupId='order-group-1',
MessageDeduplicationId='unique-id-123'
)
SNS + SQS 팬아웃 패턴
하나의 메시지를 여러 SQS 큐로 전달
[Publisher]
↓
[SNS Topic]
├─→ [SQS Queue 1] → Email Service
├─→ [SQS Queue 2] → SMS Service
└─→ [SQS Queue 3] → Analytics Service
설정
# 1. SNS Topic 생성
topic = sns.create_topic(Name='OrderEvents')
topic_arn = topic['TopicArn']
# 2. SQS 큐들 생성
queues = []
for service in ['email', 'sms', 'analytics']:
queue = sqs.create_queue(QueueName=f'{service}-queue')
queues.append(queue['QueueUrl'])
# 3. 큐 정책 설정 (SNS가 메시지 전송 가능하도록)
policy = {
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Principal": {"Service": "sns.amazonaws.com"},
"Action": "sqs:SendMessage",
"Resource": queue_arn,
"Condition": {
"ArnEquals": {"aws:SourceArn": topic_arn}
}
}]
}
sqs.set_queue_attributes(
QueueUrl=queue_url,
Attributes={'Policy': json.dumps(policy)}
)
# 4. SNS 구독
for queue_arn in queue_arns:
sns.subscribe(
TopicArn=topic_arn,
Protocol='sqs',
Endpoint=queue_arn
)
장점:
- 각 서비스가 독립적으로 처리
- 한 서비스 실패가 다른 서비스에 영향 없음
- 새 서비스 추가 용이
Amazon Kinesis
실시간 스트리밍 데이터를 처리하는 서비스
Kinesis vs SQS
특성 | SQS | Kinesis |
용도 | 메시지 큐 | 실시간 스트리밍 |
보관 | 최대 14일 | 최대 365일 |
순서 | FIFO만 보장 | 샤드별 보장 |
소비 | 삭제됨 | 재생 가능 |
처리량 | 무제한 | 샤드당 제한 |
선택:
- 작업 큐, 비동기 처리 → SQS
- 실시간 분석, 로그, IoT → Kinesis
Kinesis Data Streams
개념
Producers → [Kinesis Stream] → Consumers
├─ Shard 1
├─ Shard 2
└─ Shard 3
Shard (샤드):
- 읽기: 2 MB/s
- 쓰기: 1 MB/s (초당 1,000 레코드)
생성 및 사용
import boto3
kinesis = boto3.client('kinesis')
# 스트림 생성
kinesis.create_stream(
StreamName='MyStream',
ShardCount=3 # 3개 샤드
)
# 데이터 전송
kinesis.put_record(
StreamName='MyStream',
Data=json.dumps({'order_id': 123, 'amount': 100}),
PartitionKey='order-123' # 같은 키는 같은 샤드로
)
# 배치 전송 (최대 500개)
kinesis.put_records(
StreamName='MyStream',
Records=[
{
'Data': json.dumps({'id': i}),
'PartitionKey': f'key-{i}'
}
for i in range(100)
]
)
데이터 읽기
# Shard Iterator 가져오기
response = kinesis.get_shard_iterator(
StreamName='MyStream',
ShardId='shardId-000000000000',
ShardIteratorType='LATEST' # 최신 데이터부터
)
shard_iterator = response['ShardIterator']
# 데이터 읽기
while True:
response = kinesis.get_records(
ShardIterator=shard_iterator,
Limit=100
)
for record in response['Records']:
data = json.loads(record['Data'])
print(data)
# 다음 iterator
shard_iterator = response['NextShardIterator']
if not response['Records']:
time.sleep(1)
Capacity Modes
Provisioned (프로비저닝):
- 샤드 개수 지정
- 샤드당 비용: ~$0.015/시간
- 예측 가능한 워크로드
On-Demand (온디맨드):
- 자동 확장
- 사용량 기반 과금
- 예측 불가능한 워크로드
Kinesis Data Firehose
스트리밍 데이터를 자동으로 목적지에 전달
Sources → [Firehose] → Destinations
├─ 변환 (Lambda)
└─ 배치
↓
├─ S3
├─ Redshift
├─ OpenSearch
└─ HTTP Endpoint
특징
- 완전 관리형 (서버리스)
- 자동 스케일링
- 거의 실시간 (최소 60초 지연)
- 데이터 변환 (Lambda)
설정
firehose = boto3.client('firehose')
# Delivery Stream 생성
firehose.create_delivery_stream(
DeliveryStreamName='MyStream',
DeliveryStreamType='DirectPut',
S3DestinationConfiguration={
'RoleARN': 'arn:aws:iam::account:role/FirehoseRole',
'BucketARN': 'arn:aws:s3:::my-bucket',
'Prefix': 'logs/year=!{timestamp:yyyy}/month=!{timestamp:MM}/',
'BufferingHints': {
'SizeInMBs': 5, # 5MB마다
'IntervalInSeconds': 300 # 또는 5분마다
},
'CompressionFormat': 'GZIP'
}
)
# 데이터 전송
firehose.put_record(
DeliveryStreamName='MyStream',
Record={'Data': json.dumps({'event': 'click', 'timestamp': time.time()})}
)
데이터 변환 (Lambda)
# Lambda 함수
def lambda_handler(event, context):
output = []
for record in event['records']:
# Base64 디코딩
payload = base64.b64decode(record['data'])
data = json.loads(payload)
# 변환 (예: 필드 추가)
data['processed_at'] = time.time()
# 인코딩
output_data = json.dumps(data) + '\n'
output_record = {
'recordId': record['recordId'],
'result': 'Ok',
'data': base64.b64encode(output_data.encode())
}
output.append(output_record)
return {'records': output}
SQS vs SNS vs Kinesis 비교
서비스 | 모델 | 보관 | 사용 사례 |
SQS | Queue (1:1) | 14일 | 작업 큐, 비동기 처리 |
SNS | Pub/Sub (1:N) | 없음 | 알림, 팬아웃 |
Kinesis | Stream (N:N) | 365일 | 실시간 분석, 로그 |
선택 가이드
작업 큐, 디커플링 → SQS 알림, 여러 대상 → SNS 실시간 스트리밍, 로그 수집 → Kinesis ETL 파이프라인 → Kinesis Firehose
실전 시나리오
시나리오 1: 주문 처리 시스템
[API Gateway]
↓
[Lambda: 주문 접수]
↓
[SNS: OrderTopic]
├─→ [SQS: PaymentQueue] → 결제 처리
├─→ [SQS: InventoryQueue] → 재고 차감
├─→ [SQS: EmailQueue] → 이메일 발송
└─→ [Lambda: Analytics] → 실시간 분석
장점:
- 각 서비스 독립적 처리
- 한 서비스 실패해도 다른 서비스 정상 작동
- 새 기능 추가 용이
시나리오 2: 실시간 로그 분석
[Application Servers]
↓
[Kinesis Data Stream]
├─→ [Lambda: 실시간 알람] → 에러 즉시 감지
└─→ [Kinesis Firehose]
↓
[S3] → [Athena] → 쿼리 분석
시나리오 3: IoT 데이터 수집
[IoT Devices] (수천 개)
↓
[Kinesis Data Stream]
├─→ [Lambda: 이상 탐지]
└─→ [Kinesis Firehose]
↓
[S3] → [Glue] → [Redshift]
모범 사례
SQS
- 롱 폴링 사용 (비용 절감)
- 배치 처리 (처리량 증가)
- DLQ 설정 (에러 처리)
- Visibility Timeout 적절히 설정
# 배치 처리 예시
messages = []
while len(messages) < 10:
response = sqs.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=10,
WaitTimeSeconds=20
)
messages.extend(response.get('Messages', []))
# 배치 삭제
sqs.delete_message_batch(
QueueUrl=queue_url,
Entries=[
{'Id': str(i), 'ReceiptHandle': msg['ReceiptHandle']}
for i, msg in enumerate(messages)
]
)
SNS
- 구독 필터 활용 (불필요한 메시지 차단)
- 재시도 정책 설정
- DLQ 설정
Kinesis
- 적절한 샤드 수 (처리량에 맞게)
- Partition Key 균등 분배
- Enhanced Fan-Out (다수 Consumer)
# Partition Key를 해시로 균등 분배
import hashlib
def get_partition_key(user_id):
return hashlib.md5(str(user_id).encode()).hexdigest()
kinesis.put_record(
StreamName='MyStream',
Data=data,
PartitionKey=get_partition_key(user_id)
)
비용 최적화
SQS
요청 비용: $0.40 / 백만 요청
최적화:
- 롱 폴링 사용 (API 호출 감소)
- 배치 처리 (10개씩)
- 불필요한 폴링 제거
절감: 50-70%
Kinesis
Data Streams:
- Provisioned: 샤드당 $0.015/시간
- On-Demand: $0.015/GB + $0.04/백만 요청
최적화:
- 적절한 샤드 수 (과다 프로비저닝 방지)
- 데이터 압축
- Firehose 사용 (관리 불필요)
AWS 메시징 서비스는 확장 가능하고 탄력적인 아키텍처의 핵심으로 아래 서비스들의 적절한 조합으로 확장 가능하고 장애에 강한 시스템 구축이 가능
- SQS: 작업 큐, 비동기 처리 (표준 / FIFO)
- SNS: Pub/Sub, 팬아웃 패턴
- Kinesis: 실시간 스트리밍 데이터 처리
선택 가이드:
- 작업 디커플링 → SQS
- 알림, 여러 구독자 → SNS
- 실시간 분석, 로그 → Kinesis
- 간단한 ETL → Kinesis Firehose
Share article