Hadoop Streaming示例程序(wordcount)

run_hadoop_word_counter.sh

$HADOOP_BIN streaming \-input "${INPUT}" \-output "${OUT_DIR}" \-cacheArchive "${TOOL_DIR}/python2.7.2.tgz""#." \-file "mapper_word_counter.py" \                                                                                                                                         -file "reducer_word_counter.py" \-file "filter_word_counter.py" \-mapper "./python2.7.2/bin/python mapper_word_counter.py" \-combiner "./python2.7.2/bin/python reducer_word_counter.py" \-reducer "./python2.7.2/bin/python reducer_word_counter.py" \-jobconf abaci.job.base.environment="centos6u3_hadoop" \-jobconf mapred.job.priority="NORMAL" \-jobconf mapred.job.name="${TASK_NAME}" \-jobconf mapred.map.tasks="${MAP_NUM}" \-jobconf mapred.reduce.tasks="${REDUCE_NUM}" \-jobconf mapred.map.memory.limit="1000" \-jobconf mapred.reduce.memory.limit="1000" \-jobconf mapred.job.map.capacity="3000" \-jobconf mapred.job.reduce.capacity="2500" \-jobconf mapred.job.keep.files.hours=12 \-jobconf mapred.max.map.failures.percent=1 \-jobconf mapred.reduce.tasks.speculative.execution="false"

mapper_word_counter.py

import sys for line in sys.stdin:fields = line.strip().split('\t')try:cnt = 1                                                                                                                                                              dateval = fields[1]sys.stdout.write('%s\t%d\n' %(dateval, cnt))except Exception as exp:sys.stderr.write("exp:%s, %s" %(str(exp), line))

reducer_word_counter.py

import sys word_pre = None
counter_pre = 0 for line in sys.stdin:try:word, cnt  = line.strip().split('\t')                                                                                                                                cnt = int(cnt)except Exception as exp:sys.stderr.write('Exp:%s,line:%s' %(str(exp), line.strip()))continueif word == word_pre:counter_pre += cnt else:if word_pre:print('%s\t%d' %(word_pre, counter_pre))word_pre = wordcounter_pre = cnt if word_pre:print('%s\t%d' %(word_pre, counter_pre))

纯文本输入格式

  • 每个mapper输入若干行
    -inputformat "org.apache.hadoop.mapred.TextInputFormat"
  • 指定每个mapper输入的行数
    -inputformat "org.apache.hadoop.mapred.lib.NLineInputFormat" -jobconf mapred.line.input.format.linespermap="5"

文件分发方式:

-file将客户端本地文件打成jar包上传到HDFS然后分发到计算节点;
-cacheFile将HDFS文件分发到计算节点;
-cacheArchive将HDFS压缩文件分发到计算节点并解压;

分桶&排序

Hadoop默认会把map输出行中遇到的第一个分隔符(默认为\t)前面的部分作为key,后面的作为value,如果输出行中没有指定的分隔符,则整行作为key,value被设置为空字符串。mapper输出的key经过partition分发到不同的reduce里。

  • 应用示例
${HADOOP_BIN} streaming \-input "${INPUT}" \-output "${OUT_DIR}" \-mapper cat \-reducer cat \-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \-jobconf mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \-jobconf stream.num.map.output.key.fields=4 \-jobconf stream.map.output.field.separator=. \-jobconf map.output.key.field.separator=. \-jobconf mapred.text.key.partitioner.options=-k1,2 \-jobconf mapred.text.key.comparator.options="-k3,3 -k4nr" \-jobconf stream.reduce.output.field.separator=. \-jobconf stream.num.reduce.output.key.fields=4 \-jobconf mapred.reduce.tasks=5

说明:

  • 设定mapper输出的key
    stream.map.output.field.separator 设置map输出的字段分隔符
    stream.num.map.output.key.fields 设置map输出的前几个字段作为key
  • 设定根据key进行分桶的规则
    org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner partition类
    map.output.key.field.separator 设置key内的字段分隔符(KeyFieldBasedPartitioner和KeyFieldBasedComparator所特有)
    num.key.fields.for.partition 设置key内前几个字段用来做partition
    mapred.text.key.partitioner.options 可单独指定key中哪些字段做partition,和num.key.fields.for.partition一起使用以num.key.fields.for.partition为准
  • 设定根据key进行排序的规则
    KeyFieldBasedComparator 可灵活设置的高级比较器,默认使用Text的基于字典序或者通过-n来基于数字比较
    mapred.text.key.comparator.options 设置key中需要比较的字段或字节范围
  • 设定reducer输出的key
    stream.reduce.output.field.separator 设置reduce输出的字段分隔符
    stream.num.reduce.output.key.fields 设置reduce输出的前几个字段作为key

