[천재교육] 프로젝트 기반 빅데이터 서비스 개발자 양성 과정 9기
학습일 : 2024.10.30
📕 학습 목록
- PySpark 개요
- Spark 컴포넌트
- RDD
- Spark 코드 실습
📗 기억할 내용
1. PySpark 이론 및 구성 요소
1) PySpark 개요
- Apache Spark: 분산 데이터 처리를 위한 클러스터 컴퓨팅 프레임워크로, 대용량 데이터 분석과 머신러닝에 최적화됨
- PySpark: Apache Spark의 Python API로, Spark의 기능을 Python에서 사용할 수 있게 함
2) Spark의 주요 컴포넌트
- Spark Core: Spark의 기본 엔진, RDD(Resilient Distributed Dataset)를 사용하여 분산 데이터를 처리
- Spark SQL: SQL과 DataFrame을 이용한 데이터 분석
- Spark Streaming: 실시간 데이터 스트리밍 분석
- MLlib: 머신러닝 라이브러리, 다양한 알고리즘 제공
- GraphX: 그래프와 그래프 병렬 처리를 위한 라이브러리
3) RDD (Resilient Distributed Dataset)
- RDD: Spark Core의 핵심 데이터 구조로 불변성과 병렬 처리 지원
- Transformation: 새로운 RDD를 생성 (지연 평가, map, filter, flatMap 등)
- Action: 실제 연산을 수행해 결과를 반환 (collect, count, take 등)
- Lazy Evaluation: Action이 호출될 때까지 Transformation이 지연 평가됨
- RDD 생성: parallelize, textFile 메서드로 생성 가능
2. PySpark 문법
1) SparkSession 생성
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("SparkApplication") \
.getOrCreate()
2) DataFrame 생성 및 기본 연산
# DataFrame 생성
data = [("Alice", 34), ("Bob", 45), ("Cathy", 29)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)
# 열 선택 및 필터링
df.select("Name").show()
df.filter(df["Age"] > 30).show()
# 집계 및 그룹화
df.groupBy("Age").count().show()
3) SQL 사용
df.createOrReplaceTempView("people")
spark.sql("SELECT * FROM people WHERE Age > 30").show()
4. 주요 PySpark 함수와 모듈
- f (functions): 데이터 변환 및 집계 함수 제공
- t (types): 데이터 타입 지정 및 캐스팅
- window: 윈도우 함수 지원으로 순위 및 구간별 분석 가능
1) 주요 함수 예제
- f.col: 특정 열 참조
from pyspark.sql import functions as f
df.select(f.col("Age")).show()
- t.IntegerType()와 cast: 열의 데이터 타입 변환
from pyspark.sql.types import IntegerType
df = df.withColumn("AgeInteger", f.col("Age").cast(IntegerType()))
- f.regexp_extract: 정규 표현식을 이용해 문자열에서 특정 패턴 추출
# 숫자 부분만 추출
df = df.withColumn("ExtractedData", f.regexp_extract(f.col("column_name"), r"\d+", 0))
- f.regexp_replace: 정규 표현식을 이용해 문자열을 치환
# 숫자 부분을 빈 문자열로 치환하여 제거
df = df.withColumn("no_numbers", f.regexp_replace(f.col("column_name"), r"\d+", ""))
- f.like: 특정 패턴이 포함되어 있는지 확인 (SQL의 LIKE 연산자와 유사)
# 'h'로 시작하는 문자열 여부 확인
df = df.withColumn("starts_with_h", f.col("column_name").like("h%"))
- f.rlike: 정규 표현식을 사용한 패턴 일치 확인(f.like와 비슷하지만, 졍규 표현식 사용 가능)
# 숫자로 끝나는 문자열 여부 확인
df = df.withColumn("ends_with_number", f.col("text").rlike(r"\d+$"))
- f.split: 정규 표현식을 기준으로 문자열을 분할
# 쉼표를 기준으로 문자열 분할
df = df.withColumn("fruit_list", f.split(f.col("fruits"), ","))
- f.instr: 특정 문자열이 포함된 위치 변환
# 'is'가 시작하는 위치 반환 (없으면 0 반환)
df = df.withColumn("position_of_is", f.instr(f.col("text"), "is"))
- f.substring: 문자열의 특정 위치부터 지정된 길이만큼 추출
# 2번째 위치부터 3개의 문자 추출
df = df.withColumn("substring_text", f.substring(f.col("text"), 2, 3))
- f.length: 문자열의 길이를 반환
# 각 문자열의 길이 계산
df = df.withColumn("text_length", f.length(f.col("text")))
5. 윈도우 함수(Window Function)
1) 정의
- 데이터의 특정 구간(윈도우) 별로 집계∙순위 계산을 할 때 사용하는 함수
- 일반적인 집계 함수와 달리 행별로 집계된 값을 계산하면서도 원본행을 유지함
- Spark는 이를 활용해 특정 범위 내에서 누적 합계, 순위, 평균 등을 계산할 수 있음
2) 구성 요소
- 윈도우 함수를 사용할 때는 WindowSpec 이라는 윈도우 스펙을 정의함. 여기에는 다음 세 가지 요소가 들어감
- PARTITION BY: 데이터를 그룹화하여 윈도우를 나눔. (예: department별로 윈도우를 나누기)
- ORDER BY: 각 윈도우 안에서 정렬 기준을 정함. (예: salary 순서로 정렬)
- ROWS BETWEEN: 윈도우의 범위를 지정. (예: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
3) 윈도우 함수 종류
- row_number(): 각 윈도우 내에서 순차적인 행 번호를 부여합니다.
- rank(): 동일한 값이 있을 때 동일 순위를 부여하며, 다음 순위를 건너뜁니다.
- dense_rank(): 동일한 값이 있을 때 동일 순위를 부여하지만, 다음 순위를 건너뛰지 않습니다.
- sum(), avg(), max(), min(): 윈도우 내에서 누적 합계, 평균, 최대, 최소값을 계산합니다.
4) 장점
- 구간별로 데이터 분석: 특정 조건에 따라 데이터 구간을 나누어 구간별 집계를 할 수 있어, 예를 들어 날짜별 누적 합계나 특정 구간의 평균을 구할 때 유용
- 원본 데이터 보존: 일반적인 집계와 달리, 원본 행을 유지하면서도 집계된 값을 추가로 계산해줌
5) 윈도우 함수 예시
- window: 그룹화 분석에 유용, 순위 함수 등과 결합 가능
from pyspark.sql.window import Window
windowSpec = Window.partitionBy("Category").orderBy("Age")
df = df.withColumn("rank", f.rank().over(windowSpec))
6. 다양한 Transformation
- withColumn: 새로운 열 추가 or 기존 열 변환
df = df.withColumn("NewColumn", f.lit("Value"))
- distinct, union, intersect: 중복 제거, DataFrame 간 연산
df_distinct = df.distinct()
df_union = df1.union(df2)
7. 집계 함수와 조건 함수
- 집계 함수: f.sum, f.avg, f.max
df.groupBy("Category").agg(f.sum("Age").alias("Total_Age")).show()
- 조건 함수(조건에 따라 값 할당): f.when, f.otherwise
df = df.withColumn(
"Category",
f.when(f.col("Age") > 30, "Senior").otherwise("Junior")
)
8. PySpark 데이터 저장, 로드
- CSV 파일 읽기∙쓰기
df = spark.read.option("header", True).csv("path/to/file.csv")
df.write.option("header", True).csv("path/to/output.csv")
- Parquet 파일 읽기∙쓰기
df = spark.read.parquet("path/to/file.parquet")
df.write.parquet("path/to/output.parquet")
9. 사용자 정의 함수 (UDF)
- UDF 등록 및 사용
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def to_uppercase(text):
return text.upper()
uppercase_udf = udf(to_uppercase, StringType())
df = df.withColumn("UppercaseName", uppercase_udf(f.col("Name")))
10. 실시간 데이터 처리 (Spark Streaming)
- 소켓 스트림 생성 및 처리
streaming_df = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()
query = streaming_df.writeStream.outputMode("append").format("console").start()
query.awaitTermination()
📘코드 실습
[Spark DataFrame API]
1. SPARK 환경 설정 및 Spark 세션 연결
- Spark 환경을 설정하여 DataFrame을 사용할 수 있는 Spark 세션을 생성
[tip] 세션(Session)
• Session: 프로그램이 실행되는 동안 프로그램과 사용자 간의 연결을 유지하는 상태
• Spark Session: Spark 애플리케이션의 진입점(entry point) 역할을 하는 객체. Spark를 사용하기위한 모든 설정을 포함함. Spark Session이 생성되면 Spark 애플리케이션이 데이터를 로드하고, 처리하고, 저장할 수 있는 환경이 만들어짐
# JDK 설치 및 Spark 파일 다운로드, 압축 해제
!apt-get install openjdk-8-jdk-headless # JDK 설치
!wget -q http://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz # Spark 파일 다운로드
!tar -xf spark-3.0.0-bin-hadoop3.2.tgz # 압축 해제
!pip install findspark # PySpark 사용을 위한 패키지 설치
# 환경 변수 설정
import os
import findspark
# JAVA_HOME과 SPARK_HOME 설정하여 Java 및 Spark 경로를 지정
os.environ['JAVA_HOME'] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ['SPARK_HOME'] = '/content/spark-3.0.0-bin-hadoop3.2'
findspark.init() # Spark 환경 초기화
# SparkSession 생성
from pyspark.sql import SparkSession
# 로컬 모드에서 Spark 세션 시작
spark = SparkSession.builder \
.master("local") \
.appName("Colab") \
.getOrCreate()
# SparkSession 설정 확인
spark.sparkContext.getConf().getAll() # 현재 설정된 Spark 환경 변수 확인
2. 데이터 읽기
- 데이터 파일을 읽어 Spark DataFrame 으로 변환. header, inferSchema, mode 옵션을 통해 데이터 읽기 방식을 설정할 수 있음
# CSV 파일을 읽어와 DataFrame으로 변환
df = spark.read.option("header", "true").csv('./sample_data/test1.csv')
df.show() # DataFrame 내용 출력
df.printSchema() # DataFrame의 스키마(컬럼명 및 타입) 출력
df.dtypes # 각 컬럼의 데이터 타입 확인
3. 컬럼 다루기및 DataFrame 생성
1) 컬럼 선택∙삭제
- 컬럼 선택: 특정 컬럼만 선택
- 컬럼 삭제: 특정 컬럼을 삭제
df.select(['Name', 'age']).show() # 'Name'과 'age' 컬럼만 선택하여 출력
df_drop = df.drop('Experience') # 'Experience' 컬럼 삭제
2) 컬럼 추가∙변경
- withColumn: 새로운 컬럼을 추가하거나 기존 컬럼을 변환
- expr 및 selectExpr: 표현식을 사용해 계산된 컬럼 추가
- withColumnRenamed: 컬럼명 변경
- cast: 컬럼의 데이터 타입 변경
from pyspark.sql import functions as F
from pyspark.sql import types as T
# 새 컬럼 추가 ('age' 컬럼을 두 배로 곱하여 'new_column' 컬럼 생성)
df = df.withColumn('new_column', F.col('age') * 2)
# 'age' 컬럼의 타입을 Integer로 변경
df = df.withColumn('age', F.col('age').cast(T.IntegerType()))
3) DataFrame 생성
from pyspark.sql import Row
# 스키마 정의: 'aaa', 'bbb', 'ccc'라는 3개의 String 타입 컬럼을 포함하는 스키마
schema = T.StructType([
T.StructField('aaa', T.StringType(), True),
T.StructField('bbb', T.StringType(), True),
T.StructField('ccc', T.StringType(), True)
])
# 데이터 준비: Row를 사용하여 데이터 생성
data = [Row('apple', 'banana', 'tomato'), Row('apple1', 'banana1', 'tomato1')]
df_new = spark.createDataFrame(data, schema) # 정의한 스키마와 데이터로 DataFrame 생성
df_new.show() # 생성된 DataFrame 출력
4, DataFrame 쓰기 및 옵션
- DataFrame을 파일로 저장하는 여러 가지 옵션을 설정
# CSV 파일로 DataFrame 저장 - 기존 파일 덮어쓰기 모드
df.write.mode('overwrite').csv("output_path")
# 지정된 컬럼을 기준으로 파티션을 나눠서 저장
df.write.partitionBy("column_name").mode("overwrite").csv("output_path")
5. 결측치 다루기
1) 결측치 삭제∙채우기
- dropna: 결측치가 포함된 행을 제거
- fillna: 결측치를 특정 값으로 채움
df_na = df.na.drop() # 모든 결측치가 포함된 행 제거
df_fill = df.fillna(value="10000", subset=['age', 'Experience']) # 특정 컬럼의 결측치를 '10000'으로 채움
6. 필터링
- 조건에 맞는 데이터를 필터링
# 'age'가 30보다 큰 행을 필터링하여 출력
df.filter(F.col('age') > 30).show()
# 'codename'이 '액세서리'이거나 'price'가 30 미만인 행을 필터링하여 출력
df.filter((F.col('codename') == '액세서리') | (F.col('price') < 30)).show()
7. Group By 및 집계
- 집계 함수를 사용하여 데이터를 그룹화&요약
# 'codename'을 기준으로 그룹화하고, 'price'의 합계와 평균 계산
df.groupBy('codename').agg(
F.sum('price').alias('total_price'), # 'price' 컬럼 합계
F.mean('price').alias('average_price') # 'price' 컬럼 평균
).show()
8. 정렬 Order By
- 데이터를 특정 열을 기준으로 오름차순 or 내림차순으로 정렬
# 'age' 컬럼을 기준으로 내림차순 정렬하여 출력
df.orderBy(F.col('age').desc()).show()
9. Join 과 Union
1) Join: 두 개의 DataFrame을 특정 조건에 맞게 결합
- inner(기본값), left, right, outer
# 'Name' 컬럼을 기준으로 outer join 수행
df1.join(df2, df1.Name == df2.Name, 'outer').show()
2) Union: 컬럼이 동일한 두 DataFrame을 하나로 합침
# df1과 df2를 union으로 결합
df1.union(df2).show()
10. 윈도우 함수
- 윈도우 함수를 사용하여 순위 계산, 누적 합계 등 구간별 분석을 수행
from pyspark.sql import window as W
# 'Name'을 기준으로 파티션 나누고, 'salary' 컬럼을 오름차순 정렬하는 윈도우 사양 생성
window_spec = W.Window.partitionBy("Name").orderBy("salary")
# row_number() 함수를 사용해 순위를 계산하는 'rank' 컬럼 추가
df = df.withColumn("rank", F.row_number().over(window_spec))
11. 사용자 정의 함수 (UDF)
- 사용자가 정의한 함수를 특정 컬럼에 적용하여 데이터를 가공할 수 있음
from pyspark.sql import types as T
from pyspark.sql.functions import udf
# 문자열을 대문자로 변환하는 함수 정의
def to_uppercase(text):
return text.upper()
# UDF로 변환하여 'Name' 컬럼에 적용
uppercase_udf = udf(to_uppercase, T.StringType())
df = df.withColumn("upper_name", uppercase_udf(F.col("Name")))
12. 기타 함수
- lit: 고정된 값을 열에 추가
- split: 특정 구분자로 문자열을 분할
- like & rlike: 문자열 패턴 매칭
# 'fixed_value'라는 고정된 값을 추가
df.withColumn('fixed_col', F.lit('fixed_value'))
# 'name' 컬럼을 공백 기준으로 나누고 첫 번째 단어를 'split_col'에 추가
df.withColumn('split_col', F.split(F.col('name'), ' ').getItem(0))
# 'codename'이 특정 패턴과 일치하는지 필터링
df.filter(F.col('codename').rlike('pattern'))
13. Spark SQL
- Spark SQL을 사용하여 Spark DataFrame을 SQL 쿼리로 조작할 수 있음
# 테이블 생성 및 조회: DataFrame을 SQL 테이블로 등록하고, SQL 쿼리 실행
df.createOrReplaceTempView("temp_table")
spark.sql("SELECT * FROM temp
📙 내일 일정
- 데이터 파이프라인 / PySpark 시험
'TIL _Today I Learned > 2024.10' 카테고리의 다른 글
[DAY 72] Data Pipeline 및 PySpark 시험 (0) | 2024.10.31 |
---|---|
[DAY 70] SQLAlchemy ORM (1) | 2024.10.29 |
[DAY 69] SQLAlchemy ORM (1) | 2024.10.28 |
[DAY 68] 데이터 엔지니어링 (1) | 2024.10.25 |
[DAY 67] AWS 아키텍처 그리기 (0) | 2024.10.24 |