앎을 경계하기

Machine Learning/Spark

SPARK 완벽 가이드 - ch.4구조적 API

양갱맨 2021. 5. 10. 11:09

구조적 API 개요

구조적 API는 비정형 로그 파일부터 반정형 CSV, 정형 Parquet 파일까지 다양한 데이터를 처리할 수 있다. 구조적 API에는 세 가지 분산 컬렉션 API가 있다.

  • Dataset
  • DataFrame
  • SQL Table, View

구조적 API를 사용해서 배치 작업을 스트리밍 작업으로 손쉽게 변환할 수 있다.

구조적 API는 데이터 흐름을 정의하는 기본 추상화 개념이다.

이번 장에서는...

  • typed/untyped API의 개념과 차이점
  • 핵심 용어
  • 스파크가 구조적 API의 데이터 흐름을 해석하고 클러스터에서 실행하는 방식

4.1 DataFrame과 Dataset

DataFrame과 Dataset의 정의와 의미적인 차이점?

  • DataFrame과 Dataset은 잘 정의된 row와 column을 갖는 분산 테이블 형태의 컬렉션
  • 각 컬럼은 다른 컬럼과 동일한 수의 row를 가져야 한다.
  • 컬렉션의 모든 로우는 같은 데이터 타입 정보를 가지고 있어야 한다.
  • DataFrame과 Dataset은 결과를 생성하기 위해 어떤 데이터에 어떤 연산을 적용해야 하는지 정의하는 지연 연산의 실행 계획이며, 불변성을 갖는다.
  • DataFrame에 액션을 호출하면 스파크는 트랜스포메이션을 실제로 실행하고 결과를 반환한다.
Table과 view는 Dataframe과 같지만 SQL을 사용한다는 차이가 있다.

DataFrame과 Dataset을 구체적으로 정의하려면 "스키마"를 알아야한다.

스키마 : 분산 컬렉션에 저장할 데이터 타입을 정의하는 방법

4.2 스키마

스키마는 DataFrame의 컬럼명과 데이터타입을 정의한다. 위 데이터베이스에서의 스키마에서 개념 스키마와 같은 형태이다.

  • 스키마 정의 방법
    • 직접 정의
    • 데이터 소스에서 얻기 (schema-on-read)

스키마는 여러 데이터 타입으로 구성되기 때문에 어떤 데이터 타입이 어느 위치에 있는지 정의하는 방법이 필요하다.

4.3 스파크의 구조적 데이터 타입 개요

스파크는 catalyst 엔진을 사용한다. catalyst는 다양한 실행 최적화 기능을 제공한다.

주로 스파크를 사용할 때 파이썬이나 스칼라로 사용했었는데, 스파크는 사실상 프로그래밍 언어이기 때문에 자체의 덧셈 연산이나 데이터 타입을 가지고 있다.

>>> df = spark.range(500).toDF("number") >>> df.select(df["number"]+10)  DataFrame[(number + 10): bigint]  >>> df.select(df["number"]+10).show() +-------------+ |(number + 10)| +-------------+ |           10| |           11| |           12| |           13| |           14| |           15| |           16| |           17| |           18| |           19| |           20| |           21| |           22| |           23| |           24| |           25| |           26| |           27| |           28| |           29| +-------------+ only showing top 20 rows

스파크에서 덧셈이 수행되는 이유는 스파크가 지원하는 언어를 이용해 작성된 표현식을 카탈리스트 엔진에서 스파크의 데이터 타입으로 변환해 명령을 처리하기 때문이다.

4.3.1 DataFrame과 Dataset 비교

구조적 API에는 Untyped인 DataFrame과 Typed인 Dataset이 있다.

DataFrame에도 데이터 타입이 있기 때문에 untyped로 보는 것이 안 맞을 수 있지만 스키마에 명시된 데이터 타입의 일치 여부를 런타임이 될 때 확인한다.

Dataset은 스키마에 명시된 데이터 타입의 일치 여부를 컴파일 타임에 확인함

Dataset은 JVM 기반인 Scala와 JAVA에서 지원한다(Python, R 은 사용 불가).

데이터 타입을 정의하려면 Scala의 case classs 또는 JAVA의 JavaBean을 사용해야 한다.

이 책은 대부분 DataFrame을 사용하는데, Row 타입으로 구성된 Dataset이다.

Row 타입은 스파크가 사용하는 연산에 최적화된 인메모리 포맷의 내부적인 표현 방식이다.

인메모리란? 데이터 처리 속도를 향상시킬 수 있는 핵심 기술로 운영을 위한 데이터를 하드가 아닌 메인 메모리에 모두 올려서 서비스를 수행하는 것이다. 메인 메모리의 가격 하락으로 가능해졌다.

 

Row 타입을 사용하면 가비지 컬렉션과 객체 초기화 부하가 있는 JVM 데이터 타입을 사용하는것 대신 자체 데이터 포맷을 사용하기 때문에 매우 효율적인 연산이 가능해진다.

파이썬이나 R에선 Dataset 사용이 불가하기 때문에 최적화된 포맷인 DataFrame으로 처리한다.

지금 중요한 것은

DataFrame을 사용하면 스파크의 최적화된 내부 포맷을 사용할 수 있다는 것

이다.

4.3.2 컬럼

  • 컬럼의 표현
    • 단순 데이터 타입 : 정수형, 문자열 등
    • 복합 데이터 타입 : 배열, 맵 등
    • null 값

스파크의 컬럼은 테이블 컬럼으로 생각할 수 있다.

4.3.3 로우

로우는 데이터 레코드다.

