[AWS] AWS 디커플링 애플리케이션: SQS, SNS, Kinesis, Active MQ

silver's avatar
Nov 22, 2025
[AWS] AWS 디커플링 애플리케이션: SQS, SNS, Kinesis, Active MQ

디커플링

동기 통신 (Tight Coupling) - 문제점

[서비스 A] --HTTP--> [서비스 B]B가 다운되면 A도 영향받음 트래픽 급증 시 B가 과부하

비동기 통신 (Loose Coupling) - 해결책

[서비스 A] --메시지--> [Queue] ---> [서비스 B]AB가 독립적으로 작동 B가 다운되어도 메시지는 큐에 대기

Amazon SQS (Simple Queue Service)

완전 관리형 메시지 큐 서비스
Producer → [SQS Queue] → Consumer (메시지 저장)
특징:
  • 무제한 처리량
  • 메시지 보관: 기본 4일, 최대 14일
  • 메시지 크기: 최대 256KB
  • 낮은 지연시간 (< 10ms)

표준 큐 (Standard Queue)

특징:
  • 무제한 처리량
  • 최소 1회 전달 (중복 가능)
  • 순서 보장 안 됨
Producer가 전송: [1, 2, 3, 4, 5] Consumer가 수신: [1, 3, 2, 5, 4, 2] (순서 뒤바뀜, 2 중복)
사용 사례:
  • 순서가 중요하지 않은 작업
  • 높은 처리량 필요
  • 중복 처리 가능한 워크로드

FIFO 큐

특징:
  • 정확히 1회 전달 (중복 없음)
  • 순서 보장
  • 처리량 제한: 초당 300 TPS (배치 시 3,000 TPS)
Producer가 전송: [1, 2, 3, 4, 5] Consumer가 수신: [1, 2, 3, 4, 5] (순서 보장)
사용 사례:
  • 금융 거래
  • 주문 처리
  • 순서가 중요한 작업
큐 이름: .fifo로 끝나야 함

메시지 생산 (Producer)

import boto3 sqs = boto3.client('sqs') queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue' # 메시지 전송 response = sqs.send_message( QueueUrl=queue_url, MessageBody='Hello SQS!', MessageAttributes={ 'Priority': {'StringValue': 'High', 'DataType': 'String'}, 'Timestamp': {'StringValue': '1700000000', 'DataType': 'Number'} } ) print(f"Message ID: {response['MessageId']}") # 배치 전송 (최대 10개) sqs.send_message_batch( QueueUrl=queue_url, Entries=[ {'Id': '1', 'MessageBody': 'Message 1'}, {'Id': '2', 'MessageBody': 'Message 2'}, {'Id': '3', 'MessageBody': 'Message 3'} ] )

메시지 소비 (Consumer)

# 메시지 수신 (롱 폴링) response = sqs.receive_message( QueueUrl=queue_url, MaxNumberOfMessages=10, # 최대 10개 WaitTimeSeconds=20, # 롱 폴링 (0-20초) MessageAttributeNames=['All'] ) for message in response.get('Messages', []): print(f"Body: {message['Body']}") # 처리 후 삭제 (중요!) sqs.delete_message( QueueUrl=queue_url, ReceiptHandle=message['ReceiptHandle'] )
→ 메시지는 처리 후 명시적으로 삭제해야 한다

Visibility Timeout (가시성 타임아웃)

1. Consumer가 메시지 수신 2. 메시지가 다른 Consumer에게 보이지 않음 (기본 30초) 3. 30초 내 처리 완료 → 삭제 4. 30초 초과 → 다시 큐에 나타남
# 타임아웃 연장 (처리 시간이 더 필요할 때) sqs.change_message_visibility( QueueUrl=queue_url, ReceiptHandle=receipt_handle, VisibilityTimeout=60 # 60초로 연장 )

Long Polling (롱 폴링)

Short Polling (기본): - 즉시 반환 (메시지 없어도) - API 호출 많음 - 비용 증가 Long Polling (권장): - 메시지가 올 때까지 대기 (최대 20초) - API 호출 감소 - 비용 절감
# 롱 폴링 활성화 response = sqs.receive_message( QueueUrl=queue_url, WaitTimeSeconds=20 # 20초 대기 )

