原文:http://www.cnblogs.com/kaituorensheng/p/3826114.html

阅读目录

  • 1. Python MapReduce 代码
  • 2. 在Hadoop上运行python代码
  • 3. 利用python的迭代器和生成器优化Mapper 和 Reducer代码
  • 4. 参考

尽管Hadoop框架是用java写的,但是Hadoop程序不限于java,可以用python、C++、ruby等。本例子中直接用python写一个MapReduce实例,而不是用Jython把python代码转化成jar文件。

例子的目的是统计输入文件的单词的词频。

  • 输入:文本文件
  • 输出:文本(每行包括单词和单词的词频,两者之间用'\t'隔开)
回到顶部

1. Python MapReduce 代码

使用python写MapReduce的“诀窍”是利用Hadoop流的API,通过STDIN(标准输入)、STDOUT(标准输出)在Map函数和Reduce函数之间传递数据。

我们唯一需要做的是利用Python的sys.stdin读取输入数据,并把我们的输出传送给sys.stdout。Hadoop流将会帮助我们处理别的任何事情。

1.1 Map阶段:mapper.py

在这里,我们假设把文件保存到hadoop-0.20.2/test/code/mapper.py

#!/usr/bin/env python
import sys
for line in sys.stdin:line = line.strip()words = line.split()for word in words:print "%s\t%s" % (word, 1)

文件从STDIN读取文件。把单词切开,并把单词和词频输出STDOUT。Map脚本不会计算单词的总数,而是输出<word> 1。在我们的例子中,我们让随后的Reduce阶段做统计工作。

为了是脚本可执行,增加mapper.py的可执行权限

chmod +x hadoop-0.20.2/test/code/mapper.py

1.2 Reduce阶段:reducer.py

在这里,我们假设把文件保存到hadoop-0.20.2/test/code/reducer.py

#!/usr/bin/env python
from operator import itemgetter
import syscurrent_word = None
current_count = 0
word = Nonefor line in sys.stdin:line = line.strip()word, count = line.split('\t', 1)try:count = int(count)except ValueError:  #count如果不是数字的话,直接忽略掉continueif current_word == word:current_count += countelse:if current_word:print "%s\t%s" % (current_word, current_count)current_count = countcurrent_word = wordif word == current_word:  #不要忘记最后的输出print "%s\t%s" % (current_word, current_count)

文件会读取mapper.py 的结果作为reducer.py 的输入,并统计每个单词出现的总的次数,把最终的结果输出到STDOUT。

为了是脚本可执行,增加reducer.py的可执行权限

chmod +x hadoop-0.20.2/test/code/reducer.py

细节:split(chara, m),第二个参数的作用,下面的例子很给力

str = 'server=mpilgrim&ip=10.10.10.10&port=8080'
print str.split('=', 1)[0]  #1表示=只截一次
print str.split('=', 1)[1]
print str.split('=')[0]
print str.split('=')[1]

输出

1
2
3
4
server
mpilgrim&ip=10.10.10.10&port=8080
server
mpilgrim&ip 

1.3 测试代码(cat data | map | sort | reduce)

这里建议大家在提交给MapReduce job之前在本地测试mapper.py 和reducer.py脚本。否则jobs可能会成功执行,但是结果并非自己想要的。

功能性测试mapper.py 和 reducer.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
[rte@hadoop-0.20.2]$cd test/code
[rte@code]$echo "foo foo quux labs foo bar quux" | ./mapper.py
foo 1
foo 1
quux    1
labs    1
foo 1
bar 1
quux    1
[rte@code]$echo "foo foo quux labs foo bar quux" | ./mapper.py | sort -k1,1 | ./reducer.py
bar 1
foo 3
labs    1
quux    2

细节:sort -k1,1  参数何意?

-k, -key=POS1[,POS2]     键以pos1开始,以pos2结束

有时候经常使用sort来排序,需要预处理把需要排序的field语言在最前面。实际上这是

完全没有必要的,利用-k参数就足够了。

比如sort all

1
2
3
4
5
1 4
2 3
3 2
4 1
5 0

如果sort -k 2的话,那么执行结果就是

1
2
3
4
5
5 0
4 1
3 2
2 3
1 4

回到顶部

2. 在Hadoop上运行python代码

2.1 数据准备

