(월간 마소) Rhipe 예제 코드

이번달(2011.09) 월간 마이크로소프트에 기고했던 R로 하는 Big Data분석의 Rhipe 예제 코드에 퍼포먼스 문제가 있어서 개선된 코드를 올려본다. Rhipe의 아키텍처를 살펴볼 시간이 없었는데, 같은 회사분께서 고맙게도 구동 방식을 자세히 설명해줘서 코드 튜닝을 할 수 있었던거 같다.

원본 예제코드가 세 노드에서 6시간에 걸쳐 map/reduce를 한다는 것을 확인 했던 게 원고 데드라인이 지난 후였었는데,   Rhipe의 구동 방식을 모르고서는 최적화가 힘들거 같다는 생각이 있어서 이제야 최적화된 코드를 공개한다. 

Rhipe는 내부적으로 데이터의 serialization을 빈번히 수행하기 때문에 map/reduce단계에서의 rhcollect호출을 줄이는 게 성능향상의 키 포인트다. 사실 이런 Rhipe의 내부 구동 방식을 모르고서 map/reduce를 짜기도 힘든데, 이런 사실까지 분석가가 알아서 map/reduce를 해야 한다는 게 무리이기는 하지만 일단 이를 통해서 6배정도 column mean결과가 빨라졌다는 사실을 공유하는 바이다.

그러니 앞으로 Rhipe를 사용할때는 이점을 염두에 두고 사용해야 할 것이다.

 

코드에서 달라진 점은 map의 키와 값을 (index_number, value)로 되었던 것을 (index_number, list(sum_of_values, number_of_values))로 바꿔서 원래 rhcollect 호출 횟수가 J*K(’J X K’ matrix)라고 할 때 최적화로 K번만 호출하게 했다. 이 코드로 말미암에 숨어있는 serialization과정들의 횟수를 더 많이 줄일 수 있었다.

이 이외에 Rhipe 옵션에서 최적화 포인트가 한두개 있는데, 그에 대해서는 추후 공유하도록 하겠다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
bigmeanmap <- expression({
  y <- do.call("rbind", lapply(map.values, function(r){
        as.numeric(strsplit(r,",")[[1]])
      }
  ))
  summed<- colSums(y, na.rm=T)
  nr <- nrow(y)
  nc <- ncol(y)
  #accumulate # of NAs
  for(i in 1:nc){
    nanum <- length(which(is.na(y[,i])))
    if(nanum == nr) next
    rhcollect(i, list(val=summed[i], len=(nr-nanum)))
  }
})
 
bigmeanreduce<-expression(
 pre={
   total <- 0
   cnt <- 0
   },
 reduce={
   total <- total + sum(sapply(reduce.values, function(x) sum(x$val)))
   cnt <- cnt + sum(sapply(reduce.values, function(x) sum(x$len)))
   },
 post={rhcollect(reduce.key,total/cnt)}
)
 
z <- rhmr(map=bigmeanmap, reduce=bigmeanreduce,
   ifolder="/rhipe/airline/hl_airline.csv",
   ofolder="/rhipe/airline/out5",
   inout=c("text", "sequence")
)
 
jobid <- rhex(z, async=TRUE)

CC BY-NC 4.0 (월간 마소) Rhipe 예제 코드 by from __future__ import dream is licensed under a Creative Commons Attribution-NonCommercial 4.0 International License.