본문 바로가기
TIL _Today I Learned/2024.11

[DAY 85] Lambda, Step Functions & 중간 프로젝트

by gamdong2 2024. 11. 19.
[천재교육] 프로젝트 기반 빅데이터 서비스 개발자 양성 과정 9기
학습일 : 2024.11.19

📕 학습 목록

  • Step Functions 설계
  • Lambda 함수 생성

 

📗 활동 내용

1. Step Functions 설계

1) AWS Step Functions 란?

  • 애플리케이션 워크플로를 시각적으로 디자인하고 실행할 수 있도록 돕는 완전 관리형 서비스
  • 여러 AWS 서비스와 자체 애플리케이션을 연결하여 복잡한 작업을 순서대로 실행하거나 조건에 따라 분기를 설정할 수 있음
  • 단순한 직렬 워크플로 뿐만 아니라 병렬 처리, 중첩 워크플로, 조건부 흐름, 대량 데이터 병렬 처리(Map) 같은 복잡한 작업 관리에도 최적화되어 있음
  • 이러한 기능을 활용하면 효율적이고 확장 가능한 애플리케이션 워크플로를 설계할 수 있음

2) 특징

  • 상태 기반 워크플로
    • 단계(State)로 구성된 워크플로를 정의
    • 각 상태에서 전환(Transition) 및 결과를 체계적으로 관리
  • 시각적 설계
    • AWS Management Console에서 드래그 앤 드롭으로 설계 가능
    • JSON 형식으로 상태 머신 정의 가능
  • 서버리스 서비스
    • 인프라 설정이나 관리 필요 없이 실행 가능
  • 내결함성
    • 오류 발생 시 재시도(Retry) 및 오류 처리(Catch) 로직 지원
  • AWS 서비스 통합
    • Lambda, S3, DynamoDB, SNS 등 다양한 AWS 서비스와 손쉽게 연동 가능
  • 실시간 상태 추적
    • 워크플로 실행 상태를 실시간으로 모니터링하고 디버깅 가능

3) 고급 기능

  • 병렬 처리
    • 병렬 상태(Parallel State)를 사용하여 여러 작업을 동시에 실행
    • 병렬로 실행된 작업의 결과를 합산하거나 집계 가능
  • 중첩 워크플로
    • 워크플로를 계층화하여 상위 상태 머신이 하위 워크플로를 호출 가능
    • 복잡한 로직을 단순화하고 재사용 가능
  • 조건부 흐름 제어
    • 조건(State Choice)을 사용하여 특정 조건에 따라 워크플로 경로를 분기
    • 동적 로직 및 맞춤형 프로세스 설계 가능
  • 작업 재시도 및 오류 처리
    • 네트워크 문제나 임시적 오류를 처리하기 위한 재시도 로직 설정 가능
    • 오류 발생 시 대체 경로를 설정하여 워크플로 지속 가능
  • 장기 실행 작업 관리
    • 최대 1년까지 실행 상태를 유지하며 비동기 작업도 관리 가능
  • Map 상태
    • 대규모 데이터 처리에 최적화된 반복 실행 로직 제공
    • 데이터를 여러 조각으로 나눠 병렬 작업 처리

4) 활용사례

  • 데이터 처리 파이프라인
    • 데이터 수집, 변환, 분석, 저장 단계를 자동화
  • 마이크로서비스 오케스트레이션
    • 여러 Lambda 함수 및 API Gateway를 연결한 복잡한 비즈니스 로직 실행
  • ETL(Extract, Transform, Load) 작업
    • 데이터 추출, 변환, 저장 작업을 순차 또는 병렬로 처리
  • 머신러닝 워크플로
    • 데이터 준비, 모델 학습, 평가, 배포 단계 관리
  • 장기 실행 작업
    • 상태 저장이 필요한 비동기 작업 관리 (ex: 이메일 알림, 이벤트 재시도)
  • 병렬 처리
    • 여러 파일 업로드나 분석 작업을 동시에 실행하여 성능 최적화

4) 병렬 워크플로우 설계

 

2. Lambda 함수 설계

1) 사용자 영어 발음 채점 로직

  • 사용자 음성 업로드: Django API → S3 버킷에 업로드 → PostgreSQL에 경로 저장
  • Lambda 트리거: S3 이벤트 → Lambda 함수 실행
  • 타임스탬프 추출: Google Speech-to-Text API 사용
  • 음성 비교 및 채점: Lambda에서 음성 비교 후 채점 결과 생성
  • 채점 결과 저장: Django API를 통해 PostgreSQL에 저장
