听了超哥的一席课后逐渐明白了partition,记录一下自己的理解!(thanks 超哥)

package partition;import java.io.IOException;
import java.net.URI;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;/*** @ClassName: FlowCount2 * @Description: TODO(这里用一句话描述这个类的作用) * @author zhangweixiang* @date 2014年3月6日 下午3:27:56*/
/*** 分区的例子必须打成jar运行* 用处: 1.根据业务需要,产生多个输出文件*       2.多个reduce任务在运行,提高整体job的运行效率*/
public class FlowCount2 {public static final String INPUT_PATH = "hdfs://192.168.0.9:9000/wlan2";public static final String OUT_PATH = "hdfs://192.168.0.9:9000/myout";public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = new Job(conf, FlowCount2.class.getSimpleName());//指定打包的jarjob.setJarByClass(FlowCount2.class);// 1.1指定输入文件的路径FileInputFormat.addInputPath(job, new Path(INPUT_PATH));// 指定输入信息的格式化类job.setInputFormatClass(TextInputFormat.class);// 1.2指定自定义map类job.setMapperClass(MyMapper.class);// 设置map输出类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowWritable.class);// 1.3指定分区job.setPartitionerClass(MyPartition.class);// 设置reduce的任务个数,由于map输出后建立了两个分区,所以应该设置两个reduce任务输出到不同的文件(一个分区对应一个reduce任务)job.setNumReduceTasks(2);// 1.4排序,分组// 1.5规约// 2.2指定自定义的reduce类job.setReducerClass(MyReduce.class);// 设置输出类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowWritable.class);// 设置输出格式化类job.setOutputFormatClass(TextOutputFormat.class);// 如果输出文件路径存在则删除FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH),new Configuration());Path path = new Path(OUT_PATH);if (fileSystem.exists(path)) {fileSystem.delete(path, true);}// 2.3指定输出路径FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));// 提交任务job.waitForCompletion(true);}static class MyMapper extendsMapper<LongWritable, Text, Text, FlowWritable> {@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {// 分割行String[] split = value.toString().split("\t");// 获取用户电话号码String mobile = "";long upPackNum = 0l;long downPackNum = 0l;long upPayLoad = 0l;long downPayLoad = 0l;// 符合规范的电话号码if (!("".equals(split[2]))) {mobile = split[2];// 获取流量信息if (!("".equals(split[21]))) {upPackNum = Long.parseLong(split[21]);}if (!("".equals(split[22]))) {downPackNum = Long.parseLong(split[22]);}if (!("".equals(split[23]))) {upPayLoad = Long.parseLong(split[23]);}if (!("".equals(split[24]))) {downPayLoad = Long.parseLong(split[24]);}FlowWritable flowWritable = new FlowWritable(upPackNum,downPackNum, upPayLoad, downPayLoad);context.write(new Text(mobile), flowWritable);}}}static class MyReduce extendsReducer<Text, FlowWritable, Text, FlowWritable> {@Overrideprotected void reduce(Text k2, Iterable<FlowWritable> v2s,Context context) throws IOException, InterruptedException {long upPackNum = 0l;long downPackNum = 0l;long upPayLoad = 0l;long downPayLoad = 0l;for (FlowWritable flowWritable : v2s) {upPackNum += flowWritable.upPackNum;downPackNum += flowWritable.downPackNum;upPayLoad += flowWritable.upPayLoad;downPayLoad += flowWritable.downPayLoad;}FlowWritable flowWritable = new FlowWritable(upPackNum,downPackNum, upPayLoad, downPayLoad);context.write(k2, flowWritable);}}}
package partition;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.Writable;/*** @ClassName: flowWritable* @Description: 自定义类型实现Writable接口,包含四个参数(upPackNum 上行包, downPackNum 下行包,*               upPayLoad 发送流量,downPayLoad 下载流量)* @author zhangweixiang* @date 2014年3月5日 上午11:37:10*/
public class FlowWritable implements Writable {public long upPackNum;public long downPackNum;public long upPayLoad;public long downPayLoad;public FlowWritable() {// TODO Auto-generated constructor stub}public FlowWritable(long upPackNum, long downPackNum, long upPayLoad,long downPayLoad) {this.upPackNum = upPackNum;this.downPackNum = downPackNum;this.upPayLoad = upPayLoad;this.downPayLoad = downPayLoad;}@Overridepublic void write(DataOutput out) throws IOException {out.writeLong(upPackNum);out.writeLong(downPackNum);out.writeLong(upPackNum);out.writeLong(downPayLoad);}@Overridepublic void readFields(DataInput in) throws IOException {this.upPackNum = in.readLong();this.downPackNum = in.readLong();this.upPayLoad = in.readLong();this.downPayLoad = in.readLong();}/** (非 Javadoc)* * * @return* * @see java.lang.Object#toString()*/@Overridepublic String toString() {return upPackNum + "\t" + downPackNum + "\t" + upPayLoad + "\t"+ downPayLoad;}}
package partition;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
/*** @ClassName: MyPartition * @Description: 根据电话号码分区,正规号码分区代号为0,非正规号码分区为1(在此建立了两个分区,即会产生两个reduce任务输出到不同的文件0和1)* @param K k2(map输出的键), V v2(map输出的值)* @author zhangweixiang* @date 2014年3月6日 下午3:02:29*/
public class MyPartition extends HashPartitioner<Text,FlowWritable>{@Overridepublic int getPartition(Text key, FlowWritable value, int numReduceTasks) {int p=0;if(key.toString().length()!=11){p=1;}return p;}
}

注:必须要达成jar包上传到linux下执行(我开始没有打成jar包直接在eclipse下执行抛了异常)

执行完成后会产生两个文件(part-r-00000和part-r-00001)分别记录不同条件的信息。

eclipse直接运行抛的异常:

14/03/06 15:41:13 WARN mapred.LocalJobRunner: job_local_0001
java.io.IOException: Illegal partition for 10.80.203.79 (1)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1073)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:691)
at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
at partition.FlowCount2$MyMapper.map(FlowCount2.java:120)
at partition.FlowCount2$MyMapper.map(FlowCount2.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:214)
14/03/06 15:41:14 INFO mapred.JobClient:  map 0% reduce 0%
14/03/06 15:41:14 INFO mapred.JobClient: Job complete: job_local_0001
14/03/06 15:41:14 INFO mapred.JobClient: Counters: 0

记录超哥的总结:

分区的例子必须打成jar运行* 用处:   
 *1.根据业务需要,产生多个输出文件* 2.多个reduce任务在运行,提高整体job的运行效率

mapreduce之partition分区相关推荐

