2017年12月30日 星期六

[Data Science 到底是什麼從一個完全外行角度來看][10]用.Net Core跑Hadoop MapReduce - Streaming介紹

image
圖片來源: https://pixabay.com/en/books-spine-colors-pastel-1099067/https://pixabay.com/en/math-blackboard-education-classroom-1547018/

上一篇([09]了解Hadoop裡的MapReduce到底是什麼?)了解了什麼是MapReduce,並且了解了怎麼用Java寫一個MapReduce的Hello World程式:WordCount。

馬上會想到的一個問題是,難道只有Java可以寫MapReduce的程式嗎?

這篇將會介紹Hadoop的Streaming服務,讓任何語言只要透過Standard Input和Standard Output就可以寫出MapReduce程式。 將會使用最熟悉的語言,.Net Core來完成這個事情。

在這篇也會介紹另外一種測試Hadoop的方式,使用Docker來測試。

什麼是Hadoop Streaming

當一個MapReduce的程式被執行的時候,會先被切割成為一個一個的Task,然後由那台的DataNode用Java執行那個Task。

所以整個執行類似下圖,整個MapReduce都在JVM的環境下:

image
JVM的MapReduce

不過Hadoop考量到如果外部需要執行MapReduce要怎麼辦,因此建立了一個叫做Streaming的功能。

基本上,只要那台DataNode可以Run的起來都可以跑。

Hadoop Streaming透過Standard Input/Output/Error 3個管道 來和被Run起來的程式溝通。

MapReduce的程式只需要從Standard Input讀進來,做處理,然後在寫到Output。如果有錯誤訊息可以記錄在Error裡面。

整個概念大概是:

image
Hadoop Streaming

實際操作

還記得整個MapReduce基本上就是在每個階段做過處理之後,會產生一個key value pair。Hadoop用tab來切割Key 和 Value。

有了這個概念之後來看實際程式,以下使用的是.Net Core的console來開發,分幾個階段:

  1. Mapper開發
  2. Reducer開發
  3. 測試結果

Mapper開發

由於是透過Standard Input/Output,因此console非常適合,所以會建立一個Mapper的.Net Core Console程式。

在Mapper的階段,內容會是一行一行讀進來,所以把讀進來的內容做文字切割, 每找到一個word,就寫到output,word是key1是value(代表找到一筆)

會一直迴圈的讀,直到沒有任何檔案為止。如果把這個和之前java比照會發現邏輯一樣。

class Program
{
    static void Main(string[] args)
    {
        string line;

        while ((line = Console.ReadLine()) != null)
        {
            // 用文字切割
            var words = Regex.Matches(line, @"[\w]+");

            foreach (var word in words)
            {
                // 每一個找到的算1筆 - keyvalue用tab切割
                Console.WriteLine("{0}\t1", word);
            }
        }
    }
}

Reducer開發

會在建立另外一個專案用來放Reducer的程式。

Reducer一樣是讀Input然後寫到output。由於這次讀到的內容是從Mapper來的,所以會先用tab做切割,key是word,value就是筆數(也都是1)。

在這邊,有建立一個words dictionary,這個是因為在Mapper階段其實沒有管word有沒有重複,反正出現就是+1。

不過在Reducer因為要加總,因此用了words dictionary作為一個暫存的空間。

最後把所有結果寫到output - 也是 key value pair,key一樣是word,不過value就是word出現的總數。

