본문 바로가기
TIL _Today I Learned/2024.12

[DAY 109] 최종 프로젝트_ Labeling Pipeline 자동화

by gamdong2 2024. 12. 23.
[천재교육] 프로젝트 기반 빅데이터 서비스 개발자 양성 과정 9기
학습일 : 2024.12.23

📕 프로젝트 작업 내역

  • Labeling Pipeline 자동화

 

📗 수행 결과

1. Labeling Pipeline 자동화

1) 라벨링 자동화 파이프라인

사용자 문제 업로드 (Django API) → S3 업로드 → Lambda 함수 실행 (S3 이벤트 트리거)  → EC2 내 라벨링 파이프라인 실행 (YOLO > OCR > LLM) → 라벨링 결과 저장 (MongoDB) → 사용자 응답 (Django API)

 
2) Django → S3 업로드

S3 버킷에 업로드된 파일 유형을 다음 두 가지로 예상
 
  (i) 텍스트 업로드 (Django 에서 .txt 로 변환 후 업로드)

  • big9-project-02-question-bucket/text

  (ii) 이미지 파일 업로드 (.png or .jpeg)

  • big9-project-02-question-bucket/image

 
3) S3 업로드 → Lambda 함수 실행 (S3 이벤트 트리거)

(1) Lambda 함수 생성
(i) 텍스트 파일 업로드

  • lambda 함수명: start_llm

(ii) 이미지 파일 업로드

  • lambda 함수명: start_yolo_to_ocr

 
(2) S3 이벤트 알림 생성
(i) 텍스트 파일 업로드

  • S3 버킷명: big9-project-02-question-bucket
  • 폴더명: image

(ii) 이미지 파일 업로드

  • S3 버킷명: big9-project-02-question-bucket
  • 폴더명: text

(3) S3 업로드 → Lambda 함수 트리거 테스트
(i) 텍스트 파일 업로드

  • S3에 텍스트 파일(text/.txt) 업로드
  • Lambda 콘솔 → 모니터링 → CloudWatch → 로그 스트림 → 로그 이벤트 확인

(ii) 이미지 파일 업로드

  • S3에 이미지 파일(image/.png) 업로드
  • Lambda 콘솔 → 모니터링 → CloudWatch → 로그 스트림 → 로그 이벤트 확인

 
4) EC2 내 라벨링 폴더 생성

(1) YOLO/OCR 실행 스크립트 생성

  • 폴더 경로: /home/ubuntu/labeling
  • 파일명: yolo_to_ocr.py
import boto3
from transformers import TrOCRProcessor, VisionEncoderDecoderModel
from ultralytics import YOLO
import json
from PIL import Image
import uuid

# S3 설정
s3_client = boto3.client('s3')
MODEL_BUCKET_NAME = "big9-project-02-model-bucket"
YOLO_MODEL_PATH = "yolov8_test.pt"
OCR_MODEL_PATH = "trocr_test.pt"

IMAGE_BUCKET_NAME = "big9-project-02-question-bucket/image"
LOCAL_IMAGE_PATH = "/tmp/temp_image.jpg"
OUTPUT_JSON_PATH = "/tmp/result.json"

# 1. YOLO 모델 로드
def load_yolo_model():
    local_yolo_path = "/tmp/yolo_model.pt"
    s3_client.download_file(MODEL_BUCKET_NAME, YOLO_MODEL_PATH, local_yolo_path)
    print(f"YOLO model downloaded to {local_yolo_path}")
    model = YOLO(local_yolo_path)
    return model

# 2. Fine-tuned TrOCR 모델 로드
def load_finetuned_trocr_model():
    local_trocr_path = "/tmp/ocr_model.pt"
    s3_client.download_file(MODEL_BUCKET_NAME, OCR_MODEL_PATH, local_trocr_path)
    print(f"Fine-tuned OCR model downloaded to {local_trocr_path}")
    processor = TrOCRProcessor.from_pretrained(local_trocr_path)
    model = VisionEncoderDecoderModel.from_pretrained(local_trocr_path)
    return processor, model

# 3. S3에서 이미지 다운로드
def download_image_from_s3(bucket_name, key, local_path):
    s3_client.download_file(bucket_name, key, local_path)
    print(f"Image downloaded from S3: {key} to {local_path}")
    return local_path

