본문 바로가기
TIL _Today I Learned/2024.10

[DAY 69] SQLAlchemy ORM

by gamdong2 2024. 10. 28.
[천재교육] 프로젝트 기반 빅데이터 서비스 개발자 양성 과정 9기
학습일 : 2024.10.28

📕 학습 목록

  • 기능별 파일로 모듈을 분리하여 저장 → 필요 시 해당 모듈을 호출

 

📗 기억할 내용

[유용한 VS Code 단축키]

Shortcut Description
Ctrl + D 같은 단어 연속 선택
Ctrl + F 현재 파일에서 단어 검색
Ctrl + Shift + F 현재 디렉토리의 모든 파일에 대해 단어 검색
Ctrl + B Explorer 창 on/off
Alt + 방향키(위/아래) 선택한 코드 블럭 위아래로 이동
Shift + Alt + 방향키(위/아래) 선택한 코드 블럭 위아래로 복사
Ctrl + ~ 터미널 창 on/off
Ctrl + Shift + 5 터미널 창 split
Ctrl + 2 작업 창 split
F1 Show Command Palette

 

📘코드 실습

[모듈 생성 & 호출]

1. 모듈 및 공통 정보들을 각각의 파일 형태로 나누어 관리

1) 디렉토리 구조 및 역할

 

[각 파일의 역할]
1. 최상위 파일 및 폴더
  - .env: 데이터베이스나 API 키와 같은 민감한 환경 변수를 저장
  - app.py: 애플리케이션의 진입점 파일로, 서버를 실행하거나 특정 기능을 실행
  - controller.py: ETL 파이프라인을 관리하는 컨트롤러 파일로, 데이터를 추출, 변환, 로드하고 임시 파일을 정리
  - .ipynb: Jupyter Notebook 파일로, 분석, 테스트 또는 실험 코드가 포함되어 있음
  - main.py: 애플리케이션의 메인 실행 파일로, 초기 설정과 함께 전체 워크플로우를 관리
  - settings.py: 애플리케이션의 설정 파일로, 데이터베이스 연결 정보, 파일 경로와 같은 주요 설정 저장
2. dataset: 데이터셋 파일을 저장하는 폴더로, 분석이나 모델 학습을 위한 데이터를 보관
3. db: 데이터베이스 관련 파일을 모아놓은 폴더
  - connector.py: 데이터베이스 연결 객체를 생성. DB에 연결하는 기능을 제공
  - pgsql_query.py: PostgreSQL 데이터베이스에 사용되는 쿼리들을 저장
4. fakedata: 가상 데이터를 생성하고 삽입하는 기능을 제공하는 폴더
  - create.py: 가상 데이터를 생성하는 로직을 포함
  - insert.py: 생성된 가상 데이터를 데이터베이스에 삽입
  - process.py: 가상 데이터를 처리하는 데 필요한 추가적인 로직을 포함
5. pipeline: ETL 파이프라인을 구성하는 각 단계의 파일을 포함
  - extract.py: 데이터베이스로부터 데이터를 추출
  - load.py: 변환된 데이터를 데이터베이스에 로드
  - remove.py: ETL 작업 후 임시 파일이나 불필요한 데이터를 삭제하는 기능을 수행
  - transform.py: 데이터를 변환하는 로직을 포함
6. temp_storage: 임시 파일을 저장하는 폴더로, ETL 과정에서 사용되는 임시 데이터를 보관

 

2) 파일 생성

① settings.py

# settings.py
import os  # 운영 체제와의 상호 작용을 위해 os 모듈 임포트
import dotenv  # .env 파일을 쉽게 로드하기 위해 dotenv 모듈 임포트

# .env 파일이 프로젝트 루트 디렉토리에 위치해 있을 경우,
# dotenv.load_dotenv() 함수로 경로를 명시하지 않아도 .env 파일을 자동으로 로드 가능함
# dotenv.find_dotenv()와 dotenv.load_dotenv()를 사용하지 않고도 .env 파일이 로드될 수 있음

# .env 파일 경로 찾기
env_path = dotenv.find_dotenv()  # .env 파일의 위치를 자동으로 찾아 반환
dotenv.load_dotenv(env_path)  # 찾은 경로에서 .env 파일 로드하여 환경 변수 설정

# temp_path 정보를 매번 Day2.ipynb 파일에 선언하는 번거로움을 줄이기 위해
# settings.py 파일에 저장하고 필요할 때마다 불러올 수 있도록 설정함
TEMP_PATH = "c:\\Users\\user\\Desktop\\yura\\Hadoop\\241028\\temp_storage"

# 데이터베이스 설정을 위한 변수들 모음
DB_SETTINGS = {
    "POSTGRES": { 
        'engine': os.getenv("POSTGRES_ENGINE"), 
        'orm_engine': os.getenv("POSTGRES_ENGINE"), 
        'host': os.getenv("POSTGRES_HOST"),  
        'database': os.getenv("POSTGRES_DB"),  
        'user': os.getenv("POSTGRES_USER"),  
        'password': os.getenv("POSTGRES_PASSWORD"),  
        'port': os.getenv("POSTGRES_PORT")  
    },
    "KDT9": { 
        'engine': os.getenv("POSTGRES_ENGINE"), 
        'orm_engine': os.getenv("POSTGRES_ENGINE"),  
        'host': os.getenv("POSTGRES_HOST"),
        'database': os.getenv("POSTGRES_DB_2"),  
        'user': os.getenv("POSTGRES_USER"),  
        'password': os.getenv("POSTGRES_PASSWORD"), 
        'port': os.getenv("POSTGRES_PORT") 
    }
}

 
② .env

