앎을 경계하기

Machine Learning/Spark

[SPARK 완벽 가이드] Chapter 3 스파크 기능 둘러보기

양갱맨 2021. 4. 30. 15:02

3. 스파크 기능 둘러보기

앞 장에서 트랜스포메이션과 액션 등 스파크의 구조적 API와 관련된 핵심 개념을 배웠다.

스파크 구성

  • 저수준 API
  • 구조적 API
  • 표준 라이브러리

스파크의 라이브러리는 그래프 분석, 머신러닝, 스트리밍 등 다양한 작업을 지원한다.

컴퓨팅 및 저장 시스템과의 통합을 돕는 것도 스파크의 역할이다.

  • 3장에서 배울 내용
    • spark-submit 으로 운영용 애플리케이션 실행
    • Dataset : Type-safe를 제공하는 구조적 API
    • 구조적 스트리밍
    • 머신러닝과 고급 분석
    • RDD : 저수준 API
    • SparkR
    • 서드파트 패키지 에코시스템

3.1 운영용 애플리케이션 실행하기

spark-submit명령을 사용해 대화형 쉘에서 개발한 프로그램은 운영용 애플리케이션으로 쉽게 전환할 수 있다.

spark-submit은 애플리케이션실행에 필요한 자원과 실행 방식 그리고 다양한 옵션을 지정할 수 있다.

사용자는 스파크가 지원하는 언어로 애플리케이션을 개발한 다음 실행할 수 있다.

./bin/spark-submit\ --class org.apache.spark.examples.SparkPi\ --master local\ ./examples/jars/spark-examples_spark-examples_2.12-3.1.1.jar 10

spark-submit명령에 예제 class를 --class org.apache.spark.examples.SparkPi로 지정하고 로컬에서 실행되도록 --master local로 설정하였다.

실행에 필요한 JAR 파일과 인수 10도 함께 지정했다.

다음은 파이썬으로 작성한 애플리케이션을 실행하는 예제이다.

./bin/spark-submit\ --master local ./examples/src/main/python/pi.py 10

pi.py는 파이를 특정 자릿수까지 계산하는 예제이다.

spark-submit 옵션 중 master의 인수를 변경하면 스파크가 지원하는 클러스터 매니저에서 애플리케이션 실행이 가능해진다.

3.2 Dataset : type-safe를 제공하는 구조적 API

첫 번째로 설명할 API는 Dataset 이다.

Dataset은 JAVA, Scala의 statically typed code를 지원하기 위해 만들어진 구조적 API다.

정적 타입 코드 : 컴파일 시에 자료형을 결정함 동적 타입 코드 : 실행 시 자료형 결정

 

Dataset은 type-safe를 지원하며 동적 타입 언어인 Python과 R에서는 사용할 수 없다.

DataFrame은 여러 타입의 테이블형 데이터를 보관할 수 있는 Row 타입의 객체로 구성되어 있는 분산 컬렉션이다.

