2014년 12월 14일 일요일

Spark 사용기 5 - SparkSQL 실무 활용기

Spark 를 실무에서 사용하기 시작한지 어언 2달...
아직까지는 Heavy Batch 일부와 Adhoc 개발 부분에서만 사용중이지만, 쓰면 쓸수록, 그 압도적인 성능에 감탄을 금할 수 없다.

물론 시행착오가 없었던 것은 아니다. 실무 사용하기 전 검증 단계에서 사용했던, Spark + tachyon + Shark 는  Memory 관련 에러가 빈번했었고, Public 가상화 환경에서의 태스트 이긴 하였으나, Spark + Yarn 의 태스트는 Spark StandAlone 에 비하여 성능이 더 떨어지는 현상도 접한 바 있다. ( Yarn 테스트는 아직 충분히 되지 않았다. Production 환경에서 여러가지 설정을 바꾸어가며 좀더 지켜봐야 하는 부분이다.)

오늘은 Spark + Scala M/R + SparkSQL 을 실무 사용하며, 발견한 SparkSQL 의 미숙함에 대하여 블로그 해보겠다.

우선 최근에 접했던 것은 Not IN 요건.
결과부터 말하자면, SparkSQL 은 Not IN 을 아직 지원하지 않는다. 물론, 간단한 요건이면, 로딩하는 순간에 filter 로 제외 시키고 불러 올 수 있다. 그렇게 처리 가능한 루틴은 매우 빠르게 Not IN 처리가 가능하다. 단, 아래 SQL 처럼 무언가 요건이 내포된 경우의 Not IN 은 Filter 로 처리하기 힘든 요건이다.

우선 아래 쿼리는 SparkSQL 이 지원하지 않는다는 것이 다소 놀랍지는 않았다. Hive 도 아래 쿼리를 지원하지 않음을 기 알고 있었기 때문이다. (hive0.13 이상은 지원하는지 체크해보지는 않았다... 우리 Production 기준으로 Hive 도 아래 Not IN 쿼리를 지원하지 않는다.)

[1] 첫번째 시도.

         val query3 = """
                   select
                        count(distinct pcId)
                   from
                        everyClickFlat_PC
                   where
                        mbrId = '' and
                        url like '%s' and
                        pcId not in (select distinct pcId from everyClickFlat_PC where mbrId != '' and url like '%s')
                   """.format(classRow.pc_url,classRow.pc_url)
          val results3 = sqlContext.sql(query3)

이렇게 수행하면 아래와 같은 에러가 발생한다.

java.lang.RuntimeException: [9.38] failure: string literal expected
                        pcId not in (select distinct pcId from everyClickFlat_PC where mbrId != '' and url like 'http://www.ssg.com/')
                                     ^
        at scala.sys.package$.error(package.scala:27)
        at org.apache.spark.sql.catalyst.SqlParser.apply(SqlParser.scala:60)
        at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:73)
        at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:260)
        at $iwC$$iwC.inline(<console>:220)
        at $iwC$$iwC$$anonfun$dayJob$1.apply(<console>:80)
        at $iwC$$iwC$$anonfun$dayJob$1.apply(<console>:80)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at $iwC$$iwC.dayJob(<console>:80)
        at $iwC$$iwC$$anonfun$1.apply$mcVI$sp(<console>:56)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
        at $iwC$$iwC.<init>(<console>:41)
        at $iwC.<init>(<console>:345)
        at <init>(<console>:347)
        at .<init>(<console>:351)
        at .<clinit>(<console>)
        at .<init>(<console>:7)
        at .<clinit>(<console>)
        at $print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
        at org.apache.spark.repl.SparkILoop.pasteCommand(SparkILoop.scala:782)
        at org.apache.spark.repl.SparkILoop$$anonfun$standardCommands$8.apply(SparkILoop.scala:309)
        at org.apache.spark.repl.SparkILoop$$anonfun$standardCommands$8.apply(SparkILoop.scala:309)
        at scala.tools.nsc.interpreter.LoopCommands$LoopCommand$$anonfun$nullary$1.apply(LoopCommands.scala:65)
        at scala.tools.nsc.interpreter.LoopCommands$LoopCommand$$anonfun$nullary$1.apply(LoopCommands.scala:65)
        at scala.tools.nsc.interpreter.LoopCommands$NullaryCmd.apply(LoopCommands.scala:76)
        at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:766)
        at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
        at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
        at org.apache.spark.repl.Main$.main(Main.scala:31)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

[2] 두번째 시도



그냥 시도 이다. 쿼리가 정확하게 Not In 과 동일한 원하는 결과물을 가져다 줄지 검증은 아래처럼 너무 오래걸려서 검증해보지도 못하고 kill 하였다...

 val query3 = """
                   select
                        count(distinct A.pcId)
                   from
                        everyClickFlat_PC as A, everyClickFlat_PC as B
                   where
                        A.mbrId = '' and
                        A.url like '%s' and
                        B.mbrId != '' and
                        B.url like '%s' and
                        A.pcId != B.pcId
                   """.format(classRow.pc_url,classRow.pc_url)
          val results3 = sqlContext.sql(query3)

