2014년 11월 23일 일요일

Spark 사용기 3 - Group By, Order By 성능비교 및 Future Possibilities : Hive vs Spark M/R vs Spark SQL

이번에는 Group By 와 Sort 가 있는 집계( sum() ) 쿼리를,

(1) Hive 로
(2) Spark + Scala M/R 로
(3) Spark + Scala + SparkSQL 로

동일하게 구현해보고, 성능 비교 및 개발 생산성 비교를 해 보았다.

Test  로 사용된 테이블의 Row 수 및 컬럼 수는 아래와 같다.
해당 테이블은 성능 검증 용 Test 셈플 테이블이다. Row 수 등이 어떤 의미가 있는 데이타는 아니므로, 수치에 신경을 쓰진 말기를....

대략 1300만건 정도의 조그마한 테이블이다.
최근에 회사에서 수천억 Row 단위의 테이블들을 다루고 있긴 한데...갸들로 실험하기에는 Hive 쿼리가 수시간이 소요될 것이므로, 일단 단순한 셈플링 데이타 테이블로 실험을 시작하였다.
아래는 컬럼 갯수 참고용 스샷 이다.  스미카상  Date 로 Partition 해 두었다.
(역시 임시 생성 용 스키마 이니 너무 의미를 갖진 말기를....)


컬럼 갯수는 24개.  ( date 키로 partition )

(1) Hive

TEST SQL 은 아래와 같다.

select 
      date , inflositeno, sum(price) 
from 
      everyclick_order_mapping 
group by date,inflositeno 
order by date;

간단한 쿼리 지만, Group by , Sort , Sum 이 있는 전형적인 Full Scan 집계 쿼리 이다.
위를 수행한 결과는 아래와 같다.

42초. 

(date 는 Partition 되어 있어, 통으로 되어 있는 경우에 비해, date 로 group by 하는데 최적화 되어 있는 스키마 구조이긴 하다. 허나 어쨋든 오래 걸렸다.)

(2) Spark ( with Scala M/R )

먼저, 순수하게 위 로직을 scala 코딩으로 구현해 보았고, spark 엔진 위에서 실험 해 보았다.
코드는 아래와 같다.

val hdfsFile = sc.textFile("hdfs://master001.prod.moneymall.ssgbi.com:9000/moneymall/hivedata/everyclick_order_mapping/*")
case class EveryClickOrderClass(currdate:String , siteno:String, price:Double)
val result = hdfsFile.map(_.split("\t")).map(row => EveryClickOrderClass(row(0), row(1) , row(6).toString.toDouble ))

// scala coding.
val keyVals = result.map( clickOrder => ((clickOrder.currdate, clickOrder.siteno), (1 , clickOrder.price)))
val results = keyVals.reduceByKey( (a,b) => (a._1 + b._1 , a._2 + b._2))
results.sortByKey().collect().foreach( println)


Map 과 Reduce 단계는 거의 시간이 소요 되지 않는다, 0.3초 미만....
오히려 Sort 하는 과정이, 아무래도 Network 를 이용하기 때문에 조금 시간이 소요 된듯 하다..(코드에서 Print 시점 Sort 를 걸었다..)


그래 봤자, 결과는 0.39초...윗부분에 다른 step 들도 좀 더 있었지만, 무시할만큼 찰나의 시간에 끝난 일들이다.

전부 합쳐도 1초 미만. 경이적인 속도 이다.

(3) Spark ( with Scala & Spark SQL )

다음으로 1.x 버전에서 도입된 Spark SQL ....
Spark Scala 내부 RDD 자료 구조에다 데고, 바로 Query 를 날릴 수 있다. C# 으로 코딩하다가 Linq 기술로 C# 내 자료구조에 데고 쿼리 하던 때와 흡사한 생산성을 꽤할 수 있다.

더 매력적인 것은 HBase 나 HDFS 혹은 MySQL 등의 멀티 소스로부터 RDD 를 생성하고, 각 N개 멀티 소스를 Join 쿼리를 이용할 수 있다는 점이다.

row 를 Class 로 정의하여 DTO Value Object 클래스화 해 놓고, 해당 Data 가 담긴 Class Structure 를 Generic Class 의 Array. 즉, Java 로 치면 Generic List 를 각 Table 단위로 등록해주고 해당 Table 에 쿼리가 가능하다는 점이 매우 매력적이다.

코드는 아래와 같다.

val hdfsFile = sc.textFile("hdfs://master001.prod.moneymall.ssgbi.com:9000/moneymall/hivedata/everyclick_order_mapping/*")
case class EveryClickOrderClass(currdate:String , siteno:String, price:Double)
val result = hdfsFile.map(_.split("\t")).map(row => EveryClickOrderClass(row(0), row(1) , row(6).toString.toDouble ))

