본문 바로가기
TIL _Today I Learned/2024.12

[DAY 106] 최종 프로젝트_ Labeling Pipeline

by gamdong2 2024. 12. 18.
[천재교육] 프로젝트 기반 빅데이터 서비스 개발자 양성 과정 9기
학습일 : 2024.12.18

📕 프로젝트 작업 내역

  • LLM을 활용한 라벨링 자동화
  • GraphRAG을 활용한 라벨링 자동화

 

📗 수행 결과

1. LLM을 활용한 라벨링 자동화

1) 라벨링 자동화 파이프라인

문제 입력 → 대분류 추출 (GPT-4) → 대분류에 해당하는 JSON 파일 S3에서 로드 → 로컬에 JSON 파일 저장 → 최하위 분류 추출 (GPT-4) → 최하위 분류 출력

 

2) 로드맵 파일(JSON) 유형

S3 버킷(big9-project-02-roadmap-bucket) 에 저장된 파일 유형을 다음 두 가지로 예상

 

  (i) 추출된 대분류와 매핑이 가능한 파일명(main.py)

  • big9-project-02-roadmap-bucket/roadmap_2022
    • 01_num_cal.json
    • 02_change_of_relationship.json
    • 03_shape_meas.json
    • 04_data_and_possibility.json

  (ii) 추출된 대분류와 매핑이 불가능한 파일명(main.py)

  • big9-project-02-roadmap-bucket/roadmap_2022_test
    • 01_test
    • 02_test
    • 03_test
    • 04_test

3) 라벨링 소스코드 구성

  • json_utils.py: JSON 데이터를 계층 구조로 요약하여 문자열로 반환
# JSON 데이터를 계층 구조로 요약하여 문자열로 반환
def summarize_json_hierarchy(data, level=1):
    summary = ""
    indent = "  " * level
    if isinstance(data, dict):
        for key, value in data.items():
            if isinstance(value, dict) and "name" in value:
                summary += f"{indent}- {value['name']}\n"
                if "children" in value:
                    summary += summarize_json_hierarchy(value["children"], level + 1)
            elif isinstance(value, dict):
                summary += summarize_json_hierarchy(value, level + 1)
            else:
                summary += f"{indent}- {value}\n"
    return summary
  • openai_utils.py: LLM을 통해 주어진 문제에서 대분류를 추출
import openai
import os
from dotenv import load_dotenv
from yura_json_utils import summarize_json_hierarchy

# .env 파일 로드 및 OpenAI API 키 설정
load_dotenv()
openai.api_key = os.getenv("OPENAI_API_KEY")

"""
GPT를 통해 대분류 및 최하위 분류 추출
- problem_text: 문제 텍스트 입력
- json_data: 로드맵 데이터 계층 정보
"""

# GPT-4를 사용하여 주어진 문제에서 대분류를 추출
def extract_category_from_problem(problem_text):
    prompt = (
        "다음 수학 문제에서 가장 적합한 대분류를 추출하세요. "
        "대분류는 '변화와 관계', '도형과 측정', '자료와 가능성' 중 하나를 먼저 확인하고, "
        "그래프나 표와 관련된 문제는 우선적으로 '자료와 가능성'으로 분류하고, "
        "길이, 넓이, 들이, 무게 등의 단위 관련 문제는 '도형과 측정'으로 분류하며, "
        "배열, 규칙, 비율과 관련된 문제는 '변화와 관계'로 분류하세요. "
        "어디에도 속하지 않으면 '수와 연산'으로 분류하세요. "
        f"문제: {problem_text}\n대분류:"
    )
    try:
        response = openai.ChatCompletion.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": prompt},
            ],
            max_tokens=50,
            temperature=0.5,
        )
        category = response.choices[0].message["content"].strip()
        valid_categories = ['변화와 관계', '도형과 측정', '자료와 가능성', '수와 연산']
        return category if category in valid_categories else '수와 연산'
    except Exception as e:
        print(f"[ERROR] LLM 추출 실패: {e}")
        return '수와 연산'

