Chapter 1. 아파치 스파크 소개: 통합 분석 엔진

Posted by yunki kim on March 2, 2024

1. 스파크의 시작

구글 검색 엔진은 수많은 데이터를 색인하고 검색해야 한다. 이는 RDBMS 같은 저장 시스템으로는 불가능했기에 구글 파일 시스템(GFS - Google File System), 맵리듀스(MR - MapReduce), 빅테이블(BigTable) 등을 만들게 되었다. 이들은 각각 다음과 같은 역할을 한다.

  • GFS: 클러스터 안에서 상용 서버에 장애 내구성이 있는 분산 파일시스템을 제공한다.
  • 빅테이블: GFS를 기반으로 구조화된 대규모 데이터의 저장 수단을 제공한다.
  • MR: 거대한 데이터를 쪼개서 여러 머신들에 분산시켜 로직을 수행한 뒤 하나로 합치는 프레임워크. 실제 개발자는 map과 reduce 함수를 구현해야 한다. map은 전체 데이터를 쪼갠 청크에 대해 수행할 로직이다. Reduce는 분산되어 처리된 결과 값들을 하나로 합치는 과정이다. 이 연산들은 분산된 머신들에서 병렬적으로 수행된다.

하둡 파일 시스템(HDFS - HaDoop File System)은 MR을 구현한 분산 컴퓨팅 프레임워크이다. 하둡은 수많은 사용자와 오픈소스 커뮤니티의 기여자들이 있지만 다음과 같은 단점들이 존재한다.

  • 높은 운영 복잡도로 인해 관리가 쉽지 않다.
  • 일반적인 배치 처리를 위한 MR API는 많은 양의 기본 셋업 코드를 요구했고, 장애 대응이 불안정했다.
  • 방대한 데이터로 인해 MR 태스크가 많아지면, 각 태스크는 중간 과정의 데이터를 디스크에 write 해야 했다. 결국 반복적인 디스크 I/O로 인해서 많은 시간이 소요되었다.
  • 하둡 MR은 배치 처리를 위한 대규모 작업에는 적당하지만 ML이나 스트리밍, 질의 등 다른 워크 로드와 연계해 쓰기엔 한계가 있다.

아파치 스파크는 초기부터 하둡이 가진 위와 같은 문제를 고려해서 고안되었다. 때문에 초기부터 하둡 MR 보다 10~20배 빨랐으며, 지금은 더 압도적인 차이가 난다. 그뿐만 아니라 하둡에 비해 다음과 같은 장점도 존재한다.

  • 더 높은 장애 내구성
  • 더 높은 병렬성
  • 맵리듀서 연산을 위한 중간 결과를 메모리에 저장
  • 쉽고 구성이 간편한 API를 다양한 언어로 제공
  • 다양한 워크로드를 통일성 있게 지원

2. 아파치 스파크란 무엇인가

아파치 스파크는 데이터 센터나 클라우드에서 대규모 분산 데이터 처리를 하기 위해 설계된 통합형 엔진이다. 스파크는 ML, 대화형 질의를 위한 SQL, 실시간 데이터 처리를 위한 스트리밍 처리, 그래프 처리 등을 위해 쉽게 사용 가능한 API들로 이루어진 라이브러리들을 가지고 있다.

스파크 설계 철학에는 다음과 같은 네 개의 핵심 특성이 있다.

2.1 속도

스파크는 다양한 방법으로 속도를 높였다.

  • 현대 서버는 수백 GB의 메모리, 수많은 코어, 효과적인 멀티스레딩과 병렬 처리를 지원하는 유닉스 기반 OS 등이 포함되어 있다. 스파크는 이런 요소들을 활용해 최적화를 진행했다.
  • 스파크는 질의 연산을 방향성 비순환 그래프(DAG - directed acyclic graph)로 구성한다. DAG의 스케줄러와 질의 최적화 모듈은 효율적인 연산 그래프를 만든다. 그리고 이를 태스크로 분해해서 클러스터의 워커 노드 위에서 병렬 수행될 수 있게 한다.
    • DAG는 여러 노드와 엣지로 구성되어 있고, 각 엣지는 한 노드에서 다른 노드로의 방향성을 가지는 그래프이다. 그래프에는 순환이 존재하지 않는다.
  • 물리적 실행 엔진인 텅스텐(Tungsten)은 전체적 코드 생성(whole-stage code generation)이라는 기법을 사용해서 실행을 위해 간결한 코드를 생성해낸다.

