본문 바로가기
TIL _Today I Learned/2024.10

[DAY 72] Data Pipeline 및 PySpark 시험

by gamdong2 2024. 10. 31.
[천재교육] 프로젝트 기반 빅데이터 서비스 개발자 양성 과정 9기
학습일 : 2024.10.31

📕 학습 목록

  • Data Pipeline 시험
  • PySpark 시험

 

📗 기억할 내용

[Data Pipeline]

1. connect.py 수정

  • DBconnector 클래스가 PostgreSQL과 MySQL을 모두 지원하도록 수정
    • pymysql,  psycopg2를 모두 사용할 수 있도록 둘다 임포트
    • DBconnector 클래스의 초기화 메서드와 mysql_connect 메서드를 db_connect 메서드로 일반화하여 다양한 DB 엔진에 연결할 수 있도록 수정
# connect.py

import pymysql
import psycopg2  # PostgreSQL을 위한 라이브러리
import db.query as mysql_query
from sqlalchemy import create_engine

class DBconnector:
    def __init__(self, engine, orm_engine, host, database, user, password, port):
        self.engine = engine
        self.orm_engine = orm_engine
        self.conn_params = dict(
            host=host, 
            db=database, 
            user=user, 
            password=password, 
            port=port
        )
        self.orm_conn_params = (
            f"{orm_engine}://{user}:{password}@{host}:{port}/{database}"
        )
        self.conn = None  # 초기화된 DB 연결 (pymysql 또는 psycopg2)
        self.orm_conn = None  # 초기화된 SQLAlchemy ORM 연결
        self.queries = mysql_query.queries if engine == "mysql" else {}

    def __enter__(self):
        # with 문을 사용할 때 DB 연결 객체 반환
        print("접속")
        self.conn = self.db_connect()  # DB 연결 객체 설정 (MySQL 또는 PostgreSQL)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # with 문을 종료할 때 DB 연결을 닫음
        if self.conn and not self.conn.closed:
            self.conn.close()
            print("종료")

    def db_connect(self):
        # 지정된 엔진에 따라 MySQL 또는 PostgreSQL 연결 설정
        if self.engine == "mysql":
            return pymysql.connect(**self.conn_params)  # pymysql 연결 객체 반환
        elif self.engine == "postgresql":
            return psycopg2.connect(**self.conn_params)  # psycopg2 연결 객체 반환
        else:
            raise ValueError(f"{self.engine}은(는) 지원되지 않는 데이터베이스입니다.")

    def orm_connect(self):
        # SQLAlchemy 엔진을 사용해 ORM 연결 객체 생성
        self.orm_conn = create_engine(self.orm_conn_params)
        return self.orm_conn  # SQLAlchemy 연결 객체 반환

    def get_query(self, table_name):
        # 지정된 테이블 이름에 대한 쿼리를 가져옴
        try:
            _query = self.queries[table_name]
            return _query
        except KeyError:
            print(f"{table_name} 테이블에 대한 쿼리를 찾을 수 없습니다.")
            return None

 

2. fake data 생성 함수 작성

from faker import Faker  # Faker 라이브러리 - 가상 데이터를 생성하기 위해 사용
import shortuuid  # shortuuid 라이브러리 - 짧은 UUID를 생성하기 위해 사용
import pandas as pd  # Pandas 라이브러리 - 데이터프레임을 사용하여 데이터 조작

def create_fakedataframe(count: int) -> pd.DataFrame:  
    # count 수만큼 가상 사용자 데이터를 생성하고 리스트에 저장
    fake_data_list = [create_fakeuser() for _ in range(count)]
    return pd.DataFrame(fake_data_list)  # 리스트를 데이터프레임으로 변환하여 반환

def create_fakeuser() -> dict:  
    fake = Faker("ko_KR")  # 한국어 기반의 가상 데이터 생성을 위해 Faker 인스턴스 생성
    fake_profile = fake.profile()  # 가상 프로필 생성
    key_list = ["name", "ssn", "job", "residence", "blood_group", "sex", "birthdate"]

    fake_dict = {}
    for key in key_list:
        fake_dict[key] = fake_profile[key]

    fake_dict["uuid"] = shortuuid.uuid()  # 고유한 UUID 생성하여 추가
    fake_dict["birthdate"] = fake_dict["birthdate"].strftime("%Y%m%d")  # birthdate를 'YYYYMMDD' 형식으로 변환

    return fake_dict

 

 

[PySpark]

* Spark 설정 및 데이터 불러오기

# JDK 설치 및 Spark 파일 다운로드, 압축 해제
!apt-get install openjdk-8-jdk-headless  # JDK 설치
!wget -q http://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz  # Spark 파일 다운로드
!tar -xf spark-3.0.0-bin-hadoop3.2.tgz  # 압축 해제
!pip install findspark  # PySpark 사용을 위한 패키지 설치

# 환경 변수 설정
import os
import findspark

os.environ['JAVA_HOME'] = "/usr/lib/jvm/java-8-openjdk-amd64"  # JAVA 경로 설정
os.environ['SPARK_HOME'] = '/content/spark-3.0.0-bin-hadoop3.2'  # Spark 경로 설정
findspark.init()  # Spark 환경 초기화

# SparkSession 생성
from pyspark.sql import SparkSession, Row
from pyspark.sql import types as T
from pyspark.sql import window as W
from pyspark.sql import functions as F

spark = SparkSession.builder \
        .master("local") \
        .appName("Colab") \
        .getOrCreate()

# 데이터 불러오기
point_his = spark.read.parquet('./sample_data/point_his.parquet')
member_dup = spark.read.parquet('./sample_data/member_dup.parquet')
member = spark.read.parquet('./sample_data/member.parquet')
item_his = spark.read.parquet('./sample_data/item_his.parquet')
study_his = spark.read.parquet('./sample_data/study_his.parquet')
regdate = spark.read.parquet('./sample_data/regdate.parquet')

 

