读取本地美国疫情数据存储结果到MySQL

  • 数据分析
  • 代码实现
    • 自定义数据类型
    • 定义类存储输出内容
    • Mapper阶段
    • 自定义分组
    • Reduce阶段
    • Driver阶段
  • 结果

数据分析



字段名分别是:日期,城市,州,邮编,案例,死亡数


  • 需求
    将日期的格式改为年份-月份-日
    只要有任何一个字段是空的,那么就删除这条数据
    计算每月每个州的总案例数和死亡数

代码实现

自定义数据类型


此类的主要作用就是将输入数据实例化,便于处理

import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;public class JavaBean implements WritableComparable<JavaBean> {private String date;private String city;private String stats;private String fips;private int cases;private int deaths;public int compareTo(JavaBean o) {return 0;}public void write(DataOutput dataOutput) throws IOException {dataOutput.writeUTF(date);dataOutput.writeUTF(city);dataOutput.writeUTF(stats);dataOutput.writeUTF(fips);dataOutput.writeInt(cases);dataOutput.writeInt(deaths);}public void readFields(DataInput dataInput) throws IOException {date = dataInput.readUTF();city = dataInput.readUTF();stats = dataInput.readUTF();fips = dataInput.readUTF();cases = dataInput.readInt();deaths = dataInput.readInt();}public void set(String date, String city, String stats, String fips, int cases, int deaths) {this.date = date;this.city = city;this.stats = stats;this.fips = fips;this.cases = cases;this.deaths = deaths;}@Overridepublic String toString() {return "JavaBean{" +"date='" + date + '\'' +", city='" + city + '\'' +", stats='" + stats + '\'' +", fips='" + fips + '\'' +", cases=" + cases +", deaths=" + deaths +'}';}public String getDate() {return date;}public void setDate(String date) {this.date = date;}public String getCity() {return city;}public void setCity(String city) {this.city = city;}public String getStats() {return stats;}public void setStats(String stats) {this.stats = stats;}public String getFips() {return fips;}public void setFips(String fips) {this.fips = fips;}public int getCases() {return cases;}public void setCases(int cases) {this.cases = cases;}public int getDeaths() {return deaths;}public void setDeaths(int deaths) {this.deaths = deaths;}
}

定义类存储输出内容

存储输出到MySQL的数据,让数据和列名对应


import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;public class MysqlBean implements DBWritable, Writable {private String date;private String stats;private String fips;private int cases;private int deaths;public void write(DataOutput dataOutput) throws IOException {dataOutput.writeUTF(date);dataOutput.writeUTF(stats);dataOutput.writeUTF(fips);dataOutput.writeInt(cases);dataOutput.writeInt(deaths);}public void readFields(DataInput dataInput) throws IOException {date = dataInput.readUTF();stats = dataInput.readUTF();fips = dataInput.readUTF();cases = dataInput.readInt();deaths = dataInput.readInt();}public void write(PreparedStatement statement) throws SQLException {statement.setString(1, date);statement.setString(2, stats);statement.setString(3, fips);statement.setInt(4, cases);statement.setInt(5, deaths);}public void readFields(ResultSet resultSet) throws SQLException {date = resultSet.getString(1);stats = resultSet.getString(2);fips = resultSet.getString(3);cases = resultSet.getInt(4);deaths = resultSet.getInt(5);}public void set(String date, String stats, String fips, int cases, int deaths) {this.date = date;this.stats = stats;this.fips = fips;this.cases = cases;this.deaths = deaths;}@Overridepublic String toString() {return "MysqlBean{" +"date='" + date + '\'' +", stats='" + stats + '\'' +", fips='" + fips + '\'' +", cases=" + cases +", deaths=" + deaths +'}';}public String getDate() {return date;}public void setDate(String date) {this.date = date;}public String getStats() {return stats;}public void setStats(String stats) {this.stats = stats;}public String getFips() {return fips;}public void setFips(String fips) {this.fips = fips;}public int getCases() {return cases;}public void setCases(int cases) {this.cases = cases;}public int getDeaths() {return deaths;}public void setDeaths(int deaths) {this.deaths = deaths;}
}

