이번달(2011.09) 월간 마이크로소프트에 기고했던 R로 하는 Big Data분석의 Rhipe 예제 코드에 퍼포먼스 문제가 있어서 개선된 코드를 올려본다. Rhipe의 아키텍처를 살펴볼 시간이 없었는데, 같은 회사분께서 고맙게도 구동 방식을 자세히 설명해줘서 코드 튜닝을 할 수 있었던거 같다.
원본 예제코드가 세 노드에서 6시간에 걸쳐 map/reduce를 한다는 것을 확인 했던 게 원고 데드라인이 지난 후였었는데, Rhipe의 구동 방식을 모르고서는 최적화가 힘들거 같다는 생각이 있어서 이제야 최적화된 코드를 공개한다.
Rhipe는 내부적으로 데이터의 serialization을 빈번히 수행하기 때문에 map/reduce단계에서의 rhcollect호출을 줄이는 게 성능향상의 키 포인트다. 사실 이런 Rhipe의 내부 구동 방식을 모르고서 map/reduce를 짜기도 힘든데, 이런 사실까지 분석가가 알아서 map/reduce를 해야 한다는 게 무리이기는 하지만 일단 이를 통해서 6배정도 column mean결과가 빨라졌다는 사실을 공유하는 바이다.
그러니 앞으로 Rhipe를 사용할때는 이점을 염두에 두고 사용해야 할 것이다.
코드에서 달라진 점은 map의 키와 값을 (index_number, value)로 되었던 것을 (index_number, list(sum_of_values, number_of_values))로 바꿔서 원래 rhcollect 호출 횟수가 J*K(’J X K’ matrix)라고 할 때 최적화로 K번만 호출하게 했다. 이 코드로 말미암에 숨어있는 serialization과정들의 횟수를 더 많이 줄일 수 있었다.
이 이외에 Rhipe 옵션에서 최적화 포인트가 한두개 있는데, 그에 대해서는 추후 공유하도록 하겠다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 | bigmeanmap <- expression({ y <- do.call("rbind", lapply(map.values, function(r){ as.numeric(strsplit(r,",")[[1]]) } )) summed<- colSums(y, na.rm=T) nr <- nrow(y) nc <- ncol(y) #accumulate # of NAs for(i in 1:nc){ nanum <- length(which(is.na(y[,i]))) if(nanum == nr) next rhcollect(i, list(val=summed[i], len=(nr-nanum))) } }) bigmeanreduce<-expression( pre={ total <- 0 cnt <- 0 }, reduce={ total <- total + sum(sapply(reduce.values, function(x) sum(x$val))) cnt <- cnt + sum(sapply(reduce.values, function(x) sum(x$len))) }, post={rhcollect(reduce.key,total/cnt)} ) z <- rhmr(map=bigmeanmap, reduce=bigmeanreduce, ifolder="/rhipe/airline/hl_airline.csv", ofolder="/rhipe/airline/out5", inout=c("text", "sequence") ) jobid <- rhex(z, async=TRUE) |
(월간 마소) Rhipe 예제 코드 by from __future__ import dream is licensed under a Creative Commons Attribution-NonCommercial 4.0 International License.