Dead Letter Queue (DLQ)

실패한 메시지를 별도 큐로 이동
[Main Queue] → 처리 실패 (3회) → [DLQ] ↓ 에러 분석 및 재처리
# DLQ 설정 sqs.set_queue_attributes( QueueUrl=queue_url, Attributes={ 'RedrivePolicy': json.dumps({ 'deadLetterTargetArn': 'arn:aws:sqs:region:account:DLQ', 'maxReceiveCount': '3' # 3회 실패 시 DLQ로 }) } )

Amazon SNS (Simple Notification Service)

개념

Pub/Sub (발행-구독) 모델의 메시징 서비스
Publisher → [SNS Topic] → Subscribers (여러 개) ├─ SQS ├─ Lambda ├─ HTTP/S ├─ Email └─ SMS
SQS와의 차이:
  • SQS: 1:1 (Producer → Consumer)
  • SNS: 1:N (Publisher → Multiple Subscribers)

Topic 생성 및 구독

import boto3 sns = boto3.client('sns') # Topic 생성 response = sns.create_topic(Name='MyTopic') topic_arn = response['TopicArn'] # 구독 추가 # 1. SQS sns.subscribe( TopicArn=topic_arn, Protocol='sqs', Endpoint='arn:aws:sqs:region:account:MyQueue' ) # 2. Lambda sns.subscribe( TopicArn=topic_arn, Protocol='lambda', Endpoint='arn:aws:lambda:region:account:function:MyFunction' ) # 3. Email sns.subscribe( TopicArn=topic_arn, Protocol='email', Endpoint='user@example.com' ) # 4. HTTP/S sns.subscribe( TopicArn=topic_arn, Protocol='https', Endpoint='https://api.example.com/webhook' )

메시지 발행

# 모든 구독자에게 전송 sns.publish( TopicArn=topic_arn, Subject='Order Update', Message='Order #12345 has been shipped' ) # 메시지 속성으로 필터링 sns.publish( TopicArn=topic_arn, Message='New order received', MessageAttributes={ 'order_type': {'DataType': 'String', 'StringValue': 'premium'}, 'price': {'DataType': 'Number', 'StringValue': '100'} } )

구독 필터 정책

특정 조건의 메시지만 수신하도록 필터링
# "premium" 주문만 수신 filter_policy = { 'order_type': ['premium'] } sns.set_subscription_attributes( SubscriptionArn=subscription_arn, AttributeName='FilterPolicy', AttributeValue=json.dumps(filter_policy) )
Publisher가 발행: ├─ order_type: "premium" → Subscriber A (필터 통과) ├─ order_type: "standard" → 차단 └─ order_type: "premium" → Subscriber A (수신)

SNS FIFO Topics

순서를 보장하는 SNS
# FIFO Topic 생성 response = sns.create_topic( Name='MyTopic.fifo', Attributes={ 'FifoTopic': 'true', 'ContentBasedDeduplication': 'true' } ) # 메시지 발행 (Message Group ID 필요) sns.publish( TopicArn=topic_arn, Message='Order processed', MessageGroupId='order-group-1', MessageDeduplicationId='unique-id-123' )

SNS + SQS 팬아웃 패턴

하나의 메시지를 여러 SQS 큐로 전달
[Publisher] ↓ [SNS Topic] ├─→ [SQS Queue 1] → Email Service ├─→ [SQS Queue 2] → SMS Service └─→ [SQS Queue 3] → Analytics Service

설정

# 1. SNS Topic 생성 topic = sns.create_topic(Name='OrderEvents') topic_arn = topic['TopicArn'] # 2. SQS 큐들 생성 queues = [] for service in ['email', 'sms', 'analytics']: queue = sqs.create_queue(QueueName=f'{service}-queue') queues.append(queue['QueueUrl']) # 3. 큐 정책 설정 (SNS가 메시지 전송 가능하도록) policy = { "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Principal": {"Service": "sns.amazonaws.com"}, "Action": "sqs:SendMessage", "Resource": queue_arn, "Condition": { "ArnEquals": {"aws:SourceArn": topic_arn} } }] } sqs.set_queue_attributes( QueueUrl=queue_url, Attributes={'Policy': json.dumps(policy)} ) # 4. SNS 구독 for queue_arn in queue_arns: sns.subscribe( TopicArn=topic_arn, Protocol='sqs', Endpoint=queue_arn )
장점:
  • 각 서비스가 독립적으로 처리
  • 한 서비스 실패가 다른 서비스에 영향 없음
  • 새 서비스 추가 용이