# 4. YOLO로 바운딩 박스 추출
def extract_bboxes(yolo_model, image_path):
    # YOLO 모델로 바운딩 박스 추출
    results = yolo_model(image_path)  # YOLO는 이미 640x640으로 리사이즈
    bboxes = []

    for result in results[0].boxes:
        if int(result.cls.item()) == 0:  # 클래스가 0인 경우만 처리
            bboxes.append((
                0,  # class_id
                result.xywh[0][0].item(),  # x_center (0~1)
                result.xywh[0][1].item(),  # y_center (0~1)
                result.xywh[0][2].item(),  # width (0~1)
                result.xywh[0][3].item()   # height (0~1)
            ))
    return bboxes

# 5. TrOCR로 텍스트 추출
def extract_text_from_bboxes(processor, model, image_path, bboxes):
    # TrOCR 모델로 바운딩 박스를 이용해 텍스트 추출
    image = Image.open(image_path).resize((384, 384))  # TrOCR 입력 크기로 리사이즈
    question_text = []

    for bbox in bboxes:
        _, x_center, y_center, width, height = bbox

        # 상대 좌표를 TrOCR 입력 크기에 맞게 변환
        x_min = int((x_center - width / 2) * 384)
        y_min = int((y_center - height / 2) * 384)
        x_max = int((x_center + width / 2) * 384)
        y_max = int((y_center + height / 2) * 384)

        # 텍스트 추출
        cropped_image = image.crop((x_min, y_min, x_max, y_max))
        inputs = processor(images=cropped_image, return_tensors="pt").pixel_values
        outputs = model.generate(inputs)
        text = processor.batch_decode(outputs, skip_special_tokens=True)[0]
        question_text.append(text.strip())
    return question_text

# 6. JSON 저장
def save_to_json(image_id, bboxes, question_text, output_path):
    data = {
        "image_id": image_id,
        "bboxes": bboxes,  # YOLO에서 제공한 class_id 포함한 바운딩 박스
        "question_text": " ".join(question_text)  # 추출된 텍스트를 공백으로 연결
    }
    with open(output_path, "w", encoding="utf-8") as json_file:
        json.dump(data, json_file, ensure_ascii=False, indent=4)
    print(f"Saved JSON to {output_path}")
    return output_path

# 메인 함수
def main():
    # 1. 고유 image_id 생성
    image_id = str(uuid.uuid4())  # UUID로 고유 ID 생성

    # 2. S3 이미지 경로 설정
    image_key = f"{image_id}.jpg"  # 이미지 파일 이름 생성

    # 3. 모델 로드
    yolo_model = load_yolo_model()
    processor, trocr_model = load_finetuned_trocr_model()

    # 4. S3에서 이미지 다운로드
    local_image_path = download_image_from_s3(IMAGE_BUCKET_NAME, image_key, LOCAL_IMAGE_PATH)

    # 5. YOLO로 바운딩 박스 추출
    bboxes = extract_bboxes(yolo_model, local_image_path)

    # 6. TrOCR로 텍스트 추출
    if bboxes:
        question_text_list = extract_text_from_bboxes(processor, trocr_model, local_image_path, bboxes)
        # JSON 저장
        save_to_json(image_id, bboxes, question_text_list, OUTPUT_JSON_PATH)
    else:
        print("No valid bounding boxes (class 0) found.")

# 실행
if __name__ == "__main__":
    main()

 
(2) LLM 실행 스크립트 생성

  • 폴더 경로: /home/ubuntu/labeling/LLM
  • 파일명
    • .env
    • json_utils.py
    • openai_utils.py
    • problem_processor.py
    • s3_utils.py
    • main_text.py 
# main_text.py: MongoDB와 연결 테스트 용
import os
from dotenv import load_dotenv
from pymongo import MongoClient

# .env 파일 로드
load_dotenv()

# 환경 변수에서 MongoDB 정보 가져오기
mongo_user = os.getenv("MONGO_USER")
mongo_password = os.getenv("MONGO_PASSWORD")
mongo_host = os.getenv("MONGO_HOST")
mongo_port = os.getenv("MONGO_PORT")
mongo_db = os.getenv("MONGO_DB")
collection_name = os.getenv("MONGO_COLLECTION_NAME")

# MongoDB URI 구성
mongo_uri = f"mongodb://{mongo_user}:{mongo_password}@{mongo_host}:{mongo_port}/?authSource=admin"

# MongoDB 클라이언트 연결
mongo_client = MongoClient(mongo_uri)
db = mongo_client[mongo_db]
collection = db[collection_name]