# GPT-4를 사용하여 주어진 JSON 데이터에서 문제의 최하위 분류를 추출
def extract_leaf_category_from_problem(problem_text, json_data):
    json_summary = summarize_json_hierarchy(json_data)
    prompt = (
        "다음은 특정 대분류에 속하는 수학 학습 주제 계층 구조입니다:\n"
        f"{json_summary}\n\n"
        "그리고 다음은 문제입니다:\n"
        f"{problem_text}\n\n"
        "이 문제에 적합한 최하위 분류(학습 주제)를 반환하세요."
    )
    try:
        response = openai.ChatCompletion.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": prompt},
            ],
            max_tokens=200,
            temperature=0.5,
        )
        return response.choices[0].message["content"].strip()
    except Exception as e:
        print(f"[ERROR] 최하위 분류 추출 실패: {e}")
        return None
  • s3_utils.py: S3와 상호작용(파일 로드, 다운로드, 대분류 검색)
import openai
import os
from dotenv import load_dotenv
from yura_json_utils import summarize_json_hierarchy

# .env 파일 로드 및 OpenAI API 키 설정
load_dotenv()
openai.api_key = os.getenv("OPENAI_API_KEY")

"""
GPT를 통해 대분류 및 최하위 분류 추출
- problem_text: 문제 텍스트 입력
- json_data: 로드맵 데이터 계층 정보
"""

# GPT-4를 사용하여 주어진 문제에서 대분류를 추출
def extract_category_from_problem(problem_text):
    prompt = (
        "다음 수학 문제에서 가장 적합한 대분류를 추출하세요. "
        "대분류는 '변화와 관계', '도형과 측정', '자료와 가능성' 중 하나를 먼저 확인하고, "
        "그래프나 표와 관련된 문제는 우선적으로 '자료와 가능성'으로 분류하고, "
        "길이, 넓이, 들이, 무게 등의 단위 관련 문제는 '도형과 측정'으로 분류하며, "
        "배열, 규칙, 비율과 관련된 문제는 '변화와 관계'로 분류하세요. "
        "어디에도 속하지 않으면 '수와 연산'으로 분류하세요. "
        f"문제: {problem_text}\n대분류:"
    )
    try:
        response = openai.ChatCompletion.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": prompt},
            ],
            max_tokens=50,
            temperature=0.5,
        )
        category = response.choices[0].message["content"].strip()
        valid_categories = ['변화와 관계', '도형과 측정', '자료와 가능성', '수와 연산']
        return category if category in valid_categories else '수와 연산'
    except Exception as e:
        print(f"[ERROR] LLM 추출 실패: {e}")
        return '수와 연산'

# GPT-4를 사용하여 주어진 JSON 데이터에서 문제의 최하위 분류를 추출
def extract_leaf_category_from_problem(problem_text, json_data):
    json_summary = summarize_json_hierarchy(json_data)
    prompt = (
        "다음은 특정 대분류에 속하는 수학 학습 주제 계층 구조입니다:\n"
        f"{json_summary}\n\n"
        "그리고 다음은 문제입니다:\n"
        f"{problem_text}\n\n"
        "이 문제에 적합한 최하위 분류(학습 주제)를 반환하세요."
    )
    try:
        response = openai.ChatCompletion.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": prompt},
            ],
            max_tokens=200,
            temperature=0.5,
        )
        return response.choices[0].message["content"].strip()
    except Exception as e:
        print(f"[ERROR] 최하위 분류 추출 실패: {e}")
        return None
  • problem_processor.py: 문제 텍스트를 처리하여 대분류 및 최하위 분류 추출
import time
import os
from yura_s3_utils import load_json_from_s3, search_s3_files_for_category, download_file_from_s3
from yura_openai_utils import extract_category_from_problem, extract_leaf_category_from_problem

