Our big data team works on data quality evaluation. The automated QA and monitoring platform is built with Django, and the MR jobs are implemented in Python as well. (We later ran into an ORC compression issue that we could not solve from Python, so that part is being rewritten in Java.)

Here is an example of writing an MR job in Python.

To borrow a line from the docs: Hadoop Streaming is a programming utility that ships with Hadoop; it lets users run any executable or script file as the Mapper and the Reducer.
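As a minimal illustration of that idea, a streaming job can be launched with nothing but standard Unix tools as the mapper and reducer. The jar path and HDFS directories below are assumptions that depend on your installation:

# Any executable can serve as mapper/reducer; cat passes records through, wc counts them
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar \
    -input /user/example/input \
    -output /user/example/output \
    -mapper /bin/cat \
    -reducer /usr/bin/wc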

1. First, some background: our data is stored in Hive. The table structure (output of desc) is shown below.

We will parse this metadata and merge it with the data files on HDFS to make processing easier. The partition keys here are year/month/day.

hive (gulfstream_ods)> desc g_order;
OK
col_name                data_type               comment
order_id                bigint                  order id
driver_id               bigint                  driver id; 0 before a driver grabs the order
driver_phone            string                  driver phone number
passenger_id            bigint                  passenger id
passenger_phone         string                  passenger phone number
car_id                  int                     pickup vehicle id
area                    int                     city id
district                string                  district code within the city
type                    int                     order type: 0 real-time, 1 reservation
current_lng             decimal(19,6)           passenger longitude when the order was placed
current_lat             decimal(19,6)           passenger latitude when the order was placed
starting_name           string                  pickup point name
starting_lng            decimal(19,6)           pickup point longitude
starting_lat            decimal(19,6)           pickup point latitude
dest_name               string                  destination name
dest_lng                decimal(19,6)           destination longitude
dest_lat                decimal(19,6)           destination latitude
driver_start_distance   int                     road distance from driver to pickup point, in meters
start_dest_distance     int                     road distance from pickup point to destination, in meters
departure_time          string                  departure time (reserved time for reservation orders, order-creation time for real-time orders)
strive_time             string                  time the driver successfully grabbed the order
consult_time            string                  negotiated time
arrive_time             string                  time the driver tapped "I have arrived"
setoncar_time           string                  boarding time (currently unused)
begin_charge_time       string                  time the driver tapped "start billing"
finish_time             string                  completion time
year                    string
month                   string
day                     string

# Partition Information
# col_name              data_type               comment

year                    string
month                   string
day                     string

2. Parsing the table metadata

This is the metadata-parsing step. The parsed metadata is then serialized and saved to the file desc.gulfstream_ods.g_order, which is shipped to the Hadoop cluster together with the MR scripts.

import subprocess
from subprocess import Popen


def desc_table(db, table):
    # Run "hive -e 'desc db.table'" and capture its output
    process = Popen('hive -e "desc %s.%s"' % (db, table),
                    shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = process.communicate()
    is_column = True
    structure_list = list()
    column_list = list()
    for line in stdout.split('\n'):
        value_list = list()
        if not line or len(line.split()) < 2:
            break
        if is_column:
            # The first line is the header: col_name, data_type, comment
            column_list = line.split()
            is_column = False
            continue
        else:
            value_list = line.split()
        structure_dict = dict(zip(column_list, value_list))
        structure_list.append(structure_dict)
    return structure_list
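The dump step itself is not shown here; a minimal sketch of how the result might be written out, assuming JSON serialization (which matches the deserialize helper the mapper uses by default) and a hypothetical helper name dump_desc:

import json

def dump_desc(db, table):
    # Serialize the parsed structure into the desc.<db>.<table> file
    # that is shipped to the cluster alongside the MR scripts.
    structure_list = desc_table(db, table)
    with open('desc.%s.%s' % (db, table), 'w') as fw:
        fw.write(json.dumps(structure_list))

if __name__ == '__main__':
    dump_desc('gulfstream_ods', 'g_order')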

3. Below is the Hadoop Streaming launch script.

#!/bin/bash
source /etc/profile
source ~/.bash_profile

# Hadoop home
echo "HADOOP_HOME: "$HADOOP_HOME
HADOOP="$HADOOP_HOME/bin/hadoop"

DB=$1
TABLE=$2
YEAR=$3
MONTH=$4
DAY=$5
echo $DB--$TABLE--$YEAR--$MONTH--$DAY

if [ "$DB" = "gulfstream_ods" ]
then
    DB_NAME="gulfstream"
else
    DB_NAME=$DB
fi
TABLE_NAME=$TABLE

# Input path
input_path="/user/xiaoju/data/bi/$DB_NAME/$TABLE_NAME/$YEAR/$MONTH/$DAY/*"
# Success-marker file name
input_mark="_SUCCESS"
echo $input_path
# Output path
output_path="/user/bigdata-t/QA/yangfan/$DB_NAME/$TABLE_NAME/$YEAR/$MONTH/$DAY"
output_mark="_SUCCESS"
echo $output_path
# Capacity / parallelism settings
capacity_mapper=500
capacity_reducer=200
map_num=10
reducer_num=10
queue_name="root.dashujudidiyanjiuyuan-zhinengpingtaibu.datapolicy-develop"
# Job name
job_name="DW_Monitor_${DB_NAME}_${TABLE_NAME}_${YEAR}${MONTH}${DAY}"
mapper="python mapper.py $DB $TABLE_NAME"
reducer="python reducer.py"

$HADOOP fs -rmr $output_path
$HADOOP jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar \
    -jobconf mapred.job.name="$job_name" \
    -jobconf mapred.job.queue.name=$queue_name \
    -jobconf mapred.map.tasks=$map_num \
    -jobconf mapred.reduce.tasks=$reducer_num \
    -jobconf mapred.map.capacity=$capacity_mapper \
    -jobconf mapred.reduce.capacity=$capacity_reducer \
    -input $input_path \
    -output $output_path \
    -file ./mapper.py \
    -file ./reducer.py \
    -file ./utils.py \
    -file ./"desc.${DB}.${TABLE_NAME}" \
    -mapper "$mapper" \
    -reducer "$reducer"
if [ $? -ne 0 ]; then
    echo "$DB_NAME $TABLE_NAME $YEAR $MONTH $DAY run failed"
fi
$HADOOP fs -touchz "${output_path}/$output_mark"
rm -rf ./${DB_NAME}.${TABLE_NAME}.${YEAR}-${MONTH}-${DAY}
$HADOOP fs -get $output_path/part-00000 ./${DB_NAME}.${TABLE_NAME}.${YEAR}-${MONTH}-${DAY}
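Assuming the script above is saved as run.sh (the filename is an assumption), a daily run for the table above could be kicked off like this:

# arguments: db table year month day
sh run.sh gulfstream_ods g_order 2016 12 07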

4. Here is a step up from WordCount: the first function counts orders by area, and the second counts orders by hour of the day.

The mapper script:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import sys
import json
import pickle
reload(sys)
sys.setdefaultencoding('utf-8')


# Match each line's fields against the table metadata and return an iterator of dicts
def read_from_input(file, separator, columns):
    for line in file:
        if line is None or line == '':
            continue
        data_list = mapper_input(line, separator)
        if not data_list:
            continue
        item = None
        # The last 3 columns (year/month/day) are partition keys and are absent from the data files
        if len(data_list) == len(columns) - 3:
            item = dict(zip(columns, data_list))
        elif len(data_list) == len(columns):
            item = dict(zip(columns, data_list))
        if not item:
            continue
        yield item


def index_columns(db, table):
    with open('desc.%s.%s' % (db, table), 'r') as fr:
        structure_list = deserialize(fr.read())
        return [column.get('col_name') for column in structure_list]


# map entry point
def main(separator, columns):
    items = read_from_input(sys.stdin, separator, columns)
    mapper_result = {}
    for item in items:
        mapper_plugin_1(item, mapper_result)
        mapper_plugin_2(item, mapper_result)


def mapper_plugin_1(item, mapper_result):
    # In practice the key could be a different app key; records with the same
    # route key are dispatched to the same reducer
    key = 'route1'
    area = item.get('area')
    district = item.get('district')
    order_id = item.get('order_id')
    if not area or not district or not order_id:
        return
    mapper_output(key, {'area': area, 'district': district, 'order_id': order_id, 'count': 1})


def mapper_plugin_2(item, mapper_result):
    key = 'route2'
    strive_time = item.get('strive_time')
    order_id = item.get('order_id')
    if not strive_time or not order_id:
        return
    try:
        day_hour = strive_time.split(':')[0]
        mapper_output(key, {'order_id': order_id, 'strive_time': strive_time, 'count': 1, 'day_hour': day_hour})
    except Exception, ex:
        pass


def serialize(data, type='json'):
    if type == 'json':
        try:
            return json.dumps(data)
        except Exception, ex:
            return ''
    elif type == 'pickle':
        try:
            return pickle.dumps(data)
        except Exception, ex:
            return ''
    else:
        return ''


def deserialize(data, type='json'):
    if type == 'json':
        try:
            return json.loads(data)
        except Exception, ex:
            return []
    elif type == 'pickle':
        try:
            return pickle.loads(data)
        except Exception, ex:
            return []
    else:
        return []


def mapper_input(line, separator='\t'):
    try:
        return line.split(separator)
    except Exception, ex:
        return None


def mapper_output(key, data, separator='\t'):
    key = str(key)
    data = serialize(data)
    print '%s%s%s' % (key, separator, data)
    # print >> sys.stderr, '%s%s%s' % (key, separator, data)


if __name__ == '__main__':
    db = sys.argv[1]
    table = sys.argv[2]
    columns = index_columns(db, table)
    main('||', columns)
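For reference, each call to mapper_output prints one tab-separated line: the route key, then a JSON-serialized payload. With made-up field values, the mapper's stdout looks roughly like this:

route1	{"area": "1", "district": "010", "order_id": "100001", "count": 1}
route2	{"order_id": "100001", "strive_time": "2016-12-07 12:01:01", "count": 1, "day_hour": "2016-12-07 12"}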

The reducer script:

#!/usr/bin/env python
# vim: set fileencoding=utf-8
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
import json
import pickle
from itertools import groupby
from operator import itemgetter


def read_from_mapper(file, separator):
    for line in file:
        yield reducer_input(line)


def main(separator='\t'):
    reducer_result = {}
    line_list = read_from_mapper(sys.stdin, separator)
    for route_key, group in groupby(line_list, itemgetter(0)):
        if route_key is None:
            continue
        reducer_result.setdefault(route_key, {})
        if route_key == 'route1':
            reducer_plugin_1(route_key, group, reducer_result)
            reducer_output(route_key, reducer_result[route_key])
        if route_key == 'route2':
            reducer_plugin_2(route_key, group, reducer_result)
            reducer_output(route_key, reducer_result[route_key])


def reducer_plugin_1(route_key, group, reducer_result):
    for _, data in group:
        if data is None or len(data) == 0:
            continue
        if not data.get('area') or not data.get('district') or not data.get('count'):
            continue
        key = '_'.join([data.get('area'), data.get('district')])
        reducer_result[route_key].setdefault(key, 0)
        reducer_result[route_key][key] += int(data.get('count'))
    # print >> sys.stderr, '%s' % json.dumps(reducer_result[route_key])


def reducer_plugin_2(route_key, group, reducer_result):
    for _, data in group:
        if data is None or len(data) == 0:
            continue
        if not data.get('order_id') or not data.get('strive_time') or not data.get('count') or not data.get('day_hour'):
            continue
        key = data.get('day_hour')
        reducer_result[route_key].setdefault(key, {})
        reducer_result[route_key][key].setdefault('count', 0)
        reducer_result[route_key][key].setdefault('order_list', [])
        reducer_result[route_key][key]['count'] += int(data.get('count'))
        if len(reducer_result[route_key][key]['order_list']) < 100:
            reducer_result[route_key][key]['order_list'].append(data.get('order_id'))
    # print >> sys.stderr, '%s' % json.dumps(reducer_result[route_key])


def serialize(data, type='json'):
    if type == 'json':
        try:
            return json.dumps(data)
        except Exception, ex:
            return ''
    elif type == 'pickle':
        try:
            return pickle.dumps(data)
        except Exception, ex:
            return ''
    else:
        return ''


def deserialize(data, type='json'):
    if type == 'json':
        try:
            return json.loads(data)
        except Exception, ex:
            return []
    elif type == 'pickle':
        try:
            return pickle.loads(data)
        except Exception, ex:
            return []
    else:
        return []


def reducer_input(data, separator='\t'):
    data_list = data.strip().split(separator, 2)
    key = data_list[0]
    data = deserialize(data_list[1])
    return [key, data]


def reducer_output(key, data, separator='\t'):
    key = str(key)
    data = serialize(data)
    print '%s\t%s' % (key, data)
    # print >> sys.stderr, '%s\t%s' % (key, data)


if __name__ == '__main__':
    main()
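Because a streaming mapper and reducer are just programs reading stdin and writing stdout, they can be smoke-tested locally before submitting to the cluster. A minimal sketch, assuming a local sample file sample.txt with '||'-separated rows (the filename is an assumption) and the desc.gulfstream_ods.g_order file in the current directory; sort stands in for the shuffle phase:

cat sample.txt | python mapper.py gulfstream_ods g_order | sort | python reducer.py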

5. The previous version suffered from a slow reduce phase, for two reasons. First, because of how the route keys are set, every record with the same route goes to a single reducer, which puts heavy pressure on that one reducer and hurts performance. Second, the cluster runs on virtual machines, so raw performance is poor to begin with. The improved version below addresses this by pre-aggregating the data in the mapper, which relieves the computational load on the reducer.

The mapper script:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import sys
import json
import pickle
reload(sys)
sys.setdefaultencoding('utf-8')


# Match each line's fields against the table metadata and return an iterator of dicts
def read_from_input(file, separator, columns):
    for line in file:
        if line is None or line == '':
            continue
        data_list = mapper_input(line, separator)
        if not data_list:
            continue
        item = None
        # The last 3 columns (year/month/day) are partition keys and are absent from the data files
        if len(data_list) == len(columns) - 3:
            item = dict(zip(columns, data_list))
        elif len(data_list) == len(columns):
            item = dict(zip(columns, data_list))
        if not item:
            continue
        yield item


def index_columns(db, table):
    with open('desc.%s.%s' % (db, table), 'r') as fr:
        structure_list = deserialize(fr.read())
        return [column.get('col_name') for column in structure_list]


# map entry point
def main(separator, columns):
    items = read_from_input(sys.stdin, separator, columns)
    mapper_result = {}
    for item in items:
        mapper_plugin_1(item, mapper_result)
        mapper_plugin_2(item, mapper_result)
    # Emit the pre-aggregated map-side totals once all input has been consumed
    for route_key, route_value in mapper_result.iteritems():
        for key, value in route_value.iteritems():
            ret_dict = dict()
            ret_dict['route_key'] = route_key
            ret_dict['key'] = key
            ret_dict.update(value)
            mapper_output('route_total', ret_dict)


def mapper_plugin_1(item, mapper_result):
    # In practice the key could be a different app key; records with the same
    # route key are dispatched to the same reducer
    key = 'route1'
    area = item.get('area')
    district = item.get('district')
    order_id = item.get('order_id')
    if not area or not district or not order_id:
        return
    try:
        # accumulate map-side totals
        mapper_result.setdefault(key, {})
        mapper_result[key].setdefault('_'.join([area, district]), {})
        mapper_result[key]['_'.join([area, district])].setdefault('count', 0)
        mapper_result[key]['_'.join([area, district])].setdefault('order_id', [])
        mapper_result[key]['_'.join([area, district])]['count'] += 1
        if len(mapper_result[key]['_'.join([area, district])]['order_id']) < 10:
            mapper_result[key]['_'.join([area, district])]['order_id'].append(order_id)
    except Exception, ex:
        pass


def mapper_plugin_2(item, mapper_result):
    key = 'route2'
    strive_time = item.get('strive_time')
    order_id = item.get('order_id')
    if not strive_time or not order_id:
        return
    try:
        day_hour = strive_time.split(':')[0]
        # accumulate map-side totals
        mapper_result.setdefault(key, {})
        mapper_result[key].setdefault(day_hour, {})
        mapper_result[key][day_hour].setdefault('count', 0)
        mapper_result[key][day_hour].setdefault('order_id', [])
        mapper_result[key][day_hour]['count'] += 1
        if len(mapper_result[key][day_hour]['order_id']) < 10:
            mapper_result[key][day_hour]['order_id'].append(order_id)
    except Exception, ex:
        pass


def serialize(data, type='json'):
    if type == 'json':
        try:
            return json.dumps(data)
        except Exception, ex:
            return ''
    elif type == 'pickle':
        try:
            return pickle.dumps(data)
        except Exception, ex:
            return ''
    else:
        return ''


def deserialize(data, type='json'):
    if type == 'json':
        try:
            return json.loads(data)
        except Exception, ex:
            return []
    elif type == 'pickle':
        try:
            return pickle.loads(data)
        except Exception, ex:
            return []
    else:
        return []


def mapper_input(line, separator='\t'):
    try:
        return line.split(separator)
    except Exception, ex:
        return None


def mapper_output(key, data, separator='\t'):
    key = str(key)
    data = serialize(data)
    print '%s%s%s' % (key, separator, data)
    # print >> sys.stderr, '%s%s%s' % (key, separator, data)


if __name__ == '__main__':
    db = sys.argv[1]
    table = sys.argv[2]
    columns = index_columns(db, table)
    main('||', columns)

The reducer script:

#!/usr/bin/env python
# vim: set fileencoding=utf-8
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
import json
import pickle
from itertools import groupby
from operator import itemgetter


def read_from_mapper(file, separator):
    for line in file:
        yield reducer_input(line)


def main(separator='\t'):
    reducer_result = {}
    line_list = read_from_mapper(sys.stdin, separator)
    for route_key, group in groupby(line_list, itemgetter(0)):
        if route_key is None:
            continue
        reducer_result.setdefault(route_key, {})
        if route_key == 'route_total':
            reducer_total(route_key, group, reducer_result)
            reducer_output(route_key, reducer_result[route_key])


def reducer_total(route_key, group, reducer_result):
    for _, data in group:
        if data is None or len(data) == 0:
            continue
        if data.get('route_key') == 'route1':
            # merge the map-side totals, keyed by the pre-computed key (area_district)
            reducer_result[route_key].setdefault(data.get('key'), {})
            reducer_result[route_key][data.get('key')].setdefault('count', 0)
            reducer_result[route_key][data.get('key')].setdefault('order_id', [])
            reducer_result[route_key][data.get('key')]['count'] += data.get('count')
            for order_id in data.get('order_id'):
                if len(reducer_result[route_key][data.get('key')]['order_id']) <= 10:
                    reducer_result[route_key][data.get('key')]['order_id'].append(order_id)
        elif data.get('route_key') == 'route2':
            # merge the map-side totals, keyed by the pre-computed key (day_hour)
            reducer_result[route_key].setdefault(data.get('key'), {})
            reducer_result[route_key][data.get('key')].setdefault('count', 0)
            reducer_result[route_key][data.get('key')].setdefault('order_id', [])
            reducer_result[route_key][data.get('key')]['count'] += data.get('count')
            for order_id in data.get('order_id'):
                if len(reducer_result[route_key][data.get('key')]['order_id']) <= 10:
                    reducer_result[route_key][data.get('key')]['order_id'].append(order_id)
        else:
            pass


def serialize(data, type='json'):
    if type == 'json':
        try:
            return json.dumps(data)
        except Exception, ex:
            return ''
    elif type == 'pickle':
        try:
            return pickle.dumps(data)
        except Exception, ex:
            return ''
    else:
        return ''


def deserialize(data, type='json'):
    if type == 'json':
        try:
            return json.loads(data)
        except Exception, ex:
            return []
    elif type == 'pickle':
        try:
            return pickle.loads(data)
        except Exception, ex:
            return []
    else:
        return []


def reducer_input(data, separator='\t'):
    data_list = data.strip().split(separator, 2)
    key = data_list[0]
    data = deserialize(data_list[1])
    return [key, data]


def reducer_output(key, data, separator='\t'):
    key = str(key)
    data = serialize(data)
    print '%s\t%s' % (key, data)
    # print >> sys.stderr, '%s\t%s' % (key, data)


if __name__ == '__main__':
    main()
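A design note: instead of hand-rolling the aggregation inside the mapper, Hadoop Streaming also offers a -combiner option that runs a local reduce over each map task's output. A combiner must emit records in the same key/value format the reducer expects, so the reducer above cannot be reused directly; with a hypothetical combiner.py that meets that contract, the launch command would only gain two flags:

    -file ./combiner.py \
    -combiner "python combiner.py" \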

Problems encountered:

1. The DiskSpace quota of /user/bigdata/qa is exceeded

This error showed up after the reduce phase finished. It means the disk quota of that HDFS path has been used up; freeing space under the path resolves it.
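To confirm the diagnosis and free space, the directory's quota and current usage can be inspected and stale output removed with standard HDFS commands (the path comes from the error message above; the directory to delete is only a placeholder, and -skipTrash releases the space immediately instead of moving files to the trash):

hadoop fs -count -q /user/bigdata/qa
hadoop fs -rm -r -skipTrash /user/bigdata/qa/<old-output-dir>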

Reposted from: https://www.cnblogs.com/kangoroo/p/6151104.html