多路输出

Hadoop支持多路输出,可以将MapReduce的处理数据输出到多个part-xxxxx-X文件中(X是A-Z共26个字母中的一个)。程序需要在maper(正对仅有mapper的MR任务)/reducer(针对包含reducer的任务)程序中将输出形式由<key,value>变为<key, value#X>,以便输出特定后缀的文件中。其中#X仅仅用做指定输出文件后缀, 不会出现在输出内容中。
启动脚本中需要指定
-outputformat org.apache.hadoop.mapred.lib.SuffixMultipleTextOutputFormat
或者
-outputformat org.apache.hadoop.mapred.lib.SuffixMultipleSequenceFileOutputFormat

  • 应用示例
    run_hadoop.sh
${HADOOP_BIN} streaming \-input "${INPUT}" \-output "${OUT_DIR}" \-cacheArchive "${TOOL_DIR}/python2.7.2.tgz""#." \-file "mapper_worker.sh" \-file "reducer_worker.py" \-mapper "sh mapper_worker.sh" \-reducer "python2.7.2/bin/python reducer_worker.py" \-inputformat "org.apache.hadoop.mapred.TextInputFormat" \-outputformat "org.apache.hadoop.mapred.lib.SuffixMultipleTextOutputFormat" \-jobconf mapred.job.priority="NORMAL" \-jobconf mapred.job.name="${TASK_NAME}" \-jobconf mapred.map.tasks="${MAP_NUM}" \-jobconf mapred.reduce.tasks="${REDUCE_NUM}" \-jobconf mapred.max.split.size=134217728 \-jobconf mapred.map.memory.limit="800" \-jobconf mapred.reduce.memory.limit="500" \-jobconf mapred.job.map.capacity="3500" \-jobconf mapred.job.reduce.capacity="2000" \-jobconf mapred.job.keep.files.hours=12 \-jobconf mapred.max.map.failures.percent=1 \-jobconf mapred.reduce.tasks.speculative.execution="false"

reducer_worder.py

for line in sys.stdin:record = line.strip()fields = record.split('\t')if len(fields) != 7:continuevcpurl, playurl, title, poster, duration, pubtime, accept = fieldsduration = int(duration)pubtime = int(pubtime)accept = int(accept)if duration < 60:sys.stdout.write('%s#A\n' %(record))elif duration < 300:sys.stdout.write('%s#B\n' %(record))else:sys.stdout.write('%s#C\n' %(record))

本地调试

为避免在启动MR任务后才发现程序bug,最好提前在本地模拟MR的运行流程,验证结果是否符合预期

cat inputfile | ./mapper_task.sh | sort -t$'\t' -k1,1 | ./reducer.sh

压缩输出

Hadoop默认支持gzip压缩, streaming作业中指定以下参数即可使输出以gzip形式压缩.

-D mapreduce.output.fileoutputformat.compress=true
-D mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec

Hadoop 是可自行读取gzip压缩的数据,无需特殊指明输入是 Gzip 压缩。Gzip 的特点是压缩比较高,Hadoop 原生支持,缺点是压缩效率并不是很高,压缩比和效率不可兼得,需要考虑其他压缩方式。

Hadoop常用配置项

