본문 바로가기
TIL _Today I Learned/2024.10

[DAY 71] PySpark

by gamdong2 2024. 10. 30.
[천재교육] 프로젝트 기반 빅데이터 서비스 개발자 양성 과정 9기
학습일 : 2024.10.30

📕 학습 목록

  • PySpark 개요
  • Spark 컴포넌트
  • RDD
  • Spark 코드 실습

 

📗 기억할 내용

1. PySpark 이론 및 구성 요소

1) PySpark 개요

  • Apache Spark: 분산 데이터 처리를 위한 클러스터 컴퓨팅 프레임워크로, 대용량 데이터 분석과 머신러닝에 최적화됨
  • PySpark: Apache Spark의 Python API로, Spark의 기능을 Python에서 사용할 수 있게 함

2) Spark의 주요 컴포넌트

  • Spark Core: Spark의 기본 엔진, RDD(Resilient Distributed Dataset)를 사용하여 분산 데이터를 처리
  • Spark SQL: SQL과 DataFrame을 이용한 데이터 분석
  • Spark Streaming: 실시간 데이터 스트리밍 분석
  • MLlib: 머신러닝 라이브러리, 다양한 알고리즘 제공
  • GraphX: 그래프와 그래프 병렬 처리를 위한 라이브러리

 

3) RDD (Resilient Distributed Dataset)

  • RDD: Spark Core의 핵심 데이터 구조로 불변성과 병렬 처리 지원
    • Transformation: 새로운 RDD를 생성 (지연 평가, map, filter, flatMap 등)
    • Action: 실제 연산을 수행해 결과를 반환 (collect, count, take 등)
    • Lazy Evaluation: Action이 호출될 때까지 Transformation이 지연 평가됨
  • RDD 생성: parallelize, textFile 메서드로 생성 가능

 

2. PySpark 문법

1) SparkSession 생성

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("SparkApplication") \
    .getOrCreate()

 

 

2) DataFrame 생성 및 기본 연산

# DataFrame 생성
data = [("Alice", 34), ("Bob", 45), ("Cathy", 29)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)

# 열 선택 및 필터링
df.select("Name").show()
df.filter(df["Age"] > 30).show()

# 집계 및 그룹화
df.groupBy("Age").count().show()

 

3) SQL 사용

df.createOrReplaceTempView("people")
spark.sql("SELECT * FROM people WHERE Age > 30").show()

 

4. 주요 PySpark 함수와 모듈

  • f (functions): 데이터 변환 및 집계 함수 제공
  • t (types): 데이터 타입 지정 및 캐스팅
  • window: 윈도우 함수 지원으로 순위 및 구간별 분석 가능

1) 주요 함수 예제

  • f.col: 특정 열 참조
from pyspark.sql import functions as f
df.select(f.col("Age")).show()
  • t.IntegerType()와 cast: 열의 데이터 타입 변환
from pyspark.sql.types import IntegerType
df = df.withColumn("AgeInteger", f.col("Age").cast(IntegerType()))
  • f.regexp_extract: 정규 표현식을 이용해 문자열에서 특정 패턴 추출
# 숫자 부분만 추출 
df = df.withColumn("ExtractedData", f.regexp_extract(f.col("column_name"), r"\d+", 0))
  • f.regexp_replace: 정규 표현식을 이용해 문자열을 치환
# 숫자 부분을 빈 문자열로 치환하여 제거
df = df.withColumn("no_numbers", f.regexp_replace(f.col("column_name"), r"\d+", ""))
  • f.like: 특정 패턴이 포함되어 있는지 확인 (SQL의 LIKE 연산자와 유사)
# 'h'로 시작하는 문자열 여부 확인
df = df.withColumn("starts_with_h", f.col("column_name").like("h%"))
  • f.rlike: 정규 표현식을 사용한 패턴 일치 확인(f.like와 비슷하지만, 졍규 표현식 사용 가능)
# 숫자로 끝나는 문자열 여부 확인
df = df.withColumn("ends_with_number", f.col("text").rlike(r"\d+$"))
  • f.split: 정규 표현식을 기준으로 문자열을 분할
# 쉼표를 기준으로 문자열 분할
df = df.withColumn("fruit_list", f.split(f.col("fruits"), ","))
  • f.instr: 특정 문자열이 포함된 위치 변환