  1. MapReduce之Partition分区实例操作

    1.需求: 将统计结果按照手机归属地不同省份输出到不同文件中(分区) 2.案例数据: phone_data.txt 如下: 1363157985066 13726230503 00-FD-07-A4- ...

  2. Partition分区及实例

    Partition分区及实例 Partition分区介绍 概念 自定义分区步骤 分区总结: Partition分区案例 需求 1.需求说明 2.文件 案例分析 1.需求 输入数据 期望输出数据 实现分 ...

  3. Partition分区的使用案例

    Partition分区的使用案例: 将统计结果按照条件输出到不同文件中(分区) 文章目录 1)需求 2)需求分析 3)编程实现 1.创建Partitioner类 2.创建Bean类 3.创建Mappe ...

  4. Partition分区

    文章目录 Partition分区 1.默认partitioner分区 2.自定义Partitioner步骤 3.注意事项 4.案例 Partition分区 需求:按照条件输出到不同文件中. 案例:按照 ...

  5. mysql的partition分区

    前言:当一个表里面存储的数据特别多的时候,比如单个.myd数据都已经达到10G了的话,必然导致读取的效率很低,这个时候我们可以采用把数据分到几张表里面来解决问题. 方式一:通过业务逻辑根据数据的大小通 ...

  6. mysql in partition_MySQL Partition分区扫盲

    MySQL从5.1.3开始支持Partition,你可以使用如下命令来确认你的版本是否支持Partition: mysql> SHOW VARIABLES LIKE '%partition%'; ...

  7. kafka专题:kafka的Topic主题、Partition分区、消费组偏移量offset等基本概念详解

    文章目录 1. kafka集群整体架构 2. kafka相关元素的基本概念 2.1 主题Topic和分区Partition 2.2 kafka消息存储在哪里? 2.3 分区副本 2.4 消费组和偏移量 ...

  8. maxvalue mysql自动分区_mysql的partition分区

    前言:当一个表里面存储的数据特别多的时候,比如单个.myd数据都已经达到10G了的话,必然导致读取的效率很低,这个时候我们可以采用把数据分到几张表里面来解决问题. 方式一:通过业务逻辑根据数据的大小通 ...

  9. MapReduce分片、分区、分组 傻傻分不清

    MapReduce分片.分区.分组关系图 分片 对于HDFS中存储的一个文件,要进行Map处理前,需要将它切分成多个块,才能分配给不同的MapTask去执行.分片的数量等于启动的MapTask的数量. ...

最新文章

  1. Linux查看文件内容的5种方式
  2. 关于vmstat,top,ps aux查看的cpu占用率不一致的问题
  3. 云闪付怎么设置不跳华为支付_【教程】华为Pay用闪付券撸京东E卡!
  4. 超长整数相加 c语言类,二个超长正整数的相加
  5. 计算机可执行指令吧,电脑“开始-运行”的常用命令及用法!很有用!
  6. FIND_IN_SET 精确查找
  7. 排序1+1:冒泡排序法(BubbleSort)的改进以及效率比较
  8. linux 杀掉php,Linux_在Linux系统中使用xkill命令杀掉未响应的进程,我们如何在Linux中杀掉一个资 - phpStudy...
  9. SQLServer数据库获取重复记录中日期最新的记录
  10. 大学英语计算机开学考试试题,2018年全国大学英语四级考试阅读理解试题:学习计算机...
  11. CLRS10.1-6练习 - 用双栈实现队列
  12. 残差平方和(RSS)、均方误差(MSE)、均方根误差(RMSE)、平均绝对误差(MAE)
  13. sublime Boxy Theme安装方法
  14. 膜态沸腾UDF【转载】
  15. 李笑来和 stormzhang,其实是一类人
  16. windows下查看错误码与错误信息
  17. [oeasy]python0088_字节_Byte_存储单位_KB_MB_GB_TB
  18. 事件推送网关:让cmdb告别“花瓶”
  19. 修改hosts文件(win10版)
  20. wifi模块有哪些关键指标?如何选择wifi模块?ESP32-S3Wi-Fi模组

热门文章

  1. 调用python接口并画图_【PySpark源码解析】教你用Python调用高效Scala接口
  2. cobar mysql cluster_Cobar使用文档(可用作MySQL大型集群解决方案)
  3. Elasticsearch索引备份与清理
  4. 用Python实现一个SVM分类器策略
  5. Node.js构建可扩展的Web应用1
  6. vue+express 构建后台管理系统
  7. 多线程并发神器--ThreadLocal
  8. OA应用分析:机电企业如何选型OA系统
  9. pku 3270 Cow Sorting 置换群
  10. 如何使用你手中的利器