DataFrame의 레코드를 사용자가 자바나 스칼라로 정의한 클래스에 할당하고 자바의 ArrayList 또는 스칼라의 Seq 객체 등 고정 타입형 컬렉션으로 다룰 수 있는 기능을 제공한다.

  • Dataset 클래스
    • JAVA에서는 Dataset<T>, Scala에서는 Dataset[T]로 표시
    • 내부 객체의 데이터 타입을 매개변수로 사용하는데, 예를 들어 Dataset<Person>이면 Person 클래스의 인스턴스만 가질 수 있다.
    • 스파크가 이런 타입을 제한적으로 사용할 수 밖에 없는 이유는 자동으로 type T를 분석해서 Dataset의 테이블 형식 데이터에 적합한 스키마를 생성해야 하기 때문이다.
    • Dataset은 필요한 경우에 선택적으로 사용할 수 있다는 장점이 있다.
      1. 데이터 타입 정의하기
        case class Flight(DEST_COUNTRY_NAME : String, 									ORIGIN_COUNTRY_NAME : String, 									count : BigInt) val flightsDF = spark.read.parquet("spark_ex/data/flight-data/parquet/2010-summary.parquet/") val flights = flightsDF.as[Flight]
        두 번째 줄 명령을 실행하여 반환된 결과가 DataFrame을 확인할 수 있다.
      2. 이처럼 스파크는 결과를 자동으로 DataFrame으로 변환해서 반환한다.
    • collect(), take()를 호출하면 DataFrame을 구성하는 Row 타입의 객체가 아닌 Dataset에 매개변수로 지정한 타입의 객체를 반환한다. 즉 코드 변경없이 type-safe를 보장할 수 있다.
      flights .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada") .map(flight_row => flight_row) .take(5)  res0: Array[Flight] = Array(Flight(United States,Romania,1), Flight(United States,Ireland,264), Flight(United States,India,69), Flight(Egypt,United States,24), Flight(Equatorial Guinea,United States,1))   flights .take(5) .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada") .map(fr => Flight(fr.DEST_COUNTRY_NAME, fr.ORIGIN_COUNTRY_NAME, fr.count + 5))  res3: Array[Flight] = Array(Flight(United States,Romania,6), Flight(United States,Ireland,269), Flight(United States,India,74), Flight(Egypt,United States,29), Flight(Equatorial Guinea,United States,6))
      flightsDF와 flight의 반환 타입이 다른 것을 확인할 수 있다.
    • 정확한 이해는 잘 안되지만.. 뒷장에서 더 자세히 Dataset에 대해 알아본다고 한다.

3.3 구조적 스트리밍

  • 구조적 스트리밍 : 안정화된 스트림 처리용 고수준 API
  • 구조적 스트리밍 사용 시, 구조적 API로 개발된 배치 모드의 연산을 스트리밍 방식으로 실행가능하다.
  • 지연 시간을 줄이고 증분 처리가 가능하다.
  • 배치 처리용 코드를 일부 수정하여 스트리밍 처리를 수행하고 값을 빠르게 얻을 수 있다.

구조적 스트리밍 만들기 예제

데이터셋 : 리테일 데이터셋

특정 날짜, 시간 정보

하루치 데이터 by-day directory의 파일 사용

>>> staticDataFrame = spark.read.format("csv")\ ... .option("header", "true")\ ... .option("inferSchema", "true")\ ... .load("spark_ex/data/retail-data/by-day/*.csv")  >>> staticDataFrame.createOrReplaceTempView("retail_data") >>> staticSchema = staticDataFrame.schema  >>> staticDataFrame DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: string, UnitPrice: double, CustomerID: double, Country: string]  >>> staticSchema StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,DoubleType,true),StructField(CustomerID,DoubleType,true),StructField(Country,StringType,true)))

시계열 데이터를 다루기 때문에 데이터를 그룹화하고 집계하는 방법을 알아보려고 한다.

특정 고객(CustomerID)이 대량으로 구매하는 영업시간을 살펴본다.

총 구매비용 컬럼을 추가하고 각 고객이 가장 많이 소비한 날을 찾아본다.

window function은 집계 시 시계열 컬럼을 기준으로 각 날짜에 대한 전체 데이터를 갖는 윈도우를 구성한다.

시계열 컬럼이란? 시간에 따라 변화하는 값이 들어있는 컬럼

 

윈도우는 간격을 통해 처리 요건을 명시할 수 있기 때문에 날짜, 타임스탬프 처리에 유용하다.

>>> from pyspark.sql.functions import window, column, desc, col >>> staticDataFrame\ ...  .selectExpr( ...    "CustomerId", ...    "(UnitPrice * Quantity) as total_cost", ...    "InvoiceDate")\ ...  .groupBy( ...    col("CustomerId"), window(col("InvoiceDate"), "1 day"))\ ...  .sum("total_cost")\ ...  .sort(desc("sum(total_cost)"))\ ...  .show(5)

 