"""
문제 텍스트를 처리하여 대분류 및 최하위 분류 추출
- problem_text: 문제 텍스트
- bucket_name: JSON 파일이 저장된 S3 버킷 이름
- category_map: 대분류와 S3 파일 매핑
- prefix: S3 경로 접두사
"""

def process_math_problem(problem_text, bucket_name, category_map, prefix, model="gpt-4"):

    # Step 1: 문제에서 대분류를 추출
    start_time = time.time()
    category = extract_category_from_problem(problem_text)
    category_time = time.time() - start_time
    print(f"[INFO] 추출된 대분류: {category}")

    # Step 2: 대분류와 매핑된 JSON 파일 로로드 시도
    json_data = None
    if category in category_map:
        s3_key = os.path.join(prefix, category_map[category])
        json_data = load_json_from_s3(bucket_name, s3_key)
        if json_data:
            print(f"[INFO] 대분류 '{category}'와 매핑된 JSON 파일을 로드했습니다.")
        else:
            print(f"[INFO] 대분류 '{category}'에 해당하는 JSON 파일 로드 실패.")

    # Step 3: 매핑 실패 시 S3 전체 검색
    if not json_data:
        print("[INFO] 매핑 실패. S3 파일을 검색합니다...")
        search_start_time = time.time()
        json_data = search_s3_files_for_category(bucket_name, prefix, category)
        search_time = time.time() - search_start_time
        category_time += search_time

        if not json_data:
            print(f"[ERROR] S3 내 파일에서 대분류 '{category}'에 해당하는 데이터를 찾을 수 없습니다.")
            return category, None, category_time, 0

    # Step 4: 최하위 분류 추출
    start_time = time.time()
    leaf_category = extract_leaf_category_from_problem(problem_text, json_data)
    leaf_category_time = time.time() - start_time

    # Step 5: 결과 반환
    return category, leaf_category, category_time, leaf_category_time
  • main.py: 전체 프로세스를 실행하여 결과 출력
    • roadmap_2022: 추출된 대분류와 매핑이 가능한 JSON 파일이 들어있는 폴더
    • roadmap_2022_test: 추출된 대분류와 매핑이 불가능한 JSON 파일이 들어있는 폴더
from yura_s3_utils import get_random_problem_files
import boto3
from yura_problem_processor import process_math_problem

"""
전체 프로세스를 실행, 결과 출력
"""

def process_multiple_problems():
    # 텍스트 파일 (문제 파일) 관련 정보
    text_bucket_name = "big9-project-02-training-bucket"  # 문제 텍스트 파일이 위치한 S3 버킷
    text_problem_path = "model_train/training/labels/"    # 문제 텍스트 파일이 위치한 S3 폴더

    # JSON 파일 (로드맵 파일) 관련 정보
    json_bucket_name = "big9-project-02-roadmap-bucket"   # JSON 파일이 위치한 S3 버킷
    json_prefix = "roadmap_2022_test/"                    # JSON 파일이 위치한 S3 폴더 (or "roadmap_2022")
    category_map = {
        "수와 연산": "01_num_cal.json",
        "변화와 관계": "02_change_of_relationship.json",
        "도형과 측정": "03_shape_meas.json",
        "자료와 가능성": "04_data_and_possibility.json",
    }

    # 텍스트 파일에서 최대 10개의 문제 파일만 가져오기
    random_files = get_random_problem_files(
        bucket_name=text_bucket_name, 
        path=text_problem_path, 
        num_files=10
    )

    total_time = 0
    results = []

    for file in random_files:
        s3 = boto3.client("s3")
        problem_text = ""

        try:
            # S3에서 문제 텍스트 파일 가져오기
            response = s3.get_object(Bucket=text_bucket_name, Key=file)
            content = response["Body"].read().decode("utf-8")
 
            # 문제 텍스트 추출: [question_text] 부분만 가져오기
            if "[question_text]" in content:
                # '[question_text]' 이후 첫 번째 줄 추출
                problem_text = content.split("[question_text]")[1].strip().split("\n")[0].strip()
                print(f"[INFO] 문제 텍스트 추출 성공: {problem_text}")
            else:
                print(f"[WARNING] {file}에 [question_text] 섹션이 없습니다.")
                continue
        except Exception as e:
            print(f"[ERROR] {file}에서 문제 텍스트를 불러오는 데 실패했습니다: {e}")
            continue
      
        # 문제 처리
        category, leaf_category, category_time, leaf_category_time = process_math_problem(
            problem_text=problem_text, 
            bucket_name=json_bucket_name, 
            category_map=category_map, 
            prefix=json_prefix
        )

        # 결과 저장
        result = {
            "문제 텍스트": problem_text,
            "대분류": category,
            "최하위 분류": leaf_category,
            "대분류 처리 시간": category_time,
            "최하위 분류 처리 시간": leaf_category_time,
        }

        results.append(result)
        total_time += category_time + leaf_category_time

    # 평균 처리 시간 계산
    average_time = total_time / len(results) if results else 0

    # 결과 출력
    for result in results:
        print(f"문제 텍스트: {result['문제 텍텍스트']}")
        print(f"대분류: {result['대분류']}")
        print(f"최하위 분류: {result['최하위 분류']}")
        print(f"대분류 처리 시간: {result['대분류 처리 시간']:.4f}초")
        print(f"최하위 분류 처리 시간: {result['최하위 분류 처리 시간']:.4f}초\n")

    print(f"[INFO] 평균 처리 시간: {average_time:.4f}초")

