앎을 경계하기

Machine Learning/Spark

[SPARK 완벽 가이드] Chapter 1 아파치 스파크란? / Chapter 2 스파크 간단히 살펴보기

양갱맨 2021. 4. 28. 16:39

Chapter 1. 아파치 스파크란?

  • 통합 컴퓨팅 엔진
  • 클러스터 환경에서 데이터를 병렬로 처리하는 라이브러리 집합
  • 병렬 처리 오픈소스 엔진
  • Python, JAVA, Scala, R 지원 및 SQL, Streaming, ML 등 다양한 라이브러리 제공

1.1 아파치 스파크의 철학

  1. 통합

    스파크의 목표는 '빅데이터 애플리케이션 개발에 필요한 통합 플랫폼을 제공하자'이다.

    통합(unified)은 무슨 의미일까?

    스파크는 간단한 데이터 읽기부터 SQL, 머신러닝, 스트림 처리까지 다양한 데이터 분석 작업을 일관성 있는 API로 수행할 수 있도록 설계되어 있다.

    스파크의 통합 특성을 이용하면 기존의 데이터 분석 작업을 더 쉽고 효율적으로 수행할 수 있다.

    스파크에서 제공하는 조합형 API를 사용해 애플리케이션을 만들 수 있고, 만약 원하는 API가 없다면 직접 스파크 기반의 라이브러리를 만들 수 있다.

    ex) SQL 쿼리로 데이터를 읽고 ML 라이브러리로 머신러닝 모델을 평가해야할 경우, 스파크로 이 두 단계를 하나로 병합하고 데이터를 한 번만 조회할 수 있게 한다.

  1. 컴퓨팅 엔진

    통합이라는 관점을 중시하면서 기능의 범위를 컴퓨팅 엔진으로 제한해왔다.

    따라서, 스파크는 저장소 시스템의 데이터를 연산하는 역할만 수행할 뿐 영구 저장소 역할은 수행하지 않는다.

    그 대신 클라우드 기반의 Azure Storage, Amazon S3, 분산 파일 시스템 Hadoop, 키/값 저장소 Cassandra, 메시지 전달 서비스 Kafka 등의 저장소를 지원한다.

    스파크는 내부에 데이터를 장시간 저장하지 않고 특정 저장소 시스템을 선호하지 않는다.

    스파크는 데이터 저장 위치에 상관없이 처리에만 집중하도록 만들어졌다.

    스파크는 연산 기능에 초점을 맞추면서 하둡같은 기존 빅데이터 플랫폼과 차별화하고 있다.

    Hadoop은 범용 서버 클러스터 환경에 하둡 파일 시스템과 컴퓨팅 시스템(map reduce)을 가지고 있고 두 시스템은 밀접한 관련이 있다.

    그래서 하둡에서는 둘 중 하나의 시스템만 단독으로 사용하기 어렵다.

    스파크는 하둡 저장소와 잘 호환된다. 그리고 하둡 아키텍처를 사용할 수 없는 환경에서도 많이 사용된다.

  1. 라이브러리

    스파크는 엔진에서 제공하는 표준 라이브러리와 오픈소스 커뮤니티에서 서드파티 패키지 형태로 제공하는 다양한 외부 라이브러리를 지원한다.

    스파크 코어 엔진 자체는 큰 변화가 없지만 라이브러리는 계속적으로 변해왔다.

    외부 라이브러리 목록은 spark-package.org에서 확인할 수 있다.

1.2 스파크의 등장 배경

데이터 분석에서 새로운 처리 엔진과 프로그래밍 모델이 필요한 근본적인 이유는 무엇일까?

애플리케이션과 하드웨어의 바탕을 이루는 경제적 요인의 변화 때문이다.

대부분의 시스템은 단일 프로세서에서만 실행되도록 설계되었다.

하지만 하드웨어의 성능 향상에는 한계가 있다. 따라서 계속적인 애플리케이션 성능 향상을 위해 병렬처리가 필요하고 스파크와 같은 새로운 프로그래밍 모델이 등장하게 되었다.

1.3 스파크의 역사

아파치 스파크는 UC버클리 대학교에서 논문을 통해 처음 세상에 알려졌다.

당시 하둡 맵리듀스는 수천 개의 노드로 구성된 클러스터에서 병렬로 데이터를 처리하는 최초의 오픈소스 시스템이자 클러스터 환경용 병렬 프로그래밍 엔진의 대표였다.