selectExpr은 select와 기능은 같지만 추가적인 산술연산, SQL식 언어를 위해 사용하는 함수다.

select는 Dataframe의 column을 선택해서 출력한다는 차이가 있다.

로컬에서 코드실행을 하기 위해 셔플 파티션 수를 조정한다.

>>> spark.conf.set("spark.sql.shuffle.partitions", "5")

스트리밍 코드 살펴보기

read()대신 readStream()을 사용하는 것이 가장 큰 차이점이다.

maxFilesPerTrigger 옵션을 추가하는데, 이 옵션은 한 번에 읽을 파일 수를 설정한다.

>>> streamingDataFrame = spark.readStream\ ...	.schema(staticSchema)\ ...	.option("maxFilesPerTrigger", 1)\ ...	.format("csv")\ ...	.option("header", "true")\ ...	.load("spark_ex/data/retail-data/by-day/*.csv")  >>> streamingDataFrame.isStreaming True

DataFrame이 스트리밍 타입이 되었다.

앞서 작성했던 로직을 적용하는 코드다.

>>> purchaseByCustomerPerHour = streamingDataFrame\ ...  .selectExpr( ...    "CustomerId", ...    "(UnitPrice * Quantity) as total_cost", ...    "InvoiceDate")\ ...  .groupBy( ...    col("CustomerId"), window(col("InvoiceDate"), "1 day"))\ ...  .sum("total_cost")

데이터 집계하는 절차가 지연 연산이므로 data flow를 실행하기 위해 스트리밍 액션을 호출해야 한다.

스트리밍 액션은 어딘가에 데이터를 채워 넣어야 하므로 count()와 같은 일반적인 정적 액션과는 다른 특성을 갖는다. 여기서 사용할 액션은 trigger가 실행된 다음 데이터를 갱신하게 될 인메모리 테이블에 데이터를 저장한다.

파일마다 트리거를 실행한다.

>>> purchaseByCustomerPerHour.writeStream\ ...	.format("memory")\ #인메모리 테이블에 저장 ...	.queryName("customer_purchases")\ # 인메모리에 저장될 테이블명 ...	.outputMode("complete")\ # 모든 카운트 수행 결과를 테이블에 저장 ...	.start()

스트림이 시작되는 것을 볼 수 있다.

>>> spark.sql(""" ...  SELECT * ...  FROM customer_purchases ...  ORDER BY `sum(total_cost)` DESC ...  """)\ ...  .show(5)

더 많은 데이터를 읽을수록 테이블 구성이 바뀐다.

각 파일에 있는 데이터에 따라 결과가 변경되거나 아닐 수 있다.

고객을 그룹화하기 때문에 시간이 지남에 따라 일정 기간 최고로 많이 구매한 고객의 구매 금액이 증가할 것으로 기대할 수 있다.

>>> purchaseByCustomerPerHour.writeStream\ ...    .format("console")\ # 콘솔에 결과 출력 ...    .queryName("customer_purchases_2")\ ...    .outputMode("complete")\ ...    .start()

메모리나 콘솔 출력 방식과 파일별 트리거 수행하는 방식을 운영환경에서 사용하는 것은 좋지 않다. 그러나 구조적 스트리밍의 강력함을 느끼기엔 충분하다.

 


3.4 머신러닝과 고급 분석

스파크에 내장된 머신러닝 알고리즘 라이브러리 MLlib을 사용해 대규모 머신러닝 수행이 가능하다.

MLlib을 사용해 할 수 있는 것들

  • preprocessing
  • munging (=data wrangling)
  • model training
  • prediction

 

MLlib에서 제공하는 예측모델 task

  • classification
  • regression
  • clustering
  • deep learning

 

이 장에서 k-means를 사용해 clustering을 수행한다.