if __name__ == "__main__":
    process_multiple_problems()

 

4) 라벨링 결과

  • 러닝 타임 비교 (n=10)
  JSON 파일 매핑 될 때 JSON 파일 매핑 안될 때
대분류 처리 시간 평균(s) 1.01 ± 0.39 1.05 ± 0.24
최하위 분류 처리 시간 평균(s) 4.74 ± 2.12 3.92 ± 2.11
[tip] 소스코드를 하나의 파일로 나타냈을 때 러닝 타임 비교 (n=10)
  JSON 파일 매핑 될 때 JSON 파일 매핑 안될 때
대분류 처리 시간 평균(s) 1.30 ± 1.01 0.91 ± 0.20
최하위 분류 처리 시간 평균(s) 4.03 ± 2.30 4.59 ± 2.39
import openai
import boto3
import json
import os
import time
import random
from functools import wraps
from dotenv import load_dotenv

# .env 파일 로드
load_dotenv()

# OpenAI API 키 설정
openai.api_key = os.getenv("OPENAI_API_KEY")

# JSON 로드맵 구성: 대분류(Category) -> -> -> 최하위 분류
"""
# 라벨링 파이프라인
문제 입력
→ 대분류 추출 (GPT-4)
→ 대분류에 해당하는 JSON 파일 S3에서 로드
→ 로컬에 JSON 파일 저장
→ 최하위 분류 추출 (GPT-4)
→ 최하위 분류 출력
"""

# S3에서 JSON 파일 로드
# 지정된 S3 버킷과 파일 경로를 기반으로 JSON 파일을 로드
# 로드한 파일의 내용을 JSON 형식으로 변환하여 반환

def load_json_from_s3(bucket_name, file_path):
    s3 = boto3.client("s3")
    try:
        response = s3.get_object(Bucket=bucket_name, Key=file_path)
        content = response["Body"].read().decode("utf-8")
        return json.loads(content)
    except Exception as e:
        print(f"[ERROR] S3에서 파일 로드 실패: {e}")
        return None

# LLM을 사용해 대분류 추출
# 입력된 수학 문제를 기반으로 GPT 모델을 사용해 대분류를 추출