[tip] 전체 흐름에서의 역할
• Django API
    - 사용자 음성을 업로드하고 PostgreSQL에 경로와 관련 데이터를 저장
    - 표준 음성과 사용자 음성을 매핑하기 위한 메타데이터 관리 (content_type, level, title, sentence 등)
• S3 이벤트와 Lambda
    - Lambda가 트리거되면서 표준 음성과 사용자 음성을 매핑
    - S3에 저장된 경로 정보(user_audio_key와 standard_audio_key)를 사용하여 두 파일을 가져오고, 비교 및 채점 수행
• Lambda → Django API
    - Lambda가 채점 결과를 계산한 후 Django API를 호출하여 PostgreSQL에 결과 저장

 

2) 트리거 설계
2-1) Lambda 함수와 트리거의 차이

  • Lambda 함수: 특정 작업을 수행하는 코드를 정의. 사용자가 정의한 함수로, 입력 데이터를 받아 처리하고 결과를 반환
  • 트리거: Lambda 함수를 실행시키는 조건이나 이벤트를 설정. 예를 들어, S3에 파일이 업로드되거나 특정 시간에 실행되도록 설정할 수 있음

2-2) 트리거 설정 방법

① AWS Management Console에서 트리거 설정

트리거가 설정된 Lambda 함

  1. AWS Lambda 콘솔로 이동
  2. 해당 Lambda 함수를 선택(Step Functions 첫 번째 함수 'Timestamp Generation')
  3. Configuration 탭에서 Triggers 섹션으로 이동
  4. Add Trigger 버튼을 클릭
  5. Trigger configuration에서 S3를 선택하고
    • Bucket: user-audio-file
    • Event type: All object create events
    • Prefix: 필요한 경우 특정 경로를 설정
    • Suffix: 필요한 경우 파일 형식을 설정 (예: .wav)
  6. Add를 클릭하여 저장

② EventBridge를 사용하여 트리거 설정

  • S3 버킷 이벤트 알림 설정
  1. AWS Management Console에서 S3로 이동
  2. 사용자 음성이 업로드되는 S3 버킷(user-audio-file)을 선택
  3. Properties 탭에서 아래로 스크롤하여 Event notifications 섹션으로 이동
  4. Create event notification 버튼을 클릭
  5. Event name: StepFunctionsTrigger
  6. Event types: All object create events를 선택
  7. Destination: Amazon EventBridge를 선택
  8. Save changes를 클릭하여 설정을 저장
  • EventBridge에서 Step Functions 연결
  1. AWS Management Console에서 EventBridge로 이동
  2. Rules 섹션으로 이동 후 Create rule 클릭
  3. Name: TriggerStepFunctionsOnS3Upload
  4. Define pattern에서 Event source를 선택
    • Event source: AWS services
    • AWS service: S3
    • Event type: Object Created
  5. Specific bucket name에 user-audio-file을 입력
  6. Prefix: path/to/ (경로를 지정해 특정 파일 업로드만 트리거할 경우 설정)
  7. Next step 클릭
  8. Target을 Step Functions state machine으로 선택
  9. State machine: 생성한 상태 머신 선택 (MyStateMachine)
  10. Input transformer를 사용하여 이벤트를 상태 머신의 입력 형식으로 변환
{
    "bucket_name": "$.detail.bucket.name",
    "user_audio_key": "$.detail.object.key"
}

  11. Next step → Create rule을 클릭
 

3) Lambda 함수

  • Lambda에서의 로컬 환경 개념
    • AWS Lambda 함수는 실행될 때 AWS에서 제공하는 임시 실행 환경에서 실행됨
    • 이 환경에는 파일을 임시로 저장할 수 있는 디렉토리(/tmp)가 제공됨
    • /tmp는 512MB의 저장 공간만 사용할 수 있으며, Lambda 함수 실행이 종료되면 해당 디렉토리의 모든 데이터는 삭제됨
    • "S3에서 파일 URL 가져와서 바로 Lambda 함수를 실행할 수 있는게 X, 임시 디렉토리에 저장해서 함수를 실행해야함"

① Timestamp 생성 Lambda 함수

import boto3
import os
import re
from google.cloud import speech
from google.cloud.speech_v1 import RecognitionAudio, RecognitionConfig

