一 Map和Reduce 

首先看下MR的工作原理

MapReduce的好处是它可以把在内存中不能完成的事转变成可以在硬盘上高效完成。
Map-­‐Reduce 对于集群的好处:
1,在多节点上冗余地存储数据,以保证数据的持续性和一直可取性
2, 将计算移向数据端,以最大程度减少数据移动
3,简单的程序模型隐藏所有的复杂度

Map,Reduce一般的流程:
Map阶段:
a, 逐个文件逐行扫描
b, 扫描的同时抽取出我们感兴趣的内容 (Keys)

Group by key
排序和洗牌
(Group by key阶段会自动的运行,不需要自己写)

Reduce阶段:
a, 聚合 、 总结 、 过滤或转换
b, 写入结果

二  Hadoop Streaming原理

Hadoop 不仅可以使用Java进行MapReduce的编写,也通过Hadoop Streaming的方式提供了其他语言编写MR的接口。更重要的是,使用python来编写MR,比使用亲儿子Java编写MR要更简单和方便……所以在一些不非常复杂的任务中使用python来编写MR比起使用Java,是更加划算的。

Hadoop streaming是Hadoop的一个工具, 它帮助用户创建和运行一类特殊的map/reduce作业, 这些特殊的map/reduce作业是由一些可执行文件或脚本文件充当mapper或者reducer。

比如可以使用python语言来写map-reduce使用“Hadoop Streaming”来完成传统mapreduce的功能。

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper mapper.py \
-reducer reducer.py

上述代码通过参数input,output,mapper,reducer来定义输入数据,输出数据,mapper文件,reducer文件。

在上面的代码中,mapper和reducer都是可执行文件,它们从标准输入读入数据(一行一行读), 并把计算结果发给标准输出。Streaming工具会创建一个Map/Reduce作业, 并把它发送给合适的集群,同时监视这个作业的整个执行过程。

如果一个可执行文件被用于mapper,则在mapper初始化时, 每一个mapper任务会把这个可执行文件作为一个单独的进程启动。 mapper任务运行时,它把输入切分成行并把每一行提供给可执行文件进程的标准输入。 同时,mapper收集可执行文件进程标准输出的内容,并把收到的每一行内容转化成key/value对,作为mapper的输出。 默认情况下,一行中第一个tab之前的部分作为key,之后的(不包括tab)作为value。 如果没有tab,整行作为key值,value值为null。

如果一个可执行文件被用于reducer,每个reducer任务会把这个可执行文件作为一个单独的进程启动。 Reducer任务运行时,它把输入切分成行并把每一行提供给可执行文件进程的标准输入。 同时,reducer收集可执行文件进程标准输出的内容,并把每一行内容转化成key/value对,作为reducer的输出。 默认情况下,一行中第一个tab之前的部分作为key,之后的(不包括tab)作为value。

三 词频统计的例子

Python实现Wordcount:

1. mapper.py

[root@vm wordcount]# vim mapper.py
写入

#!/usr/bin/python
import sys
word2count = {}
for line in sys.stdin:line = line.strip()words = filter(lambda word:word,line.split())for word in words:print("%s\t%s" % (word,1))

2. reducer.py

[root@vm wordcount]# vim reducer.py
写入

#!/usr/bin/python
from operator import itemgetter
import sysword2count = {}
for line in sys.stdin:line = line.strip()word,count = line.split()try:count = int(count)word2count[word] = word2count.get(word,0) + countexcept ValueError as err:print(err)passsorted_word2count = sorted(word2count.items(),key=itemgetter(0))
for word,count in sorted_word2count:print("%s\t%s" % (word, count))

3. 准备一个测试文件test.txt

[root@vm wordcount]# vim test.txt
写入

this is a test
this is a test
this is a test
this is a test

4. 本地测试

[root@vm wordcount]# cat test.txt |python mapper.py |sort|python reducer.py

a       4
is      4
test    4
this    4