# MongoDB 연결 테스트
try:
    mongo_client.admin.command("ping")
    print(f"[INFO] Connected to MongoDB Database: {mongo_db}, Collection: {collection_name}")
except Exception as e:
    print(f"[ERROR] MongoDB connection failed: {e}")
    exit()

# MongoDB에 데이터 삽입 테스트
try:
    test_data = {"key": "value", "status": "test"}
    result = collection.insert_one(test_data)
    print(f"[INFO] Test data inserted with ID: {result.inserted_id}")
except Exception as e:
    print(f"[ERROR] Failed to insert test data: {e}")
finally:
    mongo_client.close()

 

5) S3 - Lambda 함수 - EC2 연결

(1) Lambda 에서 SSM으로 EC2 접속 및 스크립트 실행

# 기존에 EC2 에 SSM Agent 설치한 상태
# SSM Agent 활성화 및 시작
sudo systemctl enable snap.amazon-ssm-agent.amazon-ssm-agent.service
sudo systemctl start snap.amazon-ssm-agent.amazon-ssm-agent.service
# SSM Agent 상태 확인
sudo systemctl status snap.amazon-ssm-agent.amazon-ssm-agent.service
  • SSM Agent 상태: 실행중

  • SSM - EC2 연결 확인
# SSM-EC2 연결 확인
aws ssm describe-instance-information

"""
{
    "InstanceInformationList": [
        {
            "InstanceId": "i-xxx",
            "PingStatus": "Online",
            "LastPingDateTime": "2024-12-23T11:33:14.897000+00:00",
            "AgentVersion": "3.3.987.0",
            "IsLatestVersion": false,
            "PlatformType": "Linux",
            "PlatformName": "Ubuntu",
            "PlatformVersion": "24.04",
            "ResourceType": "EC2Instance",
            "IPAddress": "xxx",
            "ComputerName": "ip-xxx",
            "SourceId": "i-xxx",
            "SourceType": "AWS::EC2::Instance"
        },
        ...
    ]
}
"""

 
(2) Lambda 함수에서 SSM API 호출

  •  EC2에 존재하는 SSM 문서 목록 확인 (AWS-RunShellScript)
aws ssm list-documents --filters Key=Name,Values=AWS-RunShellScript

"""
{
    "DocumentIdentifiers": [
        {
            "Name": "AWS-RunShellScript",
            "CreatedDate": "2017-08-31T00:01:22.641000+00:00",
            "Owner": "Amazon",
            "PlatformTypes": [
                "Linux",
                "MacOS"
            ],
            "DocumentVersion": "1",
            "DocumentType": "Command",
            "SchemaVersion": "1.2",
            "DocumentFormat": "JSON",
            "Tags": []
        }
    ]
}
"""
  • Lambda 함수에서 SSM 호출 테스트 코드 작성
import boto3
import time

def lambda_handler(event, context):
    ssm = boto3.client('ssm', region_name='ap-northeast-2')
    instance_id = 'i-xxx'  # 대상 EC2 인스턴스 ID
    try:
        print("Starting SSM send-command...")
        response = ssm.send_command(
            InstanceIds=[instance_id],
            DocumentName='AWS-RunShellScript',  # 실행할 SSM 문서
            Parameters={
                'commands': ['echo "Hello from Lambda!"']
            }
        )

        # Command ID 반환
        command_id = response['Command']['CommandId']
        print(f"Command ID: {command_id}")

        # 명령 상태 확인 (폴링)
        timeout = 60  # 최대 60초 대기
        elapsed_time = 0
        while elapsed_time < timeout:
            time.sleep(3)  # 3초마다 확인
            elapsed_time += 3
            result = ssm.get_command_invocation(
                CommandId=command_id,
                InstanceId=instance_id
            )
            status = result['Status']
            print(f"Command Status: {status}")
            
            if status in ['Success', 'Failed', 'Cancelled', 'TimedOut']:
                print("Command completed.")
                print(f"Output: {result['StandardOutputContent']}")
                return {
                    'statusCode': 200,
                    'body': f"Command Output: {result['StandardOutputContent']}"
                }

        print("Command timed out.")
        return {
            'statusCode': 500,
            'body': "Command did not complete in time."
        }

    except Exception as e:
        print(f"Error occurred: {str(e)}")
        return {
            'statusCode': 500,
            'body': f"Error: {str(e)}"
        }
  • 비어있는 json 파일 {} 로 테스트 (구성>제한 시간 2분으로 늘림)
    • 두 Lambda 함수에서 Lambda → SSM → EC2 연결 성공!

 
