springboot集成hadoop实战
springboot集成hadoop实现hdfs增删改查
maven坐标
<!-- Hadoop dependencies (hadoop.version is defined in the POM properties) -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>${hadoop.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-streaming</artifactId>
    <version>${hadoop.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-yarn-common</artifactId>
    <version>${hadoop.version}</version>
    <exclusions>
        <exclusion>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-distcp</artifactId>
    <version>${hadoop.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>${hadoop.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>${hadoop.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
    <version>${hadoop.version}</version>
    <scope>provided</scope>
</dependency>
<!-- Chinese word-segmentation analyzer (IK Analyzer) -->
<dependency>
    <groupId>cn.bestwu</groupId>
    <artifactId>ik-analyzers</artifactId>
    <version>5.1.0</version>
</dependency>
配置
hdfs的配置
hdfs:
  hdfsPath: hdfs://bigdata-master:8020
  hdfsName: bigdata-master
将fileSystem配置并注册到spring容器
@Slf4j
@Configuration
public class HadoopHDFSConfiguration {@Value("${hdfs.hdfsPath}")private String hdfsPath;@Value("${hdfs.hdfsName}")private String hdfsName;@Beanpublic org.apache.hadoop.conf.Configuration getConfiguration(){org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();configuration.set("fs.defaultFS", hdfsPath);return configuration;}@Beanpublic FileSystem getFileSystem(){FileSystem fileSystem = null;try {fileSystem = FileSystem.get(new URI(hdfsPath), getConfiguration(), hdfsName);} catch (IOException e) {// TODO Auto-generated catch blocklog.error(e.getMessage());} catch (InterruptedException e) {// TODO Auto-generated catch blocklog.error(e.getMessage());} catch (URISyntaxException e) {// TODO Auto-generated catch blocklog.error(e.getMessage());}return fileSystem;}}
增删改查
public interface HDFSService {

    /** Creates a directory at the given HDFS path. */
    boolean makeFolder(String path);

    /** Returns whether a file or directory exists at the given HDFS path. */
    boolean existFile(String path);

    /** Lists the direct children of a directory; each map holds "filePath" and "fileStatus". */
    List<Map<String, Object>> readCatalog(String path);

    /** Writes an uploaded file into the given HDFS directory. */
    boolean createFile(String path, MultipartFile file);

    /** Reads a file's content as a single concatenated string. */
    String readFileContent(String path);

    /** Recursively lists files under a path; each map holds "fileName" and "filePath". */
    List<Map<String, Object>> listFile(String path);

    /** Renames/moves a file from oldName to newName. */
    boolean renameFile(String oldName, String newName);

    /** Deletes the file or directory at the given path. */
    boolean deleteFile(String path);

    /** Uploads a local file (path) to HDFS (uploadPath). */
    boolean uploadFile(String path, String uploadPath);

    /** Downloads an HDFS file (path) to the local filesystem (downloadPath). */
    boolean downloadFile(String path, String downloadPath);

    /** Copies a file within HDFS from sourcePath to targetPath. */
    boolean copyFile(String sourcePath, String targetPath);

    /** Reads a whole HDFS file into a byte array. */
    byte[] openFileToBytes(String path);

    /** Returns the block locations of the file at the given path. */
    BlockLocation[] getFileBlockLocations(String path);
}
@Slf4j
@Service
public class HDFSServiceImpl implements HDFSService {private static final int bufferSize = 1024 * 1024 * 64;@Autowiredprivate FileSystem fileSystem;@Overridepublic boolean makeFolder(String path) {boolean target = false;if (StringUtils.isEmpty(path)) {return false;}if (existFile(path)) {return true;}Path src = new Path(path);try {target = fileSystem.mkdirs(src);} catch (IOException e) {log.error(e.getMessage());}return target;}@Overridepublic boolean existFile(String path) {if (StringUtils.isEmpty(path)){return false;}Path src = new Path(path);try {return fileSystem.exists(src);} catch (IOException e) {log.error(e.getMessage());}return false;}@Overridepublic List<Map<String, Object>> readCatalog(String path) {if (StringUtils.isEmpty(path)){return Collections.emptyList();}if (!existFile(path)){log.error("catalog is not exist!!");return Collections.emptyList();}Path src = new Path(path);FileStatus[] fileStatuses = null;try {fileStatuses = fileSystem.listStatus(src);} catch (IOException e) {log.error(e.getMessage());}List<Map<String, Object>> result = new ArrayList<>(fileStatuses.length);if (null != fileStatuses && 0 < fileStatuses.length) {for (FileStatus fileStatus : fileStatuses) {Map<String, Object> cataLogMap = new HashMap<>();cataLogMap.put("filePath", fileStatus.getPath());cataLogMap.put("fileStatus", fileStatus);result.add(cataLogMap);}}return result;}@Overridepublic boolean createFile(String path, MultipartFile file) {boolean target = false;if (StringUtils.isEmpty(path)) {return false;}String fileName = file.getName();Path newPath = new Path(path + "/" + fileName);FSDataOutputStream outputStream = null;try {outputStream = fileSystem.create(newPath);outputStream.write(file.getBytes());target = true;} catch (IOException e) {log.error(e.getMessage());} finally {if (null != outputStream) {try {outputStream.close();} catch (IOException e) {log.error(e.getMessage());}}}return target;}@Overridepublic String readFileContent(String path) {if 
(StringUtils.isEmpty(path)){return null;}if (!existFile(path)) {return null;}Path src = new Path(path);FSDataInputStream inputStream = null;StringBuilder sb = new StringBuilder();try {inputStream = fileSystem.open(src);String lineText = "";while ((lineText = inputStream.readLine()) != null) {sb.append(lineText);}} catch (IOException e) {log.error(e.getMessage());} finally {if (null != inputStream) {try {inputStream.close();} catch (IOException e) {log.error(e.getMessage());}}}return sb.toString();}@Overridepublic List<Map<String, Object>> listFile(String path) {if (StringUtils.isEmpty(path)) {return Collections.emptyList();}if (!existFile(path)) {return Collections.emptyList();}List<Map<String,Object>> resultList = new ArrayList<>();Path src = new Path(path);try {RemoteIterator<LocatedFileStatus> fileIterator = fileSystem.listFiles(src, true);while (fileIterator.hasNext()) {LocatedFileStatus next = fileIterator.next();Path filePath = next.getPath();String fileName = filePath.getName();Map<String, Object> map = new HashMap<>();map.put("fileName", fileName);map.put("filePath", filePath.toString());resultList.add(map);}} catch (IOException e) {log.error(e.getMessage());}return resultList;}@Overridepublic boolean renameFile(String oldName, String newName) {boolean target = false;if (StringUtils.isEmpty(oldName) || StringUtils.isEmpty(newName)) {return false;}Path oldPath = new Path(oldName);Path newPath = new Path(newName);try {target = fileSystem.rename(oldPath, newPath);} catch (IOException e) {log.error(e.getMessage());}return target;}@Overridepublic boolean deleteFile(String path) {boolean target = false;if (StringUtils.isEmpty(path)) {return false;}if (!existFile(path)) {return false;}Path src = new Path(path);try {target = fileSystem.deleteOnExit(src);} catch (IOException e) {log.error(e.getMessage());}return target;}@Overridepublic boolean uploadFile(String path, String uploadPath) {if (StringUtils.isEmpty(path) || StringUtils.isEmpty(uploadPath)) {return 
false;}Path clientPath = new Path(path);Path serverPath = new Path(uploadPath);try {fileSystem.copyFromLocalFile(false,clientPath,serverPath);return true;} catch (IOException e) {log.error(e.getMessage(), e);}return false;}@Overridepublic boolean downloadFile(String path, String downloadPath) {if (StringUtils.isEmpty(path) || StringUtils.isEmpty(downloadPath)) {return false;}Path clienPath = new Path(path);Path targetPath = new Path(downloadPath);try {fileSystem.copyToLocalFile(false,clienPath, targetPath);return true;} catch (IOException e) {log.error(e.getMessage());}return false;}@Overridepublic boolean copyFile(String sourcePath, String targetPath) {if (StringUtils.isEmpty(sourcePath) || StringUtils.isEmpty(targetPath)) {return false;}Path oldPath = new Path(sourcePath);Path newPath = new Path(targetPath);FSDataInputStream inputStream = null;FSDataOutputStream outputStream = null;try {inputStream = fileSystem.open(oldPath);outputStream = fileSystem.create(newPath);IOUtils.copyBytes(inputStream,outputStream,bufferSize,false);return true;} catch (IOException e) {log.error(e.getMessage());} finally {if (null != inputStream) {try {inputStream.close();} catch (IOException e) {log.error(e.getMessage());}}if (null != outputStream) {try {outputStream.close();} catch (IOException e) {log.error(e.getMessage());}}}return false;}@Overridepublic byte[] openFileToBytes(String path) {if (StringUtils.isEmpty(path)) {return null;}if (!existFile(path)) {return null;}Path src = new Path(path);byte[] result = null;FSDataInputStream inputStream = null;try {inputStream = fileSystem.open(src);result = IOUtils.readFullyToByteArray(inputStream);} catch (IOException e) {log.error(e.getMessage());} finally {if (null != inputStream){try {inputStream.close();} catch (IOException e) {log.error(e.getMessage());}}}return result;}@Overridepublic BlockLocation[] getFileBlockLocations(String path) {if (StringUtils.isEmpty(path)) {return null;}if (!existFile(path)) {return null;}BlockLocation[] 
blocks = null;Path src = new Path(path);try{FileStatus fileStatus = fileSystem.getFileStatus(src);blocks = fileSystem.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());}catch(Exception e){log.error(e.getMessage());}return blocks;}
}
mapReduce
package com.winterchen.hadoopdemo.reduce;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;/** 继承Reducer类需要定义四个输出、输出类型泛型:* 四个泛型类型分别代表:* KeyIn Reducer的输入数据的Key,这里是每行文字中的单词"hello"* ValueIn Reducer的输入数据的Value,这里是每行文字中的次数* KeyOut Reducer的输出数据的Key,这里是每行文字中的单词"hello"* ValueOut Reducer的输出数据的Value,这里是每行文字中的出现的总次数*/
public class WordReduce extends Reducer<Text, IntWritable, Text, IntWritable> {private IntWritable result = new IntWritable();private List<String> textList = new ArrayList<>();@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}result.set(sum);context.write(key, result);String keyStr = key.toString();// 使用分词器,内容已经被统计好了,直接输出即可if (textList.contains(keyStr)) {System.out.println("============ " + keyStr + " 统计分词为: " + sum + " ============");}}
}
package com.winterchen.hadoopdemo.configuration;import com.winterchen.hadoopdemo.HadoopDemoApplication;
import com.winterchen.hadoopdemo.mapper.WordMapper;
import com.winterchen.hadoopdemo.reduce.WordReduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.io.IOException;@Component
public class ReduceJobsConfiguration {

    /** HDFS NameNode URI (from application config). */
    @Value("${hdfs.hdfsPath}")
    private String hdfsPath;

    /**
     * Builds the Hadoop configuration used for submitting jobs.
     *
     * @return configuration with fs.defaultFS set to the configured NameNode
     */
    public Configuration getConfiguration() {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", hdfsPath);
        // NOTE(review): "mapred.job.tracker" is a deprecated MR1 property and
        // expects a JobTracker address, not an HDFS URI — kept for behavioral
        // compatibility, but it likely has no effect; confirm and remove.
        configuration.set("mapred.job.tracker", hdfsPath);
        return configuration;
    }

    /**
     * Configures and synchronously runs the word-count job.
     *
     * @param jobName    job display name
     * @param inputPath  HDFS input path
     * @param outputPath HDFS output path (must not already exist)
     * @throws IOException            on HDFS or job-submission errors
     * @throws ClassNotFoundException if a job class cannot be resolved
     * @throws InterruptedException   if waiting for completion is interrupted
     */
    public void getWordCountJobsConf(String jobName, String inputPath, String outputPath)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = getConfiguration();
        Job job = Job.getInstance(conf, jobName);
        job.setMapperClass(WordMapper.class);
        // The reducer just sums counts, so it can safely double as combiner.
        job.setCombinerClass(WordReduce.class);
        job.setJarByClass(HadoopDemoApplication.class);
        job.setReducerClass(WordReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        // Block until the job finishes; true = print progress to the console.
        job.waitForCompletion(true);
    }

    @PostConstruct
    public void getPath() {
        // BUGFIX: the original body was the self-assignment
        // "hdfsPath = this.hdfsPath;" — a no-op. The (now empty) lifecycle
        // hook is kept so the public interface is unchanged.
    }

    /** Returns the configured HDFS NameNode URI. */
    public String getHdfsPath() {
        return hdfsPath;
    }
}
public interface MapReduceService {void wordCount(String jobName, String inputPath, String outputPath) throws Exception;}
package com.winterchen.hadoopdemo.service.impl;import com.winterchen.hadoopdemo.configuration.ReduceJobsConfiguration;
import com.winterchen.hadoopdemo.service.HDFSService;
import com.winterchen.hadoopdemo.service.MapReduceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;@Service
public class MapReduceServiceImpl implements MapReduceService {@Autowiredprivate HDFSService hdfsService;@Autowiredprivate ReduceJobsConfiguration reduceJobsConfiguration;@Overridepublic void wordCount(String jobName, String inputPath, String outputPath) throws Exception {if (StringUtils.isEmpty(jobName) || StringUtils.isEmpty(inputPath)) {return;}// 输出目录 = output/当前Job,如果输出路径存在则删除,保证每次都是最新的if (hdfsService.existFile(outputPath)) {hdfsService.deleteFile(outputPath);}reduceJobsConfiguration.getWordCountJobsConf(jobName, inputPath, outputPath);}
}
package com.winterchen.hadoopdemo.service.impl;import com.winterchen.hadoopdemo.configuration.ReduceJobsConfiguration;
import com.winterchen.hadoopdemo.service.HDFSService;
import com.winterchen.hadoopdemo.service.MapReduceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;@Service
public class MapReduceServiceImpl implements MapReduceService {@Autowiredprivate HDFSService hdfsService;@Autowiredprivate ReduceJobsConfiguration reduceJobsConfiguration;@Overridepublic void wordCount(String jobName, String inputPath, String outputPath) throws Exception {if (StringUtils.isEmpty(jobName) || StringUtils.isEmpty(inputPath)) {return;}// 输出目录 = output/当前Job,如果输出路径存在则删除,保证每次都是最新的if (hdfsService.existFile(outputPath)) {hdfsService.deleteFile(outputPath);}reduceJobsConfiguration.getWordCountJobsConf(jobName, inputPath, outputPath);}
}
@Slf4j
@Api(tags = "map reduce api")
@RestController
@RequestMapping("/api/v1/map-reduce")
public class MapReduceController {@Autowiredprivate MapReduceService mapReduceService;@ApiOperation("count word")@PostMapping("/word/count")public APIResponse wordCount(@ApiParam(name = "jobName", required = true)@RequestParam(name = "jobName", required = true)String jobName,@ApiParam(name = "inputPath", required = true)@RequestParam(name = "inputPath", required = true)String inputPath,@ApiParam(name = "outputPath", required = true)@RequestParam(name = "outputPath", required = true)String outputPath){try {mapReduceService.wordCount(jobName, inputPath, outputPath);return APIResponse.success();} catch (Exception e) {log.error(e.getMessage());return APIResponse.fail(e.getMessage());}}
}
以上就是日常开发中能使用到的基本的功能:hdfs的增删改查,以及MapReduce;
源码地址:
WinterChenS/springboot-learning-experience
springboot集成hadoop实战相关推荐
- SpringBoot笔记:SpringBoot集成JWT实战
文章目录 JWT 简介 概念 JWT 的认证流程 优缺点 JWT 消息构成 header playload signature SpringBoot 集成 JWT 实战 maven 依赖 JwtUti ...
- SpringBoot集成neo4j实战
文章目录 1.图数据库Neo4j介绍 1.1 什么是图数据库(graph database) 1.2 为什么需要图数据库 1.3 Neo4j特点和优势 Neo4j的特点 Neo4j的优点 1.4 Ne ...
- Springboot集成Hadoop+Hbase实现企业能源消耗监测大数据分析系统
企业硬件设备较多,不利于快速发现设备故障及能源消耗异常.依托于hadoop.hbase搭建大数据分析平台,采用Springboot开发框架搭建一套完善的企业能源监控检测数据分析可视化平台.本次毕设程序 ...
- SpringBoot集成Redis实战——步骤、坑点、解决方案
背景 回顾项目中的TODO工作,发现留了一条待办项,即对Redis配置参数的具体含义的了解.开发平台研发期间,由于时间紧张,对于Redis,没有进行相对充分的技术预研,集成的比较粗放,虽然目标达成了, ...
- dubbo web工程示例_dubbo实战之二:与SpringBoot集成
欢迎访问我的GitHub https://github.com/zq2599/blog_demos 内容:所有原创文章分类和汇总,及配套源码,涉及Java.Docker.Kubernetes.DevO ...
- SpringBoot 集成 layering-cache 实现两级缓存调研与实践
前言 对于系统查多改少的数据,可以通过缓存来提升系统的访问性能.一般情况下我们会采用 Redis ,但是如果仅仅依赖 Redis 很容易出现缓存雪崩的情况.为了防止缓存雪崩可以通过 Redis 高可用 ...
- kafka(组件分析 整合springboot集成 实战)
kafka 组件 搭建 springboot集成 实战 kafka 组件 搭建 springboot集成 实战 1.应用场景 1.1 kafka场景 1.2 kafka特性 1.3 消息对比 1.4 ...
- SpringBoot集成Elasticsearch7.4 实战(一)
在网上已经有好多关于Elasticsearch的介绍,就不在翻来覆去讲一些基本概念,大家感兴趣的可以自己去找一些资料巩固下.这次只为了顾及众多首次接触Elasticsearch,案例都讲的很浅显,还有 ...
- 从ElasticSearch 认识到实战(SpringBoot集成ES)
ElasticSearch 认识到实战 目录 搜索引擎介绍 ElasticSearch知识 安装 使用restful风格查询ES SpringBoot配置ES SpringBoot集成使用 一.搜索引 ...
最新文章
- Ubuntu终端远程工具
- 苹果员工“神操作”:自建网站揭露公司性骚扰和歧视事件
- Java 常用API的运用,效率及技巧
- linux目录和文件管理命令
- OpenCV:OpenCV图像旋转的代码
- 补习系列(18)-springboot H2 迷你数据库
- redis-使用问题
- 怎么把计算机模式重置,电脑怎么还原出厂模式
- Myrrix——基于Mahout的开源推荐系统
- USC ECG Learning Center/ ECG Glossary
- 如何在Mac OS X上安装 Ruby运行环境
- excel wind插件使用_这些超实用的Excel插件,你要是都知道,确定老司机无疑了
- java以及JavaScript的香港身份证验证方法。
- 戴尔游匣7559更换C面和D面以及升级内存硬盘教程
- 谷歌云指南_Google材料设计指南的10个重要要点
- 鸿蒙系统怎么安装网易云音乐,网易云音乐鸿蒙版 - 魔法系统之家下载
- 解决苹果手机绑定小米手环4支付宝,蓝牙无法连接问题。
- 【推荐】2022年物流快递行业市场行情分析投资趋势产业发展前景调研究报告(附件中为网盘地址,报告持续更新)
- web安全:XSS测试平台使用教程
- 小节点也能引爆活动!2021四月活动指导方案