[root@vm wordcount]#
5. 集群运行

集群运行前要将本地的测试文件上传到hdfs
[root@vm wordcount]# hadoop fs -mkdir /user/root/wordcount
[root@vm wordcount]# hadoop fs -put test.txt /user/root/wordcount/
[root@vm wordcount]# hadoop fs -ls /user/root/wordcount/
Found 1 items
-rw-r--r--   3 root root         60 2018-05-14 09:58 /user/root/wordcount/test.txt
[root@vm wordcount]#

运行mapreduce

[root@vm wordcount]# hadoop jar /opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/jars/hadoop-streaming-2.6.0-cdh5.13.1.jar -D mapred.reduce.tasks=1 -mapper "python mapper.py" -reducer "python reducer.py" -file mapper.py -file reducer.py -input /user/root/wordcount/test.txt -output /user/root/wordcount/out

命令行查看结果

[root@vm wordcount]# hadoop fs -cat /user/root/wordcount/out/part-00000
a   4
is  4
test    4
this    4
[root@vm wordcount]# 

四 使用第三方的Python库

$HADOOP_HOME/bin/hadoop streaming -D mapred.job.priority='VERY_HIGH' -D mared.job.map.capacity=500 -D mapred.reduce.tasks=0 -D mapred.map.tasks=500 -input myInputDirs(你得HDFS路径) -output myOutputDir(你的HDFS路径) -mapper "python  yourpythonfile.py" -reducer "python  yourpythonfile.py" -file yourpythonfile.py(需要几个就添加几个-file) -cacheArchive "/xx/xx/xx/myvp.tar.gz#myvp"(此处是一个HDFS路径,稍后用到)

使用第三方库

需要使用第三方库如bs4,numpy等时,需要用到虚拟环境virtualenv
virtualenv的使用

安装

pip install virtualenv

新建虚拟环境

virtualenv myvp

使得虚拟环境的路径为相对路径

virtualenv --relocatable myvp

激活虚拟环境

source myvp/bin/activate

如果想退出,可以使用下面的命令

deactivate

激活后直接安装各种需要的包

pip install XXX

压缩环境包

tar -czf myvp.tar.gz myvp

在mapreduce上使用

在上面的脚本中可以看到使用了-catchArchive,但是路径是HDFS的路径,因此需要提前将本地的myvp.tai.gz包上传到HDFS上。
同时#后面的myvp是文件的文件夹,解压后还有一个myvp(因为压缩的时候把文件夹本身也压缩进去了),所有map中使用的时候的路径就是myvp/myvp/bin/…
在map的python脚本中加入如下的代码,会把第三方库加入到python 路径

import sys
sys.path.append("myvp/myvp/lib/python2.7")

参考:

https://blog.csdn.net/wawa8899/article/details/80305720

https://blog.csdn.net/wh357589873/article/details/70049088

