Hadoop对关系数据库无非两种操作,即从关系数据库输入到HDFS和从HDFS输出到关系数据库。Hadoop中分别提供了DBInputFormat类和DBOutputFormat类,前者用于从关系数据库输入到HDFS,该类将关系数据库中的一条记录作为向Mapper输入的value值,后者用于将HDFS中的文件输出到关系数据库,该类将Reducer输出的key值存储到数据库。我们只要在主程序中设置job的输入输出格式为这两个类中的一种,就可以让Hadoop从关系数据库输入或者向关系数据库输出。
正如我上面提到的,我们在操作的过程中使用了“记录”这个对象,因此需要写一个类对应到关系数据库中我们要操作的那个表,这个类要实现DBWritable接口和Writable接口,具体参见HadoopAPI。
具体代码参见文档。

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.lib.db.*;
import java.sql.*;
import java.io.*;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.fs.Path;
public class SDBConnInput {
public static class CustomerRecord implements Writable,DBWritable{
String customerID;
String customerName;
String phoneNumber;
public void readFields(ResultSet resultSet)  throws SQLException{
customerID=resultSet.getString(1);
customerName=resultSet.getString(2);
phoneNumber=resultSet.getString(3);
}
public void write(PreparedStatement statement)  throws SQLException{
statement.setString(1, customerID);
statement.setString(2, customerName);
statement.setString(3,phoneNumber);
}
public void readFields(DataInput in) throws IOException{
customerID=in.readUTF();
customerName=in.readUTF();
phoneNumber=in.readUTF();
}
public void write(DataOutput out) throws IOException{
out.writeUTF(customerID);
out.writeUTF(customerName);
out.writeUTF(phoneNumber);
}
public void setCustomerID(String customerID){
this.customerID=customerID;
}
public void setCustomerName(String customerName){
this.customerName=customerName;
}
public void setPhoneNumber(String phoneNumber){
this.phoneNumber=phoneNumber;
}
public String toString(){
return this.customerID+","+this.customerName+","+this.phoneNumber;
}
}
public static class MapperClass extends MapReduceBase implements Mapper<LongWritable,CustomerRecord,LongWritable,Text>{
Text result= new Text();
public void map(LongWritable key, CustomerRecord value,OutputCollector<LongWritable, Text> collector, Reporter reporter) throws IOException{
result.set(value.toString());
collector.collect(key, result);
}
}
public static class ReducerClass extends MapReduceBase implements Reducer<LongWritable, Text,NullWritable,Text>{
public void reduce(LongWritable key, Iterator<Text> values, OutputCollector<NullWritable,Text> output, Reporter reporter) throws IOException{
String str="";
while(values.hasNext()){
str+=values.next().toString();
}
output.collect(null, new Text(str));
}
}
public static void main(String [] args) throws Exception{
/**
* 从关系数据库读取数据到HDFS
*/
JobConf job = new JobConf();
job.setJarByClass(SDBConnInput.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);
job.setInputFormat(DBInputFormat.class);
FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000/user/xuyizhen/out"));
DBConfiguration.configureDB(job, "com.mysql.jdbc.Driver",
"jdbc:mysql://192.168.0.25:3306/hadoop","root","1117");
String fieldNames []={"customerID","customerName","phoneNumber"};
DBInputFormat.setInput(job, CustomerRecord.class,"customers",null,"customerID", fieldNames);
job.setMapperClass(MapperClass.class);
job.setReducerClass(ReducerClass.class);
JobClient.runJob(job);
}
}
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.lib.db.*;
import java.sql.*;
import java.io.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.filecache.*;
public class SDBConnOutput {
public static class CustomerRecord implements Writable,DBWritable{
String customerID;
String customerName;
String phoneNumber;
public void readFields(ResultSet resultSet)  throws SQLException{
customerID=resultSet.getString(1);
customerName=resultSet.getString(2);
phoneNumber=resultSet.getString(3);
}
public void write(PreparedStatement statement)  throws SQLException{
statement.setString(1, customerID);
statement.setString(2, customerName);
statement.setString(3,phoneNumber);
}
public void readFields(DataInput in) throws IOException{
customerID=in.readUTF();
customerName=in.readUTF();
phoneNumber=in.readUTF();
}
public void write(DataOutput out) throws IOException{
out.writeUTF(customerID);
out.writeUTF(customerName);
out.writeUTF(phoneNumber);
}
public void setCustomerID(String customerID){
this.customerID=customerID;
}
public void setCustomerName(String customerName){
this.customerName=customerName;
}
public void setPhoneNumber(String phoneNumber){
this.phoneNumber=phoneNumber;
}
public String toString(){
return this.customerID+","+this.customerName+","+this.phoneNumber;
}
}
public static class MapperClass extends MapReduceBase implements Mapper<LongWritable,Text,CustomerRecord,Text>{
CustomerRecord customer=new CustomerRecord();
public void map(LongWritable key, Text value,OutputCollector<CustomerRecord,Text> collector, Reporter reporter)  throws IOException{
String [] strs=value.toString().split(",");
customer.setCustomerID(strs[0]);
customer.setCustomerName(strs[1]);
customer.setPhoneNumber(strs[2]);
collector.collect( customer,value);
}
}
/**
*将HDFS中的文件输出到数据库
*/
public static void main(String [] args) throws Exception{
/**
* 从关系数据库读取数据到HDFS
*/
JobConf job = new JobConf(SDBConnInput.class);
//DBOutputFormat类只会将MapReduce框架输出结果的K值输出到关系数据库中
job.setOutputFormat(DBOutputFormat.class);
FileInputFormat.addInputPath(job, new Path("hdfs://master:9000/user/xuyizhen/in/customer.txt"));
DBConfiguration.configureDB(job, "com.mysql.jdbc.Driver",
"jdbc:mysql://192.168.0.25:3306/hadoop","root","1117");
String fieldNames []={"customerID","customerName","phoneNumber"};
DBOutputFormat.setOutput(job, "customers", fieldNames);
job.setMapperClass(MapperClass.class);
job.setNumReduceTasks(0);
JobClient.runJob(job);
}
}

