

Let's develop a MapReduce program from scratch and run it on a Hadoop cluster.
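Mapper code, map.py:
import sys

# Emit one "word 1" record for every space-separated word on standard input.
for line in sys.stdin:
    word_list = line.strip().split(' ')
    for word in word_list:
        # print(...) with a single argument works on both Python 2 and Python 3.
        print(' '.join([word.strip(), str(1)]))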
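Reducer code, reduce.py:
import sys

cur_word = None
total = 0
# The input arrives sorted by word, so equal words sit on adjacent lines.
for line in sys.stdin:
    ss = line.strip().split(' ')
    if len(ss) < 2:
        continue
    word = ss[0].strip()
    count = ss[1].strip()
    if cur_word is None:
        cur_word = word
    if cur_word != word:
        # The word changed: emit the finished total and start a new one.
        print(' '.join([cur_word, str(total)]))
        cur_word = word
        total = 0
    total += int(count)
# Emit the last word, but only if any valid input was read.
if cur_word is not None:
    print(' '.join([cur_word, str(total)]))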
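The reducer depends on equal words being adjacent in its input. As a minimal sketch of the same idea, the grouping can also be written with `itertools.groupby`. The function names `parse` and `reduce_lines` and the sample records are illustrative assumptions, not part of the original scripts.

```python
from itertools import groupby


def parse(lines):
    """Yield (word, count) pairs, skipping malformed lines as reduce.py does."""
    for line in lines:
        ss = line.strip().split(' ')
        if len(ss) < 2:
            continue
        yield ss[0].strip(), int(ss[1].strip())


def reduce_lines(lines):
    """Sum counts per word; assumes equal words are adjacent (sorted input)."""
    for word, group in groupby(parse(lines), key=lambda pair: pair[0]):
        yield ' '.join([word, str(sum(count for _, count in group))])


# Hypothetical sample of sorted mapper output, not taken from src.txt.
sample = ["hao 1", "hao 1", "hello 1", "ni 1", "ni 1", "ni 1"]
for out in reduce_lines(sample):
    print(out)
```

To use it as a streaming reducer, one would pass `sys.stdin` instead of `sample`. Like the loop version, it gives wrong totals if the input is not sorted by word.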
Resource file src.txt (for testing; when running on the cluster, remember to upload it to HDFS):
hello ni hao ni haoni hao ni hao ni hao ni hao ni hao ni hao ni hao ni hao ni hao ni hao ni hao ao ni haoni hao ni hao ni hao ni hao ni hao ni hao ni hao ni hao ni hao ni hao ni haoao ni haoni hao ni hao ni hao ni hao ni hao ni hao ni hao ni hao ni hao ni hao ni hao Dad would get out his mandolin and play for the family Dad loved to play the mandolin for his family he knew we enjoyed singing I had to mature into a man and have children of my own before I realized how much he had sacrificed I had to,mature into a man and,have children of my own before.I realized how much he had sacrificed
First, debug locally to check that the result is correct. Run the following command:
cat src.txt | python map.py | sort -k 1 | python reduce.py
Output on the command line:
a 2
and 2
and,have 1
ao 1
before 1
before.I 1
children 2
Dad 2
enjoyed 1
family 2
for 2
get 1
had 4
hao 33
haoao 1
haoni 3
have 1
he 3
hello 1
his 2
how 2
I 3
into 2
knew 1
loved 1
man 2
mandolin 2
mature 1
much 2
my 2
ni 34
of 2
out 1
own 2
play 2
realized 2
sacrificed 2
singing 1
the 2
to 2
to,mature 1
we 1
would 1
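To make the three stages of the local pipeline explicit, here is a rough in-process sketch of map, sort, and reduce. The helper names (`map_lines`, `reduce_sorted`, `local_run`) and the two-line input are assumptions made for illustration; the real test still goes through the shell pipeline.

```python
def map_lines(lines):
    """Mimic map.py: one 'word 1' record per space-separated token."""
    for line in lines:
        for word in line.strip().split(' '):
            yield ' '.join([word.strip(), '1'])


def reduce_sorted(records):
    """Mimic reduce.py: sum the counts of adjacent equal words."""
    cur_word, total = None, 0
    for record in records:
        ss = record.strip().split(' ')
        if len(ss) < 2:
            continue
        word, count = ss[0], int(ss[1])
        if cur_word is not None and cur_word != word:
            yield ' '.join([cur_word, str(total)])
            total = 0
        cur_word = word
        total += count
    if cur_word is not None:
        yield ' '.join([cur_word, str(total)])


def local_run(lines):
    """map -> sort -> reduce, the same three stages as the shell pipeline."""
    return list(reduce_sorted(sorted(map_lines(lines))))


# Hypothetical two-line input, not the real src.txt.
print(local_run(["hello ni hao", "ni hao ni"]))
# prints ['hao 2', 'hello 1', 'ni 3']
```

Python's `sorted` orders strings by code point, while `sort -k 1` follows the shell locale, so the order of the output lines can differ from the listing above. The counts are the same, since the reducer only needs equal words to be adjacent.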
Local debugging shows that the code is OK. Next, run it on the cluster. For convenience, I wrote a script, run.sh, to save some manual work.
HADOOP_CMD="/home/hadoop/hadoop/bin/hadoop"
STREAM_JAR_PATH="/home/hadoop/hadoop/contrib/streaming/hadoop-streaming-1.2.1.jar"
INPUT_FILE_PATH="/home/input/src.txt"
OUTPUT_PATH="/home/output"

$HADOOP_CMD fs -rmr $OUTPUT_PATH

$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH \
    -output $OUTPUT_PATH \
    -mapper "python map.py" \
    -reducer "python reduce.py" \
    -file ./map.py \
    -file ./reduce.py
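A quick walkthrough of the script:
HADOOP_CMD: path to the hadoop executable under bin
STREAM_JAR_PATH: path to the streaming jar
INPUT_FILE_PATH: input path of the resource file on the Hadoop cluster
OUTPUT_PATH: output path for the results on the Hadoop cluster
The script deletes OUTPUT_PATH with fs -rmr before submitting, because the job fails if the output directory already exists.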
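To show how the four variables map onto the streaming options, the sketch below assembles the same argument list in Python. The function `build_streaming_command` is a hypothetical helper; the paths are the ones from run.sh, and nothing is executed here.

```python
def build_streaming_command(hadoop_cmd, stream_jar, input_path, output_path):
    """Return the argument list that run.sh passes to Hadoop Streaming."""
    return [
        hadoop_cmd, "jar", stream_jar,
        "-input", input_path,
        "-output", output_path,
        "-mapper", "python map.py",
        "-reducer", "python reduce.py",
        "-file", "./map.py",
        "-file", "./reduce.py",
    ]


cmd = build_streaming_command(
    "/home/hadoop/hadoop/bin/hadoop",
    "/home/hadoop/hadoop/contrib/streaming/hadoop-streaming-1.2.1.jar",
    "/home/input/src.txt",
    "/home/output",
)
# Joined for display only; the quoting around "python map.py" is lost here.
print(" ".join(cmd))
```

On a machine with Hadoop installed, passing `cmd` to `subprocess.check_call` would presumably submit the job the same way the shell script does; that step is left out so the sketch runs anywhere.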
Run the following command to count the records output by the reduce stage:
cat src.txt | python map.py | sort -k 1 | python reduce.py | wc -l
The command line prints the line count.
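Since the reducer emits one line per distinct word, the line count should equal the number of distinct non-empty tokens in the input. A small sketch of that check follows; `reduce_record_count` is a hypothetical helper and the input is made up.

```python
def reduce_record_count(lines):
    """Count distinct non-empty tokens, i.e. the lines the reducer would emit."""
    words = set()
    for line in lines:
        for word in line.strip().split(' '):
            if word.strip():
                words.add(word.strip())
    return len(words)


# Hypothetical input with three distinct words: hello, ni, hao.
print(reduce_record_count(["hello ni hao", "ni hao ni"]))  # prints 3
```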
Open master:50030 in a browser to see the details of the job.
Kind    % Complete  Num Tasks  Pending  Running  Complete  Killed  Failed/Killed Task Attempts
map     100.00%     2          0        0        2         0       0 / 0
reduce  100.00%     1          0        0        1         0       0 / 0
In the Map-Reduce Framework counter group you should see this:
Counter                Map  Reduce  Total
Reduce output records  0    0       43
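This confirms that the whole process succeeded: the 43 reduce output records on the cluster match the 43 lines produced by the local run. That completes our first Hadoop program.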
