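Hadoop can automatically distribute files and archives to the nodes that run a job. For a Hadoop Streaming job, this only requires adding the corresponding option (-file) when the job is launched.

When running a streaming program, use the -file option once for each local file that has to be distributed. The example below uses the following files:
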
  • wordwhite (the whitelist: only the words listed in it are counted, one word per line)
$ vim wordwhite
  • mapper program
$ vim mapper.py

    #!/usr/bin/env python
    import sys


    def read_wordwhite(path):
        """Load the whitelist file: one word per line."""
        word_set = set()
        with open(path, 'r') as fd:
            for line in fd:
                word = line.strip()
                if word:
                    word_set.add(word)
        return word_set


    def mapper(path):
        word_set = read_wordwhite(path)
        for line in sys.stdin:
            # split() without arguments never yields empty strings
            for word in line.split():
                if word in word_set:
                    print("%s\t%s" % (word, 1))


    if __name__ == "__main__":
        # The whitelist file name is passed as the first argument.
        if len(sys.argv) > 1:
            mapper(sys.argv[1])
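
The whitelist filtering can be checked without a cluster by running the same logic on in-memory data. The following is only a sketch: `filter_words`, the sample whitelist, and the sample lines are hypothetical and not part of the original script.

```python
def filter_words(lines, word_set):
    """Yield (word, 1) for every whitespace-separated token found in word_set."""
    for line in lines:
        for word in line.split():
            if word in word_set:
                yield word, 1


# Hypothetical whitelist and input, for illustration only.
whitelist = {"the", "and", "had"}
sample = ["the man had a house", "and the garden"]

pairs = list(filter_words(sample, whitelist))
for word, count in pairs:
    # Same tab-separated format the mapper writes to stdout.
    print("%s\t%s" % (word, count))
```

Words that are not in the whitelist ("man", "a", "house", "garden") produce no output, which is why the job later reports far fewer reduce groups than there are distinct words in the input.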
  • reducer program
$ vim reducer.py

    #!/usr/bin/env python
    import sys


    def reducer():
        current_word = None
        word_sum = 0
        for line in sys.stdin:
            word_list = line.strip().split('\t')
            if len(word_list) < 2:
                continue
            word = word_list[0].strip()
            word_value = word_list[1].strip()
            if current_word is None:
                current_word = word
            if current_word != word:
                # Input arrives sorted by key, so a new word closes the previous one.
                print("%s\t%s" % (current_word, word_sum))
                current_word = word
                word_sum = 0
            word_sum += int(word_value)
        # Emit the last word, unless there was no input at all.
        if current_word is not None:
            print("%s\t%s" % (current_word, word_sum))


    if __name__ == "__main__":
        reducer()
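
The reducer relies on its input being sorted by key, which the shuffle phase guarantees. As a rough local illustration of that contract, the sketch below uses `sorted()` as a stand-in for the shuffle and `itertools.groupby` for the per-word summation; `reduce_sorted` and the sample pairs are hypothetical, and this is an alternative formulation rather than the script above.

```python
from itertools import groupby


def reduce_sorted(pairs):
    """Sum the counts per word; `pairs` must already be sorted by word."""
    for word, group in groupby(pairs, key=lambda kv: kv[0]):
        yield word, sum(int(value) for _, value in group)


# Hypothetical mapper output; values are strings, as they would be on stdin.
mapper_output = [("the", "1"), ("had", "1"), ("and", "1"), ("the", "1")]

# sorted() plays the role of the shuffle/sort between map and reduce.
totals = list(reduce_sorted(sorted(mapper_output)))
for word, total in totals:
    print("%s\t%s" % (word, total))
```

If the input were not sorted, `groupby` (like the loop in reducer.py) would emit the same word more than once, each time with a partial count.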
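  • run_streaming script
$ vim runstreaming.sh

    #!/bin/bash

    # HADOOP_CMD, STREAM_JAR_PATH, INPUT_FILE_PATH and OUTPUT_FILE_PATH
    # must be set before this point; their definitions are not shown here.
    $HADOOP_CMD jar $STREAM_JAR_PATH \
        -input $INPUT_FILE_PATH \
        -output $OUTPUT_FILE_PATH \
        -mapper "python mapper.py wordwhite" \
        -reducer "python reducer.py" \
        -file ./mapper.py \
        -file ./reducer.py \
        -file ./wordwhite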
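
Hadoop 2.x reports -file as deprecated and recommends the generic option -files, which takes a comma-separated list and has to appear before the streaming-specific options. The sketch below builds such a command line in Python. `build_streaming_cmd` is a hypothetical helper, and the `hadoop` and `hadoop-streaming.jar` values are placeholders to be replaced with the paths of your own installation.

```python
import shlex


def build_streaming_cmd(hadoop_cmd, jar, input_path, output_path, files):
    """Return the argv list for a streaming job that ships `files` via -files."""
    return [
        hadoop_cmd, "jar", jar,
        # Generic options such as -files must precede the streaming options.
        "-files", ",".join(files),
        "-input", input_path,
        "-output", output_path,
        "-mapper", "python mapper.py wordwhite",
        "-reducer", "python reducer.py",
    ]


# Placeholder command and jar path (assumptions, not taken from the article).
cmd = build_streaming_cmd(
    "hadoop",
    "hadoop-streaming.jar",
    "/input/The_Man_of_Property",
    "/output/wordcount/wordwhitetest",
    ["./mapper.py", "./reducer.py", "./wordwhite"],
)

# Print a shell-safe version of the command for inspection.
print(" ".join(shlex.quote(arg) for arg in cmd))
```

The list can be passed to `subprocess.run(cmd, check=True)` as is; keeping it as a list avoids quoting problems with the multi-word -mapper and -reducer arguments.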
  • Run the program
    First upload the test file, The_Man_of_Property, to HDFS and create the wordcount output directory:

    $ hadoop fs -put ./The_Man_of_Property /input/
    $ hadoop fs -mkdir /output/wordcount

    Note: the Hadoop environment used here is pseudo-distributed, running Hadoop 2.6.

$ ./runstreaming.sh
18/01/26 13:30:27 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [./mapper.py, ./reducer.py, ./wordwhite, /tmp/hadoop-unjar7204532228900236640/] [] /tmp/streamjob7580948745512643345.jar tmpDir=null
18/01/26 13:30:29 INFO client.RMProxy: Connecting to ResourceManager at /
18/01/26 13:30:29 INFO client.RMProxy: Connecting to ResourceManager at /
18/01/26 13:30:31 INFO mapred.FileInputFormat: Total input paths to process : 1
18/01/26 13:30:31 INFO mapreduce.JobSubmitter: number of splits:2
18/01/26 13:30:32 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1516345010544_0008
18/01/26 13:30:32 INFO impl.YarnClientImpl: Submitted application application_1516345010544_0008
18/01/26 13:30:32 INFO mapreduce.Job: The url to track the job: http://localhost:8088/proxy/application_1516345010544_0008/
18/01/26 13:30:32 INFO mapreduce.Job: Running job: job_1516345010544_0008
18/01/26 13:30:40 INFO mapreduce.Job: Job job_1516345010544_0008 running in uber mode : false
18/01/26 13:30:40 INFO mapreduce.Job:  map 0% reduce 0%
18/01/26 13:30:50 INFO mapreduce.Job:  map 50% reduce 0%
18/01/26 13:30:51 INFO mapreduce.Job:  map 100% reduce 0%
18/01/26 13:30:58 INFO mapreduce.Job:  map 100% reduce 100%
18/01/26 13:30:59 INFO mapreduce.Job: Job job_1516345010544_0008 completed successfully
18/01/26 13:30:59 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=73950
        FILE: Number of bytes written=582815
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=636501
        HDFS: Number of bytes written=27
        HDFS: Number of read operations=9
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters
        Launched map tasks=2
        Launched reduce tasks=1
        Data-local map tasks=2
        Total time spent by all maps in occupied slots (ms)=12815
        Total time spent by all reduces in occupied slots (ms)=5251
        Total time spent by all map tasks (ms)=12815
        Total time spent by all reduce tasks (ms)=5251
        Total vcore-milliseconds taken by all map tasks=12815
        Total vcore-milliseconds taken by all reduce tasks=5251
        Total megabyte-milliseconds taken by all map tasks=13122560
        Total megabyte-milliseconds taken by all reduce tasks=5377024
    Map-Reduce Framework
        Map input records=2866
        Map output records=9243
        Map output bytes=55458
        Map output materialized bytes=73956
        Input split bytes=198
        Combine input records=0
        Combine output records=0
        Reduce input groups=3
        Reduce shuffle bytes=73956
        Reduce input records=9243
        Reduce output records=3
        Spilled Records=18486
        Shuffled Maps =2
        Failed Shuffles=0
        Merged Map outputs=2
        GC time elapsed (ms)=332
        CPU time spent (ms)=3700
        Physical memory (bytes) snapshot=707719168
        Virtual memory (bytes) snapshot=8333037568
        Total committed heap usage (bytes)=598736896
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=636303
    File Output Format Counters
        Bytes Written=27
18/01/26 13:30:59 INFO streaming.StreamJob: Output directory: /output/wordcount/wordwhitetest
  • View the results

    $ hadoop fs -ls /output/wordcount/wordwhitetest/
    Found 2 items
    -rw-r--r--   1 centos supergroup          0 2018-01-26 13:30 /output/wordcount/wordwhitetest/_SUCCESS
    -rw-r--r--   1 centos supergroup         27 2018-01-26 13:30 /output/wordcount/wordwhitetest/part-00000
    $ hadoop fs -text /output/wordcount/wordwhitetest/part-00000
    and 2573
    had 1526
    the 5144
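
    The result can be cross-checked against the job counters. The short sketch below does the arithmetic, assuming every record is tab-separated and newline-terminated exactly as mapper.py and reducer.py print it; the variable names are illustrative, and the numbers are copied from the log and from part-00000.

```python
# Word counts from part-00000.
result = {"and": 2573, "had": 1526, "the": 5144}

# Counters from the job log.
map_output_records = 9243
map_output_bytes = 55458
hdfs_bytes_written = 27

# Each whitelisted occurrence is one mapper record: "<word>\t1\n".
total_records = sum(result.values())
mapper_bytes = sum(n * len("%s\t1\n" % w) for w, n in result.items())

# Each word is one reducer record: "<word>\t<count>\n".
reducer_bytes = sum(len("%s\t%d\n" % (w, n)) for w, n in result.items())

print(total_records == map_output_records)
print(mapper_bytes == map_output_bytes)
print(reducer_bytes == hdfs_bytes_written)
```

    All three comparisons hold for the figures shown here, so the three whitelisted words account for every record the mappers emitted.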


2. Hadoop Streaming syntax reference

  • http://blog.51cto.com/balich/2065419
This article is reposted from the 51CTO blog of 巴利奇; original link: http://blog.51cto.com/balich/2065424

