[천재교육] 프로젝트 기반 빅데이터 서비스 개발자 양성 과정 9기
학습일 : 2024.10.31
📕 학습 목록
- Data Pipeline 시험
- PySpark 시험
📗 기억할 내용
[Data Pipeline]
1. connect.py 수정
- DBconnector 클래스가 PostgreSQL과 MySQL을 모두 지원하도록 수정
- pymysql, psycopg2를 모두 사용할 수 있도록 둘다 임포트
- DBconnector 클래스의 초기화 메서드와 mysql_connect 메서드를 db_connect 메서드로 일반화하여 다양한 DB 엔진에 연결할 수 있도록 수정
# connect.py
import pymysql
import psycopg2 # PostgreSQL을 위한 라이브러리
import db.query as mysql_query
from sqlalchemy import create_engine
class DBconnector:
def __init__(self, engine, orm_engine, host, database, user, password, port):
self.engine = engine
self.orm_engine = orm_engine
self.conn_params = dict(
host=host,
db=database,
user=user,
password=password,
port=port
)
self.orm_conn_params = (
f"{orm_engine}://{user}:{password}@{host}:{port}/{database}"
)
self.conn = None # 초기화된 DB 연결 (pymysql 또는 psycopg2)
self.orm_conn = None # 초기화된 SQLAlchemy ORM 연결
self.queries = mysql_query.queries if engine == "mysql" else {}
def __enter__(self):
# with 문을 사용할 때 DB 연결 객체 반환
print("접속")
self.conn = self.db_connect() # DB 연결 객체 설정 (MySQL 또는 PostgreSQL)
return self
def __exit__(self, exc_type, exc_value, traceback):
# with 문을 종료할 때 DB 연결을 닫음
if self.conn and not self.conn.closed:
self.conn.close()
print("종료")
def db_connect(self):
# 지정된 엔진에 따라 MySQL 또는 PostgreSQL 연결 설정
if self.engine == "mysql":
return pymysql.connect(**self.conn_params) # pymysql 연결 객체 반환
elif self.engine == "postgresql":
return psycopg2.connect(**self.conn_params) # psycopg2 연결 객체 반환
else:
raise ValueError(f"{self.engine}은(는) 지원되지 않는 데이터베이스입니다.")
def orm_connect(self):
# SQLAlchemy 엔진을 사용해 ORM 연결 객체 생성
self.orm_conn = create_engine(self.orm_conn_params)
return self.orm_conn # SQLAlchemy 연결 객체 반환
def get_query(self, table_name):
# 지정된 테이블 이름에 대한 쿼리를 가져옴
try:
_query = self.queries[table_name]
return _query
except KeyError:
print(f"{table_name} 테이블에 대한 쿼리를 찾을 수 없습니다.")
return None
2. fake data 생성 함수 작성
from faker import Faker # Faker 라이브러리 - 가상 데이터를 생성하기 위해 사용
import shortuuid # shortuuid 라이브러리 - 짧은 UUID를 생성하기 위해 사용
import pandas as pd # Pandas 라이브러리 - 데이터프레임을 사용하여 데이터 조작
def create_fakedataframe(count: int) -> pd.DataFrame:
# count 수만큼 가상 사용자 데이터를 생성하고 리스트에 저장
fake_data_list = [create_fakeuser() for _ in range(count)]
return pd.DataFrame(fake_data_list) # 리스트를 데이터프레임으로 변환하여 반환
def create_fakeuser() -> dict:
fake = Faker("ko_KR") # 한국어 기반의 가상 데이터 생성을 위해 Faker 인스턴스 생성
fake_profile = fake.profile() # 가상 프로필 생성
key_list = ["name", "ssn", "job", "residence", "blood_group", "sex", "birthdate"]
fake_dict = {}
for key in key_list:
fake_dict[key] = fake_profile[key]
fake_dict["uuid"] = shortuuid.uuid() # 고유한 UUID 생성하여 추가
fake_dict["birthdate"] = fake_dict["birthdate"].strftime("%Y%m%d") # birthdate를 'YYYYMMDD' 형식으로 변환
return fake_dict
[PySpark]
* Spark 설정 및 데이터 불러오기
# JDK 설치 및 Spark 파일 다운로드, 압축 해제
!apt-get install openjdk-8-jdk-headless # JDK 설치
!wget -q http://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz # Spark 파일 다운로드
!tar -xf spark-3.0.0-bin-hadoop3.2.tgz # 압축 해제
!pip install findspark # PySpark 사용을 위한 패키지 설치
# 환경 변수 설정
import os
import findspark
os.environ['JAVA_HOME'] = "/usr/lib/jvm/java-8-openjdk-amd64" # JAVA 경로 설정
os.environ['SPARK_HOME'] = '/content/spark-3.0.0-bin-hadoop3.2' # Spark 경로 설정
findspark.init() # Spark 환경 초기화
# SparkSession 생성
from pyspark.sql import SparkSession, Row
from pyspark.sql import types as T
from pyspark.sql import window as W
from pyspark.sql import functions as F
spark = SparkSession.builder \
.master("local") \
.appName("Colab") \
.getOrCreate()
# 데이터 불러오기
point_his = spark.read.parquet('./sample_data/point_his.parquet')
member_dup = spark.read.parquet('./sample_data/member_dup.parquet')
member = spark.read.parquet('./sample_data/member.parquet')
item_his = spark.read.parquet('./sample_data/item_his.parquet')
study_his = spark.read.parquet('./sample_data/study_his.parquet')
regdate = spark.read.parquet('./sample_data/regdate.parquet')
문제 1) 날짜 형식 변환 (yyyyMMdd → yyyy-MM-dd)
# point_his 테이블의 proc_ymd 컬럼을 yyyy-MM-dd 형식으로 변경
point_ymd = point_his.withColumn(
'proc_ymd',
F.date_format(F.to_date('proc_ymd', 'yyyyMMdd'), 'yyyy-MM-dd') # 날짜 형식 변환
)
point_ymd.show(5) # 결과 확인
문제 2) Null 값이 있는 레벨의 날짜 확인
# item_his 테이블에서 lv 컬럼이 null인 유저 필터링
item = item_his.filter(F.col('lv').isNull())
item = item.dropDuplicates(subset=['idx', 'proc_ymd']) # 중복 제거 (날짜별로 한 번만)
item.groupBy('proc_ymd').count().orderBy('count', ascending=False).show(1) # 날짜별 null 레벨 유저 수 집계, 가장 많은 날짜 확인
문제3) 특정 조건의 아이템 착용 빈도
# regdate, member, item_his 테이블을 활용하여 특정 날짜의 유료 회원 착용 아이템 분석
join_data = member.join(regdate, member.idx == regdate.idx, 'inner') \
.filter((F.col('status') == '유료회원') & (F.col('regdate') == '20230315')) \
.select(member["idx"], "status", "regdate")
reg_member_items = join_data.join(item_his, join_data["idx"] == item_his["idx"], "inner") \
.select(join_data["idx"], "status", "regdate", "codename")
top_items = reg_member_items.groupBy('codename') \
.agg(F.count('codename').alias('wear_count')) \
.orderBy(F.desc("wear_count")) \
.limit(3) # 가장 많이 착용한 아이템 상위 3개 선택
top_items.show()
문제4) 학년이 여러 개인 경우 가장 높은 학년만 남기기
- 랭크 함수
- row_number(): 데이터 순서대로 고유한 번호 부여 (중복 순위 없음)
- rank(): 중복 값이 있으면 건너뛰고 다음 번호 부여
- dense_rank(): 중복 값이 있어도 연속된 순위 부여
# grade에서 숫자 추출 후, 각 idx마다 높은 학년만 남겨 1:1 대응
member_grade = member_dup.withColumn('grade_reg', F.regexp_extract('grade', '\\d+', 0))
window_var = W.Window.partitionBy('idx').orderBy(F.col('grade_reg').desc())
member_match = member_grade.withColumn("rank", F.rank().over(window_var))
member_match.filter(F.col('rank') == 1).select('idx', 'grade', 'rank').show(10) # 가장 높은 학년만 남김
문제 5) 특정 레벨 구간에 해당하는 유저의 등록 날짜 확인
# 레벨이 151~160인 유저 필터링 후, 각 idx에서 가장 높은 레벨을 선택
item_df = item_his.withColumn('lv', F.col('lv').cast(T.IntegerType())) \
.filter((F.col('lv') >= 151) & (F.col('lv') <= 160))
window_var = W.Window.partitionBy('idx').orderBy(F.col('lv').desc())
item_df = item_df.withColumn("max_lv", F.row_number().over(window_var)) \
.filter(F.col('max_lv') == 1) \
.drop('max_lv')
# 각 idx에서 딱 한 번만 등록된 날짜 필터링
item_reg = item_df.join(regdate, "idx", "inner") \
.select("idx", "lv", "regdate")
res = item_reg.groupBy('regdate').count() \
.filter(F.col('count') == 1) \
.select('regdate')
res.show(10) # 등록 날짜 중복 없이 한 번만 등록된 날짜 출력
문제 6) Spark ML - 선형 회귀 분석 (Linear Regression)
# Spark ML 선형 회귀 모델 예제
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
# 데이터 로드 및 구조 확인
df = spark.read.csv('./sample_data/california_housing_train.csv', header=True, inferSchema=True)
df.printSchema()
print('Shape of the dataset: ', (df.count(), len(df.columns)))
# 특징 열을 결합하여 'features' 열로 변환
featureassembler = VectorAssembler(
inputCols=['longitude', 'latitude', 'housing_median_age', 'total_rooms', 'total_bedrooms', 'population', 'households', 'median_income'],
outputCol='features'
)
output = featureassembler.transform(df)
final_data = output.select('features', 'median_house_value')
# 데이터 분할 (train/test)
train_data, test_data = final_data.randomSplit([0.75, 0.25], seed=42)
# 선형 회귀 모델 정의 및 학습
lr = LinearRegression(featuresCol='features', labelCol='median_house_value')
result_fit = lr.fit(train_data)
# 모델 평가 및 예측
pred_train = result_fit.evaluate(train_data)
pred = result_fit.evaluate(test_data)
# 평가 결과 출력
print("R2-Score for train set: ", pred_train.r2)
print("R2-Score for test set: ", pred.r2)
📙 내일 일정
- 면접 특강 및 모의 면접
'TIL _Today I Learned > 2024.10' 카테고리의 다른 글
[DAY 71] PySpark (0) | 2024.10.30 |
---|---|
[DAY 70] SQLAlchemy ORM (1) | 2024.10.29 |
[DAY 69] SQLAlchemy ORM (1) | 2024.10.28 |
[DAY 68] 데이터 엔지니어링 (1) | 2024.10.25 |
[DAY 67] AWS 아키텍처 그리기 (0) | 2024.10.24 |