# .env
POSTGRES_ENGINE = 'postgresql'
POSTGRES_USER = 'postgres'
POSTGRES_PASSWORD = '1234'
POSTGRES_HOST = '127.0.0.1' 
POSTGRES_PORT = '5432'
POSTGRES_DB = 'postgres'

 
③ db>connector.py

# connector.py
import psycopg2                          # psycopg2 라이브러리 - PostgreSQL에 연결하기 위한 라이브러리
import db.pgsql_query as postgresql_query # PostgreSQL 쿼리를 담고 있는 모듈 임포트
from sqlalchemy import create_engine      # SQLAlchemy - ORM 연결을 위해 사용

class DBconnector:
    def __init__(self, engine, orm_engine, host, database, user, password, port):
        # DBconnector 클래스 초기화 메서드로, 데이터베이스 연결에 필요한 파라미터를 설정
        self.engine = engine
        self.orm_engine = orm_engine
        self.conn_params = dict(           # psycopg2를 위한 DB 연결 파라미터 설정
            host=host,
            dbname=database,
            user=user,
            password=password,
            port=port
        )
        # SQLAlchemy 연결을 위한 연결 문자열 생성
        self.orm_conn_params = (
            f"{orm_engine}://{user}:{password}@{host}:{port}/{database}"
        )
        self.orm_connect()                # ORM 연결을 설정

        # PostgreSQL 엔진을 사용할 경우, PostgreSQL 연결과 쿼리 설정
        if self.engine == 'postgresql':
            self.connect = self.postgres_connect()  # PostgreSQL 연결 설정
            self.queries = postgresql_query.queries # PostgreSQL 쿼리 저장
        # elif self.engine == 'mysql':   # 다른 DB도 추가할 수 있음

    def __enter__(self):
        # with 문을 사용할 때 DB 연결 객체 반환
        print("접속")
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # with 문을 종료할 때 DB 연결을 닫음
        self.conn.close()
        print("종료")

    def postgres_connect(self):
        # psycopg2를 사용한 PostgreSQL 연결 설정
        self.conn = psycopg2.connect(**self.conn_params)
        return self
    
    def orm_connect(self):
        # SQLAlchemy 엔진을 사용해 ORM 연결 객체 생성
        self.orm_conn = create_engine(self.orm_conn_params)
        return self.orm_conn  # SQLAlchemy 연결 객체를 반환하여, 외부에서 ORM 연결을 사용할 수 있게 함

    def get_query(self, table_name):
        # 지정된 테이블 이름에 대한 쿼리를 가져옴
        _query = self.queries[table_name]
        return _query

 

④ db>query.py

# pgsql_query.py
queries = {
    #"lecture": "SELECT * FROM lecture",
    # "tbl": "SELECT * FROM tbl LIMIT 5",
    "fake": "SELECT * FROM fake"
}

 
⑤ pipeline>extract.py

# extract.py
import pandas as pd  # Pandas 라이브러리 - 데이터프레임을 사용하여 데이터 조작

# Extract 함수
def extractor(db_connector, table_name):
    # db_connector: 데이터베이스 연결 객체
    # table_name: 데이터를 추출할 테이블 이름
    
    with db_connector as connected:
        # with 문을 사용해 db_connector의 연결을 관리 (자동으로 열고 닫힘)

        try:
            _query = connected.get_query(table_name)  # 쿼리를 가져오는 함수 호출, `get_query` 메서드가 존재해야 함
            con = connected.orm_conn  # SQLAlchemy ORM 연결 객체
            df = pd.read_sql(_query, con)  # SQL 쿼리를 실행하고 결과를 데이터프레임으로 변환
            return df  # 성공 시 추출된 데이터를 데이터프레임 형태로 반환
        
        except Exception as e:
            print(f"Extract MSG: {e}")  # 예외 발생 시 에러 메시지를 출력
            return False  # 실패 시 False 반환

 
⑥ pipeline>load.py

# load.py
import pandas as pd  # Pandas 라이브러리 - 데이터프레임을 사용하여 데이터 조작

def loader(db_connector, df, table_name):
    # DataFrame 데이터를 지정된 테이블에 로드하는 함수
    # db_connector: 데이터베이스 연결 객체
    # df: 로드할 데이터가 담긴 DataFrame
    # table_name: 데이터를 삽입할 테이블 이름
    
    with db_connector as connected:
        # with 문을 사용해 db_connector의 연결을 관리 (자동으로 열고 닫힘)
        
        try:
            orm_conn = connected.orm_conn  # SQLAlchemy ORM 연결 객체 가져오기
            # 데이터프레임을 지정된 테이블에 삽입, 기존 테이블이 있을 경우 대체 (if_exists="replace")
            df.to_sql(name=table_name, con=orm_conn, if_exists='replace', index=False)
            return True  # 데이터 삽입 성공 시 True 반환
        
        except Exception as e:
            print(f"Error MSG : {e}")  # 예외 발생 시 에러 메시지를 출력
            return False  # 데이터 삽입 실패 시 False 반환

 
⑦ remove.py

# remove.py
import shutil  # shutil 라이브러리 - 파일과 디렉토리를 조작하기 위한 라이브러리
import os      # os 라이브러리 - 운영 체제의 파일 시스템을 조작하기 위한 라이브러리

