==== 4장
====
복원 분산데이터 집합 ( RDD )
- RDD 는 Spark 에서 사용되는 가장 기본적인 데이터 객체이다.
- 외부 데이터로 RDD를 로드해 연산을 수행하고 새로운 RDD 를 생성한다.
- 분산된 데이터(객체) 모음
- Pyspark 의 RDD는 리스트, 튜플, 딕셔너리 같은 분산 파이썬 객체로 구성된다.
- 복합 유형을 포함한 모든 유형이 될 수 있다.
- 가능한 메모리에 저장되도록 해야한다.
- 사용하는 언어에 따라 각각 그 프로그래밍 언어 객체 모음으로 구성된다
- 연속적이고 반복적인 작업을 수행할때 효율적인 재사용을 가능하게 한다.
RDD 특성
- 복원 (Resilient) : Spark 가 작업을 수행중 노드가 소실될 경우, 데이터 집합을 재구성할 수 있다.
- 분산 (Distributed) : 파티션을 나누고 클러스터 노드를 통해 분산한다.
- 데이터 집합 (DataSet) : 레코드로 구성된 데이터 집합이다.
- 불변성 : RDD가 인스턴스화 되고 데이터로 채워진 후에는 업데이트 할수 없다. 단 변환 연산을 통해 신규생성할 수 있다.
액션
- 출력을 만들거나, 파일 시스템에 저장하거나, 레코드 수를 반환하는것 등
Pyspark 프로그램 샘플
# 로컬 파일 시스템에서 로그 파일 로드
logfilesrdd = sc.textFile("file:///var/log/hadoop/hdfs/hadoop-hdfs-*")
# 오류에 대해서만 로그 레코드 필터링
onlyerrorsrdd = logfilesrdd.filter(lambda line: "ERROR" in line)
# onlyerrorsrdd를 파일로 저장
onlyerrorsrdd.saveAsTextFile("file:///tmp/onlyerrorsrdd")
RDD에 데이터 로드하기
RDD는 데이터가 채워진 뒤에 효울적으로 생성된다. 이것은 스파크 프로그램의 일부로 기존의 RDD가 새로운 RDD에 쓰여지는 변환 (transformation) 작업의 결과일 수 있다.
스파크 루딘을 시작하려면 먼저 외부 소스의 데이터로 최소 하나의 RDD를 초기화해야한다. 이 초기 RDD는 일련의 변형 및 액션을 통해 중간 RDD 또는 최종 RDD를 만드는데 사용된다. 초기 RDD는 다음과 같은 여러 가지 방법으로 생성될 수 있다.
- 하나 이상의 파일에서 데이터 로드하기
- SQL 또는 NoSQL 데이터 스토어와 같은 데이터 소스에서 데이터 로드하기
- 프로그래밍 방식으로 데이터 로드하기
- 스트림에서 데이터 로드하기
하나 이상의 파일에서 RDD 생성하기
스파크는 하나 이상의 파일이나 디렉토리의 콘텐츠에서 RDD를 만드는 API 메소드를 제공한다. 파일은 구조화되지 않은 텍스트 파일에서부터 JSON 파일과 같이 반구조화된 파일, CSV 파일과 같이 구조화된 데이터 소스에 이르기까지 다양한 형식을 가질수 있다. 또한 스파크는 시퀀스 파일과 프로토콜 버퍼와 같이 일반적으로 직렬화된 이진 인코딩 형식과 파케이 및 ORC 같은 컬럼 방식의 파일 형식을 지원한다.
스파크 파일 압축
스파크는 기본적으로 여러 가지 무손실 압축 형식을 지원한다. BZIP2로 압축된 아카이브 뿐 아니라 GZIP 및 ZIP을 포함한 일반적인 압축 파일 형식도 완벽하게 읽을 수 있다.
또한 압축 파일의 읽기 및 쓰기를 가능하게 하는 원시 코텍을 제공한다. 빌트인 코텍은 LZ4, LZF및 LZ77 기반의 무손실 압축 형식과 스내피를 포함한다. 스내피는 구글에서 자체 개발한 빠르고 분할 가능하며 CPU 자원을 덜 소모하는 데이터 압축 및 압축 해제 라이브러리로 하둡의 코어 및 에코 시스템 프로젝트에서 일반적으로 사용된다. 또한 기본적으로 작업자들 간에 네크워크를 통해 교환되는 RDD 파티션의 데이터와 같은 스파크 내부의 데이터를 압축하는 데에도 사용된다.
*분할 가능한 압축 형식 VS 분할 불가능한 압축 형식 *
스파크 또는 하둡과 같은 처리 플랫폼을 사용할 때, 압축 형식이 분할 가능한지 분할 불가능한지를 구별하는 것은 매우 중요하다.
분할 가능한 압축 형식은 일반적으로 아카이브 무결성을 손상시키지 않으면서 블록 경계에서 분할될 수 있도록 인덱싱된다. 반면, 분할 불가능한 형식은 인덱싱되지 않으며, 분할할 수 없다. 즉, 분할할 수 없는 아카이브는 분산될 수 없으므로 하나의 시스템에서 전체를 읽을 수 있어야 한다.
ZIP이나 GZIP 같은 일반적인 데스크톱 압축 형식은 높은 압축률을 얻을 수 있지만 분할할 수 없다. 탐색 가능한 데이터가 포함된 작은 파일에는 괜찮지만 큰 데이터세트의 경우 스내피나 LZO와 같이 분할 가능 한 압축 형식이 적합하다. 경우에 따라서는 HDFS와 같은 분산 시스템으로 파일을 처리하기 전에 압축을 해제하는 것이 좋다.
RDD를 사용한 데이터 지역성
스파크는 기본적으로 가까운 노드에서 RDD로 데이터를 읽으려고 한다. 스파크는 보통 변환 적업을 최적화하기 위해 HDFS나 S3의 데이터와 같이 분산된 파티션의 데이터에 액세스하므로 분산 파일 시스템에서는 기본 불록을 보유할 파티션을 만든다. HDFS와 같은 분산 파일 시스템에 있는 파일의 불록을 사용해 데이터와 함께 배치된 Worker에 RDD 파티션을 만드는 방법을 보여 준다.
로컬 파일 시스템에서 RDD 로딩하기
- 분산 파일 시스템을 사용하지 않는경우 오류가 발생할수있다.
- HDFS , S3와 같은 분산 파일 시스템을 사용하는 것이 좋다.
하나 이상의 텍스트 파일에서 RDD 만드는 방법
파일시스템 | URL구조 |
---|---|
로컬 파일 시스템 | file:///path |
HDFS | hdfs://hdfs_path |
아마존 S3 | s3://buket/path ()also used are s3a and s3n |
오픈스택 스위프트 (OpenStack Swift) | swift://container.PROVIDER/path |
* 파일 시스템 구성 매개변수를 설정해야 한다. |
textFile()
sc.textFile(name, minPartition=None, use_unicode=True)
- textFile() 메소드는 파일, 디렉토리 또는 글로브(glob) 패턴에서 RDD를 만드는 데 사용된다.
- name 인수는 파일 시스템 스키마를 포함 참조될 경로를 지정한다.
- minpartition 인수는 파티션수를 결정한다.
- use_unicode 인수는 유니 코드 또는 UTF-8을 문자 인코딩 스키마로 사용할지 여부를 지정한다.
Saprk to HDFS
Spark 에서 HDFS 파일을 읽으려면 Worker에 HADOOP_CONF_DIR 환경변수를 설정해야한다.
이정보는 Spark 의 conf/spark-env.sh 스크립트를 사용해 설정할 수 있다.
export HADOOP_CONF_DIR=/etc/hadoop/conf
HDFS to textFile() Example
# 전체 디렉토리 내용 로드
logs = sc.textFile("hdfs:///demo/data/Website/Website-Logs")
# 개별 파일 로드
logs = sc.textFile("hdfs:///demo/data/Website/Website-Logs/fileName.txt")
# 글로브 패턴을 사용해 파일 로드
logs = sc.textFile("hdfs:///demo/data/Website/Website-Logs/*_filename.txt")
wholeTextFiles()
sc.wholeTextFiles(path, minPartition=None, use_unicode=True)
- wholeTextFiles() 메소드를 사용하면 여러 파일이 포함된 디렉토리를 읽을 수 있다.
- key = 파일의 이름
- value = 파일 내용
HDFS to wholeTextFiles() Example
# 전체 디렉토리의 내용을 키/값 쌍으로 로드
logs = sc.wholeTextFiles("hdfs:///demo/data/Website/Website-Logs/")
오프젝트 파일에서 RDD 만들기
오브젝트 파일 :
- 사람이 읽을수 없으며, 직렬화된 데이터 구조, 효율적 데이터 엑세스
- RDD에서 sequenceFile() 메소드를 사용해 직렬화된 시퀀스 파일을 만들수 있다고한다.
데이터 소스에서 RDD 만들기
- 외부 DB 로 생성된 RDD는 여러 Worker 의 여러 Partition 으로 분산을 시도한다.
- 초기단계에서 병렬 처리를 극대화한다.
관계형 데이터베이스의 테이블에서 RDD 로드하기
![DB_to_RDD]
- 관계형 데이터베이스 RDB 에서 RDD를 생성하는 기본 방법은 SparkSession 객체의 read 함수
- DataFrame 으로 반환
- DataFrame 은 SchemaRDD 라고불렸다
- read.jdbc() 함수를 사용할때 너무 많은 파티션을 생성하면 DDoS 로 잘못 해석될 수 있다.
JSON 파일에서 RDD 만들기
JSON 은 read.json() 메소드를 사용해 액세스한다.
read.json() Example
spark.read.json(path, schema=None)
- 이렇게 하면 데이터프레임인데...?
프로그래밍 방식으로 RDD 생성하기
프로그램의 데이터를 사용해 프로그래밍 방식으로 RDD를 만들 수 있다.
Parallelize()
sc.parallelize(c, numSlices=None)
- c 는 이미 생성된 collection (배열, 리스트)
- numSlices 는 파티션 수를 지정
- Parallelize 는 SparkContext 객체에 속한 메서드 중 하나, 배열 또는 리스트를 RDD로 변환하는데 사용된다.
- collection 인자 ...메모리에 있단 소린데 이걸 다시 분산처리 하려고 partition을 나눠야 하나..?
range()
sc.range(start, end=None, step=1, numSlices=None)
range() 메소드는 사용자를 위한 목록을 생성하고, RDD를 만들고 배포한다.
- start, end, step 은 값의 순서를 정의
- numSlices는 원하는 분할 수를 지정
# 0에서 시작하는 1000개의 정수로, 2개의 파티션에서 1씩 증가하는 RDD 생성
range_rdd = sc.range(0, 1000, 1, 2)
# rage는 파이썬 함수로 PythonRDD 유형이다.
range_rdd
# numSlices = 2를 요청하면, 2 를 반환한다.
range_rdd.getNumPartitions()
# 시작 인수가 아니므로 0을 반환한다.
range_rdd.min()
# 0에서 시작해서 1의 1000 증분이므로 999를 반환한다.
range_rdd.max()
# [0, 1, 2, 3, 4]를 반환한다.
range_rdd.take(5)
RDD 연산
- RDD 및 주요 RDD 개념에 대해 수행할 수 있는 연산 작업 유형을 살펴보자.
주요 RDD 개념
Spark 의 변환(transformations) 은 RDD에서 작동하고 새로운 RDD를 반환하는 함수인 반면, 액션(actions) 은 RDD에 대한 연산 작업 후 값을 반환하거나 출력 연산을 수행하는 함수다.
거친 변환 vs 세분화된 변환
- 거친변환 : RDD 처럼 모든 요소에 대해 함수를 적용하고, 새 데이터 집합을 반환하는것
- 세분화된 변환 : RDB의 단일행 변환, NoSQL put 연산 같이 단일 레코드나 데이터 셀을 변환
변환, 액션 및 지연 평가
- 변환(transformation) 은 RDD 연산으로 새로운 RDD 생성
- 일반적으로 map, filter
Example code
originalrdd = sc.parallelize([0, 1, 2, 3, 4, ,5 ,6 ,7 ,8])
newrdd = originalrdd.filter(lambda x: x % 2)
# result
newrdd.collect() # return [1, 3, 5, 7]
RDD 지속성 및 재사용
- RDD는 주로 Executor의 메모리에 만들어지는 일시적인 객체다.
- 하나 이상의 Action에 RDD가 필요한 경우 (RDD유지) persist() 를 사용할수 있다.
persist()
- persist() 메소드를 사용해 RDD를 유지시키면, 첫 번째 액션후 노드에 RDD가 남아있다.
- cache() 메소드가 비슷한 기능을 한다.
- Spark UI > storage 탭에서 확인 가능
RDD 리니지 (lineage)
- 데이터간의 관계, 종속성을 추적한다.
- join 을 하면 멀티 종속성을 가지게된다.
- ! RDD 광범위 연상에 의한 오버헤드
groupBy, reduceByKey, join, sortByKey 등 연산은 데이터 재분배나 셔플링이 필요하기 때문에 성능에 문제가 발생할 수 있다. 이러한 비용과 지연 시간을 고려하라는 뜻인듯
RDD의 내결함성
- RDD 리니지는 부모와 조상 RDD를 모두 포함한다.
- 노드 장애 등으로 발생한 오류 당시의 상태로 재구성할 수 있다.
- Spark 에서는 비결정 함수(random)를 사용하지 마라
RDD 유형
- 너무 많다 몰라도 될거같다.
기본 RDD 변환
map()
RDD.map(<function>, preservesPartitioning=False)
- map() 은 for랑 같은거 아닌가... Python map 이 아닌 RDD map()이 따로있는건지 알아보자
- map > for 루프
- flatMap > 2중 for 루프
- 예제가 이상하다... 공백기준 split() 인거 같은데 공백이 없다...
- Python의 map() :
- Python의 내장 함수 중 하나로, 주어진 함수를 시퀀스(리스트, 튜플 등)의 각 요소에 적용하여 새로운 시퀀스를 생성한다.
- map() 함수는 iterator를 반환하며, 필요한 경우 list() 또는 tuple() 등으로 변환하여 결과를 얻을 수 있다.
- 예를 들어, map() 함수를 사용하여 리스트의 각 요소를 제곱할 수 있다.
Python
input_list = [1, 2, 3, 4, 5]
output_list = list(map(lambda x: x * x, input_list))
print(output_list)
# 출력: [1, 4, 9, 16, 25]
- Spark RDD의 map() :
- Apache Spark의 Resilient Distributed Dataset(RDD)에서 사용되는 메서드.
- map() 메서드는 RDD의 각 요소에 주어진 함수를 적용하여 새로운 RDD를 생성한다.
- Spark에서는 데이터를 분산하여 처리하기 때문에, RDD의 map()은 병렬 처리를 수행한다.
- 예를 들어, RDD의 각 숫자를 제곱할 때 map() 메서드를 사용.
RDD
from pyspark import SparkContext
sc = SparkContext("local", "map_example")
rdd = sc.parallelize([1, 2, 3, 4, 5])
squared_rdd = rdd.map(lambda x: x * x)
print(squared_rdd.collect())
# 출력: [1, 4, 9, 16, 25]
flatMap()
- Python의 리스트 컴프리헨션 :
- Python에는 내장된 flatMap() 함수가 없다. 일반적으로 파이썬에서는 리스트 컴프리헨션이나 제너레이터 표현식을 사용하여 리스트를 평탄화한다.
Python
input_list = [[1, 2], [3, 4], [5, 6]]
# 리스트 컴프리헨션을 사용하여 평탄화
output_list = [item for sublist in input_list for item in sublist]
print(output_list)
# 출력: [1, 2, 3, 4, 5, 6]
- Apache Spark의 RDD의 flatMap() :
- flatMap() 메서드는 RDD의 각 요소에 대해 주어진 함수를 적용하고, 그 결과를 모든 요소를 하나의 RDD로 평탄화하여 반환한다.
- 각 요소에 대해 여러 개의 요소를 생성하고 싶을 때 유용하다.
RDD
from pyspark import SparkContext
sc = SparkContext("local", "flatMap_example")
rdd = sc.parallelize(["Hello world", "Apache Spark", "Big Data"])
flat_rdd = rdd.flatMap(lambda x: x.split())
print(flat_rdd.collect())
# 출력: ['Hello', 'world', 'Apache', 'Spark', 'Big', 'Data']
distinct()
RDD.distinct(numPartitions=None)
- 중복제거
- 여기서 중복은 모든 요소나 필드가 동일한것
groupBy()
RDD.groupBy(<function>, numPartition=None)
- 지정 함수로 지목된 그룹화된 항목의 RDD를 반환한다.
- groupBy() 변환은 많은 데이터가 셔플링된다. 경우에 따라 유용한 함수지만 이점을 고려해야 한다.
sortBy()
RDD.sortBy(<keyfunc>, ascending=True, numPartitions=None)
- key를 인수로 지정하고 RDD로 정렬한다.
기본 RDD 액션
스파크의 액션(action) 은 count() 함수처럼 값을 반환 하거나 saveAsTextFile() 처럼 외부에서 데이터를 저장한다.
반면 foreach() 는 RDD의 각 요소에 맞는 함수를 수행하는 액션이다.
count()
RDD.count()
- count 는 인자를 취하지 않고 long 값을 반환
collect()
RDD.collect()
- RDD의 모든 요소를 포함하는 목록을 Spark Driver 에 반환한다.
- 메모리 부족 오류를 일으킬수 있다.
take()
RDD.take(n)
- RDD의 첫 번째 n개 요소를 반환한다.
- 비결정적이고 특별한 순서가 없어 재시행시 다를 수 있다.
top()
RDD.top(n, key=None)
- RDD 에서 상위 n개 요소를 반환한다.
- take 와 달리 요소가 정렬돼 내림차순으로 반환된다.
first()
RDD.first()
- RDD의 첫 번째 요소를 반환한다.
- 요소의 순서를 고려하지 않은 비결정적 연산을 한다.
reduce()
RDD.reduce(<function>)
- and/or 결합연산자를 지정해 RDD의 요소를 줄이는 데 사용한다.
fold()
RDD.fold(zeroVlaue, <function>)
- RDD의 각 파티션 요소를 합산한 다음 합계연산을 수행한다.
- 결과적으로 reduce 와 비슷...?
- 초기값과 최종값이 필요하다.
- 로컬모드에서 실행이 안된다.... 뭔가 Spark 설정이 잘못된거같다
foreach()
RDD.foreach(<function>)
- 지정된 함수를 RDD의 모든 요소에 적용한다.
PariRDD의 변환
- PairRDD는 키/값 쌍 RDD. Spark 프로그래밍에 필수적이다.
- 딕셔너리 함수
- 기능적 변환
- 그룹화, 합계 및 정렬 연산
- Join 함수
keyBy()
RDD.keyBy()
- RDD에 있는 요소의 키와 값으로 구성된 튜플을 만든다.
mapVlaues()
RDD.mapValues(<function>)
- 키를 변경하지 않고 함수를 통해 키/값 쌍 RDD의 각 값을 전달한다.
- 원래 RDD 파티션은 영향을 받지 않는다.
- 잘모르것음
flatMapValues()
RDD.flatMapValues(<function>)
- 키를 변경하지 않고 함수를 통해 키/값 쌍 RDD의 각 값을 전달하고 병합된 목록을 생성한다.
- reduceByKey() 또는 foldByKey()를 사용하는것을 권장
reduceByKey()
RDD.reduceByKey(<function>, numPartitions=None, PartitionFunc=<hash_fn>)
- 결합 함수를 사용해 키 값을 병합한다.
foldByKey()
RDD.foldByKey(zeroVlaue, <function>, numPartitions=None, PartitionFunc=<hash_fn>)
- 미리 정의된 키/값 요소와 함께 작동하는 변환이다.
sortByKey()
RDD.sortByKey(ascending=True, numPartotions=None, keyfunc=<function>)
맵리듀스 및 워드 카운트 연습
version 1
from pyspark.sql.functions import col,explode, split, lower
wordcount = spark.read.text("dbfs:/FileStore/ingyu/shakespeare.txt")
# 분할 소문자 변환
words_df = wordcount.select(explode(split(lower(wordcount.value), "\W+")).alias("word"))
# 빈 문자 공백 제거
words_df = words_df.filter(words_df.word != "")
# word count 계산
word_count_df = words_df.groupBy("word").count().orderBy("count", ascending=False)
# 출력
# display(word_count_df)
top5 = word_count_df.take(5)
print(top5)
version 2
import re
# from pyspark import SparkConf, SparkContext
import findspark # 이게 뭔진 모르겠지만 노드를 찾을수 없다는 문제를 해결해줌
from pyspark.sql import SparkSession, Row
findspark.init()
spark = SparkSession.builder \
.appName("RDD to DataFrame") \
.getOrCreate()
# conf = SparkConf().setAppName('Word Count')
# sc = SparkContext(conf=conf)
wordcount = spark.sparkContext.textFile("C:\\Users\\pin\\Downloads\\shakespeare.txt") \
.filter(lambda line: len(line) > 0) \
.flatMap(lambda line: re.split('\W+', line)) \
.filter(lambda word: len(word) > 0) \
.map(lambda word: (word.lower(),1)) \
.reduceByKey(lambda v1, v2: v1 + v2) \
.map(lambda x: (x[1],x[0])) \
.sortByKey(ascending=False) \
.persist()
# RDD의 각 요소를 Row 객체로 매핑
row_rdd = wordcount.map(lambda x: Row(word=x[1], count=x[0]))
# 스키마 정의
schema = "word STRING, count INT"
# Row RDD를 DataFrame으로 변환
df = spark.createDataFrame(row_rdd, schema)
df.show()
top5 = wordcount.take(5)
print(top5)
'Spark' 카테고리의 다른 글
Spark Streaming (0) | 2024.04.04 |
---|---|
파이썬을 활용한 스파크 프로그래밍 5장 (0) | 2024.03.15 |
파이썬을 활용한 스파크 프로그래밍 2,3장 (2) | 2024.03.07 |
파이썬을 활용한 스파크 프로그래밍 1장 (review) (0) | 2024.02.19 |
파이썬을 활용한 스파크 프로그래밍 1장 (0) | 2024.02.18 |