Erlang으로 구현해본 멀티 코어 CPU용 MapReduce

사실 대부분 코드의 본체는 Programming Erlang 의 예제코드에서 얻었고 간단하게 멀티 코어용 MapReduce 모듈을 구현한것을 보고 감동을 받아서 조금 고쳐 봤다.

책에서는 Reducer 하나만을 생성해서 결과를 combine 했는데 이 모듈에서는 Reducer 갯수를 조절할 수 있게 해놓았고, Hadoop과 유사하게 Key, Values 쌍으로 소팅해서 결과를 가져오게 했다.
Hadoop처럼 대용량 파일을 모두 메모리에 올릴 수 없어 파일로 쓸수 밖에 없는 경우에 빠른 서치를 위해 소팅을 하게 되는데 내 모듈에서는 출력의 편리함을 위한 목적으로 소팅을 했다.

사실 이 코드를 다중의 서버에서 돌아가는 모듈로 간단하게 컨버팅 할 수 있는데, 그렇게 하게 되면 Hadoop과 비슷한(완성도는 낮지만, 기능적으로 유사한) 분산처리 환경으로 만들 수 있을거라 생각한다.

Erlang 코딩에 익숙해 지는데 생각보다 조금 오래 걸렸는데 (물론 함수형 언어에 대해서 경험이 있음에도 불구하고) 아마도 문법에 대해서 너무 대충 훓고 지나간게 원인이 된게 아니였나 생각한다.

간단하게 코드 소개를 하자면..

아래 파일은 mapper에서 수행할 함수와, reducer에서 해야될 함수를 정의해서 입력해 준다.

[#M_View Code..|less..| [CODE js]
-module(test_mapreduce).
-compile(export_all).
-import(lists, [sort/2]).

test(Reducer_number) ->
    wc_dir(“.”, Reducer_number).

wc_dir(Dir, Reducer_number) ->
    F1 = fun generate_words/2,
    F2 = fun count_words/3,
    Files = lib_find:files(Dir, “*.erl”, false),
    L1 = phofs:mapreduce(F1, F2, Files, Reducer_number),
    sort(fun order/2, L1).

order({_, Value1}, {_, Value2}) ->
    if
        Value1 > Value2 ->
            true;
        Value1 =< Value2 ->
            false
    end.
      
generate_words(Pid, File) ->
    F = fun(Word) -> Pid ! {Word, 1} end,
    lib_misc:foreachWordInFile(File, F).

count_words(Key, Vals, A) ->
    [{Key, length(Vals)}|A].
[/CODE]_M#]

아래는 본격적인 MapReduce 모듈이다. 파일 갯수와 원하는 수의 Reducer수에 따라 프로세스를 만들어 프로세싱을 한다.

[#M_View Code..|less..| [CODE js]
-module(phofs).
-export([mapreduce/4, split/2, mergeReduce/2]).

-import(lists, [foreach/2, map/2]).

%% F1(Pid, X) -> sends {Key,Val} messages to Pid
%% F2(Key, [Val], AccIn) -> AccOut

split(L, Num)  ->
    if
        length(L) < Num ->
            [L];
        true ->   
            split(L, Num, length(L) div Num)
    end.

split(L, Num, Len) ->
    case lists:split(Len,L) of
        {First, Second} ->
            if
                length(Second) < Len -> 
                    [First ++ Second];
                true ->
                    [First|split(Second, Num, Len)]
            end   
    end.

mapreduce(F1, F2, L, Red_num) ->
    S = self(),
    Pid_list =  map(fun(X) -> spawn(fun() -> reduce(S, F1, X) end) end, split(L, Red_num)),
    combine(Pid_list, F2).

combine(L, F2) ->
    combine(L, dict:new(), F2).

combine([Pid|T], Dics, F2)  ->
    receive
        {Pid, Dic} ->
            io:format(“receive complete from ~p~n”, [Pid]),
            combine(T, mergeReduce(Dics, Dic), F2)
    end;

combine([], Dict, F2) ->
    Acc = dict:fold(F2, [], Dict),
    Acc.
   

mergeReduce(Dic1, Dic2) ->
    F = fun(_Key, Value1, Value2) ->
            Value1 ++ Value2
        end,
    dict:merge(F, Dic1, Dic2).

reduce(Parent, F1, L) ->
    process_flag(trap_exit, true),
    ReducePid = self(),
    foreach(fun(X) ->
            spawn_link(fun() -> do_job(ReducePid, F1, X) end)
        end, L),
    N = length(L),
    Dict0 = dict:new(),
    Dict1 = collect_replies(N, Dict0),
    Parent ! {self(), Dict1}.

collect_replies(0, Dict) ->
    Dict;

collect_replies(N, Dict) ->
    receive
    {Key, Val} ->
        case dict:is_key(Key, Dict) of
        true ->
            Dict1 = dict:append(Key, Val, Dict),
            collect_replies(N, Dict1);
        false ->
            Dict1 = dict:store(Key,[Val], Dict),
            collect_replies(N, Dict1)
        end;
    {‘EXIT’, _,  _Why} ->
        %io:format(“error ~p~n”, [_Why]),
        collect_replies(N-1, Dict)
    end.

do_job(ReducePid, F, X) ->
    F(ReducePid, X).
[/CODE]
_M#]

아래는 수행된 결과를 의미하는 모니터링 화면이다.
대상이 되는 파일 112개에서 wordcount를 수행했으며, 이 파일들을 파싱하고 카운팅하는데 0.4초 정도 시간이 걸렸다.

사용자 삽입 이미지

CC BY-NC 4.0 Erlang으로 구현해본 멀티 코어 CPU용 MapReduce by from __future__ import dream is licensed under a Creative Commons Attribution-NonCommercial 4.0 International License.