CHAPTER 4
구조적 API 개요
2부에서는 스파크의 구조적 API를 자세히 살펴본다. 구조적 API는 비정형 로그 파일부터 반정형 CSV 파일, 매우 정형적인 Parquet파일까지 다양한 유형의 데이터를 처리할 수 있다. 구조적 API에는 다음과 같은 세 가지 분산 컬렉션 API가 있다.
- Dataset
- DataFrame
- SQL 테이블과 뷰
배치(batch) 와 스트리밍 (Streaming) 처리에서 구조적 API를 사용할 수 있다. 구조적 API를 활용하면 배치 적업을 스트리밍 작업으로 손쉽게 변환할 수 있다.
구조적 API는 데이터 흐름을 정의하는 기본 추상화 개념이다. 지금까지는 스파크의 여러 기능을 튜토리얼 기반으로 가볍게 알아보았다. 이장에서는 세 가지 기본 개념을 설명한다.
- 타입형(typed)/비타입형(untyped) API의 개념과 차이점
- 핵심 용어
- 스파크가 구조적 API의 데이터 흐름을 해석하고 클러스터에서 실행하는 방식
4장을 시작하기전 Spark 의 기본개념과 정의를 다시 한번 떠올려보자
- 스파크는 트랜스포메이션의 처리 과정을 정의하는 분산 프로그래밍 모델이다.
- 사용자가 정의한 다수의 트랜스포메이션은 지향성 비순환 그래프(DAG)로 표현되는 명령을 만든다.
- 액션은 하나의 JOB을 클러스터에서 실행하기 위해 스테이지와 태스크로 나누고 DAG 처리 프로세스를 실행한다.
- 트랜스토매이션과 액션으로 다루는 논리적 구조가 바로 DataFrame과 Dataset이다.
- 새로운 DataFrame 이나 Dataset을 만들려면 트랜스토메이션을 호출해야 한다.
- 그리고 사용한 언어에 맞는 데이터 타입으로 변환하려면 액션을 호출해야 한다.
4.1 DataFrame과 Dataset
DataFrame과 Dataset의 정의
-DataFrame과 Dataset은 잘 정의된 로우와 컬럼을 가지는 분산 테이블 형태의 컬렉션이다.
-각 컬럼은 다른 컬럼과 동이랗ㄴ 수의 로우를 가져야 한다. ( '값 없음'은 null로 표기한다.)
-컬렉션의 모든 로우는 같은 데이터 타입 정보를 가지고 있어야 한다.
-DataFrame과 Dataset은 결과를 생성하기 위해 어떤 데이터에 어떤 연산을 적용해야 하는지 정의하는 지연 연산의 실행계획이며, 불변성을 가진다.
-DataFrame 에 액션을 호출하면 스파크는 트랜스포메이션을 실제로 실행하고 결과를 반환한다.
4.2 스키마
스키마는 DataFrame의 컬럼명과 데이터 타입을 정의한다.
4.3 스파크의 구조적 데이터 타입 개요
스파크는 사실상 프로그래밍 언어이다. 스파크는 실행 계획 수립과 처리에 사용하는 자체 데이터 타입 정보를 가지고 있는 카탈리스트 엔진을 사용한다. 카탈리스트 엔진은 다양한 실행 최적화 기능을 제공한다. 스파크는 자체 데이터 타입을 지원하는 여러 언어 API와 직접 매핑되며, 각 언어에 대한 매핑 테이블을 가지고 있다.
// 스칼라 코드
val df = spark.range(500).toDF('number')
df.select(df.col("number")+10)
# 파이썬 코드
df = spark.range(500).toDF('number')
df.select(df['number']+10)
지원하느 언어를 이용해 작성된 표현식이 카탈리스트 엔진에서 스파크 데이터 타입으로 변환해 명령을 처리한다.
4.3.1 DataFrame과 Dataset 비교
'비타입형' DataFrame
'타입형' Dataset
DataFrame을 '비타입형' 으로 보는 것이 다소 부정확할 수도 있지만, 스키마에 명시된 데이터 타입의 일치 여부를 런타임 이 되어서야 확인한다. 반명 Dataset은 스키마에 명시된 데이터 타입의 일치 여부를 컴파일 타임 에 확인한다. Dataset 은 JVM 기반에서만 지원한다.
이 책의 예제는 대부분 DataFrame을 사용한다. DataFrame은 Row 타입으로 구성된 Dataset이다. Row 타입은 스파크가 사용하는 '연산에 최적화된 인메모리 포멧'의 내부적인 표현 방식이다. Row 타입을 사용하면 JVM의 객체 초기화 부하를 줄이고 자체 데이터 포멧을 사용해서 효율적으로 연산이 가능하다.
- DataFrame 을 사용하면 스파크의 최적화된 내부 포멧을 사용한다는 것만 기억하자
4.3.2 컬럼
컬럼은 정수, 문자열 같은 단순 데이터 타입 , 배열이나 맵 같은 복합 데이터 타입 그리고 null값을 표현한다.
4.3.3 로우
로우는 데이터 레코드 이다. DataFrame의 레코드는 Row 타입으로 구성된다. 로우는 SQL, RDD, 데이터소스에서 얻거나 직접 만들 수 있다.
spark.range(2).toDF().collect()
spark.range(2).collect()
두 코드 모두 Row 객체로 이루어진 배열을 반환한다.
4.3.4 스파크 데이터 타입
스파크는 여러 가지 내부 데이터 타입을 가지고 있다.
고정형 DataFrame을 그대로 사용하는 경우는 거의 없으며, 대부분 DataFarame의 처리와 변환을 수행한다. 따라서 구조적 API의 실행 과정을 알아야 한다.
4.4 구조적 API의 실행 과정
스파크 코드가 클러스터에서 실제 처리되는 과정을 설명한다.
- DataFrame/Dataset/SQL을 이용해 코드를 작성한다.
- 정상적인 코드라면 스파크가 논리적 실행 계획으로 변환한다.
- 스파크는 논리적 실행 계획을 물리적 실행 계획으로 변환하여 그 과정에서 추가적인 최적화를 할 수 있는지 확인한다.
- 스파크는 클러스터에서 물리적 실행 계획(RDD 처리)을 실행한다.
4.4.1 논리적 실행 계획
첫 번째 실행 단계에서는 사용자 코드를 논리적 실행 계획으로 변환한다.
논리적 실행 계획 단계에서는 추상적 트랜스포메이션만 표현한다. 이 단계에서는 드라이버나 익스큐터의 정로를 고려하지 않는다. 그리고 사용자의 다양한 표현식을 최적화된 버전으로 변환한다.
- 검증 전 논리적 실행 계획 : 코드의 유효성과 테이블이나 컬럼의 존재 여부만을 판단하는 과정
- 스파크 분석기 : 카탈로그에서 컬럼과 테이블을 검증
- 논리적 실행 계획 최적화 : 카탈리스트 옵티마이저가 조건절 푸시다운 이나 선택절 구문을 이용해 최적화
필요한 경우 확장형 패키지를 만들수 있다고한다.
4.4.2 물리적 실행 계획
스파크 실행 계획인라고도 불리는 물리적 실행 계획은 논리적 실행 계획을 클러스터 환경에서 실행하는 방법을 정의한다.
물리적 실행 전략을 생성하고, 비용 모델을 이용해서 비교한 후 최적의 전략을 선택한다. 비용을 비교하는 예는 사용하려는 테이블의 크기나 피티션 수 등의 물리적 속성을 고려해 지정된 조인 연산 수행에 필요한 비용을 계산하고 비교하는 것이다.
물리적 실행 계획은 일련의 RDD와 트랜스포메이션으로 변환된다. 스파크는 DataFrame, Dataset,SQL로 정의된 쿼리를 RDD 트랜스포메이션으로 컴퍼일 한다. 따라서 스파크를 컴파일러 라고 부르기도 한다.
4.4.3 실행
스파크는 물리적 실행 계확을 선전한 다음 저수준 프로그래밍 인터페이스인 RDD를 대상으로 모든 코드를 실행한다. 스파크는 런타임 전체에 태스크나 스테이지를 제거할 수 있는 자바 바이트 코드를 생성해 추가적인 최적화를 수행한다. 마지막으로 스파크는 처리결과를 사용자에게 반환한다.
4.5 정리
사용자가 코드가 물리적인 실행 코드로 변환되는 과정
#CHAPTER 5
구조적 API 기본 연산
5장에서는 DataFrame과 DataFrame의 데이터를 다루는 기능을 소개한다. 특히 DataFrame 의 기본 기능을 중심적으로 다룬다.
5.1 스키마
DataFrame 은 Row 타입의 레코드 와 각 레코드에 수행할 연산 표현식을 나타내는 여러 컬럼으로 구성된다.
# 파이썬 코드
df = spark.read.format('csv').load('temp/data/path/test_date_0001.csv')
df.printSchema()
스키마는 DataFrame의 컬럼명과 데이터 타입을 정의한다. 데이터소스에서 스키마를 얻거나 직접 정의할 수 있다.
데이터를 읽기전에 스키마를 정의해야 하는지는 상황에 따라 달라진다.
# 파이썬 코드
df = spark.read.table('training.ig.invoice_bronze')
# 실행 결과
스키마는 여러 개의 StructField 타입 필트로 구성된 StructType 객체이다.
5.2 컬럼과 표현식
스파크의 컬럼은 스프레드시트, R의 dataframe, Pandas의 DataFrame 컬럼과 유사하다. 사용자는 표현식으로 DataFrame의 컬럼을 선택, 조작, 제거할 수 있다.
스파크의 컬럼은 표현식을 사용해 레코드 단위로 계산한 값을 단순하게 나타내는 논리적인 구조 이다. 따라서 커럼의 실젯값을 얻으려면 로우가 필요하고, 로우를 얻으려면 DataFrame이 필요하다. DataFrame을 통하지 않으면 외부에서 컬럼에 접근할 수 없다. 컬럼 내용을 수정하려면 반드시 DataFrame 의 스파크 트랜스포메이션을 사용해야 한다.
5.2.1 컬럼
컬럼을 생성하고 참조할 수 있는 여러 가지 방법이 있지만, col 함수나 column 함수를 사용하는 것이 가장 간단하다. 이들 함수는 컬럼명을 인수로 받는다.
# 파이썬 코드
from pyspark.sql.functions import col, column
col('tmepCol')
column('tempCol')
컬럼은 컬럼명을 카탈로그
에 저장된 정보와 비교하기 전까지 미확인 상태로 남는다.
명시적 컬럼참조
DataFrmae의 컬럼은 col 메서드로 참조한다. col 메서드는 조인 시 유용하다. 예를 들어 DataFrame 의 어떤 컬럼을 다른 DataFrame 의 조인 대상 컬럼에서 참조하기 위해 col 메서드를 사용한다.
df.col('count')
5.2.2 표현식
앞서 DataFrame 을 정의할 때 컬럼은 표현식이라고 했다. 그럼 표현식이란?
표현식은 DataFrame 레코드의 여러 값에 대한 트랜스포메이션 집합을 의미한다. 여러 컬럼명을 입력으로 받아 식별하고, '단일 값' 을 만들기 위해 다양한 표현식을 각 레코드에 적용하는 함수라고 생각할 수 있다. 여기서 단일 값은 Map 이나 Array 와 같은 복합 데이터 타입일 수 있다.
표현식은 expr 함수로 가장 간단히 사용할 수 있다. 이 함수를 사용해 DataFrame의 컬럼을 참조할 수 있다.
표현식으로 컬럼 표현
컬럼은 표현식의 일부 기능을 제공한다. col() 함수를 호출해 컬럼에 트랜스포메이션을 수행하려면 반드시 컬럼 참조를 사용해야 한다. expr 함수의 인수로 표현식을 사용하면 표현식을 분석해 트랜스포메이션과 컬럼 참조를 알아낼 수 있으며, 다음 트랜스포메이션에 컬럼 참조를 전달할 수 있다.
- 컬럼은 단지 표현식일 뿐이다.
- 컬럼과 컬럼의 트랜스포메이션은 파싱된 표현식과 동일한 논리적 실행 계획으로 컴파일된다.
(((col('someCol') + 5 ) * 200 ) -6 ) < col('otherCol')
위 코드는 다음 논리적 트리의 개요를 나타낸다.
위 그래프는 다음 코드로 동일하게 표현할 수 있다.
# 파이썬 코드
from pyspark.sql.functions import expr
expr("(((someCol + 5 ) * 200 ) - 6 ) < otherCol")
이 내용은 아주 중요하다. SQL의 SELECT 구문에 이전 표현식을 사용해도 잘 동작하며 동일한 결과를 생성한다. 그 이유는 SQL 표현식과 위 예제의 DataFrame 코드는 실행 시점에 동일한 논리 트리로 컴파일 되기 때문이다. 따라서 DataFrame 코드나 SQL로 표현식을 작성할 수 있으며 동일한 성능을 발휘한다.
DataFrame 컬럼에 접근하기
printSchema 메서드로 DataFrame의 전체 컬럼 정보를 확인할 수 있다. 하지만 프로그래밍 방식으로 컬럼에 접근할 때는 DataFrame columns 속성을 사용한다.
spark.read.table('training.ig.invoice_bronze').columns
5.3 레코드와 로우
스파크에서 DataFrame 의 각 로우는 하나의 레코드 입니다. 스파크는 레코드를 Row 객체로 표현한다. 스파크는 값을 생성하기 위해 컬럼 효현식으로 Row 객체를 다룬다. Row 객체는 내부에 바이트 배열을 가진다. 이 바이트 배열 인터페이스는 오직 컬럼 표현식으로만 다룰 수 있으므로 사용자에게 절대 노출되지 않는다.
DataFrame 을 사용해 드라이버에 개별 로우를 반환하는 명령은 항상 하나 이상의 Row 타입을 반환한다.
DataFrame 의 first 메서드로 로우를 확인해보자
# 파이썬
df.first()
# 결과
#Row(InvoiceId='1', CustomerId='2', InvoiceDate='2007-01-01 00:00:00', BillingAddress='Theodor-Heuss-Straße 34', BillingCity='Stuttgart', BillingState=None, BillingCountry='Germany', BillingPostalCode='70174', Total='1.98')
5.3.1 로우 생성하기
각 컬럼에 해당하는 값을 사용해 Row 객체를 직접 생성할 수 있다. Row 객체는 스키마 정보를 가지고 있지 않다. DataFrame만 유일하게 스키마를 갖는다. 그러므로 Row 객체를 직접 생성하려면 DataFrame의 스키마와 같은 순서로 값을 명시해야 한다.
# 파이썬
from pyspark.sql import Row
myRow = Row('Hello', None , 1, False)
print(myRow[0])
# 출력
# Hello
5.4 DataFrame 의 트랜스포메이션
이어서 DataFrame 을 다루는 방법을 알아보자.
- 로우나 컬럼 추가
- 로우나 컬럼 제거
- 로우를 컬럼으로 변환하거나, 그 반대로 변환
- 컬럼값을 기준으로 로우 순서 변경
5.4.1 DataFrame 생성하기
원시 데이터소스에서 DataFrame 을 생성할 수도 있다. 그리고 생성한 DataFrame은 이장 후반부에서 SQL 쿼리를 실행하고 SQL의 기본 트랜스포메이션을 확인하기 위해 임시 뷰로 등록한다.
# 파이썬
df = spark.read.table('training.ig.invoice_bronze')
df.createOrReplaceTempView('temp_view')
Row 객체를 가진 Seq 타입을 직접 변환해 DataFrame 을 생성할 수도 있다.
# 파이썬
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType
myManualSchema = StructType([
StructField('some',StringType() , True),
StructField('col',StringType() , True),
StructField('names',LongType() , True),
])
myRow = Row('Hello',None , 1)
myDf = spark.createDataFrame([myRow], myManualSchema)
display(myDf)
출력 결과
유용하게 사용할 수 있는 메서드
- 컬럼이나 표현식을 사용하는 select 메서드
- 문자열 표현식을 사용하는 selectExpr 메서드
- 메서드로 사용할 수 없는 org.apache.spark.sql.fucntions 패키지에 포함된 다양한 함수
위 세가지로 대부분의 트랜스포메이션 작업을 해결할 수 있다.
5.4.2 select와 selectExpr
select와 selectExpr 메서드를 사용하면 데이터 테이블에 SQL을 실행하는 것처럼 DataFrame에서도 SQL을 사용할수 있다.
다시말해 DataFrame 의 컬럼을 다룰때 SQL을 사용할 수 있다. DataFrame 을 사용한 몇 가지 예제를 보면서 알아보자. 그중 select 메서드를 사용하는 방법이 가장 쉽다.
df.select('InvoiceId').show()
출력 결과
같은 형태로 여러 컬럼을 select 할수 있다.
하지만 col 객체와 문자열을 함께쓰면 컴파일 에러가 발생한다.
....고 하는데... 되는데? 업데이트 됬나보다
# 이렇게 쓰면 컴파일 에러라고한다. 그런데 된다... 버전이 다른가보다
df.select('InvoiceId', col('CustomerId')).show()
출력 결과
expr 함수는 가장 유연한 참조 방식이다. expr 함수는 단순 컬럼 참조나 문자열을 이용해 컬럼을 참조할 수 있다.
df.select(expr('InvoiceId as ic')).show()
또는
df.select(expr('InvoiceId ').alias('ic')).show()
selectExpr 메서드는 자주 사용하는편리한 인터페이스 중 하나이다.
df.selectExpr('InvoiceId as ic', 'InvoiceId').show()
출력 결과
다음은 selectExpr 를 사용해서 새로운 커럼을 추가하는 예제다.
df_expr = df.selectExpr(
"*",
"InvoiceId as ic"
).limit(2)
display(df_expr)
출력 결과
select 표현식에는 DataFrame 의 컬럼에 대한 집계 함수를 지정할 수 있다.
df.selectExpr('avg(InvoiceId)', 'count(distinct(customerId))').show()
출력 결과
5.4.3 스파크 데이터 타입으로 변환하기
때로는 새로운 컬럼이 아닌 명시적인 값을 스파크에 전달해야한다. 명시적인 값은 상숫값일 수 있고, 추후 비교에 사용할 무언가가 될 수도 있다. 이때 리터럴
을 사용한다. 리터럴은 프로그래밍 언어의 리터럴 값을 스파크가 이해할 수 있는 값으로 변환한다.
# 파이썬 코드
from pyspark.sql.functions import lit
df_lit = df.select(expr('*'), lit(1).alias('One'))
display(df_lit)
결과
5.4.4 컬럼 추가하기
DataFrame에 신규 컬럼을 추가하는 공식적인 방법은 DataFrame의 withColumn 메서드를 사용하는 것이다.
df_with = df.withColumn('numberOne', lit(1))
display(df_with)
withColumn 메서드는 두 개의 인수를 사용한다. 하나는 컬럼명, 다른 하나는 값을 생성할 표현식이다.
withColumn 으로 컬럼명을 변경할수도 있다.
df_chan = df.withColumn('changeCol', col('Total'))
#또는 df_chan = df.withColumn('changeCol', expr('Total'))
display(df_chan)
5.4.5 컬럼명 변경하기
withColumn 메서드 대신 withColumnRenamed 메서드로 컬럼명을 변경할 수도 있다.
df.withColumnRenamed('Total','Total_rename').columns
결과
['InvoiceId',
'CustomerId',
'InvoiceDate',
'BillingAddress',
'BillingCity',
'BillingState',
'BillingCountry',
'BillingPostalCode',
'Total_rename']
5.4.6 예약 문자와 키워드
공백이나 하이픈( - ) 같은 예약 문자는 컬럼명에 사용할 수 없다. 허용하려면 백틱( `` ) 문자를 이용해 이스케이핑 해야한다.
5.4.7 대소문자 구분
기본적으로 스파크는 대소문자를 가리지 않는다. 하지만 설정을 사용해 스파크에서 대소문자를 구분하게 만들 수 있다.
-- SQL
set spark.sql.caseSensitive true
5.4.8 컬럼 제거하기
DataFrame 에서 컬럼을 제거하는 방법을 알아보자. select 메서드로 컬럼을 제거할 수 있지만 컬럼을 제거하는 메서드인 drop 을 사용할 수도 있다.
5.4.9 컬럼의 데이터 타입 변경하기
데이터 타입을 변환할때는 cast 메서드로 타입을 변환할 수 있다.
df_cast = df.withColumn('Invoice_int', col('InvoiceId').cast('int'))
display(df_cast)
결과
5.4.10 로우 필터링하기
로우를 필터링하려면 참과 거짓을 판별하는 표현식을 만들어야 한다. 그러면 결과가 false 인 로우를 걸러낼 수 있다. DataFrame 의 where 메서드나 filter메서드로 필터링할 수 있다.
df_cast.filter(col('Invoice_int') < 40).count()
# 출력
# 39
5.4.11 고유한 로우 얻기
DataFrame 에서 고윳값이나 중복되지 않은 값을 얻는 연산을 자주 사용한다. 이때 distinct 메서드를 사용한다.
df_cast.select('BillingCountry').distinct().count()
# 결과
# 24
5.4.12 무작위 샘플 만들기
DataFrame 에서 무작위 샘플 데이터를 얻으려면 DataFrame 의 sample메서드를 사용한다. 표본 추출 비율을 지정할 수 있으며, 복원 추출이나 비복원 추추의 사용 여부를 지정할 수도 있다.
df_cast.sample(False, 0.5, 5).count()
#결과
# 223
5.4.13 임의 분할하기
임의 분할은 원본 DataFrame을 임의 크기로 '분할' 할 때 유용하게 사용된다. 이 기능은 머신러닝 알고리즘에서 사용할 학습셋, 검증셋 그리고 테스트셋을 만들 때 주로 사용한다.
5.4.14 로우 합치기와 추가하기
DataFrame은 불변성을 가진다. 그러므로 DataFrame에 레코드를 추가하는 작업은 DataFrame을 변경하는 작업이기 때문에 불가능하다. 통합은 두 개의 DataFrame 을 단순히 결합하는 행위이다. 두 개의 DataFrame은 반드시 동일한 스키마와 컬럼 수를 가져야 한다.
from pyspark.sql import Row
schema = myDf.schema
newRows = [Row('qqqq',None , 1,2), Row('wwww',None , 3,4)]
parallelizedRows = spark.sparkContext.parallelize(newRows)
newDf = spark.createDataFrame(parallelizedRows, schema)
myDf.union(newDf)\
.where('names = 1')\
.where(col('some') == 'Hello')\
.show()
결과
+-----+----+-----+-----+
| some| col|names|count|
+-----+----+-----+-----+
|Hello|NULL| 1| 2|
+-----+----+-----+-----+
5.4.15 로우 정렬하기
sort와 orderBy 메서드를 사용해 DataFrame의 최대값 혹은 최솟값이 상단에 위치하도록 정렬할 수 있다. 두 메서드는 완전히 같은 방식으로 동작한다.
정렬 기준은 asc나 desc 함수를 사용한다.
df_cast.orderBy(expr('Invoice_int')).show()
결과
+---------+----------+-------------------+--------------------+-------------+------------+--------------+-----------------+-----+-----------+
|InvoiceId|CustomerId| InvoiceDate| BillingAddress| BillingCity|BillingState|BillingCountry|BillingPostalCode|Total|Invoice\_int|
+---------+----------+-------------------+--------------------+-------------+------------+--------------+-----------------+-----+-----------+
| 1| 2|2007-01-01 00:00:00|Theodor-Heuss-Str...| Stuttgart| NULL| Germany| 70174| 1.98| 1|
| 2| 4|2007-01-02 00:00:00| Ullevålsveien 14| Oslo| NULL| Norway| 0171| 3.96| 2|
| 3| 8|2007-01-03 00:00:00| Grétrystraat 63| Brussels| NULL| Belgium| 1000| 5.94| 3|
df_cast.orderBy(expr('Invoice_int').desc()).show()
결과
+---------+----------+-------------------+--------------------+--------------+------------+--------------+-----------------+-----+-----------+
|InvoiceId|CustomerId| InvoiceDate| BillingAddress| BillingCity|BillingState|BillingCountry|BillingPostalCode|Total|Invoice\_int|
+---------+----------+-------------------+--------------------+--------------+------------+--------------+-----------------+-----+-----------+
| 412| 58|2011-12-22 00:00:00| 12,Community Centre| Delhi| NULL| India| 110017| 1.99| 412|
| 411| 44|2011-12-14 00:00:00| Porthaninkatu 9| Helsinki| NULL| Finland| 00530|13.86| 411|
| 410| 35|2011-12-09 00:00:00|Rua dos Campeões ...| Porto| NULL| Portugal| NULL| 8.91| 410|
5.4.16 로우 수 제한하기
limit() 메서드
5.4.17 repartition과 coalesce
최적화 기법중 자주 필터링하는 컬럼을 기준으로 데이터를 분할하는 것이다. 이를 통해 파티셔닝 스키마와 파티션 수를 포함해 클러스터 전반의 물리적 데이터 구성을 제어할 수 있다.
repartition 메서드를 호출하면 무조건 전체 데이터를 셔플한다. 향후에 사용할 파티션 수가 현재 파티션 수보다 많거나 컬럼을 기준으로 파티션을 만드는 경우에만 사용해야 합니다.
df.rdd.getNumPartitions()
df.repartition(5)
특정 컬럼을 기준으로 자주 필터링한다면 자주 필터링되는 컬럼을 기준으로 파티션을 재분배하는 것이 좋다.
df.repartition(col('partition_col'))
선택적으로 파티션 수를 지정할 수도 있다.
5.4.18 드라이버로 로우 데이터 수집하기
스파크는 드라이버에서 클러스터 상태 정보를 유지한다. 로컬 환경에서 데이터를 다루려면 드라이버로 데이터를 수집해야 한다.
collectDF = df.limit(10)
collectDF.take(5)
collectDF.show()
collectDF.collect()
추가로 toLocalIterator 메서드역시 모든 데이터를 드라이버에 전달한다.
collectDF.toLocalIterator
메모리 이슈가 발생할 수 있다 주의하자
'Spark' 카테고리의 다른 글
스파크 완벽 가이드 8 ~ 9 (0) | 2024.05.23 |
---|---|
스파크 완벽 가이드 CHAPTER 6~7 (0) | 2024.05.07 |
스파크 완벽 가이드 CHAPTER 15~17 (0) | 2024.04.24 |
Spark Streaming (0) | 2024.04.04 |
파이썬을 활용한 스파크 프로그래밍 5장 (0) | 2024.03.15 |