이렇게 했더니, 에러는 안나지만.... 무쟈게 오래 걸린다. 5분정도 기다리다가 kill..

[3] 세번째 시도
이번 역시 그냥 시도이다.. Not In 과 비스무리한 결과라도 얻을라고 했던 Just 시도 이다..

         val query3 = """
                   select
                        count(distinct A.pcId)
                   from
                         (select distinct pcId from everyClickFlat_PC where mbrId = '' and url like '%s') as A, 
                         (select distinct pcId from everyClickFlat_PC where mbrId != '' and url like '%s') as B
                   where
                        A.pcId != B.pcId
                   """.format(classRow.pc_url,classRow.pc_url)
          val results3 = sqlContext.sql(query3)

이 역시 무쟈게 오래 걸린다.. 이 역시 8분 정도 기다리다가 kill...

[4] 네번째 시도

사실 Not In 은 Left Semi Join 을 이용하면 동작할것임은 어느정도 예상하고는 있었다. Not In 을 지원하는 RDBMS 에서 조차, 성능상 이유로, Not In 보다는 semi join 을 많이 사용하곤 하므로...
left semi join 사용 했더니 약 5초만에 결과가 나왔다. 참고로 저 테이블은 300Gb 가 넘는 크기의 테이블이다. (물론 성능 극대화를 위하여, 파티션 처리를, 먼저 전처리 해두었다.)

        val query3_3 = """
                   select
                          count(distinct results3_1.pcId)
                   from
                         results3_1 left semi join results3_2 on results3_1.pcId != results3_2.pcId
                   """
          val results3_3 = sqlContext.sql(query3_3)

          pc_uv2 = results3_3.collect()(0)(0).toString.toDouble

[5] 결론

물론 이번에는 실험해보지 않았지만, SparkSQL 을 우회해서 사용하지 않고, Scala 로 그냥 구현하면 Not In 을 훨씬 빠르게 구현할 수 있을 것이나, 300Gb 가 5초 정도면, SparkSQL 도 쓸만한 속도이다... 시도해보진 않았지만, 같은 쿼리를 Hive 환경에서 테스트 했다면, 수분 혹은 수십분도 걸릴 수 있었을 것이다.

Hive 나 SparkSQL 이 RDBMS 에 비하여 지원하지 않는 아쉬운 기능 중, 실무에서 제일 쉽게 접하는 것중 첫번째 정도는 inline query 가 아닐 까 한다. 두번째 정도가 Not In 일것 같다. inline 쿼리는 scala 로 처리하면, 매우 빠르게 구현이 가능했고, Not In 은 SparkSQL 이 직접 지원하진 않지만, Left Semi Join 이 대안 정도로 쓸만 한것 같다가 오늘의 결론이다...



2014년 12월 1일 월요일

Elasticsearch 설치기 1 - 기본 설치 & plug-in 설치

Elasticsearch 설치는 매우매우 쉽다.

(1) 우선 공식 홈페이지에서 zip 파일을 다운로드 받는다.

http://www.elasticsearch.org/overview/elkdownloads/

(2) 그리고 압축을 푼다.

(3) 그리고 아래처럼 실행하면 끝.... 사실 아래처럼 백그라운드 실행을 할 필요는 없다. 그러나, 최초 Getting Started 데로, 실행시 프롬프트로 빠져나가지 않았고, 프롬프트로 돌아오면 데몬이 종료됨을 발견하여, 2번째 시도에서는 아래처럼 Ctrl+z 로 나와서 backgroud 프로세스로 돌려 주었었다. 물론 그렇게 하여도, 데모나이즈 화가 잘되는 것 같았지만, 좀더 자료를 찾아보니, -d 옵션을 주면 데몬화 실행이 됨을 알 수 있었다..

백그라운드 실행 - 이렇게 할 필요는 없다.
-d 옵션을 주면 데모나이즈 실행 모드 이다.
(4) 설치 확인은 아래처럼 Restful 접근을 해보면 가능하다.

(5) 뭔가 불안하리 만큼 너무 설치가 간단하다. Web-Console 이라도 띄워보기 위해 플러그인을 뒤져 보았다. marvel 이라고 하는 걸쭉한 모니터링 도구가 바로 검색 되었다. 아래처럼 plug-in 설치도 매우 쉽다. ( marvel 은 이름이 생소하였지만, 설치하고 나서 알고보니 kibana 위에 구성된 3rd party kibana 활용 dashboard 였다.. 요즘 뜨고 있는 ELK 의 K )
marvel 설치 후에는 elasticsearch 재실행이 필요하다.
(6) marvel web console 화면은 아래와 같은 주소로 띄울 수 있다. 9200 으로 방화벽 신청이 필요할 수 있다.

