前面讲过了怎么通过mapreduce把mysql的一张表的数据放到另外一张表中,这次讲的是把mysql的数据读取到hdfs里面去

具体怎么搭建环境我这里就不多说了。参考

通过mapreduce把mysql的一张表的数据导到另外一张表中

也在eclipse里面创建一个mapreduce工程

具体的实现代码

package com.gong.mrmysql;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Iterator;import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
import org.apache.hadoop.mapred.lib.db.DBOutputFormat;
import org.apache.hadoop.mapred.lib.db.DBWritable;/**
* Function: 测试 mr 与 mysql 的数据交互,此测试用例将一个表中的数据复制到另一张表中
*                          实际当中,可能只需要从 mysql 读,或者写到 mysql 中。
* date: 2013-7-29 上午2:34:04 <br/>
* @author june
*/
public class Mysql2Mr {// DROP TABLE IF EXISTS `hadoop`.`studentinfo`;// CREATE TABLE studentinfo (// id INTEGER NOT NULL PRIMARY KEY,// name VARCHAR(32) NOT NULL);public static class StudentinfoRecord implements Writable, DBWritable {int id;String name;//构造方法public StudentinfoRecord() { }//Writable接口是对数据流进行操作的,所以输入是DataInput类对象public void readFields(DataInput in) throws IOException {this.id = in.readInt(); //输入流中的读取下一个整数,并返回this.name = Text.readString(in);}public String toString() {return new String(this.id + " " + this.name);}//DBWritable负责对数据库进行操作,所以输出格式是PreparedStatement //PreparedStatement接口继承并扩展了Statement接口,用来执行动态的SQL语句,即包含参数的SQL语句
                @Overridepublic void write(PreparedStatement stmt) throws SQLException {stmt.setInt(1, this.id);stmt.setString(2, this.name);}//DBWritable负责对数据库进行操作,输入格式是ResultSet// ResultSet接口类似于一张数据表,用来暂时存放从数据库查询操作所获得的结果集
                @Overridepublic void readFields(ResultSet result) throws SQLException {this.id = result.getInt(1);this.name = result.getString(2);}//Writable接口是对数据流进行操作的,所以输出是DataOutput类对象
                @Overridepublic void write(DataOutput out) throws IOException {out.writeInt(this.id);Text.writeString(out, this.name);}}// 记住此处是静态内部类,要不然你自己实现无参构造器,或者等着抛异常:// Caused by: java.lang.NoSuchMethodException: DBInputMapper.<init>()// http://stackoverflow.com/questions/7154125/custom-mapreduce-input-format-cant-find-constructor// 网上脑残式的转帖,没见到一个写对的。。。public static class DBInputMapper extends MapReduceBase implementsMapper<LongWritable, StudentinfoRecord, LongWritable, Text> {public void map(LongWritable key, StudentinfoRecord value,OutputCollector<LongWritable, Text> collector, Reporter reporter) throws IOException {collector.collect(new LongWritable(value.id), new Text(value.toString()));}}public static class MyReducer extends MapReduceBase implementsReducer<LongWritable, Text, StudentinfoRecord, Text> {@Overridepublic void reduce(LongWritable key, Iterator<Text> values,OutputCollector<StudentinfoRecord, Text> output, Reporter reporter) throws IOException {String[] splits = values.next().toString().split(" ");StudentinfoRecord r = new StudentinfoRecord();r.id = Integer.parseInt(splits[0]);r.name = splits[1];output.collect(r, new Text(r.name));}}public static void main(String[] args) throws IOException {JobConf conf = new JobConf(Mysql2Mr.class);DistributedCache.addFileToClassPath(new Path("hdfs://192.168.241.13:9000/mysqlconnector/mysql-connector-java-5.1.38-bin.jar"), conf);conf.setMapOutputKeyClass(LongWritable.class);conf.setMapOutputValueClass(Text.class);conf.setOutputKeyClass(LongWritable.class);conf.setOutputValueClass(Text.class);//  conf.setOutputFormat(DBOutputFormat.class);conf.setInputFormat(DBInputFormat.class);// mysql to hdfsconf.set("fs.defaultFS", "hdfs://192.168.241.13:9000");//在配置文件conf中指定所用的文件系统---HDFSconf.setReducerClass(IdentityReducer.class);Path outPath = new Path("hdfs://192.168.241.13:9000/student/out1");FileSystem.get(conf).delete(outPath, true);FileOutputFormat.setOutputPath(conf, outPath);DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://192.168.241.13:3306/mrtest","root", "543116");String[] fields = { "id", "name" };// 从 t 表读数据DBInputFormat.setInput(conf, StudentinfoRecord.class, "t", null, "id", fields);// mapreduce 将数据输出到 t2 表//DBOutputFormat.setOutput(conf, "t2", "id", "name");// FileOutputFormat.setOutputPath(conf, new Path("hdfs://192.168.241.13:9000/student/out1"));
                conf.setMapperClass(org.apache.hadoop.mapred.lib.IdentityMapper.class);conf.setMapperClass(DBInputMapper.class);// conf.setReducerClass(MyReducer.class);
JobClient.runJob(conf);}
}

特别要主要的是在主函数里面添加这么一句话

