相比于Hadoop,Spark在数据的处理方面更加灵活方便。然而在最近的使用中遇到了一点小麻烦:Spark保存文件的的函数(如saveAsTextFile)在保存数据时都需要新建一个目录,然后在这个目录下分块保存文件。如果我们想在原有的目录下增加一个文件(而不是增加一个目录)

rddx.repartition(1).saveAsTextFile("test/test.txt")
rddx.coalesce(1).saveAsTextFile("test/test.txt")

把分区设置成1个 结果是Spark仍然是新建了一个目录test.txt,然后在这个目录下把数据都保存在了part-00000文件中

问题:如何让spark将Rdd结果输出到一个文件而不是目录中呢?

Spark的保存模式的设定注定了在保存数据的时候只能新建目录,如果想把数据增加到原有的目录中,单独作为一个文件,就只能借助于hadoop的HDFS操作。下面的例子演示如何用Hadoop的FileSystem实现在已有目录下用一个文件保存Spark数据:

package com.ys.penspark.util;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.BufferedWriter;
import java.io.OutputStreamWriter;
import java.io.Serializable;
import java.net.URI;/*** @ClassName: HdfsOperate* @Description:* @Author: Administrator* @Date: 2017/6/28*/
public class HdfsOperate implements Serializable {private static Logger logger = LoggerFactory.getLogger(HdfsOperate.class);private static Configuration conf = new Configuration();private static BufferedWriter writer = null;//在hdfs的目标位置新建一个文件,得到一个输出流public static void openHdfsFile(String path) throws Exception {FileSystem fs = FileSystem.get(URI.create(path),conf);writer = new BufferedWriter(new OutputStreamWriter(fs.create(new Path(path))));if(null!=writer){logger.info("[HdfsOperate]>> initialize writer succeed!");}}//往hdfs文件中写入数据public static void writeString(String line) {try {writer.write(line + "\n");}catch(Exception e){logger.error("[HdfsOperate]>> writer a line error:"  ,  e);}}//关闭hdfs输出流public static void closeHdfsFile() {try {if (null != writer) {writer.close();logger.info("[HdfsOperate]>> closeHdfsFile close writer succeed!");}else{logger.error("[HdfsOperate]>> closeHdfsFile writer is null");}}catch(Exception e){logger.error("[HdfsOperate]>> closeHdfsFile close hdfs error:" + e);}}}

  先将spark的Rdd重新分区,再将每个分区的数据collectPartitions按行写入hdfs文件中

package com.ys.penspark.util;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.Serializable;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;/*** @ClassName: FeatureExtractor* @Description:* @Author: mashiwei* @Date: 2017/6/28*/
public class FeatureExtractor implements Serializable{private static Logger logger = LoggerFactory.getLogger(FeatureExtractor.class);public void extractFeature(Dataset<Row> s, int repartitionNum,String out) throws Exception {StringBuffer sb = new StringBuffer();for (int i = 0; i<= s.schema().fieldNames().length-1;i++) {sb.append(s.schema().fieldNames()[i]);if (i == s.schema().fieldNames().length-1){break;}sb.append(",");}s.show();JavaRDD<String> rddx = s.toJavaRDD().map(new ExtractFeatureMap()).repartition(repartitionNum);//写入hdfs文件位置
//        String destinationPath = "/kettle/penspark/data.txt" ;//创建Hdfs文件,打开Hdfs输出流HdfsOperate.openHdfsFile(out);HdfsOperate.writeString(sb.toString());//分块读取RDD数据并保存到hdfs//如果直接用collect()函数获取List<String>,可能因数据量过大超过内存空间而失败for (int i = 0; i < repartitionNum; i++) {int[] index = new int[1];index[0] = i;
//            List<String>[] featureList = rddx.collectPartitions(index);
//            List<String> strs = rddx.collect();List<String>[] featureList = rddx.collectPartitions(index);if (featureList.length != 1) {logger.error("[FeatureExtractor]>> featureList.length is not 1!");}for (String str : featureList[0]) {//写一行到Hdfs文件logger.info("-----"+str);HdfsOperate.writeString(str);}}//关闭Hdfs输出流HdfsOperate.closeHdfsFile();}class ExtractFeatureMap implements Function<Row, String> {@Overridepublic String call(Row line) throws Exception {try {StringBuffer sb = new StringBuffer();int len = line.length();for (int i = 0; i<= len-1; i++){sb.append(line.get(i).toString());if (i == len-1){break;}sb.append(",");}return sb.toString();} catch (Exception e) {logger.error("[FeatureExtractor]>>GetTokenAndKeywordFeature error:", e);}return null;}}public static void main(String[] args) {//        SparkConf conf = new SparkConf().setAppName("WordCount").setMaster("local");
//        JavaSparkContext sc= new JavaSparkContext(conf);StructType Schemafinal = new StructType();Map<String,String> options = new HashMap<String,String>();LinkedList<StructField> obj = new LinkedList<StructField>();StructField structField = new StructField("name", DataTypes.StringType, true, Metadata.empty());StructField structField1 = new StructField("age", DataTypes.StringType, true, Metadata.empty());
//        StructField structField2 = new StructField("字段2", DataTypes.StringType, true, Metadata.empty());
//        StructField structField3 = new StructField("字段3", DataTypes.StringType, true, Metadata.empty());obj.add(structField);obj.add(structField1);
//        obj.add(structField2);
//        obj.add(structField3);Schemafinal = new StructType(obj.toArray(new StructField[obj.size()]));SparkConf conf = new SparkConf().setAppName("Example App").setMaster("local[*]");options.put("delimiter",",");options.put("header","true");JavaSparkContext sc = new JavaSparkContext(conf);@SuppressWarnings("deprecation")SQLContext sqlContext = new SQLContext(sc);SparkSession spark = SparkSession.builder().appName("Pentaho Logic as Spark").config("spark.some.config.option", "some-value").config("spark.sql.warehouse.dir", "file:///C:/tmp/").getOrCreate();Dataset<Row> tempdf = spark.read().format("com.databricks.spark.csv").options(options).schema(Schemafinal).option("header", true).load("file:///"+"C:\\Users\\Administrator\\Desktop\\测试\\功能开发\\excel.txt");tempdf.show();FeatureExtractor fx = new FeatureExtractor();try {
//            fx.extractFeature(sc,5);fx.extractFeature(tempdf,2,"/kettle/tempData.txt");} catch (Exception e) {e.printStackTrace();}}
}

