在以前的一篇关于python版MR实例的文章中,只是简单走了一下流程,这次主要是解决上次遗留的key,partition,sort的问题。以进一步理解hadoop streaming,也尝试mapper使用python而reducer使用bash的结合方式

1. 省略本地测试这个环节,可参考以前的那篇文章,HDFS上的测试数据位:
[root@hadoop Desktop]# hadoop fs -cat /usr/egencia/travler/travler.txt
/usr/hadoop/hadoop-1.2.1/libexec/../conf/hadoop-env.sh: line 59: export: `mapred.tasktracker.reduce.tasks.maximum=4': not a valid identifier
Warning: $HADOOP_HOME is deprecated.

air:343;hotel:45;train:54467;nation:china
air:367;hotel:456;train:5567;nation:china
air:356;hotel:4522;train:54367;car:454;nation:china
air:343;hotel:45;train:54467;nation:usa
air:367;hotel:456;train:5567;nation:usa
air:356;hotel:4522;train:54367;car:454;nation:usa
air:343;hotel:45;train:54467;nation:india
air:367;hotel:456;train:5567;nation:india
air:356;hotel:4522;train:54367;car:454;nation:india

2.假定需求为:
2.1 每个国家一个输出文件
2.2 输出文件格式为:
air:343:china
hotel:45:china
........

2.3 每个输出文件按照第一个字段排序后按照第二个字段排序

3 设计
key:第一和第三个字段
mapper输出为:air:343:china
partition:第三个字段
sort:第一和第二字段

4.mapper(python):
[root@hadoop Desktop]# cat tmapper.py
#!/usr/bin/python

import sys

line=sys.stdin.readline()
#print line[-1]
try:
    while line:
        line=line[:-1]
        #print line
        products=line.split(";")
        nations=products[-1].split(":")
        nation=nations[-1]
        prolen=len(products)
        for index,pro in enumerate(products):
            if index==(prolen-1):
                break
            else:
                subs=pro.split(":")
                print subs[0]+":"+subs[-1]+":"+nation
        line=sys.stdin.readline()
except :
    print "error"

5.reducer(bash):
/bin/cat

6.先不执行分区和排序:
[root@hadoop Desktop]# hadoop jar /usr/hadoop/hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar \                                    > -D mapred.reduce.tasks=3 \
> -input /usr/egencia/travler \
> -output /usr/egencia/travler/out \
> -mapper tmapper.py \
> -reducer /bin/cat \
> -file tmapper.py

查看输出文件:
[root@hadoop Desktop]# hadoop fs -ls /usr/egencia/travler/out
/usr/hadoop/hadoop-1.2.1/libexec/../conf/hadoop-env.sh: line 59: export: `mapred.tasktracker.reduce.tasks.maximum=4': not a valid identifier
Warning: $HADOOP_HOME is deprecated.