(3) S3 이벤트 트리거 → Lambda → SSM API → EC2 → 스크립트 실행 테스트

  • 목표
    • S3 이벤트 → Lambda 호출
    • Lambda → SSM API 호출 → EC2 인스턴스에 명령 전달
    • EC2 내 파이썬 스크립트 실행 시도 확인

① Lambda 함수 코드 변경

import boto3

def lambda_handler(event, context):
    ssm = boto3.client('ssm', region_name='ap-northeast-2')

    try:
        print("Sending SSM command to EC2...")
        response = ssm.send_command(
            InstanceIds=['i-xxx'],  # EC2 인스턴스 ID
            DocumentName='AWS-RunShellScript',
            Parameters={
                'commands': [
                    "eval \"$(/home/ubuntu/miniconda3/bin/conda shell.bash hook)\" && conda activate label_pl",  # 가상환경 실행
                    "python3 /home/ubuntu/labeling/LLM/main_text.py"
                   #"python3 /home/ubuntu/labeling/yolo_to_ocr.py"
                ]
            }
        )

        # Command ID 반환
        command_id = response['Command']['CommandId']
        print(f"Command ID: {command_id}")

        return {
            'statusCode': 200,
            'body': f"SSM Command successfully sent. Command ID: {command_id}"
        }

    except Exception as e:
        print(f"Error occurred: {str(e)}")
        return {
            'statusCode': 500,
            'body': f"Error: {str(e)}"
        }


 ② S3  버킷(big9-project-02-question-bucket)에 테스트 파일 업로드
  (i) 텍스트 파일 업로드

  • 폴더 경로: text/
  • 테스트 파일: test.txt

  (ii) 이미지 파일 업로드

  • 폴더 경로: image/
  • 테스트 파일: test.png

③-1 CloudWatch 결과 확인
  (i) 텍스트 파일 업로드

  • Lambda > CloudWatch 로그 확인
    • SSM 명령이 성공적으로 EC2에 전송됨
    • Command ID 생성

 
  (ii) 이미지 파일 업로드

  • Lambda > CloudWatch 로그 확인
    • SSM 명령이 성공적으로 EC2에 전송됨
    • Command ID 생성

③-2 SSM Agent 결과 확인
  (i) 텍스트 파일 업로드

  • EC2 > SSM Agent 로그 확인
    • Command ID 입력
    • S3 에 txt 파일 업로드 → Lambda → SSM → EC2 → main.py → MongoDB 에 테스트 데이터 적재 성공!
aws ssm get-command-invocation \
    --command-id "<COMMAND_ID>" \
    --instance-id "<INSTANCE_ID>" \
    --region "ap-northeast-2"
    

"""
{
    "CommandId": "<COMMAND_ID>",
    "InstanceId": "<INSTANCE_ID>",
    "Comment": "",
    "DocumentName": "AWS-RunShellScript",
    "DocumentVersion": "$DEFAULT",
    "PluginName": "aws:runShellScript",
    "ResponseCode": 0,
    "ExecutionStartDateTime": "2024-12-23T14:51:32.446Z",
    "ExecutionElapsedTime": "PT2.502S",
    "ExecutionEndDateTime": "2024-12-23T14:51:34.446Z",
    "Status": "Success",  # 성공!
    "StatusDetails": "Success",
    "StandardOutputContent": "[INFO] Connected to MongoDB Database: labelresults, Collection: results\n[INFO] Test data inserted with ID: 676978f65c732ce6b5fa5773\n",
    "StandardOutputUrl": "",
    "StandardErrorContent": "",
    "StandardErrorUrl": "",
    "CloudWatchOutputConfig": {
        "CloudWatchLogGroupName": "",
        "CloudWatchOutputEnabled": false
    }
}
"""

 
  (ii) 이미지 파일 업로드

  • Lambda > CloudWatch 로그 확인
    • Command ID 입력
    • 실패! Error 내용: ModuleNotFoundError: No module named 'transformers'
aws ssm get-command-invocation \
    --command-id "<COMMAND_ID>" \
    --instance-id "<INSTANCE_ID>" \
    --region "ap-northeast-2"
    