# remover 함수
def remover(path):
    # 지정된 경로에 있는 디렉토리를 삭제하고, 동일한 이름으로 빈 디렉토리를 생성하는 함수
    # path: 삭제할 디렉토리 경로

    try:
        shutil.rmtree(path)  # 지정된 경로의 디렉토리와 모든 하위 파일/폴더를 삭제
        os.makedirs(path)    # 동일한 경로에 빈 디렉토리를 다시 생성
        return True          # 성공적으로 수행된 경우 True 반환
    
    except Exception as e:
        print(f"Remover Error MSG: {e}")  # 예외 발생 시 에러 메시지를 출력
        return False         # 실패 시 False 반환

 
⑧ transform.py

# transform.py
import os        # os 라이브러리 - 운영 체제의 파일 시스템을 조작하기 위한 라이브러리
import pandas as pd  # Pandas 라이브러리 - 데이터프레임을 사용하여 데이터 조작

# transformer 함수
def transformer(temp_path, batch_date, df, table_name):
    # 데이터를 변환하여 임시 파일로 저장하는 함수
    # temp_path: 임시 저장 폴더 경로
    # batch_date: 배치 날짜 (형식: "YYYYMMDD")
    # df: 저장할 데이터가 담긴 데이터프레임
    # table_name: 저장할 파일 이름의 기준이 될 테이블 이름

    path = create_path(temp_path, batch_date)  # 날짜에 맞는 경로 생성

    save_to_file(df, path, table_name)  # 데이터프레임을 파일로 저장

    return df  # 데이터프레임 반환 (변환 후 확인 용도)

# create_path 함수
def create_path(temp_path, batch_date):
    # 배치 날짜에 맞게 연도, 월, 일 디렉토리 구조를 생성하는 함수
    # temp_path: 임시 저장 폴더 경로
    # batch_date: 배치 날짜 (형식: "YYYYMMDD")

    _y = batch_date[:4]  # 연도 추출
    _m = batch_date[4:6] # 월 추출
    _d = batch_date[6:]  # 일 추출

    # "lecture/yyyy=YYYY/mm=MM/dd=DD" 형식으로 경로 생성
    _path = os.path.join(temp_path, 'lecture', f'yyyy={_y}', f'mm={_m}', f'dd={_d}')

    return _path  # 생성된 경로 반환

# save_to_file 함수
def save_to_file(df, path, table_name):
    # 데이터프레임을 지정한 경로와 파일 이름으로 저장하는 함수
    # df: 저장할 데이터프레임
    # path: 저장할 디렉토리 경로
    # table_name: 파일 이름의 기준이 될 테이블 이름

    if len(df) > 0:  # 데이터프레임이 비어 있지 않은 경우에만 저장
        os.makedirs(path, mode=0o777, exist_ok=True)  # 지정한 경로 생성, 이미 있으면 생략
        save_path = os.path.join(path, f'{table_name}.csv')  # 파일 저장 경로 생성

        df.to_csv(save_path, index=False)  # 인덱스를 제외하고 CSV 파일로 저장
        return True  # 성공적으로 저장된 경우 True 반환
    else:
        print("EMPTY FILE")  # 데이터프레임이 비어 있을 경우 경고 메시지 출력
        return False  # 데이터프레임이 비어 있는 경우 False 반환

 

3) connector.py & settings.py: 데이터베이스 연결

① 파일 임포트

from db.connector import DBconnector  
from settings import DB_SETTINGS  

DB_SETTINGS["POSTGRES"]  # POSTGRES 데이터베이스 설정 정보를 불러옴

"""
{'host': '127.0.0.1',
 'database': 'postgres',
 'user': 'postgres',
 'password': '1234',
 'port': '5432'}
 """

 

② 데이터베이스 연결

db_connector = DBconnector(**DB_SETTINGS["POSTGRES"])  
# **DB_SETTINGS["POSTGRES"]는 DB_SETTINGS["POSTGRES"] 딕셔너리를 펼쳐서 키워드 인자로 전달
# 예: DBconnector(host="127.0.0.1", database="mydatabase", user="username", ...)

# with 문을 사용하여 DBconnector를 통해 데이터베이스에 연결
with db_connector as connected:
    # `connected.conn`은 데이터베이스 연결 객체를 참조
    conn = connected.conn  
    # 커서 생성 - SQL 쿼리 실행에 사용됨
    cursor = conn.cursor()  
    # `lecture` 테이블에서 최대 5개의 행을 선택하는 쿼리 실행
    cursor.execute("SELECT * FROM lecture LIMIT 5")  
    # 쿼리 결과를 가져와 출력
    print(cursor.fetchall())  
    # `cursor.fetchall()`은 쿼리 결과의 모든 행을 리스트 형태로 반환하고, 이를 출력함
    
"""
Start DBConnector!
Enter
[(6, 'Margaret', 1880, 'F', 1578), (7, 'Ida', 1880, 'F', 1472), (8, 'Alice', 1880, 'F', 1414), (9, 'Bertha', 1880, 'F', 1320), (10, 'Sarah', 1880, 'F', 1288)]
Exit
"""
[tip] 현재 작업 디렉터리 확인 → 파이썬의 모듈 검색 경로에 새로운 디렉터리 추가
- sys.path.append()를 사용하는 이유: Python은 기본적으로 다음 경로에서 모듈을 찾음
  • 현재 스크립트가 실행되는 디렉터리
  • Python 설치 경로에 포함된 표준 라이브러리 디렉터리
  • PYTHONPATH 환경 변수에 지정된 경로
  • 패키지 설치 경로 (site-packages)

이 범위 안에 모듈이 있다면 from 디렉터리 import 모듈처럼 바로 사용할 수 있음. 하지만, 위 경로에 포함되지 않은 디렉터리에 모듈이 있는 경우 sys.path.append()로 해당 디렉터리를 추가해 주어야 함

import os