Amazon Kinesis

실시간 스트리밍 데이터를 처리하는 서비스

Kinesis vs SQS

특성
SQS
Kinesis
용도
메시지 큐
실시간 스트리밍
보관
최대 14일
최대 365일
순서
FIFO만 보장
샤드별 보장
소비
삭제됨
재생 가능
처리량
무제한
샤드당 제한
선택:
  • 작업 큐, 비동기 처리 → SQS
  • 실시간 분석, 로그, IoT → Kinesis

Kinesis Data Streams

개념

Producers → [Kinesis Stream] → Consumers ├─ Shard 1 ├─ Shard 2 └─ Shard 3
Shard (샤드):
  • 읽기: 2 MB/s
  • 쓰기: 1 MB/s (초당 1,000 레코드)

생성 및 사용

import boto3 kinesis = boto3.client('kinesis') # 스트림 생성 kinesis.create_stream( StreamName='MyStream', ShardCount=3 # 3개 샤드 ) # 데이터 전송 kinesis.put_record( StreamName='MyStream', Data=json.dumps({'order_id': 123, 'amount': 100}), PartitionKey='order-123' # 같은 키는 같은 샤드로 ) # 배치 전송 (최대 500개) kinesis.put_records( StreamName='MyStream', Records=[ { 'Data': json.dumps({'id': i}), 'PartitionKey': f'key-{i}' } for i in range(100) ] )

데이터 읽기

# Shard Iterator 가져오기 response = kinesis.get_shard_iterator( StreamName='MyStream', ShardId='shardId-000000000000', ShardIteratorType='LATEST' # 최신 데이터부터 ) shard_iterator = response['ShardIterator'] # 데이터 읽기 while True: response = kinesis.get_records( ShardIterator=shard_iterator, Limit=100 ) for record in response['Records']: data = json.loads(record['Data']) print(data) # 다음 iterator shard_iterator = response['NextShardIterator'] if not response['Records']: time.sleep(1)

Capacity Modes

Provisioned (프로비저닝):
- 샤드 개수 지정 - 샤드당 비용: ~$0.015/시간 - 예측 가능한 워크로드
On-Demand (온디맨드):
- 자동 확장 - 사용량 기반 과금 - 예측 불가능한 워크로드

Kinesis Data Firehose

스트리밍 데이터를 자동으로 목적지에 전달
Sources → [Firehose] → Destinations ├─ 변환 (Lambda) └─ 배치 ↓ ├─ S3 ├─ Redshift ├─ OpenSearch └─ HTTP Endpoint

특징

  • 완전 관리형 (서버리스)
  • 자동 스케일링
  • 거의 실시간 (최소 60초 지연)
  • 데이터 변환 (Lambda)

설정

firehose = boto3.client('firehose') # Delivery Stream 생성 firehose.create_delivery_stream( DeliveryStreamName='MyStream', DeliveryStreamType='DirectPut', S3DestinationConfiguration={ 'RoleARN': 'arn:aws:iam::account:role/FirehoseRole', 'BucketARN': 'arn:aws:s3:::my-bucket', 'Prefix': 'logs/year=!{timestamp:yyyy}/month=!{timestamp:MM}/', 'BufferingHints': { 'SizeInMBs': 5, # 5MB마다 'IntervalInSeconds': 300 # 또는 5분마다 }, 'CompressionFormat': 'GZIP' } ) # 데이터 전송 firehose.put_record( DeliveryStreamName='MyStream', Record={'Data': json.dumps({'event': 'click', 'timestamp': time.time()})} )

데이터 변환 (Lambda)

# Lambda 함수 def lambda_handler(event, context): output = [] for record in event['records']: # Base64 디코딩 payload = base64.b64decode(record['data']) data = json.loads(payload) # 변환 (예: 필드 추가) data['processed_at'] = time.time() # 인코딩 output_data = json.dumps(data) + '\n' output_record = { 'recordId': record['recordId'], 'result': 'Ok', 'data': base64.b64encode(output_data.encode()) } output.append(output_record) return {'records': output}