"""
{
    "CommandId": "<COMMAND_ID>",
    "InstanceId": "<INSTANCE_ID>",
    "Comment": "",
    "DocumentName": "AWS-RunShellScript",
    "DocumentVersion": "$DEFAULT",
    "PluginName": "aws:runShellScript",
    "ResponseCode": 1,
    "ExecutionStartDateTime": "2024-12-23T14:51:42.837Z",
    "ExecutionElapsedTime": "PT2.087S",
    "ExecutionEndDateTime": "2024-12-23T14:51:44.837Z",
    "Status": "Failed",  # 실패!
    "StatusDetails": "Failed",
    "StandardOutputContent": "",
    "StandardOutputUrl": "",
    "StandardErrorContent": "Traceback (most recent call last):\n  File \"/home/ubuntu/labeling/yolo_to_ocr.py\", line 2, in <module>\n    from transformers import TrOCRProcessor, VisionEncoderDecoderModel\nModuleNotFoundError: No module named 'transformers'\nfailed to run commands: exit status 1",
    "StandardErrorUrl": "",
    "CloudWatchOutputConfig": {
        "CloudWatchLogGroupName": "",
        "CloudWatchOutputEnabled": false
    }
}
"""

 

  • EC2에서 스크립트 실행해가며, 필요한 추가 모듈 설치
# 스크립트 실행
python /home/ubuntu/labeling/yolo_to_ocr.py

# 추가 모듈 설치
pip install transformers
pip install ultralytics

sudo apt-get update
sudo apt-get install -y libgl1
  • Hugging Face에서 제공하는 trocr-small-korean 모델은 디렉토리 형식의 구조를 가짐 ( .pt X)
    • 모델 S3 버킷에 OCR 디렉토리 저장
trocr_directory/
├── pytorch_model.bin         # 모델 가중치 파일
├── config.json               # 모델 설정 파일
├── generation_config.json    # 생성 관련 설정 파일
├── preprocessor_config.json  # 전처리기 설정 파일
├── tokenizer_config.json     # 토크나이저 설정 파일
├── tokenizer.json            # 토크나이저 정의 파일
├── special_tokens_map.json   # 특별 토큰 매핑 파일
├── sentencepiece.bpe.model   # SentencePiece 토크나이저 모델 파일
  • yolo_to_ocr.py 코드 수정: subprocess 라이브러리 사용하여 두 번째 파이썬 스크립트(main_image.py)가 자동 실행되게 함
import boto3
import subprocess
from transformers import TrOCRProcessor, VisionEncoderDecoderModel
from ultralytics import YOLO
from dotenv import load_dotenv
import os
import stat
import json
from PIL import Image
import numpy as np


# 환경 변수 로드
load_dotenv()

# S3 설정
s3_client = boto3.client('s3')
MODEL_BUCKET_NAME = os.getenv("MODEL_BUCKET_NAME")
YOLO_MODEL_PATH = os.getenv("YOLO_MODEL_PATH")
OCR_MODEL_PATH = os.getenv("OCR_MODEL_PATH")
IMAGE_BUCKET_NAME = os.getenv("IMAGE_BUCKET_NAME")
IMAGE_PREFIX = os.getenv("IMAGE_PREFIX")  # 이미지 경로(prefix)
LOCAL_YOLO_PATH = "/tmp/temp_yolo.pt"  # YOLO 모델 임시 저장 경로
LOCAL_OCR_PATH = "/tmp/temp_ocr/"  # OCR 모델 임시 저장 경로
LOCAL_IMAGE_PATH = "/tmp/temp_image.png"  # 이미지 임시 저장 경로
OUTPUT_JSON_PATH = "/tmp/result.json"  # YOLO - OCR 결과 임시 저장 경로



# 1. YOLO 모델 로드
def load_yolo_model():
    try:
        if os.path.exists(LOCAL_YOLO_PATH):
            # 파일 쓰기 권한 확인 및 추가
            if not os.access(LOCAL_YOLO_PATH, os.W_OK):
                print(f"[WARNING] No write permission for {LOCAL_YOLO_PATH}. Changing permissions.")
                os.chmod(LOCAL_YOLO_PATH, stat.S_IWUSR | stat.S_IRUSR)

            # 기존 파일 삭제
            os.remove(LOCAL_YOLO_PATH)
            print(f"[INFO] Existing file {LOCAL_YOLO_PATH} deleted.")
        
        # S3에서 YOLO 모델 다운로드
        s3_client.download_file(MODEL_BUCKET_NAME, YOLO_MODEL_PATH, LOCAL_YOLO_PATH)
        print(f"[INFO] YOLO model downloaded to {LOCAL_YOLO_PATH}")
        
        # YOLO 모델 로드
        model = YOLO(LOCAL_YOLO_PATH)
        return model
    except Exception as e:
        print(f"[ERROR] Failed to load YOLO model: {e}")
        raise e