k-means : 전체 데이터셋에서 k개의 중심(centroid)을 계산해서 센트로이드에서 가까운 점들의 군집에 레이블을 지정하고, 새롭게 센트로이드를 계산한다. 이 과정을 반복해 센트로이드의 변경이 없는, 즉 수렴할 때까지 반복한다.

먼저, MLlib의 머신러닝 알고리즘을 사용하기 위해서는 수치형 데이터가 필요하다.

예제) step 1. preprocessiong

예제 데이터는 타임스탬프, 정수, 문자열 등 다양한 데이터 타입으로 이루어져 있어서 수치형 데이터로 변환해야한다.

>>> staticDataFrame.printSchema()
>>> from pyspark.sql.functions import date_format, col  >>> preppedDataFrame = staticDataFrame.na.fill(0)\ # 결측치 0으로 채우기 ...	.withColumn("day_of_week", date_format(col("InvoiceDate", "EEEE"))\ # 날짜 데이터 ...	.coalesce(5)

withColumn() 한 컬럼의 값을 다른 값으로 변경할 때 사용한다.

InvoiceDate 컬럼의 데이터 포맷을 "EEEE"로하여 그 값들을 day_of_week라는 새로운 컬럼을 생성한다.

트랜스포메이션한 데이터를 train set과 test set으로 나눠야한다. 특정 구매가 이뤄진 날짜를 기준으로 분리한다.

이런 방법말고, MLlib의 트랜스포메이션 API인 TrainValidationSplit 또는 CrossValidator를 사용해서 데이터셋을 생성할 수 있다.

>>> trainDataFrame = preppedDataFrame.where("InvoiceDate < '2011-07-01'") >>> testDataFrame = preppedDataFrame.where("InvoiceDate >= '2011-07-01'")
>>> trainDataFrame.count() 245903 >>> testDataFrame.count() 296006

대략 절반으로 나눠진 것을 알 수 있다.

MLlib은 일반적인 트랜스포메이션을 자동화하는 다양한 트랜스포메이션을 제공한다.

day_of_week(요일)를 day_of_week_index(수치형)로 변환한다.

월 : 1, 화 : 2, 수 : 3, 목 : 4, 금 : 5, 토 : 6, 일 : 7

>>> from pyspark.ml.feature import StringIndexer  >>> indexer = StringIndexer()\ ...	.setInputCol("day_of_week")\ ...	.setOutputCol("day_of_week_index")

위 방법이 잘못된 것은 수치로 표현된 것이기 때문에 암묵적으로 토요일, 일요일이 월요일보다 크다고 의미한다.

문제점 보완을 위해 OneHotEncoder를 사용한다. 원핫인코딩 방식을 사용하게 되면 특정 요일이 해당 요일인지 아닌지를 나타낼 수 있다.

>>> from pyspark.ml.feature import OneHotEncoder  >>> encoder = OneHotEncoder()\ ...	.setInputCol("day_of_week_index")\ ...	.setOutputCol("day_of_week_encoded")

스파크의 모든 머신러닝 알고리즘은 수치형 벡터 타입을 입력으로 사용한다.

>>> from pyspark.ml.feature import VectorAssembler  >>> vectorAssembler = VectorAssembler()\ ...	.setInputCols(["UnitPrice", "Quantity", "day_of_week_encoded"])\ ...	.setOutputCol("features")

UnitPrice, Quantity, day_of_week_encoded 컬럼을 핵심 feature로 구성한다.

예제) step 2. Pipeline 구성

파이프 라인을 사용해서 나중에 input 데이터가 들어오면 같은 과정을 거쳐서 트랜스포메이션되도록 한다.

>>> from pyspark.ml import Pipeline  >>> transformationPipeline = Pipeline()\ ...	.setStages([indexer, encoder, vectorAssembler])