# 'is'가 시작하는 위치 반환 (없으면 0 반환)
df = df.withColumn("position_of_is", f.instr(f.col("text"), "is"))
  • f.substring: 문자열의 특정 위치부터 지정된 길이만큼 추출
# 2번째 위치부터 3개의 문자 추출
df = df.withColumn("substring_text", f.substring(f.col("text"), 2, 3))
  • f.length: 문자열의 길이를 반환
# 각 문자열의 길이 계산
df = df.withColumn("text_length", f.length(f.col("text")))

 

5. 윈도우 함수(Window Function)

1) 정의

  • 데이터의 특정 구간(윈도우) 별로 집계∙순위 계산을 할 때 사용하는 함수
  • 일반적인 집계 함수와 달리 행별로 집계된 값을 계산하면서도 원본행을 유지함
  • Spark는 이를 활용해 특정 범위 내에서 누적 합계, 순위, 평균 등을 계산할 수 있음

2) 구성 요소

  • 윈도우 함수를 사용할 때는 WindowSpec 이라는 윈도우 스펙을 정의함. 여기에는 다음 세 가지 요소가 들어감
    • PARTITION BY: 데이터를 그룹화하여 윈도우를 나눔. (예: department별로 윈도우를 나누기)
    • ORDER BY: 각 윈도우 안에서 정렬 기준을 정함. (예: salary 순서로 정렬)
    • ROWS BETWEEN: 윈도우의 범위를 지정. (예: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)

 3) 윈도우 함수 종류

  • row_number(): 각 윈도우 내에서 순차적인 행 번호를 부여합니다.
  • rank(): 동일한 값이 있을 때 동일 순위를 부여하며, 다음 순위를 건너뜁니다.
  • dense_rank(): 동일한 값이 있을 때 동일 순위를 부여하지만, 다음 순위를 건너뛰지 않습니다.
  • sum(), avg(), max(), min(): 윈도우 내에서 누적 합계, 평균, 최대, 최소값을 계산합니다.

4) 장점

  • 구간별로 데이터 분석: 특정 조건에 따라 데이터 구간을 나누어 구간별 집계를 할 수 있어, 예를 들어 날짜별 누적 합계나 특정 구간의 평균을 구할 때 유용
  • 원본 데이터 보존: 일반적인 집계와 달리, 원본 행을 유지하면서도 집계된 값을 추가로 계산해줌

5) 윈도우 함수 예시

  • window: 그룹화 분석에 유용, 순위 함수 등과 결합 가능
from pyspark.sql.window import Window
windowSpec = Window.partitionBy("Category").orderBy("Age")
df = df.withColumn("rank", f.rank().over(windowSpec))

 

6. 다양한 Transformation

  • withColumn: 새로운 열 추가 or 기존 열 변환
df = df.withColumn("NewColumn", f.lit("Value"))

 

  • distinct, union, intersect: 중복 제거, DataFrame 간 연산
df_distinct = df.distinct()
df_union = df1.union(df2)

 

7. 집계 함수와 조건 함수

  • 집계 함수: f.sum, f.avg, f.max
df.groupBy("Category").agg(f.sum("Age").alias("Total_Age")).show()
  • 조건 함수(조건에 따라 값 할당): f.when, f.otherwise
df = df.withColumn(
    "Category",
    f.when(f.col("Age") > 30, "Senior").otherwise("Junior")
)

 

8. PySpark 데이터 저장, 로드

  • CSV 파일 읽기∙쓰기
df = spark.read.option("header", True).csv("path/to/file.csv")
df.write.option("header", True).csv("path/to/output.csv")

 

  • Parquet 파일 읽기∙쓰기  
df = spark.read.parquet("path/to/file.parquet")
df.write.parquet("path/to/output.parquet")

 

9. 사용자 정의 함수 (UDF)

  • UDF 등록 및 사용
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def to_uppercase(text):
    return text.upper()

uppercase_udf = udf(to_uppercase, StringType())
df = df.withColumn("UppercaseName", uppercase_udf(f.col("Name")))

 

10. 실시간 데이터 처리 (Spark Streaming)

  • 소켓 스트림 생성 및 처리
streaming_df = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()
query = streaming_df.writeStream.outputMode("append").format("console").start()
query.awaitTermination()

 

 

📘코드 실습

[Spark DataFrame API]

1. SPARK 환경 설정 및 Spark 세션 연결

  • Spark 환경을 설정하여 DataFrame을 사용할 수 있는 Spark 세션을 생성
