package org.test.CommonDep;/** 主要功能是针对输入为年份tab温度格式的数据,返回每年的最高汽温* 1,partitioner分区,将同一年份的数据放一起。key=年份 温度 value=空 * 2,KeyComparator,key比较算法,让数据先按年份升序排序,如果年份相同,按温度降序。* 3,GroupComparator,同一年份的数据为一组。* 4,输出为每个分组里面的第一个数据* 5,分布式缓存的使用* */
/**
*author:carl.zhang
*email:18510665908@163.com*/
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.Buffer;
import java.util.HashMap;
import java.util.regex.Pattern;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.SecondarySort.IntPair;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;public class PartitionGroupingDemo{enum MYCounter{MISSING,O}public static final Pattern DELIMITER = Pattern.compile("[\t,]");public static class PartitionGroupingMapper  extends Mapper<LongWritable, Text, Text, NullWritable>{HashMap<String,String> cache=new HashMap<String,String>();public void setup(Context context) throws IOException{FileSplit fileSplit=(FileSplit) context.getInputSplit();Configuration conf=context.getConfiguration();String name=conf.get("whoami");}private Text k=new Text();public void map(LongWritable key, Text value, Context context ) throws IOException, InterruptedException{String  []valueArray = PartitionGroupingDemo.DELIMITER.split(value.toString().trim());if (value.toString().trim().length()==0 || valueArray.length!=2 || value.toString().length()==0) return;if (valueArray[0].length()==0 || valueArray[1].length()==0) return;k.set(valueArray[0]+" "+valueArray[1]);context.write(k,NullWritable.get());}}public static class PartitionGroupingReducer extends Reducer<Text,NullWritable,Text,NullWritable> {public void cleanup(Context context){}HashMap<String,String> cache=new HashMap<String,String>();public void setup(Context context) throws IOException {/*单项的配置信息*/BufferedReader br=new BufferedReader(new InputStreamReader(new FileInputStream("symLink")));String pair=null;while(null!=(pair=br.readLine())){cache.put(pair.split("\t")[0], pair.split("\t")[1]);}
br.close();
}public void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {String k=null;if (cache.get(key.toString().split(" ")[0]) != null)k=cache.get(key.toString().split(" ")[0])+" "+key.toString().split(" ")[1];elsek=key.toString();context.write(new Text(k), NullWritable.get());}}public static class FirstPartitioner extends Partitioner<Text,NullWritable>{@Overridepublic int getPartition(Text arg0, NullWritable arg1, int arg2) {// TODO Auto-generated method stubString []keyArray=arg0.toString().split(" ");return Math.abs(Integer.parseInt(keyArray[0].trim())*127)%arg2;}}public static class KeyComparator extends WritableComparator{protected KeyComparator() {super(Text.class, true);}@Overridepublic int compare(WritableComparable a, WritableComparable b) {String []keyArray1=((Text)a).toString().split(" ");String []keyArray2=((Text)b).toString().split(" ");int w1Left=Integer.parseInt(keyArray1[0].trim());int w1Right=Integer.parseInt(keyArray1[1].trim());int w2Left=Integer.parseInt(keyArray2[0].trim());int w2Right=Integer.parseInt(keyArray2[1].trim());int result=0;if (w1Left>w2Left)result=1;else if (w1Left<w2Left)result=-1;else if (w1Right>w2Right)result=-1;else if (w1Right<w2Right)result=1;elseresult=0;return result;}}public static class GroupComparator extends WritableComparator{protected GroupComparator(){super(Text.class,true);}@Overridepublic int compare(WritableComparable a,WritableComparable b){String []keyArray1=((Text)a).toString().split(" ");String []keyArray2=((Text)b).toString().split(" ");int w1Left=Integer.parseInt(keyArray1[0].trim());int w1Right=Integer.parseInt(keyArray1[1].trim());int w2Left=Integer.parseInt(keyArray2[0].trim());int w2Right=Integer.parseInt(keyArray2[1].trim());int result=0;if (w1Left>w2Left)result=1;else if (w1Left<w2Left)result=-1;return result;}}
@SuppressWarnings("deprecation")
public static int run(String[] args) throws Exception {// TODO Auto-generated method stubConfiguration conf=new Configuration();//conf.set("mapreduce.framework.name", "yarn");//conf.set("yarn.resourcemanager.address", "192.168.10.225:8032");conf.set("mapreduce.job.jar","/export/workspace/CommonDep/CommonScheduler.jar" );/*配置数据可以直接通过configuration传递*/conf.set("whoami", "carlzhang");/*分布式缓存使用*/String cacheFile="hdfs://192.168.10.225:9000/input/cacheFiles/yeartocode.conf";Path inPath=new Path(cacheFile);String pathLink=inPath.toUri().toString()+"#symLink";DistributedCache.addCacheFile(new URI(pathLink), conf);DistributedCache.createSymlink(conf);String []remaingArgs=new GenericOptionsParser(conf,args).getRemainingArgs();Job job=new Job(conf);job.setJobName("PartitionGroupingDemo");job.setJarByClass(PartitionGroupingDemo.class);job.setNumReduceTasks(2);job.setMapperClass(PartitionGroupingMapper.class);job.setReducerClass(PartitionGroupingReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);job.setPartitionerClass(FirstPartitioner.class);job.setSortComparatorClass(KeyComparator.class);job.setGroupingComparatorClass(GroupComparator.class);/*避免输出空文件*/LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);job.setInputFormatClass(TextInputFormat.class);FileInputFormat.setInputPaths(job, new Path(args[0]));TextOutputFormat.setOutputPath(job,new Path(args[1]));return job.waitForCompletion(true) ? 0 : 1;
}
}

mapreduce 的partitioner,GroupComparator,KeyComparator,分布式缓存使用示例相关推荐

  1. 【Flink】Flink Distributed Cache 分布式缓存

    1.美图 2.概述 Flink提供了一个分布式缓存,类似于hadoop,可以使用户在并行函数中很方便的读取本地文件.此功能可用于共享文件,包含静态的外部数据,例如字典或者machine-learned ...

  2. 大数据【四】MapReduce(单词计数;二次排序;计数器;join;分布式缓存)

       前言: 根据前面的几篇博客学习,现在可以进行MapReduce学习了.本篇博客首先阐述了MapReduce的概念及使用原理,其次直接从五个实验中实践学习(单词计数,二次排序,计数器,join,分 ...

  3. MapReduce 分布式缓存 map Side Join

    MapReduce 分布式缓存 文章目录 MapReduce 分布式缓存 前言 一.Map Side join 二.分布式缓存 1.概念 2.代码实现 3.驱动类代码 4.在Yarn上运行代码 前言 ...

  4. Apache Ignite——集合分布式缓存、计算、存储的分布式框架

    Apache Ignite内存数据组织平台是一个高性能.集成化.混合式的企业级分布式架构解决方案,核心价值在于可以帮助我们实现分布式架构透明化,开发人员根本不知道分布式技术的存在,可以使分布式缓存.计 ...

  5. Hadoop - YARN NodeManager 剖析、NodeManger内部架构、分布式缓存、目录结构、状态机管理、Container 生命周期剖、资源隔离

    一 概述 NodeManager是运行在单个节点上的代理 ,它管理Hadoop集群中单个计算节点,功能包括与ResourceManager保持通信,管理Container的生命周期.监控每个Conta ...

  6. hadoop 分布式缓存

    Hadoop 分布式缓存实现目的是在所有的MapReduce调用一个统一的配置文件,首先将缓存文件放置在HDFS中,然后程序在执行的过程中会可以通过设定将文件下载到本地具体设定如下: public s ...

  7. Flink的累加器和广播变量、广播流、分布式缓存

    1.Accumulator累加器  Accumulator即累加器,与Mapreduce counter的应用场景差不多,都能很好地观察task在运行期间的数据变化.可以在Flink job任务中的算 ...

  8. Flink实操 : 广播变量/累加器/分布式缓存

    . 一 .前言 二 .广播变量使用 2.1.前言 2.2. 使用 三 .累加器 3.1. 前言 3.2. 使用 四 .分布式缓存 4.1. 前言 4.2.使用 一 .前言 二 .广播变量使用 2.1. ...

  9. 填坑之路——Hadoop分布式缓存

    一.概念介绍 1.分布式缓存的理解:Hadoop为MapReduce框架提供的一种分布式缓存机制,它会将需要缓存的文件分发到各个执行任务的子节点的机器中,各个节点可以自行读取本地文件系统上的数据进行处 ...

  10. 分布式专题-分布式缓存技术之MongoDB01-应用场景及实现原理

    目录导航 前言 什么是 NoSQL 关系型数据库 PK 非关系型数据库 NoSQL 数据库分类 MongoDB的数据结构与关系型数据库数据结构对比 MongoDB中的数据类型 图解MongoDB底层原 ...

最新文章

  1. 第二单元linux系统
  2. 完整的Ubuntu18.04深度学习GPU环境配置,英伟达显卡驱动安装、cuda9.0安装、cudnn的安装、anaconda安装
  3. The 12th Zhejiang Provincial Collegiate Programming Contest - I Earthstone Keeper浙江省赛
  4. 2-1 nodejs和npm的安装和环境搭建
  5. Eslint 配置 + 规则说明 - 综合引入篇
  6. 利用Sigar获取系统信息
  7. 服务器是通用计算机吗,服务器与台式计算机不同吗?有何区别?
  8. 简单的Python购物流程
  9. Java推箱子游戏,文档+源码
  10. [Hadoop in China 2011] 蒋建平:探秘基于Hadoop的华为共有云
  11. Android DataBinding RecyclerView AAPT: error: attribute adapter (aka......) not found.
  12. SpringBoot集成微信支付微信退款
  13. linux转化大小写,linux转换大小写
  14. 软件项目进度安排与跟踪:关键路径的计算
  15. FreeNAS安装过程
  16. [数据空间]浅谈信息革命背景下数字技术的变与不变
  17. CUDA atomic原子操作
  18. 2017年11场下半年热门电子商务相关会议电商峰会合集
  19. 如果一个数字从左边读和从右边读一样,那么这个数字就是一个回文数,例如32123就是一个回文数。 但事实上, 17在某种意义上也是一个回文数,因为它的二进制(10001)是一个回文数。
  20. 艾瑞咨询发布最新AI细分行业报告,多维剖析快商通等多家AI领军企业

热门文章

  1. Windows服务器配置与管理-------DHCP服务器搭建与管理
  2. 怎么设置Linux swap分区?方法教程
  3. hdu3790最短路径问题 (用优先队列实现的)
  4. WP7 手机软件纪念 - 稍后读软件
  5. 多个时间合并并集mysql_写个 Go 时间交并集小工具
  6. classpath路径浅谈
  7. 一个java文件里可以有多个类嘛?
  8. 腾讯云服务器安装AMH控制面板
  9. OpenCV-图像处理(17、Sobel算子)
  10. java radix sort_Java RadixSort