def extract_category_from_problem(problem_text):
    prompt = (
        "다음 수학 문제에서 가장 적합한 대분류를 추출하세요. "
        "대분류는 '변화와 관계', '도형과 측정', '자료와 가능성' 중 하나를 먼저 확인하고, "
        "그래프나 표와 관련된 문제는 우선적으로 '자료와 가능성'으로 분류하고, "
        "길이, 넓이, 들이, 무게 등의 단위 관련 문제는 '도형과 측정'으로 분류하며, "
        "배열, 규칙, 비율과 관련된 문제는 '변화와 관계'로 분류하세요. "
        "어디에도 속하지 않으면 '수와 연산'으로 분류하세요. "
        f"문제: {problem_text}\n대분류:"
    )
    try:
        start_time = time.time()
        response = openai.ChatCompletion.create(
            model="gpt-4",
            messages=[{"role": "system", "content": "You are a helpful assistant."},
                      {"role": "user", "content": prompt}],
            max_tokens=50,
            temperature=0.5,
        )
        category_time = time.time() - start_time
        category = response.choices[0].message["content"].strip()
        return category, category_time
    except Exception as e:
        print(f"[ERROR] LLM 추출 실패: {e}")
        return None, 0

# LLM을 사용한 최하위 분류 추출 함수
# 주어진 문제 텍스트와 JSON 데이터를 기반으로 GPT를 사용해 최하위 분류를 추출

def extract_leaf_category_from_problem(text, json_data):
    json_summary = summarize_json_hierarchy(json_data)
    prompt = (
        "다음은 특정 대분류에 속하는 수학 학습 주제 계층 구조입니다:\n"
        f"{json_summary}\n\n"
        "그리고 다음은 문제입니다:\n"
        f"{text}\n\n"
        "이 문제에 적합한 최하위 분류(학습 주제)를 반환하세요."
    )
    try:
        start_time = time.time()
        response = openai.ChatCompletion.create(
            model="gpt-4",
            messages=[{"role": "system", "content": "You are a helpful assistant."},
                      {"role": "user", "content": prompt}],
            max_tokens=200,
            temperature=0.5,
        )
        leaf_category_time = time.time() - start_time
        return response.choices[0].message["content"].strip(), leaf_category_time
    except Exception as e:
        print(f"[ERROR] LLM 추출 실패: {e}")
        return None, 0

# JSON 계층 구조 요약 함수
# JSON 데이터를 계층 구조로 요약하여 문자열로 변환

def summarize_json_hierarchy(data, level=1):
    summary = ""
    indent = "  " * level
    if isinstance(data, dict):
        for key, value in data.items():
            if isinstance(value, dict) and "name" in value:
                summary += f"{indent}- {value['name']}\n"
                if "children" in value:
                    summary += summarize_json_hierarchy(value["children"], level + 1)
            elif isinstance(value, dict):
                summary += summarize_json_hierarchy(value, level + 1)
            else:
                summary += f"{indent}- {value}\n"
    return summary

# S3에서 JSON 파일 다운로드
# 지정된 S3 버킷의 파일을 로컬 경로로 다운로드

def download_file_from_s3(bucket_name, s3_key, local_path):
    s3 = boto3.client('s3')
    try:
        s3.download_file(bucket_name, s3_key, local_path)
        print(f"[INFO] S3에서 {s3_key}를 {local_path}에 다운로드했습니다.")
    except Exception as e:
        print(f"[ERROR] S3 파일 다운로드 실패: {e}")