학습 준비 과정은 두 가지 단계로 구성된다.

  1. transformer를 데이터셋에 적합시켜야한다.
  1. StringIndexer는 인덱싱할 고윳값의 수를 알아야한다.
    • 고윳값의 수를 알고 있다면 인코딩이 쉬워짐
    • 모른다면 컬럼에 있는 모든 고윳값을 조사하고 인덱싱해야 함.
>>> fittedPipeline = transformationPipeline.fit(trainDataFrame)

transformer에 적합시키고 나면 fitted pipeline이 준비된다.

이것을 사용해서 일관되고 반복적인 방식으로 모든 데이터를 변환할 수 있다.

>>> transformedTraining = fittedPipeline.transform(trainDataFrame)

파이프라인 생성이 끝났다.

데이터 캐싱은 파이프라인 구성 과정에서 제외시켰다. 캐싱을 사용하면 중간 변환된 데이터셋의 복사본을 메모리에 저장하기 떄문에 전체 파이프라인을 재실행하는 것보다 훨씬 빠르게 데이터셋에 반복적인 접근이 가능하다.

예제) step 3. Training

>>> from pyspark.ml.clustering import KMeans  >>> kmeans = KMeans().setK(20).setSeed(1L)

스파크에서 머신러닝 모델 학습시키는 과정

  1. 아직 학습되지 않은 모델 초기화
  1. 모델 학습

제공 알고리즘 모델에 대한 명명 규칙이 존재한다.

  • 학습 전 : [Algorithm이름]
  • 학습 후 : [Algorithm이름]Model

ex) 학습 전 : KMeans, 학습 후 : KMeansModel

MLlib의 DataFrame API에서 제공하는 estimator는 앞서 사용한 전처리 변환자와 거의 동일한 인터페이스를 갖는다.

이 인터페이스를 사용해 전체 파이프라인의 학습 과정을 단순화할 수 있다.

>>> kmModel = kmeans.fit(transformedTraining) >>> transformedTest = fittedPipeline.transform(testDataFrame)

구체적으로 평가지표를 구하거나 모델 개선에 대한 부분은 후에 더 자세히 볼 것이다.

 

3.5 저수준 API

스파크는 RDD를 통해 자바와 파이썬 객체를 다루는 데 필요한 다양한 저수준 API를 제공한다.

스파크의 거의 모든 기능은 RDD를 기반으로 만들어졌다.

RDD을 이용해 파티션과 같은 물리적 실행 특성을 결정할 수 있으므로 DataFrame보다 더 세밀한 제어를 할 수 있다.

드라이버 시스템의 메모리에 저장된 원시 데이터를 병렬처리하는 데 RDD를 사용할 수 있다.

예제) 숫자를 이용해 병렬화해 RDD를 생성하기

from pyspark.sql import Row  spark.sparkContext.parallelize([Row(1), Row(2), Row(3)].toDF()

RDD는 파이썬에서도 사용가능하지만 스칼라와 동일한 RDD는 아니다.

세부 구현 방식에서 차이가 있는데 이것은 4장에서 자세히 알아본다.

낮은 버전의 스파크 코드를 계속 사용해야하는 것이 아니라면 RDD를 사용해서 스파크 코드를 작성할 필요가 없다.

최신 버전의 스파크에서는 기본적으로 RDD를 사용하지 않지만 비정형 데이터나 정제되지 않은 원시 데이터를 처리해야 한다면 RDD를 사용해야 한다.

3.6 SparkR

SparkR은 말그대로 스파크를 R로 사용하기 위한 것이다. 파이썬 대신 R 구문을 사용한다는 점외엔 pyspark와 차이가 없다.

R은 내가 거의 사용하지 않으니.. 패스한다.

 

3.7 스파크의 에코시스템과 패키지

스파크는 다양한 프로젝트와 패키지를 깃허브 같은 곳에서 쉽게 찾아볼 수 있다.

spark-packages.org 에서 누구나 자신이 개발한 패키지를 공개할 수 있다.