[tip] 세션(Session)
• Session: 프로그램이 실행되는 동안 프로그램과 사용자 간의 연결을 유지하는 상태
• Spark Session: Spark 애플리케이션의 진입점(entry point) 역할을 하는 객체. Spark를 사용하기위한 모든 설정을 포함함. Spark Session이 생성되면 Spark 애플리케이션이 데이터를 로드하고, 처리하고, 저장할 수 있는 환경이 만들어짐
# JDK 설치 및 Spark 파일 다운로드, 압축 해제
!apt-get install openjdk-8-jdk-headless  # JDK 설치
!wget -q http://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz  # Spark 파일 다운로드
!tar -xf spark-3.0.0-bin-hadoop3.2.tgz  # 압축 해제
!pip install findspark  # PySpark 사용을 위한 패키지 설치

# 환경 변수 설정
import os
import findspark

# JAVA_HOME과 SPARK_HOME 설정하여 Java 및 Spark 경로를 지정
os.environ['JAVA_HOME'] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ['SPARK_HOME'] = '/content/spark-3.0.0-bin-hadoop3.2'
findspark.init()  # Spark 환경 초기화

# SparkSession 생성
from pyspark.sql import SparkSession

# 로컬 모드에서 Spark 세션 시작
spark = SparkSession.builder \
        .master("local") \
        .appName("Colab") \
        .getOrCreate()

# SparkSession 설정 확인
spark.sparkContext.getConf().getAll()  # 현재 설정된 Spark 환경 변수 확인

 

2. 데이터 읽기

  • 데이터 파일을 읽어 Spark DataFrame 으로 변환. header, inferSchema, mode 옵션을 통해 데이터 읽기 방식을 설정할 수 있음
# CSV 파일을 읽어와 DataFrame으로 변환
df = spark.read.option("header", "true").csv('./sample_data/test1.csv')
df.show()  # DataFrame 내용 출력
df.printSchema()  # DataFrame의 스키마(컬럼명 및 타입) 출력
df.dtypes  # 각 컬럼의 데이터 타입 확인

 

3. 컬럼 다루기및  DataFrame 생성

1) 컬럼 선택∙삭제

 

  • 컬럼 선택: 특정 컬럼만 선택
  • 컬럼 삭제: 특정 컬럼을 삭제
df.select(['Name', 'age']).show()  # 'Name'과 'age' 컬럼만 선택하여 출력
df_drop = df.drop('Experience')  # 'Experience' 컬럼 삭제

 

 

2) 컬럼 추가∙변경 

 

  • withColumn: 새로운 컬럼을 추가하거나 기존 컬럼을 변환
  • exprselectExpr: 표현식을 사용해 계산된 컬럼 추가
  • withColumnRenamed: 컬럼명 변경
  • cast: 컬럼의 데이터 타입 변경
from pyspark.sql import functions as F
from pyspark.sql import types as T

# 새 컬럼 추가 ('age' 컬럼을 두 배로 곱하여 'new_column' 컬럼 생성)
df = df.withColumn('new_column', F.col('age') * 2)

# 'age' 컬럼의 타입을 Integer로 변경
df = df.withColumn('age', F.col('age').cast(T.IntegerType()))

 

 

3) DataFrame 생성

from pyspark.sql import Row

# 스키마 정의: 'aaa', 'bbb', 'ccc'라는 3개의 String 타입 컬럼을 포함하는 스키마
schema = T.StructType([
    T.StructField('aaa', T.StringType(), True),
    T.StructField('bbb', T.StringType(), True),
    T.StructField('ccc', T.StringType(), True)
])

# 데이터 준비: Row를 사용하여 데이터 생성
data = [Row('apple', 'banana', 'tomato'), Row('apple1', 'banana1', 'tomato1')]
df_new = spark.createDataFrame(data, schema)  # 정의한 스키마와 데이터로 DataFrame 생성
df_new.show()  # 생성된 DataFrame 출력

 

4, DataFrame 쓰기 및 옵션

  • DataFrame을 파일로 저장하는 여러 가지 옵션을 설정
# CSV 파일로 DataFrame 저장 - 기존 파일 덮어쓰기 모드
df.write.mode('overwrite').csv("output_path")

# 지정된 컬럼을 기준으로 파티션을 나눠서 저장
df.write.partitionBy("column_name").mode("overwrite").csv("output_path")

 

5. 결측치 다루기