스파크 프로젝트를 진행했던 AMPLap은 새로운 프로그래밍 모델의 장단점을 이해하기 위해 맵리듀스 사용자들과 함께 일했다.

연구의 결과로 명확해진 사실 중 첫 번째는 클러스터 컴퓨팅의 엄청난 잠재력이었고, 두 번째는 맵리듀스 엔진을 사용하는 대규모 애플리케이션의 난이도와 효율성 문제였다.

스파크 팀은 개선을 위해 애플리케이션을 간결하게 개발할 수 있는 함수형 프로그래밍 기반의 API를 설계했다. 그리고 메모리에 저장된 데이터를 효율적으로 공유할 수 있는 새로운 엔진 기반의 API를 구현했다.

처음엔 배치 애플리케이션만 지원하다 대화형 데이터 분석이나 ad-hoc query 같은 강력한 기능을 제공하기 시작했다.

스파크의 조합형 API의 핵심 아이디어는 계속해서 진화중이다.

초기에는 함수형 연산 관점에서 API를 정의하였고 v1.0부터는 스파크 SQL을 추가하였다. 이후 DataFrame, ML Pipeline 등을 추가하였다.

1.4 스파크의 현재와 미래

스파크는 꾸준한 인기를 얻고 활용 사례도 늘고 있다.

Uber, Netflix 등 기술 회사 외 NASA, CERN 같은 연구소에서도 대규모 데이터셋 처리를 위해 사용한다.

1.5 스파크 실행하기

스파크는 스칼라로 구현되어 JVM 기반으로 동작한다. 따라서 JAVA가 설치되어 있어야하며 파이썬이나 R을 사용하려면 각 언어가 설치되어 있어야한다.

스파크 설치는 local, cloud, anaconda 가상환경 등 각자 사용하는 환경이 다르기 때문에 생략한다.

Chapter 2. 스파크 간단히 살펴보기

2.1 스파크의 기본 아키텍처

우리가 사용하는 컴퓨터, 한 대의 PC는 대규모 정보를 연산할만한 자원이나 성능을 갖지 못한다.

연산을 한다고 해도 오랜 시간이 걸린다.

컴퓨터 클러스터는 여러 컴퓨터의 자원을 모아서 하나의 컴퓨터처럼 사용할 수 있게 한다.

그러나 컴퓨터 클러스터를 구성한다고만 해서 되는 것이 아니라 클러스터에서 작업을 조율할 수 있는 프레임워크가 필요하다.

스파크가 바로 그 역할을 한다. 클러스터의 데이터 처리 작업을 관리 및 조율한다.

스파크가 연산에 사용할 클러스터는 스파크 스탠드얼론 클러스터 매니저, 하둡 YARN, 메소스 같은 클러스터 매니저에서 관리한다. 사용자는 클러스터 매니저에 스파크 애플리케이션을 제출하고 클러스터 매니저는 제출받은 애플리케이션 실행에 필요한 자원을 할당하며 사용자는 이 할당받은 자원으로 작업을 처리한다.

2.1.1 스파크 애플리케이션

스파크 애플리케이션 구성

  • 드라이버 프로세스
    • 클러스터 노드 중 하나에서 실행
    • main() 실행
    • 스파크 애플리케이션 정보 유지 관리, 사용자 프로그램이나 입력에 대한 응답, 익스큐터 프로세스의 작업과 관련된 분석, 배포 그리고 스케줄링 역할
    • 필수 프로세스
  • 익스큐터 프로세스
    • 드라이버 프로세스가 할당한 작업을 수행
    • 드라이버 노드에 진행 상황 보고
    • 클러스터 매니저
      • 물리적 머신 관리, 스파크 애플리케이션 자원 할당
      • spark standalone cluster manager, Hadoop YARN, Mesos 중 택 1
      • 하나의 클러스터에서 여러 개의 스파크 애플리케이션 실행 가능
    • 사용자는 각 노드에 할당할 익스큐터 수를 지정할 수 있다.
    • 스파크는 클러스터/로컬 모드를 지원하기 때문에, 로컬모드에서는 드라이버와 익스큐터를 스레드 형태로 실행한다.
    • 익스큐터는 대부분 스파크 코드를 실행하는 역할을 한다.
    • 드라이버는 스파크의 언어 API를 통해 다양한 언어로 실행가능하다.

    스파크 애플리케이션의 핵심 사항

    • 스파크는 사용 가능한 자원을 파악하기 위해 클러스터 매니저를 사용한다.
    • 드라이버 프로세스는 주어진 작업을 완료하기 위해 드라이버 프로그램의 명령을 익스큐터에서 실행할 책임이 있다.