# 2. Fine-tuned TrOCR 모델 로드
def load_finetuned_trocr_model():
    os.makedirs(LOCAL_OCR_PATH, exist_ok=True)  # 디렉터리가 없으면 생성

    # S3에서 필요한 파일 리스트 정의
    trocr_files = [
        "config.json",
        "pytorch_model.bin",
        "tokenizer.json",
        "tokenizer_config.json",
        "preprocessor_config.json",
        "generation_config.json"
    ]

    # 파일 개별 다운로드
    for file_name in trocr_files:
        s3_file_path = f"{OCR_MODEL_PATH}/{file_name}"
        local_file_path = f"{LOCAL_OCR_PATH}/{file_name}"

        # 파일이 이미 존재하는 경우 처리
        if os.path.exists(local_file_path):
            # 파일 권한 확인 및 수정
            if not os.access(local_file_path, os.W_OK):
                print(f"[WARNING] No write permission for {local_file_path}. Changing permissions.")
                os.chmod(local_file_path, stat.S_IWUSR | stat.S_IRUSR)
            
            # 기존 파일 삭제
            try:
                os.remove(local_file_path)
                print(f"[INFO] Existing file {local_file_path} deleted.")
            except Exception as e:
                print(f"[ERROR] Failed to delete {local_file_path}: {e}")
                raise e

        # S3에서 파일 다운로드
        try:
            s3_client.download_file(MODEL_BUCKET_NAME, s3_file_path, local_file_path)
            print(f"[INFO] Downloaded {file_name} to {local_file_path}")
        except Exception as e:
            print(f"[ERROR] Failed to download {file_name}: {e}")
            raise e

    # 모델과 프로세서 로드
    try:
        processor = TrOCRProcessor.from_pretrained(LOCAL_OCR_PATH)
        model = VisionEncoderDecoderModel.from_pretrained(LOCAL_OCR_PATH)
        print(f"[INFO] Fine-tuned TrOCR model loaded from {LOCAL_OCR_PATH}")
        return processor, model
    except Exception as e:
        print(f"[ERROR] Failed to load TrOCR model: {e}")
        raise e

# 3. S3에서 이미지 목록 가져오기
def list_images_in_s3(bucket_name, prefix):
    try:
        response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=prefix)
        if "Contents" not in response:
            raise ValueError("No files found in the specified bucket and prefix.")
        files = [obj["Key"] for obj in response["Contents"] if obj["Key"].endswith((".png", ".jpg"))]
        print(f"Found {len(files)} image files.")
        return files
    except Exception as e:
        print(f"[ERROR] Failed to list images from S3: {e}")
        raise e

# 4. S3에서 이미지 다운로드
def download_image_from_s3(bucket_name, key, local_path):
    try:
        # 로컬 경로에 디렉토리 생성 확인
        os.makedirs(os.path.dirname(local_path), exist_ok=True)

        if os.path.exists(local_path):
            os.remove(local_path)  # 기존 파일 삭제
        s3_client.download_file(bucket_name, key, local_path)
        print(f"Image downloaded from S3: {key} to {local_path}")
        return local_path
    except Exception as e:
        print(f"[ERROR] Failed to download image from S3: {e}")
        raise e

def extract_bboxes(yolo_model, image_path):
    try:
        # YOLO 모델 추론
        results = yolo_model(image_path)  # YOLO는 파일 경로 입력을 직접 지원
        bboxes = []

        for result in results[0].boxes:
            if int(result.cls.item()) == 0:  # 클래스가 0인 경우만 처리
                bboxes.append([
                    0,  # class_id
                    result.xywh[0][0].item(),  # x_center
                    result.xywh[0][1].item(),  # y_center
                    result.xywh[0][2].item(),  # width
                    result.xywh[0][3].item()   # height
                ])
        
        # 바운딩 박스가 없는 경우 기본값 반환  (just 연결 테스트용!)
        if not bboxes:
            print("[WARNING] No bounding boxes found. Returning default bounding box.")
            bboxes.append([0, 0, 0, 0, 0])
        
        return bboxes

    except Exception as e:
        print(f"[ERROR] Failed to extract bounding boxes: {e}")
        return [[0, 0, 0, 0, 0]]  # 오류 발생 시 기본값 반환  (just 연결 테스트용!)

