MapReduce Programming Examples

  • The key to solving a problem with MapReduce is choosing the key: only records with the same key reach the same reducer to be processed together.
  • The default partitioner is HashPartitioner (key.hashCode() % numReduceTasks), so the number of partitions is tied to the number of reduce tasks, but a custom Partitioner can be plugged in (see the sketch below).
  • Without a reducer there is no shuffle phase: data is written out directly after the mapper processes it, with no partitioning or any of the later steps.
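For reference, a minimal sketch of a custom Partitioner (the class name and the choice of routing by first character are illustrative, not part of the original examples); it mirrors HashPartitioner's pattern of masking the sign bit before taking the modulo:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Illustrative custom partitioner: routes keys by their first character instead of the full hashCode.
public class FirstCharPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numReduceTasks) {
        String s = key.toString();
        int hash = s.isEmpty() ? 0 : s.charAt(0);
        // Same pattern as HashPartitioner: clear the sign bit, then take the remainder.
        return (hash & Integer.MAX_VALUE) % numReduceTasks;
    }
}
// Registered in the driver with job.setPartitionerClass(FirstCharPartitioner.class);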

Reduce-side join implementation

  1. Requirement:
    Order table t_order
id date pid amount
1001 20191210 P0001 2
1002 20191210 P0001 3
1002 20191210 P0002 3

Product table t_product

id pname category_id price
1001 小米8 2000 2
1002 apple X 8000 3

Assume the data volume is huge and both tables are stored as files on HDFS; a MapReduce program is needed to implement the following SQL query:

select  a.id,a.date,b.name,b.category_id,b.price from t_order a join t_product b on a.pid = b.id
  2. Implementation mechanism:
    Use the join field as the map output key and emit every matching row of the two tables, tagged with the file it came from, so that rows with the same key reach the same reduce task, where the records are stitched together.

The joined result bean

/**
 * The joined result record.
 * @author lxf
 * @version v1.0
 * @date 2018/4/9 10:56
 */
public class JoinBean implements Writable {
    // order fields
    private int orderId;
    private String date;
    private String pid;
    private int amount;
    // product fields
    private String productId;
    private String name;
    private String category_id;
    private float price;

    public JoinBean() {}

    public void set(int id, String date, String pid, int amount, String productId,
                    String name, String category_id, float price) {
        this.orderId = id;
        this.date = date;
        this.pid = pid;
        this.amount = amount;
        this.productId = productId;
        this.name = name;
        this.category_id = category_id;
        this.price = price;
    }

    public int getOrderId() { return orderId; }
    public void setOrderId(int orderId) { this.orderId = orderId; }
    public String getDate() { return date; }
    public void setDate(String date) { this.date = date; }
    public String getPid() { return pid; }
    public void setPid(String pid) { this.pid = pid; }
    public int getAmount() { return amount; }
    public void setAmount(int amount) { this.amount = amount; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getCategory_id() { return category_id; }
    public void setCategory_id(String category_id) { this.category_id = category_id; }
    public float getPrice() { return price; }
    public void setPrice(float price) { this.price = price; }
    public String getProductId() { return productId; }
    public void setProductId(String productId) { this.productId = productId; }

    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(orderId);
        dataOutput.writeUTF(date);
        dataOutput.writeUTF(pid);
        dataOutput.writeInt(amount);
        dataOutput.writeUTF(productId);
        dataOutput.writeUTF(name);
        dataOutput.writeUTF(category_id);
        dataOutput.writeFloat(price);
    }

    public void readFields(DataInput dataInput) throws IOException {
        // order fields
        orderId = dataInput.readInt();
        date = dataInput.readUTF();
        pid = dataInput.readUTF();
        amount = dataInput.readInt();
        // product fields
        productId = dataInput.readUTF();
        name = dataInput.readUTF();
        category_id = dataInput.readUTF();
        price = dataInput.readFloat();
    }

    @Override
    public String toString() {
        return "orderId=" + orderId +
                ", date='" + date + '\'' +
                ", pid='" + pid + '\'' +
                ", amount=" + amount +
                ", productId=" + productId +
                ", name='" + name + '\'' +
                ", category_id='" + category_id + '\'' +
                ", price=" + price;
    }
}

Order bean

/**
 * Order bean. It is never used as a key and never compared,
 * so it only needs Writable, not WritableComparable.
 * @author lxf
 * @version v1.0
 * @date 2018/4/9 10:07
 */
public class OrderBean implements Writable {
    private int id;
    private String date;
    private String pid;
    private int amount;

    public OrderBean() {}

    public void set(int id, String date, String pid, int amount) {
        this.id = id;
        this.date = date;
        this.pid = pid;
        this.amount = amount;
    }

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public String getDate() { return date; }
    public void setDate(String date) { this.date = date; }
    public String getPid() { return pid; }
    public void setPid(String pid) { this.pid = pid; }
    public int getAmount() { return amount; }
    public void setAmount(int amount) { this.amount = amount; }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(id);
        dataOutput.writeUTF(date);
        dataOutput.writeUTF(pid);
        dataOutput.writeInt(amount);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        id = dataInput.readInt();
        date = dataInput.readUTF();
        pid = dataInput.readUTF();
        amount = dataInput.readInt();
    }

    @Override
    public String toString() {
        return "id=" + id +
                ", date='" + date + '\'' +
                ", pid='" + pid + '\'' +
                ", amount=" + amount;
    }
}

Product bean

/**
 * Product bean.
 * @author lxf
 * @version v1.0
 * @date 2018/4/9 10:07
 */
public class ProductBean implements Writable {
    private String id;
    private String name;
    private String category_id;
    private float price;

    public ProductBean() {}

    public void set(String id, String name, String category_id, float price) {
        this.id = id;
        this.name = name;
        this.category_id = category_id;
        this.price = price;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getCategory_id() { return category_id; }
    public void setCategory_id(String category_id) { this.category_id = category_id; }
    public float getPrice() { return price; }
    public void setPrice(float price) { this.price = price; }

    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(id);
        dataOutput.writeUTF(name);
        dataOutput.writeUTF(category_id);
        dataOutput.writeFloat(price);
    }

    public void readFields(DataInput dataInput) throws IOException {
        id = dataInput.readUTF();
        name = dataInput.readUTF();
        category_id = dataInput.readUTF();
        price = dataInput.readFloat();
    }

    @Override
    public String toString() {
        return "id=" + id +
                ", name='" + name + '\'' +
                ", category_id='" + category_id + '\'' +
                ", price=" + price;
    }
}

Mapper


/**
 * @author lxf
 * @version v1.0
 * @date 2018/4/9 10:06
 */
public class RJoinMapper extends Mapper<LongWritable, Text, Text, ObjectWritable> {
    private ProductBean productBean = new ProductBean();
    private OrderBean orderBean = new OrderBean();
    ObjectWritable objectWritable = new ObjectWritable();
    Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        System.out.println(line);
        // Get the input split; it carries the information about the source file
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        String name = inputSplit.getPath().getName();
        String[] fields = line.split(",");
        String pid = "";
        // Decide which table the line belongs to by the file name
        if (name.startsWith("product")) {
            pid = fields[0];
            productBean.set(pid, fields[1], fields[2], Float.parseFloat(fields[3]));
            objectWritable.set(productBean);
        } else {
            pid = fields[2];
            orderBean.set(Integer.parseInt(fields[0]), fields[1], pid, Integer.parseInt(fields[3]));
            objectWritable.set(orderBean);
        }
        k.set(pid);
        context.write(k, objectWritable);
    }
}

Reducer


/**
 * The join field pid is used as the key, so order records and product records
 * with the same key arrive at the same reducer, where the join is performed.
 * If one product has far more orders than the others, that reducer is overloaded
 * while the rest sit idle, i.e. data skew.
 * To avoid this, use a map-side join: load the whole product table into memory
 * beforehand and join each line against it while the mapper reads the data.
 * @author lxf
 * @version v1.0
 * @date 2018/4/9 10:06
 */
public class RJoinReducer extends Reducer<Text, ObjectWritable, JoinBean, NullWritable> {
    ProductBean productBean = new ProductBean();
    JoinBean joinBean = new JoinBean();

    @Override
    protected void reduce(Text key, Iterable<ObjectWritable> values, Context context) throws IOException, InterruptedException {
        ArrayList<OrderBean> orderBeans = new ArrayList<OrderBean>();
        // Separate order records from the product record
        for (ObjectWritable value : values) {
            try {
                Object obj = value.get();
                if (obj instanceof ProductBean) {
                    BeanUtils.copyProperties(productBean, obj);
                } else {
                    OrderBean orderBean = new OrderBean();
                    BeanUtils.copyProperties(orderBean, obj);
                    orderBeans.add(orderBean);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        // Stitch the two kinds of records together to form the final result
        for (OrderBean orderBean : orderBeans) {
            joinBean.set(orderBean.getId(), orderBean.getDate(), orderBean.getPid(), orderBean.getAmount(),
                    productBean.getId(), productBean.getName(), productBean.getCategory_id(), productBean.getPrice());
            context.write(joinBean, NullWritable.get());
        }
    }
}

Driver

/**
 * Driver for the reduce-side join job.
 * @author lxf
 * @version v1.0
 * @date 2018/4/9 10:04
 */
public class RJoinDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 1. point to the local path of this program's jar; 2. configure the cluster; 3. submit to the cluster
        job.setJar("D:\\Document\\ideal\\hadoop_itcast\\out\\artifacts\\rjoin_jar\\rjoin.jar");

        // Mapper/Reducer classes used by this job
        job.setMapperClass(RJoinMapper.class);
        job.setReducerClass(RJoinReducer.class);

        // kv types of the mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(ObjectWritable.class);

        // kv types of the final output
        job.setOutputKeyClass(JoinBean.class);
        job.setOutputValueClass(NullWritable.class);

        // input directory of the job
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // output directory of the job
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job configuration, plus the jar containing the job's classes, to YARN
        /* job.submit(); */
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
  • Drawback: with this approach the join is done in the reduce phase, so the reducers carry most of the processing load while the map nodes stay underutilized; the reduce phase is also highly prone to data skew.
  • Solution: a map-side join.

Map-side join implementation

  • How it works
    Suitable when one of the joined tables is small.
    The small table can be distributed to every map node, so each map node can join the big-table data it reads locally and write the final result directly. This greatly increases the parallelism of the join and speeds up processing.
  • Implementation outline
    – Pre-load the small table in the Mapper class and use the DistributedCache mechanism to ship its data to every map task node, so that each map task can load the small table locally and perform the join on the spot.
    – Typical real-world options: load the lookup data from a database once, or use the DistributedCache.

DistributedCache

Hadoop provides the DistributedCache, which makes it easy to ship required data to the distributed computing framework.
With the distributed cache, a program that depends on third-party jars has two options:

  • Option 1: bundle all dependencies into a single (fat) jar, which makes the file rather large.
  • Option 2: use the distributed cache, e.g. job.addArchiveToClassPath(Path archive), giving the path (local or HDFS) of the dependency jar; the calls below list the variants, and a short driver sketch follows them.
job.addArchiveToClassPath(archive);       // cache a jar on the classpath of the task nodes; archive may be a local or an HDFS path
job.addFileToClassPath(file);             // cache a plain file on the classpath of the task nodes
job.addCacheArchive(uri);                 // cache an archive file into the working directory of the task nodes
job.addCacheFile(uri);                    // cache a plain file into the working directory of the task nodes
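A minimal sketch of option 2 (the jar name and paths are assumptions, not from the original post): the driver ships a dependency jar to the task classpath and a data file to the task working directory, leaving the rest of the job configuration unchanged.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;

public class DependencyCacheDriverSketch {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(DependencyCacheDriverSketch.class);
        // Put a third-party jar on every task's classpath; the path may be local or on HDFS (illustrative path).
        job.addArchiveToClassPath(new Path("/libs/commons-beanutils-1.9.3.jar"));
        // Ship a plain data file to every task's working directory (used by the map-side join below).
        job.addCacheFile(new URI("hdfs://master:9000/cache/product.txt"));
        // ... the mapper/reducer/input/output configuration would follow as in the other drivers.
    }
}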

The revised Mapper

/**
 * Map-side join to avoid data skew.
 *
 * @author: lxf
 * @version: v1.0
 * @date: 2018/4/10 11:20
 */
public class MapSideJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    /**
     * A HashMap that holds the product info table.
     */
    private HashMap<String, String> productInfoMap = new HashMap<String, String>();
    Text k = new Text();

    /**
     * Reading the source of the parent Mapper class shows that setup() is called once
     * before the map task processes any data, so it can be used for initialization.
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream("product.txt")));
        String line;
        while (StringUtils.isNotEmpty(line = reader.readLine())) {
            String[] fields = line.split(",");
            // pid, pname
            productInfoMap.put(fields[0], fields[1]);
        }
        reader.close();
    }

    /**
     * Since the complete product table is already in memory, the join logic
     * can be done right here in the map() method.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String orderLine = value.toString();
        String[] orderFields = orderLine.split(",");
        String productName = productInfoMap.get(orderFields[2]);
        k.set(orderLine + "," + productName);
        context.write(k, NullWritable.get());
    }
}

Driver

public class MapSideJoinDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(MapSideJoinDriver.class);
//        job.setJar("D:/SoftDatas/IdeaProjects/hadoop_itcast/out/artifacts/mapSideJoin_jar/mapSideJoin.jar");

        job.setMapperClass(MapSideJoinMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path("D:/localHadoop/data/mapSideJoin/input"));
        FileOutputFormat.setOutputPath(job, new Path("D:/localHadoop/data/mapSideJoin/output"));

        // Ship a file to the working directory of every map task node
        /* job.addArchiveToClassPath(archive); */      // cache a jar on the task nodes' classpath
        /* job.addFileToClassPath(file); */            // cache a plain file on the task nodes' classpath
        /* job.addCacheArchive(uri); */                // cache an archive into the task nodes' working directory
        /* job.addCacheFile(uri); */                   // cache a plain file into the task nodes' working directory
        job.addCacheFile(new URI("file:///D:/localHadoop/data/mapSideJoin/cache/product.txt"));

        // The map-side join needs no reduce phase; set the number of reduce tasks to 0,
        // otherwise one reducer runs by default.
        job.setNumReduceTasks(0);

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

Building an inverted index

Requirement: given a large amount of text (documents, web pages), build a search index by counting how many times each word occurs in each file.
a.txt

hello tom
hello jerry
hello tom

b.txt

hello jerry
hello jerry
tom jerry

c.txt

hello jerry
hello tom

rs.txt (expected result)

hello a.txt-->3 b.txt-->2 c.txt-->2
jerry a.txt-->1 b.txt-->3 c.txt-->1
tom   a.txt-->2 b.txt-->1 c.txt-->1

Analysis:
This cannot be solved with a single MapReduce job; it can be computed by chaining several MapReduce jobs.

  • First use `word--filename` as the key and count how often each word occurs in each file, producing <word--filename, count>.
  • Then use the word as the key and aggregate its counts across files, e.g. hello a.txt-->3 b.txt-->2 c.txt-->2

Implementation

Step 1: code

/**
 * Use `word--filename` as the key and count the occurrences of each word per file.
 */
public class IndexStepOneMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    Text k = new Text();
    IntWritable v = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        FileSplit split = (FileSplit) context.getInputSplit();
        String fileName = split.getPath().getName();
        String line = value.toString();
        String[] words = line.split(" ");
        for (String word : words) {
            k.set(word + "--" + fileName);
            context.write(k, v);
        }
    }
}

public class IndexStepOneReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable wordCount : values) {
            count += wordCount.get();
        }
        v.set(count);
        context.write(key, v);
    }
}
/**
 * Step-1 driver: produces the per-file word counts, i.e. the intermediate result
 *
 * hello--a.txt  3
 * hello--b.txt  2
 * hello--c.txt  2
 * jerry--a.txt  1
 * jerry--b.txt  3
 * jerry--c.txt  1
 * tom--a.txt    2
 * tom--b.txt    1
 * tom--c.txt    1
 *
 * @author: lxf
 * @version: v1.0
 * @date: 2018/4/10 14:55
 */
public class IndexStepOneDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(IndexStepOneDriver.class);
        job.setMapperClass(IndexStepOneMapper.class);
        job.setReducerClass(IndexStepOneReducer.class);

        // Mapper and reducer output types are identical, so one pair of settings is enough
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path("D:/localHadoop/data/indexStepOne/input"));
        FileOutputFormat.setOutputPath(job, new Path("D:/localHadoop/data/indexStepOne/output"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

Step 2: code

/**
 * Step 2: takes the intermediate result
 * hello--a.txt 3
 * hello--b.txt 2
 * hello--c.txt 2
 * and regroups it by word.
 *
 * @author: lxf
 * @version: v1.0
 * @date: 2018/4/10 15:20
 */
public class IndexStepTwoMapper extends Mapper<LongWritable, Text, Text, Text> {
    Text k = new Text();
    Text v = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] wordFileCounts = line.split("--");
        k.set(wordFileCounts[0]);
        v.set(wordFileCounts[1]);
        context.write(k, v);
    }
}

public class IndexStepTwoReducer extends Reducer<Text, Text, Text, Text> {
    Text v = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        StringBuilder sb = new StringBuilder();
        for (Text value : values) {
            sb.append(value + " ");
        }
        v.set(sb.toString());
        context.write(key, v);
    }
}

/**
 * Step-2 driver: turns the intermediate result into the final index, e.g.
 * hello a.txt 3 b.txt 2 c.txt 1
 *
 * @author: lxf
 * @version: v1.0
 * @date: 2018/4/10 15:18
 */
public class IndexStepTwoDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(IndexStepTwoDriver.class);
        job.setMapperClass(IndexStepTwoMapper.class);
        job.setReducerClass(IndexStepTwoReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path("D:/localHadoop/data/indexStepOne/output"));
        FileOutputFormat.setOutputPath(job, new Path("D:/localHadoop/data/indexStepTwo/output"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

Mutual-friend analysis of social data

Below is QQ friend-list data. Before the colon is a user; after the colon are all of that user's friends (the friendships in the data are one-way). Find every pair of users that has common friends, and list who those common friends are.

A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J

Analysis:

  1. First work out, for each of A/B/C…, which users have them as a friend (i.e. whose common friend they could be):
A: B C D
B: A
C: A B
D: A C
  2. Pair up the users that share each friend; every such pair then has that friend in common:
B-C: A  B-D: A  C-D: A

Pseudocode walk-through

Step 1
map
  read a line      A:B,C,D,F,E,O
  emit             <B,A> <C,A> <D,A> <F,A> <E,A> <O,A>
  read another     B:A,C,E,K
  emit             <A,B> <C,B> <E,B> <K,B>
reduce
  receives, for example, <C,A> <C,B> <C,E> <C,F> <C,G> ......
  emits:
  <A-B,C>
  <A-E,C>
  <A-F,C>
  <A-G,C>
  <B-E,C>
  <B-F,C> ....

Step 2
map
  read a line      <A-B,C>
  emit it as is    <A-B,C>
reduce
  receives         <A-B,C> <A-B,F> <A-B,G> .......
  emits:           A-B  C,F,G,.....

StepOne

public class ShareFriendsStepOneMapper extends Mapper<LongWritable, Text, Text, Text> {
    Text k = new Text();
    Text v = new Text();

    /**
     * @param key
     * @param value   e.g. A:B,C,D,F,E,O
     * @param context
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] personFriends = line.split(":");
        String person = personFriends[0];
        String[] friends = personFriends[1].split(",");
        v.set(person);
        for (String friend : friends) {
            k.set(friend);
            // emit <friend, person>
            context.write(k, v);
        }
    }
}

/**
 * Emits <friend: person,person,person>
 *
 * @author: lxf
 * @version: v1.0
 * @date: 2018/4/10 16:28
 */
public class ShareFriendsStepOneReducer extends Reducer<Text, Text, Text, Text> {
    Text k = new Text();
    Text v = new Text();

    @Override
    protected void reduce(Text friend, Iterable<Text> persons, Context context) throws IOException, InterruptedException {
        StringBuffer sb = new StringBuffer();
        for (Text person : persons) {
            sb.append(person).append(",");
        }
        k.set(friend);
        v.set(sb.toString());
        context.write(k, v);
    }
}

public class ShareFriendsStepOneDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(ShareFriendsStepOneDriver.class);
        job.setMapperClass(ShareFriendsStepOneMapper.class);
        job.setReducerClass(ShareFriendsStepOneReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path("D:/localHadoop/data/shareFriendsStepOne/input"));
        FileOutputFormat.setOutputPath(job, new Path("D:/localHadoop/data/shareFriendsStepOne/output"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

StepTwo


/**
 * Input (output of step 1): friend \t person,person,person,...
 * A  I,K,C,B,G,F,H,O,D,
 * B  A,F,J,E,
 * C  A,E,B,H,F,G,K,
 * D  G,C,K,A,L,F,E,H,
 * E  G,M,L,H,A,F,B,D,
 * F  L,M,D,C,G,A,
 * G  M,
 * H  O,
 * I  O,C,
 * J  O,
 * K  B,
 * L  D,E,
 * M  E,F,
 * O  A,H,I,J,F,
 *
 * Output: <person-person, friend>
 *
 * @author: lxf
 * @version: v1.0
 * @date: 2018/4/10 16:29
 */
public class ShareFriendsStepTwoMapper extends Mapper<LongWritable, Text, Text, Text> {
    Text k = new Text();
    Text v = new Text();

    /**
     * Each input line is one record of the previous step's output, e.g.
     * A  I,K,C,B,G,F,H,O,D,
     * i.e. friend \t person,person,person
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] friendPersons = line.split("\t");
        String friend = friendPersons[0];
        String[] persons = friendPersons[1].split(",");
        // Sort so that A-B and B-A always form the same pair and reach the same reducer
        Arrays.sort(persons);
        v.set(friend);
        // Enumerate every pair; the original bounds (length - 2 / length - 1) skipped the last persons
        for (int i = 0; i < persons.length - 1; i++) {
            for (int j = i + 1; j < persons.length; j++) {
                // emit <person-person, friend>, so all friends of the same pair meet in the same reducer
                k.set(persons[i] + "-" + persons[j]);
                context.write(k, v);
            }
        }
    }
}

/**
 * Input:  <person-person, friend>
 * Output: person-person  friend,friend,friend,...
 *
 * @author: lxf
 * @version: v1.0
 * @date: 2018/4/10 16:29
 */
public class ShareFriendsStepTwoReducer extends Reducer<Text, Text, Text, Text> {
    Text k = new Text();
    Text v = new Text();

    @Override
    protected void reduce(Text personPerson, Iterable<Text> friends, Context context) throws IOException, InterruptedException {
        StringBuffer sb = new StringBuffer();
        for (Text friend : friends) {
            sb.append(friend).append(",");
        }
        k.set(personPerson);
        v.set(sb.toString());
        context.write(k, v);
    }
}

/**
 * @author: lxf
 * @version: v1.0
 * @date: 2018/4/10 16:28
 */
public class ShareFriendsStepTwoDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(ShareFriendsStepTwoDriver.class);
        job.setMapperClass(ShareFriendsStepTwoMapper.class);
        job.setReducerClass(ShareFriendsStepTwoReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path("D:/localHadoop/data/shareFriendsStepOne/output"));
        FileOutputFormat.setOutputPath(job, new Path("D:/localHadoop/data/shareFriendsStepTwo/output"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

Web log preprocessing

# ip ident authuser [timestamp] "request" status bytes "referrer" "user agent"
194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] "GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1" 304 0 "-" "Mozilla/4.0 (compatible;)"
101.226.68.137 - - [18/Sep/2013:06:49:45 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
60.208.6.156 - - [18/Sep/2013:06:49:48 +0000] "GET /wp-content/uploads/2013/07/rcassandra.png HTTP/1.0" 200 185524 "http://cos.name/category/software/packages/" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"
222.68.172.190 - - [18/Sep/2013:06:49:57 +0000] "GET /images/my.jpg HTTP/1.1" 200 19939 "http://www.angularjs.cn/A00n" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"
  • Requirements:
    Identify and split the fields of each web access log record.
    Drop invalid records from the log.
    Generate the filtered access-request data needed for the KPI statistics.
  • Implementation code:
    a) Define a bean that holds the fields of a log record
public class WebLogBean {
    private String remote_addr;      // client ip address
    private String remote_user;      // client user name, "-" when absent
    private String time_local;       // access time and time zone
    private String request;          // requested url and http protocol
    private String status;           // request status; 200 means success
    private String body_bytes_sent;  // size of the body sent to the client
    private String http_referer;     // the page the visitor came from
    private String http_user_agent;  // client browser information
    private boolean valid = true;    // whether the record is valid

    public String getRemote_addr() { return remote_addr; }
    public void setRemote_addr(String remote_addr) { this.remote_addr = remote_addr; }
    public String getRemote_user() { return remote_user; }
    public void setRemote_user(String remote_user) { this.remote_user = remote_user; }
    public String getTime_local() { return time_local; }
    public void setTime_local(String time_local) { this.time_local = time_local; }
    public String getRequest() { return request; }
    public void setRequest(String request) { this.request = request; }
    public String getStatus() { return status; }
    public void setStatus(String status) { this.status = status; }
    public String getBody_bytes_sent() { return body_bytes_sent; }
    public void setBody_bytes_sent(String body_bytes_sent) { this.body_bytes_sent = body_bytes_sent; }
    public String getHttp_referer() { return http_referer; }
    public void setHttp_referer(String http_referer) { this.http_referer = http_referer; }
    public String getHttp_user_agent() { return http_user_agent; }
    public void setHttp_user_agent(String http_user_agent) { this.http_user_agent = http_user_agent; }
    public boolean isValid() { return valid; }
    public void setValid(boolean valid) { this.valid = valid; }

    /**
     * Use the \001 control character as the field separator to avoid clashes with the content itself.
     */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(this.valid);
        sb.append("\001").append(this.remote_addr);
        sb.append("\001").append(this.remote_user);
        sb.append("\001").append(this.time_local);
        sb.append("\001").append(this.request);
        sb.append("\001").append(this.status);
        sb.append("\001").append(this.body_bytes_sent);
        sb.append("\001").append(this.http_referer);
        sb.append("\001").append(this.http_user_agent);
        return sb.toString();
    }
}

b) Define a parser that parses and filters the raw web access log records

public class WebLogParser {
    public static WebLogBean parser(String line) {
        WebLogBean webLogBean = new WebLogBean();
        String[] arr = line.split(" ");
        if (arr.length > 11) {
            webLogBean.setRemote_addr(arr[0]);
            webLogBean.setRemote_user(arr[1]);
            webLogBean.setTime_local(arr[3].substring(1));
            webLogBean.setRequest(arr[6]);
            webLogBean.setStatus(arr[8]);
            webLogBean.setBody_bytes_sent(arr[9]);
            webLogBean.setHttp_referer(arr[10]);
            if (arr.length > 12) {
                webLogBean.setHttp_user_agent(arr[11] + " " + arr[12]);
            } else {
                webLogBean.setHttp_user_agent(arr[11]);
            }
            if (Integer.parseInt(webLogBean.getStatus()) >= 400) {
                // status >= 400 means an HTTP error
                webLogBean.setValid(false);
            }
        } else {
            webLogBean.setValid(false);
        }
        return webLogBean;
    }

    public static String parserTime(String time) {
        // String.replace returns a new string, so the result must be returned (the original discarded it)
        return time.replace("/", "-");
    }
}

c) The MapReduce program

public class WeblogPreProcess {
    static class WeblogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        Text k = new Text();
        NullWritable v = NullWritable.get();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            WebLogBean webLogBean = WebLogParser.parser(line);
            if (!webLogBean.isValid()) {
                return;
            }
            k.set(webLogBean.toString());
            context.write(k, v);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(WeblogPreProcess.class);
        job.setMapperClass(WeblogPreProcessMapper.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}

Custom InputFormat

Small files hurt the efficiency of both HDFS and MapReduce, yet in practice it is often unavoidable to process large numbers of small files, so a solution is needed.

The optimizations for small files come down to the following:

  1. At data-collection time, merge small files or small batches of data into large files before uploading them to HDFS.
  2. Before the business processing, run a MapReduce program on the cluster to merge the small files already on HDFS.
  3. During MapReduce processing, use a CombineFileInputFormat (e.g. CombineTextInputFormat) to improve efficiency (see the sketch after this list).
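A minimal sketch of option 3 (paths, split size and class names are illustrative assumptions): CombineTextInputFormat packs many small files into fewer splits, so fewer map tasks are started.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CombineSmallFilesDriverSketch {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(CombineSmallFilesDriverSketch.class);
        // Pack small files together into splits of at most ~4 MB (tune to the cluster).
        job.setInputFormatClass(CombineTextInputFormat.class);
        CombineTextInputFormat.setMaxInputSplitSize(job, 4 * 1024 * 1024);
        // Mapper/Reducer and output types would be configured here as in the other drivers.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}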

Implementation

This section implements option 2 above.
The core mechanism of the program:

  • Define a custom InputFormat whose split rule makes every small file non-splittable, so that instead of one mapper per file, many files can be merged by the same mapper.
  • Override RecordReader so that it reads one whole file at a time and wraps it as a KV pair (file name, file contents); this keeps the merged output ordered and makes it easy to locate the original file contents.
  • Use SequenceFileOutputFormat to write the merged output.

The code is as follows:
Custom InputFormat

public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {
    // Make each small file non-splittable so that one file yields exactly one key-value pair
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader.initialize(split, context);
        return reader;
    }
}

Custom RecordReader

/**
 * The core logic of a RecordReader:
 * nextKeyValue() reads the data and builds the key/value to return;
 * getCurrentKey() and getCurrentValue() hand back the key and value built above.
 * @author lxf
 */
public class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
    private FileSplit fileSplit;
    private Configuration conf;
    private BytesWritable value = new BytesWritable();
    private boolean processed = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            byte[] contents = new byte[(int) fileSplit.getLength()];
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }
        return false;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    /**
     * Report the current progress.
     */
    @Override
    public float getProgress() throws IOException {
        return processed ? 1.0f : 0.0f;
    }

    @Override
    public void close() throws IOException {
        // nothing to do
    }
}

Define the MapReduce flow

public class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
    private Text filenameKey;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        InputSplit split = context.getInputSplit();
        Path path = ((FileSplit) split).getPath();
        filenameKey = new Text(path.toString());
    }

    @Override
    protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
        context.write(filenameKey, value);
    }
}

public class SmallFilesToSequenceFileConverterDriver extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        /* System.setProperty("HADOOP_USER_NAME", "hadoop"); */
//        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
//        if (otherArgs.length != 2) {
//            System.err.println("Usage: combinefiles <in> <out>");
//            System.exit(2);
//        }
        Job job = Job.getInstance(conf, "combine small files to sequencefile");

        job.setJarByClass(SmallFilesToSequenceFileConverterDriver.class);
        job.setInputFormatClass(WholeFileInputFormat.class);
        // output file type
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);
        // if no reducer is specified, a single default reducer is used
        job.setMapperClass(SequenceFileMapper.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * The second way of launching a MapReduce job: via ToolRunner.
     */
    public static void main(String[] args) throws Exception {
        args = new String[]{"D:/localHadoop/data/combineFile/input", "D:/localHadoop/data/combineFile/output"};
        int exitCode = ToolRunner.run(new SmallFilesToSequenceFileConverterDriver(), args);
        System.exit(exitCode);
    }
}

Custom OutputFormat

Some raw logs need to be enriched ("enhanced"). The flow:

  1. Read the data from the raw log files.
  2. Use a URL field in each log line to look up extra information in an external knowledge base and append it to the raw line.
  3. If the enrichment succeeds, write the line to the enhanced-result directory; if it fails, extract the URL field from the raw record and write it to the to-crawl directory.

The key point is that a single MapReduce program must write two kinds of results to different directories depending on the data; this kind of flexible output can be achieved with a custom OutputFormat.

Implementation

Key points:

  1. Access external resources from within MapReduce.
  2. Define a custom OutputFormat, overriding its RecordWriter and in particular the write() method that emits the data.

The code is as follows:
Utility that loads the rule data from the database

public class DBLoader {
    public static void dbLoader(HashMap<String, String> ruleMap) {
        Connection conn = null;
        Statement st = null;
        ResultSet res = null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
            conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/urldb", "root", "root");
            st = conn.createStatement();
            res = st.executeQuery("select url,content from urlcontent");
            while (res.next()) {
                ruleMap.put(res.getString(1), res.getString(2));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (res != null) {
                    res.close();
                }
                if (st != null) {
                    st.close();
                }
                if (conn != null) {
                    conn.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        DBLoader db = new DBLoader();
        HashMap<String, String> map = new HashMap<String, String>();
        db.dbLoader(map);
        System.out.println(map.size());
    }
}

The custom OutputFormat


/**
 * Custom output format.
 *
 * When a map task or reduce task finally writes its output, it first calls
 * getRecordWriter() on the OutputFormat to obtain a RecordWriter,
 * then calls the RecordWriter's write(k, v) method to write the data out.
 * @author: lxf
 * @version: v1.0
 * @date: 2018/4/13 14:18
 */
public class WebLogEnhanceOutputFormat extends FileOutputFormat<Text, NullWritable> {
    /**
     * Obtain the RecordWriter.
     */
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        FileSystem fs = FileSystem.get(taskAttemptContext.getConfiguration());
        /*
         * Custom output paths
         */
//        Path enhancePath = new Path("hdfs://master:9000/flow/enhancelog/enhanced.log");
        Path enhancePath = new Path("D:/localHadoop/data/weblogenhance/enhance/log.dat");
        Path tocrawlPath = new Path("D:/localHadoop/data/weblogenhance/crw/url.dat");

        FSDataOutputStream enhancedOs = fs.create(enhancePath);
        FSDataOutputStream tocrawlOs = fs.create(tocrawlPath);

        return new WebLogEnhanceRecordWriter(enhancedOs, tocrawlOs);
    }
}

/**
 * Custom record writer.
 *
 * @author: lxf
 * @version: v1.0
 * @date: 2018/4/13 14:20
 */
public class WebLogEnhanceRecordWriter extends RecordWriter<Text, NullWritable> {
    FSDataOutputStream enhancedOs = null;
    FSDataOutputStream tocrawlOs = null;

    public WebLogEnhanceRecordWriter(FSDataOutputStream enhancedOs, FSDataOutputStream tocrawlOs) {
        super();
        this.enhancedOs = enhancedOs;
        this.tocrawlOs = tocrawlOs;
    }

    @Override
    public void write(Text text, NullWritable nullWritable) throws IOException, InterruptedException {
        String result = text.toString();
        // If the record is a URL still to be crawled, write it to the to-crawl list /logenhance/tocrawl/url.dat
        if (result.contains("toCrawl")) {
            tocrawlOs.write(result.getBytes());
        } else {
            // Otherwise it is an enhanced log line; write it to the enhanced log file /logenhance/enhancedlog/log.dat
            enhancedOs.write(result.getBytes());
        }
    }

    @Override
    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        if (tocrawlOs != null) {
            tocrawlOs.close();
        }
        if (enhancedOs != null) {
            enhancedOs.close();
        }
    }
}

Develop the MapReduce flow

/**
 * Enhances the user web-access logs produced every hour: the analysis result for the URL
 * in each line is appended to the end of that raw log line.
 *
 * @author
 */
public class WebLogEnhancerMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    Map<String, String> ruleMap = new HashMap<String, String>();
    Text k = new Text();
    NullWritable v = NullWritable.get();

    /**
     * Load the rule data from the database into ruleMap.
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        DBLoader.dbLoader(ruleMap);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Counter for malformed log lines: group name, counter name; shown in the job output
        Counter counter = context.getCounter("malformed", "malformedLine");
        String line = value.toString();
        String[] fields = StringUtils.split(line, "\t");
        try {
            String url = fields[26];
            String content_tag = ruleMap.get(url);
            // If there is no content tag, only emit the url to the to-crawl list;
            // otherwise emit the enhanced log line
            if (content_tag == null) {
                if (url.startsWith("http://")) {
                    k.set(url + "\t" + "toCrawl" + "\n");
                } else {
                    k.set(url + "\t" + "inValid" + "\n");
                    counter.increment(1);
                }
                context.write(k, v);
            } else {
                k.set(line + "\t" + content_tag + "\n");
                context.write(k, v);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

/**
 * Some raw logs need enrichment. The flow:
 * 1. Read the data from the raw log files.
 * 2. Use a URL field in the log to fetch extra information from an external knowledge base and append it.
 * 3. On success, write to the enhanced-result directory; on failure, write the URL field to the to-crawl directory.
 *
 * The key point is writing two kinds of results to different directories from one MapReduce program,
 * which is done with a custom OutputFormat.
 *
 * Key points:
 * 1. Access external resources inside MapReduce.
 * 2. Define a custom OutputFormat and override the write() method of its RecordWriter.
 *
 * @author: lxf
 * @version: v1.0
 * @date: 2018/4/12 9:15
 */
public class WebLogEnhancerDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(WebLogEnhancerDriver.class);
        job.setMapperClass(WebLogEnhancerMapper.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // To route different content to different target paths, use the custom OutputFormat
        job.setOutputFormatClass(WebLogEnhanceOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path("D:/localHadoop/data/weblogenhance/input"));

        // Although we use a custom OutputFormat, it extends FileOutputFormat,
        // which must write a _SUCCESS file, so an output path still has to be set
        FileOutputFormat.setOutputPath(job, new Path("D:/localHadoop/data/weblogenhance/output"));

        // No reducer needed
        job.setNumReduceTasks(0);

        job.waitForCompletion(true);
        System.exit(0);
    }
}

Custom GroupingComparator

Given the following order data, find the single largest transaction within each order.

orderId productId amount
Order_0000001 Pdt_01 222.8
Order_0000001 Pdt_05 25.8
Order_0000002 Pdt_03 522.8
Order_0000002 Pdt_04 122.4
Order_0000002 Pdt_05 722.4
Order_0000003 Pdt_01 222.8

Analysis

Method 1: use the order id as the key and the rest of the record as the value, so that all records of one order reach the same reducer, which sorts them itself and picks the largest transaction (a minimal sketch of this approach follows).
Drawback: the reducer has to cache and sort all of the order's records itself, which costs a lot of memory and is not efficient.
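A minimal sketch of method 1 (the value layout "productId,amount" is an assumption; this is not the post's final solution), mainly to show why the reducer must buffer and sort everything itself:

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Assumes the mapper emitted <orderId, "productId,amount"> for every transaction.
public class NaiveMaxAmountReducer extends Reducer<Text, Text, Text, NullWritable> {
    @Override
    protected void reduce(Text orderId, Iterable<Text> lines, Context context)
            throws IOException, InterruptedException {
        // Buffer every transaction of the order in memory -- the drawback called out above.
        List<String> buffered = new ArrayList<String>();
        for (Text line : lines) {
            buffered.add(line.toString());
        }
        // Sort by amount, descending, and keep the first record as the maximum.
        buffered.sort(Comparator.comparingDouble((String s) -> Double.parseDouble(s.split(",")[1])).reversed());
        context.write(new Text(orderId.toString() + "," + buffered.get(0)), NullWritable.get());
    }
}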

Method 2: use the MapReduce framework's own sort and shuffle machinery.

  1. Use "order id + amount" as the key, so that the map output is partitioned by order id and sorted by amount before it reaches the reducers.
  2. On the reduce side, use a GroupingComparator to group all kv pairs with the same order id; the first record of each group is then the maximum.

Code

The order bean

/**
 * Order bean used as the map output key.
 *
 * @author: lxf
 * @version: v1.0
 * @date: 2018/4/11 16:04
 */
public class OrderBean implements WritableComparable<OrderBean> {
    /**
     * Writable wrapper objects are used so that their compareTo() can be reused.
     */
    private Text orderId;
    private Text productName;
    private DoubleWritable amount;

    public OrderBean() {}

    public void set(Text orderId, Text productName, DoubleWritable amount) {
        this.orderId = orderId;
        this.productName = productName;
        this.amount = amount;
    }

    public Text getOrderId() { return orderId; }
    public void setOrderId(Text orderId) { this.orderId = orderId; }
    public Text getProductName() { return productName; }
    public void setProductName(Text productName) { this.productName = productName; }
    public DoubleWritable getAmount() { return amount; }
    public void setAmount(DoubleWritable amount) { this.amount = amount; }

    /**
     * Sort order applied during the shuffle within each partition:
     * by order id, and within the same order by amount, descending.
     */
    @Override
    public int compareTo(OrderBean orderBean) {
        int tmp = this.orderId.compareTo(orderBean.getOrderId());
        if (tmp == 0) {
            tmp = -this.amount.compareTo(orderBean.getAmount());
        }
        return tmp;
    }

    @Override
    public void write(DataOutput output) throws IOException {
        output.writeUTF(orderId.toString());
        output.writeUTF(productName.toString());
        output.writeDouble(amount.get());
    }

    @Override
    public void readFields(DataInput input) throws IOException {
        this.orderId = new Text(input.readUTF());
        this.productName = new Text(input.readUTF());
        this.amount = new DoubleWritable(input.readDouble());
    }

    @Override
    public String toString() {
        return "MaxOrder:" +
                "orderId=" + orderId +
                ", productName=" + productName +
                ", amount=" + amount;
    }
}

A custom GroupingComparator that groups the records within each partition

/**
 * Uses a GroupingComparator on the reduce side so that a group of beans is treated as one key.
 *
 * The GroupingComparator decides how records are grouped.
 * @author: lxf
 * @version: v1.0
 * @date: 2018/4/11 16:02
 */
public class OrderIdGroupingComparator extends WritableComparator {
    /**
     * Pass the class of the bean used as the key and tell the framework to create instances by reflection.
     */
    public OrderIdGroupingComparator() {
        super(OrderBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean abean = (OrderBean) a;
        OrderBean bbean = (OrderBean) b;
        // When comparing two beans, compare only their orderId
        return abean.getOrderId().compareTo(bbean.getOrderId());
    }
}

A custom partitioner, OrderIdPartitioner, that partitions the data

/**
 * Custom partitioner: OrderIdPartitioner decides which partition a record goes to.
 * @author: lxf
 * @version: v1.0
 * @date: 2018/4/11 16:18
 */
public class OrderIdPartitioner extends Partitioner<OrderBean, NullWritable> {
    /**
     * Beans with the same orderId go to the same partition;
     * the number of partitions produced matches the number of reduce tasks set by the user.
     * @param numReduceTasks number of reduce tasks
     */
    @Override
    public int getPartition(OrderBean orderBean, NullWritable nullWritable, int numReduceTasks) {
        // Beans with the same orderId end up in the same partition.
        // Note the parentheses: '%' binds tighter than '&', so the sign-bit mask must be applied before the modulo.
        return (orderBean.getOrderId().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

The MapReduce program

/**
 * Input format:
 * orderId        productId  amount
 * Order_0000001  Pdt_01     222.8
 *
 * @author: lxf
 * @version: v1.0
 * @date: 2018/4/11 16:02
 */
public class MaximumProductMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
    OrderBean orderBean = new OrderBean();
    NullWritable v = NullWritable.get();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split(",");
        orderBean.set(new Text(fields[0]), new Text(fields[1]), new DoubleWritable(Double.parseDouble(fields[2])));
        context.write(orderBean, v);
    }
}

/**
 * By the time reduce() is called, all beans with the same order id are treated as one group,
 * and the one with the largest amount comes first.
 * The framework does not pre-scan and split the data (too much memory and I/O for large data sets);
 * instead it iterates over the sorted records of the partition:
 * 1. a key is passed in, then hasNext() checks whether there is a next value;
 * 2. if there is, the GroupingComparator decides whether it belongs to the same group;
 * 3. if it does, continue with hasNext() and repeat step 2;
 * 4. if it does not, start again from step 1 with a new reduce() call.
 *
 * @author: lxf
 * @version: v1.0
 * @date: 2018/4/11 16:02
 */
public class MaximumProductReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // The first key of each group is the order's maximum transaction
        context.write(key, NullWritable.get());
    }
}

public class MaximumProductDriver {
    public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(MaximumProductDriver.class);
        job.setMapperClass(MaximumProductMapper.class);
        job.setReducerClass(MaximumProductReducer.class);

        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path("D:/localHadoop/data/groupComparator/input"));
        FileOutputFormat.setOutputPath(job, new Path("D:/localHadoop/data/groupComparator/output"));

        // Register the custom GroupingComparator
        job.setGroupingComparatorClass(OrderIdGroupingComparator.class);
        // Register the custom Partitioner
        job.setPartitionerClass(OrderIdPartitioner.class);

        job.setNumReduceTasks(2);

        job.waitForCompletion(true);
    }
}

Using counters

In production code you often need a global count of the malformed data lines encountered during processing; requirements like this can be met with the global counters provided by the MapReduce framework.

public class MultiOutputs {
    // Define a custom counter as an enum
    enum MyCounter {MALFORORMED, NORMAL}

    static class CommaMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] words = value.toString().split(",");
            for (String word : words) {
                context.write(new Text(word), new LongWritable(1));
            }
            // Increment the enum-defined custom counter
            context.getCounter(MyCounter.MALFORORMED).increment(1);
            // Increment a dynamically named custom counter
            context.getCounter("counterGroupa", "countera").increment(1);
        }
    }
}

Chaining multiple jobs

A slightly more complex processing pipeline often needs several MapReduce jobs chained together; chaining multiple jobs can be done with the JobControl class of the MapReduce framework.

        ControlledJob cJob1 = new ControlledJob(job1.getConfiguration());
        ControlledJob cJob2 = new ControlledJob(job2.getConfiguration());
        ControlledJob cJob3 = new ControlledJob(job3.getConfiguration());

        cJob1.setJob(job1);
        cJob2.setJob(job2);
        cJob3.setJob(job3);

        // Declare the dependencies between the jobs
        cJob2.addDependingJob(cJob1);
        cJob3.addDependingJob(cJob2);

        JobControl jobControl = new JobControl("RecommendationJob");
        jobControl.addJob(cJob1);
        jobControl.addJob(cJob2);
        jobControl.addJob(cJob3);

        // Run the jobs added to the JobControl in a new thread and wait for them to finish
        Thread jobControlThread = new Thread(jobControl);
        jobControlThread.start();
        while (!jobControl.allFinished()) {
            Thread.sleep(500);
        }
        jobControl.stop();

        return 0;

Method 2: use a shell script to define the execution order yourself, which makes jobs easier to combine.
