[천재교육] 프로젝트 기반 빅데이터 서비스 개발자 양성 과정 9기
학습일 : 2024.10.28
📕 학습 목록
- 기능별 파일로 모듈을 분리하여 저장 → 필요 시 해당 모듈을 호출
📗 기억할 내용
[유용한 VS Code 단축키]
Shortcut | Description |
Ctrl + D | 같은 단어 연속 선택 |
Ctrl + F | 현재 파일에서 단어 검색 |
Ctrl + Shift + F | 현재 디렉토리의 모든 파일에 대해 단어 검색 |
Ctrl + B | Explorer 창 on/off |
Alt + 방향키(위/아래) | 선택한 코드 블럭 위아래로 이동 |
Shift + Alt + 방향키(위/아래) | 선택한 코드 블럭 위아래로 복사 |
Ctrl + ~ | 터미널 창 on/off |
Ctrl + Shift + 5 | 터미널 창 split |
Ctrl + 2 | 작업 창 split |
F1 | Show Command Palette |
📘코드 실습
[모듈 생성 & 호출]
1. 모듈 및 공통 정보들을 각각의 파일 형태로 나누어 관리
1) 디렉토리 구조 및 역할
[각 파일의 역할]
1. 최상위 파일 및 폴더
- .env: 데이터베이스나 API 키와 같은 민감한 환경 변수를 저장
- app.py: 애플리케이션의 진입점 파일로, 서버를 실행하거나 특정 기능을 실행
- controller.py: ETL 파이프라인을 관리하는 컨트롤러 파일로, 데이터를 추출, 변환, 로드하고 임시 파일을 정리
- .ipynb: Jupyter Notebook 파일로, 분석, 테스트 또는 실험 코드가 포함되어 있음
- main.py: 애플리케이션의 메인 실행 파일로, 초기 설정과 함께 전체 워크플로우를 관리
- settings.py: 애플리케이션의 설정 파일로, 데이터베이스 연결 정보, 파일 경로와 같은 주요 설정 저장
2. dataset: 데이터셋 파일을 저장하는 폴더로, 분석이나 모델 학습을 위한 데이터를 보관
3. db: 데이터베이스 관련 파일을 모아놓은 폴더
- connector.py: 데이터베이스 연결 객체를 생성. DB에 연결하는 기능을 제공
- pgsql_query.py: PostgreSQL 데이터베이스에 사용되는 쿼리들을 저장
4. fakedata: 가상 데이터를 생성하고 삽입하는 기능을 제공하는 폴더
- create.py: 가상 데이터를 생성하는 로직을 포함
- insert.py: 생성된 가상 데이터를 데이터베이스에 삽입
- process.py: 가상 데이터를 처리하는 데 필요한 추가적인 로직을 포함
5. pipeline: ETL 파이프라인을 구성하는 각 단계의 파일을 포함
- extract.py: 데이터베이스로부터 데이터를 추출
- load.py: 변환된 데이터를 데이터베이스에 로드
- remove.py: ETL 작업 후 임시 파일이나 불필요한 데이터를 삭제하는 기능을 수행
- transform.py: 데이터를 변환하는 로직을 포함
6. temp_storage: 임시 파일을 저장하는 폴더로, ETL 과정에서 사용되는 임시 데이터를 보관
2) 파일 생성
① settings.py
# settings.py
import os # 운영 체제와의 상호 작용을 위해 os 모듈 임포트
import dotenv # .env 파일을 쉽게 로드하기 위해 dotenv 모듈 임포트
# .env 파일이 프로젝트 루트 디렉토리에 위치해 있을 경우,
# dotenv.load_dotenv() 함수로 경로를 명시하지 않아도 .env 파일을 자동으로 로드 가능함
# dotenv.find_dotenv()와 dotenv.load_dotenv()를 사용하지 않고도 .env 파일이 로드될 수 있음
# .env 파일 경로 찾기
env_path = dotenv.find_dotenv() # .env 파일의 위치를 자동으로 찾아 반환
dotenv.load_dotenv(env_path) # 찾은 경로에서 .env 파일 로드하여 환경 변수 설정
# temp_path 정보를 매번 Day2.ipynb 파일에 선언하는 번거로움을 줄이기 위해
# settings.py 파일에 저장하고 필요할 때마다 불러올 수 있도록 설정함
TEMP_PATH = "c:\\Users\\user\\Desktop\\yura\\Hadoop\\241028\\temp_storage"
# 데이터베이스 설정을 위한 변수들 모음
DB_SETTINGS = {
"POSTGRES": {
'engine': os.getenv("POSTGRES_ENGINE"),
'orm_engine': os.getenv("POSTGRES_ENGINE"),
'host': os.getenv("POSTGRES_HOST"),
'database': os.getenv("POSTGRES_DB"),
'user': os.getenv("POSTGRES_USER"),
'password': os.getenv("POSTGRES_PASSWORD"),
'port': os.getenv("POSTGRES_PORT")
},
"KDT9": {
'engine': os.getenv("POSTGRES_ENGINE"),
'orm_engine': os.getenv("POSTGRES_ENGINE"),
'host': os.getenv("POSTGRES_HOST"),
'database': os.getenv("POSTGRES_DB_2"),
'user': os.getenv("POSTGRES_USER"),
'password': os.getenv("POSTGRES_PASSWORD"),
'port': os.getenv("POSTGRES_PORT")
}
}
② .env
# .env
POSTGRES_ENGINE = 'postgresql'
POSTGRES_USER = 'postgres'
POSTGRES_PASSWORD = '1234'
POSTGRES_HOST = '127.0.0.1'
POSTGRES_PORT = '5432'
POSTGRES_DB = 'postgres'
③ db>connector.py
# connector.py
import psycopg2 # psycopg2 라이브러리 - PostgreSQL에 연결하기 위한 라이브러리
import db.pgsql_query as postgresql_query # PostgreSQL 쿼리를 담고 있는 모듈 임포트
from sqlalchemy import create_engine # SQLAlchemy - ORM 연결을 위해 사용
class DBconnector:
def __init__(self, engine, orm_engine, host, database, user, password, port):
# DBconnector 클래스 초기화 메서드로, 데이터베이스 연결에 필요한 파라미터를 설정
self.engine = engine
self.orm_engine = orm_engine
self.conn_params = dict( # psycopg2를 위한 DB 연결 파라미터 설정
host=host,
dbname=database,
user=user,
password=password,
port=port
)
# SQLAlchemy 연결을 위한 연결 문자열 생성
self.orm_conn_params = (
f"{orm_engine}://{user}:{password}@{host}:{port}/{database}"
)
self.orm_connect() # ORM 연결을 설정
# PostgreSQL 엔진을 사용할 경우, PostgreSQL 연결과 쿼리 설정
if self.engine == 'postgresql':
self.connect = self.postgres_connect() # PostgreSQL 연결 설정
self.queries = postgresql_query.queries # PostgreSQL 쿼리 저장
# elif self.engine == 'mysql': # 다른 DB도 추가할 수 있음
def __enter__(self):
# with 문을 사용할 때 DB 연결 객체 반환
print("접속")
return self
def __exit__(self, exc_type, exc_value, traceback):
# with 문을 종료할 때 DB 연결을 닫음
self.conn.close()
print("종료")
def postgres_connect(self):
# psycopg2를 사용한 PostgreSQL 연결 설정
self.conn = psycopg2.connect(**self.conn_params)
return self
def orm_connect(self):
# SQLAlchemy 엔진을 사용해 ORM 연결 객체 생성
self.orm_conn = create_engine(self.orm_conn_params)
return self.orm_conn # SQLAlchemy 연결 객체를 반환하여, 외부에서 ORM 연결을 사용할 수 있게 함
def get_query(self, table_name):
# 지정된 테이블 이름에 대한 쿼리를 가져옴
_query = self.queries[table_name]
return _query
④ db>query.py
# pgsql_query.py
queries = {
#"lecture": "SELECT * FROM lecture",
# "tbl": "SELECT * FROM tbl LIMIT 5",
"fake": "SELECT * FROM fake"
}
⑤ pipeline>extract.py
# extract.py
import pandas as pd # Pandas 라이브러리 - 데이터프레임을 사용하여 데이터 조작
# Extract 함수
def extractor(db_connector, table_name):
# db_connector: 데이터베이스 연결 객체
# table_name: 데이터를 추출할 테이블 이름
with db_connector as connected:
# with 문을 사용해 db_connector의 연결을 관리 (자동으로 열고 닫힘)
try:
_query = connected.get_query(table_name) # 쿼리를 가져오는 함수 호출, `get_query` 메서드가 존재해야 함
con = connected.orm_conn # SQLAlchemy ORM 연결 객체
df = pd.read_sql(_query, con) # SQL 쿼리를 실행하고 결과를 데이터프레임으로 변환
return df # 성공 시 추출된 데이터를 데이터프레임 형태로 반환
except Exception as e:
print(f"Extract MSG: {e}") # 예외 발생 시 에러 메시지를 출력
return False # 실패 시 False 반환
⑥ pipeline>load.py
# load.py
import pandas as pd # Pandas 라이브러리 - 데이터프레임을 사용하여 데이터 조작
def loader(db_connector, df, table_name):
# DataFrame 데이터를 지정된 테이블에 로드하는 함수
# db_connector: 데이터베이스 연결 객체
# df: 로드할 데이터가 담긴 DataFrame
# table_name: 데이터를 삽입할 테이블 이름
with db_connector as connected:
# with 문을 사용해 db_connector의 연결을 관리 (자동으로 열고 닫힘)
try:
orm_conn = connected.orm_conn # SQLAlchemy ORM 연결 객체 가져오기
# 데이터프레임을 지정된 테이블에 삽입, 기존 테이블이 있을 경우 대체 (if_exists="replace")
df.to_sql(name=table_name, con=orm_conn, if_exists='replace', index=False)
return True # 데이터 삽입 성공 시 True 반환
except Exception as e:
print(f"Error MSG : {e}") # 예외 발생 시 에러 메시지를 출력
return False # 데이터 삽입 실패 시 False 반환
⑦ remove.py
# remove.py
import shutil # shutil 라이브러리 - 파일과 디렉토리를 조작하기 위한 라이브러리
import os # os 라이브러리 - 운영 체제의 파일 시스템을 조작하기 위한 라이브러리
# remover 함수
def remover(path):
# 지정된 경로에 있는 디렉토리를 삭제하고, 동일한 이름으로 빈 디렉토리를 생성하는 함수
# path: 삭제할 디렉토리 경로
try:
shutil.rmtree(path) # 지정된 경로의 디렉토리와 모든 하위 파일/폴더를 삭제
os.makedirs(path) # 동일한 경로에 빈 디렉토리를 다시 생성
return True # 성공적으로 수행된 경우 True 반환
except Exception as e:
print(f"Remover Error MSG: {e}") # 예외 발생 시 에러 메시지를 출력
return False # 실패 시 False 반환
⑧ transform.py
# transform.py
import os # os 라이브러리 - 운영 체제의 파일 시스템을 조작하기 위한 라이브러리
import pandas as pd # Pandas 라이브러리 - 데이터프레임을 사용하여 데이터 조작
# transformer 함수
def transformer(temp_path, batch_date, df, table_name):
# 데이터를 변환하여 임시 파일로 저장하는 함수
# temp_path: 임시 저장 폴더 경로
# batch_date: 배치 날짜 (형식: "YYYYMMDD")
# df: 저장할 데이터가 담긴 데이터프레임
# table_name: 저장할 파일 이름의 기준이 될 테이블 이름
path = create_path(temp_path, batch_date) # 날짜에 맞는 경로 생성
save_to_file(df, path, table_name) # 데이터프레임을 파일로 저장
return df # 데이터프레임 반환 (변환 후 확인 용도)
# create_path 함수
def create_path(temp_path, batch_date):
# 배치 날짜에 맞게 연도, 월, 일 디렉토리 구조를 생성하는 함수
# temp_path: 임시 저장 폴더 경로
# batch_date: 배치 날짜 (형식: "YYYYMMDD")
_y = batch_date[:4] # 연도 추출
_m = batch_date[4:6] # 월 추출
_d = batch_date[6:] # 일 추출
# "lecture/yyyy=YYYY/mm=MM/dd=DD" 형식으로 경로 생성
_path = os.path.join(temp_path, 'lecture', f'yyyy={_y}', f'mm={_m}', f'dd={_d}')
return _path # 생성된 경로 반환
# save_to_file 함수
def save_to_file(df, path, table_name):
# 데이터프레임을 지정한 경로와 파일 이름으로 저장하는 함수
# df: 저장할 데이터프레임
# path: 저장할 디렉토리 경로
# table_name: 파일 이름의 기준이 될 테이블 이름
if len(df) > 0: # 데이터프레임이 비어 있지 않은 경우에만 저장
os.makedirs(path, mode=0o777, exist_ok=True) # 지정한 경로 생성, 이미 있으면 생략
save_path = os.path.join(path, f'{table_name}.csv') # 파일 저장 경로 생성
df.to_csv(save_path, index=False) # 인덱스를 제외하고 CSV 파일로 저장
return True # 성공적으로 저장된 경우 True 반환
else:
print("EMPTY FILE") # 데이터프레임이 비어 있을 경우 경고 메시지 출력
return False # 데이터프레임이 비어 있는 경우 False 반환
3) connector.py & settings.py: 데이터베이스 연결
① 파일 임포트
from db.connector import DBconnector
from settings import DB_SETTINGS
DB_SETTINGS["POSTGRES"] # POSTGRES 데이터베이스 설정 정보를 불러옴
"""
{'host': '127.0.0.1',
'database': 'postgres',
'user': 'postgres',
'password': '1234',
'port': '5432'}
"""
② 데이터베이스 연결
db_connector = DBconnector(**DB_SETTINGS["POSTGRES"])
# **DB_SETTINGS["POSTGRES"]는 DB_SETTINGS["POSTGRES"] 딕셔너리를 펼쳐서 키워드 인자로 전달
# 예: DBconnector(host="127.0.0.1", database="mydatabase", user="username", ...)
# with 문을 사용하여 DBconnector를 통해 데이터베이스에 연결
with db_connector as connected:
# `connected.conn`은 데이터베이스 연결 객체를 참조
conn = connected.conn
# 커서 생성 - SQL 쿼리 실행에 사용됨
cursor = conn.cursor()
# `lecture` 테이블에서 최대 5개의 행을 선택하는 쿼리 실행
cursor.execute("SELECT * FROM lecture LIMIT 5")
# 쿼리 결과를 가져와 출력
print(cursor.fetchall())
# `cursor.fetchall()`은 쿼리 결과의 모든 행을 리스트 형태로 반환하고, 이를 출력함
"""
Start DBConnector!
Enter
[(6, 'Margaret', 1880, 'F', 1578), (7, 'Ida', 1880, 'F', 1472), (8, 'Alice', 1880, 'F', 1414), (9, 'Bertha', 1880, 'F', 1320), (10, 'Sarah', 1880, 'F', 1288)]
Exit
"""
[tip] 현재 작업 디렉터리 확인 → 파이썬의 모듈 검색 경로에 새로운 디렉터리 추가
- sys.path.append()를 사용하는 이유: Python은 기본적으로 다음 경로에서 모듈을 찾음
- 현재 스크립트가 실행되는 디렉터리
- Python 설치 경로에 포함된 표준 라이브러리 디렉터리
- PYTHONPATH 환경 변수에 지정된 경로
- 패키지 설치 경로 (site-packages)
이 범위 안에 모듈이 있다면 from 디렉터리 import 모듈처럼 바로 사용할 수 있음. 하지만, 위 경로에 포함되지 않은 디렉터리에 모듈이 있는 경우 sys.path.append()로 해당 디렉터리를 추가해 주어야 함
import os # 현재 작업 디렉터리(cwd; Current Working Directory) 확인 os.getcwd() """ 'c:\\Users\\user\\Desktop\\yura\\Hadoop\\241028' """ # 새로운 경로를 파이썬 모듈 검색 경로에 추가 import sys # 새로운 경로를 Python 모듈 검색 경로에 추가 # sys.path: 파이썬이 모듈을 찾는 경로들이 저장된 리스트. import를 사용할 때 이 경로를 모듈 검색 범위에 포함시킴 sys.path.append(r"c:\\Users\\user\\Desktop\\yura\\Hadoop\\241028") # 코드 활용 예시 # 241028 디렉터리 내의 module.py 파일이 있다면, 위에서 sys.path.append()를 통해 241028 경로를 추가했기 때문에 그 경로에 대해서도 모듈 검색을 수행함; 비로소 import 를 통해 module.py를 불러올 수 있음 import module
4) query.py: 쿼리 정보를 파일로 관리
① connector.py 에서 클래스 & settings.py 에서 데이터베이스 설정 정보 호출 → 인스턴스 생성 & 메서드호출
from db.connector import DBconnector # DBconnector 클래스 임포트
from settings import DB_SETTINGS # 데이터베이스 설정 정보 임포트
# get_query 로 쿼리 불러오기 예제
# DBconnector 클래스의 db_connector 인스턴스 생성 시, DB_SETTINGS['POSTGRES'] 딕셔너리의 키와 값이
# __init__ 메서드의 각 매개변수 (host, database, user, password, port)에 할당됨
db_connector = DBconnector(**DB_SETTINGS['POSTGRES']) # DBconnector 인스턴스 생성, POSTGRES 설정값을 사용해 초기화
db_connector.get_query('lecture') # 'lecture' 테이블에 해당하는 SQL 쿼리 가져오기
② pgsql_query 에서 쿼리 목록 호출 → for 문으로 쿼리를 하나씩 출력
from db.pgsql_query import queries
# for 문을 이용해 queries 안에 있는 각 쿼리를 하나씩 출력함
for tbl in queries.keys(): # queries 딕셔너리의 키(테이블 이름)를 하나씩 순회
db_connector = DBconnector(**DB_SETTINGS['POSTGRES']) # DBconnector 인스턴스 생성, POSTGRES 설정 값 사용
_query = db_connector.get_query(tbl) # 테이블 이름에 해당하는 쿼리 가져오기
print(_query) # 쿼리 출력
5) extract.py: 쿼리를 받아 데이터베이스에 조회 → 결과를 Pandas Dataframe 으로 변환
① connector.py 에서 클래스 호출 → 인스턴스 생성 → 데이터베이스연결 → 쿼리 실행 → 쿼리 결과를 데이터프레임으로 반환
from db.connector import DBconnector
from settings import DB_SETTINGS
import pandas as pd
# 데이터 불러오기
db_connector = DBconnector(**DB_SETTINGS['POSTGRES']) # DBconnector 인스턴스 생성, POSTGRES 설정 값으로 초기화
# 데이터베이스 연결 및 쿼리 실행
with db_connector as connected: # with문으로 안전하게 db_connector 객체 사용
_query = connected.get_query('lecture') # 'lecture' 테이블에 대한 SQL 쿼리 가져오기
con = connected.conn # 데이터베이스 연결 객체 가져오기
df = pd.read_sql(_query, con) # 쿼리 결과를 데이터프레임으로 읽어옴
# 결과 출력
print(df) # 데이터프레임 출력
print(type(df)) # 데이터프레임의 데이터 타입 출력 (예상 출력: <class 'pandas.core.frame.DataFrame'>)
"""
접속
종료
id name year gender count
0 6 Margaret 1880 F 1578
1 7 Ida 1880 F 1472
2 8 Alice 1880 F 1414
...
44 50 Josephine 1880 F 544
"""
② extractor.py 에서 함수(extractor) 호출 → 쿼리 결과로 얻어진 데이터프레임 반환
# extractor 함수 호출하여 'lecture' 테이블의 데이터를 가져옴
return_extractor = extractor(db_connector, 'lecture')
# 가져온 데이터프레임의 상위 5개 행 출력
return_extractor.head()
6) transform.py: 데이터를 로컬 저장소에 정해진 날짜 경로 형식으로 정리해 보관
[tip] transform.py의 역할 및 사용 환경별 기능
- Batch 날짜별 저장 경로 생성 및 데이터프레임 저장: 주어진 날짜(batch_date) 정보를 기반으로 연도/월/일 폴더 구조를 생성하고, 해당 경로에 데이터프레임을 CSV 파일로 저장
- 이행 환경에 따른 구성 가능성: 데이터 이동 또는 저장 방식에 따라 설정이 달라질 수 있음
- Database -> Staging Server -> Cloud / Database: 데이터가 데이터베이스에서 임시 저장소(스테이징 서버)로 이동한 후 최종 목적지인 클라우드나 다른 데이터베이스에 저장되는 경우
- Database -- Directory Connection -> Cloud / Database: 데이터베이스에서 바로 로컬 디렉토리로 연결하여 파일로 저장한 후, 클라우드나 데이터베이스로 이행하는 경우
- 목적지 데이터베이스의 성격에 따른 처리: 저장할 데이터베이스의 성격에 따라 transform.py에 추가적인 데이터 처리 기능을 포함할 수 있음. 다음과 같이 데이터베이스의 용도에 따라 다르게 처리될 수 있음
- Data Lake: 데이터가 거의 가공되지 않은 상태로 저장됨. 원시 데이터 형태를 유지함
- Data Warehouse: 데이터가 분석 용도로 활용되므로, 결측치나 공백 처리 등 간단한 전처리가 필요할 수 있음
- Data Mart: 특정 부서나 업무에서 사용할 목적의 데이터 저장소. group by나 필터링을 통해 더 구체적이고 목적에 맞는 데이터로 처리되어야 함
① 저장 경로 생성
- Database 이름/Table 이름/yyyy={}/mm={}/dd={}/{table_name}.csv
- ex: postgres/lecture/2024/10/28/lecture.csv`
import pandas as pd
from db.connector import DBconnector
from settings import DB_SETTINGS
from pipeline.extract import extractor
db_connector = DBconnector(**DB_SETTINGS['POSTGRES']) # DBconnector 인스턴스 생성, POSTGRES 설정 값으로 초기화
table_name = 'lecture' # 데이터를 가져올 테이블 이름 지정
# extractor 함수로 데이터 추출
return_extractor = extractor(db_connector, table_name)
# 데이터가 정상적으로 추출되었는지 확인 후 출력
if isinstance(return_extractor, pd.DataFrame): # 추출 결과가 데이터프레임인지 확인
print(return_extractor.head()) # 데이터프레임 상위 5개 행 출력
else:
print("데이터를 가져오지 못했습니다.") # 데이터가 없거나 오류 발생 시 메시지 출력
② Batch 날짜 설정
# Batch 날짜 설정 방법 1
from datetime import datetime
batch_date = datetime.now() # 현재 날짜와 시간 저장
format_date = batch_date.strftime('%Y%m%d') # 'YYYYMMDD' 형식의 문자열로 변환
# 연도, 월, 일을 문자열에서 슬라이싱하여 각각 변수에 저장
_y = format_date[:4] # 연도 추출 ('YYYY')
_m = format_date[4:6] # 월 추출 ('MM')
_d = format_date[6:] # 일 추출 ('DD')
_y, _m, _d # 연도, 월, 일 출력
# Batch 날짜 설정 방법 2
f"{batch_date: %Y}", f"{batch_date: %m}", f"{batch_date: %d}" # 연도, 월, 일을 각각 포맷팅하여 추출
③ 현재 작업 디렉토리 확인 → 경로 추가하여 최종 경로(_path) 생성
import os
os.getcwd() # 현재 작업 디렉토리 경로 출력
# "현재 위치 + temp_storage" 경로를 변수에 저장
temp_path = "c:\\Users\\user\\Desktop\\yura\\Hadoop\\241028\\temp_storage" # 기본 저장 경로 지정
# temp_path 뒤에 'postgres'와 'lecture' 디렉토리를 추가하여 새로운 경로 생성
_path = os.path.join(temp_path, 'postgres', 'lecture') # 지정한 경로들을 결합하여 _path에 저장
_path # 최종 경로 출력
④ transform.py 에서 함수(create_path) 호출 → 경로(path) 생성
import os
from datetime import datetime
from transform import create_path
# 기본 저장 경로 설정
temp_path = "c:\\Users\\user\\Desktop\\yura\\Hadoop\\241028\\temp_storage"
# 현재 날짜를 'YYYYMMDD' 형식으로 저장
batch_date = datetime.now().strftime('%Y%m%d')
# 경로 생성
path = create_path(temp_path, batch_date) # transform.py의 create_path 함수 사용
⑤ transform.py 에서 함수(save_to_file) 호출 → 데이터프레임(df)을 csv / json / parquet 로 저장
import pandas as pd
from transform import save_to_file
# mode=777: 리눅스 명령어로 모든 권한(read/write 등) 부여
os.makedirs(path, mode=777, exist_ok=True) # exist_ok=True로 폴더가 있어도 재생성하지 않음
# 예제 데이터프레임 생성
df = return_extractor
# CSV 파일 저장 (save_to_file 함수 활용)
save_to_file(df, path, 'lecture') # CSV 형식으로 'lecture.csv' 파일 저장
# JSON 파일 저장
save_path_json = os.path.join(path, 'lecture.json')
df.to_json(save_path_json, orient='records', indent=4, force_ascii=False) # JSON 형식으로 저장
# Parquet 파일 저장
save_path_parquet = os.path.join(path, 'lecture.parquet')
df.to_parquet(save_path_parquet, engine='pyarrow', compression='gzip', index=False) # Parquet 형식으로 저장
⑥ transform.py 의 함수(transformer) 호출 → 경로 생성 & 해당 경로에 데이터프레임 저장
from transform import transformer
# 테이블 이름 설정
table_name = 'lecture'
# transformer 함수 호출하여 경로 생성 및 데이터프레임 저장 통합 수행
result = transformer(temp_path, batch_date, df, table_name)
# 결과 확인
if result:
print("데이터가 성공적으로 저장되었습니다.")
else:
print("데이터 저장에 실패하였습니다.")
⑦ 데이터 추출 → 경로 생성 → 저장
[tip] 코드 실행 흐름
1. DBconnector 객체 생성
- connector.py의 DBconnector 클래스를 사용해 db_connector 객체를 생성
- settings.py에서 호출한 DB_SETTINGS['POSTGRES'] 설정을 db_connector 객체에 전달하여 PostgreSQL 데이터베이스와의 연결을 설정
2. 데이터 추출 (extractor)
- extract.py 모듈의 extractor 함수를 사용하여 db_connector와 table_name('lecture')을 통해 데이터베이스 쿼리를 실행
- 쿼리 결과를 return_extractor 변수에 데이터프레임 형태로 저장
3. 조건문으로 데이터 확인
- return_extractor가 None이 아니며 비어 있지 않을 경우(not return_extractor.empty)에만 다음 단계로 진행
- 추출된 데이터프레임이 비었거나 데이터 추출이 실패한 경우 "DataFrame이 비었거나 데이터 추출에 실패했습니다." 메시지를 출력
4. 데이터 저장 (transformer)
- 추출된 데이터프레임이 유효할 경우, transform.py의 transformer 함수를 사용하여 TEMP_PATH와 batch_date를 기반으로 저장 경로를 설정하고 데이터프레임을 지정된 파일 형식으로 저장
- transformer 함수는 TEMP_PATH, batch_date, return_extractor(데이터프레임), table_name을 매개변수로 받아 경로 설정과 데이터프레임 저장을 통합하여 수행
# 모듈 불러오기
from db.connector import DBconnector
from settings import DB_SETTINGS, TEMP_PATH
from pipeline.extract import extractor
from pipeline.transform import transformer
from datetime import datetime
# DBconnector 객체 생성 및 설정값 적용
db_connector = DBconnector(**DB_SETTINGS['POSTGRES']) # DB_SETTINGS의 POSTGRES 설정으로 DBconnector 초기화
table_name = 'lecture' # 데이터 추출 대상 테이블 이름 지정
batch_date = datetime.now().strftime('%Y%m%d') # 현재 날짜를 'YYYYMMDD' 형식으로 저장
# 데이터 추출
return_extractor = extractor(db_connector, table_name) # extractor 함수 호출하여 데이터베이스에서 데이터 추출
# 추출된 데이터프레임 반환 (데이터 추출 성공 시 데이터프레임, 실패 시 None)
# 추출된 데이터 유효성 검사 및 저장
if return_extractor is not None and not return_extractor.empty: # 데이터프레임이 비어있지 않으면 저장 단계로 진행
# transformer 함수 호출하여 경로 생성 및 데이터프레임 저장 수행
return_transformer = transformer(TEMP_PATH, batch_date, return_extractor, table_name)
else:
# 추출된 데이터가 없거나 실패 시 메시지 출력
print("DataFrame이 비었거나 데이터 추출에 실패했습니다.")
7) load.py: 만들어진 파일을 특정 저장소에 적재
① 데이터베이스 연결
# DB 세팅 정보 설정
engine = 'postgresql'
user = 'postgres'
password = '1234'
host = '127.0.0.1'
port = '5432'
database = 'postgres'
# 데이터베이스 연결 생성
db = create_engine(f'{engine}://{user}:{password}@{host}:{port}/{database}') # SQLAlchemy 엔진을 사용해 데이터베이스 연결 설정
db # 연결 객체 출력하여 확인
② csv 파일을 판다스 데이터프레임으로 저장 & 연결된 데이터베이스에 데이터프레임 저장
from sqlalchemy import create_engine # SQLAlchemy 모듈에서 create_engine 함수를 임포트하여 데이터베이스 연결 설정
# DB 세팅 정보 설정
engine = 'postgresql'
user = 'postgres'
password = '1234'
host = '127.0.0.1'
port = '5432'
database = 'postgres'
# 데이터베이스 연결 생성
db = create_engine(f'{engine}://{user}:{password}@{host}:{port}/{database}') # SQLAlchemy 엔진을 사용해 데이터베이스 연결 설정
db # 연결 객체 출력하여 확인
import pandas as pd # 데이터 처리를 위한 pandas 라이브러리 임포트
# CSV 파일 불러오기
df = pd.read_csv('./dataset/data-01/names.csv') # 데이터셋 파일을 읽어 데이터프레임 생성
df.head() # 데이터프레임의 상위 5개 행 출력하여 확인
# 데이터프레임의 데이터 타입 확인
df.dtypes # 각 열의 데이터 타입을 확인하여 출력
# 데이터베이스에 데이터프레임 저장
df.to_sql(name='point', con=db, if_exists='replace') # 데이터프레임을 'point' 테이블로 데이터베이스에 저장
# if_exists 옵션 설명
# 'fail': 테이블이 이미 존재하면 실패
# 'append': 테이블이 존재하면 기존 테이블 아래에 데이터 추가
# 'replace': 테이블이 존재하면 기존 테이블을 덮어씌움 (교체)
③ load.py 에서 함수(loader) 호출 → 테이블(lecture)에 데이터 적재
[tip] 코드 실행 흐름
1. 데이터베이스 연결 및 설정
- DBconnector 객체를 생성하고 PostgreSQL 설정(DB_SETTINGS['POSTGRES'])을 적용하여 데이터베이스 연결을 설정
- table_name과 batch_date를 설정하여 데이터를 추출하고 저장할 대상과 날짜를 지정
2. 데이터 추출 (Extract)
- extractor 함수를 호출하여 table_name('lecture')에 해당하는 데이터를 추출하고, 추출된 데이터를 return_extractor에 저장
- 추출된 데이터프레임이 유효하지 않으면 이후 단계로 진행하지 않음
3. 데이터 변환 및 저장 경로 생성 (Transform)
- transformer 함수를 호출하여 TEMP_PATH와 batch_date를 기반으로 저장 경로를 생성하고, 데이터를 해당 경로에 저장
- 변환 후 저장된 데이터프레임은 return_transformer에 저장
4. 데이터 로드 (Load)
- loader 함수가 return_transformer를 db_connector 객체와 함께 사용하여 데이터베이스의 'lecture' 테이블에 저장
- loader 함수는 db_connector의 orm_conn을 사용하여 데이터프레임을 데이터베이스에 저장하며, if_exists 옵션을 통해 테이블이 이미 존재할 경우의 동작을 결정 (replace, append 등)
import pandas as pd
# 필요한 모듈 임포트
from db.connector import DBconnector
from settings import DB_SETTINGS, TEMP_PATH
from pipeline.extract import extractor
from pipeline.transform import transformer
from pipeline.load import loader
from datetime import datetime
# 데이터베이스 연결 설정 및 테이블 이름, 날짜 설정
db_connector = DBconnector(**DB_SETTINGS['POSTGRES']) # DB_SETTINGS의 POSTGRES 설정으로 DBconnector 초기화
table_name = 'lecture' # 데이터 추출 대상 테이블 이름 지정
batch_date = datetime.now().strftime('%Y%m%d') # 현재 날짜를 'YYYYMMDD' 형식으로 저장
# 데이터 추출 단계
return_extractor = extractor(db_connector, table_name) # extractor 함수 호출하여 'lecture' 테이블의 데이터를 추출
# 추출된 데이터프레임이 유효하지 않으면 이후 단계는 실행되지 않음
# 추출된 데이터 유효성 검사 및 변환 단계 실행
if return_extractor is not None and not return_extractor.empty: # 데이터가 유효한 경우에만 변환 단계로 진행
# transformer 함수 호출하여 TEMP_PATH와 batch_date에 맞는 저장 경로 생성 및 데이터프레임 저장
return_transformer = transformer(TEMP_PATH, batch_date, return_extractor, table_name)
# return_transformer에는 변환 후 데이터프레임이 저장됨 (CSV 파일 저장)
# 변환된 데이터 유효성 검사 및 로드 단계 실행
if return_transformer is not None and not return_transformer.empty: # 변환된 데이터가 유효한 경우에만 로드 단계로 진행
# loader 함수 호출하여 변환된 데이터를 PostgreSQL 데이터베이스의 'lecture' 테이블로 로드
return_loader = loader(db_connector, return_transformer, table_name)
# loader 함수는 DBconnector의 orm_conn 연결 객체를 사용해 데이터프레임을 SQL 테이블로 저장함
8) remove.py: 임시 저장(load)된 파일을 자동으로 삭제
① connector.py에서 클래스(DBconnector) 호출 → settings.py의 데이터베이스 설정값을 토대로 데이터베이스 연결
import pandas as pd
from datetime import datetime
from db.connector import DBconnector
from settings import DB_SETTINGS, TEMP_PATH
from pipeline.extract import extractor
from pipeline.transform import transformer
from pipeline.load import loader
from pipeline.remove import remover
# DB 연결 및 테이블 이름, 날짜 설정
db_connector = DBconnector(**DB_SETTINGS['POSTGRES']) # DB_SETTINGS의 POSTGRES 설정으로 DBconnector 초기화
table_name = 'lecture' # 데이터 추출 대상 테이블 이름 지정
batch_date = datetime.now().strftime('%Y%m%d') # 현재 날짜를 'YYYYMMDD' 형식으로 저장
② remove.py에서 함수(remover) 호출 → 기존 경로 삭제 및 재생성
# 기존 TEMP_PATH 삭제 및 재생성
remover(TEMP_PATH) # TEMP_PATH 경로를 삭제하고 새로 생성
③ extract.py에서 함수(extractor) 호출 → 테이블(lecture)의 데이터 추출
# 데이터 추출 단계
return_extractor = extractor(db_connector, table_name) # extractor 함수 호출하여 'lecture' 테이블의 데이터 추출
# 추출된 데이터 유효성 검사 및 변환 단계 실행
if return_extractor is not None and not return_extractor.empty: # 데이터가 유효한 경우에만 변환 단계로 진행
return_transformer = transformer(TEMP_PATH, batch_date, return_extractor, table_name) # transformer 함수 호출하여 경로 생성 및 저장
# 변환된 데이터 유효성 검사 및 로드 단계 실행
if return_transformer is not None and not return_transformer.empty: # 변환된 데이터가 유효한 경우에만 로드 단계로 진행
return_loader = loader(db_connector, return_transformer, table_name) # loader 함수 호출하여 데이터베이스에 데이터 저장
[파이썬 심화]
1. 데이터 구조와 관련 메서드
1) 딕셔너리 (Dictionary)
- list(aa): 딕셔너리의 키 값을 리스트 형태로 변환
aa = {"aa": 1, "bb": 2}
print(list(aa)) # ['aa', 'bb']
2. 모듈 동적 임포트
- __import__: 문자열을 통해 모듈을 동적으로 불러올 수 있음. 여기서는 db.pgsql_query 모듈의 queries 속성을 가져옴
queries = __import__("db.pgsql_query", fromlist=[""])
print(queries.queries)
3. 클래스와 객체지향 프로그래밍(OOP)
1) 클래스 예제: 화물선 클래스
- 클래스: 데이터를 처리하는데 필요한 속성과 메서드를 포함한 객체를 정의하는 방법
- __init__: 클래스의 생성자로 객체 생성 시 초기화하는 메서드
- self: 클래스 내에서 객체 자신을 참조하기 위해 사용
- unload, can_depart, load: 화물선의 상태를 조작하는 메서드들
class Cargoship:
def __init__(self, capacity):
self.cargo = []
self.capacity = capacity
def unload(self, port):
port_list = [p[0] for p in self.cargo]
if port in port_list:
unloaded = [i for i in self.cargo if i[0] == port]
return unloaded
else:
return None
def can_depart(self):
_depart = True if sum([i[1] for i in self.cargo]) <= self.capacity else False
return _depart
def load(self, new_cargo):
self.cargo = new_cargo
4. JSON 데이터 처리
- JSON(JavaScript Object Notation): 데이터를 키-값 형태로 저장하는 데이터 형식. 파이썬에서는 json 모듈을 통해 JSON을 처리할 수 있음
- json.dumps(): 파이썬 객체를 JSON 형식의 문자열로 변환
- sorted(): lambda를 사용하여 가격과 이름 순으로 JSON 객체를 정렬
- 문자열 결합: ",".join()을 통해 각 JSON 객체를 문자열로 결합하고, 대괄호를 추가하여 리스트 형식의 JSON 문자열로 만듦
import json
aa = [{"name": "eggs", "price": 1}, {"name": "coffee", "price": 9.99}, {"name": "rice", "price": 4.04}]
sorted_items = sorted(aa, key=lambda x: (x["price"], x["name"]))
joined_json = ",".join([json.dumps(i) for i in sorted_items])
final_json = "[" + joined_json + "]"
5. 날짜 형식 변환
1) 날짜 문자열을 일관된 형식(YYYYMMDD)으로 변환하기
- 정규표현식 (Regular Expressions): re.match()를 사용하여 문자열 패턴을 검사
- \d{4}/\d{2}/\d{2}: 연도-월-일 형식(YYYY/MM/DD)
- \d{2}/\d{2}/\d{4}: 일-월-연도 형식(DD/MM/YYYY)
- \d{2}-\d{2}-\d{4}: 월-일-연도 형식(MM-DD-YYYY)
import re
from datetime import datetime
def transform_date_format(dates):
date_list = []
for date_str in dates:
if re.match(r"\d{4}/(0[1-9]|1[0-2])/(0[1-9]|1[0-9]|2[0-9]|3[0-1])", date_str):
transformed_date = datetime.strptime(date_str, '%Y/%m/%d').strftime("%Y%m%d")
elif re.match(r"(0[1-9]|1[0-9]|2[0-9]|3[0-1])/(0[1-9]|1[0-2])/\d{4}", date_str):
transformed_date = datetime.strptime(date_str, '%d/%m/%Y').strftime("%Y%m%d")
elif re.match(r"(0[1-9]|1[0-2])-(0[1-9]|1[0-9]|2[0-9]|3[0-1])-\d{4}", date_str):
transformed_date = datetime.strptime(date_str, '%m-%d-%Y').strftime("%Y%m%d")
date_list.append(transformed_date)
return date_list
dates = ["2010/02/20", "09/01/1994", "10-09-1996", "20210221"]
print(transform_date_format(dates))
6. 정규표현식을 활용한 패턴 매칭과 날짜 변환 자동화
- 날짜 형식별 정규표현식을 리스트로 정의하여 각 날짜에 맞는 포맷으로 변환할 수 있음
- 정규표현식 패턴 리스트: 각 날짜 형식에 해당하는 패턴을 리스트로 정리하고, match_pattern 함수에서 일치하는 패턴의 인덱스를 반환
- transform_date_format 함수: 패턴 인덱스를 기반으로 변환할 날짜 형식을 선택하여 YYYYMMDD 형식으로 변환
import re
from datetime import datetime
def match_pattern(date_str):
pattern_list = [
r"\d{4}/(0[1-9]|1[0-2])/(0[1-9]|1[0-9]|2[0-9]|3[0-1])",
r"(0[1-9]|1[0-9]|2[0-9]|3[0-1])/(0[1-9]|1[0-2])/\d{4}",
r"(0[1-9]|1[0-2])-(0[1-9]|1[0-9]|2[0-9]|3[0-1])-\d{4}"
]
return [bool(re.search(ptn, date_str)) for ptn in pattern_list].index(True)
def transform_date_format(dates):
date_list = []
for date_str in dates:
pattern_idx = match_pattern(date_str)
if pattern_idx == 0:
date_list.append(datetime.strptime(date_str, '%Y/%m/%d').strftime('%Y%m%d'))
elif pattern_idx == 1:
date_list.append(datetime.strptime(date_str, '%d/%m/%Y').strftime('%Y%m%d'))
elif pattern_idx == 2:
date_list.append(datetime.strptime(date_str, '%m-%d-%Y').strftime('%Y%m%d'))
return date_list
dates = ["2010/02/20", "09/01/1994", "10-09-1996", "20210221"]
print(transform_date_format(dates))
📙 내일 일정
- SQLAlchemy ORM 실습
'TIL _Today I Learned > 2024.10' 카테고리의 다른 글
[DAY 71] PySpark (0) | 2024.10.30 |
---|---|
[DAY 70] SQLAlchemy ORM (1) | 2024.10.29 |
[DAY 68] 데이터 엔지니어링 (1) | 2024.10.25 |
[DAY 67] AWS 아키텍처 그리기 (0) | 2024.10.24 |
[DAY 66] AWS 클라우드 환경 이해 및 실습 (0) | 2024.10.23 |