# 6. TrOCR로 텍스트 추출
def extract_text_from_bboxes(processor, model, image_path, bboxes):
    try:
        image = Image.open(image_path).resize((384, 384))  # TrOCR 입력 크기로 리사이즈
        question_text = []

        for bbox in bboxes:
            _, x_center, y_center, width, height = bbox

            x_min = max(0, int((x_center - width / 2) * 384))
            y_min = max(0, int((y_center - height / 2) * 384))
            x_max = min(384, int((x_center + width / 2) * 384))
            y_max = min(384, int((y_center + height / 2) * 384))

            if x_min >= x_max or y_min >= y_max:
                print("[WARNING] Skipping invalid bounding box with zero or negative area.")
                continue

            cropped_image = image.crop((x_min, y_min, x_max, y_max))
            inputs = processor(images=cropped_image, return_tensors="pt").pixel_values
            outputs = model.generate(inputs)
            text = processor.batch_decode(outputs, skip_special_tokens=True)[0]
            question_text.append(text.strip())

        # 텍스트가 없을 경우 기본 메시지 추가  (just 연결 테스트용!)
        if not question_text:
            question_text.append("테스트 메시지입니다")

        return question_text
    except Exception as e:
        print(f"[ERROR] Failed to extract text from bounding boxes: {e}")
        return ["테스트 메시지입니다"]  # 오류 발생 시 기본값 반환  (just 연결 테스트용!)

# 7. JSON 저장
def save_to_json(image_id, bboxes, question_text, output_path):
    try:
        # 기존 JSON 파일 삭제
        if os.path.exists(output_path):
            os.remove(output_path)
            print(f"[INFO] Existing JSON file deleted: {output_path}")

        data = {
            "image_id": image_id,
            "bboxes": bboxes,
            "question_text": " ".join(question_text)
        }
        with open(output_path, "w", encoding="utf-8") as json_file:
            json.dump(data, json_file, ensure_ascii=False, indent=4)
        print(f"Saved JSON to {output_path}")
        return output_path
    except Exception as e:
        print(f"[ERROR] Failed to save JSON: {e}")
        raise e

# 메인 함수
def main():

    # 모델 로드
    yolo_model = load_yolo_model()
    processor, trocr_model = load_finetuned_trocr_model()
    # S3 이미지 경로 설정
    image_files = list_images_in_s3(IMAGE_BUCKET_NAME, IMAGE_PREFIX)
    for idx, image_key in enumerate(image_files, start=1):

        # 고유 image_id 생성
        image_id = f"image_{idx}"
        # S3에서 이미지 다운로드
        local_image_path = download_image_from_s3(IMAGE_BUCKET_NAME, image_key, LOCAL_IMAGE_PATH)
        # YOLO로 바운딩 박스 추출
        bboxes = extract_bboxes(yolo_model, local_image_path)
        # TrOCR로 텍스트 추출
        if bboxes:
            question_text_list = extract_text_from_bboxes(processor, trocr_model, local_image_path, bboxes)
            # JSON 저장
            save_to_json(image_id, bboxes, question_text_list, OUTPUT_JSON_PATH)
        else:
            print(f"No valid bounding boxes found for image {image_key}.")


# 실행
if __name__ == "__main__":
    main()  # 현재 작업 실행

    # 다음 작업(두 번째 파일 실행)
    try:
        print("[INFO] Triggering the second script (main_image.py)...")
        subprocess.run(
            ["python", "main_image.py"],  # LLM 폴더에서 main_image.py 실행
            check=True,
            cwd=os.path.join(os.path.dirname(__file__), "LLM")  # LLM 디렉토리로 이동
        )
    except subprocess.CalledProcessError as e:
        print(f"[ERROR] Failed to execute the second script: {e}")
# 자주 발생한 Error
[ERROR] Failed to save JSON: [Errno 1] Operation not permitted: '/tmp/result.json'
PermissionError: [Errno 1] Operation not permitted: '/tmp/result.json'

# Error 분석
파일이 이미 존재하지만 현재 사용자가 해당 파일을 삭제하거나 수정할 권한이 없을 때 발생