2.2 스파크의 다양한 언어 API

스파크의 언어 API를 이용하면 다양한 프로그래밍 언어로 스파크 코드를 실행할 수 있다.

스파크는 모든 언어에 맞는 몇몇 핵심 개념을 제공한다.

핵심 개념은 클러스터 머신에서 실행되는 스파크 코드로 변환된다.

구조적 API만으로 작성된 코드는 언어에 상관없이 유사한 성능을 발휘한다.

사용자는 스파크 코드를 실행하기 위해 SparkSession 객체를 진입점으로 사용한다.

Python/R로 스파크를 사용하는 경우 JVM에서 실행할 수 있는 코드로 스파크가 변환해준다.

2.3 스파크API

그렇다면 어떻게 다양한 언어로 스파크를 사용할 수 있을까?

스파크가 기본적으로 두 가지 API를 제공하기 때문이다.

  1. 저수준의 비구조적 API
  1. 고수준의 구조적 API

2.4 스파크 시작하기

실제 스파크 애플리케이션을 개발하려면 사용자 명령과 데이터를 스파크 애플리케이션에 전송하는 방법을 알아야한다.

대화형 모드(Spark-shell)로 스파크를 시작하면 스파크 애플리케이션을 관리하는 SparkSession이 자동으로 생성된다.

그러나 standalone application으로 스파크를 시작하면 사용자 애플리케이션 코드에서 SparkSession 객체를 직접 생성해야한다.

2.5 SparkSession

python에서 일정 범위의 숫자를 만드는 작업을 해보자.

>>> myRange = spark.range(1000).toDF("number") DataFrame[number: bigint]

생성한 DataFrame은 한 개의 column과 1000개의 row로 구성된다.

각 row에는 0부터 999까지 숫자값이 들어있다.

이 숫자들은 분산컬렉션을 나타낸다.

클러스터 모드에서 이 코드를 실행하면 숫자 범위의 각 부분들이 서로 다른 익스큐터에 할당된다.

2.6 DataFrame

가장 대표적인 구조적 API가 DataFrame이다.

  • DataFrame : 테이블의 데이터를 row와 column으로 표현
  • schema : column과 column의 타입을 정의한 목록

스파크의 DataFrame은 수천 대의 컴퓨터에 분산되어 있다.

이렇게 분산하는 이유는 한 대의 컴퓨터에 저장하기에 데이터가 너무 크거나 계산에 오랜 시간이 걸릴 수 있기 때문이다.

Python이나 R에서도 DataFrame을 사용한다. 그러나 일반적으로 단일 컴퓨터에 존재한다는 차이가 있다.

스파크는 Python, R 지원을 하기 때문에 Python의 Pandas DataFrame과 R의 DataFrame을 스파크 DataFrame으로 쉽게 변환할 수 있다.

2.6.1 파티션

스파크는 모든 익스큐터가 병렬로 작업을 수행할 수 있도록 파티션이라 불리는 chunk 단위로 데이터를 분할한다.

파티션은 클러스터의 물리적 머신에 존재하는 row의 집합을 의미한다.

DataFrame의 파티션은 실행 중에 데이터가 컴퓨터 클러스터에서 물리적으로 분산되는 방식을 나타낸다.

만약 파티션이 하나라면 수천 개의 익스큐터가 있어도 병렬성은 1이 된다.

또한 수백 개의 파티션이 있어도 익스큐터가 하나라면 마찬가지로 병렬성은 1이다.

DataFrame을 사용하면 파티션을 스파크가 처리 방법을 결정한다. 이 관련 내용은 후에 보도록 한다.

2.7 트랜스포메이션

스파크의 핵심 데이터 구조는 한번 생성하면 변경할 수 없는 immutable한 특성이 있다.

만약 DataFrame을 변경하려면 변경 방법을 스파크에게 알려줘야한다.

