1. MapReduce Programming Conventions

Mapper phase: extends Mapper<LongWritable, Text, Text, IntWritable>
    LongWritable, Text: the input K1/V1 types
    Text, IntWritable: the output K2/V2 types
    map() holds our business logic; it runs inside a MapTask and is called once for every input K1/V1 pair.

Reducer phase: extends Reducer<Text, IntWritable, Text, IntWritable>
    Text, IntWritable: the Reducer input types, which must equal the Mapper output types
    Text, IntWritable: the Reducer output types
    reduce() completes the business logic; it runs inside a ReduceTask and is called once per distinct key.

Driver: assembles the job
    set the main class, the Mapper/Reducer classes, the Map output K/V types, the Reduce output K/V types, and the input/output paths.
    The output path must be configured and must not already exist.
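
A minimal sketch of these three parts, using a classic word-count example (class names and paths here are my own illustration, not from the original notes):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    public static class WcMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            for (String word : value.toString().split("\\s+")) {   // business logic: split the line
                context.write(new Text(word), new IntWritable(1)); // emit K2/V2
            }
        }
    }

    public static class WcReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;                                           // called once per distinct key
            for (IntWritable v : values) sum += v.get();
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(WordCountDriver.class);                  // main class
        job.setMapperClass(WcMapper.class);                        // Mapper / Reducer
        job.setReducerClass(WcReducer.class);
        job.setMapOutputKeyClass(Text.class);                      // Map output K/V types
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);                         // Reduce output K/V types
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));     // input
        FileOutputFormat.setOutputPath(job, new Path(args[1]));    // output: must not already exist
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}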

2. Serialization

XXXWritable and Text are serializable types.
Serialization matters in Hadoop, Spark, Flink alike:
    serialization: in-memory object ==> byte array
    deserialization: byte array ==> in-memory object
A distributed compute framework has to serialize/deserialize objects to move them over the network.

Java has its own built-in serialization, but Hadoop ships its own implementation: the Writable interface.
Design goals: compact, fast, extensible, interoperable.
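
A tiny round-trip sketch (my own illustration, not from the notes) showing object ==> byte array ==> object with a built-in Writable:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import org.apache.hadoop.io.IntWritable;

public class WritableRoundTrip {
    public static void main(String[] args) throws Exception {
        // serialization: in-memory object ==> byte array
        IntWritable out = new IntWritable(163);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        out.write(new DataOutputStream(bytes));

        // deserialization: byte array ==> in-memory object
        IntWritable in = new IntWritable();
        in.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(in.get()); // prints 163
    }
}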

Requirement: the key or value is something Hadoop's built-in types cannot represent
==> write a custom serializable class

public interface WritableComparable<T> extends Writable, Comparable<T> {

Writable is the top-level interface for custom serializable classes.
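
If a custom type is also used as a map output key, it must additionally be comparable so the shuffle can sort it; a hedged sketch (the class name and field are my own illustration) of a WritableComparable:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

// Illustrative custom key: sortable because of compareTo().
public class MyKey implements WritableComparable<MyKey> {
    private long id;

    public MyKey() { }                   // no-arg constructor required by the framework
    public MyKey(long id) { this.id = id; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(id);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readLong();
    }

    @Override
    public int compareTo(MyKey other) {  // used during the shuffle sort when this type is a map output key
        return Long.compare(this.id, other.id);
    }
}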
Requirement: for each phone number, compute the upstream traffic, the downstream traffic, and the total
==>
    phone number:       the 2nd field
    upstream traffic:   the 3rd field from the end
    downstream traffic: the 2nd field from the end
    total traffic:      upstream + downstream

Mapper:  <LongWritable, Text, Text, Access>
    Access: phone, up, down, sum

Reducer: <Text, Access, NullWritable, Access>

java.lang.NoSuchMethodException: com.ruozedata.bigdata.hadoop.mapreduce.ser.Access.<init>()
This is a constructor problem: the Writable class must have a no-arg constructor, because the framework instantiates it by reflection.

Steps to implement a custom serializable class:

1) implements Writable
2) keep a no-arg constructor
3) implement write() and readFields()
4) the order in which fields are written out must match the order in which they are read back
5) optional: override toString()

int maps = writeSplits(job, submitJobDir);

InputFormat<?, ?> input =
ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
At this point the framework knows exactly which InputFormat is in use.
In MapReduce, reading data always goes through an InputFormat;
the only question is which subclass of InputFormat it is.
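
The InputFormat can also be chosen explicitly in the Driver (org.apache.hadoop.mapreduce.lib.input.TextInputFormat is the default); a small illustrative fragment, assumed to sit in the Driver before job submission:

// Inside the Driver, before submitting the job:
job.setInputFormatClass(TextInputFormat.class); // job.getInputFormatClass() will then return TextInputFormat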

An InputSplit is processed by one Mapper; by default its size equals the HDFS block size.
200M file ==> 128M + 72M

long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));  // = 1 by default
long maxSize = getMaxSplitSize(job);                                     // = Long.MAX_VALUE by default

isSplitable: can the input file be split?

splittable:     200M ==> 128M + 72M ==> 2 MapTasks
not splittable: 200M ==> 1 MapTask
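
For example, a FileInputFormat subclass can refuse to split its files by overriding isSplitable (illustrative sketch; this is also why gzip-compressed text ends up as a single MapTask):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

// Illustrative: every input file becomes exactly one InputSplit, i.e. one MapTask.
public class WholeFileTextInputFormat extends TextInputFormat {
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }
}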

long blockSize = file.getBlockSize();                           // 128M
long splitSize = computeSplitSize(blockSize, minSize, maxSize);

computeSplitSize: return Math.max(minSize, Math.min(maxSize, blockSize));
    = max(1, min(Long.MAX_VALUE, 128M))
    = max(1, 128M)
    = 128M

long bytesRemaining = length;                                   // e.g. 150M
while (((double) bytesRemaining) / splitSize > SPLIT_SLOP)      // 150M / 128M
    // SPLIT_SLOP = 1.1, i.e. a 10% slack before an extra split is created

A 129M file: how many InputSplits? 1. 129/128 ≈ 1.008, which does not exceed SPLIT_SLOP (1.1), so the last 1M is not split off and the whole 129M file becomes a single InputSplit, slightly larger than the 128M block.
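
A small standalone sketch (my own re-implementation of the logic above for illustration, not Hadoop source) that reproduces the split computation for the 200M, 129M and 150M examples:

import java.util.ArrayList;
import java.util.List;

public class SplitMath {
    static final double SPLIT_SLOP = 1.1;
    static final long MB = 1024L * 1024L;

    // Same formula as FileInputFormat.computeSplitSize()
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Same loop shape as the getSplits() fragment quoted above
    static List<Long> splits(long length, long splitSize) {
        List<Long> result = new ArrayList<>();
        long bytesRemaining = length;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            result.add(splitSize);
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) result.add(bytesRemaining);
        return result;
    }

    public static void main(String[] args) {
        long splitSize = computeSplitSize(128 * MB, 1L, Long.MAX_VALUE); // 128M
        System.out.println(splits(200 * MB, splitSize)); // 128M + 72M ==> 2 MapTasks
        System.out.println(splits(129 * MB, splitSize)); // 129M       ==> 1 MapTask
        System.out.println(splits(150 * MB, splitSize)); // 128M + 22M ==> 2 MapTasks
    }
}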

FileInputFormat.setInputPaths(job, new Path(input));
input: may be a single file or a directory
traverse the directory ==> Path ==> BlockLocation <== which nodes your input data lives on
for each file, get its size
long splitSize = computeSplitSize(blockSize, minSize, maxSize);

Math.max(minSize, Math.min(maxSize, blockSize)) = max(1, min(Long.MAX_VALUE, 128M)) = 128M, so the split size follows the block size; lowering maxSize below the block size shrinks the splits, and raising minSize above it enlarges them.
In practice, leave it alone: forcing, say, a 100M split against a 128M block cuts the block into 100M + 28M, and the 28M remainder can no longer be read with full locality.
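
If the split size really does need changing, FileInputFormat exposes setters for the two parameters above; an illustrative fragment for the Driver (the sizes are made-up values, pick one direction, not both):

// Inside the Driver, before job submission:
FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);     // lower maxSize ==> smaller splits
// FileInputFormat.setMinInputSplitSize(job, 256L * 1024 * 1024); // raise minSize ==> bigger splits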

TextInputFormat extends FileInputFormat {
    isSplitable(...)
    RecordReader<LongWritable, Text> createRecordReader(...)
}

TextInputFormat is an implementation of FileInputFormat (and the default InputFormat).
It reads the data line by line:
    K: LongWritable, the offset of the line within the whole file
    V: Text, the content of that line

MySQLReadDriver
    runs fine locally,
    but it cannot run on the server as-is!!!

Locally the MySQL JDBC driver is already on the classpath ==> for the cluster, the driver jar has to be shipped to the server as well.

MySQLReadDriverV2 is the version recommended for production:
    extends Configured implements Tool
    load the mysql jar onto a path that Hadoop can actually see *****

Traffic statistics

package com.ccj.pxj.phone;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class Access implements Writable {
    private String phone;
    private long up;
    private long down;
    private long sum;

    public Access() {
    }

    public Access(String phone, long up, long down) {
        this.phone = phone;
        this.up = up;
        this.down = down;
        this.sum = up + down;
    }

    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    public long getUp() {
        return up;
    }

    public void setUp(long up) {
        this.up = up;
    }

    public long getDown() {
        return down;
    }

    public void setDown(long down) {
        this.down = down;
    }

    // The field order written here must match the order read back in readFields().
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phone);
        out.writeLong(up);
        out.writeLong(down);
        out.writeLong(sum);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.phone = in.readUTF();
        this.up = in.readLong();
        this.down = in.readLong();
        this.sum = in.readLong();
    }

    @Override
    public String toString() {
        return phone + "\t" + up + "\t" + down + "\t" + sum;
    }
}
package com.ccj.pxj.phone;

import com.ccj.pxj.phone.utils.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class SerDriver {
    public static void main(String[] args) throws Exception {
        String input = "data";
        String output = "out";

        // 1) Get the Job object
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        FileUtils.deleteOutput(configuration, output);

        // 2) The main class this job runs
        job.setJarByClass(SerDriver.class);

        // 3) Set the Mapper and Reducer
        job.setMapperClass(MyMaper.class);
        job.setReducerClass(MyReduce.class);

        // 4) Set the Mapper output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Access.class);

        // 5) Set the Reducer output types
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Access.class);

        // 6) Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        // 7) Submit the job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }

    public static class MyMaper extends Mapper<LongWritable, Text, Text, Access> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] data = value.toString().split("\t");
            String phone = data[1];
            // upstream traffic: 3rd field from the end
            long up = Long.parseLong(data[data.length - 3]);
            // downstream traffic: 2nd field from the end
            long down = Long.parseLong(data[data.length - 2]);
            context.write(new Text(phone), new Access(phone, up, down));
        }
    }

    public static class MyReduce extends Reducer<Text, Access, NullWritable, Access> {
        @Override
        protected void reduce(Text key, Iterable<Access> values, Context context) throws IOException, InterruptedException {
            long ups = 0;
            long downs = 0;
            for (Access value : values) {
                ups += value.getUp();
                downs += value.getDown();
            }
            context.write(NullWritable.get(), new Access(key.toString(), ups, downs));
        }
    }
}
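
The FileUtils.deleteOutput helper used above is not shown in the notes; a plausible minimal implementation (my assumption about what the author's helper does) would be:

package com.ccj.pxj.phone.utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FileUtils {
    // Delete the output directory if it already exists, since MapReduce refuses to run when it does.
    public static void deleteOutput(Configuration configuration, String output) throws Exception {
        FileSystem fileSystem = FileSystem.get(configuration);
        Path outputPath = new Path(output);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }
    }
}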

MySQL

package com.ccj.wfy.mysql.mr;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class DeptWritable implements DBWritable, Writable {
    private int deptno;
    private String dname;
    private String loc;

    public DeptWritable() {
    }

    public DeptWritable(int deptno, String dname, String loc) {
        this.deptno = deptno;
        this.dname = dname;
        this.loc = loc;
    }

    public int getDeptno() {
        return deptno;
    }

    public void setDeptno(int deptno) {
        this.deptno = deptno;
    }

    public String getDname() {
        return dname;
    }

    public void setDname(String dname) {
        this.dname = dname;
    }

    public String getLoc() {
        return loc;
    }

    public void setLoc(String loc) {
        this.loc = loc;
    }

    // Writable: used when the object moves between tasks.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(deptno);
        out.writeUTF(dname);
        out.writeUTF(loc);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.deptno = in.readInt();
        this.dname = in.readUTF();
        this.loc = in.readUTF();
    }

    // DBWritable: used when the object is written to / read from MySQL.
    @Override
    public void write(PreparedStatement statement) throws SQLException {
        statement.setInt(1, deptno);
        statement.setString(2, dname);
        statement.setString(3, loc);
    }

    @Override
    public void readFields(ResultSet result) throws SQLException {
        deptno = result.getInt(1);
        dname = result.getString(2);
        loc = result.getString(3);
    }

    @Override
    public String toString() {
        return deptno + "\t" + dname + "\t" + loc;
    }
}

package com.ccj.wfy.mysql.mr;

import com.ccj.pxj.phone.utils.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class MySQLReadDriver {
    public static void main(String[] args) throws Exception {
        String output = "out";

        // 1) Get the Job object
        Configuration configuration = new Configuration();
//        configuration.set(DBConfiguration.DRIVER_CLASS_PROPERTY, "com.mysql.jdbc.Driver");
        DBConfiguration.configureDB(configuration, "com.mysql.jdbc.Driver", "jdbc:mysql://localhost:3306/mrtest", "root", "");
        Job job = Job.getInstance(configuration);
        FileUtils.deleteOutput(configuration, output);

        // 2) The main class this job runs
        job.setJarByClass(MySQLReadDriver.class);

        // 3) Set the Mapper (no Reducer is needed here)
        job.setMapperClass(MyMapper.class);

        // 4) Set the Mapper output types
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(DeptWritable.class);

        // 6) Set the input (the dept table in MySQL) and the output path
        String[] fields = {"deptno", "dname", "loc"};
        DBInputFormat.setInput(job, DeptWritable.class, "dept", null, null, fields);
        FileOutputFormat.setOutputPath(job, new Path(output));

        // 7) Submit the job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }

    public static class MyMapper extends Mapper<LongWritable, DeptWritable, NullWritable, DeptWritable> {
        @Override
        protected void map(LongWritable key, DeptWritable value, Context context) throws IOException, InterruptedException {
            context.write(NullWritable.get(), value);
        }
    }
}
package com.ccj.wfy.mysql.mr;

import com.ccj.pxj.phone.utils.FileUtils;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
public class MySQLReadDriverV2 extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // ToolRunner parses generic options such as -libjars before run() is invoked
        int run = ToolRunner.run(configuration, new MySQLReadDriverV2(), args);
        System.exit(run);
    }

    @Override
    public int run(String[] strings) throws Exception {
        String output = "out1";

        // 1) Get the Job object
        Configuration configuration = super.getConf();
//        configuration.set(DBConfiguration.DRIVER_CLASS_PROPERTY, "com.mysql.jdbc.Driver");
        DBConfiguration.configureDB(configuration, "com.mysql.jdbc.Driver", "jdbc:mysql://localhost:3306/mrtest", "root", "");
        Job job = Job.getInstance(configuration);
        FileUtils.deleteOutput(configuration, output);

        // 2) The main class this job runs
        job.setJarByClass(MySQLReadDriverV2.class);

        // 3) Set the Mapper (no Reducer is needed here)
        job.setMapperClass(MyMapper.class);

        // 4) Set the Mapper output types
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(DeptWritable.class);

        // 6) Set the input (the dept table in MySQL) and the output path
        String[] fields = {"deptno", "dname", "loc"};
        DBInputFormat.setInput(job, DeptWritable.class, "dept", null, null, fields);
        FileOutputFormat.setOutputPath(job, new Path(output));

        // 7) Submit the job
        boolean result = job.waitForCompletion(true);
        return result ? 0 : 1;  // exit code 0 on success, 1 on failure
    }

    public static class MyMapper extends Mapper<LongWritable, DeptWritable, NullWritable, DeptWritable> {
        @Override
        protected void map(LongWritable key, DeptWritable value, Context context) throws IOException, InterruptedException {
            context.write(NullWritable.get(), value);
        }
    }
}

pom configuration

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.cc.pxj.wfy</groupId>
    <artifactId>phoneWcRuoZe</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
    </properties>

    <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>

    <dependencies>
        <!-- Hadoop dependency -->
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/junit/junit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.17</version>
        </dependency>
    </dependencies>

    <build>
        <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
            <plugins>
                <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
                <plugin>
                    <artifactId>maven-clean-plugin</artifactId>
                    <version>3.1.0</version>
                </plugin>
                <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
                <plugin>
                    <artifactId>maven-resources-plugin</artifactId>
                    <version>3.0.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.0</version>
                </plugin>
                <plugin>
                    <artifactId>maven-surefire-plugin</artifactId>
                    <version>2.22.1</version>
                </plugin>
                <plugin>
                    <artifactId>maven-jar-plugin</artifactId>
                    <version>3.0.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-install-plugin</artifactId>
                    <version>2.5.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-deploy-plugin</artifactId>
                    <version>2.8.2</version>
                </plugin>
                <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
                <plugin>
                    <artifactId>maven-site-plugin</artifactId>
                    <version>3.7.1</version>
                </plugin>
                <plugin>
                    <artifactId>maven-project-info-reports-plugin</artifactId>
                    <version>3.0.0</version>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>

Supplement: shipping the MySQL driver to the server and running the job there:

[pxj@pxj /home/pxj/lib]$export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/home/pxj/app/hive-1.1.0-cdh5.16.2/lib/mysql-connector-java-5.1.27-bin.jar
[pxj@pxj /home/pxj/app/hive-1.1.0-cdh5.16.2/lib]$hadoop  jar /home/pxj/lib/phoneWcRuoZe-1.0-SNAPSHOT.jar  com.ccj.wfy.mysql.mr.MySQLReadDriverV2 -libjars ~/lib/mysql-connector-java-5.1.27-bin.jar

Author: pxj (潘陈)
Date: 2020-01-07 1:41:32
