About Hadoop Streaming

Hadoop을 처음에 Java로만 접근을 했다. 물론 Java API가 기본이기 때문에 그럴것이다.
물론 저번주 사내 Tech Talk에서도 Java Code만 공개해서 예제를 보여줬다.
사내에서 많이 쓰는 언어에 대해서 예제를 만들고 보여줬어야 하는데 조금 아쉽기는 하다.(설명 당시에는 전혀 감을 못잡은 상태였다.)
그러다 짬을내서 API와 소스를 보고 예제를 구현해 봤다.

Java api is not a only way to construct map&reduce function. last week in Tech Talk I didn’t recommend hadoop streaming, cause I did’t find any examples of hadoop streaming. Finally I try to repeat  to exam  Streaming API and successfully test it.

결론적으로 말하자면, Hadoop Streaming은 Linux shell의 pipe 연산자의 역할을 Hadoop Streaming Library에서 수행해 준다는 것이다. 따라서 세상의 어떤 언어라든지 Hadoop의 mapreduce함수로 구현이 될 수 있는것이다.

Conclusively, Hadoop Streaming gives an simple interface like Pipe operator in Linux shell. you can see a Pipe operator like this in any shell. ( ls –al | sort  | more).
stdout and stdin is the only interface of map&reduce function in Hadoop Streaming. So any Language can be a MapReduce function.

여러 언어로 구현을 해보고 테스트도 해봤지만 여기서는 C로 구현된 Streaming 예제(word count)만 제공하겠다. 이유는 없다. 단지 C 문법이 직관적이고 구현하기가 편했기 때문이라는 것밖에는 또한 여러 언어 중에서 가장 빨랐기 때문이다. 적어도 나에겐 Perl 함수를 구현하는 것보다 C함수를 구현하는게 더 빨랐다. ㅜㅜ (역시 나에게 편한 언어가 짱이다.)

this post only provides a C Streaming example of word count. (like in hadoop-wiki)

[CODE c]
/*
Mapper.c

written by gogamza on 2007. 05.05

— freesearch.pe.kr —

*/
#include <stdio.h>
#include <stdlib.h>
#include <strings.h>
#include <string.h>

#define BUF_SIZE        2048
#define DELIM   ” \n”

int main(int argc, char *argv[]){
   char buffer[BUF_SIZE];

   while(fgets(buffer, BUF_SIZE – 1, stdin)){
          char *querys  = rindex(buffer, ‘\t’);
          char *query = NULL;
          if(querys == NULL) continue;
          querys += 1; /*  not to include ‘\t’ */
          query = strtok(querys, DELIM);
          while(query){
                 printf(“%s\t1\n”, query);
                 query = strtok(NULL, DELIM);
          }
   }
   return 0;
}
[/CODE]

map function code(Mapper.c)

Above function produces mapping ‘(keyword, 1) tuples’ of log files.  Inner tuple delimiter is ‘\t’ and Reduce function can be matched to this delimiter. (don’t say about why use rindex function. in may case of log was setted to this format. )

[CODE c]
/*
Reducer.c
 
written by gogamza on 2007. 05.05

 — http://freesearch.pe.kr —
*/
#include <stdio.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>

#define BUFFER_SIZE     1024
#define DELIM   “\n\t”

int main(int argc, char *argv[]){
   char strLastKey[BUFFER_SIZE];
   char strLine[BUFFER_SIZE];
   int count = 0;

   *strLastKey = ‘\0’;
   *strLine = ‘\0’;

   while( fgets(strLine, BUFFER_SIZE – 1, stdin) ){
          char *strCurrKey = NULL;
          char *strCurrNum = NULL;

          strCurrKey  = strtok(strLine, DELIM);
          strCurrNum = strtok(NULL, DELIM); /* necessary to check error but…. */

          if( strLastKey[0] == ‘\0’){
                 strcpy(strLastKey, strCurrKey);
          }

          if(strcmp(strCurrKey, strLastKey)){
                 printf(“%s\t%d\n”, strLastKey, count);
                 count = atoi(strCurrNum);
          }else{
                 count += atoi(strCurrNum);
          }
          strcpy(strLastKey, strCurrKey);

   }

   printf(“%s\t%d\n”, strLastKey, count); /* flush the count */
   return 0;
}
[/CODE]

reduce function code(Reducer.c)

Reduce function is slightly complex to compare with map function, in Java API, reduce function provides a boundary of each Key but in streaming you should make a border of yours.

You can debug above code like this.

cat any_los | ./Mapper | sort | ./Reducer >> log_result

Above line is local version of mapreduce. can be a simple dedugging examples owned mapreduce functions.

it’s time to execute in cluster.

Shortcut to run from any directory:

setenv HSTREAMING “$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/build/hadoop-streaming.jar”

Execute below line.

$HSTREAMING -mapper “./Mapper” -file “/home/gogamza/Mapper” -reducer “./Reducer” -file “/home/gogamza/Reducer” -jobconf mapred.reduce.tasks=10 -jobconf mapred.map.tasks=10 -input “/usr/gogamza/logs” -output “/usr/gogamza/log_result”

I think, Streaming is so slow to compare
with original java api.  Performances
result of my case examples is below. (because of I/O bottle neck and
incessantely context switching of forked process.)

 

Using Java API >> C Streaming >>
Perl Streaming

 

But what is better decision between
learning new language and slow mapreduce performance depends on your own decision
and risk. (the latter commonly be recommanded, cause learning new language wastes many time. : )  )

if you understand above code, you can easily develope any language.

ps. 인터넷에 위의(Streaming) 예제 코드가 hadoop에 전혀 공개가 안되어 있어서 분석후 포스트로 작성을 해봤다.(아마도 최초의 공개된 예제코드가 아닐까 한다. ) 

물론 다른 외국 개발자들도 볼수 있게 영어로 작성을 했고, 아마도 돌아오는 Hadoop Korean User 모임에서 이야기하게 될지도 모르겠다. 하긴 저정도의 Streaming예제를 원하는 분들이 국내에 몇이나 될까 의문이 되지만, 흥미로운 예제인건 사실이다.

많은 분들이 새로운 언어를 배우기 보다는 기존의 알고 있는 언어로 쉽게 접근하기를 원한다. 그런 접근성 측면에서 바라볼때 누구나 쉽게 접근하기 위한 Streaming 라이브러리는 중요한 의미를 제시한다고 생각한다.

CC BY-NC 4.0 About Hadoop Streaming by from __future__ import dream is licensed under a Creative Commons Attribution-NonCommercial 4.0 International License.