# 해결
# 1) 임시 저장 폴더의 기존 파일 삭제
sudo rm -f /tmp/result.json
# 2) 임시 저장 폴더에 파일을 저장할 때, 기존 파일은 삭제하고 새로 저장되게 코드 수정
def save_to_json(image_id, bboxes, question_text, output_path):
    try:
        # 기존 JSON 파일 삭제
        if os.path.exists(output_path):
            os.remove(output_path)  # <- 여기!!!!!!!!!!!!!!!!!!!
            print(f"[INFO] Existing JSON file deleted: {output_path}")

        data = {
            "image_id": image_id,
            "bboxes": bboxes,
            "question_text": " ".join(question_text)
        }
        with open(output_path, "w", encoding="utf-8") as json_file:
            json.dump(data, json_file, ensure_ascii=False, indent=4)
        print(f"Saved JSON to {output_path}")
        return output_path
    except Exception as e:
        print(f"[ERROR] Failed to save JSON: {e}")
        raise e
  • main_image.py
import os
import json
from dotenv import load_dotenv
from pymongo import MongoClient

# .env 파일 로드
load_dotenv()


# 환경 변수에서 MongoDB 정보 가져오기
mongo_user = os.getenv("MONGO_USER")
mongo_password = os.getenv("MONGO_PASSWORD")
mongo_host = os.getenv("MONGO_HOST")
mongo_port = os.getenv("MONGO_PORT")
mongo_db = os.getenv("MONGO_DB")
collection_name = os.getenv("MONGO_COLLECTION_NAME")

# MongoDB URI 구성
mongo_uri = f"mongodb://{mongo_user}:{mongo_password}@{mongo_host}:{mongo_port}/?authSource=admin"

# MongoDB 클라이언트 연결
mongo_client = MongoClient(mongo_uri)
db = mongo_client[mongo_db]
collection = db[collection_name]

# MongoDB 연결 테스트
try:
    mongo_client.admin.command("ping")
    print(f"[INFO] Connected to MongoDB Database: {mongo_db}, Collection: {collection_name}")
except Exception as e:
    print(f"[ERROR] MongoDB connection failed: {e}")
    exit()

# MongoDB에 데이터 삽입 테스트
try:
    test_data = {"key": "value", "status": "test"}
    result = collection.insert_one(test_data)
    print(f"[INFO] Test data inserted with ID: {result.inserted_id}")
except Exception as e:
    print(f"[ERROR] Failed to insert test data: {e}")
finally:
    mongo_client.close()

# JSON 파일 경로 (첫 번째 스크립트에서 저장된 경로)
OUTPUT_JSON_PATH = "/tmp/result.json"

def perform_llm_task(question_text):
    """
    LLM 작업 수행
    """
    # LLM 수행 로직
    print(f"[INFO] Performing LLM task with question_text: {question_text}")
    # 예: GPT 모델 호출, API 호출 등
    response = f"LLM response for: {question_text}"
    return response

def main():
    # JSON 파일 로드
    if not os.path.exists(OUTPUT_JSON_PATH):
        print(f"[ERROR] JSON file not found at {OUTPUT_JSON_PATH}")
        return

    try:
        with open(OUTPUT_JSON_PATH, "r", encoding="utf-8") as f:
            data = json.load(f)
            print(f"[INFO] Loaded JSON data: {data}")

            # question_text 추출
            question_text = data.get("question_text", "")
            if not question_text:
                print("[WARNING] No question_text found in JSON file.")
                return

            # LLM 작업 수행
            response = perform_llm_task(question_text)
            print(f"[INFO] LLM task completed. Response: {response}")

    except Exception as e:
        print(f"[ERROR] Failed to process JSON file: {e}")

# 실행
if __name__ == "__main__":
    main()


6) 라벨링 결과 MongoDB 적재 확인

  • S3 업로드 → Lambda 함수 실행  → EC2 내 라벨링 파이프라인 실행 → 라벨링 결과 MongoDB 저장 성공!
    • Binary JSON 형식으로 저장
{
  "_id": "ObjectId(...)",
  "key": "value",
  "status": "test"
}

MongoDB에 테스트 파일들(JSON)이 자동 적재된 모습

 

 

📙 내일 일정

  • LLM 출력 내용 수정
  • 학습된 모델로 갈아 끼우기
  • 라벨링 파이프라인 내 CI/CD
  • 업로드 시간 줄이기 5s/20s ->