1) 결측치 삭제∙채우기

 

  • dropna: 결측치가 포함된 행을 제거
  • fillna: 결측치를 특정 값으로 채움

 

df_na = df.na.drop()  # 모든 결측치가 포함된 행 제거
df_fill = df.fillna(value="10000", subset=['age', 'Experience'])  # 특정 컬럼의 결측치를 '10000'으로 채움

 

6. 필터링

  • 조건에 맞는 데이터를 필터링
# 'age'가 30보다 큰 행을 필터링하여 출력
df.filter(F.col('age') > 30).show()

# 'codename'이 '액세서리'이거나 'price'가 30 미만인 행을 필터링하여 출력
df.filter((F.col('codename') == '액세서리') | (F.col('price') < 30)).show()

 

7. Group By 및 집계

  • 집계 함수를 사용하여 데이터를 그룹화&요약
# 'codename'을 기준으로 그룹화하고, 'price'의 합계와 평균 계산
df.groupBy('codename').agg(
    F.sum('price').alias('total_price'),  # 'price' 컬럼 합계
    F.mean('price').alias('average_price')  # 'price' 컬럼 평균
).show()

 

8. 정렬 Order By

  • 데이터를 특정 열을 기준으로 오름차순 or 내림차순으로 정렬
# 'age' 컬럼을 기준으로 내림차순 정렬하여 출력
df.orderBy(F.col('age').desc()).show()

 

9. Join 과 Union

1) Join: 두 개의 DataFrame을 특정 조건에 맞게 결합

  • inner(기본값), left, right, outer
# 'Name' 컬럼을 기준으로 outer join 수행
df1.join(df2, df1.Name == df2.Name, 'outer').show()

2) Union: 컬럼이 동일한 두 DataFrame을 하나로 합침

# df1과 df2를 union으로 결합
df1.union(df2).show()

 

10. 윈도우 함수

  • 윈도우 함수를 사용하여 순위 계산, 누적 합계 등 구간별 분석을 수행
from pyspark.sql import window as W

# 'Name'을 기준으로 파티션 나누고, 'salary' 컬럼을 오름차순 정렬하는 윈도우 사양 생성
window_spec = W.Window.partitionBy("Name").orderBy("salary")

# row_number() 함수를 사용해 순위를 계산하는 'rank' 컬럼 추가
df = df.withColumn("rank", F.row_number().over(window_spec))

 

11. 사용자 정의 함수 (UDF)

  • 사용자가 정의한 함수를 특정 컬럼에 적용하여 데이터를 가공할 수 있음
from pyspark.sql import types as T
from pyspark.sql.functions import udf

# 문자열을 대문자로 변환하는 함수 정의
def to_uppercase(text):
    return text.upper()

# UDF로 변환하여 'Name' 컬럼에 적용
uppercase_udf = udf(to_uppercase, T.StringType())
df = df.withColumn("upper_name", uppercase_udf(F.col("Name")))

 

12. 기타 함수

  • lit: 고정된 값을 열에 추가
  • split: 특정 구분자로 문자열을 분할
  • like & rlike: 문자열 패턴 매칭
# 'fixed_value'라는 고정된 값을 추가
df.withColumn('fixed_col', F.lit('fixed_value'))

# 'name' 컬럼을 공백 기준으로 나누고 첫 번째 단어를 'split_col'에 추가
df.withColumn('split_col', F.split(F.col('name'), ' ').getItem(0))

# 'codename'이 특정 패턴과 일치하는지 필터링
df.filter(F.col('codename').rlike('pattern'))

 

13. Spark SQL

  • Spark SQL을 사용하여 Spark DataFrame을 SQL 쿼리로 조작할 수 있음
# 테이블 생성 및 조회: DataFrame을 SQL 테이블로 등록하고, SQL 쿼리 실행
df.createOrReplaceTempView("temp_table")
spark.sql("SELECT * FROM temp

 

 

 

📙 내일 일정

  • 데이터 파이프라인 / PySpark 시험


 
 
 
 
 

'TIL _Today I Learned > 2024.10' 카테고리의 다른 글

[DAY 72] Data Pipeline 및 PySpark 시험  (0) 2024.10.31
[DAY 70] SQLAlchemy ORM  (1) 2024.10.29
[DAY 69] SQLAlchemy ORM  (1) 2024.10.28
[DAY 68] 데이터 엔지니어링  (1) 2024.10.25
[DAY 67] AWS 아키텍처 그리기  (0) 2024.10.24