이때 사용하는 명령이 트랜스포메이션이다.

  • DataFrame에서 짝수 찾기 트랜스포메이션 예제
    >>> divisBy2 = myRange.where("number % 2 = 0")

코드를 실행해도 결과 출력은 되지 않는다.

추상적인 트랜스포메이션만 지정했기 때문에 action을 호출하지 않으면 지정한 트랜스포메이션을 수행하지 않는다.

트랜스포메이션의 유형

  • narrow dependency
    • 각 입력 파티션이 하나의 출력 파티션에만 영향을 미친다.
  • wide dependency
    • 하나의 입력이 여러 출력 파티션에 영향을 미친다.

    2.7.1 지연 연산

    lazy evaluation은 스파크가 연산 그래프를 처리하기 직전까지 기다리는 동작 방식이다.

    스파크는 특정 명령을 받으면 바로 실행하는 것이 아니라 기다렸다가 액션을 호출해야 수행한다.

    스파크는 이 과정을 거치면서 전체 데이터 흐름을 최적화한다는 강점이 있다.

    즉, 최대한 미루고 미뤄서 제일 효율적인 명령 수행하기 위한 계획을 한다는 것.

2.8 액션

실제 연산을 하기까지 트랜스포메이션을 통해 실행 계획을 한다는 것을 배웠다.

이제 실제 연산을 수행하려면 액션 명령을 내려야한다.

액션은 일련의 트랜스포메이션으로부터 결과를 계산하도록 지시하는 명령이다.

count()같은 메소드가 액션에 해당된다.

이름에서 알 수 있듯이 데이터프레임의 레코드 개수를 출력한다.

>>> divisBy2.count() 500

액션을 지정하면 spark job이 시작된다. 짝수만 찾는 필터(좁은 트랜스포메이션)를 수행한 뒤 파티션 별로 레코드 수를 카운트(넓은 트랜스포메이션)한다.

수행 후 각 언어에 적합한 네이티브 객체에 결과를 모은다. 이 때, 스파크가 제공하는 spark UI로 클러스터에서 실행 중인 spark job을 모니터링할 수 있다.

2.9 스파크 UI

spark job의 진행 상황을 모니터링할 때 사용하는 것이 스파크 UI다.

드라이버 노드 4040포트로 접속가능하다.

로컬에서는 http://localhost:4040으로 접속가능하다.

2.10 종합 예제

미국 교통통계국의 항공운항 데이터 중 일부를 스파크로 부넉한다.

CSV 파일만 사용한다.

컬럼은 DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count 로 구성된다.

< 예제 소스 다운로드 >

git clone https://github.com/JungAh12/Spark-The-Definitive-Guide.git

데이터셋 head 출력

head ./Spark-The-Definitive-Guide/data/flight-data/csv/2015-summary.csv
>>> flightData2015 = spark\ ... .read\ ... .option("inferSchema", "true")\ ... .option("header", "true")\ ... .csv("spark*/data/flight-data/csv/2015-summary.csv")  >>> flightData2015.take(3) [Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15), Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1), Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

take()는 위에서 사용했던 head 명령과 같은 결과를 보여준다.

정수 타입인 count 컬럼을 기준으로 데이터를 정렬하는 트랜스포메이션을 추가해보자.

>>> flightData2015.sort("count")

sort()는 트랜스포메이션이기 때문에 아무런 변화가 없다. 스파크의 쿼리 실행 계획을 확인하고 싶을 땐 explain()을 호출한다.

