需求描述

我们有两张表“成绩表”和“总分表”,从成绩表中计算出每个学生的总成绩,记录到总分表中。

表结构

//成绩表记录学生id,课程id,这科分数
CREATE TABLE `score` (`id` int(11) NOT NULL AUTO_INCREMENT,`sid` int(11) DEFAULT NULL,`cid` int(11) DEFAULT NULL,`score` int(11) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8;
//总分表记录学生id和学生总成绩
CREATE TABLE `topscore` (`id` int(11) NOT NULL AUTO_INCREMENT,`sid` int(11) DEFAULT NULL,`totalscore` int(11) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8; 

成绩如下:

自定义类实现序列化和DBWritable接口

实现序列化方法和数据库读写方法,这里我们读写是两张表。

import com.alibaba.fastjson.JSONObject;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;public class MysqlDb_scoreWritable implements Writable,DBWritable {private int id;private int sid;private int cid;private int score;private int totalscore;/***数据序列化*/public void write(DataOutput out) throws IOException {out.writeInt(id);out.writeInt(sid);out.writeInt(cid);out.writeInt(score);out.writeInt(totalscore);}public void readFields(DataInput in) throws IOException {this.id = in.readInt();this.sid = in.readInt();this.cid = in.readInt();this.score = in.readInt();this.totalscore = in.readInt();}/*** 数据库读写* 向topscore中写入值*/public void write(PreparedStatement statement) throws SQLException {statement.setInt(1,sid);statement.setInt(2,totalscore);}//从score中读取成绩public void readFields(ResultSet resultSet) throws SQLException {id = resultSet.getInt(1);sid = resultSet.getInt(2);cid = resultSet.getInt(3);score = resultSet.getInt(4);}public int getId() {return id;}public void setId(int id) {this.id = id;}public int getSid() {return sid;}public void setSid(int sid) {this.sid = sid;}public int getCid() {return cid;}public void setCid(int cid) {this.cid = cid;}public int getScore() {return score;}public void setScore(int score) {this.score = score;}public int getTotalscore() {return totalscore;}public void setTotalscore(int totalscore) {this.totalscore = totalscore;}@Overridepublic String toString() {JSONObject json = new JSONObject();json.put("id",id);json.put("sid",sid);json.put("cid",cid);json.put("score",score);return json.toString();}
}

mapper类

map方法读取mysql表中一行行的数据,map的输入key,value分别是longWritable,上面我们自定义的mysql读写实体类。输入key是学生的id,value是自定义实体类。reduce的时候我们取出学生的各科成绩相加求总成绩。

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class MysqlDbMapper extends Mapper<LongWritable,MysqlDb_scoreWritable,IntWritable,MysqlDb_scoreWritable> {protected void map(LongWritable key, MysqlDb_scoreWritable value, Context context) throws IOException, InterruptedException {System.out.println(value.toString());int sid = value.getSid();context.write(new IntWritable(sid),value);}
}

reducer类

reduce的时候我们取出学生的各科成绩相加求总成绩。输出key是我们自定义实体类向mysql表中写数据,value为空。

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class MysqlDbReducer extends Reducer<IntWritable,MysqlDb_scoreWritable,MysqlDb_scoreWritable,NullWritable> {protected void reduce(IntWritable key, Iterable<MysqlDb_scoreWritable> values, Context context) throws IOException, InterruptedException {int totalScore = 0;for(MysqlDb_scoreWritable m : values){totalScore += m.getScore();}MysqlDb_scoreWritable score = new MysqlDb_scoreWritable();score.setSid(key.get());score.setTotalscore(totalScore);context.write(score,NullWritable.get());}
}

运行app类

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;public class DbApp {public static void main(String[] args) throws Exception{Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(DbApp.class);job.setJobName("mysql read write");job.setMapperClass(MysqlDbMapper.class);job.setReducerClass(MysqlDbReducer.class);job.setMapOutputKeyClass(IntWritable.class);job.setMapOutputValueClass(MysqlDb_scoreWritable.class);job.setOutputKeyClass(MysqlDb_scoreWritable.class);job.setOutputValueClass(NullWritable.class);//配置数据库信息String driverclass = "com.mysql.jdbc.Driver";String url = "jdbc:mysql://192.168.1.215:3306/school";String username = "root";String passwd = "root";DBConfiguration.configureDB(job.getConfiguration(),driverclass,url,username,passwd);//设置输入内容DBInputFormat.setInput(job,MysqlDb_scoreWritable.class,"select * from score","select count(id) from score");//设置输出内容,第一个string是表名,后面可以跟多个string是表字段名DBOutputFormat.setOutput(job,"topscore","sid","totalscore");job.waitForCompletion(true);}
}

hadoop读写mysql数据库相关推荐

  1. pandas读写MySQL数据库详解及实战

    pandas读写MySQL数据库详解及实战 SQLAlchemy是Python中最有名的ORM工具. 关于ORM: 全称Object Relational Mapping(对象关系映射). 特点是操纵 ...

  2. java读写mysql数据库_Java读写MySQL数据库小实例

     Java读写MySQL数据库小实例 首先需要安装和配置好MySQL数据库.接下来,先创建一个数据库,Java代码连接此数据库,然后读写. 假设基于MySQL命令行创建一个叫做:phildatab ...

  3. 本地通过Eclipse链接Hadoop操作Mysql数据库问题小结

    前一段时间,在上一篇博文中描述了自己抽时间在构建的完全分布式Hadoop环境过程中遇到的一些问题以及构建成功后,通过Eclipse操作HDFS的时候遇到的一些问题,最近又想进一步学习学习Hadoop操 ...

  4. Zeppelin上通过Spark读写mysql数据库

    Zeppelin上通过Spark读写mysql数据库 一.从mysql数据库获取数据 二.把处理后的数据再插入到mysql数据库 一.从mysql数据库获取数据 %spark val df = spa ...

  5. 使用php读写mysql数据库并显示到网页上

    由于工作的原因,需要了解下bs模式下的读写数据库的流程,将试验过程梳理一下. 我采用的是phpstudy搭建数据库,mysql数据库已经搭建完成,名称为2018版本,如下图: 由于我前期安装过apac ...

  6. datagridview控件读写mysql数据库表格的方法_C#读写Access数据库、表格datagridview窗体显示代码实例...

    C#读写Access数据库.表格datagridview窗体显示代码实例 最近项目中用到C#对于Access数据库表读写.mdb操作,学习了下相关的东西,这里先整理C#对于Access数据库的操作,对 ...

  7. C++读写Mysql数据库

    前言: 一直以来操作数据库我用的都是python,第一次使用C++来操作Mysql数据库,中间遇到了许多问题,特别是Mysql版本为64位,但想在x86或win32下开发Mysql的问题.把这些经验进 ...

  8. c语言多线程mysql_多线程读写mysql数据库

    该楼层疑似违规已被系统折叠 隐藏此楼查看此楼 unsigned int __stdcall scan(PVOID pM) { char ip[20]; strcpy(ip, (char*)pM); M ...

  9. c 多线程mysql_多线程读写mysql数据库

    该楼层疑似违规已被系统折叠 隐藏此楼查看此楼 unsigned int __stdcall scan(PVOID pM) { char ip[20]; strcpy(ip, (char*)pM); M ...

最新文章

  1. linux shell读取文件
  2. 如何着手学习一个新的PHP框架
  3. 请教个问题,ajax.net 的效率如何
  4. 网卡绑定技术linux c,Linux多网卡绑定
  5. 是啥意思_属猴人:十猴九苦是啥意思 十猴九不全什么意思 为什么
  6. Redhat GRUB配置错误修复
  7. WinXP启动时自动打开上次关机时未关闭的文件夹
  8. cameraraw预设_169个PS预设 ACR一键调出胶片效果 城市黑金效果 复古胶片色调
  9. devsecops automation
  10. html 中的一些知识
  11. Android版本caj阅读器,CAJViewer安卓版
  12. 世界上第一部智能手机27岁了
  13. 用js(javascript)完成点击一个按钮会使相应的div背景颜色发生改变
  14. STM3210B_EVAL U盘功能 USB + SPI +SD 增加对SDHC卡支持
  15. VBS让室友成为你儿子
  16. 电脑bios进入方法介绍
  17. C++面试之Linux操作系统
  18. biostar来电自动开机_bios设置来电自动开机
  19. 中标麒麟系统下(Neokylin7)达梦数据库的安装(DM8)
  20. 勇敢面对人生的苦难(转载自https://www.xuemeiwen.com/)

热门文章

  1. 图灵完备及TypeScript图灵完备性验证
  2. 支付系统设计中,如何防止重复支付?
  3. Flutter自定义圆形选择框
  4. 预约手机在线维修小程序在线预约下订单上门维修师傅在线接单结合拼团砍价
  5. veni vidi vici密钥我的生日ACIGS解密_25人在同一天生日!男女比例又是多少?阳光新生大数据来了!...
  6. 区块链(Blockchain)
  7. 【已解决】将CentOS7系统安装至U盘(九):使用AppImage方式安装图形处理工具Draw.io和Inkscape
  8. MMORPG大型游戏设计与开发(服务器 游戏场景 地图和区域)
  9. 平面设计新手如何快速修好人像图
  10. FusionCharts flash透明度设置