앎을 경계하기

Machine Learning/Spark

SPARK 완벽 가이드 - ch.5 구조적 API 기본 연산

양갱맨 2021. 5. 17. 11:39

구조적 API 기본 연산

 

4장 내용 - 구조적 API의 핵심 추상화 개념

5장 내용 - DataFrame과 DataFrame의 데이터를 다루는 기능 소개

 

<DataFrame의 구성>

  • Row 타입의 records (= 테이블의 row)
  • 각 레코드에 수행할 연산 표현식을 나타내는 Columns (=스프레드시트의 column)
  • 각 컬럼명과 데이터 타입을 정의하는 schema
  • dataframe이나 dataset이 클러스터에서 물리적으로 배치되는 형태를 Partitioning 이라고 함
  • Partitioning schema는 파티션을 배치하는 방법을 정의
    • 파티셔닝의 분할 기준은 특정 컬럼 또는 비결정론적(매번 변하는)값을 기반으로 설정

 

ex) json 파일 데이터를 dataframe 타입으로 로드하여 스키마 확인하기

>>> df = spark.read.format("json").load("./spark_ex/data/flight-data/json/2015-summary.json") >>> df.printSchema()  root  |-- DEST_COUNTRY_NAME: string (nullable = true)  |-- ORIGIN_COUNTRY_NAME: string (nullable = true)  |-- count: long (nullable = true)

도착 국가 이름과 출발지 국가 이름은 string으로 되어있고 null이 가능함 비행 수는 long타입으로 null 가능함.

5.1 스키마

스키마는 데이터 소스에서 얻거나 직접 정의 가능하다.

csv나 json등 파일로부터 스키마를 읽게되면 Long데이터가 Integer로 잘못 인식되는 정밀도 문제가 발생할 수 있다는 점에 주의해야함.

>>> spark.read.format("json").load("./spark_ex/data/flight-data/json/2015-summary.json").schema  StructType( 	List(StructField(DEST_COUNTRY_NAME,StringType,true), 			StructField(ORIGIN_COUNTRY_NAME,StringType,true), 			StructField(count,LongType,true)))

스키마는 여러 개의 StructField 타입 필드로 구성된 StructType 객체다.

StructField는 이름, 데이터 타입, 컬럼이 값이 없거나 null일 수 있는지 지정하는 불리언(True, False) 값을 갖는다.

필요하면 컬럼과 관련된 메타데이터 지정도 가능하다.

ex) DataFrame에 스키마 생성 및 적용 예제

>>> myManualSchema = StructType([ ...     StructField("DEST_COUNTRY_NAME", StringType(), True), ...     StructField("ORIGIN_COUNTRY_NAME", StringType(), True), ...     StructField("count", LongType(), False, metadata={"hello":"world"})]) >>> df = spark.read.format("json").schema(myManualSchema).load("./spark_ex/data/flight-data/json/2015-summary.json")

5.2 컬럼과 표현식

스파크 컬럼은 Pandas의 DataFrame의 컬럼과 유사하다.

사용자는 표현식을 사용해서 컬럼을 select, control, remove할 수 있다.

컬럼 내용을 수정하려면 반드시 DataFrame의 spark transformation을 사용해야 한다.

5.2.1 컬럼

컬럼은 col함수를 사용해서 만드는 것이 가장 간단한 방법이다.

>>> from pyspark.sql.functions import col >>> col("someColumnName")  Column<'someColumnName'>

컬럼은 컬럼명을 카탈로그에 저장된 정보와 비교하기 전까지 미확인 상태로 남는다.

위 글처럼 분석기가 동작할 때 컬럼과 테이블을 분석한다.

<명시적 컬럼 참조>

DataFrame의 컬럼은 col메서드로 참조한다. join할 때 주로 많이 사용한다.

col()을 사용해 명시적으로 컬럼을 정의하면 스파크는 분석기 실행 단계에서 컬럼 확인 절차를 생략함

5.2.2 표현식

표현식은 DataFrame 레코드의 여러 값에 대한 transformation 집합을 의미함

여러 컬럼명을 입력으로 하여 식별하고 '단일 값'을 만들기 위해 다양한 표현식을 각 레코드에 적용하는 함수라고 생각할 수 있다.