下载以下三个文件的

  • Plain Text UTF-8
  • Plain Text UTF-8
  • Plain Text UTF-8

我把上面三个文件放到hadoop-0.20.2/test/datas/目录下

2.2 运行

把本地的数据文件拷贝到分布式文件系统HDFS中。

bin/hadoop dfs -copyFromLocal /test/datas  hdfs_in

查看

bin/hadoop dfs -ls

结果

1
drwxr-xr-x   - rte supergroup          0 2014-07-05 15:40 /user/rte/hdfs_in

查看具体的文件

bin/hadoop dfs -ls /user/rte/hdfs_in

执行MapReduce job

bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar \
-file test/code/mapper.py     -mapper test/code/mapper.py \
-file test/code/reducer.py    -reducer test/code/reducer.py \
-input /user/rte/hdfs_in/*    -output /user/rte/hdfs_out

实例输出

查看输出结果是否在目标目录/user/rte/hdfs_out

bin/hadoop dfs -ls /user/rte/hdfs_out

输出

1
2
3
Found 2 items
drwxr-xr-x   - rte supergroup          0 2014-07-05 20:51 /user/rte/hdfs_out2/_logs
-rw-r--r--   2 rte supergroup     880829 2014-07-05 20:51 /user/rte/hdfs_out2/part-00000

查看结果

bin/hadoop dfs -cat /user/rte/hdfs_out2/part-00000

输出

以上已经达成目的了,但是可以利用python迭代器和生成器优化

回到顶部

3. 利用python的迭代器和生成器优化Mapper 和 Reducer代码

3.1 python中的迭代器和生成器

看这

3.2 优化Mapper 和 Reducer代码

mapper.py

#!/usr/bin/env python
import sys
def read_input(file):for line in file:yield line.split()def main(separator='\t'):data = read_input(sys.stdin)for words in data:for word in words:print "%s%s%d" % (word, separator, 1)if __name__ == "__main__":main()

reducer.py

#!/usr/bin/env python
from operator import itemgetter
from itertools import groupby
import sysdef read_mapper_output(file, separator = '\t'):for line in file:yield line.rstrip().split(separator, 1)def main(separator = '\t'):data = read_mapper_output(sys.stdin, separator = separator)for current_word, group in groupby(data, itemgetter(0)):try:total_count = sum(int(count) for current_word, count in group)print "%s%s%d" % (current_word, separator, total_count)except valueError:passif __name__ == "__main__":main()

细节:groupby

from itertools import groupby
from operator import itemgetterthings = [('2009-09-02', 11),('2009-09-02', 3),('2009-09-03', 10),('2009-09-03', 4),('2009-09-03', 22),('2009-09-06', 33)]sss = groupby(things, itemgetter(0))
for key, items in sss:print keyfor subitem in items:print subitemprint '-' * 20

结果

1
2
3
4
5
6
7
8
9
10
11
12
13
>>>
2009-09-02
('2009-09-02'11)
('2009-09-02'3)
--------------------
2009-09-03
('2009-09-03'10)
('2009-09-03'4)
('2009-09-03'22)
--------------------
2009-09-06
('2009-09-06'33)
--------------------

  • groupby(things, itemgetter(0)) 以第0列为排序目标
  • groupby(things, itemgetter(1))以第1列为排序目标
  • groupby(things)以整行为排序目标
回到顶部

4. 参考

python中的split函数中的参数问题

Writing an Hadoop MapReduce Program in Python

shell的sort命令的-k参数

转载于:https://www.cnblogs.com/zhizhan/p/5776373.html

用python写MapReduce函数——以WordCount为例相关推荐

  1. python写mapreduce_用python写MapReduce函数——以WordCount为例

    使用 python 写 MapReduce 的 " 诀窍 " 是利用 Hadoop 流的 API ,通过 STDIN( 标准输入 ) . STDOUT( 标准输出 ) 在 Map ...

  2. 自动化测试(三)如何用python写一个函数,这个函数的功能是,传入一个数字,产生N条邮箱,产生的邮箱不能重复。...

    写一个函数,这个函数的功能是,传入一个数字,产生N条邮箱,产生的邮箱不能重复.邮箱前面的长度是6-12之间,产生的邮箱必须包含大写字母.小写字母.数字和特殊字符 和上一期一样 代码中间有段比较混沌 有 ...

  3. 用python写一个函数_Python基础-函数篇

    1. 函数基本语法及特性 2. 参数与局部变量 3. 返回值 嵌套函数 4.递归 5.匿名函数 6.函数式编程介绍 7.高阶函数 8.内置函数 函数与函数式编程 1.面向对象: 华山派----> ...

  4. MapReduce函数实现WordCount

    简单介绍下词频分析,就是统计一个文件中的字段名的出现的次数. 实现过程: 1.创建maven工程 在pom.xml中加入配置信息 以下内容可以在maven 中心仓库中找到 <!-- https: ...

  5. python写公式函数_python的数学算法函数及公式用法

    之前老是跟大家说看久了Python,总感觉就像是很多的数学公式运算,大家一致觉得只是一点点像,那今天跟大家直接就说下叫"数学"算法的内容,这样大家再来品鉴下,是不是可以贯通使用的内 ...

  6. python写公式函数_根据公式生成函数(Python)

    让我们把这个分解一下.首先,如何创建函数pathlength?像这样:def pathlength(x, y): return 42 # The length is 42 miles 如您所见,它接受 ...

  7. 关于使用Python——写运用函数和字典完成手机销售系统

    方法一 phone_info = [{'name':'vivox9', 'price':'1200', 'count':'30'}, {'name':'iphone6', 'price':'2000' ...

  8. python写接口函数_python接口自动化测试二十:函数写接口测试

    # coding:utf-8 import requests import re from bs4 import BeautifulSoup # s = requests.session() # 全局 ...

  9. python怎么设置回文数_python如何写一个函数判断回文数?

    python如何写一个函数判断回文数? python写一个函数判断回文数的方法: 设n是一任意自然数.若将n的各位数字反向排列所得自然数n1与n相等,则称n为一回文数.例如,若n=1234321,则称 ...

  10. python回文数判定_python如何写一个函数判断回文数?

    python如何写一个函数判断回文数? python写一个函数判断回文数的方法: 设n是一任意自然数.若将n的各位数字反向排列所得自然数n1与n相等,则称n为一回文数.例如,若n=1234321,则称 ...

最新文章

  1. easymock快速入门
  2. ARouter源码探究
  3. java_Socket简单使用方法
  4. [C++][线程安全]单例模式下双检查锁和线程
  5. 串口通讯编程一日通2(Overlapped IO模型)
  6. 卡内基梅隆大学和斯坦福计算机,卡内基梅隆大学并列全美榜首的专业--计算机专业...
  7. 【Julia】ERROR: UndefVarError: linspace not defined
  8. 从安全和不安全两个角度,教你如何发布对象(含各种单例代码)
  9. Spring boot 配置文件,输入key值,自动补全--- 通过安装插件实现
  10. GitHub更新已经fork的项目
  11. QTTabBar安装后不生效
  12. Linux固态硬盘 设置写入缓存,Win10下的写入缓存策略严重影响SSD硬盘的性能!
  13. 2022年终总结:少年不惧岁月长,彼方尚有荣光在。
  14. 概率论复习笔记二——离散型分布和连续型分布
  15. 微信小程序video组件调用腾讯视频的解析
  16. 微信小程序之个人中心静态页面
  17. 如何修改已提交commit信息
  18. 一键装机linux_linux系统学习第十八天《搭建一键装机平台》终结篇
  19. 设施网络选址的基本方法,网络设施选址的方法
  20. cmd imp导入dmp文件_导入Oracle的dmp备份的dmp文件报错“IMP-00002:无法打开c:/Documents.DMP进行读取”...

热门文章

  1. linux中程序定时重启脚本,linux 程序定时重启脚本
  2. linux该专接本还是工作_先专接本还是先工作?
  3. qt设置背景图片变黑色_PS软件如何快速制作一个黑色创意海报
  4. android通过代码设置铃声_让你的手机铃声与众不同 (附ios音乐dj)
  5. SqlDataAdapter的增加,删除,修改
  6. windows server 系统SERVER服务消失无法共享
  7. 201621123079《Java程序设计》第1周学习总结
  8. C++Builder 2010深入TForm类之窗口与窗体
  9. 在Linux中,用.swp文件恢复未保存的文件
  10. 64-bit and iOS 8 Requirements for New Apps