static void Main(string[] args)
{
    // 用來儲存已經出現過的字 - java版本會自動處理,不過這個stream需要手動記錄
    Dictionary<string, int> words = new Dictionary<string, int>();

    string line;

    while ((line = Console.ReadLine()) != null)
    {
        // 傳過來的key value用tab分割(Mapper也是用tab切割key和value)
        var keyValuePair = line.Split('\t');

        string word = keyValuePair[0];

        int count = Convert.ToInt32(keyValuePair[1]);

        // 如果已經有這個word,和字典的加總,不然就建立新的
        if (words.ContainsKey(word))
        {
            words[word] += count;
        }
        else
        {
            words.Add(word, count);
        }
    }

    // 把所有結果寫出來
    foreach (var word in words)
    {
        Console.WriteLine("{0}\t{1}", word.Key, word.Value);
}
和Java的版本不同,java版本會自動幫忙把key一樣組成一個list比較好操作,但是透過streaming需要自己手動操作。

測試結果

當整個程式準備好了之後,接下來就可以對這個程式做測試了。

在接下來將會用一個docker版本的hadoop做測試 - 希望透過docker方式也可以了解用docker做測試有多方便。

接下來的測試都是在powershell可以直接執行。

如果對docker不熟悉,那麼下面做不了。要跑docker基本上要Windows 10 Professional以上或者linux,並且有裝docker。

裡面用到的docker image是一個linux的container。

下面也可以直接在之前建立的Ubuntu環境裡面執行,不過需要先:

  • 安裝.net core 2.0
  • 跳過前面的步奏,知道後面呼叫hadoop Streaming那段即可

接下來的指令操作都是在從github clone下來的專案裡面src\chapter-10-dotnet-mapreduce的資料夾下面執行。

完整的指令是:

git clone https://github.com/alantsai/blog-data-science-series.git
cd .\blog-data-science-series\src\chapter-10-dotnet-mapreduce
                        
先把.net core的console 發佈出來
在powershell執行指令:dotnet publish -o ${pwd}\dotnetmapreduce .\DotNetMapReduceWordCount\DotNetMapReduceWordCount.sln
image
發佈結果
把hadoop用docker compose啟動
使用指令把hadoop啟動:docker-compose up -d。 會看到:
  1. 執行完有1個master 2個worker啟動
  2. 在YARN的web節點看到有兩個Node
  3. 在DataNode看到有兩個節點
image
可以看到啟動成功並且有兩個節點
把.Net core程式複製到master的hadoop節點裡面
把剛剛發佈出來的.Net core程式複製到master裡面,並且進入到master裡面的bash並且可以看到有copy進去的內容
docker cp dotnetmapreduce hadoop-dotnet-master:/dotnetmapreduce
docker exec -it hadoop-dotnet-master bash
ls
ls /dotnetmapreduce
image
進入到master的bash並且檢查copy是否成功
把要計算的檔案放到hadoop的HDFS
透過下面指令把檔案放到hadoop的HDFS的input資料夾並且檢查:
hadoop fs -mkdir -p /input
hadoop fs -copyFromLocal /dotnetmapreduce/jane_austen.txt /input
hadoop fs -ls /input
                        
image
複製檔案到HDFS
用hadoop Streaming執行net core mapreduce
用hadoop的streaming執行:
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar \
    -files "/dotnetmapreduce" \
    -mapper "dotnet dotnetmapreduce/DotNetMapReduceWordCount.Mapper.dll" \
    -reducer  "dotnet dotnetmapreduce/DotNetMapReduceWordCount.Reducer.dll" \
    -input /input/* -output /output
                        
image
和之前執行map reduce的log一樣
檢查結果
執行完了之後,可以看到計算的每個字出現次數
hadoop fs -ls /output
hadoop fs -cat /output/part-00000
image
執行結果
會注意到這邊的結果和java版本有點不同,因為判斷字的邏輯不同導致。
如果docker不需要了,可以用docker-compose down把整個hadoop關掉。

結語

在這篇介紹了透過Hadoop Streaming達到在hadoop用.Net core 2.0的console程式做MapReduce如何。

這篇也改成使用docker來做hadoop測試而不是用一直以來建立的VM。用docker和VM比較會發現到docker其實做這種事情非常方便,如果對docker不熟悉,可以考慮花點時間做些學習(之後我的部落格也會有個系列介紹docker使用,有興趣的話請持續關注)。

在這個系列的後面,之前建立的VM還會用到 - 用來和R做結合。所以如果對後面操作有興趣,VM還是先保留。

在這個系列的Hadoop介紹也到了一個尾聲,在下一篇([11]Hadoop總結(上篇)–Ecosystem介紹)將會對目前hadoop有介紹的部分做一個總結,介紹hadoop的ecosystem,和還有什麼部分是應該繼續關注下去。


沒有留言 :

張貼留言