단일 값은 Map이나 Array 같은 복합 데이터 타입일 수 있다.

expr()을 사용해서 DataFrame의 컬럼을 참조할 수 있다.

<표현식으로 컬럼 표현하기>

컬럼은 표현식의 일부 기능을 제공한다.

expr("someCol - 5")col("someCol")-5expr("someCol")-5는 모두 같은 트랜스포메이션 과정을 거친다. 왜냐하면 스파크가 연산 순서를 논리적 트리로 컴파일하기 때문이다.

ex)논리적 트리

DAG(Directed Acyclic Graph)로 표현되기 때문에 다음처럼 표현이 가능하다.

아래 첫번째 코드와 두번째 코드는 같은 트랜스포메이션 과정을 거친다.

#첫번째 코드 (((col("someCol")+5)*200)-6)<col("otherCol")  #두번째 코드 from pyspark.sql.functions import expr expr("(((someCol + 5) * 200) - 6) < otherCol")

SQL SELECT 구문에 이전 표현식을 사용해도 잘 동작하며 동일한 결과를 생성한다.

왜냐하면 SQL 표현식이나 위 예제같은 DataFrame 코드는 실행 시점에 동일한 논리 트리로 컴파일 되기 때문이다.

<DataFrame 컬럼에 접근하기>

printSchema()로 전체 컬럼 정보를 확인할 수 있었다.

컬럼에 접근할 때는 columns를 사용한다.

>>> df.columns  ['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']

5.3 Record와 Row

스파크에서 DataFrame의 각 row는 하나의 record이다.

Row 객체는 내부에 바이트 배열을 갖는다. 이 배열 인터페이스는 컬럼 표현식으로만 다루기 때문에 사용자에게 노출되지 않는다.

first()를 사용하여 첫번째 row를 확인한다.

>>> df.first()  Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15)

5.3.1 로우 생성하기

Row 객체는 스키마 정보를 가지고 있지 않다. 따라서 Row 객체를 만들려면 DataFrame의 스키마와 같은 순서대로 값을 명시해야한다.

>>> from pyspark.sql import Row >>> myRow = Row("Hello", None, 1, False) >>> myRow[0] 'Hello' >>> myRow[2] 1

5.4 DataFrame의 트랜스포메이션

트랜스포메이션으로 할 수 있는 작업

  • 로우나 컬럼 추가
  • 로우나 컬럼 제거
  • 로우 ↔ 컬럼 변환
  • 컬럼값 기준으로 로우 순서 변경

5.4.1 DataFrame 생성하기

>>> df = spark.read.format("json").load("./spark_ex/data/flight-data/json/2015-summary.json") >>> df.createOrReplaceTempView("dfTable") >>> df DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]  >>> from pyspark.sql import Row >>> from pyspark.sql.types import StructField, StructType, StringType, LongType  >>> myManualSchema = StructType([ ...     StructField("some", StringType(), True), ...     StructField("col", StringType(), True), ...     StructField("names", LongType(), False) ... ]) >>> myRow = Row("Hello", None, 1) #some, col, names 순서 맞춰서 로우 입력 >>> myDf = spark.createDataFrame([myRow], myManualSchema) >>> myDf.show()  +-----+----+-----+ | some| col|names| +-----+----+-----+ |Hello|null|    1| +-----+----+-----+

DataFrame을 만들었으면 이제 유용하게 사용하는 메소드를 알아보자.

  • select
  • selectExpr
  • org.apache.spark.sql.functions에 포함된 함수

5.4.2 select와 selectExpr

select(), selectExpr()을 사용해서 SQL 실행하듯 DataFrame에서도 SQL문을 적용할 수 있다.

SELECT * From dataFrameTable SELECT columnName FROM dataFrameTable SELECT columnName * 10, otherColumn, someOtherCol as c From dataFrameTable

 

ex)

