==== 6장
====
스파크로 SQL 및 NoSQL 프로그래밍하기
6장에서 다루는 내용
- 하이브 및 스파크 SQL 소개
- 스파크세션(SparkSessionn) 객체 및 데이터프레임(DataFrame) API소개
- 스파크 데이터프레임 생성 및 액세스
- 외부 응용 프로그램과 함께 스파크 SQL 사용
- NoSQL 개념 및 시스템 소개
- Hbase, 카산드라(Cassandra) 및 DynamoDB에서 스파크 사용
무어(Moore)의 법칙과 모바일 유비쿼터스 컴퓨팅의 탄생과 폭발은 데이터, 컴퓨팅 및 데이터베이스 환경을 영구적으로 변화시켰다. 6장에서는 잘 알려진 의미 체계를 사용하는 SQL 응용 프로그램에서 스파크를 사용하는 방법과 SQL 방식이 실용적이지 않은 NoSQL응용 프로그램에서 스파크를 사용하는 방법에 관해 중점적으로 설명한다.
스파크 SQL 소개
- SQL( Structured Query Language ) 는 데이터에 관한 질문을 정의하고 표현하는 언어.
- 운영 데이터 대부분은 관계형 데이터베이스 시스템에 테이블 형식으로 저장된다.
하이브 소개
하이브와 하이브 메타스토어는 스파크 SQL과 같은 프로젝트에 필수적인 구성 요소다.
- 2010년에 페이스북에서 하둡 맵리듀스 상단에 높은 수준의 SQL 같은 추상화를 제공하기위해 시작했다.
- 하이브는 하이브 쿼리언어 ( HiveQL, Hive Query Language ) 라는 새로운 언어를 도입했다.
- 당시에 자바 맵리듀스 기술을 보유한 분석가는 거의 없었으나, 대부분 SQL이 능숙해 하이브가 만들어진 동기가됨.
<_Hive가 Hadoop 에서 데이터를 처리하는 방법_>
[Hive_to_Hadoop]
하이브 객체와 하이브 메타스토어
- 하이브는 HDFS에서 객체에 대한 표 추상화를 구현한다.
- 관계형 DB 와 마찬가지로 지정된 데이터 유형의 미리 정의된 열이 있다.
- HDFS의 데이터는 일반 데이터베이스 관리 시스템처럼 SQL DML 문을 통해 액세스할 수 있다.
- 하이브는 HDFS 르 기반으로 하며 Schema-on-read 플랫폼과 비슷하다.
기존 관계형 데이터베이스 플랫폼과 차이
- Upadte는 실제로 지원되지 않는다. HiveQL의 일부에서 UPDATE를 소개하지만, HDFS는 여전히 불별의 파일 시스템이므로 이 추상화는 대단위(coares-grained) 변환을 적용하는 것과 관련이 있다. 반면 RDBMS에서의 UPDATE는 세분화된 연산이다.
- 트랜잭션(transaction) , 저널링(journaling), 롤백(rollback) 또는 실제 트랙잭션 격리 수준은 없다.
- 선언적 참조 무결성(DRI, Declarative Referential Integrity) 은 없는데, 이는 기본 키 또는 외부 키에 관한 정의가 없음을 뜻한다.
- 잘못 입력된 데이터나 잘못된 레코드와 같이 잘못 포맷된 데이터는 null값으로 클라이언트에 표시된다.
- 하이브 메타스토어는 테이블을 HDFS 디렉토리 위치에 매핑하고, 열과 해당 정의를 유지 관리한다. (개같은 번역 테이블이 표라고 번역되있다)
- 메타스토어는 MySQL이나 PostgreSQL 같은 로컬 또는 원격 데이터베이스나 임베디드 더비 데이터베이스일 수 있다. (RDB를 메타스토어로 쓴다는 뜻인가보다)
- 하이브에서 생성된 객체를 아파치 Pig와 같은 다른 프로젝트로 확장하기 위한 HCatalog 라는 하위 프로젝트도 있다.
- Spark SQL 은 하이브 메타스토어를 활용한다.
하이브 엑세스하기
- 하이브는 HiveQL 입력을받는 CLI를 제공한다.
- 대규모 구현의 경우 클라이언트/서버 접근법이 더 적합하다.
- HiveServer2 는 여러 클라이언트에 대한 다중 세션을 지원한다.
- 시각화 도구등에서 사용할 수 있는 JDBC 인터페이스와 Beeline 이라는 경량의 CLI 를 제공한다.
하이브 데이터 유형 및 데이터 정의 언어
- 하이브는 기본 RDB 데이터 유형과 여러 복잡한 데이터 유형을 모두 지원한다.
- <Spark SQL의 기본 유형으로 사용되는 유형>
하이브 내부 테이블과 외부 테이블
- 하이브에 테이블을 만들 때 기본 옵션은 내부 테이블을 만드는것이다.
- EXTERNAL을 지정해 사용하면 외부 테이블이 생성되고 DROP 연산에서 디렉토리와 파일을 삭제하지 않는다.
스파크 SQL 아키텍처
- Saprk SQL 은 RDD 기반 실행 모델에 HiveQL 호환 SQL 추상화 기능을 제공한다.
- Saprk SQL 은 일반적인 관계형 접근 패턴을 최적화하도록 설계된 코어 API에 대한 몇가지 주요 확장을 포함한다.
- 부분 DAG 실행 (PDE, Partial DAG Execution) :
처리 중에 데이터에 관한 정보가 검색되므로 즉시 DAG를 변경하고 최적화할 수 있다. - 파티션 통계 :
파티션 내의 데이터에 대한 통계를 유지하고, 맵 정리 기능을 제공하여, 조인 연산을 최적화한다. - 데이터프레임 API :
6장에서 설명| - 칼럼형 스토리지 :
스파크 SQL은 행 대신 열로 데이터를 구성하는 칼럼형 스토리지를 사용해 객체를 메모리에 저장한다.
- 부분 DAG 실행 (PDE, Partial DAG Execution) :
<열중심스토리지>
- Spark SQL은 관계형 액세스에 최적화된 칼럼 형식의 파일 기반 스토리지 형식인 Parquet 형식의 파일을 기본적으로 지원한다.
- Spark SQL 은 HiveQL 의 하위 집합으로 많은 HiveQL 기본 제공 함수와 UDF을 지원한다.
- Spark SQL 은 하이브 또는 하이브 메타스토어가 없어도 사용할 수 있다.
<SparkSQL_고수준아키텍처>
스파크세션 엔트리 포인트
SparkSession 객체는 스파크 SQL 응용 프로그램의 주요 엔트리 포인트다.
- Spark2.0 이전의 SQLContext 및 HiveContext라는 특수 콘텍스트를 하나의 간련한 엔트리 포인트로 캡슐화했다.
- Spark Session 엔트리 포인트에는 객체(테이블) 정의를 저장하는 메타스토어에 대한 참조가 포함돼 있다. 하이브가 사용 가능하고 구성돼 있으면 해당 메타스토어를 사용하고, 아니면 자체 로컬 메타스토어를 사용한다.
- Hive를 Spark 와 함께 사용하도록 구성하려면 $ SPARK_HOME의 conf/ 디렉토리에 있는 hive-site.xml, core-site.xml, hdfs-site.xml 파일을 배치하면 된다.
<Hive 자원으로 SparkSession 객체 생성>
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("My Spark SQL Application") \
.enableHiveSupport() \
.getOrCreate()
- SparkSession 객체는 DataFrame API 를 노출하고 SQL 문과 연산자를 사용해 테이블 객체를 만들고 쿼리할 수 있도록 한다.
<Spark SQL을 사용한 하이브 쿼리>
sql_cmd = """ SELECT name, lat , long FROM stations WHERE landmark = 'San Jose' """
spark.sql(sql_cmd).show()
데이터 프레임 시작하기
Saprk SQL DataFrame은 모두 동일하게 정의된 스키마를 사용해 관계형 데이터베이스의 공유 테이블과 개념적으로 유사한 레코드 분산 모음이다.
- DataFrame 은 Saprk RDD의 추상화지만, 초기 RDD와 다르게 Schema 를 추적하고 일반적인 SQL함수와 관계 연산자를 지원한다.
데이터 프레임의 생성
- 기존 RDD
- JSON 파일
- 텍스트 파일, Parquet 파일 또는 ORC파일
- Hive에 있는 테이블
- 외부 데이터베이스의 테이블
- Spark 임시 테이블
기존 RDD에서 데이터프레임 만들기
RDD에서 DataFrame 을 만드는 함수는 createDataFrame() 메소드다.
createDataFrame()
SparkSession.createDataFrame(data, schema=None, samplingRatio=None)
- data 인자는 튜플이나 리스트 요소로 구성된 RDD
- schema 인자는 데이터프레임에 사용할 스키마
- samplingRatio 인자는 유추된 스키마의 데이터를 샘플링하기 위한 인자다
Exampl code
myrdd = sc.parallelize([('Jeff', '48'), ('Kellie', '45')])
spark.createDataFrame(myrdd).collect()
# return :
# [Row(_1=u'Jeff', _2=48), Row(_1=u'Kellie', _2=45)]
하이브 테이블에서 데이터프레임 만들기
하이브 테이블에서 Spark SQL DataFrame으로 데이터를 로드하려면 HiveContext를 만들어야 한다.
- HiveContext는 하이브 클라이언트 구성 (hive-site.xml)을 읽어 하이브 메타스토어의 연결 정보를 가져온다.
sql()
SparkSession.sql(sqlQuery)
- sql() 메소드는 sqlQuery인수를 제공하고 하이브의 테이블에서 DML연산을 수행해 데이터프레임 객체를 만든다.
- DML 연산을 수행해 데이터프레임 객체를 만든다.
<하이브 테이블에서 데이터프레임 만들기>
sql_cmd = """ SELECT name, lat, long
FROM stations
WHERE landmark = 'San Jose' """
df = spark.sql(sql_cmd)
df.count()
# return : 16
table
SparkSession.table(tableName)
- table() 메서드는 Hive의 테이블에서 DataFrame 객체를 만든다.
- sql() 메서드 처럼 열을 열 리스트로 제거하거나 WHERE 절을 사용해 행을 필터링할 수는 없지만 전체 테이블을 데이터프레임으로 로드할 수 있다.
- table() 메서드는 데이터베이스 카달로그를 조사하는데 유용한 메서드가 많다.
- tables() : 현재 Spark 세션에 등록된 모든 테이블의 정보를 포함하는 DataFrame을 반환
- tableNames() : 현재 Spark 세션 또는 지정된 데이터베이스에 존재하는 모든 테이블의 이름을 문자열 리스트로 반환
df = spark.table('station')
df.columns
df.count()
# return : 70
JSON 객체에서 데이터프레임 만들기
JSON 은 일반적이고, 표준화된, 사람이 읽을 수 있는 직렬화 혹은 와이어 전송 형식으로 웹 서비스 응답에 자주 사용된다. JSON은 Schema 가 있는 반구조화 소스로 JSON에 대한 지원은 Spark SQL에 포함돼 있다.
read.json()
DataFrameRreader.read.json(
path,
schema=None,
primitivesAsString=None,
allowComments=None,
allowUnquotedFieldNames=Name,
allowStringQuotes=None,
allowNumericLeadingZero=None,
allowBackslashEscapingAnyCharacter=None,
mode=None,
columnNameOfCorruptRecord=None,
dateFormat=None,
timestampFormat=None,
mulitLine=None)
- DataFrameReader의 json() 메서드는 JSON 파일에서 데이터프래임 객체를 만든다.
- path 인수는 JSON 파일의 전체 정규 경로를 나타낸다.
- schema 인수는 데이터프레임에 대한 대상 스키마를 명시적으로 정의할 수 있다.
<JSON 파일에서 데이터프레임을 만드는 read.json() 메서드>
people_json_file = 'path.../people.json'
people_df = spark.read.json(people_json_file)
people_df.show()
쓸일이 있을까 싶지만 JSON 형태의 RDD를 데이터프레임으로 만들수도있다.
json_df = spark.read.json(rdd)
플랫 파일에서 데이터프레임 만들기
DataFrameReader는 SQL 같은 데이터소스 뿐아니라 CSV 와 같은 다른 유형의 파일에서 데이터프레임을 로드하는 데 사용한다.
text()
DataFrameReader.read.text(path)
- DataFrameReader의 text()메서드는 외부 파일 시스템의 텍스트 파일에서 데이터프레임을 로드하는데 사용된다.
- RDD의 sc.textFile()과 비슷하다.
- path 인수는 파일, 디렉토리 또는 파일 glob의 경로를 나타낸다.
<하나 이상의 플래인 텍스트 파일에서 데이터프레임 만들기>
df = spark.read.text('file:///opt/spark/data/stations.csv')
df.take(1)
df = spark.read.text('file:///opt/spark/data/stations/')
df.count()
parpuet()
DataFrameReader.read.parquet(paths)
- Parquet 파일은 기본적으로 각 열의 데이터 타입을 가지고 있다
orc()
DataFrameReader.read.orc(path)
- ORC(Optimized Row Columnar) 파일 형식으로 저장하거나 읽는 메서드
- 대규모 데이터 집합을 저장하고 처리하는 데 특히 최적화
- 데이터의 압축과 직렬화를 최적화하여 읽기 및 쓰기 성능을 향상시키고, 저장 공간을 절약할 수 있도록 설계었다
데이터프레임을 RDD로 변환하기
rdd() 메서드를 사용해 데이터프레임을 네이티브 RDD로 쉽게 변환할 수 있다.
<데이터프레임을 RDD로 변환하기>
stationsdf = spark.read.parquet('hdfs://user/hadoop/stations.parquet')
stationsrdd = stationsdf.rdd
stationsrdd
stationsrdd.take(1)
데이터프레임 데이터 모델 : 기본 유형
- 데이터 프레임 API의 데이터 모델은 하이브 모델을 기반으로 한다. 데이터프레임과 함께 사용되는 데이터 유형은 하이브의 해당 데이터 항목에 직접 매핑된다.
- 리스트, 딕셔너리 및 튜플에 상응하는 것과 같은 복잡한 중첩 유형뿐 아니라 모든 공통 기본 유형을 포함한다.
데이터 타입 | 설명 |
---|---|
StringType | 문자열 타입 |
IntegerType | 정수 타입 |
LongType | 긴 정수 타입 |
FloatType | 실수 타입 |
DoubleType | 더블(배정밀도 부동소수점) 타입 |
DecimalType | 고정 소수점 타입 |
BooleanType | 부울(불리언) 타입 |
BinaryType | 이진(바이너리) 데이터 타입 |
TimestampType | 타임스탬프 타입 |
DateType | 날짜 타입 |
데이터프레임 데이터 모델 : 복합 유형
- 복잡한 중첩 구조는 기본 HiveQL 기반 연산자를 사용해 스파크 SQL에 엑세스할 수 있다.
데이터 타입 | 설명 |
---|---|
ArrayType | 배열 타입 |
MapType | 맵(사전) 타입 |
StructType | 구조체 타입 |
데이터프레임 스키마 추론하기
- 스파크 SQL 데이터프레임의 스키마는 명시적으로 정의되거나 유추될 수 있다.
- 스키마는 추정하는 것이 가장 간단한 방법이지만, 일반적으로 코드에서 스키마를 정의하는 것이 좋다.
- 리플렉션(reflection)을 사용해 객체를 검사하며 해당 구성을 결정하고 데이터프래임 객체의 스키마를 추정한다.
데이터프레임 스키마 정의
- 스키마를 만들려면 StructField 객체 컬랙션을 포함하는 StructType 객체를 만들어야 한다.
<명시적으로 데이터프레임에 대한 스키마 정의>
from pyspark.sql.types import *
myschema = StructType([ \
StructField("station_id", IntegerType(), True), \
StructField("name", StringType() , True), \
StructField("lat", FloatType() ,True), \
StructField("long", FloatType(), True) \
])
데이터프레임 사용
- 데이터프레임 API는 현재 스파크 프로젝트에 가장 빠르게 움직이는 영역 중 하나다. ( 가장 빠르게 업데이트된다는 뜻인듯)
데이터프레임 메타데이터 연산
데이터프레임 API
columns()
df.columns()
- columns() 메서드는 지정된 데이터프레임의 열 이름 리스트를 반환한다.
dtypes()
df.dtypes()
- dtypes() 메서드는 주어진 데이터프레임 객체의 열 이름과 데이터 유형으로 구성된 각 튜블의 리스트를 반환한다.
기본 데이터프레임 연산
- 데이터프레임은 RDD 메서드의 직접적인 변환 및 액션과 같은 많은 함수가 있다.
- select(), drop(), filter(), where(), distint() 메서드는 데이터프레임에서 열을 제거하거나 행을 필터링 할수 있다.
- show(), collect(), take() 액션으로서 리니지의 평가를 트리거한다.
show()
df.show(n=20, truncate=True)
- show() 메서드는 데이터프레임의 처음 n 행을 콘솔에 출력한다.
- 변수로 리턴할수 없다.
select()
df.select(cols)
- select() 메서드는 cols 인수로 지정된 열 리스트에서 새 데이터프레임 객체를 반환한다.
*
를 사용해 모든열을 선택할 수 있다.- alias를 줄수있다.
drop()
df.drop()
- drop() 메서드는 col 인수로 지정된 열이 제거된 새 데이터프레임을 반환한다.
filter()
df.filter(condition)
- filter() 메서드는 행만 포함하는 새 데이터프레임을 반환한다. ( df.filter(df.name == 'St James Park') )
distinct()
df.distinct()
- distinct() 메소드는 기본적으로 중복 행을 필터링에 입력 데이터프레임의 고유한 행을 포함하는 새 데이터프래임을 반환한다.
- 비슷한 메서드로 drop_duplicates() 가 있다.
데이터프레임을 처리할 때 lambda 함수는 약간 다르다.
<스파크 SQL 데이터프레임을 사용한 map() 함수>
rdd = df.rdd.map(lambda r: r.name)
- 매핑 연산의 결과가 RDD 대신 새 데이터프레임을 반환하도록 하려면 select() 가 더 낫다.
Python docstrings ( 설명서 )
- Python docstrings 는 파이썬 스파크 SQL API의 모든 함수에 포함돼 있다.
- 스파크 SQL의 모든 함수 및 스파크 파이썬 API의 모든 다른 함수의 구문과 사용법을 탐색할 수 있다.
- doc 메서드를 사용해 접근할 수 있다.
print(DataFrame.sample.__doc__)
데이터프레임 기본 제공 함수
- 스파크 SQL에서 사용할 수 있는 수많은 함수는 SQL의 대부분의 다른 일반적인 DBMS구현에 있다.
- 파이썬 스파크 API 의 이러한 내장 함수는 pyspark.sql.functions 모듈을 통해 사용할 수 있다.
함수 범주 | 함수 이름 | 설명 |
---|---|---|
집계 함수 | count() |
그룹의 항목 수를 계산합니다. |
sum() |
숫자 컬럼의 합계를 계산합니다. | |
avg() |
숫자 컬럼의 평균값을 계산합니다. | |
max() |
컬럼의 최대값을 반환합니다. | |
min() |
컬럼의 최소값을 반환합니다. | |
문자열 함수 | concat() |
여러 문자열을 연결합니다. |
upper() |
문자열을 대문자로 변환합니다. | |
lower() |
문자열을 소문자로 변환합니다. | |
substr() |
문자열의 특정 부분을 추출합니다. | |
trim() |
문자열 앞뒤의 공백을 제거합니다. | |
날짜 함수 | current_date() |
현재 날짜를 반환합니다. |
date_add() |
지정된 날짜에서 일정한 일수를 더합니다. | |
date_sub() |
지정된 날짜에서 일정한 일수를 뺍니다. | |
datediff() |
두 날짜 사이의 일수 차이를 계산합니다. | |
year() |
날짜에서 연도를 추출합니다. | |
month() |
날짜에서 월을 추출합니다. | |
day() |
날짜에서 일을 추출합니다. | |
조건 함수 | case |
조건에 따라 다른 값을 선택합니다. |
when() |
특정 조건이 참일 때 값을 반환합니다. (주로 case 와 함께 사용됩니다.) |
|
coalesce() |
목록에서 첫 번째 null이 아닌 값을 반환합니다. |
데이터프레임 API에서 사용자 정의 함수 구현하기
- 원하는 기능이 없다면 Spark SQL 에서 사용자 정의 함수(UDF) 를 작성할 수 있다.
udf()
pyspark.sql.functions.udf(func, returnType=StringType)
<스파크 SQL의 사용자 정의 함수>
from pyspark.sql.functions import *
from pyspark.sql.types import *
df = spark.read.parquet('hdfs:///user/hadoop/stations.parquet')
lat2dir = udf(lambda x : 'N' if x > else 'S', StringType())
lon2dir = udf(lambda x : 'E' if x > else 'W', StringType())
df.select(df.lat, lat2dir(df.lat).alias('latdir'),
df.long, lon2dir(df.lat).alias('longdir')) \
.show(5)
여러 데이터프레임에 대한 연산
- join() 및 union() 과 같은 집합 연산은 관계형 SQL 프로그래밍에서 필수적인 연산이므로 데이터프레임의 일반적인 요구 사항이다.
- 데잉터프레임 조인은 RDD API 및 HiveQL 에서 지원하는 모든 조인 연산을 지원한다.
join()
df.join(other, on=None, how=None)
orderBy()
df.orderBy(cols, ascending)
groupBy()
df.groupBy(cols)
캐싱, 지속 및 데이터프레임 재구성
- 데이터프레임 API는 이러한 작업에 대한 스파크 RDD API와 유사한 캐싱, 지속 및 재분할을 위한 매서드를 지원한다.
데이터 프레임 출력 저장
DataFrameWriter
df.write()
하이브 테이블에 데이터 쓰기
데이터프레임을 hive 테이블에 쓸때는 saveAsTable() 을 사용한다.
saveAsTable()
df.write.saveAsTable(name, format=None, mode=None, PartitionBy=None)
- saveAsTable() 메서드는 데이터프레임을 name 인수에 지정된 하이브 테이블에 쓴다.
- mode 유효값은 append, overwrite, error, ignore 이다.
! 그렇다면 merge 를 할때는?
- merge() 메서드를 사용하려면 delta lake 를 사용해야한다.
- DataFrame 은 Spark to hive 는 merge 또는 update 를 지원하지 않는다
<HiveQL 에서는 Qeury 로 merge 를 수행할 수있다>
MERGE INTO target_table AS T
USING source_table AS S
ON T.id = S.id
WHEN MATCHED THEN
UPDATE SET T.value = S.value
WHEN NOT MATCHED THEN
INSERT VALUES (S.id, S.value);
- 다만 df 를 직접 merge 할수 없기때문에 temp View 생성 과정이 필요하다.
<Delta table 에 merge>
deltaTable.as("target")
.merge(
sourceData.as("source"),
"target.id = source.id")
.whenMatched()
.updateExpr(Map("value" -> "source.value"))
.whenNotMatched()
.insertExpr(Map("id" -> "source.id", "value" -> "source.value"))
.execute()
파일에 데이터 쓰기
- 데이터프레임의 데이터는 지원되는 모든 파일 시스템의 파일에 쓸 수 있다.
- csv 가 일반적이 형식이며 df.write.csv() 메서드를 사용한다.
write.csv()
df.write.csv(path, mode, header, etc option.......)
- path : 파일 경로
- mode : 파일이 이미 있는경우 (append, overwrite, ignore, error)
parquet()
df.write.parquet(path, mode=None, partitionBy=None)
- write.parquet() 메서드는 데이터프레임을 parquet 형식의 파일에 쓴다.
etc
df.write.orc()
df.write.json()
df.write.jdbc()
스파크 SQL 엑세스하기
- PySpark 은 프로그래머가 아닌 사용자아게 적절하지 않을 수 있다.
- SQL shell 또는 Spark SQL 엔진에 대한 엑세스는 ODBC를 통한 Tableau 또는 Excel 과 같은 시작화 도구에서 더 적절할 수 있다.
spark-sql shell 을 사용해 Spark SQL 엑세스하기
- Saprk-sql shell 프로그램은 로컬 구성 및 Spark driver binary 를 사용해 Spark SQL 및 Hive에 엑세스할 수 있는 경량 REPL(Read-Evaluate-Print Loop : 읽기-평가-인쇄-루프) shell 이다.
Thrift JDBC/ODBC 서버 실행
- Spark SQL 은 JDBC/ODBC 인터페이스가 있는 분산 쿼리 엔진으로 유용하다.
- Thrift 는 교차 언어 서비스 개발에 사용되는 Apache 프로젝트다.
<Thrift 서버 실행>
$SPARK_HOME/sbin/start-thriftserver.sh
--master와 같은 유효한 모든 spark-submit 명령 행 인수는 start-thriftserver.sh 스크립트에서 허용된다. 또한 --hiveconf 옵션을 사용해 하이브 관련 등록 정보를 제공할 수 있다.
<Thrift 포트 수정>
export HIVE_SERVER2_THRIFT_PORT=<customport>
<Thrift 서버 테스트>
$SPARK_HOME/sbin/stop_thrifrserver.sh
beeline 사용하기
- 명령행 셀인 beeline을 사용해 하이브서버2 또는 Spark SQL Thrift JDBC/ODBC 서버에 연결할 수 있다.
- beeline 은 SQLLine CLI 프로젝트를 기반으로 하는 경량 JDBC 클라이언트 응용 프로그램이다.
<beeline을 사용해 SQL JDBC Thrift 서버를 테스트>
$SPARK_HOME/bin/beeline
<beeline 프롬프트에서 JDBC서버 연결>
beeline> !connect jdbc:hive2://localhost:10000
JDBC/ODBC를 통한 외부 응용 프로그램 사용
- Spark JDBC/ODBC Thrifr 서버는 Tableau , Excel 과 같은 다른 JDBC/ODBC 클라이언트 응용 프로그램에도 연결할 수 있다.
NoSQL 시스템에서 Spark 사용하기
난독증 걸릴거같다 번역 개같이 어렵게 해놨네 스파크 공식문서 구글번역이 더 읽기 쉽것다
- NoSQL 시스템과 방법론을 소개하고, 스파크 처리 워크플로와의 통합을 살펴본다.
NoSQL 시스템 특성
- *이들은 디자인 타임에 Schema 가 없고, 런타임 시 'Schema on read' 이다.* 즉, 미리 정의된 열은 없지만, 각 PUT 연산으로 열이 생성되며, 각 레코드, 문서, 데이터 인스턴스는 이전 인스턴스와 다른 스키마를 가질 수 있다.
- *데이터는 다른 객체와 미리 정의된 관계가 없다.* 즉, 선언적 또는 다른 방식으로 외래 키나 참조 무결성의 개념이 없음을 뜻한다.
- *조인은 일반적으로 피할 수 있다.* 대부분의 NoSQL 구현에서 조인은 절대적으로 최소로 유지되거나 모두 피할 수 있다. 이는 대게 데이터를 비정규화해 종종 중복 데이털르 저장하는 방식으로 수행된다.
- 쓰기 연산은 일반적으로 더 빠르고 확장 가능하다.
- 규모와 확장성 (페타에서 테라까지 쿼리), 성능 및 낮은 마찰을 위해 구축된다.
NoSQL 시스템의 유형
<NoSQL 시스템 유형>
유형 | 설명 | 예 |
---|---|---|
Key-Value Store | 키-값 쌍으로 데이터를 저장하는 가장 간단한 형태의 NoSQL. 고성능의 조회와 쓰기를 지원함. | Redis, Amazon DynamoDB |
Document Store | JSON, BSON, XML과 같은 문서 형식의 데이터를 저장. 문서는 키에 의해 식별되며, 복잡한 데이터 구조를 효율적으로 저장하고 쿼리할 수 있음. | MongoDB, Couchbase |
Column-Family Store | 데이터를 컬럼의 집합인 컬럼 패밀리로 저장. 각 컬럼 패밀리는 여러 컬럼으로 구성될 수 있으며, 대규모 분산 환경에 적합. | Apache Cassandra, HBase |
Graph Database | 노드, 엣지, 프로퍼티를 이용해 데이터를 그래프 구조로 저장. 복잡한 네트워크 연결을 효과적으로 표현하고 분석할 수 있음. | Neo4j, Amazon Neptune |
HBase와 Spark 사용하기
- HBase와 Spark를 함께 사용하는 것은 대규모 데이터 처리 및 분석을 위한조합
HBase 소개
- HBase는 분산형 NoSQL 데이터베이스이며, 대용량의 구조화된 데이터를 저장하고 실시간으로 읽고 쓸 수 있다.
- 분산형 아키텍처:
HBase는 수백 대의 서버로 확장할 수 있는 분산형 아키텍처를 가지고 있다.
데이터는 여러 노드에 분산되어 저장되므로, 대규모 데이터셋을 처리하고 저장할 수 있다. - 고성능:
HBase는 대규모 데이터셋에서도 빠른 읽기와 쓰기를 지원한다.
빠른 읽기를 위해 데이터는 메모리에 캐시되어 있으며, 빠른 쓰기를 위해 Write-Ahead Logging(WAL)과 배치 처리 등의 기술을 사용한다. - 스키마 자유:
HBase는 스키마 자유형 데이터베이스로, 구조화된 데이터 모델이 필요하지 않다.
각 행은 고유한 키에 의해 식별되며, 각 열은 여러 버전을 가질 수 있다. - 컬럼 지향 저장:
HBase는 컬럼 패밀리와 컬럼으로 구성된 컬럼 지향 저장 형식을 사용한다.
이러한 구조는 데이터를 효율적으로 압축하고, 쿼리 성능을 향상시킨다. - 실시간 분석:
HBase는 실시간 데이터 분석과 검색을 지원한다.
데이터에 접근하기 위해 HBase는 키 기반의 빠른 조회를 제공하며, MapReduce나 Apache Spark와 같은 분산 처리 프레임워크와 통합하여 데이터를 분석할 수 있다. - 높은 가용성:
HBase는 자동으로 데이터의 복제를 관리하여 데이터의 가용성을 보장한다.
Hadoop HDFS와 같은 하둡의 기본적인 HDFS(Hadoop Distributed File System)에 데이터를 저장하기 때문에, HDFS의 내결함성을 이용하여 데이터 손실을 방지한다.
HBase 메서드
<데이터 쓰기>
- Put: 특정 행에 데이터를 쓸 때 사용
JAVA
Put put = new Put(Bytes.toBytes("rowKey"));
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("qualifier"), Bytes.toBytes("value"));
table.put(put);
Shell
put 'my_table', 'row_key', 'cf:qualifier', 'value'
- Batch: 여러 행에 대해 일괄적으로 데이터를 쓸 때 사용
JAVA
List<Put> puts = new ArrayList<>();
puts.add(new Put(Bytes.toBytes("rowKey1")).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("qualifier"), Bytes.toBytes("value1")));
puts.add(new Put(Bytes.toBytes("rowKey2")).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("qualifier"), Bytes.toBytes("value2")));
table.put(puts);
<데이터 읽기>
- Get: 특정 행의 데이터를 읽을 때 사용
JAVA
Get get = new Get(Bytes.toBytes("rowKey"));
Result result = table.get(get);
Shell
get 'my_table', 'row_key'
- Scan: 여러 행의 데이터를 읽을 때 사용
JAVA
Scan scan = new Scan();
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
// 각 행에 대한 처리
}
Shell
scan 'my_table'
<행삭제>
- Delete: 특정 행을 삭제할 때 사용
JAVA
Delete delete = new Delete(Bytes.toBytes("rowKey"));
table.delete(delete);
Shell
delete 'my_table', 'row_key', 'cf:qualifier'
<테이블 관리>
- CreateTable: 새로운 테이블을 생성할 때 사용
JAVA
HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf("tableName"));
tableDescriptor.addFamily(new HColumnDescriptor("cf"));
admin.createTable(tableDescriptor);
Shell
create 'my_table', 'cf1', 'cf2'
- DeleteTable: 테이블을 삭제할 때 사용
JAVA
admin.deleteTable(TableName.valueOf("tableName"));
Shell
drop 'my_table'
HBase 및 Spark
파이썬 API 를 사용해 스파크에서 HBase로 읽고 쓰는 가장 안전한 방법은 해피베이스(HappyBase) 파이썬 패키지를 사용하는 것이다.
sudo pip install happybase
카산드라와 함께 스파크 사용하기
- 다른 주목할 만한 NoSQL 프로젝트는 Apache Cassandra 로 초기에 페이스북에서 개발되었으며, 나중에 아파치 소프트웨어 라이센스 제도에 따라 오픈소스 프로젝트로 릴리스 되었다.
카산드라 소개
카산드라(Cassandra)와 HBase는 모두 대규모 분산형 데이터베이스 시스템
- 데이터 모델:
- 카산드라: 카산드라는 Wide Column Store로서, 행과 열의 개념이 없이 테이블은 로우 키(Row Key)와 컬럼 페밀리(Column Family)로 구성됩니다. 각 로우 키는 여러 개의 컬럼을 가질 수 있다.
- HBase: HBase는 Hadoop 기반의 분산형 컬럼 지향 데이터베이스로, 행 지향 데이터베이스인데다 컬럼 지향 데이터베이스의 장점을 취합한 형태.
- 일관성:
- 카산드라: 카산드라는 AP(Availability and Partition Tolerance) 분류에 속하며, 데이터의 가용성과 파티션 내성에 중점을 둡니다. 일관성은 유연하게 조정할 수 있다.
- HBase: HBase는 CP(Consistency and Partition Tolerance) 분류에 속하며, 일관성과 파티션 내성에 중점을 둔다.
- 분산 디자인:
- 카산드라: 카산드라는 마스터-리스 아키텍처를 사용하지 않으며 모든 노드가 동등합니다. 노드 간 데이터의 분산은 주로 Consistent Hashing 알고리즘을 사용한다.
- HBase: HBase는 Hadoop의 HDFS 파일 시스템을 기반으로 하며, Hadoop 클러스터와 통합된다.
- 읽기/쓰기 성능:
- 카산드라: 쓰기 중심 워크로드에 강하며, 쓰기 지연이 낮고 선형적인 확장성을 제공한다.
- HBase: 대용량의 읽기 작업에 적합하며, 읽기 중심 워크로드에 강하다.
- 사용 사례:
- 카산드라: IoT, 시계열 데이터, 로그 저장 등 쓰기 중심 및 대규모 데이터 처리에 적합하다.
- HBase: 스트리밍 데이터 처리, 온라인 트랜잭션 처리(OLTP), 실시간 분석 등 다양한 사용 사례에 적합하다.
카산드라 쿼리 언어 CQL
- CQL은 Cassandra 데이터베이스와 상호 작용하는 데 사용하는 쿼리 언어로 SQL과 유사한 구문을 사용하여 데이터를 선택, 삽입, 업데이트 및 삭제할 수 있다.
CQL의 주요 특징:
데이터 정의 언어(DDL): 테이블, 인덱스 및 사용자 정의 유형을 생성 및 삭제하는 데 사용된다.
데이터 조작 언어(DML): 데이터를 삽입, 업데이트 및 삭제하는 데 사용된다.
데이터 쿼리 언어(DQL): 데이터를 선택하고 필터링하는 데 사용된다.
DynamoDB 에서 Spark 사용하기
DynamoDB는 AWS 에서 제공하는 완전 관리형 NoSQL Key-Value 데이터베이스 서비스이다. 빠르고 안정적이며 확장 가능한 데이터베이스를 필요로 하는 다양한 앱에서 사용할 수 있다.
- DynamoDB의 주요 특징:
- 완전 관리형: 서버 프로비저닝, 데이터베이스 관리, 백업 및 복구 등을 모두 AWS에서 관리한다.
- 빠른 성능: 밀리초 단위의 지연 시간으로 데이터를 읽고 쓸 수 있다.
- 확장 가능성: 테이블 용량을 쉽게 늘리고 줄일 수 있다
- 높은 가용성: 99.999%의 가용성을 제공한다.
- 비용 효율성: 사용한 만큼만 비용을 지불하면 된다.
- DynamoDB 사용 분야:
- 모바일 애플리케이션: 모바일 애플리케이션은 사용자 데이터, 게임 데이터, 설정 등을 저장하는 데 DynamoDB를 사용할 수 있다.
- 웹 애플리케이션: 웹 애플리케이션은 사용자 프로필, 제품 카탈로그, 세션 데이터 등을 저장하는 데 DynamoDB를 사용할 수 있다.
- IoT 애플리케이션: IoT 애플리케이션은 장치 데이터, 센서 데이터, 이벤트 데이터 등을 저장하는 데 DynamoDB를 사용할 수 있다.
- 게임: 게임은 게임 상태, 플레이어 데이터, 점수 등을 저장하는 데 DynamoDB를 사용할 수 있다.
- Spark 에서 사용하려면 boto3 lib 가 필요하다.
- AWS 에 연결하려면 AWS API 자격 증명이 필요하다.
=== 요약 ===
스파크 SQL은 스파크의 가장 보편적인 확장 기능중 하나다. 대호식 쿼리를 지원하고 비즈니스 인텔리전스 및 시각화 도구를 지원하며, 훨씬 광범위한 분석가가 이용할 수 있는스파크를 만들 수 있다. SQL 인터페이스와 관계형 데이터베이스와 같은 프로그래밍 방식을 사용해 강력한 스파크 런타임 분산 처리 프레임워크에 대한 엑세스를 제공한다. 스파크 SQL은 SQL을 사용해 관계형 액세스 패턴을 중심으로 많은 최적화를 도입했다. 이러한 최적화에는 칼럼 레벨 통계를 유지 관리하는 칼럼형 스토리자와 데이터에서 관찰되는 통계 및 왜곡을 기반으로 처리 중에 DAG 를 변경할 수 있는 부분 DAG 실행이 포함된다. 또한 스파크 SQL은 스파크 RDD의 구조화된, 칼럼 추상화인 데이터프레임을 소개한다. 데이터프레임 API는 대부분의 SQL 개발자, 분석가, 마니아 및 일반 사용자에게 익숙한 많은 기능과 함수를 제공한다.
NoSQL 데이터베이스는 인터넷 규모의 스토리지 기능과 쿼리 경계는 물론 분산 장치 및 모바일 응용 프로그램 상호 작용을 지원하는 빠른 읽기 및 쓰기 액세스 기능을 제공해 기존 SQL 시스템의 보완 및 대체 수단이 되었다. 이런한 개념은 초기 구글과 야후의 작업으로부터 나왔으므로 NoSQL 개념과 구현은 스파크와 병행해 등장했다.