配置名 说明
abaci.job.base.environment centos6u3_hadoop 如果系统环境需要升级,可以指定为 centos6u3_hadoop 支持更高版本的 glibc
stream.memory.limit 单个map/reduce最高使用内存,默认800M
mapred.map.memory.limit 单个map最高使用内存,优先级高于stream.memory.limit
mapred.reduce.memory.limit 单个reduce最高使用内存,优先级高于stream.memory.limit
mapred.map.capacity.per.tasktracker 每台机器最多同时启动map个数
mapred.reduce.capacity.per.tasktracker 每台机器最多同时启动reduce个数
mapred.job.map.capacity map并发数目
mapred.job.reduce.capacity reduce并发数目
abaci.job.map.max.capacity map并发限制,默认10000
abaci.job.reduce.max.capacity reduce并发限制,默认3000
mapred.map.tasks map数目
mapred.reduce.tasks reduce数目
mapred.job.reuse.jvm.num.tasks 1表示不reuse,-1表示无限reuse,其他数值表示每个jvm reuse次数。reuse的时候,map结束时不会释放内存
mapred.compress.map.output 指定map的输出是否压缩。有助于减小数据量,减小io压力,但压缩和解压有cpu成本,需要慎重选择压缩算法
mapred.map.output.compression.codec map输出的压缩算法
mapred.output.compress reduce输出是否压缩
mapred.output.compression.codec 控制mapred的输出的压缩的方式
io.compression.codecs 压缩算法
mapred.max.map.failures.percent 容忍map错误百分比,默认为0
mapred.max.reduce.failures.percent 容忍reduce错误百分比,默认为0
stream.map.output.field.separator map输出分隔符,默认Tab
stream.reduce.output.field.separator reduce输出分隔符,默认Tab
mapred.textoutputformat.separator 设置TextOutputFormat的输出key,value分隔符,默认Tab
mapred.textoutputformat.ignoreseparator 设置为true后,当只有key没有value会去掉自动补上的Tab
mapred.min.split.size 指定map最小处理数据量,单位B
mapred.max.split.size 指定map最多处理数据量,单位B,同时设置inputformat=org.apache.hadoop.mapred.CombineTextInputFormat
mapred.combine.input.format.local.only 是否只合并本节点,默认true,设置为false可以跨节点合并数据
abaci.job.map.cpu.percent map消耗cpu占比,参数默认值40(表示1个cpu的40%,即0.4个cpu)
abaci.job.reduce.cpu.percent reduce消耗cpu占比,参数默认值40(表示1个cpu的40%,即0.4个cpu)
mapred.map.capacity.per.tasktracker 表示每个节点最多并行跑几个该job的map任务(请根据内存情况适当增减该参数,默认是8)
mapred.reduce.capacity.per.tasktracker 表示每个节点最多并行跑几个该job的reduce任务(请根据内存情况适当增减该参数,默认是8)
mapred.map.tasks.speculative.execution 开启map预测执行,默认true
mapred.reduce.tasks.speculative.execution 开启reduce预测执行,默认true

Hadoop环境下系统变量

  • 变量名列表
变量名 变量说明
HADOOP_HOME 计算节点上配置的Hadoop路径
LD_LIBRARY_PATH 计算节点上加载库文件的路径列表
PWD 当前工作目录
dfs_block_size 当前设置的HDFS文件块大小
map_input_file mapper正在处理的输入文件路径
mapred_job_id 作业ID
mapred_job_name 作业名
mapred_tip_id 当前任务的第几次重试
mapred_task_id 任务ID
mapred_task_is_map 当前任务是否为map
mapred_output_dir 计算输出路径
mapred_map_tasks 计算的map任务数
mapred_reduce_tasks 计算的reduce任务数

https://hadoop.apache.org/docs/r1.2.1/mapred_tutorial.html#Configured+Parameters

  • 应用示例:
    Shell版
#!/bin/bashset -o pipefail
HOST="localhost"
PORT=$((1000 + ${mapred_task_partition}))awk '{print $2}' \| ./access_remote_data ${HOST} ${PORT} outdata.gzhdfs_outfile=${mapred_work_output_dir}/${mapred_task_partition}.pack
cat outdata.gz \| gzip -d \| python ../postprocess.py| ${HADOOP_HOME}/bin/hadoop fs -D hadoop.job.ugi="username,pwd" -copyFromLocal - ${hdfs_outfile}

Python版

import osinput_file = os.environ['mapreduce_map_input_file']
#do something else

References

Hadoop Streaming相关官方文档:https://hadoop.apache.org/docs/r3.1.2/hadoop-streaming/HadoopStreaming.html
Hadoop Streaming入门:http://icejoywoo.github.io/2015/09/28/introduction-to-hadoop-streaming.html
Hadoop排序工具用法小结:http://www.dreamingfish123.info/?p=1102
Hadoop压缩选项权衡:https://www.slideshare.net/Hadoop_Summit/singh-kamat-june27425pmroom210c

转载于:https://www.cnblogs.com/jeromeblog/p/11464693.html