Mapper阶段


split(",", -1)确保即使最后一个为空数据,也可以切割成六列
使用增强for循环去重

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;public class MapTest extends Mapper<LongWritable, Text, JavaBean, NullWritable> {JavaBean k = new JavaBean();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {try {String[] datas = value.toString().split(",", -1);for (String data : datas) {if (data == null || "".equals(data)) return;}SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy/m/dd");SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-mm-dd");String date = sdf2.format(sdf1.parse(datas[0]));k.set(date, datas[1], datas[2], datas[3], Integer.parseInt(datas[4]), Integer.parseInt(datas[5]));context.write(k, NullWritable.get());} catch (ParseException e) {e.printStackTrace();}}
}

自定义分组


按照月份和州进行分组

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;public class Group extends WritableComparator {@Overridepublic int compare(WritableComparable a, WritableComparable b) {JavaBean bean1 = (JavaBean) a;JavaBean bean2 = (JavaBean) b;return (bean1.getDate().split("-")[1] + bean1.getStats()).compareTo(bean2.getDate().split("-")[1] + bean2.getStats());}protected Group() {super(JavaBean.class, true);}
}

Reduce阶段


import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class RedTest extends Reducer<JavaBean, NullWritable, MysqlBean, NullWritable> {MysqlBean k = new MysqlBean();int sum_cases;int sum_deaths;@Overrideprotected void reduce(JavaBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {for (NullWritable v : values) {sum_cases += key.getCases();sum_deaths += key.getDeaths();}String date = key.getDate().substring(0, key.getDate().length() - 3);k.set(date, key.getStats(), key.getFips(), sum_cases, sum_deaths);context.write(k,NullWritable.get());sum_deaths = 0;sum_cases = 0;}
}

Driver阶段


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.log4j.BasicConfigurator;public class DriTest {public static void main(String[] args) throws Exception {BasicConfigurator.configure();Configuration conf = new Configuration();DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://localhost:3306/data?useUnicode=true&characterEncoding=UTF-8", "root", "123456");Job job = Job.getInstance(conf);job.setJarByClass(DriTest.class);job.setMapperClass(MapTest.class);job.setMapOutputKeyClass(JavaBean.class);job.setMapOutputValueClass(NullWritable.class);job.setReducerClass(RedTest.class);job.setOutputKeyClass(MysqlBean.class);job.setOutputValueClass(NullWritable.class);job.setGroupingComparatorClass(Group.class);String f1[] = {"date", "state", "fips", "cases", "deaths"};FileInputFormat.setInputPaths(job, "D:\\MP\\美国疫情\\input");DBOutputFormat.setOutput(job, "cases", f1);System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

结果

【MapReuce】读取本地美国疫情数据存储结果到MySQL相关推荐

  1. java接收并存储文件_客户端读取本地文件的数据,发送到服务器,服务器接收并存储到文件中...

    只实现了传送文件的功能. 编写Socket客户端和服务器程序,客户端读取本地文件的数据,发送到服务器,服务器接收并存储到文件中. 消息格式: 字段 长度(单位字节) 内容 -------------- ...

  2. JS读取本地CSV文件数据

    JS读取本地CSV文件数据 文件中的部分数据如图 需求是需要提取出文件的数据 使用到的模块是 Papa Parse 1. 依赖安装 yarn add papaparse papaparse的基本使用可 ...

  3. java sql变更存储,MySQL更改数据库数据存储目录,mysql数据存储

    MySQL更改数据库数据存储目录,mysql数据存储 MySQL数据库默认的数据库文件位于 /var/lib/mysql 下,有时候由于存储规划等原因,需要更改 MySQL 数据库的数据存储目录.下文 ...

  4. web端读取本地excel表数据

    听说发文有积分,我就来试试了,请大家不要喷我,我只是个孩子. 使用的js插件是js-xlsx,下载地址:https://github.com/SheetJS/js-xlsx,大家可以自行下载. var ...

  5. MySQL 数据存储和优化------MySQL架构原理 ---- (架构---索引---事务---锁---集群---性能---分库分表---实战---运维)持续更新

    Mysql架构体系全系列文章主目录(进不去说明还没写完)https://blog.csdn.net/grd_java/article/details/123033016 本文只是整个系列笔记的第一章: ...

  6. Spark _24 _读取JDBC中的数据创建DataFrame/DataSet(MySql为例)(三)

    两种方式创建DataSet 现在数据库中创建表不能给插入少量数据. javaapi: package SparkSql;import org.apache.spark.SparkConf; impor ...

  7. php数据存储mysql_php – 在MySQL中存储路线数据的最佳方式

    我正在开发一个应用程序,它要求我存储一些位置的方向,下面是我试图存储的数据的示例: 方向1 从西部:乘528 East(Beechline),经过机场出口,然后从13号出口驶入Narcoossee R ...

  8. 爬虫数据存储—数据库和MySQL

    爬虫数据存储-数据库 一.什么是数据库? 数据库是一个以某种有组织的方式存储的数据集合.简单来说,我们可以将数据库想象为一个文件柜,文件柜里面有很多文件,这些文件我们称之为表. 举一个实际例子:比如说 ...

  9. SparkStreaming读取Kafka的Json数据然后保存到MySQL

    一般我们使用SparkStreaming消费kafka数据,获取到数据后解析,使用JDBC的方式写入数据库,如下所示. 以上的方式没什么毛病,但是当我们消费的kafka数据类型比较多样的时候,我们需要 ...

  10. vue 项目如何读取本地json文件数据

    在项目根目录有一个static文件夹目录,将所需要的json文件放在该static目录下,使用axios发起get请求获取对应的json文件数据 import axios from 'axios' / ...

最新文章

  1. 【原创】jquery常见的条件判断类型
  2. Linux初学(CnetOS7 Linux)之切换命令模式和图形模式的方法
  3. 如何由jdk的安装版本改成非安装版本
  4. 蓝桥杯-字串统计(java)
  5. 输入法画面_搜狗输入法去广告版,流畅再无弹窗打扰
  6. 8问8答,一篇文章读懂空间音效
  7. Java中字符串连接符(+)和append的区别
  8. linux 内核同步--理解原子操作、自旋锁、信号量(可睡眠)、读写锁、RCU锁、PER_CPU变量、内存屏障
  9. python中for循环怎么打开_详解Python中for循环的使用
  10. 如何使用postman带Token测试接口?
  11. 阿里云短信验证码平台使用demo
  12. 而立之年,学习编程,
  13. 访问chm文件出现 已取消到该网页的导航的解决方法
  14. 腾讯云服务器入门使用流程 新手必看教程
  15. 一个程序员的奋斗经历
  16. storm DRPC例子
  17. Nginx 配置旧域名重定向到新域名
  18. 根据excel模板导出excel
  19. Shell编程之if简单判断两个数字大小
  20. windows下的BT服务器搭建方案

热门文章

  1. 星星之火-57:前传接口 CPRI的速率、能力、小区带宽之间的映射关系
  2. 火星坐标-84坐标-百度地图坐标相互转换
  3. npm发布vue组件库
  4. 微信小程序map组件 markers 展示当前位置修改标记点图标
  5. PHP正则淘口令,Flutter代码锦囊---淘口令复制弹窗
  6. OSI七层网络协议及TCP/UDP、C/S架构详解
  7. 【gp数据库】你可能不知道却超级实用的函数
  8. 三菱FX5U和变频器走485通讯连接,程序是FB块写好的,硬件一样可以直接调用
  9. rtk采点后如何导入cad_SMT贴片机是如何编程的
  10. Java线程的状态及主要转化方法