# 현재 작업 디렉터리(cwd; Current Working Directory) 확인
os.getcwd()

"""
'c:\\Users\\user\\Desktop\\yura\\Hadoop\\241028'
"""

# 새로운 경로를 파이썬 모듈 검색 경로에 추가
import sys

# 새로운 경로를 Python 모듈 검색 경로에 추가
# sys.path: 파이썬이 모듈을 찾는 경로들이 저장된 리스트. import를 사용할 때 이 경로를 모듈 검색 범위에 포함시킴
sys.path.append(r"c:\\Users\\user\\Desktop\\yura\\Hadoop\\241028")

# 코드 활용 예시
# 241028 디렉터리 내의 module.py 파일이 있다면, 위에서 sys.path.append()를 통해 241028 경로를 추가했기 때문에 그 경로에 대해서도 모듈 검색을 수행함; 비로소 import 를 통해 module.py를 불러올 수 있음
import module

 

4) query.py: 쿼리 정보를 파일로 관리

① connector.py 에서 클래스 & settings.py 에서 데이터베이스 설정 정보 호출 → 인스턴스 생성 & 메서드호출 

from db.connector import DBconnector  # DBconnector 클래스 임포트
from settings import DB_SETTINGS  # 데이터베이스 설정 정보 임포트

# get_query 로 쿼리 불러오기 예제
# DBconnector 클래스의 db_connector 인스턴스 생성 시, DB_SETTINGS['POSTGRES'] 딕셔너리의 키와 값이 
# __init__ 메서드의 각 매개변수 (host, database, user, password, port)에 할당됨
db_connector = DBconnector(**DB_SETTINGS['POSTGRES'])  # DBconnector 인스턴스 생성, POSTGRES 설정값을 사용해 초기화
db_connector.get_query('lecture')  # 'lecture' 테이블에 해당하는 SQL 쿼리 가져오기

 
② pgsql_query 에서 쿼리 목록 호출 → for 문으로 쿼리를 하나씩 출력

from db.pgsql_query import queries 

# for 문을 이용해 queries 안에 있는 각 쿼리를 하나씩 출력함
for tbl in queries.keys():  # queries 딕셔너리의 키(테이블 이름)를 하나씩 순회
    db_connector = DBconnector(**DB_SETTINGS['POSTGRES'])  # DBconnector 인스턴스 생성, POSTGRES 설정 값 사용
    _query = db_connector.get_query(tbl)  # 테이블 이름에 해당하는 쿼리 가져오기
    print(_query)  # 쿼리 출력

 

5) extract.py: 쿼리를 받아 데이터베이스에 조회 → 결과를 Pandas Dataframe 으로 변환

① connector.py 에서 클래스 호출 → 인스턴스 생성 → 데이터베이스연결 → 쿼리 실행 → 쿼리 결과를 데이터프레임으로 반환 

from db.connector import DBconnector 
from settings import DB_SETTINGS  
import pandas as pd  

# 데이터 불러오기
db_connector = DBconnector(**DB_SETTINGS['POSTGRES'])  # DBconnector 인스턴스 생성, POSTGRES 설정 값으로 초기화

# 데이터베이스 연결 및 쿼리 실행
with db_connector as connected:  # with문으로 안전하게 db_connector 객체 사용
    _query = connected.get_query('lecture')  # 'lecture' 테이블에 대한 SQL 쿼리 가져오기
    con = connected.conn  # 데이터베이스 연결 객체 가져오기
    df = pd.read_sql(_query, con)  # 쿼리 결과를 데이터프레임으로 읽어옴

# 결과 출력
print(df)  # 데이터프레임 출력
print(type(df))  # 데이터프레임의 데이터 타입 출력 (예상 출력: <class 'pandas.core.frame.DataFrame'>)

"""
접속
종료
    id       name  year gender  count
0    6   Margaret  1880      F   1578
1    7        Ida  1880      F   1472
2    8      Alice  1880      F   1414
...
44  50  Josephine  1880      F    544
"""

 
② extractor.py 에서 함수(extractor) 호출 → 쿼리 결과로 얻어진 데이터프레임 반환

# extractor 함수 호출하여 'lecture' 테이블의 데이터를 가져옴
return_extractor = extractor(db_connector, 'lecture')  

# 가져온 데이터프레임의 상위 5개 행 출력
return_extractor.head()

 

6) transform.py: 데이터를 로컬 저장소에 정해진 날짜 경로 형식으로 정리해 보관

[tip] transform.py의 역할 및 사용 환경별 기능
  • Batch 날짜별 저장 경로 생성 및 데이터프레임 저장: 주어진 날짜(batch_date) 정보를 기반으로 연도/월/일 폴더 구조를 생성하고, 해당 경로에 데이터프레임을 CSV 파일로 저장
  • 이행 환경에 따른 구성 가능성: 데이터 이동 또는 저장 방식에 따라 설정이 달라질 수 있음
    • Database -> Staging Server -> Cloud / Database: 데이터가 데이터베이스에서 임시 저장소(스테이징 서버)로 이동한 후 최종 목적지인 클라우드나 다른 데이터베이스에 저장되는 경우
    • Database -- Directory Connection -> Cloud / Database: 데이터베이스에서 바로 로컬 디렉토리로 연결하여 파일로 저장한 후, 클라우드나 데이터베이스로 이행하는 경우
  • 목적지 데이터베이스의 성격에 따른 처리: 저장할 데이터베이스의 성격에 따라 transform.py에 추가적인 데이터 처리 기능을 포함할 수 있음. 다음과 같이 데이터베이스의 용도에 따라 다르게 처리될 수 있음
    • Data Lake: 데이터가 거의 가공되지 않은 상태로 저장됨. 원시 데이터 형태를 유지함
    • Data Warehouse: 데이터가 분석 용도로 활용되므로, 결측치나 공백 처리 등 간단한 전처리가 필요할 수 있음
    • Data Mart: 특정 부서나 업무에서 사용할 목적의 데이터 저장소. group by나 필터링을 통해 더 구체적이고 목적에 맞는 데이터로 처리되어야 함