Hadoop常用操作汇总相关推荐

  1. dataframe数据分析常用操作汇总

    dataframe的常用操作汇总 目录 1.建 1 2.读 2 3.存 2 4.阅 2 5.取 2 6.改 4 7.增 4 8.删 5 9.并 5 10.序 7 1.建 创建只有一列的数据框 df = ...

  2. mysql sql 字符串字段保留数字_SQL字符串以及数字常用操作汇总

    SQL字符串以及数字常用操作汇总 更新时间:2013年06月11日 09:05:12   作者: 本篇文章是对SQL字符串以及数字的常用操作进行了详细的总结与分析,需要的朋友参考下 --将字符串中从某 ...

  3. arcgis select by attributes一次选多个_ArcGIS中属性表的常用操作汇总

    ArcGIS中属性表的常用操作汇总 本篇文章将平时对arcgis属性表的相关操作记录下来,防止忘记.此外,在技术摸索中参考了一些gis大牛的博客和技术分享,我在博客结尾也粘贴了他们的博客地址在此表示感 ...

  4. HFSS常用操作汇总(持续更新)

    HFSS常用操作 操作类 convert to read only 切角 Non-Model 圆极化天线使用Discrete扫频 peakGain and GainTotal 局部区域的电流或电场等的 ...

  5. Linux常用操作汇总:内容有点杂,但很实用

    这一阵操作虚拟机Linux比较多,一些操作使用也比较频繁,在这里总结一下,方便回顾. 1.获取动态IP 刚装的centos,你可能获取不到动态IP,类似下面的情况: 解决方案: 首先确定你网卡的名称, ...

  6. Python和Excel的完美结合:常用操作汇总

    在以前,商业分析对应的英文单词是Business Analysis,大家用的分析工具是Excel,后来数据量大了,Excel应付不过来了(Excel最大支持行数为1048576行),人们开始转向pyt ...

  7. arcgis 字段计算器 条件赋值_ArcGIS中属性表的常用操作汇总

    本篇文章将平时对arcgis属性表的相关操作记录下来,防止忘记.此外,在技术摸索中参考了一些gis大牛的博客和技术分享,我在博客结尾也粘贴了他们的博客地址在此表示感谢. 案例一:arcgis属性表某个 ...

  8. Hadoop应用实战100讲(二)-Hadoop常用命令汇总

    前言 以下是我为大家准备的几个精品专栏,喜欢的小伙伴可自行订阅,你的支持就是我不断更新的动力哟! MATLAB-30天带你从入门到精通 MATLAB深入理解高级教程(附源码) tableau可视化数据 ...

  9. 【Kotlin 初学者】字符串常用操作汇总

    作者简介:CSDN博客专家.华为云享专家认证 系列专栏:Kotlin 初学者 学习交流:三人行必有我师焉:择其善者而从之,其不善者而改之. 目录 一.字符串创建 1.1 字符串属性 1.2 字符串函数 ...

最新文章

  1. python北京时间代码_python代码定时同步本机的北京时间详解
  2. 封装一个类搞定90%安卓客户端与服务器端交互
  3. CTFshow 文件包含 web116
  4. 如何把文件压缩变成一张图片?
  5. INTEL和AMD两大巨头的前身
  6. 300plc与组态王mpi通讯_S7-300与S7-200之间的MPI通信
  7. Mybatis之typeAlias配置的3种方法
  8. 产业链人士:部分客户订单减少 联发科四季度营收可能环比下滑
  9. 报告正在使用哪些Reporting Services数据集字段?
  10. 2019 年备受争议的 Facebook 能否走出去年的阴影?| 畅言
  11. debian-nagios3.2,linux+apache+mysql+php
  12. 【GlobalMapper精品教程】005:影像拼接与裁切(分幅)作业案例教程
  13. 正则表达式-国际手机号或座机号校验
  14. C语言——白细胞计数
  15. android美拍sd卡,AI美拍,智慧四摄;小i也有大不同
  16. linux自动启动 oracle
  17. 阿克曼函数java代码_阿克曼函数
  18. x200装linux驱动下载,手把手教你在x200上安装原汁原味的mac Os x10.6系统
  19. OSChina 周三乱弹 ——小时候,女孩子喜欢娃娃,长大后
  20. 波若波罗蜜心经学习心得

热门文章

  1. 1716: 棒棒糖(暴力破解+优化)
  2. java 图形校验_java图形验证码生成工具类 web页面校验验证码
  3. 对话李飞飞,揭秘国际体育赛事风“云”背后的黑科技
  4. 如何打造“智能助理”?阿里对话开发平台这样做
  5. 以外包角度谈美术制程 Studio Voltz联合创始人开发经验
  6. iPhone 13发售日期偷跑:9月17日全系开售、共4款
  7. 判断一个字符串是否全部相同
  8. ASP.NET MVC项目的创建
  9. 揭秘猪八戒调戏嫦娥背后的阴谋
  10. 金蝶K3cloud问题单排查