  数据

name,age
zs, 44
li, 22
ww, 18

  

转载于:https://www.cnblogs.com/xiaoma0529/p/7090912.html

Spark 把RDD数据保存到hdfs单个文件中,而不是目录相关推荐

  1. Scrapy中将数据保存到Excel和MySQL中

    目录标题 1. Excel 1.1 openpyxl 1.1.1 代码说明 1.1.2 注意 1.2 pandas 1.2.1 代码说明 1.2.2 常见错误 1.3 openpyxl和pandas对 ...

  2. 简单的Http请求数据保存到Hdfs

    使用okhttp工具集来开发:(如果文件已经存在会报错) package com.etl;import java.io.IOException;import org.apache.commons.la ...

  3. Python网络爬虫:爬取CSDN热搜数据 并保存到本地文件中

    hello,大家好,我是wangzirui32,今天我们来学习如何爬取CSDN热搜数据,并保存到Excel表格中. 开始学习吧! 学习目录 1. 数据包抓取 2. 编写代码 1. 数据包抓取 打开CS ...

  4. 把数据保存到cook_将用户信息保存到Cookie中

    /** * 把用户保存到Cookie * * @param request * @param response * @param member */ private void rememberPwdA ...

  5. Pandas的学习(读取mongodb数据库集合到DataFrame,将DataFrame类型数据保存到mongodb数据库中)

    1.读取mongodb数据库集合到DataFrame import pymongo import pandas as pdclient = pymongo.MongoClient("数据库连 ...

  6. html 保存xlsx,HTML SaveXLSX按钮防止将数据保存到SlickGrid的XLSX文件中

    我在网页上有一个SlickGrid,我正在尝试添加一个按钮来调用函数CreateXLSX().当我编辑Main.jade的代码,我输入:HTML SaveXLSX按钮防止将数据保存到SlickGrid ...

  7. C++读取txt数据为二维数组 将数据保存到txt文本中

      C++读取txt数据为二维数组 保存txt文本数据     C++文件读写操作有:ofstream,ifstream,fstream: #include <fstream> ofstr ...

  8. vue 将数据保存到vuex中

    在项目中遇到这样一个问题,就是在登入的时候同时需要从后台获取到左边的导航,但是如果使用h5的localStorage来保存导航信息,会出现app加载进去之后localStorage才保存进浏览器,在m ...

  9. Python中用pandas将numpy中的数组数据保存到csv文件

    Python中用pandas将numpy中的数组数据保存到csv文件 本博客转载自:[1]https://blog.csdn.net/grey_csdn/article/details/7018587 ...

最新文章

  1. Unity3D 多平台 预编译 宏定义
  2. Web服务器 Web容器 Servlet容器
  3. 计算机数值方法期末考试,《计算机数值方法》测试题二
  4. python实现一个简单的加法计算器_Python tkinter实现简单加法计算器代码实例
  5. SAP CRM WebClient UI根据扩展字段搜索出结果的实现原理
  6. s3cmd安装及使用
  7. java xml 序列化_java-序列化为XML时忽略父类
  8. 作者:刘诗凯(1983-),男,华为大数据分析产品部主任工程师。
  9. java嵌入式软件开发工程师_嵌入式软件工程师笔试题
  10. 跨境电商开发,源码无加密
  11. iPhone升级系统卡在进度条界面怎么办?
  12. 产品读书.心理学《人际交往心理学》
  13. 关于牛顿迭代求根的笔记
  14. NVl和NVL2的语法使用详解
  15. Linux命令之查看磁盘空间
  16. 远程访问服务器并登录LPAR,远程访问HMC方法之 SSH
  17. android 身高体重设计,Android开发--身高体重指数(BIM)计算--设计用户界面--指定输入类型(InputType)...
  18. Unity+C#开发笔记(一)| 如何动态加载预制件 | ╭(●`∀´●)╯╰(●’◡’●)╮
  19. 重启资源管理器导致托盘图标消失的解决方法
  20. 关于ansys里面的谐分析和瞬态分析结果的讨论

热门文章

  1. 微博运营与微博营销最易犯的20种错误,你犯了吗?
  2. 设置cisco登录时显示的警示性信息-banner
  3. Java VS .Net 程序员的困惑 (转)
  4. 智能提示导致Visual Studio 2010崩溃问题
  5. 解决Couldn't resolve host 'mirrorlist.centos.org
  6. dp --- acdream原创群赛(16) --- B - Apple
  7. [转]Creating Unit Tests for ASP.NET MVC Applications (C#)
  8. 红盟idc技术培训群—安全培训(三)
  9. myeclipse中配置weblogic的开发环境
  10. 电脑启动后,没有桌面