파이썬을 활용한 스파크 프로그래밍 5장
==== 5장
====
스파크 응용
스파크 코어 API를 사용한 고급 프로그래밍
5장에서 다룰 내용
- 스파크의 공유변수(brodcast, accumulator) 소개
- 스파크 RDD의 분할(partition) 및 재분할(repartition)
- RDD의 저장 옵션
- 캐싱(caching), 분산 지속성 및 RDD의 체크포인트
스파크의 공유변수
스파크 API는 스파크 클러스터에서 공유변수를 생성하고 사용하는 2가지 메커니즘을 제공한다.
브로드캐스트변수, 어큐물레이터
브로드캐스트 변수
브로드캐스트 변수는 스파크 드라이버 프로그램에 의해 설정된 읽기 전용 변수로, 스파크 클러스터에서 작업자 노드가 사용할 수 있다. 이 변수는 드라이버에 의해 설정된 후에만 읽을 수 있고, 작업자의 실행자에서 실행되는 모든 작업에 사용할 수 있다. 또한 비트토런트를 기반으로 하는 피어-투-피어 공유 프로토콜을 사용한면 자겅ㅂ자 간의 효율적인 공유가 가능하다. 이는 단순히 스파크 드라이버에서 실행자 프로세스로 변수를 직접 푸싱하는 것보다 확장성이 뛰어니다.
bronadcast()
sc.broadcast(value)
- Value는 브로드캐스트 객체에서 직렬화되고 캡슐화된다.
- 만들어진 변수는 응용 프로그램에서 실행되는 모든 작업에서 사용할 수 있다.
stations = sc.broadcast({'83' : 'Mezes Park', '84' : 'Ryland Park'})
stations
브로드캐스트 변수는 로컬, 네트워크 또는 분산 파일 시스템의 파일 콘텐츠에서 만들 수 있다.
익명 함수를 제거했을때
- 본 예제에서 0,1 번을 제외한 나머지 값은 버려진다
value()
broadcast.value()
- 브로드캐스트 변수의 값 반환방법
unpersist()
Broadcast.unpersist(blockking=False)
- 브로드캐스트 객체의 unpersist() 메서드는 존재하는 클러스터의 모든 작업자 메모리에서 브로드캐스트 변수를 제거하는데 사용된다.
- 메모리를 즉시 릴리스해야 하는경우 True
브로드 캐스트 변수와 관련된 스파크 구성 옵션
구성옵션 | 설명 |
---|---|
spark.broadcast.compress | 브로드캐스트 변수를 작업자에게 전송하기 전에 압축할지 여부를 결정한다. 기본값은 True(권장). |
spark.broadcast.factory | 사용할 브로드캐스트 구현을 지정한다. 기본값은 TrrentBroadcastFactory. |
spark.broadcast.blockSize | (TorrentBroadcastFactory에서 사용하는) 브로드캐스트 변수의 각 블록 크기를 지정한다. 기본값은 4MB. |
spark.broadcast.port | 드라이버 HTTP 브로드캐스트 서버가 수신 대기할 포트를 지정한다. 기본 값은 random. |
브로드 캐스트의 장점은 무엇이고, 어떤 경우에 유용할까
RDD join() 을 사용해 데이터 조인하기
status = sc.textFile('file:///opt/spark.data/status') \
.map(lambda x: x.split(',')) \
.keyBy(lambda x: x[0])
stations = sc.textFile('file:///opt/spark/data/stations') \
.map(lambda x:x.split(',')) \
.keyBy(lambda x: x[0])
status.join(statins) \
.map(lambda x: (x[1][0][3], x[1][1][1], x[1][0][1], x[1][0][2])) \
.count()
# return 907200
- 이것은 대부분 비싼 셔플연산을 초래하므로 테이블 변수는 stations 드라이버에서 설정하는것이 좋다
드라이버 변수를 사용해 데이터 조인하기
stationsfile = '/opt/spark/data/stations.csv'
sdata = dict(map(lambda x: (x[0],[1]), \
map(lambda x: x.split(','), \
open(stationsfile))))
status = sc.textFile('file:///opt/spark.data/status') \
.map(lambda x: x.split(',')) \
.keyBy(lambda x: x[0])
stations.map(lambda x: (x[1][3], sdata[x[0]], x[1][1], x[1][2])) \
.count()
# return 907200
- 이 방법은 첫 번째 옵션보다 대부분 더 효과적이지만 확장성이 부족하다.
- 변수는 참조 함수 내 클로저(closure)의 일부로 Worker node 에서의 데이터 전송 및 복제를 비효율적이며 불필요하게 만든다.
브로드 캐스트 변수를 사용해 데이터 조인하기
stationsfile = '/opt/spark/data/stations.csv'
sdata = dict(map(lambda x: (x[0],[1]), \
map(lambda x: x.split(','), \
open(stationsfile))))
stations = sc.broadcast(sdata)
status = sc.textFile('file:///opt/spark.data/status') \
.map(lambda x: x.split(',')) \
.keyBy(lambda x: x[0])
stations.map(lambda x: (x[1][3], stations.value[x[0]], x[1][1], x[1][2])) \
.count()
# return 907200
- 브로드캐스트 변수의 사용은 런타임 동안 스파크 클러스터의 서로 다른 노드에서 실행되는 프로세스 사이에 데이터를 공유하는 효율적인 방법이다.
브로드캐스트 변수의 특성
- 브로드캐스트 변수를 사용하면 셔플연산이 필요 없다. - 효율적이고 확장 가능한 피어-투-피어 배포 매커니즘을 사용한다. - 작업당 한 번씩 복제하는 대신 작업자당 한 번씩 데이터를 복제한다. 스파크 응용 프로그램에 수천 개의 작업이 있을 수 있으므로 이것은 매우 중요하다. - 많은 작업을 여러 번 다시 할 수 있다. - 직렬화된 객체로, 효율적으로 읽힌다.
어큐뮬레이터(accumulator)
스파크의 또다른 공유변수 어큐뮬레이터는 브로드 캐스트 변수와 달리 업데이트할 수 있다.
어큐물레이터는 스파크 프로그래밍에서 여러 가지 방법으로 사용할 수 있는 카운터로 생각할 수 있다. 어큐물레이터를 사용하면 프로그램이 실행되는 동안 여러 값을 집계할 수 있다.
어큐뮬레이터는 드라이버에 의해 설정되고, 각각의 스파크 콘텍스트에서 작업을 담당하는 실행자에 의해 업데이트되다. 드라이버는 보통 어큐뮬레이터의 최종 값을 프로그램의 끝에서 다시 읽을 수 있게 한다.
accumulator()
sc.accumulator(value, accum_param=None)
- accumulator() 메소드는 특정 스파크 콘텍스트 내에 Accumulator 객체의 인스턴스를 만들고, value 인수에 의해 지정된 값으로 초기화된다. accum_param인수는 사용자 정의 어큐뮬레이터를 정한다.
value()
Accumulator.value()
- value() 메서드는 어큐뮬레이터 값을 가져온다. 이 방법은 드라이버 프로그램에서만 사용할 수 있다.
사용자 정의 어큐뮬레이터
- int 및 float을 포함한 기본적인 숫자 데이터 유형을 지원한다.
- AccumulatorParam 도우미 객체를 사용해 생성된다.
- 수행되는 연산은 결합 및 교환이 가능해야 한다.
- 백터를 리스트나 딕셔너리로 어큐뮬레이트하는 것이 일반 적이다.
- 문자열 값을 연결하는 사용자 정의 어큐뮬레이터를 만들수 있다?
- 사용자 정의 클래스를 정의해아 한다.
a. addInPlace() :b. zero() : map 유형에 빈 map을 제공하는 것처럼 각 유형에 '0값' 을 제공한다.
데이터 유형의 두 객체에 대해 연산하고 새 값을 반환하는 데 사용
from pyspark import AccumulatorParam
class VectorAccumulatorParam(AccumulatorParam) :
def zero(self, value) :
dict1={}
for i in range(0,len(value)) :
dict1[i]=0
return dirc1
def addInPlace(self, val1, val2) :
for i in val1.keys() :
val1[i] += val2[i]
return val1
rdd1 = sc.parallelize([{0: 0.3, 1: 0.8, 2: 0.4},{0: 0.2, 1: 0.4, 2: 0.2}])
vector_acc = sc.accumulator({0: 0, 1: 0, 2: 0}, VectorAccumulatorParam())
def mapping_fn(x) :
global vector_acc
vector_acc += x
# 다른 rdd 처리를 수행해야 한다.
rdd1.foreach(mapping_fn)
print vector_acc.value
# return {0: 0.5, 1: 1.200000000002, 2: 0.6000000000001}
어큐뮬레이터의 용도
- 일반적으로 처리된 레코드 수를 계산하거나 조작된 레코드 수를 추적하는 등의 용도로 사용된다.
- 집계 작업: 분산된 환경에서 데이터를 처리하고 집계해야 할 때 사용된다. 예를 들어, 특정 작업을 처리하는 동안 생성되는 값을 모두 합산하거나 계산한 후, 그 결과를 모으고 싶을 때 Accumulator가 유용하다.
- 상태 유지: 태스크 간에 공유되어야 하는 상태를 유지해야 할 때 사용된다. 예를 들어, 분산 시스템에서 반복적인 작업을 수행하면서 각 반복에서 생성된 값의 총계를 유지하거나, 최대값 또는 최소값을 찾는 등의 작업에서 Accumulator를 사용할 수 있다.
- 디버깅: 작업 중에 생성된 값을 모니터링하고 디버깅할 때 사용될 수 있다. Accumulator를 사용하여 각 태스크에서 발생한 특정 이벤트의 수를 세거나 특정 조건을 만족하는 데이터 레코드의 수를 추적할 수 있다.
- 외부 데이터 소스와의 상호 작용: 외부 데이터 소스와 상호 작용하면서 결과를 수집하고 필요한 정보를 유지할 수 있습니다. 예를 들어, 데이터베이스나 외부 파일 시스템과 상호 작용하면서 생성된 데이터의 총량을 추적하는 등의 작업에서 Accumulator를 사용할 수 있다.
요약하면, Accumulator는 분산 환경에서 데이터의 집계, 상태 유지, 디버깅 및 외부 데이터 소스와의 상호 작용과 같은 다양한 용도로 사용된다.
스파크의 데이터 파티셔닝
파티셔닝은 대부분 스파크 프로세스에 필수적이다. 효율적인 파티셔닝은 응용 프로그램성능을 수십 배 향상 시킬수 있지만, 비효율적인 파티셔닝은 프로그램을 완료하기 힘들게 만든다.
과도하게 큰 파티션은 Executor-out-of-memory
와 같은 문제가 발생할 수 있다.
파티셔닝 개요
RDD 벼환에서 생성할 파티션 수는 일반적으로 구성할 수 있다. 그러나 그에 앞서 몇가지 기본 동작을 알고 있어야 한다.
- 스파크는 HDFS 를 사용하면 블록마다 RDD 파티션을 만든다 (블록크기 128MB)
myrdd = sc.textFile("hdfs://dir/filescontaining10blocks") myrdd.getNumPartitions() #returns 10
- Bykey 연상 같은 셔플연산과 numPartitions 값이 메소드에 대한 인수로 제공되지 않는 다른 연산들은 spark.default.parallelism 구성 값과 같은 여러 파티션을 만든다.
- 스파크에서 사용하는 기본 파티션 클래스는 HashPartitioner 로, 결정적 해시 함수를 사용해 모든 키를 해시한 다음, 키 해시를 사용해 거의 동일한 버킷을 만든다. 키를 기반으로 파티션 수에 데이터를 균등하게 분산키시는 것이다.
파티션 제어
큰 데이터에 파티션 수가 너무 적으면 Worker에 메모리 부족 문제가 발생할 수 있다.
왜곡된 파티션
filter() 연산은 필터 조건을 만족하는 레코드만 사용해 일대일 기반으로 모든 입력 파티션에 대해 새 파티션을 만든다.
이로 인해 일부 파티션의 데이터가 다른 파티션보다 훨씬 적어 데이터 왜곡, 추측 실행 가능성 및 다음 단계에서 최적이 아닌 성능과 같은 나쁜 결과를 초래할 수 있다.
이러한 경우 스파크 API 에서 재분할 방법 중 하나를 사용할 수 있다.
최적의 파티션 수 결정
추가 파티션이 성능을 떨어 뜨리기 시작하는 지점을 찾을 때까지 다른 값들을 실험해야 한다.
시작은 Worker node 코어 수의 2배를사용하는것이다.
함수 재분할
RDD를 재분할 하는데 사용되는 주요 함수
partitionBy()
RDD.partitionBy(numPartitions, PartitionFunc=portable_hash)
- partotionBy() 메소드는 기본적으로 portable_hash 함수를 사용해 입력 RDD와 동일한 데이터를 포함하지만, numPartitions 인수로 지정된 파티션 수와 함께 새RDD를 반환한다.
- partotionBy() 함수는 rangePartotioner를 사용해 partitionBy()를 호출하는 sortBykey() 와 같은 함수에 의해 호출된다.
repartition()
RDD.repartition(numPartitions)
- repartition() 메서드는 입력 RDD와 동일한 데이터를 갖는 새로운 RDD를 반환하며 numpartotions 인수로 지정된 파티션 수에 정확하게 일치한다.
coalesce()
RDD.coalesce(numPartitions, shuffle=False)
- coalesce() 메서드는 numPartitions 인수로 지정된 파티션 수로 구성된 새로운 RDD를 반환한다.
- boolen shuffle 인수를 사용해 셔플을 트리커하는지 여부를 컨트롤할 수 있다.
repartitonAndSortWithinPartitions()
RDD.repartitonAndSortWithinPartitions(numPartitions=None, partitionFunc=portable_hash, ascending=True, keyfunc=<lambda function>)
- repartitonAndSortWithinPartitions() 메서드는 입력 RDD를 numPartitions 인수로 지정된 파티션 수로 다시 분할하고, partitionFunc인수로 지정된 함수에 따라 분할된다.
파티션별 또는 파티션 인식 API 메소드
스파크의 많은 메서드는 파티션과 원자 단위로 상호작용하도록 설계되었다. 여기에는 액션과 변환이 모두 포함되는데, 그중 일부 메소드에 관해 살펴본다.
foreachPartition()
RDD.foreachPartition(func)
- foreachPartition() 메서드는 foreach() 액션과 비슷한 액션으로 func 인수로 지정된 함수를 RDD의 각 파티션에 적용한다.
- 변환이 아닌 액션이므로 입력 RDD와 전체 리니지에 대한 평가를 트리거한다.
glom()
RDD.glom()
- glom() 메서드는 각 파티션 내의 모든 요소를 리스트로 병합해 만든 RDD를 반환한다.
lookup()
RDD.lookup(key)
- lookup() 메서드는 key 인수로 참조된 키에 대한 RDD의 값 리스트를 반환한다.
- 파티션으로 분할된 RDD에 대해 사용하는 경우 파티셔너를 사용해 키가 있는 파티션으로만 검색 변위를 좁힌다.
mapPartitions()
RDD.mapPartitions(func, preservesPartitioning=False)
- mapPartitions() 메서드는 이 RDD의 각 파티션에 함수를 적용해 새로운 RDD를 반환한다.
- 가장 큰 장점은 참조된 함수가 요소당 한번이 아닌 파티션당 한 번 호출된다는 것이다.
- 스파크의 많은 변환은 내부적으로 mapPartitions() 함수를 사용한다.
RDD 저장 옵션
RDD의 리니지 개념을 살펴보자
RDD 리니지 재검토
- 스파크는 프로그램의 실행을 DAG로 계획한다.
- 족속성이 있는 분리된 연산 집합니다. ( 번역기 개같은것인가 한국말이 어려운 것인가 )
- 액션이 호출되면 전체 리니지가 다시 재평가 된다.
- 재평가는 문제가 되지 않지만, 비효율적일 수 있고, 오류 발생 시 복구 시간에 영향을 미칠 수 있다.
RDD 저장 옵션
RDD는 스파크 YARN, 독립실행형 또는 메소스 클러스터의 다양한 Worker 노드에 있는 파티션에 저장된다.
RDD 스토리지 레벨
스토리지 레벨 | 설명 |
---|---|
MEMORY_ONLY | RDD 파티션은 메모리에만 저장된다. 이것이 기본값이다. |
MEMORY_AND_DISK | 메모리에 맞지 않는 RDD 파티션은 디스크에 저장된다. |
MEMORY_ONLY_SER* | RDD 파티션은 직렬화도니 객체로 메모리에 저장된다. 직렬화된 객체가 비직렬화된 객체보다 적은 공간을 사용할 수 있으므로 메모리를 절약하려면 이 옵션을 사용한다. |
MEMORY_AND_DISK_SER* | RDD 파티션은 직렬화도니 객체로 메모리에 저장된다. 메모리에 들어가지 않는 객체는 디스크로 유출된다. |
DISK_ONLY | RDD 파티션은 디스크에만 저장된다. |
OFF_HEAP | RDD 파티션은 직렬화된 객체로 메모리에 저장된다. 이를 위해서는 오프 힙 메모리가 활성화돼 있어야 한다. 이 저장 옵션은 시험용으로만 사용된다. |