hadoop 写入mysql_使用MapReducer将文件写入mysql 数据库
自定义类
package DBOutFormat;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
public class MysqlDBOutPutFormat implements DBWritable, Writable {
private String address ;
private String type ;
private String name ;
private String divce ;
public MysqlDBOutPutFormat(){}
public MysqlDBOutPutFormat(String address,String type,String name,String divce){
this.address = address ;
this.type = type ;
this.name = name ;
this.divce = divce ;
}
@Override
public void write(PreparedStatement statement) throws SQLException {
statement.setString(1,address);
statement.setString(2,type);
statement.setString(3,name);
statement.setString(4,divce);
}
@Override
public void readFields(ResultSet resultSet) throws SQLException {
this.address = resultSet.getString(1);
this.type = resultSet.getString(2);
this.name = resultSet.getString(3);
this.divce = resultSet.getString(4);
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(address);
out.writeUTF(type);
out.writeUTF(name);
out.writeUTF(divce);
}
@Override
public void readFields(DataInput in) throws IOException {
this.address = in.readUTF() ;
this.type = in.readUTF() ;
this.name = in.readUTF() ;
this.divce = in.readUTF() ;
}
}
mapreducer 示例代码
package DBOutFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.io.IOException;
public class reduce {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
String input = "data1/mysql.txt" ;
final Configuration co = new Configuration() ;
DBConfiguration.configureDB(co,
"com.mysql.jdbc.Driver",
"jdbc:mysql://11.11.11.2:3306/su?characterEncoding=UTF-8",
"root",
"root"
); //获取 Job 对象
final Job job = Job.getInstance(co);
//设置class
job.setJarByClass(reduce.class);
//设置mapper 和 Reduce
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
//设置 Mapper 阶段输出数据的key 和value
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
//设置Reducer 阶段输出数据的key 和value
job.setOutputKeyClass(MysqlDBOutPutFormat.class);
job.setOutputValueClass(NullWritable.class);
//设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path(input));
//job输出发生变化 ,不能使用默认的 Fileoutputformat
job.setOutputFormatClass(DBOutputFormat.class);
String[] fields = {"address","type","name","divce"};
DBOutputFormat.setOutput(job,"zyplc",fields);
//提交 job
final boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
public static class MyMapper extends Mapper {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
context.write(key,value);
}
}
public static class MyReducer extends Reducer{
@Override
protected void reduce(LongWritable key, Iterable values, Context context) throws IOException, InterruptedException {
for (Text value : values) {
String[] info = value.toString().split(",") ;
if(info.length==4){
context.write(new MysqlDBOutPutFormat(info[0].trim(),info[1].trim(),info[2].trim(),info[3].trim()),NullWritable.get());
}
}
}
}
}
hadoop 写入mysql_使用MapReducer将文件写入mysql 数据库相关推荐
- excel 导入mysql_如何将Excel文件导入MySQL数据库
本文实例为大家分享了Excel文件导入MySQL数据库的方法,供大家参考,具体内容如下 1.简介 本博客给大家分享一个实用的小技能,我们在使用数据库时常常需要将所需的Excel数据添加进去,如果按照传 ...
- excel数据怎么导入mysql_如何将Excel文件导入MySQL数据库
如何将Excel文件导入MySQL数据库 本文实例为大家分享了Excel文件导入MySQL数据库的方法,供大家参考,具体内容如下 1.简介 本博客给大家分享一个实用的小技能,我们在使用数据库时常常需要 ...
- gff3转mysql_科学网-把GFF3文件导入MySQL数据库-闫双勇的博文
什么是GFF3?这个一种序列注释文件的格式,基因组注释数据常常会用这种格式来记录序列注释信息,关于这种格式的更多信息,可以在这里学习:http://www.sequenceontology.org/g ...
- gff3转mysql_科学网—把GFF3文件导入MySQL数据库 - 闫双勇的博文
什么是GFF3?这个一种序列注释文件的格式,基因组注释数据常常会用这种格式来记录序列注释信息,关于这种格式的更多信息,可以在这里学习:http://www.sequenceontology.org/g ...
- python亿级mysql数据库导出_Python之csv文件从MySQL数据库导入导出的方法
Python之csv文件从MySQL数据库导入导出的方法 发布时间:2020-10-26 07:39:02 来源:脚本之家 阅读:53 作者:张行之 Python从MySQL数据库中导出csv文件处理 ...
- 使用命令导入sql文件到mysql数据库时报Failed to open file错误的解决方案
使用命令导入sql文件到mysql数据库时报Failed to open file错误的解决方案 参考文章: (1)使用命令导入sql文件到mysql数据库时报Failed to open file错 ...
- excel数据库_将excel文件导入mysql数据库教程(PHP实现)
点击蓝字关注我们!每天获取最新的编程小知识! 源 / php中文网 源 / www.php.cn 在这篇文章中,我将给大家介绍如何使用PHP将excel文件导入mysql数据库.有时候我们需 ...
- csv导入mysql php实现_PHP实现csv文件导入mysql数据库的方法
这篇文章主要介绍了PHP编程实现csv文件导入mysql数据库的方法,涉及php文件读取.转换.数据库的连接.插入等相关操作技巧,需要的朋友可以参考下 具体如下: config.db.php内容如下: ...
- 收藏!用Python一键批量将任意结构的CSV文件导入MySQL数据库。
Python有很多库可以对CSV文件和Excel文件进行自动化和规模化处理.但是,使用数据库可以将计算机完成任务的能力提升成千上万倍! 那么问题来了,如果有很多个文件需要导入数据库,一个一个操作效率太 ...
最新文章
- 设置socket.Receive()的等待时延
- C#写的windows应用程序打包
- GitHub 开源神器:堪称作业终结者!
- 3G助推智慧医疗 看病将更加“智能化”
- Struts+Spring+Hibernate练习(完整)
- Flink 里程碑版本即将发布,快点入手
- python中的多线程求值串行和并行_python多线程和多进程——python并行编程实验
- 【简易教程】人体时钟 ホネホネ・クロック [時計・ブログパーツ]
- 徐家骏:华为十年感悟
- 图像预处理流程与方法
- 英威腾GD200A系列变频器实现多段速控制的相关参数设置及接线
- 线性代数知识点总结——矩阵乘法、矩阵运算与性质、矩阵微积分
- 数据表格之多表头设置
- Nesssus Pro 8.13.1下载安装
- 基因编辑最新研究进展(2022年3月)
- 【Git】Git pull 拉代码卡在Unpacking objects
- 欧式空间与酉空间——概念区分
- 网通相中中国联通GSM网络 联通暂无意租售
- AngularJS 概述
- Far planner 部署真实小车 树莓派部署lego_loam
热门文章
- java 钉钉获取用户信息,JAVA maven项目如何使用钉钉SDK来获取token、用户
- 方向梯度直方图(Histogram Of Gradient)详解
- Cpp 对象模型探索 / 类静态成员函数的调用方式
- 32位十六进制浮点数转换为十进制浮点数的方法
- 启明云端WT32-CAM操作视频,让你快速上手ESP32camera应用
- wifi一阵一阵卡_家里wifi总是过一会就卡一下然后又好了
- Linux设备驱动归纳总结(一):内核的相关基础概念
- RF无线电射频接口静电保护方案图
- 百度 Serverless 架构揭秘与应用实践
- 如何修改Series和DataFrame类型中的元素值_Redis的HSCAN命令中COUNT参数的失效场景