>>> flightData2015.sort("count").explain()  == Physical Plan == *(1) Sort [count#18 ASC NULLS FIRST], true, 0 +- Exchange rangepartitioning(count#18 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [id=#32]    +- FileScan csv [DEST_COUNTRY_NAME#16,ORIGIN_COUNTRY_NAME#17,count#18] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/root/spark-3.1.1-bin-hadoop2.7/spark_ex/data/flight-data/csv/2015-summary..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>

실행 계획은 위에서 아래 방향으로 읽고, 최종 결과는 가장 위, 데이터 소스는 가장 아래에 있다.

이제 실행 계획을 시작하기 위해 액션을 호출한다.

그 전에 몇가지 설정이 필요하다.

스파크는 shuffle 수행 시, 기본적으로 200개의 shuffle partition을 생성한다.

이 값을 5로 설정해서 출력 파티션 수를 줄여준다.

>>> spark.conf.set("spark.sql.shuffle.partitions", "5") >>> flightData2015.sort("count").take(2)  [Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1), Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

2.10.1 DataFrame과 SQL

사용자가 SQL이나 DataFrame으로 비즈니스 로직을 표현하면 스파크에서 실제 코드를 실행하기 전에 그 로직을 기본 실행 계획으로 컴파일 한다.

스파크 SQL을 사용하면 모든 dataframe을 테이블이나 뷰로 등록한 후 SQL 쿼리를 사용할 수 있다.

createOrReplaceTempView()를 호출하면 모든 DataFrame이 테이블이나 뷰로 만들어진다.

flightData2015.createOrReplaceTempView("flightData2015")
  • 데이터 조회
>>> sqlWay = spark.sql(""" ... SELECT DEST_COUNTRY_NAME, COUNT(1) ... FROM flightdata2015 ... GROUP BY DEST_COUNTRY_NAME ... """)  >>> dataFrameWay = flightData2015.groupBy("DEST_COUNTRY_NAME").count() >>> sqlWay.explain()  == Physical Plan == *(2) HashAggregate(keys=[dest_country_name#16], functions=[count(1)]) +- Exchange hashpartitioning(dest_country_name#16, 5), ENSURE_REQUIREMENTS, [id=#80]    +- *(1) HashAggregate(keys=[dest_country_name#16], functions=[partial_count(1)])       +- FileScan csv [DEST_COUNTRY_NAME#16] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/root/spark-3.1.1-bin-hadoop2.7/spark_ex/data/flight-data/csv/2015-summary..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>  >>> dataFrameWay.explain()  == Physical Plan == *(2) HashAggregate(keys=[DEST_COUNTRY_NAME#16], functions=[count(1)]) +- Exchange hashpartitioning(DEST_COUNTRY_NAME#16, 5), ENSURE_REQUIREMENTS, [id=#61]    +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#16], functions=[partial_count(1)])       +- FileScan csv [DEST_COUNTRY_NAME#16] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/root/spark-3.1.1-bin-hadoop2.7/spark_ex/data/flight-data/csv/2015-summary..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>

두 실행 계획은 같은 기본 실행 계획으로 컴파일된다.

  • max 함수를 사용하여 최대 비행 횟수를 구해보자.
>>> spark.sql("SELECT max(count) from flightdata2015").take(1) [Row(max(count)=370002)]  >>> from pyspark.sql.functions import max >>> flightData2015.select(max("count")).take(1) [Row(max(count)=370002)]
  • top 5 도착 국가를 찾아보자
>>> maxSql = spark.sql(""" ... SELECT DEST_COUNTRY_NAME, sum(count) as destination_total ... FROM flightdata2015 ... GROUP BY DEST_COUNTRY_NAME ... ORDER BY sum(count) DESC ... LIMIT 5 ... """)  >>> maxSql.show()  +-----------------+-----------------+ |DEST_COUNTRY_NAME|destination_total| +-----------------+-----------------+ |    United States|           411352| |           Canada|             8399| |           Mexico|             7140| |   United Kingdom|             2025| |            Japan|             1548| +-----------------+-----------------+
>>> from pyspark.sql.functions import desc >>> flightData2015.groupBy("DEST_COUNTRY_NAME")\ ... .sum("count")\ ... .withColumnRenamed("sum(count)", "destination_total")\ ... .sort(desc("destination_total"))\ ... .limit(5)\ ... .show() +-----------------+-----------------+ |DEST_COUNTRY_NAME|destination_total| +-----------------+-----------------+ |    United States|           411352| |           Canada|             8399| |           Mexico|             7140| |   United Kingdom|             2025| |            Japan|             1548| +-----------------+-----------------+

실행 계획은 트랜스포메이션의 DAG(directed acyclic graph)이고 액션이 호출되면 결과를 만들어낸다.

2.11 정리

지금까지 아파치 스파크의 기초를 배웠다.

트랜스포메이션과 액션, Dataframe의 실행 계획을 최적화하기 위해 트랜스포메이션의 DAG를 지연 실행하는 방법을 배웠다.

그리고 데이터가 파티션으로 구성되는 방법과 복잡한 트랜스포메이션을 실행하는 단계도 함께 배웠다.

다음 챕터에서는 스파크 에코시스템과 스트리밍, ML에 이르는 고급 개념과 다양한 기능을 배운다.