http://설치서버주소:9200/_plugin/marvel/
-> kibana dashboard 페이지로 direction 된다.


일단.... 오늘은 여기까지....
이제 좀 놀다가 자야겠다....

회의는 회사에서... R&D 는 집에서....



Spark 사용기 4 - cache() 쓰고 안쓰고 비교

최근에 만든 Spark 위의 Scala M/R 배치는 3개 HDFS File 을 가지고 연산을 수행하는데, 모두 Memory 에 로딩 가능한 정도의 크기이다. 이 경우 RDD 를 Cache 할 필요가 있을까?

그래서 실험을 해 보았다.

(1) Cache 안썻을 때
-> 14초

(2) Cache 썼을 때
-> 1차 시도: 45초
-> 2차 시도: 32초

1차 시도를 했을때 , 45초나 걸려서 다소 놀랐었다. 그때는 뭔가 효율적으로 자료구조를 메모리 DB 화 하기 위해 복잡한 insert 루틴을 수행하고 있나 보다 했었다. 그런데, 그런 논리라고 한다면, 2차 시도때 훨씬 빨라 져야 정상일 것이다.
그런데, 결과는 약간 빨라지긴 했지만, 그다지 안썻을 때에 비해 전혀 개선된 것 같지가 않았다.

(3) cache() 했을 때 값이 남아 있는지 여부?

Shell 콘솔을 껏다 키면, 살짝 간격을 두고 다 없어진다. 그럼 뭔차이일까?
오히려 더 느리기만 하고.... (persist 를 쓰면, keeping 이 되긴 하겠지만...)

일단 짐작을 해 보자면, 클러스터를 Yarn 기반으로 구동 시키면, MR 코드가 Yarn 의 Job 을 수행시켰었다. 이 경우 Cache 를 적절하게 써주면, HDFS 가 아닌 Spark Memory 를 활용하며, 좀더 효율적인 연산을 하지 않을 까 기대했었다...  그 시나리오를 실험해보자, 경미하게 속도 개선이 있었지만, 역시 큰 개선은 없었다. 그러나, Job 성격에 따라 성능 향상이 많은 경우도 있을 것이다. 일단, Cache 는 그런 용도라고 짐작하고 넘어가야겠다.

Yarn 모드가 아닌 Spark Engine 자체로 Master 와 Slave 노드를 설정하고 Memory Instance 를 띄워서 Memory Cluster 를 구동한 경우에는 Memory Cluster 에 로딩 가능한 크기인 경우, 굳이 Cache 를 쓰지 않아도 메모리에서 모든 연산이 이루어지는 듯 하다. 아니, 위 실험 처럼 오히려 속도가 빠르다....
즉, 그러한 환경에서는 ( Stanalone Master & Slave Cluster 모드) Cache 를 쓰지 않는게 더 효율적인듯 하다. 가끔 pinning 할때만, persist 기능을 적절히 써주면 될듯 하다.




ps... persist 를 적절히 쓰면 10배 이상 성능이 개선된다고 한다... 아래 메뉴월 원문을 paste 해본다.  이런실험은 좀더 다양한 M/R 을 짜보고 종합적으로 실험해 보아야 할듯 하다. 일단, 오늘 코딩한 소스로는 아무것도 안쓴게 젤로 빠르다...

RDD Persistence

One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations. When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use.
You can mark an RDD to be persisted using the persist() or cache() methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. Spark’s cache is fault-tolerant – if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it.
In addition, each persisted RDD can be stored using a different storage level, allowing you, for example, to persist the dataset on disk, persist it in memory but as serialized Java objects (to save space), replicate it across nodes, or store it off-heap in Tachyon. These levels are set by passing a StorageLevel object (ScalaJavaPython) to persist(). The cache() method is a shorthand for using the default storage level, which isStorageLevel.MEMORY_ONLY (store deserialized objects in memory). The full set of storage levels is:
Storage LevelMeaning
MEMORY_ONLYStore RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level.
MEMORY_AND_DISKStore RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed.
MEMORY_ONLY_SERStore RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.
MEMORY_AND_DISK_SERSimilar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed.
DISK_ONLYStore the RDD partitions only on disk.
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.Same as the levels above, but replicate each partition on two cluster nodes.
OFF_HEAP (experimental)Store RDD in serialized format in Tachyon. Compared to MEMORY_ONLY_SER, OFF_HEAP reduces garbage collection overhead and allows executors to be smaller and to share a pool of memory, making it attractive in environments with large heaps or multiple concurrent applications. Furthermore, as the RDDs reside in Tachyon, the crash of an executor does not lead to losing the in-memory cache. In this mode, the memory in Tachyon is discardable. Thus, Tachyon does not attempt to reconstruct a block that it evicts from memory.