>>> df.select("DEST_COUNTRY_NAME").show(2) +-----------------+ |DEST_COUNTRY_NAME| +-----------------+ |    United States| |    United States| +-----------------+
>>> df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2) +-----------------+-------------------+ |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| +-----------------+-------------------+ |    United States|            Romania| |    United States|            Croatia| +-----------------+-------------------+
>>> df.select(expr("DEST_COUNTRY_NAME AS destination")).show(2) +-------------+ |  destination| +-------------+ |United States| |United States| +-------------+ only showing top 2 rows
>>> df.select(expr("DEST_COUNTRY_NAME AS destination").alias("DEST_COUNTRY_NAME")).show(2) +-----------------+ |DEST_COUNTRY_NAME| +-----------------+ |    United States| |    United States| +-----------------+ only showing top 2 rows

SQL문의 AS는 pyspark에서 alias()와 같다.

select와 expr이 합쳐진 selectExpr()을 많이 사용한다.

아래는 DEST_COUNTRY_NAME 컬럼을 newColumnName으로 바꾼 컬럼과 DEST_COUNTRY_NAME 컬럼의 로우 2개를 보여준다.

>>> df.selectExpr("DEST_COUNTRY_NAME as newColumnName", "DEST_COUNTRY_NAME").show(2) +-------------+-----------------+ |newColumnName|DEST_COUNTRY_NAME| +-------------+-----------------+ |United States|    United States| |United States|    United States| +-------------+-----------------+ only showing top 2 rows

ex) 출발지와 도착지가 같은지 나타내는 컬럼 만들기

모든 컬럼과 withinCountry라는 컬럼을 추가하여 2개의 로우를 보여준다.

첫 번째 로우는 출발지(Romania)와 도착지(US)가 다르기 때문에 당연히 withinCountry가 false다.

>>> df.selectExpr("*", "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry").show(2) +-----------------+-------------------+-----+-------------+ |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry| +-----------------+-------------------+-----+-------------+ |    United States|            Romania|   15|        false| |    United States|            Croatia|    1|        false| +-----------------+-------------------+-----+-------------+ only showing top 2 rows

ex) 집계함수 사용하기

첫 번째 컬럼인 avg(count)는 count 컬럼내 로우들의 평균이고, count(distinct(DEST_COUNTRY_NAME))은 중복을 제거하고 도착지 국가들의 개수를 나타낸다.

>>> df.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))").show(2) +-----------+---------------------------------+ | avg(count)|count(DISTINCT DEST_COUNTRY_NAME)| +-----------+---------------------------------+ |1770.765625|                              132| +-----------+---------------------------------+

5.4.3 스파크 데이터 타입으로 변환하기

스파크를 사용하면서 명시적인 값을 스파크에 전달해야하는 경우가 있는데, 명시적인 값을 전달할 때는 literal을 사용한다.

리터럴을 사용해서 스파크가 이해할 수 있는 값으로 변환할 수 있다.

>>> from pyspark.sql.functions import lit  >>> df.select(expr("*"), lit(1).alias("One")).show(2) +-----------------+-------------------+-----+---+ |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|One| +-----------------+-------------------+-----+---+ |    United States|            Romania|   15|  1| |    United States|            Croatia|    1|  1| +-----------------+-------------------+-----+---+

5.4.4 컬럼 추가하기

withColumn()을 사용해서 컬럼을 쉽게 추가할 수 있다.

>>> df.withColumn("numberOne", lit(1)).show(2) +-----------------+-------------------+-----+---------+ |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|numberOne| +-----------------+-------------------+-----+---------+ |    United States|            Romania|   15|        1| |    United States|            Croatia|    1|        1| +-----------------+-------------------+-----+---------+ only showing top 2 rows

ex) 출발지와 도착지가 같은지에 대한 여부를 boolean 타입으로 표현하기