# S3 클라이언트 생성
s3 = boto3.client('s3')


def split_into_syllables(word):
    """
    단어를 음절로 분리.
    """
    return re.findall(r'[aeiouy]+[^aeiouy]*', word, re.IGNORECASE)


def generate_syllable_timestamps(word, start_time, end_time):
    """
    단어 타임스탬프를 음절 타임스탬프로 변환.
    """
    syllables = split_into_syllables(word)
    duration_per_syllable = (end_time - start_time) / len(syllables)
    syllable_timestamps = []
    current_time = start_time
    for syllable in syllables:
        syllable_timestamps.append({
            "syllable": syllable,
            "start_time": current_time,
            "end_time": current_time + duration_per_syllable
        })
        current_time += duration_per_syllable
    return syllable_timestamps


def transcribe_audio(file_path):
    """
    Google Speech-to-Text API를 사용해 단어별 타임스탬프 추출.
    """
    client = speech.SpeechClient()

    with open(file_path, "rb") as audio_file:
        audio_data = audio_file.read()

    audio = RecognitionAudio(content=audio_data)
    config = RecognitionConfig(
        encoding=RecognitionConfig.AudioEncoding.LINEAR16,
        sample_rate_hertz=16000,
        language_code="en-US",
        enable_word_time_offsets=True
    )
    response = client.recognize(config=config, audio=audio)

    word_timestamps = []
    for result in response.results:
        for word_info in result.alternatives[0].words:
            word_timestamps.append({
                "word": word_info.word,
                "start_time": word_info.start_time.total_seconds(),
                "end_time": word_info.end_time.total_seconds()
            })

    return word_timestamps


def parse_user_audio_key(user_audio_key):
    """
    사용자 음성 경로를 분석하여 표준 음성 경로를 생성하는 데 필요한 정보를 추출.
    """
    parts = user_audio_key.split('/')
    content_type = parts[2]  # conversation, novel, phonics
    level = parts[3].split('_')[1]  # level_1 → 1
    title = parts[4].replace('_', ' ')  # At_the_Toy_Store → At the Toy Store
    line_number = parts[5].split('_')[1].split('.')[0]  # audio_line_1.wav → 1
    return content_type, level, title, line_number


def lambda_handler(event, context):
    """
    AWS Lambda 핸들러 함수.
    """
    # 입력 데이터
    bucket_name = event['bucket_name']
    user_audio_key = event['user_audio_key']

    # 사용자 경로에서 표준 음성 경로 정보 추출
    content_type, level, title, line_number = parse_user_audio_key(user_audio_key)

    # 표준 음성 경로 생성
    standard_audio_key = f"standard-audio-file/{content_type}/level_{level}/{title}/audio_line_{line_number}.wav"

    # 로컬 파일 경로
    user_audio_path = "/tmp/user_audio.wav"
    standard_audio_path = "/tmp/standard_audio.wav"
    google_credentials_path = "/tmp/google_credentials.json"

    # S3에서 파일 다운로드
    secret_key_bucket = 'the-secret-key'
    google_credentials_key = "credentials/epaproject_blar_blar.json"

    s3.download_file(bucket_name, user_audio_key, user_audio_path)
    s3.download_file('standard-audio-file', standard_audio_key, standard_audio_path)
    s3.download_file(secret_key_bucket, google_credentials_key, google_credentials_path)

    # Google API 키 설정
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = google_credentials_path

    # 사용자와 표준 음성의 단어 타임스탬프 추출
    user_word_timestamps = transcribe_audio(user_audio_path)
    standard_word_timestamps = transcribe_audio(standard_audio_path)

    # 음절 타임스탬프 생성
    user_syllable_timestamps = []
    standard_syllable_timestamps = []

    for word_data in user_word_timestamps:
        user_syllable_timestamps.extend(
            generate_syllable_timestamps(
                word_data['word'],
                word_data['start_time'],
                word_data['end_time']
            )
        )

    for word_data in standard_word_timestamps:
        standard_syllable_timestamps.extend(
            generate_syllable_timestamps(
                word_data['word'],
                word_data['start_time'],
                word_data['end_time']
            )
        )

    # 결과 반환
    return {
        "user_word_timestamps": user_word_timestamps,
        "user_syllable_timestamps": user_syllable_timestamps,
        "standard_word_timestamps": standard_word_timestamps,
        "standard_syllable_timestamps": standard_syllable_timestamps
    }

 