注意:运行MapReduce时候报错:
java.io.IOException: com.mysql.jdbc.Driver
一般是由于程序找不到mysql驱动包。解决方法是让每个tasktracker运行MapReduce程序时都可以找到该驱动包。
添加包有两种方式:
1.在每个节点下的${HADOOP_HOME}/lib下添加该包,然后重启集群,这是比较原始的方法。
2.把包传到集群上:hadoop fs -put mysql驱动jar包名称/lib,并且在提交job前,添加语句DistributedCache.addFileToClassPath(new Path("/lib/mysql驱动jar包名称"),conf);
以上方法使用与所有需要额外jar包的MapReduce代码。

  • mysql-connector-java-5.1.22-bin.jar (813.4 KB)
  • 下载次数: 3

Hadoop与关系数据库相关推荐

  1. Hadoop冷热数据转换工具Sqoop

    Sqoop是Apache基金下的开源项目,目的是完成关系数据库和Hadoop的转化,实现双向导入. 通常的大型数据系统使用之实现较少,主要原因是因为其在CDH3才开始支持,而且该方式是通过JDBC驱动 ...

  2. Hadoop辅助工具——Flume、Sqoop

    前言 在一个完整的离线大数据处理系统中,除了hdfs+mapreduce+hive组成分析系统的核心之外,还需要数据采集.结果数据导出.任务调度等不可或缺的辅助系统,而这些辅助工具在hadoop生态体 ...

  3. Hadoop生态系统各组件功能

    参考: Spark编程基础(Scala版) 林子雨 赖永炫 陶继平 人民邮电出版社出版 2018-07-01 1.HDFS 分布式文件系统 Hadoop分布式文件系统HDFS是针对谷歌分布式文件系统( ...

  4. Hadoop生态系统功能组件,主要包括哪些?

    经过多年的发展,Hadoop生态系统不断完善和成熟,目前已经包括了多个子项目,除了核心的HDFS和MapReduce以外,Hadoop生态系统还包括要ZoopKer.HBase.Hive.Pig.Ma ...

  5. Hadoop笔记-01概述

    文章目录 1 什么是大数据? 1.1 大数据计算模式及代表产品 1.2 云计算与物联网 1.2.1 云计算 1.2.1.1 虚拟化 1.2.1.2 分布式存储 1.2.1.3 分布式计算 1.2.1. ...

  6. Hadoop 和 Spark 知识点整理汇总

    文章目录 前言 一.LINUX 系统常用命令汇总 二.Hadoop 常用命令汇总 三.Hadoop 基本概念 1. Hadoop 特性 2. Hadoop 架构 2.1 Hadoop 集群 2.2 H ...

  7. hadoop生态圈各产品基本概念梳理

    hbase:是一个适合于非结构化数据存储的数据库,是基于列的而不是基于行的模式,HBase利用Hadoop MapReduce来处理HBase中的海量数据. HDFS: 是GFS的一种实现,他的完整名 ...

  8. 急性子的开源​​大数据,第 1 部分: Hadoop 教程:Hello World 与 Java、Pig、Hive、Flume、Fuse、Oozie,以及 Sqoop 与 Informix、DB2 和

    如何开始使用 Hadoop 和自己喜欢的数据库 本文的重点是解释大数据,然后在 Hadoop 中提供简单的工作示例,Hadoop 是在大数据领域的主要开源选手.您会很高兴地听到,Hadoop 并不是 ...

  9. Hadoop离线 day18 sqoop数据迁移和java执行shell命令

    sqoop数据迁移和java执行shell命令 3. sqoop数据迁移 3.1.概述 3.2.sqoop1与sqoop2架构对比 3.3.工作机制 3.4 .sqoop实战及原理 3.4.1 sqo ...

最新文章

  1. 《C程序猿从校园到职场》勘误
  2. cordova 更改app版本_ionic项目中使用cordova-hot-code-push插件
  3. 了解恶意软件和插件!
  4. ES备份工具elasticdump
  5. 原生js实现输入框焦点切换
  6. 【行业专题报告】 汽车、二手车-专题资料
  7. 云计算认证哪个好?考什么内容?
  8. 中英文停止词表(stopword)
  9. netcore 动软三层架构-急速开发框架 winfrom demo
  10. 两种 Type-C 耳机:模拟耳机 数字耳机
  11. 晨枫U盘维护工具V2.0版
  12. Linux用户、组管理
  13. win7怎么看计算机内存不足,Win7电脑提示虚拟内存不足怎么办?Win7电脑虚拟内存不足解决方法...
  14. JavaScript常见的请求头和响应头
  15. Spark入门实战系列--9.Spark图计算GraphX介绍及实例
  16. git tag与git tag -a的不同
  17. 【AS git 报错 Device not configured】
  18. 【Latex】TexLive+VScode+SumatraPDF 配置LaTex编辑环境
  19. ShaderToy上后处理练习1——故障
  20. 手把手教你腾讯云搭建RUOYI系统

热门文章

  1. JAVA|学生类Student
  2. 2021年中国再生塑料行业现状分析:回收利用量及回金额双增长 [图]
  3. Navicat生成数据库的模型并展示属性的中文注释
  4. 机器学习之求解无约束最优化问题方法(手推公式版)
  5. 物联网协议之MQTT源码分析(二)
  6. 感动你我,感动中国,历年感动中国人物评选体育类获奖人物盘点
  7. 干货:复杂网络及其应用简介
  8. 【瑞萨RA_FSP】GPT—— 通用PWM定时器
  9. matlab程序最大cpu利用率,Matlab纯CPU性能测试
  10. 复旦微ZYNQ procise axi读取adc数据