문제 1) 날짜 형식 변환 (yyyyMMdd → yyyy-MM-dd)

# point_his 테이블의 proc_ymd 컬럼을 yyyy-MM-dd 형식으로 변경
point_ymd = point_his.withColumn(
    'proc_ymd', 
    F.date_format(F.to_date('proc_ymd', 'yyyyMMdd'), 'yyyy-MM-dd')  # 날짜 형식 변환
)
point_ymd.show(5)  # 결과 확인

 

문제 2) Null 값이 있는 레벨의 날짜 확인

# item_his 테이블에서 lv 컬럼이 null인 유저 필터링
item = item_his.filter(F.col('lv').isNull())
item = item.dropDuplicates(subset=['idx', 'proc_ymd'])  # 중복 제거 (날짜별로 한 번만)
item.groupBy('proc_ymd').count().orderBy('count', ascending=False).show(1)  # 날짜별 null 레벨 유저 수 집계, 가장 많은 날짜 확인

 

문제3) 특정 조건의 아이템 착용 빈도

# regdate, member, item_his 테이블을 활용하여 특정 날짜의 유료 회원 착용 아이템 분석
join_data = member.join(regdate, member.idx == regdate.idx, 'inner') \
                  .filter((F.col('status') == '유료회원') & (F.col('regdate') == '20230315')) \
                  .select(member["idx"], "status", "regdate")

reg_member_items = join_data.join(item_his, join_data["idx"] == item_his["idx"], "inner") \
                            .select(join_data["idx"], "status", "regdate", "codename")

top_items = reg_member_items.groupBy('codename') \
                            .agg(F.count('codename').alias('wear_count')) \
                            .orderBy(F.desc("wear_count")) \
                            .limit(3)  # 가장 많이 착용한 아이템 상위 3개 선택
top_items.show()

 

문제4) 학년이 여러 개인 경우 가장 높은 학년만 남기기

  • 랭크 함수
    • row_number(): 데이터 순서대로 고유한 번호 부여 (중복 순위 없음)
    • rank(): 중복 값이 있으면 건너뛰고 다음 번호 부여
    • dense_rank(): 중복 값이 있어도 연속된 순위 부여
# grade에서 숫자 추출 후, 각 idx마다 높은 학년만 남겨 1:1 대응
member_grade = member_dup.withColumn('grade_reg', F.regexp_extract('grade', '\\d+', 0))
window_var = W.Window.partitionBy('idx').orderBy(F.col('grade_reg').desc())
member_match = member_grade.withColumn("rank", F.rank().over(window_var))

member_match.filter(F.col('rank') == 1).select('idx', 'grade', 'rank').show(10)  # 가장 높은 학년만 남김

 

문제 5) 특정 레벨 구간에 해당하는 유저의 등록 날짜 확인

# 레벨이 151~160인 유저 필터링 후, 각 idx에서 가장 높은 레벨을 선택
item_df = item_his.withColumn('lv', F.col('lv').cast(T.IntegerType())) \
                  .filter((F.col('lv') >= 151) & (F.col('lv') <= 160))
window_var = W.Window.partitionBy('idx').orderBy(F.col('lv').desc())

item_df = item_df.withColumn("max_lv", F.row_number().over(window_var)) \
                 .filter(F.col('max_lv') == 1) \
                 .drop('max_lv')

# 각 idx에서 딱 한 번만 등록된 날짜 필터링
item_reg = item_df.join(regdate, "idx", "inner") \
                  .select("idx", "lv", "regdate")

res = item_reg.groupBy('regdate').count() \
              .filter(F.col('count') == 1) \
              .select('regdate')
res.show(10)  # 등록 날짜 중복 없이 한 번만 등록된 날짜 출력

 

문제 6) Spark ML - 선형 회귀 분석 (Linear Regression)

# Spark ML 선형 회귀 모델 예제

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# 데이터 로드 및 구조 확인
df = spark.read.csv('./sample_data/california_housing_train.csv', header=True, inferSchema=True)
df.printSchema()
print('Shape of the dataset: ', (df.count(), len(df.columns)))

# 특징 열을 결합하여 'features' 열로 변환
featureassembler = VectorAssembler(
    inputCols=['longitude', 'latitude', 'housing_median_age', 'total_rooms', 'total_bedrooms', 'population', 'households', 'median_income'], 
    outputCol='features'
)
output = featureassembler.transform(df)
final_data = output.select('features', 'median_house_value')

# 데이터 분할 (train/test)
train_data, test_data = final_data.randomSplit([0.75, 0.25], seed=42)

# 선형 회귀 모델 정의 및 학습
lr = LinearRegression(featuresCol='features', labelCol='median_house_value')
result_fit = lr.fit(train_data)

# 모델 평가 및 예측
pred_train = result_fit.evaluate(train_data)
pred = result_fit.evaluate(test_data)

# 평가 결과 출력
print("R2-Score for train set: ", pred_train.r2)
print("R2-Score for test set: ", pred.r2)

 

 

📙 내일 일정

  • 면접 특강 및 모의 면접


 
 
 
 
 

'TIL _Today I Learned > 2024.10' 카테고리의 다른 글

[DAY 71] PySpark  (0) 2024.10.30
[DAY 70] SQLAlchemy ORM  (1) 2024.10.29
[DAY 69] SQLAlchemy ORM  (1) 2024.10.28
[DAY 68] 데이터 엔지니어링  (1) 2024.10.25
[DAY 67] AWS 아키텍처 그리기  (0) 2024.10.24