SQS vs SNS vs Kinesis 비교

서비스
모델
보관
사용 사례
SQS
Queue (1:1)
14일
작업 큐, 비동기 처리
SNS
Pub/Sub (1:N)
없음
알림, 팬아웃
Kinesis
Stream (N:N)
365일
실시간 분석, 로그

선택 가이드

작업 큐, 디커플링 → SQS 알림, 여러 대상 → SNS 실시간 스트리밍, 로그 수집 → Kinesis ETL 파이프라인 → Kinesis Firehose

실전 시나리오

시나리오 1: 주문 처리 시스템

[API Gateway][Lambda: 주문 접수][SNS: OrderTopic] ├─→ [SQS: PaymentQueue] → 결제 처리 ├─→ [SQS: InventoryQueue] → 재고 차감 ├─→ [SQS: EmailQueue] → 이메일 발송 └─→ [Lambda: Analytics] → 실시간 분석
장점:
  • 각 서비스 독립적 처리
  • 한 서비스 실패해도 다른 서비스 정상 작동
  • 새 기능 추가 용이

시나리오 2: 실시간 로그 분석

[Application Servers][Kinesis Data Stream] ├─→ [Lambda: 실시간 알람] → 에러 즉시 감지 └─→ [Kinesis Firehose][S3][Athena] → 쿼리 분석

시나리오 3: IoT 데이터 수집

[IoT Devices] (수천 개) ↓ [Kinesis Data Stream] ├─→ [Lambda: 이상 탐지] └─→ [Kinesis Firehose][S3][Glue][Redshift]

모범 사례

SQS

  1. 롱 폴링 사용 (비용 절감)
  1. 배치 처리 (처리량 증가)
  1. DLQ 설정 (에러 처리)
  1. Visibility Timeout 적절히 설정
# 배치 처리 예시 messages = [] while len(messages) < 10: response = sqs.receive_message( QueueUrl=queue_url, MaxNumberOfMessages=10, WaitTimeSeconds=20 ) messages.extend(response.get('Messages', [])) # 배치 삭제 sqs.delete_message_batch( QueueUrl=queue_url, Entries=[ {'Id': str(i), 'ReceiptHandle': msg['ReceiptHandle']} for i, msg in enumerate(messages) ] )

SNS

  1. 구독 필터 활용 (불필요한 메시지 차단)
  1. 재시도 정책 설정
  1. DLQ 설정

Kinesis

  1. 적절한 샤드 수 (처리량에 맞게)
  1. Partition Key 균등 분배
  1. Enhanced Fan-Out (다수 Consumer)
# Partition Key를 해시로 균등 분배 import hashlib def get_partition_key(user_id): return hashlib.md5(str(user_id).encode()).hexdigest() kinesis.put_record( StreamName='MyStream', Data=data, PartitionKey=get_partition_key(user_id) )

비용 최적화

SQS

요청 비용: $0.40 / 백만 요청 최적화: - 롱 폴링 사용 (API 호출 감소) - 배치 처리 (10개씩) - 불필요한 폴링 제거 절감: 50-70%

Kinesis

Data Streams: - Provisioned: 샤드당 $0.015/시간 - On-Demand: $0.015/GB + $0.04/백만 요청 최적화: - 적절한 샤드 수 (과다 프로비저닝 방지) - 데이터 압축 - Firehose 사용 (관리 불필요)

💡
AWS 메시징 서비스는 확장 가능하고 탄력적인 아키텍처의 핵심으로 아래 서비스들의 적절한 조합으로 확장 가능하고 장애에 강한 시스템 구축이 가능
  • SQS: 작업 큐, 비동기 처리 (표준 / FIFO)
  • SNS: Pub/Sub, 팬아웃 패턴
  • Kinesis: 실시간 스트리밍 데이터 처리
선택 가이드:
  • 작업 디커플링 → SQS
  • 알림, 여러 구독자 → SNS
  • 실시간 분석, 로그 → Kinesis
  • 간단한 ETL → Kinesis Firehose
 
Share article

silver