2.2 사용 편리성

스파크는 데이터 프레임이나 데이터세트 같은 고수준 데이터 추상화 계층 아래에 유연한 분산 데이터 세트(RDD - Resilient Distributed Dataset)라는 논리 자료구조를 구축해서 단순성을 실현했다. 연산의 종류로서 트랜스포메이션(transformation)과 액션(action)의 집합과 단순한 프로그래밍 모델을 제공해서 사용자들이 각자 편한 언어로 빅데이터 애플리케이션을 만들 수 있게 했다.

  • 데이터세트: 분석 또는 처리를 위해 함께 구성되고 저장된 데이터의 구조화된 모음.
  • 데이터 프레임: 판다스는 파이썬으로 작성된 오픈소스 라이브러리이며, 빠르고 조정 가능한 데이터 구조와 데이터 분석 툴을 제공한다. 판다스는 CSV 같은 데이터나 DB를 행과 열이 있는 파이썬 개체로 바꾸는데, 이 개체가 데이터 프레임이다.
  • RDD: 스파크의 기본 데이터 구조. 데이터는 다수 서버에 분산 저장되며, 병렬 처리가 가능하고 장애가 발생할 경우 스스로 복수될 수 있는 내성을 가진다.
    • 트랜스포메이션: 기존 RDD에서 새로운 RDD를 생성하는 연산들
    • 액션: 실제 데이터를 가지고 작업을 할 때 사용하는 연산들

2.3 모듈성

스파크 연산은 다양한 워크 로드 타입에 적용할 수 있고, 다양한 프로그래밍 언어로 표현할 수 있다. 스파크는 여러 API로 이루어진 통합 라이브러리를 제공한다. 핵심 컴포넌트로는 스파크 SQL, 스파크 정형화 스트리밍, 스파크 MLib, Graphx가 있다.

2.4 확장성

스파크는 저장보단 빠른 병렬 연산 엔진에 초점이 맞추어져 있다. 저장과 연산을 모두 포함하는 하둡과 달리 스파크는 이 둘을 분리했다. 때문에 수많은 데이터 소스에서 데이터를 읽어서 메모리에 처리할 수 있다.

3. 단일화된 스택으로의 아파치 컴포넌트

스파크는 네 개의 다양한 워크 로드를 위한 라이브러리를 지원한다. 이 컴포넌트들은 스파크의 중심 장애 대응 엔진과는 별도로 존재하고, API를 써서 스파크 애플리케이션을 만들면 스파크 코어 엔진이 적절한 DAG(Directed Acyclic Graph)로 변환해 실행하게 된다. 때문에 어떤 언어로 스파크 코드를 작성해 정형화 API를 사용해도, 실제 코드는 고도로 경량화된 바이트코드로 변환되어 클러스터 전체에 나뉘어 워커 노드의 JVM에서 실행된다.

sprak structure

3.1 스파크 SQL

구조화된 데이터(RDBMS 테이블, CSV, Parquet 등)를 읽어서 테이블을 만들 수 있다. 또 한, 지원하는 여러 언어로 정형화 API를 사용해 SQL 계통의 질의를 써서 데이터를 바로 데이터 프레임으로 읽어들일 수 있다.

3.2 스파크 MLlib

MLlib는 범용 머신러닝 알고리즘들을 포함한 라이브러리이다. MLlib는 모델 구축을 위한 고수준 데이터 프레임 기반 API로 여러 인기 있는 머신러닝 알고리즘을 제공한다. 이 API는 특성들을 축출하고 변형하고 파이프라인을 구축하고 배포하는 동안 모델을 보존해 준다. 또 한, 일반적인 선형대수 연산을 지원한다.

3.3 스파크 정형화 스트리밍

스파크는 다른 데이터 소스에서 들어오는 스트리밍 데이터, 정적 데이터에 대해 실시간 연결을 하고 반응하기 위해 모두 새로운 레코드가 끝에 지속적으로 추가되는 형태의 테이블로 본다. 때문에 개발자들은 이를 정형화된 테이블로 바라보고 정적인 테이블에 쿼리를 날리는 것처럼 사용하면 된다. 정형화 스트리밍 모델 하부에 있는 스파크 SQL 엔진은 장애 복구와 지연 데이터의 모든 측면을 관리해서 개발자들이 보다 쉽게 스트리밍 애플리케이션을 작성하게 해준다.