Found 5 items
-rw-r--r--   1 root supergroup          0 2013-09-06 02:42 /usr/egencia/travler/out/_SUCCESS
drwxr-xr-x   - root supergroup          0 2013-09-06 02:42 /usr/egencia/travler/out/_logs
-rw-r--r--   1 root supergroup        160 2013-09-06 02:42 /usr/egencia/travler/out/part-00000
-rw-r--r--   1 root supergroup        189 2013-09-06 02:42 /usr/egencia/travler/out/part-00001
-rw-r--r--   1 root supergroup        132 2013-09-06 02:42 /usr/egencia/travler/out/part-00002
[root@hadoop Desktop]# hadoop fs -cat /usr/egencia/travler/out/part-00000  /usr/egencia/travler/out/part-00001 \
>  /usr/egencia/travler/out/part-00002
/usr/hadoop/hadoop-1.2.1/libexec/../conf/hadoop-env.sh: line 59: export: `mapred.tasktracker.reduce.tasks.maximum=4': not a valid identifier
Warning: $HADOOP_HOME is deprecated.

air:343:usa
air:367:usa
car:454:india
hotel:4522:china
hotel:4522:usa
hotel:456:india
hotel:45:usa
train:54367:china
train:54367:usa
train:5567:india
air:343:india
air:356:india
air:356:usa
air:367:india
car:454:china
car:454:usa
hotel:456:china
hotel:45:india
train:54467:india
train:54467:usa
train:5567:china
train:5567:usa
air:343:china
air:356:china
air:367:china
hotel:4522:india
hotel:456:usa
hotel:45:china
train:54367:india
train:54467:china

7. 删除输出目录后,
指定分割符号为:
指定第一和第三个字段为key
指定分区为第三个字段
重新执行
第一种执行:
 hadoop jar /usr/hadoop/hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar -D mapred.reduce.tasks=3 -D stream.map.output.field.separator=: -D tream.num.map.output.key.fields=1  -input /usr/egencia/travler -output /usr/egencia/travler/out -mapper tmapper.py -reducer /bin/cat -file tmapper.py

结果
Warning: $HADOOP_HOME is deprecated.

hotel   45:china
hotel   456:china
hotel   4522:china
hotel   45:usa
hotel   456:usa
hotel   4522:usa

每个文件都是按照产品进行了partition

hadoop jar /usr/hadoop/hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar \
-D mapred.reduce.tasks=3 \
-D stream.map.output.field.separator=: \
-D tream.num.map.output.key.fields=1 \
-D map.output.key.field.separator=: \
-D mapred.text.key.partitioner.options=-k3 \
-input /usr/egencia/travler \
-output /usr/egencia/travler/out \
-mapper tmapper.py \
-reducer /bin/cat -file tmapper.py \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

hadoop jar /usr/hadoop/hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar \
-D stream.map.output.field.separator=: \
-D stream.num.map.output.key.fields=3 \
-D map.output.key.field.separator=: \
-D mapred.text.key.partitioner.options=-k2 \
-D mapred.reduce.tasks=2 \
-input /usr/egencia/travler \
-output /usr/egencia/travler/out \
-inputformat org.apache.hadoop.mapred.TextInputFormat \
-mapper tmapper.py \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-file tmapper.py \
-outputformat org.apache.hadoop.mapred.TextOutputFormat \
-reducer /bin/cat

streaming mr相关推荐

  1. 2.东软跨境电商数仓项目技术选型

    东软跨境电商数仓项目技术选型.框架版本选型.服务器选型.集群规划 文章目录 东软跨境电商数仓项目技术选型.框架版本选型.服务器选型.集群规划 1.数据采集传输技术选型 1.1 DataX和Sqoop比 ...

  2. 分布式流式计算框架Storm

    Storm用于实时处理,就好比 Hadoop 用于批处理.         --> 离线计算:批量获取数据,批量传输数据,周期性比量计算数据,数据展示(Sqoop-->HDFS--> ...

  3. Storm和MR及Spark Streaming的区别

    1.Storm和MR对比 Storm进程常驻内存,数据不经过磁盘,在内存中处理,数据通过网络传导.是流式处理框架,体现出了实时性. MR是为TB.PB级别数据设计的批处理离线计算框架 2.Storm和 ...

  4. python还是hadoop_使用Python和Hadoop Streaming编写MapReduce

    最近有个需求,就是对视频日志中的部分URL提取出来,并随机挑选五条.由于线上日志比较大,而且需要每天执行一次,如果单纯的用python即便是多线程性能也会大大折扣.于是考虑到用hadoop的MR去实现 ...

  5. 使用Hadoop Streaming 完成MapReduce(Python代码)

    一 Map和Reduce  首先看下MR的工作原理 MapReduce的好处是它可以把在内存中不能完成的事转变成可以在硬盘上高效完成. Map-­‐Reduce 对于集群的好处: 1,在多节点上冗余地 ...

  6. hadoop streaming编程小demo(python版)

    大数据团队搞数据质量评测.自动化质检和监控平台是用django,MR也是通过python实现的.(后来发现有orc压缩问题,python不知道怎么解决,正在改成java版本) 这里展示一个python ...

  7. C#码农的大数据之路 - 使用C#编写MR作业

    写在前面 从Hadoop出现至今,大数据几乎就是Java平台专属一般.虽然Hadoop或Spark也提供了接口可以与其他语言一起使用,但作为基于JVM运行的框架,Java系语言有着天生优势.而且能找到 ...

  8. python hadoop streaming_Hadoop Streaming 使用及参数设置

    1. MapReduce 与 HDFS 简介 什么是 Hadoop ? Google 为自己的业务需要提出了编程模型 MapReduce 和分布式文件系统 Google File System,并发布 ...

  9. 用python + hadoop streaming 编写分布式程序(一) -- 原理介绍,样例程序与本地调试

    原文地址:http://www.cnblogs.com/joyeecheung/p/3757915.html 相关随笔:  点击打开链接 Hadoop-1.0.4集群搭建笔记 用python + ha ...

最新文章

  1. Linux装多个apache,windows linux如何安装多个apache?
  2. Install Odoo 11 on CentOS 7
  3. python自动化运维快速入门pdf下载_Python自动化运维快速入门
  4. android baseactivity,Android应用开发Android通过BaseActivity获取到当前启动的Activity名称...
  5. mysql安装失败net_mysql安装后.net程序运行出错的解决方法
  6. 写给省选前的自己V2
  7. 跟我一起认识axure(三)
  8. VStudio 2003 remote debug
  9. [高精度整数] a+b [2010年华中科技大学计算机研究生机试真题]
  10. 实验7-3-4 字符串替换 (15 分)
  11. python 字节字符串_Python字符串转换为字节,字节转换为字符串
  12. 【转】如何把Matlab中的m文件转化成C语言代码
  13. 2016百度笔试题交流
  14. python九九乘法口诀_Python3 九九乘法口诀(99乘法口诀)
  15. NVIDIA NCCL 源码学习(五)- 路径计算
  16. 《新编计算机科学概论》一2.5 计算机软件系统
  17. 影视广告创意与制作(二)
  18. 简单了解下DDOS产业
  19. git 交互式rebase
  20. 嵌入式实时操作系统RTX5快速入门 (完结)

热门文章

  1. VxWorks学习笔记一 ------Bootrom和VxWorks镜像的引导
  2. java.sql.SQLException: Invalid utf8 character string: 'ACED00'
  3. AutoCAD二次开发:ObjectArx下的两种命令注册方式
  4. 第一节 函数与极限——映射与函数
  5. 阿里云发送短信功能(环境搭建篇)
  6. 【Python 22】52周存钱挑战2.0(列表list和math函数)
  7. 大数据入门的五大核心技术
  8. myeclipse8.5 TPTP插件的使用问题
  9. python|面向对象(一)
  10. 哈工大计算机科学与捄术学院,[哈尔滨工业大学]管理科学与工程