使用Hadoop Streaming 完成MapReduce(Python代码)相关推荐

  1. python还是hadoop_使用Python和Hadoop Streaming编写MapReduce

    最近有个需求,就是对视频日志中的部分URL提取出来,并随机挑选五条.由于线上日志比较大,而且需要每天执行一次,如果单纯的用python即便是多线程性能也会大大折扣.于是考虑到用hadoop的MR去实现 ...

  2. python hadoop streaming_如何在Hadoop中使用Streaming编写MapReduce(转帖)

    作者:马士华 发表于:2008-03-05 12:51 最后更新于:2008-03-25 11:18 版权声明:可以任意转载,转载时请务必以超链接形式标明文章原始出处和作者信息. http://www ...

  3. 用python + hadoop streaming 编写分布式程序(一) -- 原理介绍,样例程序与本地调试

    原文地址:http://www.cnblogs.com/joyeecheung/p/3757915.html 相关随笔:  点击打开链接 Hadoop-1.0.4集群搭建笔记 用python + ha ...

  4. hadoop streaming编程小demo(python版)

    大数据团队搞数据质量评测.自动化质检和监控平台是用django,MR也是通过python实现的.(后来发现有orc压缩问题,python不知道怎么解决,正在改成java版本) 这里展示一个python ...

  5. 基于hadoop的商品推荐系统_[零基础入门推荐系统(1)]基于用户和基于物品的协同过滤方法(python代码实现)...

    1. 前言: 为什么会有该系列? 最近,打算写<零基础入门推荐系统>系列,为了系统地介绍推荐系统知识,以及加强基础的实践能力. 该系列将结合一些书籍,比如项亮的<推荐系统实践> ...

  6. mapreduce python实例_MapReduce程序实例(python)

    问题背景 现在有两份数据,file1是校园新闻版块,每一条新闻点击记录:file2是校园新闻版块使用活跃度高的学生记录.用mr统计出某一天的点击记录里,使用ios/android手机的活跃学生的总的点 ...

  7. 利用Hadoop Streaming处理二进制格式文件

    Hadoop Streaming是Hadoop提供的多语言编程工具,用户可以使用自己擅长的编程语言(比如python.php或C#等)编写Mapper和Reducer处理文本数据.Hadoop Str ...

  8. Hadoop Streaming高级编程

    1. 概要 本文主要介绍了Hadoop Streaming的一些高级编程技巧,包括,怎样在mapredue作业中定制输出输出格式?怎样向mapreduce作业中传递参数?怎么在mapreduce作业中 ...

  9. java vo转map_Jython:在 Java 程序里运行 Python 代码 4.5

    彭翌 彭翌,网易游戏资深运维开发工程师,从事大数据相关的基础架构平台研发工作,业余时间也关注分布式系统等相关领域. 前言 众所周知,JVM 在大数据基础架构领域可以说是独占鳌头,当我们需要开发大数据处 ...

最新文章

  1. hdu oj1096题解
  2. WinAPI: waveInUnprepareHeader - 清除由 waveInPrepareHeader 完成的准备
  3. Linux shell ==运算符
  4. 【转】jmeter响应结果乱码问题
  5. 隐藏了十年的Sudo漏洞爆出:无需密码就能获取root权限
  6. 如何优雅地实现 C 编译期静态反射
  7. 渗透测试入门20之渗透测试七阶段
  8. 大数据之-Hadoop源码编译_源码编译的意义---大数据之hadoop工作笔记0044
  9. Dynamo和Bigtable对比研究
  10. matlab 空集判定,在使用matlab 符号运算中的solve函数时,为啥计算的结果是空集?该怎么办?...
  11. atq1_使用at,atq,atrm和batchLinux调度命令示例
  12. 王者荣耀体验服怎么显示服务器,王者荣耀体验服怎么进 王者荣耀体验服申请攻略...
  13. 【经典游戏系列】回忆杀?扒一扒那些年我们玩过的打地鼠小游戏。
  14. 2021-09-16关于初学C的心路历程~
  15. 一个在线测试正则表达式的网站推荐
  16. dbms_aw.eval_number
  17. 计算机组成原理ACC中文含义,计算机组成原理 作业一
  18. Markdown 入门及语法详细指南 ★
  19. 七牛云CNAME设置,七牛云绑定域名。
  20. Spring-SecondDay

热门文章

  1. post 表单中常见的四种表单请求方式
  2. 易语言 mysql查询 中文乱码_大佬们E语言连接MYSQL输出中文乱码怎么破
  3. Mybatis源码分析--Mapper接口的代理生成原理
  4. eclipse怎么导入maven项目 eclipse导入maven项目详细教程
  5. 从一个简洁的进度刻度绘制中了解自定义 View 的思路流程
  6. 现在就启用 HTTPS,免费的!
  7. USACO Section 4.2 题解
  8. Java 建模:UML 工作簿,第 1 部分
  9. left join on用法_MySQL 多表查询 quot;Joinquot;+“case when”语句总结
  10. Spring Security加密策略