② Pitch∙Rhythm 패턴 유사도 Lambda 함수

import boto3
import librosa
import numpy as np
import parselmouth

def preprocess_audio(input_path, target_sr=16000):
    """오디오 파일 읽기 및 리샘플링"""
    y, sr = librosa.load(input_path, sr=None)
    if sr != target_sr:
        y = librosa.resample(y, orig_sr=sr, target_sr=target_sr)
    return y, target_sr

def extract_sentence_pitch(y, sr):
    """음성 파일에서 피치 추출"""
    snd = parselmouth.Sound(y, sr)
    pitch = snd.to_pitch()
    frame_frequencies = pitch.selected_array["frequency"]
    valid_frequencies = frame_frequencies[frame_frequencies > 0]
    return valid_frequencies if len(valid_frequencies) > 0 else np.array([0])

def calculate_cosine_similarity(vec1, vec2):
    """NumPy를 사용한 코사인 유사도 계산"""
    if len(vec1) == 0 or len(vec2) == 0:
        return 0.0
    vec1 = np.asarray(vec1)
    vec2 = np.asarray(vec2)
    dot_product = np.dot(vec1, vec2)
    norm_vec1 = np.linalg.norm(vec1)
    norm_vec2 = np.linalg.norm(vec2)
    if norm_vec1 == 0 or norm_vec2 == 0:
        return 0.0
    return dot_product / (norm_vec1 * norm_vec2)

def calculate_pitch_similarity(user_pitch, ref_pitch):
    """피치 패턴 유사도 계산"""
    return calculate_cosine_similarity(user_pitch, ref_pitch)

def lambda_handler(event, context):
    # 입력 데이터
    bucket_name = event['bucket_name']
    user_audio_key = event['user_audio_key']
    standard_audio_key = event['standard_audio_key']

    # 로컬 경로
    user_audio_path = "/tmp/user_audio.wav"
    standard_audio_path = "/tmp/standard_audio.wav"

    # S3에서 사용자 및 표준 음성 다운로드
    s3 = boto3.client('s3')
    s3.download_file(bucket_name, user_audio_key, user_audio_path)
    s3.download_file(bucket_name, standard_audio_key, standard_audio_path)

    # 사용자와 표준 음성의 피치 추출
    y_user, sr_user = preprocess_audio(user_audio_path)
    y_ref, sr_ref = preprocess_audio(standard_audio_path)

    user_pitch = extract_sentence_pitch(y_user, sr_user)
    ref_pitch = extract_sentence_pitch(y_ref, sr_ref)

    # 피치 패턴 유사도 계산
    pitch_similarity = calculate_pitch_similarity(user_pitch, ref_pitch)

    # 결과 반환
    return {
        "pitch_similarity": pitch_similarity
    }

 

 ③ 발화 속도 비율, 발화 중단 패턴 유사도, 잘못 인식된 단어 비율/리스트 Lambda 함수

import boto3
import numpy as np

def calculate_speed_ratio(user_timestamps, ref_timestamps):
    """발화 속도 비율 계산"""
    if len(user_timestamps) == 0 or len(ref_timestamps) == 0:
        return 0.0

    user_duration = user_timestamps[-1]["end_time"] - user_timestamps[0]["start_time"]
    ref_duration = ref_timestamps[-1]["end_time"] - ref_timestamps[0]["start_time"]

    user_speed = len(user_timestamps) / user_duration if user_duration > 0 else 0
    ref_speed = len(ref_timestamps) / ref_duration if ref_duration > 0 else 0

    return user_speed / ref_speed if ref_speed > 0 else 0

def calculate_pause_pattern_similarity(user_timestamps, ref_timestamps):
    """중단 패턴 유사도 계산"""
    user_pauses = [word["end_time"] - word["start_time"] for word in user_timestamps]
    ref_pauses = [word["end_time"] - word["start_time"] for word in ref_timestamps]

    return calculate_cosine_similarity(user_pauses, ref_pauses)

def calculate_cosine_similarity(vec1, vec2):
    """벡터 간 코사인 유사도 계산 (NumPy만 사용)"""
    if len(vec1) == 0 or len(vec2) == 0:
        return 0.0
    vec1 = np.asarray(vec1)
    vec2 = np.asarray(vec2)

    dot_product = np.dot(vec1, vec2)
    norm_vec1 = np.linalg.norm(vec1)
    norm_vec2 = np.linalg.norm(vec2)

    if norm_vec1 == 0 or norm_vec2 == 0:
        return 0.0

    return dot_product / (norm_vec1 * norm_vec2)

