用python写MapReduce函数——以WordCount为例
原文:http://www.cnblogs.com/kaituorensheng/p/3826114.html
阅读目录
- 1. Python MapReduce 代码
- 2. 在Hadoop上运行python代码
- 3. 利用python的迭代器和生成器优化Mapper 和 Reducer代码
- 4. 参考
尽管Hadoop框架是用java写的,但是Hadoop程序不限于java,可以用python、C++、ruby等。本例子中直接用python写一个MapReduce实例,而不是用Jython把python代码转化成jar文件。
例子的目的是统计输入文件的单词的词频。
- 输入:文本文件
- 输出:文本(每行包括单词和单词的词频,两者之间用'\t'隔开)
1. Python MapReduce 代码
使用python写MapReduce的“诀窍”是利用Hadoop流的API,通过STDIN(标准输入)、STDOUT(标准输出)在Map函数和Reduce函数之间传递数据。
我们唯一需要做的是利用Python的sys.stdin读取输入数据,并把我们的输出传送给sys.stdout。Hadoop流将会帮助我们处理别的任何事情。
1.1 Map阶段:mapper.py
在这里,我们假设把文件保存到hadoop-0.20.2/test/code/mapper.py
#!/usr/bin/env python import sys for line in sys.stdin:line = line.strip()words = line.split()for word in words:print "%s\t%s" % (word, 1)
文件从STDIN读取文件。把单词切开,并把单词和词频输出STDOUT。Map脚本不会计算单词的总数,而是输出<word> 1。在我们的例子中,我们让随后的Reduce阶段做统计工作。
为了是脚本可执行,增加mapper.py的可执行权限
chmod +x hadoop-0.20.2/test/code/mapper.py
1.2 Reduce阶段:reducer.py
在这里,我们假设把文件保存到hadoop-0.20.2/test/code/reducer.py
#!/usr/bin/env python from operator import itemgetter import syscurrent_word = None current_count = 0 word = Nonefor line in sys.stdin:line = line.strip()word, count = line.split('\t', 1)try:count = int(count)except ValueError: #count如果不是数字的话,直接忽略掉continueif current_word == word:current_count += countelse:if current_word:print "%s\t%s" % (current_word, current_count)current_count = countcurrent_word = wordif word == current_word: #不要忘记最后的输出print "%s\t%s" % (current_word, current_count)
文件会读取mapper.py 的结果作为reducer.py 的输入,并统计每个单词出现的总的次数,把最终的结果输出到STDOUT。
为了是脚本可执行,增加reducer.py的可执行权限
chmod +x hadoop-0.20.2/test/code/reducer.py
细节:split(chara, m),第二个参数的作用,下面的例子很给力
str = 'server=mpilgrim&ip=10.10.10.10&port=8080' print str.split('=', 1)[0] #1表示=只截一次 print str.split('=', 1)[1] print str.split('=')[0] print str.split('=')[1]
输出
1
2
3
4
|
server
mpilgrim&ip = 10.10 . 10.10 &port = 8080
server
mpilgrim&ip
|
1.3 测试代码(cat data | map | sort | reduce)
这里建议大家在提交给MapReduce job之前在本地测试mapper.py 和reducer.py脚本。否则jobs可能会成功执行,但是结果并非自己想要的。
功能性测试mapper.py 和 reducer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
[rte@hadoop - 0.20 . 2 ]$cd test / code
[rte@code]$echo "foo foo quux labs foo bar quux" | . / mapper.py
foo 1
foo 1
quux 1
labs 1
foo 1
bar 1
quux 1
[rte@code]$echo "foo foo quux labs foo bar quux" | . / mapper.py | sort - k1, 1 | . / reducer.py
bar 1
foo 3
labs 1
quux 2
|
细节:sort -k1,1 参数何意?
-k, -key=POS1[,POS2] 键以pos1开始,以pos2结束
有时候经常使用sort来排序,需要预处理把需要排序的field语言在最前面。实际上这是
完全没有必要的,利用-k参数就足够了。
比如sort all
1
2
3
4
5
|
1 4
2 3
3 2
4 1
5 0
|
如果sort -k 2的话,那么执行结果就是
1
2
3
4
5
|
5 0
4 1
3 2
2 3
1 4
|
2. 在Hadoop上运行python代码
2.1 数据准备
下载以下三个文件的
- Plain Text UTF-8
- Plain Text UTF-8
- Plain Text UTF-8
我把上面三个文件放到hadoop-0.20.2/test/datas/目录下
2.2 运行
把本地的数据文件拷贝到分布式文件系统HDFS中。
bin/hadoop dfs -copyFromLocal /test/datas hdfs_in
查看
bin/hadoop dfs -ls
结果
1
|
drwxr - xr - x - rte supergroup 0 2014 - 07 - 05 15 : 40 / user / rte / hdfs_in
|
查看具体的文件
bin/hadoop dfs -ls /user/rte/hdfs_in
执行MapReduce job
bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar \ -file test/code/mapper.py -mapper test/code/mapper.py \ -file test/code/reducer.py -reducer test/code/reducer.py \ -input /user/rte/hdfs_in/* -output /user/rte/hdfs_out
实例输出
查看输出结果是否在目标目录/user/rte/hdfs_out
bin/hadoop dfs -ls /user/rte/hdfs_out
输出
1
2
3
|
Found 2 items
drwxr-xr-x - rte supergroup 0 2014 -07 -05 20: 51 /user/rte/hdfs_out 2 /_logs
-rw-r--r-- 2 rte supergroup 880829 2014 -07 -05 20: 51 /user/rte/hdfs_out 2 /part -00000
|
查看结果
bin/hadoop dfs -cat /user/rte/hdfs_out2/part-00000
输出
以上已经达成目的了,但是可以利用python迭代器和生成器优化
3. 利用python的迭代器和生成器优化Mapper 和 Reducer代码
3.1 python中的迭代器和生成器
看这
3.2 优化Mapper 和 Reducer代码
mapper.py
#!/usr/bin/env python import sys def read_input(file):for line in file:yield line.split()def main(separator='\t'):data = read_input(sys.stdin)for words in data:for word in words:print "%s%s%d" % (word, separator, 1)if __name__ == "__main__":main()
reducer.py
#!/usr/bin/env python from operator import itemgetter from itertools import groupby import sysdef read_mapper_output(file, separator = '\t'):for line in file:yield line.rstrip().split(separator, 1)def main(separator = '\t'):data = read_mapper_output(sys.stdin, separator = separator)for current_word, group in groupby(data, itemgetter(0)):try:total_count = sum(int(count) for current_word, count in group)print "%s%s%d" % (current_word, separator, total_count)except valueError:passif __name__ == "__main__":main()
细节:groupby
from itertools import groupby from operator import itemgetterthings = [('2009-09-02', 11),('2009-09-02', 3),('2009-09-03', 10),('2009-09-03', 4),('2009-09-03', 22),('2009-09-06', 33)]sss = groupby(things, itemgetter(0)) for key, items in sss:print keyfor subitem in items:print subitemprint '-' * 20
结果
1
2
3
4
5
6
7
8
9
10
11
12
13
|
>>>
2009 -09 -02
( '2009-09-02' , 11 )
( '2009-09-02' , 3 )
--------------------
2009 -09 -03
( '2009-09-03' , 10 )
( '2009-09-03' , 4 )
( '2009-09-03' , 22 )
--------------------
2009 -09 -06
( '2009-09-06' , 33 )
--------------------
|
注
- groupby(things, itemgetter(0)) 以第0列为排序目标
- groupby(things, itemgetter(1))以第1列为排序目标
- groupby(things)以整行为排序目标
4. 参考
python中的split函数中的参数问题
Writing an Hadoop MapReduce Program in Python
shell的sort命令的-k参数
转载于:https://www.cnblogs.com/zhizhan/p/5776373.html
用python写MapReduce函数——以WordCount为例相关推荐
- python写mapreduce_用python写MapReduce函数——以WordCount为例
使用 python 写 MapReduce 的 " 诀窍 " 是利用 Hadoop 流的 API ,通过 STDIN( 标准输入 ) . STDOUT( 标准输出 ) 在 Map ...
- 自动化测试(三)如何用python写一个函数,这个函数的功能是,传入一个数字,产生N条邮箱,产生的邮箱不能重复。...
写一个函数,这个函数的功能是,传入一个数字,产生N条邮箱,产生的邮箱不能重复.邮箱前面的长度是6-12之间,产生的邮箱必须包含大写字母.小写字母.数字和特殊字符 和上一期一样 代码中间有段比较混沌 有 ...
- 用python写一个函数_Python基础-函数篇
1. 函数基本语法及特性 2. 参数与局部变量 3. 返回值 嵌套函数 4.递归 5.匿名函数 6.函数式编程介绍 7.高阶函数 8.内置函数 函数与函数式编程 1.面向对象: 华山派----> ...
- MapReduce函数实现WordCount
简单介绍下词频分析,就是统计一个文件中的字段名的出现的次数. 实现过程: 1.创建maven工程 在pom.xml中加入配置信息 以下内容可以在maven 中心仓库中找到 <!-- https: ...
- python写公式函数_python的数学算法函数及公式用法
之前老是跟大家说看久了Python,总感觉就像是很多的数学公式运算,大家一致觉得只是一点点像,那今天跟大家直接就说下叫"数学"算法的内容,这样大家再来品鉴下,是不是可以贯通使用的内 ...
- python写公式函数_根据公式生成函数(Python)
让我们把这个分解一下.首先,如何创建函数pathlength?像这样:def pathlength(x, y): return 42 # The length is 42 miles 如您所见,它接受 ...
- 关于使用Python——写运用函数和字典完成手机销售系统
方法一 phone_info = [{'name':'vivox9', 'price':'1200', 'count':'30'}, {'name':'iphone6', 'price':'2000' ...
- python写接口函数_python接口自动化测试二十:函数写接口测试
# coding:utf-8 import requests import re from bs4 import BeautifulSoup # s = requests.session() # 全局 ...
- python怎么设置回文数_python如何写一个函数判断回文数?
python如何写一个函数判断回文数? python写一个函数判断回文数的方法: 设n是一任意自然数.若将n的各位数字反向排列所得自然数n1与n相等,则称n为一回文数.例如,若n=1234321,则称 ...
- python回文数判定_python如何写一个函数判断回文数?
python如何写一个函数判断回文数? python写一个函数判断回文数的方法: 设n是一任意自然数.若将n的各位数字反向排列所得自然数n1与n相等,则称n为一回文数.例如,若n=1234321,则称 ...
最新文章
- easymock快速入门
- ARouter源码探究
- java_Socket简单使用方法
- [C++][线程安全]单例模式下双检查锁和线程
- 串口通讯编程一日通2(Overlapped IO模型)
- 卡内基梅隆大学和斯坦福计算机,卡内基梅隆大学并列全美榜首的专业--计算机专业...
- 【Julia】ERROR: UndefVarError: linspace not defined
- 从安全和不安全两个角度,教你如何发布对象(含各种单例代码)
- Spring boot 配置文件,输入key值,自动补全--- 通过安装插件实现
- GitHub更新已经fork的项目
- QTTabBar安装后不生效
- Linux固态硬盘 设置写入缓存,Win10下的写入缓存策略严重影响SSD硬盘的性能!
- 2022年终总结:少年不惧岁月长,彼方尚有荣光在。
- 概率论复习笔记二——离散型分布和连续型分布
- 微信小程序video组件调用腾讯视频的解析
- 微信小程序之个人中心静态页面
- 如何修改已提交commit信息
- 一键装机linux_linux系统学习第十八天《搭建一键装机平台》终结篇
- 设施网络选址的基本方法,网络设施选址的方法
- cmd imp导入dmp文件_导入Oracle的dmp备份的dmp文件报错“IMP-00002:无法打开c:/Documents.DMP进行读取”...
热门文章
- linux中程序定时重启脚本,linux 程序定时重启脚本
- linux该专接本还是工作_先专接本还是先工作?
- qt设置背景图片变黑色_PS软件如何快速制作一个黑色创意海报
- android通过代码设置铃声_让你的手机铃声与众不同 (附ios音乐dj)
- SqlDataAdapter的增加,删除,修改
- windows server 系统SERVER服务消失无法共享
- 201621123079《Java程序设计》第1周学习总结
- C++Builder 2010深入TForm类之窗口与窗体
- 在Linux中,用.swp文件恢复未保存的文件
- 64-bit and iOS 8 Requirements for New Apps