def load_json_data(local_path):
    try:
        with open(local_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        return data
    except Exception as e:
        print(f"[ERROR] JSON 파일 로드 실패: {e}")
        return None

# 대분류 매핑 실패 시 S3의 모든 파일을 탐색하도록 설정하는 데코레이터

def ensure_category_or_fallback(category):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            bucket_name, s3_key, local_path = func(*args, **kwargs)
            if bucket_name and s3_key and local_path:
                return bucket_name, s3_key, local_path

            print(f"[INFO] 대분류 매핑 실패. S3 내 모든 파일을 검색합니다.")
            s3 = boto3.client("s3")
            result = s3.list_objects_v2(Bucket="big9-project-02-roadmap-bucket", Prefix="roadmap_2022_test/")
            if "Contents" in result:
                for obj in result["Contents"]:
                    file_path = obj["Key"]
                    local_path = os.path.join("/tmp", os.path.basename(file_path))
                    download_file_from_s3("big9-project-02-roadmap-bucket", file_path, local_path)
                    json_data = load_json_data(local_path)
                    if json_data:
                        summary = summarize_json_hierarchy(json_data)
                        if category in summary:
                            return "big9-project-02-roadmap-bucket", file_path, local_path
            print(f"[ERROR] S3 내 파일에서 대분류 '{category}'에 해당하는 데이터를 찾을 수 없습니다.")
            return None, None, None
        return wrapper
    return decorator

# 대분류와 파일 매핑
@ensure_category_or_fallback("수와 연산")
def get_file_path_from_category(category):
    category_file_map = {
        "수와 연산": "01_num_cal.json",
        "변화와 관계": "02_change_of_relationship.json",
        "도형과 측정": "03_shape_meas.json",
        "자료와 가능성": "04_data_and_possibility.json",
    }
    file_name = category_file_map.get(category)
    if file_name:
        s3_key = os.path.join("roadmap_2022_test/", file_name)
        local_path = os.path.join("/tmp", file_name)
        return "big9-project-02-roadmap-bucket", s3_key, local_path
    return None, None, None

# 문제 처리
# S3에서 txt 파일을 불러와 라벨링 결과를 도출

def get_random_problem_file(bucket_name, path):
    s3 = boto3.client("s3")
    try:
        response = s3.list_objects_v2(Bucket=bucket_name, Prefix=path)
        file_list = [
            obj["Key"] for obj in response.get("Contents", []) if obj["Key"].endswith(".txt")
        ]
        if file_list:
            return random.choice(file_list)
        else:
            print("[ERROR] 텍스트 파일이 존재하지 않습니다.")
            return None
    except Exception as e:
        print(f"[ERROR] S3에서 텍스트 파일을 가져오는 중 오류 발생: {e}")
        return None

def process_single_problem():
    text_bucket_name = "big9-project-02-training-bucket"
    text_problem_path = "model_train/training/labels/"

    category_map = {
        "수와 연산": "01_num_cal.json",
        "변화와 관계": "02_change_of_relationship.json",
        "도형과 측정": "03_shape_meas.json",
        "자료와 가능성": "04_data_and_possibility.json",
    }

    random_file = get_random_problem_file(
        bucket_name=text_bucket_name,
        path=text_problem_path
    )

    if random_file:
        s3 = boto3.client("s3")
        problem_text = ""

        try:
            response = s3.get_object(Bucket=text_bucket_name, Key=random_file)
            content = response["Body"].read().decode("utf-8")
            if "[question_text]" in content:
                problem_text = content.split("[question_text]")[1].strip().split("\n")[0].strip()
                print(f"[INFO] 문제 텍스트 추출 성공: {problem_text}")
            else:
                print(f"[WARNING] {random_file}에 [question_text] 섹션이 없습니다.")
                return
        except Exception as e:
            print(f"[ERROR] {random_file}에서 문제 텍스트를 불러오는 데 실패했습니다: {e}")
            return

        category, category_time = extract_category_from_problem(problem_text)
        bucket_name, s3_key, local_path = get_file_path_from_category(category)

        if bucket_name and s3_key and local_path:
            download_file_from_s3(bucket_name, s3_key, local_path)
            json_data = load_json_data(local_path)

            if json_data:
                leaf_category, leaf_category_time = extract_leaf_category_from_problem(problem_text, json_data)
                print(f"[INFO] 대분류: {category} (처리 시간: {category_time:.2f}초)")
                print(f"[INFO] 최하위 분류: {leaf_category} (처리 시간: {leaf_category_time:.2f}초)")
            else:
                print(f"[INFO] 적절한 최하위 분류를 찾지 못했습니다.")

if __name__ == "__main__":
    process_single_problem()

 

 

 

📙 내일 일정

  • 최종 프로젝트