springboot集成hadoop实现hdfs增删改查

maven坐标

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>${hadoop.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-streaming</artifactId>
    <version>${hadoop.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-yarn-common</artifactId>
    <version>${hadoop.version}</version>
    <exclusions>
        <exclusion>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-distcp</artifactId>
    <version>${hadoop.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>${hadoop.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>${hadoop.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
    <version>${hadoop.version}</version>
    <scope>provided</scope>
</dependency>
<!-- 中文分词器 -->
<dependency>
    <groupId>cn.bestwu</groupId>
    <artifactId>ik-analyzers</artifactId>
    <version>5.1.0</version>
</dependency>

配置

hdfs的配置

hdfs:
  hdfsPath: hdfs://bigdata-master:8020
  hdfsName: bigdata-master

将fileSystem配置并注册到spring容器

@Slf4j
@Configuration
public class HadoopHDFSConfiguration {@Value("${hdfs.hdfsPath}")private String hdfsPath;@Value("${hdfs.hdfsName}")private String hdfsName;@Beanpublic org.apache.hadoop.conf.Configuration  getConfiguration(){org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();configuration.set("fs.defaultFS", hdfsPath);return configuration;}@Beanpublic FileSystem getFileSystem(){FileSystem fileSystem = null;try {fileSystem = FileSystem.get(new URI(hdfsPath), getConfiguration(), hdfsName);} catch (IOException e) {// TODO Auto-generated catch blocklog.error(e.getMessage());} catch (InterruptedException e) {// TODO Auto-generated catch blocklog.error(e.getMessage());} catch (URISyntaxException e) {// TODO Auto-generated catch blocklog.error(e.getMessage());}return fileSystem;}}

增删改查

public interface HDFSService {// 创建文件夹boolean makeFolder(String path);// 是否存在文件boolean existFile(String path);List<Map<String, Object>> readCatalog(String path);boolean createFile(String path, MultipartFile file);String readFileContent(String path);List<Map<String, Object>> listFile(String path);boolean renameFile(String oldName, String newName);boolean deleteFile(String path);boolean uploadFile(String path, String uploadPath);boolean downloadFile(String path, String downloadPath);boolean copyFile(String sourcePath, String targetPath);byte[] openFileToBytes(String path);BlockLocation[] getFileBlockLocations(String path);}
@Slf4j
@Service
public class HDFSServiceImpl implements HDFSService {private static final int bufferSize = 1024 * 1024 * 64;@Autowiredprivate FileSystem fileSystem;@Overridepublic boolean makeFolder(String path) {boolean target = false;if (StringUtils.isEmpty(path)) {return false;}if (existFile(path)) {return true;}Path src = new Path(path);try {target = fileSystem.mkdirs(src);} catch (IOException e) {log.error(e.getMessage());}return target;}@Overridepublic boolean existFile(String path) {if (StringUtils.isEmpty(path)){return false;}Path src = new Path(path);try {return fileSystem.exists(src);} catch (IOException e) {log.error(e.getMessage());}return false;}@Overridepublic List<Map<String, Object>> readCatalog(String path) {if (StringUtils.isEmpty(path)){return Collections.emptyList();}if (!existFile(path)){log.error("catalog is not exist!!");return Collections.emptyList();}Path src = new Path(path);FileStatus[] fileStatuses = null;try {fileStatuses = fileSystem.listStatus(src);} catch (IOException e) {log.error(e.getMessage());}List<Map<String, Object>> result = new ArrayList<>(fileStatuses.length);if (null != fileStatuses && 0 < fileStatuses.length) {for (FileStatus fileStatus : fileStatuses) {Map<String, Object> cataLogMap = new HashMap<>();cataLogMap.put("filePath", fileStatus.getPath());cataLogMap.put("fileStatus", fileStatus);result.add(cataLogMap);}}return result;}@Overridepublic boolean createFile(String path, MultipartFile file) {boolean target = false;if (StringUtils.isEmpty(path)) {return false;}String fileName = file.getName();Path newPath = new Path(path + "/" + fileName);FSDataOutputStream outputStream = null;try {outputStream = fileSystem.create(newPath);outputStream.write(file.getBytes());target = true;} catch (IOException e) {log.error(e.getMessage());} finally {if (null != outputStream) {try {outputStream.close();} catch (IOException e) {log.error(e.getMessage());}}}return target;}@Overridepublic String readFileContent(String path) {if 
(StringUtils.isEmpty(path)){return null;}if (!existFile(path)) {return null;}Path src = new Path(path);FSDataInputStream inputStream = null;StringBuilder sb = new StringBuilder();try {inputStream = fileSystem.open(src);String lineText = "";while ((lineText = inputStream.readLine()) != null) {sb.append(lineText);}} catch (IOException e) {log.error(e.getMessage());} finally {if (null != inputStream) {try {inputStream.close();} catch (IOException e) {log.error(e.getMessage());}}}return sb.toString();}@Overridepublic List<Map<String, Object>> listFile(String path) {if (StringUtils.isEmpty(path)) {return Collections.emptyList();}if (!existFile(path)) {return Collections.emptyList();}List<Map<String,Object>> resultList = new ArrayList<>();Path src = new Path(path);try {RemoteIterator<LocatedFileStatus> fileIterator = fileSystem.listFiles(src, true);while (fileIterator.hasNext()) {LocatedFileStatus next = fileIterator.next();Path filePath = next.getPath();String fileName = filePath.getName();Map<String, Object> map = new HashMap<>();map.put("fileName", fileName);map.put("filePath", filePath.toString());resultList.add(map);}} catch (IOException e) {log.error(e.getMessage());}return resultList;}@Overridepublic boolean renameFile(String oldName, String newName) {boolean target = false;if (StringUtils.isEmpty(oldName) || StringUtils.isEmpty(newName)) {return false;}Path oldPath = new Path(oldName);Path newPath = new Path(newName);try {target = fileSystem.rename(oldPath, newPath);} catch (IOException e) {log.error(e.getMessage());}return target;}@Overridepublic boolean deleteFile(String path) {boolean target = false;if (StringUtils.isEmpty(path)) {return false;}if (!existFile(path)) {return false;}Path src = new Path(path);try {target = fileSystem.deleteOnExit(src);} catch (IOException e) {log.error(e.getMessage());}return target;}@Overridepublic boolean uploadFile(String path, String uploadPath) {if (StringUtils.isEmpty(path) || StringUtils.isEmpty(uploadPath)) {return 
false;}Path clientPath = new Path(path);Path serverPath = new Path(uploadPath);try {fileSystem.copyFromLocalFile(false,clientPath,serverPath);return true;} catch (IOException e) {log.error(e.getMessage(), e);}return false;}@Overridepublic boolean downloadFile(String path, String downloadPath) {if (StringUtils.isEmpty(path) || StringUtils.isEmpty(downloadPath)) {return false;}Path clienPath = new Path(path);Path targetPath = new Path(downloadPath);try {fileSystem.copyToLocalFile(false,clienPath, targetPath);return true;} catch (IOException e) {log.error(e.getMessage());}return false;}@Overridepublic boolean copyFile(String sourcePath, String targetPath) {if (StringUtils.isEmpty(sourcePath) || StringUtils.isEmpty(targetPath)) {return false;}Path oldPath = new Path(sourcePath);Path newPath = new Path(targetPath);FSDataInputStream inputStream = null;FSDataOutputStream outputStream = null;try {inputStream = fileSystem.open(oldPath);outputStream = fileSystem.create(newPath);IOUtils.copyBytes(inputStream,outputStream,bufferSize,false);return true;} catch (IOException e) {log.error(e.getMessage());} finally {if (null != inputStream) {try {inputStream.close();} catch (IOException e) {log.error(e.getMessage());}}if (null != outputStream) {try {outputStream.close();} catch (IOException e) {log.error(e.getMessage());}}}return false;}@Overridepublic byte[] openFileToBytes(String path) {if (StringUtils.isEmpty(path)) {return null;}if (!existFile(path)) {return null;}Path src = new Path(path);byte[] result = null;FSDataInputStream inputStream = null;try {inputStream = fileSystem.open(src);result = IOUtils.readFullyToByteArray(inputStream);} catch (IOException e) {log.error(e.getMessage());} finally {if (null != inputStream){try {inputStream.close();} catch (IOException e) {log.error(e.getMessage());}}}return result;}@Overridepublic BlockLocation[] getFileBlockLocations(String path) {if (StringUtils.isEmpty(path)) {return null;}if (!existFile(path)) {return null;}BlockLocation[] 
blocks = null;Path src = new Path(path);try{FileStatus fileStatus = fileSystem.getFileStatus(src);blocks = fileSystem.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());}catch(Exception e){log.error(e.getMessage());}return blocks;}
}

mapReduce

package com.winterchen.hadoopdemo.reduce;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;/** 继承Reducer类需要定义四个输出、输出类型泛型:* 四个泛型类型分别代表:* KeyIn        Reducer的输入数据的Key,这里是每行文字中的单词"hello"* ValueIn      Reducer的输入数据的Value,这里是每行文字中的次数* KeyOut       Reducer的输出数据的Key,这里是每行文字中的单词"hello"* ValueOut     Reducer的输出数据的Value,这里是每行文字中的出现的总次数*/
public class WordReduce extends Reducer<Text, IntWritable, Text, IntWritable> {private IntWritable result = new IntWritable();private List<String> textList = new ArrayList<>();@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}result.set(sum);context.write(key, result);String keyStr = key.toString();// 使用分词器,内容已经被统计好了,直接输出即可if (textList.contains(keyStr)) {System.out.println("============ " + keyStr + " 统计分词为: " + sum + " ============");}}
}
package com.winterchen.hadoopdemo.configuration;import com.winterchen.hadoopdemo.HadoopDemoApplication;
import com.winterchen.hadoopdemo.mapper.WordMapper;
import com.winterchen.hadoopdemo.reduce.WordReduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.io.IOException;

@Component
public class ReduceJobsConfiguration {

    @Value("${hdfs.hdfsPath}")
    private String hdfsPath;

    /**
     * Hadoop configuration for submitting jobs against the configured HDFS.
     *
     * NOTE(review): "mapred.job.tracker" is a deprecated MR1 property, and the
     * HDFS URI is not a job-tracker address — confirm whether this setting is
     * needed at all; kept for behavior compatibility.
     */
    public Configuration getConfiguration() {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", hdfsPath);
        configuration.set("mapred.job.tracker", hdfsPath);
        return configuration;
    }

    /**
     * Configure and synchronously run the word-count job.
     *
     * @param jobName    name of the MapReduce job
     * @param inputPath  HDFS input path
     * @param outputPath HDFS output path (must not already exist)
     * @throws IOException            on HDFS/job-submission failure
     * @throws ClassNotFoundException if a job class cannot be resolved
     * @throws InterruptedException   if the wait for completion is interrupted
     */
    public void getWordCountJobsConf(String jobName, String inputPath, String outputPath)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = getConfiguration();
        Job job = Job.getInstance(conf, jobName);
        job.setJarByClass(HadoopDemoApplication.class);
        job.setMapperClass(WordMapper.class);
        // The reducer doubles as a combiner: summing counts is associative.
        job.setCombinerClass(WordReduce.class);
        job.setReducerClass(WordReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        job.waitForCompletion(true);
    }

    @PostConstruct
    public void getPath() {
        // BUG FIX: the original body was the self-assignment
        // "hdfsPath = this.hdfsPath;" — a no-op. Removed; the hook is kept
        // only so the public interface stays unchanged.
    }

    public String getHdfsPath() {
        return hdfsPath;
    }
}
/**
 * Service contract for launching MapReduce jobs.
 */
public interface MapReduceService {

    /**
     * Run a word-count job over HDFS data.
     *
     * @param jobName    name of the MapReduce job
     * @param inputPath  HDFS path of the input data
     * @param outputPath HDFS path where the job writes its result
     * @throws Exception if job configuration or execution fails
     */
    void wordCount(String jobName, String inputPath, String outputPath) throws Exception;
}
package com.winterchen.hadoopdemo.service.impl;import com.winterchen.hadoopdemo.configuration.ReduceJobsConfiguration;
import com.winterchen.hadoopdemo.service.HDFSService;
import com.winterchen.hadoopdemo.service.MapReduceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;@Service
public class MapReduceServiceImpl implements MapReduceService {@Autowiredprivate HDFSService hdfsService;@Autowiredprivate ReduceJobsConfiguration reduceJobsConfiguration;@Overridepublic void wordCount(String jobName, String inputPath, String outputPath) throws Exception {if (StringUtils.isEmpty(jobName) || StringUtils.isEmpty(inputPath)) {return;}// 输出目录 = output/当前Job,如果输出路径存在则删除,保证每次都是最新的if (hdfsService.existFile(outputPath)) {hdfsService.deleteFile(outputPath);}reduceJobsConfiguration.getWordCountJobsConf(jobName, inputPath, outputPath);}
}
// NOTE(review): this entire block is an exact duplicate of the
// MapReduceServiceImpl shown immediately above — almost certainly a
// copy-paste error in the article; only one copy should be kept.
// Also note the inline "//" comment inside the class body swallows the
// rest of its collapsed line, commenting out the delete/submit logic.
package com.winterchen.hadoopdemo.service.impl;import com.winterchen.hadoopdemo.configuration.ReduceJobsConfiguration;
import com.winterchen.hadoopdemo.service.HDFSService;
import com.winterchen.hadoopdemo.service.MapReduceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;@Service
public class MapReduceServiceImpl implements MapReduceService {@Autowiredprivate HDFSService hdfsService;@Autowiredprivate ReduceJobsConfiguration reduceJobsConfiguration;@Overridepublic void wordCount(String jobName, String inputPath, String outputPath) throws Exception {if (StringUtils.isEmpty(jobName) || StringUtils.isEmpty(inputPath)) {return;}// 输出目录 = output/当前Job,如果输出路径存在则删除,保证每次都是最新的if (hdfsService.existFile(outputPath)) {hdfsService.deleteFile(outputPath);}reduceJobsConfiguration.getWordCountJobsConf(jobName, inputPath, outputPath);}
}
@Slf4j
@Api(tags = "map reduce api")
@RestController
@RequestMapping("/api/v1/map-reduce")
public class MapReduceController {@Autowiredprivate MapReduceService mapReduceService;@ApiOperation("count word")@PostMapping("/word/count")public APIResponse wordCount(@ApiParam(name = "jobName", required = true)@RequestParam(name = "jobName", required = true)String jobName,@ApiParam(name = "inputPath", required = true)@RequestParam(name = "inputPath", required = true)String inputPath,@ApiParam(name = "outputPath", required = true)@RequestParam(name = "outputPath", required = true)String outputPath){try {mapReduceService.wordCount(jobName, inputPath, outputPath);return APIResponse.success();} catch (Exception e) {log.error(e.getMessage());return APIResponse.fail(e.getMessage());}}
}

以上就是日常开发中能使用到的基本的功能:hdfs的增删改查,以及MapReduce;

源码地址:

WinterChenS/springboot-learning-experience

springboot集成hadoop实战相关推荐

  1. SpringBoot笔记:SpringBoot集成JWT实战

    文章目录 JWT 简介 概念 JWT 的认证流程 优缺点 JWT 消息构成 header playload signature SpringBoot 集成 JWT 实战 maven 依赖 JwtUti ...

  2. SpringBoot集成neo4j实战

    文章目录 1.图数据库Neo4j介绍 1.1 什么是图数据库(graph database) 1.2 为什么需要图数据库 1.3 Neo4j特点和优势 Neo4j的特点 Neo4j的优点 1.4 Ne ...

  3. Springboot集成Hadoop+Hbase实现企业能源消耗监测大数据分析系统

    企业硬件设备较多,不利于快速发现设备故障及能源消耗异常.依托于hadoop.hbase搭建大数据分析平台,采用Springboot开发框架搭建一套完善的企业能源监控检测数据分析可视化平台.本次毕设程序 ...

  4. SpringBoot集成Redis实战——步骤、坑点、解决方案

    背景 回顾项目中的TODO工作,发现留了一条待办项,即对Redis配置参数的具体含义的了解.开发平台研发期间,由于时间紧张,对于Redis,没有进行相对充分的技术预研,集成的比较粗放,虽然目标达成了, ...

  5. dubbo web工程示例_dubbo实战之二:与SpringBoot集成

    欢迎访问我的GitHub https://github.com/zq2599/blog_demos 内容:所有原创文章分类和汇总,及配套源码,涉及Java.Docker.Kubernetes.DevO ...

  6. SpringBoot 集成 layering-cache 实现两级缓存调研与实践

    前言 对于系统查多改少的数据,可以通过缓存来提升系统的访问性能.一般情况下我们会采用 Redis ,但是如果仅仅依赖 Redis 很容易出现缓存雪崩的情况.为了防止缓存雪崩可以通过 Redis 高可用 ...

  7. kafka(组件分析 整合springboot集成 实战)

    kafka 组件 搭建 springboot集成 实战 kafka 组件 搭建 springboot集成 实战 1.应用场景 1.1 kafka场景 1.2 kafka特性 1.3 消息对比 1.4 ...

  8. SpringBoot集成Elasticsearch7.4 实战(一)

    在网上已经有好多关于Elasticsearch的介绍,就不在翻来覆去讲一些基本概念,大家感兴趣的可以自己去找一些资料巩固下.这次只为了顾及众多首次接触Elasticsearch,案例都讲的很浅显,还有 ...

  9. 从ElasticSearch 认识到实战(SpringBoot集成ES)

    ElasticSearch 认识到实战 目录 搜索引擎介绍 ElasticSearch知识 安装 使用restful风格查询ES SpringBoot配置ES SpringBoot集成使用 一.搜索引 ...

最新文章

  1. Ubuntu终端远程工具
  2. 苹果员工“神操作”:自建网站揭露公司性骚扰和歧视事件
  3. Java 常用API的运用,效率及技巧
  4. linux目录和文件管理命令
  5. OpenCV:OpenCV图像旋转的代码
  6. 补习系列(18)-springboot H2 迷你数据库
  7. redis-使用问题
  8. 怎么把计算机模式重置,电脑怎么还原出厂模式
  9. Myrrix——基于Mahout的开源推荐系统
  10. USC ECG Learning Center/ ECG Glossary
  11. 如何在Mac OS X上安装 Ruby运行环境
  12. excel wind插件使用_这些超实用的Excel插件,你要是都知道,确定老司机无疑了
  13. java以及JavaScript的香港身份证验证方法。
  14. 戴尔游匣7559更换C面和D面以及升级内存硬盘教程
  15. 谷歌云指南_Google材料设计指南的10个重要要点
  16. 鸿蒙系统怎么安装网易云音乐,网易云音乐鸿蒙版 - 魔法系统之家下载
  17. 解决苹果手机绑定小米手环4支付宝,蓝牙无法连接问题。
  18. 【推荐】2022年物流快递行业市场行情分析投资趋势产业发展前景调研究报告(附件中为网盘地址,报告持续更新)
  19. web安全:XSS测试平台使用教程
  20. 小节点也能引爆活动!2021四月活动指导方案

热门文章

  1. 相位差和相移理论知识概括
  2. 工具篇 之 Mac 安装 JDK 1.8 并配置环境变量
  3. 3D动态烟花--HTML
  4. 线程传值数据丢失_开放线程:如何防止数据丢失
  5. 第三周作业 产品同质化问题
  6. matplotlib.pyplot.pie()绘制饼图
  7. FT2232作为JTAG烧录器的使用步骤详解
  8. oracle exp 详解,oracle exp 详解
  9. vue入门(一)----工程vue_sell
  10. matlab ccd采集,CCD数据采集.doc