3.4 GraphX

그래프를 조작하고, 그래프 병렬 연산을 수행하기 위한 라이브러리이다. 분석, 연결 탐색 등의 표준적인 알고리즘과 커뮤니티 사용자들이 기여한 알고리즘을 포함하고 있다.

4. 아파치 스파크의 분산 실행

스파크는 분산 데이터 처리 엔진이고 컴포넌트들은 클러스터의 머신들 위에서 협업하며 동작한다. 컴포넌트들이 어떻게 스파크 아키텍처 위에서 동작하면서 서로 통신하는지, 어떻게 배포가 가능한지를 알아보자.

하나의 스파크 애플리케이션은 스파크 클러스터의 병렬 작업들을 조율하는 하나의 드라이버 프로그램으로 이루어진다. 드라이버는 SparkSession 객체를 통해 클러스터의 분산 컴포넌트들에 접근한다.

4.1 스파크 세션(Spark Session)

스파크 세션은 스파크 2.0부터 모든 연산과 데이터에 대한 통합 연결 채널이 되었다. 즉, 모든 스파크 기능을 한 군데서 접근할 수 있는 entry point이다. 이 채널을 통해 다음과 같은 연산을 실행할 수 있다.

  • JVM 실행 파라미터를 만든다
  • 데이터 프레임이나 데이터 세트를 정의한다
  • 데이터 소스에서 데이터를 읽는다
  • 메타데이터에 접근해서 스파크 SQL 질의를 실행한다.

4.2 Spark Execuator

Spark executor는 각 클러스터의 워커 노드에서 동작한다. Executor는 드라이버 프로그램과 통신해서 워커에서 태스크를 실행하는 역할을 한다. 통상적으로 배포 모드에선 노드당 하나의 executor가 실행된다.

4.3 클러스터 매니저

스파크 애플리케이션이 실행되는 클러스터에서 자원을 관리, 할당하는 역할을 한다. 스파크가 지원하는 클러스터 매니저 종류는 다음과 같다.

  • standalone cluster manger
  • 아파치 하둡 얀
  • 아파치 메소스
  • 쿠버네티스

4.4 스파크 드라이버

SparkSession 객체를 초기화하는 책임을 가진 스파크 애플리케이션이다. 다음과 같은 역할을 한다.

  • 클러스터 매니저와 통신해서 spark executor들을 위해 필요한 자원을 요청한다(CPU, memory 등)
  • 모든 스파크 작업을 DAG 연산 형태로 변환하고 스케줄링한다
  • 각 실행 단위를 태스크로 나누어서 스파크 executor들에게 분배한다.
  • 자원이 할당된 뒤엔 드라이버가 executor와 직접 통신한다.

spark element relationship

4.5 배포 모드

스파크는 여러 환경에서 다른 설정으로 돌아갈 수 있게 다양한 배포 모드를 지원한다. 클러스터 매니저는 추상화되어 있어서 실행되는 환경에 대한 자세한 정보가 없어도 잘 동작한다. 스파크 배포 모드는 다음과 같다.

spark deploy mode

4.6 분산 데이터와 파티션

물리적인 데이터를 파티션으로 여러 저장소에 분산해서 저장된다. 스파크는 각 파티션을 메모리의 데이터 프레임 객체로 바라본다. Spark executor는 가급적 데이터 지역성을 고려해 네트워크에서 가장 가까운 파티션을 읽어들이도록 태스크를 할당한다.

파티셔닝은 효과적인 병렬 처리를 가능하게 해 준다. 데이터를 파티션으로 분산해 저장하는 방식은 spark executor가 네트워크 사용을 최소화하고, 가까이 있는 데이터만 처리할 수 있게 한다. 즉, 각 executor의 코어는 데이터 파티션과 1:1 대응된다.

spark parition

1
2
3
4
5
6
7
8
# 클러스터에 나뉘어서 저장된 물리적 데이터들을 8개의 파티션으로 나누고, 
# 각 executor가 하나 이상의 파티션을 메모리로 읽어들이는 코드
log_df = spark.read.text("path_to_large_text_file").repartition(8)
print(log_df.rdd.getNumPartitions())

# 1만 개의 정수로 구성된 데이터 플레임을 만들어서 8개 파티션으로 메모리에 분산한다
df = spark.range(0, 10000, 1, 8)
print(df.rdd.getNumPartitions())

출서 - 러닝 스파크