2017年12月27日 星期三

[Data Science 到底是什麼從一個完全外行角度來看][09]了解Hadoop裡的MapReduce到底是什麼?

image
圖片來源: https://pixabay.com/en/books-spine-colors-pastel-1099067/https://pixabay.com/en/math-blackboard-education-classroom-1547018/

在上一篇([08]Hadoop 改成完全分散模式)透過複製VM的方式建立出了fully-distributed mode,基本上在這個系列裡面對於Hadoop的介紹也快到了一個尾聲。

不過,還有一個部分被忽略了,也就是實際在Hadoop做運算的程式,也是WordCount的實際運算邏輯。

這篇會介紹MapReduce的概念,並且看一下WordCount的java程式是如何撰寫。

什麼是MapReduce

MapReduce其實是一種開發模式(Program Model),基本上可以把整個邏輯分成為Map階段和Reduce階段。

  • Map階段基本上會做filtering和sorting並且傳出一個key value pair做結果(以wordcount為例,每一個字會作為最後的key,而value則是1代表有一筆)
  • Reduce階段基本上會做整合(以wordcount為例,從Map傳過來的key如果一樣,表示同一個字,因此把一樣的key做加總最後的出總筆數)

從下圖可以看到整個的流程:

image
整個WordCount的MapReduce流程。來源:https://www.mssqltips.com/sqlservertip/3222/big-data-basics--part-5--introduction-to-mapreduce/

input
這個是要做計算的原始資料,以上圖為例其實就是一堆文字清單
split
把input資料做分散處理 - 以hadoop來說,當MapReduce工作被輸入的時候,會被切割到各個cluster裡面等待做處理
map
這個就是MapReduce裡面的Map階段 - 每一個節點會把對應切割出來的資料建立key value結果 - key是字本身,然後value是1代表找到一筆
combine
這個其實也是在map的機器裡面做 - 把每一個key一樣的先做一次加總,避免傳送多次出去
shuffle & sort
在進入reduce階段之前,會先被做一個排序,因此相關的key值會放在一起
reduce
這個階段會做實際的加總,因此每一個key以的的value會被加總
outpu
這個是最後得到的結果

這邊需要注意一下,當提到map和reduce是小寫的時候,指的會是functional programing提供的方法。MapReduce則是開發模式。

上圖雖然用了小寫,不過這邊指的還是hadoop裡面的MapReduce。

Map和Reduce階段回傳的結果都是一個key value pair。

換個方式理解 - 用選舉為例

如果上面那個例子看了還是有點模糊,換個生活遇到的例子作說明

當台灣遇到選舉的時候,一般來說有選舉權的民眾會去戶籍地去做投票 - 投票完有沒有看當天新聞了解這些投票是怎麼計算的嗎?

如果那個時候看新聞,會注意到,會有跑馬燈一直跑說,某某縣市目前xxx有幾票 - 這個票數是及時在變動:

image
選舉的時候新聞及時播放票數。來源:http://my-own-post.com/new20150116/

整個數票的動作其實就是MapReduce。

input
所有有投票的票數就是整個input
split
每個可以投票的民眾去戶籍地投票,同等於把這個input split到不同的區域
map

投票時間截止了之後,每一個投票站會開始從箱子取出來,然後唱名這張票屬於哪個候選人。

每一張票的候選人就是key,然後唱名1票就是value

combine
當每一個投票站都分好了之後,會先做一個初步的加總,得到的每個站的總票數。
shuffle & sort
在這個階段,會把每個投票站同一個候選人(key)的放在一起
reduce
做最後加總 - 把所有一樣key的值加在一起
output
最後結果就是誰當選了

首先,每個可以投票的會去戶籍地做投票的動作,這個其實同等於

怎麼在Hadoop寫MapReduce

希望透過上面的比喻方式,對於整個MapReduce有個更清楚的了解,那在Hadoop裡面怎麼寫MapReduce呢?

Hadoop是java的程式,因此用java寫一定是最容易,下面快速介紹一下如何用java寫MapReduce,大概會分幾個部分:

  • Map
  • Reduce
  • 設定

Map

public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, Context context) throws IOException,
        InterruptedException {

            String line = value.toString();

            StringTokenizer tokenizer = new StringTokenizer(line);

            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                context.write(word, one);
            }
        }
}

基本上,上面建立了一個Mapclass繼承Mapper並且定義了一個方法叫做map

Hadoop會把每一段文字個用value傳過來,因此用了tokenizer把裡面的word取出來。

每一個取出來的word,會被寫成一組key value pair(context.write(word,one)),word是key,value是數值1

會一直做,直到整個word都處理完。

Reduce

public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
 public void reduce(Text key, Iterable<IntWritable> values, Context context) throws
  IOException, InterruptedException {
   int sum = 0;

   for (IntWritable val : values) {
    sum += val.get();
   }

   context.write(key, new IntWritable(sum));
 }
}

Reducer和mapper類似,先定義一個class叫做Reduce繼承Reducer

裡面有一個reduce的程式定義reduce階段要做什麼

在這邊,java已經有處理好把一樣的key放成一組,因此可以透過迴圈的方式把所有值加總。

最後把整個結果寫出去,一樣是key value pair,key還是原來的key,不過value是所有的加總。

設定

Map階段和Reduce階段的功能都定義好了之後,接下來需要做的是告訴程式執行的時候那個是Map和那個是Reduce。

package org.myorg;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class WordCount {

 // 剛剛定義的 Map
 ....

 // 剛剛定義的 Reduce
 ....

 public static void main(String[] args) throws Exception {
  JobConf conf = new JobConf(WordCount.class);
  conf.setJobName("wordcount");

  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(IntWritable.class);

  conf.setMapperClass(Map.class);
  conf.setCombinerClass(Reduce.class);
  conf.setReducerClass(Reduce.class);

  conf.setInputFormat(TextInputFormat.class);
  conf.setOutputFormat(TextOutputFormat.class);

  FileInputFormat.setInputPaths(conf, new Path(args[0]));
  FileOutputFormat.setOutputPath(conf, new Path(args[1]));

  JobClient.runJob(conf);
 }
}

這個程式應該蠻好理解,基本上就是把剛剛定義好的Map和Reduce做設定。

這邊比較特別是Combiner的部分,因為也是加總所以和reduce是一樣的概念。

結語

透過這篇了解了整個MapReduce的運作機制,並且看了如何用Java寫過一個WordCount的MapReduce程式。

這邊會發現到,程式裡面完全沒有任何分散式處理的概念,但是Hadoop會自動以分散式的模式執行。這個讓撰寫變得非常簡單。

可是另外一個問題會浮現出來,難道只有Java可以寫MapReduce嗎?

在下一篇([10]用.Net Core跑Hadoop MapReduce - Streaming介紹)將會介紹如何用.net core寫出可以再Hadoop透過stream的方式執行的MapReduce,並且這次會改成用docker的方式來執行,提供另外一種更快速和容易測試Hadoop的方式。


沒有留言 :

張貼留言