>>> df.withColumn("withinCountry", expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME")).show(2) +-----------------+-------------------+-----+-------------+ |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry| +-----------------+-------------------+-----+-------------+ |    United States|            Romania|   15|        false| |    United States|            Croatia|    1|        false| +-----------------+-------------------+-----+-------------+ only showing top 2 rows
컬럼명을 변경할 때에도 withColumn을 사용할 수 있다.
>>> df.withColumn("Destination", expr("DEST_COUNTRY_NAME")).columns ['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count', 'Destination']

5.4.5 컬럼명 변경

withColumnRenamed()를 사용해서 컬럼명을 변경할 수 있는데, 첫 번째 인수의 컬럼명을 두 번째 인수로 변경한다.

withColumn 을 쓰면 기존 컬럼이 유지된 채로 새로운 컬럼으로 생성되는 반면, withColumnRenamed()는 생성하지 않고, 해당 컬럼의 이름을 바꾼다.

>>> df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns ['dest', 'ORIGIN_COUNTRY_NAME', 'count']

5.4.6 예약 문자와 키워드

예약문자를 컬럼명에 사용하려면 백틱(`)을 사용해야한다.

# escaping 필요 없음. >>> dfWithLongColName = df.withColumn("This Long Column-Name", expr("ORIGIN_COUNTRY_NAME"))  # 표현식으로 컬럼을 참조하기때문에 escape 필요함 >>> dfWithLongColName.selectExpr( ...    "`This Long Column-Name`", ...    "`This Long Column-Name` as `new col`").show(2)  # 표현식 대신 문자열을 사용해 컬럼 참조하려면 리터럴로 해석되기 때문에 예약 문자가 포함된 컬럼을 참조할 수 있다. >>> dfWithLongColName.select(expr("`This Long Column-Name`")).columns

5.4.7 대소문자 구분

기본적으로 대소문자 구분은 하지 않지만 set spark.sql.caseSensitive true 설정 명령어를 통해 대소문자 구분을 할 수 있다.

5.4.8 컬럼 제거

drop으로 컬럼을 제거할 수 있다.

# 컬럼 한 개 제거 >>> df.drop("ORIGIN_COUNTRY_NAME").columns ['DEST_COUNTRY_NAME', 'count']  # 컬럼 여러 개 제거 >>> dfWithLongColName.drop("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME") DataFrame[count: bigint, This Long Column-Name: string]

5.4.9 컬럼 데이터 타입 변경

문자열 타입의 컬럼을 정수형으로 바꾸는 경우 등 특정 컬럼의 데이터 타입을 변환해야하는 상황이 발생한다.

>>> df.withColumn("count2", col("count").cast("string")) DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint, count2: string]

5.4.10 로우 필터링

로우를 필터링하려면 참과 거짓을 판별하는 표현식을 만들어야 한다. 가장 일반적인 방법은 문자열 표현식이나 컬럼 다루는 기능을 이용해서 표현식을 만드는 것이다.

DataFrame의 where, filter 메서드로 필터링할 수 있다.

>>> df.where("count < 2").show(2) +-----------------+-------------------+-----+ |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| +-----------------+-------------------+-----+ |    United States|            Croatia|    1| |    United States|          Singapore|    1| +-----------------+-------------------+-----+ only showing top 2 rows  >>> df.filter(col("count") < 2).show(2) +-----------------+-------------------+-----+ |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| +-----------------+-------------------+-----+ |    United States|            Croatia|    1| |    United States|          Singapore|    1| +-----------------+-------------------+-----+ only showing top 2 rows

스파크는 자동으로 필터의 순서와 상관없이 동시에 모든 필터링 작업을 수행하기 때문에 여러 필터를 연결하기 위해 AND 필터를 사용할 수 있지만 판단은 스파크에게 맡겨야 한다.

>>> df.where(col("count")<2).where(col("ORIGIN_COUNTRY_NAME") != "Croatia").show(2) +-----------------+-------------------+-----+ |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| +-----------------+-------------------+-----+ |    United States|          Singapore|    1| |          Moldova|      United States|    1| +-----------------+-------------------+-----+ only showing top 2 rows

5.4.11 고유한 row 얻기

DataFrame에서 고윳값이나 중복되지 않은 값을 얻는 연산을 자주 사용한다. 고윳값을 얻으려면 하나 이상의 컬럼을 사용해야한다. DataFrame에서 중복데이터를 제거하기 위해 distinct를 사용해 고윳값을 찾을 수 있다.

>>> df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count() 256  >>> df.select("ORIGIN_COUNTRY_NAME").distinct().count() 125

5.4.12 random sample 만들기

sample메서드를 사용해서 무작위로 샘플 데이터를 추출할 수 있다.

복원, 비복원 추출 설정도 가능함.

seed = 5 withReplacement = False fraction = 0.5 df.sample(withReplacement, fraction, seed).count()

5.4.13 random split

원본 DataFrame을 임의로 분할 시 유용하게 사용된다. 보통 머신러닝에서 train, validation, test set을 나눌 때 사용한다.

>>> dataFrames[0].count() > dataFrames[1].count() False

5.4.14 로우 합치기, 추가하기

DataFrame은 불변성을 갖는다. 그래서 DataFrame에 새로운 레코드를 추가하는 것은 불가능하다. 그래서 DataFrame에 레코드를 추가하기 위해서는 새로운 DataFrame과 union하는 작업을 통해 레코드를 추가할 수 있다. 통합하는 두 개의 DataFrame은 반드시 동일한 스키마, 컬럼 수를 가져야 한다.

>>> from pyspark.sql import Row >>> schema = df.schema >>> schema StructType(List(StructField(DEST_COUNTRY_NAME,StringType,true),StructField(ORIGIN_COUNTRY_NAME,StringType,true),StructField(count,LongType,true)))  >>> newRows = [Row("New Country", "Other Country", 5), ... Row("New Country 2", "Other Country 3", 1)] >>> parallelizedRows = spark.sparkContext.parallelize(newRows) >>> newDF = spark.createDataFrame(parallelizedRows, schema) >>> newDF.show() +-----------------+-------------------+-----+ |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| +-----------------+-------------------+-----+ |      New Country|      Other Country|    5| |    New Country 2|    Other Country 3|    1| +-----------------+-------------------+-----+  >>> df.union(newDF) ... .where("count=1") ... .where(col("ORIGIN_COUNTRY_NAME")!="United States") ... .show() +-----------------+-------------------+-----+ |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| +-----------------+-------------------+-----+ |    United States|            Croatia|    1| |    United States|          Singapore|    1| |    United States|          Gibraltar|    1| |    United States|             Cyprus|    1| |    United States|            Estonia|    1| |    United States|          Lithuania|    1| |    United States|           Bulgaria|    1| |    United States|            Georgia|    1| |    United States|            Bahrain|    1| |    United States|   Papua New Guinea|    1| |    United States|         Montenegro|    1| |    United States|            Namibia|    1| |    New Country 2|    Other Country 3|    1| +-----------------+-------------------+-----+

5.4.15 sorting

sortorderBy를 사용해 DataFrame의 최댓값 또는 최솟값이 상단에 위치하도록 정렬할 수 있다. 두 메서드는 완전히 동일하다.

>>> df.sort("count").show(5) +--------------------+-------------------+-----+ |   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| +--------------------+-------------------+-----+ |               Malta|      United States|    1| |Saint Vincent and...|      United States|    1| |       United States|            Croatia|    1| |       United States|          Gibraltar|    1| |       United States|          Singapore|    1| +--------------------+-------------------+-----+ only showing top 5 rows  >>> df.orderBy("count", "DEST_COUNTRY_NAME").show(5) +-----------------+-------------------+-----+ |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| +-----------------+-------------------+-----+ |     Burkina Faso|      United States|    1| |    Cote d'Ivoire|      United States|    1| |           Cyprus|      United States|    1| |         Djibouti|      United States|    1| |        Indonesia|      United States|    1| +-----------------+-------------------+-----+ only showing top 5 rows  >>> df.orderBy(col("count"), col("DEST_COUNTRY_NAME")).show(5) +-----------------+-------------------+-----+ |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| +-----------------+-------------------+-----+ |     Burkina Faso|      United States|    1| |    Cote d'Ivoire|      United States|    1| |           Cyprus|      United States|    1| |         Djibouti|      United States|    1| |        Indonesia|      United States|    1| +-----------------+-------------------+-----+ only showing top 5 rows

정렬 기준을 명확히 할 땐 desc , asc를 사용한다.

>>> df.orderBy(expr("count desc")).show(2) +-----------------+-------------------+-----+ |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| +-----------------+-------------------+-----+ |          Moldova|      United States|    1| |    United States|            Croatia|    1| +-----------------+-------------------+-----+ only showing top 2 rows  # count 컬럼은 내림차순, DEST_COUNTRY_NAME 컬럼은 오름차순 >>> df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()).show(2) +-----------------+-------------------+------+ |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count| +-----------------+-------------------+------+ |    United States|      United States|370002| |    United States|             Canada|  8483| +-----------------+-------------------+------+ only showing top 2 rows

파티션 별 정렬 수행도 가능하다.

파티션 별 정렬은 트랜스포메이션 전 성능 최적화를 위해 사용한다.

>>> spark.read.format("json").load("./spark_ex/data/flight-data/json/*-summary.json").sortWithinPartitions("count") DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

5.4.16 Row 수 제한

limit을 사용해서 추출할 로우 수를 제한할 수 있다.

>>> df.limit(5).show() +-----------------+-------------------+-----+ |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| +-----------------+-------------------+-----+ |    United States|            Romania|   15| |    United States|            Croatia|    1| |    United States|            Ireland|  344| |            Egypt|      United States|   15| |    United States|              India|   62| +-----------------+-------------------+-----+  >>> df.orderBy(expr("count desc")).limit(6).show() +--------------------+-------------------+-----+ |   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| +--------------------+-------------------+-----+ |               Malta|      United States|    1| |Saint Vincent and...|      United States|    1| |       United States|            Croatia|    1| |       United States|          Gibraltar|    1| |       United States|          Singapore|    1| |             Moldova|      United States|    1| +--------------------+-------------------+-----+

5.4.17 repartition과 coalesce

또 다른 최적화 기법으로 자주 필터링하는 컬럼을 기준으로 데이터를 분할하는 것이다.

repartition은 무조건 전체 데이터를 셔플한다. 향후에 사용할 파티션 수가 현재 파티션 수보다 많거나 컬럼을 기준으로 파티션을 만드는 경우에만 사용해야 한다.

>>> df.rdd.getNumPartitions() 1 >>> df.repartition(5) DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint] >>> df.repartition(col("DEST_COUNTRY_NAME")) DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

coalesce는 셔플없이 파티션을 병합하는 경우에 사용한다.

>>> df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2) DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

5.4.18 드라이버로 로우 데이터 수집

스파크는 드라이버에서 클러스터 상태 정보를 유지한다. 로컬 환경에서 데이터를 다루려면 드라이버로 데이터를 수집해야 한다.

collect, take, show등 몇가지 드라이버로 데이터 수집 연산을 확인한 적이 있다.

>>> collectDF = df.limit(10) # 10개 row만 가져옴  >>> collectDF.take(5) [Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15), Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1), Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344), Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15), Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62)]  >>> collectDF.show() +-----------------+-------------------+-----+ |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| +-----------------+-------------------+-----+ |    United States|            Romania|   15| |    United States|            Croatia|    1| |    United States|            Ireland|  344| |            Egypt|      United States|   15| |    United States|              India|   62| |    United States|          Singapore|    1| |    United States|            Grenada|   62| |       Costa Rica|      United States|  588| |          Senegal|      United States|   40| |          Moldova|      United States|    1| +-----------------+-------------------+-----+  >>> collectDF.show(5, False) +-----------------+-------------------+-----+ |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| +-----------------+-------------------+-----+ |United States    |Romania            |15   | |United States    |Croatia            |1    | |United States    |Ireland            |344  | |Egypt            |United States      |15   | |United States    |India              |62   | +-----------------+-------------------+-----+ only showing top 5 rows  >>> collectDF.collect() [Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15), Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1), Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344), Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15), Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62), Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1), Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Grenada', count=62), Row(DEST_COUNTRY_NAME='Costa Rica', ORIGIN_COUNTRY_NAME='United States', count=588), Row(DEST_COUNTRY_NAME='Senegal', ORIGIN_COUNTRY_NAME='United States', count=40), Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

전체 데이터셋에 대해 반복적인 처리를 하기 위해서 toLocalIterator를 사용한다.

>>> for i in collectDF.toLocalIterator(): ...     print(i)  Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15) Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1) Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344) Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15) Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62) Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1) Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Grenada', count=62) Row(DEST_COUNTRY_NAME='Costa Rica', ORIGIN_COUNTRY_NAME='United States', count=588) Row(DEST_COUNTRY_NAME='Senegal', ORIGIN_COUNTRY_NAME='United States', count=40) Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)