DataFrame은 Row 타입으로 구성된다.

SQL, RDD, 데이터소스에서 얻거나 직접 만들 수 있다.

>>> spark.range(2).collect() [Row(id=0), Row(id=1)]

range() 메서드를 사용해 DataFrame을 생성한다.

4.3.4 스파크 데이터 타입

다양한 프로그래밍 언어의 데이터 타입이 스파크의 어떤 데이터 타입과 매핑되는지 알아보자.

  • 특정 데이터 타입의 컬럼을 초기화하고 정의하는 방법
from pyspark.sql.types import * b = ByteType()

파이썬 데이터 타입 매핑(생략된 것도 있음)

스파크 데이터 타입 파이썬 데이터 타입 데이터 타입 생성/접근용 API
ByteType int, long ByteType()
ShortType int, long (숫자는 런타임에 2byte 크기의 부호형 정수로 변환된다) Shortype()
IntegerType int, long IntegerType()
LongType long (숫자는 런타임에 8바이트 크기의 부호형 정수로 변환된다. 더 크게 사용하려면 decimal.Decimal형으로 변환해 DecimalType을 사용한다.) LongType()
FloatType float FloatType()
DoubleType float(4 byte 크기의 single-precision 부동소수점) DoubleType()
StringType string StringType()
BooleanType bool BooleanType()
TimestampType datetime.datetime TimestampType()
DateType datetime.date DateType()
ArrayType list, tuple, array ArrayType(elementType)
MapType dict MapType(keyType, valueType)
StructType list, tuple StructType(fields)
StructField 이 필드의 데이터 타입과 대응되는 파이썬 데이터 타입이다. 만약 IntegerType의 StructField는 파이썬의 int 데이터 타입을 사용한다. StructField(name, dataType)

고정형 DataFrame을 그대로 사용하는 경우는 거의 없고 대부분 처리와 변환을 수행한다. 따라서 구조적 API의 실행 과정을 알아야한다.

4.4 구조적 API의 실행 과정

구조적 API 쿼리가 사용자 코드에서 실제 실행 코드로 변환되는 과정

  1. DataFrame/Dataset/SQL을 이용해 코드를 작성
  1. 정상적인 코드라면 스파크가 논리적 실행 계획으로 변환
  1. 스파크는 논리적 실행 계획 → 물리적 실행 계획으로 변환하며 그 과정에서 추가적인 최적화를 할 수 있는지 확인
  1. 스파크는 클러스터에서 물리적 실행 계획(RDD 처리)을 실행

 

4.4.1 논리적 실행 계획

  • 스파크는 사용자 코드를 논리적 실행 계획으로 변환한다.
  • 논리적 실행 계획 단계에서는 추상적 트랜스포메이션만 표현한다.
  • driver, executor의 정보는 고려하지 않고 사용자의 다양한 표현식을 최적화된 버전으로 변환한다.
  • 코드의 유효성이나 테이블, 컬럼의 존재 여부만 판단하는 과정이기 때문에 실행 계획을 검증하지 않은 상태다.
  • Spark analyzer는 컬럼과 테이블을 검증하기 위해 카탈로그, 모든 테이블의 저장소, DataFrame 정보를 활용한다. 만약 필요한 테이블이나 컬럼이 카탈로그에 없다면 검증 전 논리적 실행 계획이 만들어지지 않는다.
  • 검증 결과는 카탈리스트 옵티마이저로 전달된다(필요한 경우 도메인에 최적화된 규칙을 적용할 수 있는 확장형 패키지를 만들 수 있음).
    카탈리스트 옵티마이저는 조건절 푸시 다운이나 선택절 구문을 이용해 논리적 실행 계획을 최적화하는 규칙 모음이다.

4.4.2 물리적 실행 계획

  • 논리적 실행 계획을 클러스터 환경에서 실행하는 방법을 정의
  • 다양한 물리적 실행 전략을 생성하고 비용 모델로 비교하여 최적을 선택함
    • 비용 비교 > 테이블의 크기나 파티션 수 등 물리적 속성을 고려해 지정된 조인 연산 수행에 필요한 비용을 계산하고 비교
  • 물리적 실행 계획은 일련의 RDD와 트랜스포메이션으로 변환됨
  • 스파크를 컴파일러라고 부르기도 함
    • DataFrame, Dataset, SQL로 정의된 쿼리 → RDD Transformation으로 컴파일

4.4.3 실행

  • 물리적 실행 계획을 선정하고 저수준 프로그래밍 인터페이스인 RDD를 대상으로 모든 코드를 실행
  • 스파크는 런타임에 전체 TASK나 Stage를 제거할 수 있는 자바 바이트 코드를 생성해 추가적인 최적화를 수행

4.5 정리

스파크의 구조적 API에 대해 알아보았다.

<구조적 API에 있는 분산 컬렉션 세가지>

  • Dataset - JVM 기반 JAVA, Scala에서만 사용 가능
  • DataFrame - Python과 R에서는 DataFrame을 사용하는데 DataFrame은 최적화된 스파크 내부 포맷을 사용한다.
  • SQL Table, View

 

<구조적 API 실행과정>

  1. DataFrame/Dataset/SQL을 이용해 코드를 작성
  1. 정상적인 코드라면 스파크가 논리적 실행 계획으로 변환
  1. 스파크는 논리적 실행 계획 → 물리적 실행 계획으로 변환하며 그 과정에서 추가적인 최적화를 할 수 있는지 확인
  1. 스파크는 클러스터에서 물리적 실행 계획(RDD 처리)을 실행