./bin/spark-submit\ --master local ./examples/src/main/python/pi.py 10
pi.py는 파이를 특정 자릿수까지 계산하는 예제이다.
spark-submit 옵션 중 master의 인수를 변경하면 스파크가 지원하는 클러스터 매니저에서 애플리케이션 실행이 가능해진다.
3.2 Dataset : type-safe를 제공하는 구조적 API
첫 번째로 설명할 API는 Dataset 이다.
Dataset은 JAVA, Scala의 statically typed code를 지원하기 위해 만들어진 구조적 API다.
정적 타입 코드 : 컴파일 시에 자료형을 결정함 동적 타입 코드 : 실행 시 자료형 결정
Dataset은 type-safe를 지원하며 동적 타입 언어인 Python과 R에서는 사용할 수 없다.
DataFrame은 여러 타입의 테이블형 데이터를 보관할 수 있는 Row 타입의 객체로 구성되어 있는 분산 컬렉션이다.
DataFrame의 레코드를 사용자가 자바나 스칼라로 정의한 클래스에 할당하고 자바의 ArrayList 또는 스칼라의 Seq 객체 등 고정 타입형 컬렉션으로 다룰 수 있는 기능을 제공한다.
Dataset 클래스
JAVA에서는 Dataset<T>, Scala에서는 Dataset[T]로 표시
내부 객체의 데이터 타입을 매개변수로 사용하는데, 예를 들어 Dataset<Person>이면 Person 클래스의 인스턴스만 가질 수 있다.
스파크가 이런 타입을 제한적으로 사용할 수 밖에 없는 이유는 자동으로 type T를 분석해서 Dataset의 테이블 형식 데이터에 적합한 스키마를 생성해야 하기 때문이다.
Dataset은 필요한 경우에 선택적으로 사용할 수 있다는 장점이 있다.
데이터 타입 정의하기
case class Flight(DEST_COUNTRY_NAME : String, ORIGIN_COUNTRY_NAME : String, count : BigInt) val flightsDF = spark.read.parquet("spark_ex/data/flight-data/parquet/2010-summary.parquet/") val flights = flightsDF.as[Flight]
두 번째 줄 명령을 실행하여 반환된 결과가 DataFrame을 확인할 수 있다.
이처럼 스파크는 결과를 자동으로 DataFrame으로 변환해서 반환한다.
collect(), take()를 호출하면 DataFrame을 구성하는 Row 타입의 객체가 아닌 Dataset에 매개변수로 지정한 타입의 객체를 반환한다. 즉 코드 변경없이 type-safe를 보장할 수 있다.
데이터 집계하는 절차가 지연 연산이므로 data flow를 실행하기 위해 스트리밍 액션을 호출해야 한다.
스트리밍 액션은 어딘가에 데이터를 채워 넣어야 하므로 count()와 같은 일반적인 정적 액션과는 다른 특성을 갖는다. 여기서 사용할 액션은 trigger가 실행된 다음 데이터를 갱신하게 될 인메모리 테이블에 데이터를 저장한다.
파일마다 트리거를 실행한다.
>>> purchaseByCustomerPerHour.writeStream\ ... .format("memory")\ #인메모리 테이블에 저장 ... .queryName("customer_purchases")\ # 인메모리에 저장될 테이블명 ... .outputMode("complete")\ # 모든 카운트 수행 결과를 테이블에 저장 ... .start()
스트림이 시작되는 것을 볼 수 있다.
>>> spark.sql(""" ... SELECT * ... FROM customer_purchases ... ORDER BY `sum(total_cost)` DESC ... """)\ ... .show(5)
더 많은 데이터를 읽을수록 테이블 구성이 바뀐다.
각 파일에 있는 데이터에 따라 결과가 변경되거나 아닐 수 있다.
고객을 그룹화하기 때문에 시간이 지남에 따라 일정 기간 최고로 많이 구매한 고객의 구매 금액이 증가할 것으로 기대할 수 있다.
>>> purchaseByCustomerPerHour.writeStream\ ... .format("console")\ # 콘솔에 결과 출력 ... .queryName("customer_purchases_2")\ ... .outputMode("complete")\ ... .start()
메모리나 콘솔 출력 방식과 파일별 트리거 수행하는 방식을 운영환경에서 사용하는 것은 좋지 않다. 그러나 구조적 스트리밍의 강력함을 느끼기엔 충분하다.
3.4 머신러닝과 고급 분석
스파크에 내장된 머신러닝 알고리즘 라이브러리 MLlib을 사용해 대규모 머신러닝 수행이 가능하다.
MLlib을 사용해 할 수 있는 것들
preprocessing
munging (=data wrangling)
model training
prediction
MLlib에서 제공하는 예측모델 task
classification
regression
clustering
deep learning
이 장에서 k-means를 사용해 clustering을 수행한다.
k-means : 전체 데이터셋에서 k개의 중심(centroid)을 계산해서 센트로이드에서 가까운 점들의 군집에 레이블을 지정하고, 새롭게 센트로이드를 계산한다. 이 과정을 반복해 센트로이드의 변경이 없는, 즉 수렴할 때까지 반복한다.
먼저, MLlib의 머신러닝 알고리즘을 사용하기 위해서는 수치형 데이터가 필요하다.
예제) step 1. preprocessiong
예제 데이터는 타임스탬프, 정수, 문자열 등 다양한 데이터 타입으로 이루어져 있어서 수치형 데이터로 변환해야한다.
>>> staticDataFrame.printSchema()
>>> from pyspark.sql.functions import date_format, col >>> preppedDataFrame = staticDataFrame.na.fill(0)\ # 결측치 0으로 채우기 ... .withColumn("day_of_week", date_format(col("InvoiceDate", "EEEE"))\ # 날짜 데이터 ... .coalesce(5)
withColumn() 한 컬럼의 값을 다른 값으로 변경할 때 사용한다.
InvoiceDate 컬럼의 데이터 포맷을 "EEEE"로하여 그 값들을 day_of_week라는 새로운 컬럼을 생성한다.
트랜스포메이션한 데이터를 train set과 test set으로 나눠야한다. 특정 구매가 이뤄진 날짜를 기준으로 분리한다.
이런 방법말고, MLlib의 트랜스포메이션 API인 TrainValidationSplit 또는 CrossValidator를 사용해서 데이터셋을 생성할 수 있다.
Uploaded by Notion2Tistory v1.1.0