[천재교육] 프로젝트 기반 빅데이터 서비스 개발자 양성 과정 9기
학습일 : 2024.11.19
📕 학습 목록
- Step Functions 설계
- Lambda 함수 생성
📗 활동 내용
1. Step Functions 설계
1) AWS Step Functions 란?
- 애플리케이션 워크플로를 시각적으로 디자인하고 실행할 수 있도록 돕는 완전 관리형 서비스
- 여러 AWS 서비스와 자체 애플리케이션을 연결하여 복잡한 작업을 순서대로 실행하거나 조건에 따라 분기를 설정할 수 있음
- 단순한 직렬 워크플로 뿐만 아니라 병렬 처리, 중첩 워크플로, 조건부 흐름, 대량 데이터 병렬 처리(Map) 같은 복잡한 작업 관리에도 최적화되어 있음
- 이러한 기능을 활용하면 효율적이고 확장 가능한 애플리케이션 워크플로를 설계할 수 있음
2) 특징
- 상태 기반 워크플로
- 단계(State)로 구성된 워크플로를 정의
- 각 상태에서 전환(Transition) 및 결과를 체계적으로 관리
- 시각적 설계
- AWS Management Console에서 드래그 앤 드롭으로 설계 가능
- JSON 형식으로 상태 머신 정의 가능
- 서버리스 서비스
- 인프라 설정이나 관리 필요 없이 실행 가능
- 내결함성
- 오류 발생 시 재시도(Retry) 및 오류 처리(Catch) 로직 지원
- AWS 서비스 통합
- Lambda, S3, DynamoDB, SNS 등 다양한 AWS 서비스와 손쉽게 연동 가능
- 실시간 상태 추적
- 워크플로 실행 상태를 실시간으로 모니터링하고 디버깅 가능
3) 고급 기능
- 병렬 처리
- 병렬 상태(Parallel State)를 사용하여 여러 작업을 동시에 실행
- 병렬로 실행된 작업의 결과를 합산하거나 집계 가능
- 중첩 워크플로
- 워크플로를 계층화하여 상위 상태 머신이 하위 워크플로를 호출 가능
- 복잡한 로직을 단순화하고 재사용 가능
- 조건부 흐름 제어
- 조건(State Choice)을 사용하여 특정 조건에 따라 워크플로 경로를 분기
- 동적 로직 및 맞춤형 프로세스 설계 가능
- 작업 재시도 및 오류 처리
- 네트워크 문제나 임시적 오류를 처리하기 위한 재시도 로직 설정 가능
- 오류 발생 시 대체 경로를 설정하여 워크플로 지속 가능
- 장기 실행 작업 관리
- 최대 1년까지 실행 상태를 유지하며 비동기 작업도 관리 가능
- Map 상태
- 대규모 데이터 처리에 최적화된 반복 실행 로직 제공
- 데이터를 여러 조각으로 나눠 병렬 작업 처리
4) 활용사례
- 데이터 처리 파이프라인
- 데이터 수집, 변환, 분석, 저장 단계를 자동화
- 마이크로서비스 오케스트레이션
- 여러 Lambda 함수 및 API Gateway를 연결한 복잡한 비즈니스 로직 실행
- ETL(Extract, Transform, Load) 작업
- 데이터 추출, 변환, 저장 작업을 순차 또는 병렬로 처리
- 머신러닝 워크플로
- 데이터 준비, 모델 학습, 평가, 배포 단계 관리
- 장기 실행 작업
- 상태 저장이 필요한 비동기 작업 관리 (ex: 이메일 알림, 이벤트 재시도)
- 병렬 처리
- 여러 파일 업로드나 분석 작업을 동시에 실행하여 성능 최적화
4) 병렬 워크플로우 설계
2. Lambda 함수 설계
1) 사용자 영어 발음 채점 로직
- 사용자 음성 업로드: Django API → S3 버킷에 업로드 → PostgreSQL에 경로 저장
- Lambda 트리거: S3 이벤트 → Lambda 함수 실행
- 타임스탬프 추출: Google Speech-to-Text API 사용
- 음성 비교 및 채점: Lambda에서 음성 비교 후 채점 결과 생성
- 채점 결과 저장: Django API를 통해 PostgreSQL에 저장
[tip] 전체 흐름에서의 역할
• Django API
- 사용자 음성을 업로드하고 PostgreSQL에 경로와 관련 데이터를 저장
- 표준 음성과 사용자 음성을 매핑하기 위한 메타데이터 관리 (content_type, level, title, sentence 등)
• S3 이벤트와 Lambda
- Lambda가 트리거되면서 표준 음성과 사용자 음성을 매핑
- S3에 저장된 경로 정보(user_audio_key와 standard_audio_key)를 사용하여 두 파일을 가져오고, 비교 및 채점 수행
• Lambda → Django API
- Lambda가 채점 결과를 계산한 후 Django API를 호출하여 PostgreSQL에 결과 저장
2) 트리거 설계
2-1) Lambda 함수와 트리거의 차이
- Lambda 함수: 특정 작업을 수행하는 코드를 정의. 사용자가 정의한 함수로, 입력 데이터를 받아 처리하고 결과를 반환
- 트리거: Lambda 함수를 실행시키는 조건이나 이벤트를 설정. 예를 들어, S3에 파일이 업로드되거나 특정 시간에 실행되도록 설정할 수 있음
2-2) 트리거 설정 방법
① AWS Management Console에서 트리거 설정
- AWS Lambda 콘솔로 이동
- 해당 Lambda 함수를 선택(Step Functions 첫 번째 함수 'Timestamp Generation')
- Configuration 탭에서 Triggers 섹션으로 이동
- Add Trigger 버튼을 클릭
- Trigger configuration에서 S3를 선택하고
- Bucket: user-audio-file
- Event type: All object create events
- Prefix: 필요한 경우 특정 경로를 설정
- Suffix: 필요한 경우 파일 형식을 설정 (예: .wav)
- Add를 클릭하여 저장
② EventBridge를 사용하여 트리거 설정
- S3 버킷 이벤트 알림 설정
- AWS Management Console에서 S3로 이동
- 사용자 음성이 업로드되는 S3 버킷(user-audio-file)을 선택
- Properties 탭에서 아래로 스크롤하여 Event notifications 섹션으로 이동
- Create event notification 버튼을 클릭
- Event name: StepFunctionsTrigger
- Event types: All object create events를 선택
- Destination: Amazon EventBridge를 선택
- Save changes를 클릭하여 설정을 저장
- EventBridge에서 Step Functions 연결
- AWS Management Console에서 EventBridge로 이동
- Rules 섹션으로 이동 후 Create rule 클릭
- Name: TriggerStepFunctionsOnS3Upload
- Define pattern에서 Event source를 선택
- Event source: AWS services
- AWS service: S3
- Event type: Object Created
- Specific bucket name에 user-audio-file을 입력
- Prefix: path/to/ (경로를 지정해 특정 파일 업로드만 트리거할 경우 설정)
- Next step 클릭
- Target을 Step Functions state machine으로 선택
- State machine: 생성한 상태 머신 선택 (MyStateMachine)
- Input transformer를 사용하여 이벤트를 상태 머신의 입력 형식으로 변환
{
"bucket_name": "$.detail.bucket.name",
"user_audio_key": "$.detail.object.key"
}
11. Next step → Create rule을 클릭
3) Lambda 함수
- Lambda에서의 로컬 환경 개념
- AWS Lambda 함수는 실행될 때 AWS에서 제공하는 임시 실행 환경에서 실행됨
- 이 환경에는 파일을 임시로 저장할 수 있는 디렉토리(/tmp)가 제공됨
- /tmp는 512MB의 저장 공간만 사용할 수 있으며, Lambda 함수 실행이 종료되면 해당 디렉토리의 모든 데이터는 삭제됨
- "S3에서 파일 URL 가져와서 바로 Lambda 함수를 실행할 수 있는게 X, 임시 디렉토리에 저장해서 함수를 실행해야함"
① Timestamp 생성 Lambda 함수
import boto3
import os
import re
from google.cloud import speech
from google.cloud.speech_v1 import RecognitionAudio, RecognitionConfig
# S3 클라이언트 생성
s3 = boto3.client('s3')
def split_into_syllables(word):
"""
단어를 음절로 분리.
"""
return re.findall(r'[aeiouy]+[^aeiouy]*', word, re.IGNORECASE)
def generate_syllable_timestamps(word, start_time, end_time):
"""
단어 타임스탬프를 음절 타임스탬프로 변환.
"""
syllables = split_into_syllables(word)
duration_per_syllable = (end_time - start_time) / len(syllables)
syllable_timestamps = []
current_time = start_time
for syllable in syllables:
syllable_timestamps.append({
"syllable": syllable,
"start_time": current_time,
"end_time": current_time + duration_per_syllable
})
current_time += duration_per_syllable
return syllable_timestamps
def transcribe_audio(file_path):
"""
Google Speech-to-Text API를 사용해 단어별 타임스탬프 추출.
"""
client = speech.SpeechClient()
with open(file_path, "rb") as audio_file:
audio_data = audio_file.read()
audio = RecognitionAudio(content=audio_data)
config = RecognitionConfig(
encoding=RecognitionConfig.AudioEncoding.LINEAR16,
sample_rate_hertz=16000,
language_code="en-US",
enable_word_time_offsets=True
)
response = client.recognize(config=config, audio=audio)
word_timestamps = []
for result in response.results:
for word_info in result.alternatives[0].words:
word_timestamps.append({
"word": word_info.word,
"start_time": word_info.start_time.total_seconds(),
"end_time": word_info.end_time.total_seconds()
})
return word_timestamps
def parse_user_audio_key(user_audio_key):
"""
사용자 음성 경로를 분석하여 표준 음성 경로를 생성하는 데 필요한 정보를 추출.
"""
parts = user_audio_key.split('/')
content_type = parts[2] # conversation, novel, phonics
level = parts[3].split('_')[1] # level_1 → 1
title = parts[4].replace('_', ' ') # At_the_Toy_Store → At the Toy Store
line_number = parts[5].split('_')[1].split('.')[0] # audio_line_1.wav → 1
return content_type, level, title, line_number
def lambda_handler(event, context):
"""
AWS Lambda 핸들러 함수.
"""
# 입력 데이터
bucket_name = event['bucket_name']
user_audio_key = event['user_audio_key']
# 사용자 경로에서 표준 음성 경로 정보 추출
content_type, level, title, line_number = parse_user_audio_key(user_audio_key)
# 표준 음성 경로 생성
standard_audio_key = f"standard-audio-file/{content_type}/level_{level}/{title}/audio_line_{line_number}.wav"
# 로컬 파일 경로
user_audio_path = "/tmp/user_audio.wav"
standard_audio_path = "/tmp/standard_audio.wav"
google_credentials_path = "/tmp/google_credentials.json"
# S3에서 파일 다운로드
secret_key_bucket = 'the-secret-key'
google_credentials_key = "credentials/epaproject_blar_blar.json"
s3.download_file(bucket_name, user_audio_key, user_audio_path)
s3.download_file('standard-audio-file', standard_audio_key, standard_audio_path)
s3.download_file(secret_key_bucket, google_credentials_key, google_credentials_path)
# Google API 키 설정
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = google_credentials_path
# 사용자와 표준 음성의 단어 타임스탬프 추출
user_word_timestamps = transcribe_audio(user_audio_path)
standard_word_timestamps = transcribe_audio(standard_audio_path)
# 음절 타임스탬프 생성
user_syllable_timestamps = []
standard_syllable_timestamps = []
for word_data in user_word_timestamps:
user_syllable_timestamps.extend(
generate_syllable_timestamps(
word_data['word'],
word_data['start_time'],
word_data['end_time']
)
)
for word_data in standard_word_timestamps:
standard_syllable_timestamps.extend(
generate_syllable_timestamps(
word_data['word'],
word_data['start_time'],
word_data['end_time']
)
)
# 결과 반환
return {
"user_word_timestamps": user_word_timestamps,
"user_syllable_timestamps": user_syllable_timestamps,
"standard_word_timestamps": standard_word_timestamps,
"standard_syllable_timestamps": standard_syllable_timestamps
}
② Pitch∙Rhythm 패턴 유사도 Lambda 함수
import boto3
import librosa
import numpy as np
import parselmouth
def preprocess_audio(input_path, target_sr=16000):
"""오디오 파일 읽기 및 리샘플링"""
y, sr = librosa.load(input_path, sr=None)
if sr != target_sr:
y = librosa.resample(y, orig_sr=sr, target_sr=target_sr)
return y, target_sr
def extract_sentence_pitch(y, sr):
"""음성 파일에서 피치 추출"""
snd = parselmouth.Sound(y, sr)
pitch = snd.to_pitch()
frame_frequencies = pitch.selected_array["frequency"]
valid_frequencies = frame_frequencies[frame_frequencies > 0]
return valid_frequencies if len(valid_frequencies) > 0 else np.array([0])
def calculate_cosine_similarity(vec1, vec2):
"""NumPy를 사용한 코사인 유사도 계산"""
if len(vec1) == 0 or len(vec2) == 0:
return 0.0
vec1 = np.asarray(vec1)
vec2 = np.asarray(vec2)
dot_product = np.dot(vec1, vec2)
norm_vec1 = np.linalg.norm(vec1)
norm_vec2 = np.linalg.norm(vec2)
if norm_vec1 == 0 or norm_vec2 == 0:
return 0.0
return dot_product / (norm_vec1 * norm_vec2)
def calculate_pitch_similarity(user_pitch, ref_pitch):
"""피치 패턴 유사도 계산"""
return calculate_cosine_similarity(user_pitch, ref_pitch)
def lambda_handler(event, context):
# 입력 데이터
bucket_name = event['bucket_name']
user_audio_key = event['user_audio_key']
standard_audio_key = event['standard_audio_key']
# 로컬 경로
user_audio_path = "/tmp/user_audio.wav"
standard_audio_path = "/tmp/standard_audio.wav"
# S3에서 사용자 및 표준 음성 다운로드
s3 = boto3.client('s3')
s3.download_file(bucket_name, user_audio_key, user_audio_path)
s3.download_file(bucket_name, standard_audio_key, standard_audio_path)
# 사용자와 표준 음성의 피치 추출
y_user, sr_user = preprocess_audio(user_audio_path)
y_ref, sr_ref = preprocess_audio(standard_audio_path)
user_pitch = extract_sentence_pitch(y_user, sr_user)
ref_pitch = extract_sentence_pitch(y_ref, sr_ref)
# 피치 패턴 유사도 계산
pitch_similarity = calculate_pitch_similarity(user_pitch, ref_pitch)
# 결과 반환
return {
"pitch_similarity": pitch_similarity
}
③ 발화 속도 비율, 발화 중단 패턴 유사도, 잘못 인식된 단어 비율/리스트 Lambda 함수
import boto3
import numpy as np
def calculate_speed_ratio(user_timestamps, ref_timestamps):
"""발화 속도 비율 계산"""
if len(user_timestamps) == 0 or len(ref_timestamps) == 0:
return 0.0
user_duration = user_timestamps[-1]["end_time"] - user_timestamps[0]["start_time"]
ref_duration = ref_timestamps[-1]["end_time"] - ref_timestamps[0]["start_time"]
user_speed = len(user_timestamps) / user_duration if user_duration > 0 else 0
ref_speed = len(ref_timestamps) / ref_duration if ref_duration > 0 else 0
return user_speed / ref_speed if ref_speed > 0 else 0
def calculate_pause_pattern_similarity(user_timestamps, ref_timestamps):
"""중단 패턴 유사도 계산"""
user_pauses = [word["end_time"] - word["start_time"] for word in user_timestamps]
ref_pauses = [word["end_time"] - word["start_time"] for word in ref_timestamps]
return calculate_cosine_similarity(user_pauses, ref_pauses)
def calculate_cosine_similarity(vec1, vec2):
"""벡터 간 코사인 유사도 계산 (NumPy만 사용)"""
if len(vec1) == 0 or len(vec2) == 0:
return 0.0
vec1 = np.asarray(vec1)
vec2 = np.asarray(vec2)
dot_product = np.dot(vec1, vec2)
norm_vec1 = np.linalg.norm(vec1)
norm_vec2 = np.linalg.norm(vec2)
if norm_vec1 == 0 or norm_vec2 == 0:
return 0.0
return dot_product / (norm_vec1 * norm_vec2)
def lambda_handler(event, context):
# 입력 데이터
user_timestamps = event["user_timestamps"]
ref_timestamps = event["ref_timestamps"]
# 발화 속도 비율 계산
speed_ratio = calculate_speed_ratio(user_timestamps, ref_timestamps)
# 중단 패턴 유사도 계산
pause_similarity = calculate_pause_pattern_similarity(user_timestamps, ref_timestamps)
# 잘못 인식된 단어 비율 계산
user_words = [word["word"] for word in user_timestamps]
ref_words = [word["word"] for word in ref_timestamps]
mispronounced_words = list(set(user_words) - set(ref_words))
total_words = len(ref_words)
mispronounced_ratio = len(mispronounced_words) / total_words if total_words > 0 else 0
# 결과 반환
return {
"speed_ratio": speed_ratio,
"pause_similarity": pause_similarity,
"mispronounced_words": mispronounced_words,
"mispronounced_ratio": mispronounced_ratio
}
④ 채점 결과 PostgreSQL 저장 Lambda 함수
import boto3
import psycopg2
import json
def load_config_from_s3(bucket_name, config_key):
"""
S3에서 설정 파일(JSON 형식)을 가져오기
"""
s3 = boto3.client('s3')
config_path = "/tmp/config.json"
# S3에서 파일 다운로드
s3.download_file(bucket_name, config_key, config_path)
# JSON 로드
with open(config_path, 'r') as file:
config = json.load(file)
return config
def insert_score_to_db(db_config, user_data):
"""
PostgreSQL 데이터베이스에 채점 결과를 삽입
"""
try:
# PostgreSQL 연결
connection = psycopg2.connect(
dbname=db_config["NAME"],
user=db_config["USER"],
password=db_config["PASSWORD"],
host=db_config["HOST"],
port=db_config["PORT"]
)
cursor = connection.cursor()
# SQL 삽입 쿼리
insert_query = """
INSERT INTO app_userpronunciation (
user_id, content_type_id, object_id, audio_file, pitch_similarity,
rhythm_similarity, speed_ratio, pause_similarity,
mispronounced_words, mispronounced_ratio, feedback, status, created_at
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW())
"""
cursor.execute(insert_query, (
user_data['user_id'],
user_data['content_type_id'],
user_data['object_id'],
user_data['audio_file'],
user_data['pitch_similarity'],
user_data['rhythm_similarity'],
user_data['speed_ratio'],
user_data['pause_similarity'],
json.dumps(user_data['mispronounced_words']), # 리스트를 JSON 형식으로 저장
user_data['mispronounced_ratio'],
user_data['feedback'],
'completed'
))
# 변경 사항 커밋
connection.commit()
cursor.close()
connection.close()
return {"status": "success", "message": "Score inserted successfully"}
except Exception as e:
return {"status": "error", "message": str(e)}
def lambda_handler(event, context):
"""
Lambda 핸들러 함수
"""
# S3에서 PostgreSQL 설정 로드
bucket_name = "the-secret-key" # S3 버킷 이름
config_key = "postgres_config.json" # S3에 저장된 설정 파일 이름
db_config = load_config_from_s3(bucket_name, config_key)
# Lambda 호출 이벤트에서 데이터 추출
user_data = {
"user_id": event.get("user_id"),
"content_type_id": event.get("content_type_id"),
"object_id": event.get("object_id"),
"audio_file": event.get("audio_file"),
"pitch_similarity": event.get("pitch_similarity"),
"rhythm_similarity": event.get("rhythm_similarity"),
"speed_ratio": event.get("speed_ratio"),
"pause_similarity": event.get("pause_similarity"),
"mispronounced_words": event.get("mispronounced_words", []),
"mispronounced_ratio": event.get("mispronounced_ratio", 0.0),
"feedback": event.get("feedback", "")
}
# PostgreSQL에 채점 결과 저장
result = insert_score_to_db(db_config, user_data)
return result
📙 내일 일정
- 중간 프로젝트
'TIL _Today I Learned > 2024.11' 카테고리의 다른 글
[DAY 87] Django REST Framework & 중간 프로젝트 (0) | 2024.11.21 |
---|---|
[DAY 86] 중간 프로젝트_ 채점 기준 선정 (0) | 2024.11.20 |
[DAY 84] 중간 프로젝트_ LLM (0) | 2024.11.18 |
[DAY 83] 중간 프로젝트_ KST 알고리즘의 활용 (0) | 2024.11.15 |
[DAY 82] 중간 프로젝트_ KST 알고리즘의 활용 (0) | 2024.11.14 |