① 저장 경로 생성

  • Database 이름/Table 이름/yyyy={}/mm={}/dd={}/{table_name}.csv
    • ex: postgres/lecture/2024/10/28/lecture.csv`
import pandas as pd  
from db.connector import DBconnector  
from settings import DB_SETTINGS  
from pipeline.extract import extractor

db_connector = DBconnector(**DB_SETTINGS['POSTGRES'])  # DBconnector 인스턴스 생성, POSTGRES 설정 값으로 초기화
table_name = 'lecture'  # 데이터를 가져올 테이블 이름 지정

# extractor 함수로 데이터 추출
return_extractor = extractor(db_connector, table_name)  

# 데이터가 정상적으로 추출되었는지 확인 후 출력
if isinstance(return_extractor, pd.DataFrame):  # 추출 결과가 데이터프레임인지 확인
    print(return_extractor.head())  # 데이터프레임 상위 5개 행 출력
else:
    print("데이터를 가져오지 못했습니다.")  # 데이터가 없거나 오류 발생 시 메시지 출력

 
② Batch 날짜 설정

# Batch 날짜 설정 방법 1
from datetime import datetime 

batch_date = datetime.now()  # 현재 날짜와 시간 저장
format_date = batch_date.strftime('%Y%m%d')  # 'YYYYMMDD' 형식의 문자열로 변환

# 연도, 월, 일을 문자열에서 슬라이싱하여 각각 변수에 저장
_y = format_date[:4]  # 연도 추출 ('YYYY')
_m = format_date[4:6]  # 월 추출 ('MM')
_d = format_date[6:]   # 일 추출 ('DD')

_y, _m, _d  # 연도, 월, 일 출력
# Batch 날짜 설정 방법 2
f"{batch_date: %Y}", f"{batch_date: %m}", f"{batch_date: %d}"  # 연도, 월, 일을 각각 포맷팅하여 추출

 
③ 현재 작업 디렉토리 확인 → 경로 추가하여 최종 경로(_path) 생성 

import os 

os.getcwd()  # 현재 작업 디렉토리 경로 출력

# "현재 위치 + temp_storage" 경로를 변수에 저장
temp_path = "c:\\Users\\user\\Desktop\\yura\\Hadoop\\241028\\temp_storage"  # 기본 저장 경로 지정

# temp_path 뒤에 'postgres'와 'lecture' 디렉토리를 추가하여 새로운 경로 생성
_path = os.path.join(temp_path, 'postgres', 'lecture')  # 지정한 경로들을 결합하여 _path에 저장
_path  # 최종 경로 출력

 
④ transform.py 에서 함수(create_path) 호출 → 경로(path) 생성 

import os
from datetime import datetime  
from transform import create_path 

# 기본 저장 경로 설정
temp_path = "c:\\Users\\user\\Desktop\\yura\\Hadoop\\241028\\temp_storage"

# 현재 날짜를 'YYYYMMDD' 형식으로 저장
batch_date = datetime.now().strftime('%Y%m%d')

# 경로 생성
path = create_path(temp_path, batch_date)  # transform.py의 create_path 함수 사용

 
transform.py 에서 함수(save_to_file) 호출 → 데이터프레임(df)을 csv / json / parquet 로 저장

import pandas as pd
from transform import save_to_file

# mode=777: 리눅스 명령어로 모든 권한(read/write 등) 부여
os.makedirs(path, mode=777, exist_ok=True)  # exist_ok=True로 폴더가 있어도 재생성하지 않음

# 예제 데이터프레임 생성
df = return_extractor

# CSV 파일 저장 (save_to_file 함수 활용)
save_to_file(df, path, 'lecture')  # CSV 형식으로 'lecture.csv' 파일 저장

# JSON 파일 저장
save_path_json = os.path.join(path, 'lecture.json')
df.to_json(save_path_json, orient='records', indent=4, force_ascii=False)  # JSON 형식으로 저장

# Parquet 파일 저장
save_path_parquet = os.path.join(path, 'lecture.parquet')
df.to_parquet(save_path_parquet, engine='pyarrow', compression='gzip', index=False)  # Parquet 형식으로 저장

 
⑥ transform.py 의 함수(transformer) 호출 → 경로 생성 & 해당 경로에 데이터프레임 저장

from transform import transformer 

# 테이블 이름 설정
table_name = 'lecture'

# transformer 함수 호출하여 경로 생성 및 데이터프레임 저장 통합 수행
result = transformer(temp_path, batch_date, df, table_name)

# 결과 확인
if result:
    print("데이터가 성공적으로 저장되었습니다.")
else:
    print("데이터 저장에 실패하였습니다.")

 
⑦ 데이터 추출 → 경로 생성 → 저장

[tip] 코드 실행 흐름
1. DBconnector 객체 생성
  - connector.py의 DBconnector 클래스를 사용해 db_connector 객체를 생성
  - settings.py에서 호출한 DB_SETTINGS['POSTGRES'] 설정을 db_connector 객체에 전달하여 PostgreSQL 데이터베이스와의 연결을 설정

2. 데이터 추출 (extractor)
  - extract.py 모듈의 extractor 함수를 사용하여 db_connector와 table_name('lecture')을 통해 데이터베이스 쿼리를 실행
  - 쿼리 결과를 return_extractor 변수에 데이터프레임 형태로 저장

3. 조건문으로 데이터 확인
  - return_extractor가 None이 아니며 비어 있지 않을 경우(not return_extractor.empty)에만 다음 단계로 진행
  - 추출된 데이터프레임이 비었거나 데이터 추출이 실패한 경우 "DataFrame이 비었거나 데이터 추출에 실패했습니다." 메시지를 출력

4. 데이터 저장 (transformer)
  - 추출된 데이터프레임이 유효할 경우, transform.py의 transformer 함수를 사용하여 TEMP_PATH와 batch_date를 기반으로 저장 경로를 설정하고 데이터프레임을 지정된 파일 형식으로 저장
  - transformer 함수는 TEMP_PATH, batch_date, return_extractor(데이터프레임), table_name을 매개변수로 받아 경로 설정과 데이터프레임 저장을 통합하여 수행
# 모듈 불러오기
from db.connector import DBconnector  
from settings import DB_SETTINGS, TEMP_PATH  
from pipeline.extract import extractor
from pipeline.transform import transformer 
from datetime import datetime  

# DBconnector 객체 생성 및 설정값 적용
db_connector = DBconnector(**DB_SETTINGS['POSTGRES'])  # DB_SETTINGS의 POSTGRES 설정으로 DBconnector 초기화
table_name = 'lecture'  # 데이터 추출 대상 테이블 이름 지정
batch_date = datetime.now().strftime('%Y%m%d')  # 현재 날짜를 'YYYYMMDD' 형식으로 저장

# 데이터 추출
return_extractor = extractor(db_connector, table_name)  # extractor 함수 호출하여 데이터베이스에서 데이터 추출
# 추출된 데이터프레임 반환 (데이터 추출 성공 시 데이터프레임, 실패 시 None)

# 추출된 데이터 유효성 검사 및 저장
if return_extractor is not None and not return_extractor.empty:  # 데이터프레임이 비어있지 않으면 저장 단계로 진행
    # transformer 함수 호출하여 경로 생성 및 데이터프레임 저장 수행
    return_transformer = transformer(TEMP_PATH, batch_date, return_extractor, table_name)
else:
    # 추출된 데이터가 없거나 실패 시 메시지 출력
    print("DataFrame이 비었거나 데이터 추출에 실패했습니다.")

 

7) load.py: 만들어진 파일을 특정 저장소에 적재

① 데이터베이스 연결

# DB 세팅 정보 설정
engine = 'postgresql'
user = 'postgres'  
password = '1234' 
host = '127.0.0.1' 
port = '5432'  
database = 'postgres' 

# 데이터베이스 연결 생성
db = create_engine(f'{engine}://{user}:{password}@{host}:{port}/{database}')  # SQLAlchemy 엔진을 사용해 데이터베이스 연결 설정
db  # 연결 객체 출력하여 확인

 

② csv 파일을 판다스 데이터프레임으로 저장 & 연결된 데이터베이스에 데이터프레임 저장

from sqlalchemy import create_engine  # SQLAlchemy 모듈에서 create_engine 함수를 임포트하여 데이터베이스 연결 설정

# DB 세팅 정보 설정
engine = 'postgresql'
user = 'postgres'  
password = '1234' 
host = '127.0.0.1' 
port = '5432' 
database = 'postgres'

# 데이터베이스 연결 생성
db = create_engine(f'{engine}://{user}:{password}@{host}:{port}/{database}')  # SQLAlchemy 엔진을 사용해 데이터베이스 연결 설정
db  # 연결 객체 출력하여 확인

import pandas as pd  # 데이터 처리를 위한 pandas 라이브러리 임포트

# CSV 파일 불러오기
df = pd.read_csv('./dataset/data-01/names.csv')  # 데이터셋 파일을 읽어 데이터프레임 생성
df.head()  # 데이터프레임의 상위 5개 행 출력하여 확인

# 데이터프레임의 데이터 타입 확인
df.dtypes  # 각 열의 데이터 타입을 확인하여 출력

# 데이터베이스에 데이터프레임 저장
df.to_sql(name='point', con=db, if_exists='replace')  # 데이터프레임을 'point' 테이블로 데이터베이스에 저장
                                            # if_exists 옵션 설명
                                                # 'fail': 테이블이 이미 존재하면 실패
                                                # 'append': 테이블이 존재하면 기존 테이블 아래에 데이터 추가
                                                # 'replace': 테이블이 존재하면 기존 테이블을 덮어씌움 (교체)

 
③ load.py 에서 함수(loader) 호출 → 테이블(lecture)에 데이터 적재

[tip] 코드 실행 흐름
1. 데이터베이스 연결 및 설정
  - DBconnector 객체를 생성하고 PostgreSQL 설정(DB_SETTINGS['POSTGRES'])을 적용하여 데이터베이스 연결을 설정
  - table_name과 batch_date를 설정하여 데이터를 추출하고 저장할 대상과 날짜를 지정

2. 데이터 추출 (Extract)
  - extractor 함수를 호출하여 table_name('lecture')에 해당하는 데이터를 추출하고, 추출된 데이터를 return_extractor에 저장
  - 추출된 데이터프레임이 유효하지 않으면 이후 단계로 진행하지 않음

3. 데이터 변환 및 저장 경로 생성 (Transform)
  - transformer 함수를 호출하여 TEMP_PATH와 batch_date를 기반으로 저장 경로를 생성하고, 데이터를 해당 경로에 저장
  - 변환 후 저장된 데이터프레임은 return_transformer에 저장

4. 데이터 로드 (Load)
  - loader 함수가 return_transformer를 db_connector 객체와 함께 사용하여 데이터베이스의 'lecture' 테이블에 저장
  - loader 함수는 db_connector의 orm_conn을 사용하여 데이터프레임을 데이터베이스에 저장하며, if_exists 옵션을 통해 테이블이 이미 존재할 경우의 동작을 결정 (replace, append 등)
import pandas as pd 

# 필요한 모듈 임포트
from db.connector import DBconnector 
from settings import DB_SETTINGS, TEMP_PATH  
from pipeline.extract import extractor  
from pipeline.transform import transformer 
from pipeline.load import loader  
from datetime import datetime  

# 데이터베이스 연결 설정 및 테이블 이름, 날짜 설정
db_connector = DBconnector(**DB_SETTINGS['POSTGRES'])  # DB_SETTINGS의 POSTGRES 설정으로 DBconnector 초기화
table_name = 'lecture'  # 데이터 추출 대상 테이블 이름 지정
batch_date = datetime.now().strftime('%Y%m%d')  # 현재 날짜를 'YYYYMMDD' 형식으로 저장

# 데이터 추출 단계
return_extractor = extractor(db_connector, table_name)  # extractor 함수 호출하여 'lecture' 테이블의 데이터를 추출
# 추출된 데이터프레임이 유효하지 않으면 이후 단계는 실행되지 않음

# 추출된 데이터 유효성 검사 및 변환 단계 실행
if return_extractor is not None and not return_extractor.empty:  # 데이터가 유효한 경우에만 변환 단계로 진행
    # transformer 함수 호출하여 TEMP_PATH와 batch_date에 맞는 저장 경로 생성 및 데이터프레임 저장
    return_transformer = transformer(TEMP_PATH, batch_date, return_extractor, table_name)
# return_transformer에는 변환 후 데이터프레임이 저장됨 (CSV 파일 저장)

# 변환된 데이터 유효성 검사 및 로드 단계 실행
if return_transformer is not None and not return_transformer.empty:  # 변환된 데이터가 유효한 경우에만 로드 단계로 진행
    # loader 함수 호출하여 변환된 데이터를 PostgreSQL 데이터베이스의 'lecture' 테이블로 로드
    return_loader = loader(db_connector, return_transformer, table_name)
    # loader 함수는 DBconnector의 orm_conn 연결 객체를 사용해 데이터프레임을 SQL 테이블로 저장함

 
8) remove.py: 임시 저장(load)된 파일을 자동으로 삭제

① connector.py에서 클래스(DBconnector) 호출 → settings.py의 데이터베이스 설정값을 토대로 데이터베이스 연결

import pandas as pd
from datetime import datetime
from db.connector import DBconnector 
from settings import DB_SETTINGS, TEMP_PATH 
from pipeline.extract import extractor  
from pipeline.transform import transformer  
from pipeline.load import loader 
from pipeline.remove import remover 

# DB 연결 및 테이블 이름, 날짜 설정
db_connector = DBconnector(**DB_SETTINGS['POSTGRES'])  # DB_SETTINGS의 POSTGRES 설정으로 DBconnector 초기화
table_name = 'lecture'  # 데이터 추출 대상 테이블 이름 지정
batch_date = datetime.now().strftime('%Y%m%d')  # 현재 날짜를 'YYYYMMDD' 형식으로 저장


 ② remove.py에서 함수(remover) 호출 → 기존 경로 삭제 및 재생성 

# 기존 TEMP_PATH 삭제 및 재생성
remover(TEMP_PATH)  # TEMP_PATH 경로를 삭제하고 새로 생성


 ③ extract.py에서 함수(extractor) 호출 → 테이블(lecture)의 데이터 추출

# 데이터 추출 단계
return_extractor = extractor(db_connector, table_name)  # extractor 함수 호출하여 'lecture' 테이블의 데이터 추출

# 추출된 데이터 유효성 검사 및 변환 단계 실행
if return_extractor is not None and not return_extractor.empty:  # 데이터가 유효한 경우에만 변환 단계로 진행
    return_transformer = transformer(TEMP_PATH, batch_date, return_extractor, table_name)  # transformer 함수 호출하여 경로 생성 및 저장

# 변환된 데이터 유효성 검사 및 로드 단계 실행
if return_transformer is not None and not return_transformer.empty:  # 변환된 데이터가 유효한 경우에만 로드 단계로 진행
    return_loader = loader(db_connector, return_transformer, table_name)  # loader 함수 호출하여 데이터베이스에 데이터 저장

 

[파이썬 심화]

1. 데이터 구조와 관련 메서드

1) 딕셔너리 (Dictionary)

  • list(aa): 딕셔너리의 키 값을 리스트 형태로 변환
aa = {"aa": 1, "bb": 2}
print(list(aa))  # ['aa', 'bb']


 2. 모듈 동적 임포트

  • __import__: 문자열을 통해 모듈을 동적으로 불러올 수 있음. 여기서는 db.pgsql_query 모듈의 queries 속성을 가져옴
queries = __import__("db.pgsql_query", fromlist=[""])
print(queries.queries)

 

3. 클래스와 객체지향 프로그래밍(OOP)

1) 클래스 예제: 화물선 클래스

  • 클래스: 데이터를 처리하는데 필요한 속성과 메서드를 포함한 객체를 정의하는 방법
  • __init__: 클래스의 생성자로 객체 생성 시 초기화하는 메서드
  • self: 클래스 내에서 객체 자신을 참조하기 위해 사용
  • unload, can_depart, load: 화물선의 상태를 조작하는 메서드들
class Cargoship:
    def __init__(self, capacity):
        self.cargo = []
        self.capacity = capacity

    def unload(self, port):
        port_list = [p[0] for p in self.cargo]
        if port in port_list:
            unloaded = [i for i in self.cargo if i[0] == port]
            return unloaded
        else:
            return None

    def can_depart(self):
        _depart = True if sum([i[1] for i in self.cargo]) <= self.capacity else False
        return _depart

    def load(self, new_cargo):
        self.cargo = new_cargo

 

4. JSON 데이터 처리

  • JSON(JavaScript Object Notation): 데이터를 키-값 형태로 저장하는 데이터 형식. 파이썬에서는 json 모듈을 통해 JSON을 처리할 수 있음
  • json.dumps(): 파이썬 객체를 JSON 형식의 문자열로 변환
  • sorted(): lambda를 사용하여 가격과 이름 순으로 JSON 객체를 정렬
  • 문자열 결합: ",".join()을 통해 각 JSON 객체를 문자열로 결합하고, 대괄호를 추가하여 리스트 형식의 JSON 문자열로 만듦
import json

aa = [{"name": "eggs", "price": 1}, {"name": "coffee", "price": 9.99}, {"name": "rice", "price": 4.04}]
sorted_items = sorted(aa, key=lambda x: (x["price"], x["name"]))
joined_json = ",".join([json.dumps(i) for i in sorted_items])
final_json = "[" + joined_json + "]"

 
 

5. 날짜 형식 변환

1) 날짜 문자열을 일관된 형식(YYYYMMDD)으로 변환하기

  • 정규표현식 (Regular Expressions): re.match()를 사용하여 문자열 패턴을 검사
    • \d{4}/\d{2}/\d{2}: 연도-월-일 형식(YYYY/MM/DD)
    • \d{2}/\d{2}/\d{4}: 일-월-연도 형식(DD/MM/YYYY)
    • \d{2}-\d{2}-\d{4}: 월-일-연도 형식(MM-DD-YYYY)
import re
from datetime import datetime

def transform_date_format(dates):
    date_list = []
    for date_str in dates:
        if re.match(r"\d{4}/(0[1-9]|1[0-2])/(0[1-9]|1[0-9]|2[0-9]|3[0-1])", date_str):
            transformed_date = datetime.strptime(date_str, '%Y/%m/%d').strftime("%Y%m%d")
        elif re.match(r"(0[1-9]|1[0-9]|2[0-9]|3[0-1])/(0[1-9]|1[0-2])/\d{4}", date_str):
            transformed_date = datetime.strptime(date_str, '%d/%m/%Y').strftime("%Y%m%d")
        elif re.match(r"(0[1-9]|1[0-2])-(0[1-9]|1[0-9]|2[0-9]|3[0-1])-\d{4}", date_str):
            transformed_date = datetime.strptime(date_str, '%m-%d-%Y').strftime("%Y%m%d")
        date_list.append(transformed_date)
    return date_list

dates = ["2010/02/20", "09/01/1994", "10-09-1996", "20210221"]
print(transform_date_format(dates))

 

6. 정규표현식을 활용한 패턴 매칭과 날짜 변환 자동화

  • 날짜 형식별 정규표현식을 리스트로 정의하여 각 날짜에 맞는 포맷으로 변환할 수 있음
  • 정규표현식 패턴 리스트: 각 날짜 형식에 해당하는 패턴을 리스트로 정리하고, match_pattern 함수에서 일치하는 패턴의 인덱스를 반환
  • transform_date_format 함수: 패턴 인덱스를 기반으로 변환할 날짜 형식을 선택하여 YYYYMMDD 형식으로 변환
import re
from datetime import datetime

def match_pattern(date_str):
    pattern_list = [
        r"\d{4}/(0[1-9]|1[0-2])/(0[1-9]|1[0-9]|2[0-9]|3[0-1])",
        r"(0[1-9]|1[0-9]|2[0-9]|3[0-1])/(0[1-9]|1[0-2])/\d{4}",
        r"(0[1-9]|1[0-2])-(0[1-9]|1[0-9]|2[0-9]|3[0-1])-\d{4}"
    ]
    return [bool(re.search(ptn, date_str)) for ptn in pattern_list].index(True)

def transform_date_format(dates):
    date_list = []
    for date_str in dates:
        pattern_idx = match_pattern(date_str)
        if pattern_idx == 0:
            date_list.append(datetime.strptime(date_str, '%Y/%m/%d').strftime('%Y%m%d'))
        elif pattern_idx == 1:
            date_list.append(datetime.strptime(date_str, '%d/%m/%Y').strftime('%Y%m%d'))
        elif pattern_idx == 2:
            date_list.append(datetime.strptime(date_str, '%m-%d-%Y').strftime('%Y%m%d'))
    return date_list

dates = ["2010/02/20", "09/01/1994", "10-09-1996", "20210221"]
print(transform_date_format(dates))

 

 

📙 내일 일정

  • SQLAlchemy ORM 실습

 

 

 

 

 




 
 
 
 

'TIL _Today I Learned > 2024.10' 카테고리의 다른 글

[DAY 71] PySpark  (0) 2024.10.30
[DAY 70] SQLAlchemy ORM  (1) 2024.10.29
[DAY 68] 데이터 엔지니어링  (1) 2024.10.25
[DAY 67] AWS 아키텍처 그리기  (0) 2024.10.24
[DAY 66] AWS 클라우드 환경 이해 및 실습  (0) 2024.10.23