场景:将Python程序通过hadoop-streaming提交到Hadoop集群执行。
参考:http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/

1、Python编写Mapper
   业务逻辑是从会从标准输入(stdin)读取数据,默认以空格分割单词,然后按行输出单词机器出现频率到标准输出(stdout),不过整个Map处理过程并不会统计每个单词出现的总次数,而是直接输出“word,1”,以便作为Reduce的输入进行统计。

代码如下:

#coding:utf-8'''
Created on 2017年6月7日
@author: fjs
'''#!/usr/bin/env python
import sys# input comes from STDIN (standard input)
for line in sys.stdin:# remove leading and trailing whitespaceline = line.strip()# split the line into wordswords = line.split()# increase countersfor word in words:# write the results to STDOUT (standard output);# what we output here will be the input for the# Reduce step, i.e. the input for reducer.py## tab-delimited; the trivial word count is 1print '%s\t%s' % (word, 1)

2、Python编写Reducer
   Reduce代码,它会从标准输入(stdin)读取mapper.py的结果,然后统计每个单词出现的总次数并输出到标准输出(stdout)。
   代码如下:

#coding:utf-8'''
Created on 2017年6月7日
@author: fjs
'''#!/usr/bin/env pythonfrom operator import itemgetter
import syscurrent_word = None
current_count = 0
word = None# input comes from STDIN
for line in sys.stdin:# remove leading and trailing whitespaceline = line.strip()# parse the input we got from mapper.pyword, count = line.split('\t', 1)# convert count (currently a string) to inttry:count = int(count)except ValueError:# count was not a number, so silently# ignore/discard this linecontinue# this IF-switch only works because Hadoop sorts map output# by key (here: word) before it is passed to the reducerif current_word == word:current_count += countelse:if current_word:# write result to STDOUTprint '%s\t%s' % (current_word, current_count)current_count = countcurrent_word = word# do not forget to output the last word if needed!
if current_word == word:print '%s\t%s' % (current_word, current_count)

3、文件准备
   1)将python程序文件上传到Hadoop集群客户机,为文件赋予执行权限
   #chmod +x /data/etlcj/python/mapper.py
   #chmod +x /data/etlcj/python/reducer.py
   2)上传测试文件到集群
   #vi /data/etlcj/python/wcin.txt   加入:

foo foo quux labs foo bar quux abc bar see you by test welcome test abc labs foo me python hadoop ab ac bc bec python

上传到集群
   #hadoop fs -put /data/etlcj/python/wcin.txt  /apps/etlcj/python/

4、基于hadoop-streaming执行MapReduce任务:

执行语句:

#hadoop jar /usr/hdp/2.5.3.0-37/hadoop-mapreduce/hadoop-streaming-2.7.3.2.5.3.0-37.jar -files '/data/etlcj/python/mapper.py,/data/etlcj/python/reducer.py' -input /apps/etlcj/python/wcin.txt -output /apps/etlcj/python/out/ -mapper ./mapper.py -reducer ./reducer.py

执行过程中提示:

Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 126at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:322)at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:535)at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)at java.security.AccessController.doPrivileged(Native Method)at javax.security.auth.Subject.doAs(Subject.java:415)at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1724)at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)

怀疑是py脚本代码问题或版本环境不匹配问题,对python语法不熟悉,暂无法深入,但python提交到hadoop集群的方法可以。

5、hadoop-streaming参数参考:
  Usage:hadoop jar $Haoop_Home$/hadoop-streaming-*.jar 
   -input <输入目录> \ # 可以指定多个输入路径,例如:-input '/user/foo/dir1' -input '/user/foo/dir2'
   -inputformat <输入格式 JavaClassName> 
   -output <输出目录> 
   -outputformat <输出格式 JavaClassName> 
   -mapper <mapper executable or JavaClassName> 
   -reducer <reducer executable or JavaClassName> 
   -combiner <combiner executable or JavaClassName> 
   -partitioner <JavaClassName> \
   -cmdenv <name=value> \ # 可以传递环境变量,可以当作参数传入到任务中,可以配置多个
   -file <依赖的文件> \ # 配置文件,字典等依赖
   -D <name=value> \ # 作业的属性配置

【Python学习系列四】Python程序通过hadoop-streaming提交到Hadoop集群执行MapReduce相关推荐

  1. Python学习系列 -- 改善 Python 程序的 91 个建议

    转载自:https://zhuanlan.zhihu.com/p/32817459 自己写Python也有四五年了,一直是用自己的"强迫症"在维持自己代码的质量,除了Google的 ...

  2. Python学习篇(五) Python中的循环

    文章目录 前言 一.range函数 二.while循环 2.1四步循环法 三.for in 循环 四.流程控制语句 4.1 break 4.2 continue 五.else语句 六.嵌套循环 七.二 ...

  3. Python学习系列(六)(模块)

    Python学习系列(六)(模块) Python学习系列(五)(文件操作及其字典) 一,模块的基本介绍 1,import引入其他标准模块 标准库:Python标准安装包里的模块. 引入模块的几种方式: ...

  4. Python学习系列(五)(文件操作及其字典)

    Python学习系列(五)(文件操作及其字典) Python学习系列(四)(列表及其函数) 一.文件操作 1,读文件      在以'r'读模式打开文件以后可以调用read函数一次性将文件内容全部读出 ...

  5. Python: 学习系列之七:模块、PIPY及Anaconda

    系列 Python: 学习系列之一:Python能做什么 Python: 学习系列之二:基础介绍(int/float/string/range/list/tuple/dict/set) Python: ...

  6. Python学习教程(Python学习路线):Day14A-网络编程入门

    Python学习教程(Python学习路线):网络编程入门 计算机网络基础 计算机网络是独立自主的计算机互联而成的系统的总称,组建计算机网络最主要的目的是实现多台计算机之间的通信和资源共享.今天计算机 ...

  7. Python学习22:Python之禅和PEP 8规范

    笔者:风起怨江南 出处:https://blog.csdn.net/JackMengJin 笔者原创,文章转载需注明,如果喜欢请点赞+关注,感谢支持! 导读:Python之禅和PEP 8规范,值得所有 ...

  8. python学习一(python与pip工具下载与安装)

    python学习一(python与pip工具下载与安装)  一 Python下载  二 安装Python  三 安装 pip   3.1 采用cd命令进入到Scripts 目录下面   3.2 输入命 ...

  9. Python学习教程(Python学习路线):Python面试100题(二)

    Python学习教程(Python学习路线):面试题接着给大家整理! 16.<div class="nam">中国</div>,用正则匹配出标签里面的内容( ...

最新文章

  1. c#中接口的使用方法图解_C#图解教程 第十五章 接口
  2. 关于数据库的增删改查
  3. C/C++ 中的0长数组(柔性数组)
  4. SQL Server 扩展事件系列 (1 of 31) -- 扩展事件概述
  5. Shell脚本函数(函数传参、递归、创建库)
  6. 内核编程小结(引用)
  7. 创新 - 王屋村的魔方们
  8. 自旋锁 Linux内核,Linux内核中的自旋锁
  9. 关于C# DataGridView 全选与取消的小问题
  10. c语言必背的100代码
  11. 苹果录屏功能没有声音_安卓最高清的录屏软件,没有之一,已解锁VIP功能!
  12. 觉得小鹤双拼鹤形超级难的看一下-by老随风-2015-05-22
  13. CSS —— 选择器
  14. Leecode 55跳跃游戏
  15. Carson带你学数据结构:手把手带你了解 ”图“ 所有知识!(含DFS、BFS)
  16. 【07】概率图推断之信念传播
  17. 国外智能化农机装备简介
  18. UGUI 源码之 RectMask2D、Clipping、RectangularVertexClipper
  19. 计算机毕业设计php旅游网站的设计与实现
  20. weblogic的集群与配置--架构师第九天

热门文章

  1. 大数据:Hive和Hbase的区别于优势
  2. python16进制字节序_第 1 章 套接字、IPv4和简单的客户端/服务器编程
  3. win2012中让IIS同时支持多版本ASP.NET 3.5/4.0/4.5的方法
  4. BZOJ.4888.[TJOI2017]异或和(树状数组)
  5. 06.正则表达式基本知识
  6. linux虚拟机安装oracle全过程(一)
  7. ASP.net之策略模式
  8. easyui 行编辑修改
  9. flush privileges 什么意思
  10. 修改Linux内核的printk缓冲区(log缓冲区)大小