def lambda_handler(event, context):
    # 입력 데이터
    user_timestamps = event["user_timestamps"]
    ref_timestamps = event["ref_timestamps"]

    # 발화 속도 비율 계산
    speed_ratio = calculate_speed_ratio(user_timestamps, ref_timestamps)

    # 중단 패턴 유사도 계산
    pause_similarity = calculate_pause_pattern_similarity(user_timestamps, ref_timestamps)

    # 잘못 인식된 단어 비율 계산
    user_words = [word["word"] for word in user_timestamps]
    ref_words = [word["word"] for word in ref_timestamps]

    mispronounced_words = list(set(user_words) - set(ref_words))
    total_words = len(ref_words)
    mispronounced_ratio = len(mispronounced_words) / total_words if total_words > 0 else 0

    # 결과 반환
    return {
        "speed_ratio": speed_ratio,
        "pause_similarity": pause_similarity,
        "mispronounced_words": mispronounced_words,
        "mispronounced_ratio": mispronounced_ratio
    }

 
④ 채점 결과 PostgreSQL 저장 Lambda  함수

import boto3
import psycopg2
import json

def load_config_from_s3(bucket_name, config_key):
    """
    S3에서 설정 파일(JSON 형식)을 가져오기
    """
    s3 = boto3.client('s3')
    config_path = "/tmp/config.json"

    # S3에서 파일 다운로드
    s3.download_file(bucket_name, config_key, config_path)

    # JSON 로드
    with open(config_path, 'r') as file:
        config = json.load(file)
    
    return config

def insert_score_to_db(db_config, user_data):
    """
    PostgreSQL 데이터베이스에 채점 결과를 삽입
    """
    try:
        # PostgreSQL 연결
        connection = psycopg2.connect(
            dbname=db_config["NAME"],
            user=db_config["USER"],
            password=db_config["PASSWORD"],
            host=db_config["HOST"],
            port=db_config["PORT"]
        )
        cursor = connection.cursor()

        # SQL 삽입 쿼리
        insert_query = """
        INSERT INTO app_userpronunciation (
            user_id, content_type_id, object_id, audio_file, pitch_similarity, 
            rhythm_similarity, speed_ratio, pause_similarity, 
            mispronounced_words, mispronounced_ratio, feedback, status, created_at
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW())
        """
        cursor.execute(insert_query, (
            user_data['user_id'],
            user_data['content_type_id'],
            user_data['object_id'],
            user_data['audio_file'],
            user_data['pitch_similarity'],
            user_data['rhythm_similarity'],
            user_data['speed_ratio'],
            user_data['pause_similarity'],
            json.dumps(user_data['mispronounced_words']),  # 리스트를 JSON 형식으로 저장
            user_data['mispronounced_ratio'],
            user_data['feedback'],
            'completed'
        ))

        # 변경 사항 커밋
        connection.commit()
        cursor.close()
        connection.close()

        return {"status": "success", "message": "Score inserted successfully"}
    except Exception as e:
        return {"status": "error", "message": str(e)}

def lambda_handler(event, context):
    """
    Lambda 핸들러 함수
    """
    # S3에서 PostgreSQL 설정 로드
    bucket_name = "the-secret-key"  # S3 버킷 이름
    config_key = "postgres_config.json"  # S3에 저장된 설정 파일 이름
    db_config = load_config_from_s3(bucket_name, config_key)

    # Lambda 호출 이벤트에서 데이터 추출
    user_data = {
        "user_id": event.get("user_id"),
        "content_type_id": event.get("content_type_id"),
        "object_id": event.get("object_id"),
        "audio_file": event.get("audio_file"),
        "pitch_similarity": event.get("pitch_similarity"),
        "rhythm_similarity": event.get("rhythm_similarity"),
        "speed_ratio": event.get("speed_ratio"),
        "pause_similarity": event.get("pause_similarity"),
        "mispronounced_words": event.get("mispronounced_words", []),
        "mispronounced_ratio": event.get("mispronounced_ratio", 0.0),
        "feedback": event.get("feedback", "")
    }

    # PostgreSQL에 채점 결과 저장
    result = insert_score_to_db(db_config, user_data)
    return result

 
 

 

📙 내일 일정

  • 중간 프로젝트