如果不添加这句话的话就不能识别你的hdfs路径了,除了这个方法之外还,不想添加这句话的话还可以把集群的core-site.xml文件直接拷贝一份放到工程的src目录下

这样也是可以的

运行程序

可以看到hdfs的文件上面已经有mysql数据库表的内容了

转载于:https://www.cnblogs.com/braveym/p/9054997.html

通过mapreduce把mysql的数据读取到hdfs相关推荐

  1. python读取mysql数据_如何将mysql的数据读取python

    展开全部 本文实例为大家2113分享了Python读取MySQL数据库表数据的具体代码,5261供大家参考,具体内容4102如下 环境:Python 3.6 ,Window 64bit 目的:1653 ...

  2. cdh mysql sqoop 驱动_[bigdata-003]在cdh 5.7下 用sqoop1将mysql数据库数据导入到hdfs的方式...

    1. 假设,myql安装在bigdata2上.我们要在bigdata3上执行sqoop1. 2. 首先,要在mysql上创建一个'b3'@'%'的账户,这个账户限定只能从外部ip地址访问mysql. ...

  3. 【MapReduce】MapReduce读写MySQL数据

    MapReduce读写MySQL数据 数据 代码实现 自定义类来接收源数据 自定义类型来存储结果数据 Mapper阶段 Reducer阶段 Driver阶段 上传运行 打包 上传集群运行 使用MapR ...

  4. php mysql 读取中文数据的函数_php读取mysql中文数据出现乱码的解决方法

    以下是对php读取mysql中文数据出现乱码问题的解决方法进行了介绍,需要的朋友可以过来参考下 1.PHP页面语言本身的编码类型不合适,这时候,你直接在脚本中写的中文肯定是乱码,不用说数据库了 解决方 ...

  5. 随机从mysql中读取_如何实现MySQL表数据随机读取?从mysql表中读取随机数据

    文章转自 http://blog.efbase.org/2006/10/16/244/ 如何实现MySQL表数据随机读取?从mysql表中读取随机数据?以前在群里讨论过这个问题,比较的有意思.mysq ...

  6. python链接mysql 判断是否成功_python连接mysql数据库并读取数据的实现

    1.安装pymysql包 pip install pymysql 注: MySQLdb只支持python2,pymysql支持python3 2.连接数据 import pymysql import ...

  7. mapreduce mysql_MapReduce直接连接MySQL获取数据

    MySQL中数据: mysql> select * from linuxidc_tbls; +---------------------+----------------+ | TBL_NAME ...

  8. Spark连接MySQL数据库并读取数据

    (作者:陈玓玏) 打开pyspark,带驱动的那种 用命令行启动pyspark时需要加上jdbc的驱动路径: pyspark --driver-class-path D:/Users/chendile ...

  9. 【python量化交易学习】从tushare获取股票交易数据,存入后再从mysql或excel读取数据,筛选股票,用pyecharts画出K线图。

    选定日期,筛选涨幅达到10%的股票,并画出K线图.观察涨停后股票走势. 由于创业板涨停板为20%,科创板20%,北交所30%.因此筛选出的涨停股票不完全准确.考虑到目前市场打板主要集中在10%的主板股 ...

  10. oracle从mysql抓数据_关于oracle数据库读取数据的三种方式

    打开oracle sqldeveloper,连接到HR模式下的数据库,在SQL工作表中,执行如下语句: CREATE TABLE WANG( Name  varchar2(6), ID     num ...

最新文章

  1. github心得体会
  2. Android学习笔记(一) - 如果我们来设计Android
  3. 模型难复现不一定是作者的错,最新研究发现模型架构要背锅丨CVPR 2022
  4. Python学习(四)列表与列表操作
  5. 线性表应用之线性表算法设计六大经典案例
  6. [emacs] python代码折叠
  7. docker与mmdetection
  8. 阿里云贾少天:大规模云服务器高效使用及管理实践
  9. Python 内置函数介绍
  10. mysql版本升级对数据的影响_MySQL升级
  11. 用于深度学习的演化神经AutoML
  12. 不同网络情况的安防摄像头如何通过手机进行直播?
  13. 手把手教您搭建一个跨境电商平台
  14. Android.bp入门教程
  15. 我的世界跨服聊天MySQL_我的世界BungeeCord搭建教程什么是跨服群组搭建
  16. Jzoj5450【NOIP2017提高A组冲刺11.4】Neutral
  17. power supply frameware 框架
  18. NSPredicate 模糊、精确、查询
  19. dell微型计算机换内存条,视频:小巧灵活 戴尔OptiPlex 7060微型机解析
  20. 动态输入数据并生成表格,带删除操作javascript

热门文章

  1. C#程序关闭时怎么关闭子线程
  2. centos虚拟机克隆
  3. SVN missing 解决
  4. 以下内容为Stackoverflow上整理以作纪录
  5. [转]解决Sublime Text 2中文显示乱码问题
  6. 【机器学习案例】酒店用机器学习,预测哪些客人会放鸽子
  7. 本周论文推荐(12.14-12.20)
  8. 300小时人工智能学习视频课程,从数理基础到爬虫实战!
  9. 每日算法系列【LeetCode 319】灯泡开关
  10. 深度学习2.0-33.BatchNorm