// scala sql.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD
result.registerAsTable("clickAndOrder")
val results = sqlContext.sql("""select currdate, siteno, count(*) , sum(price)
  from clickAndOrder
  group by currdate,siteno
  order by currdate""")
results.collect().foreach( println)


 SparkSQL 은 sort 를 포함하여 수행하였기 때문에 모든 과정이 0.6초 정도 걸렸다... Spark 콘솔에서 한줄 한줄 실행하며, 결과를 확인 할 수 있어, Hive 나 Spark + Java 보다 매력적이기도 하다.
위처럼 SQL 을 실행하는 순간에는 실행계획이 바로 보인다.

결과 출력 시점에는 sort 를 할 필요가 없다. foreach 만 써주었기 때문인지, 결과 출력은 0.2초 만에 끝났다.


일단 전부 합쳐도 1초 미만....

일단, 너무 순간에 결과가 나와 다소 성급한 판단이긴 하지만, Spark + Scala M/R 과  Spark + Scala + Spark SQL 은 속도면에서 그리 큰 차이가 있는 거 같지는 않다.


(4) Future Work &  Spark M/R(SparkSQL) 의 가능성.

사실 이번 실험은 실험 용  Data Set 이 너무 작았다.

본 편에서 언급하진 않았지만, 실제로는 뒤이어 몇가지 실험을 더 해보긴 했는데, 사실은 6 Tera Byte 크기의 데이타 였다. (해당 결과는 Hive 로는 결과를 기다리는데 너무 오래 걸려 중간에 kill 하고 더이상의 Hive 비교를 중단 하였다. )

해당 Big 한 Data 로 재 실험을 했을 때는, Spark 로 brute force 알고리즘을 돌리기에도 다소 부담되는 바 조금더 알고리즘 적인 접근을 해보았는데..... 바로, Hash Table 형 NoSQL 과 HDFS 와 HBase 같은 Range Sorted Dictionary 를 Nested loop Join 해본 점이다.. (이부분은 기회가 되는데로 별도 편으로 블로깅 할 예정이다.)

사실 메뉴얼이 빈약하고, googling 을 해보아도 자료가 좀 빈약한데, Hbase 의 scan 쿼리가 불가능하지는 않았다... (별로 편하게 되어 있진 않지만...가능하였다.)
Scan 이 되고 안되고는 매우 중요한 요소인데, Spark 팀의 HBase 셈플은 그걸 간과 한듯 하다. 모든 셈플들은 모든 데이타를 HBase 에서 몽땅 가져다 놓고, Spark 엔진으로 모든걸 처리하고 있다. 실제 Production 환경에서 이런 경우는 빈번하지 않을 것이다.

우리의 경우만 하더라도, 메모리 클러스터에 로딩 자체가 불가능한 거대 테이블과의 부담되는  Join 이 왕왕 있다. 크기가 작은 테이블들은 HBase 에 있는 경우가 거의 없고, 그냥 HDFS 안에 존재한다.

이런 경우 컴팩트한 테이블을 메모리에 몽땅 올려 놓고, Join 바깥 쪽에서 루핑 돌며, 거대 테이블을  Nested loop Join 을 할수 밖에 없다. (물론 거대 테이블이 메모리에 모두 로딩 가능하다면, 거대 테이블을 메모리에 로딩 후 cache() 나 persist() 걸어 놓고 로직 연산을 하는 것이 더 유리 할 것이다.)

두가지 방법이 있을 것이다. 큰 데이타는 어차피 메모리에 로딩이 안된다면, 작은 테이블로 루프를 돌거나... 큰 테이블로 루프를 돌거나....
일반적인 Brute Force Nested Loop 조인이라면, 그나마 메모리 로딩이 가능한 작은 테이블을 메모리에 올려놓고, 큰 테이블로 For문을 도는 것이 훨씬 유리하다..

하지만, 내가 Hbase 의 Scan 억세스에 주목했던 이유는, 작은 테이블로 Loop 를 돌면서 큰 테이블을  Scanning 접근 했을 때... ( Like 나 시계열 범위 Join 일때) Nested Loop Join 알고리즘으로 접근해도, 내부적으로 Merge Join 처럼 동작하여, 범위 Join 이 매우 빨라 질 수 있다는 데, 있다.. 범위 Join 이 아닌 직접 1:1 매칭 Join 이라면, Redis 나 Memcache 나 MongoDB 나 Casandra 같은 Hash Table 성격의 NoSQL 을 이용하면 될 것이다. 그 경우 Nested Loop Join 알고리즘으로 접근해도, 내부적으로는 Hash Join 처럼 동작 할 것이기 때문이다.

여튼 Spark + Scala + SparkSQL  의 조합은 매우 매력적인 조합임에 틀림 없다. 앞으로 오래동안 부대낄 수 있을 것 같은 생각이 강하게 든다...


댓글 없음:

댓글 쓰기