Spark Parquet使用
Spark SQL下的Parquet使用最佳实践和代码实战
一、Spark SQL下的Parquet使用最佳实践
1)过去整个业界对大数据的分析的技术栈的Pipeline一般分为以下两种方式:
a)Data Source -> HDFS -> MR/Hive/Spark(相当于ETL)-> HDFS Parquet -> Spark SQL/Impala -> ResultService(可以放在DB中,也有可能被通过JDBC/ODBC来作为数据服务使用);
b)Data Source -> Real timeupdate data to HBase/DB -> Export to Parquet -> Spark SQL/Impala -> ResultService(可以放在DB中,也有可能被通过JDBC/ODBC来作为数据服务使用);
上述的第二种方式完全可以通过Kafka+Spark Streaming+Spark SQL(内部也强烈建议采用Parquet的方式来存储数据)的方式取代
2)期待的方式:DataSource -> Kafka -> Spark Streaming -> Parquet -> Spark SQL(ML、GraphX等)-> Parquet -> 其它各种Data Mining等。
二、Parquet的精要介绍
Parquet是列式存储格式的一种文件类型,列式存储有以下的核心优势:
a)可以跳过不符合条件的数据,只读取需要的数据,降低IO数据量。
b)压缩编码可以降低磁盘存储空间。由于同一列的数据类型是一样的,可以使用更高效的压缩编码(例如RunLength Encoding和Delta Encoding)进一步节约存储空间。
c)只读取需要的列,支持向量运算,能够获取更好的扫描性能。
设计蓝图
以上分解似乎完美,一起来看看“设计框架”或“蓝图”。
算了,不解释了,图,自己看。
Coding Style
从Kafka Stream获取数据
// 从Kafka Stream获取数据JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(jssc, String.class, String.class,StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);
写入Parquet
accessLogsDStream.foreachRDD(rdd -> {// 如果DF不为空,写入(增加模式)到Parquet文件DataFrame df = sqlContext.createDataFrame(rdd, ApacheAccessLog.class);if (df.count() > 0) {df.write().mode(SaveMode.Append).parquet(Flags.getInstance().getParquetFile());}return null;});
创建Hive表
使用spark-shell,获取Parquet文件, 写入一个临时表;
scala代码如下:
import sqlContext.implicits._val parquetFile = sqlContext.read.parquet("/user/spark/apachelog.parquet")parquetFile.registerTempTable("logs")
复制schema到新表链接到Parquet文件。
在Hive中复制表,这里你会发现,文件LOCATION位置还是原来的路径,目的就是这个,使得新写入的文件还在Hive模型中。
我总觉得这个方法有问题,是不是哪位Hive高人指点一下,有没有更好的办法来完成这个工作?
CREATE EXTERNAL TABLE apachelog LIKE logs STORED AS PARQUET LOCATION '/user/spark/apachelog.parquet';
启动你的SparkThriftServer
当然,在集群中启用ThriftServer是必须的工作,SparkThriftServer其实暴露的是Hive2服务器,用JDBC驱动就可以访问了。
我们都想要的结果
本博客中使用的SQL查询工具是SQuirreL SQL,具体JDBC配置方法请参照前面说的向左向右转。
结果看似简单,但是经历还是很有挑战的。
至此,本例已完成。完成代码见 GitHub
转自:https://blog.sectong.com/blog/spark_to_parquet.html
APPMain.java
[java] view plain copy- package com.sectong.spark_to_parquet;
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.HashMap;
- import java.util.HashSet;
- import java.util.List;
- import org.apache.commons.cli.Option;
- import org.apache.commons.cli.Options;
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.api.java.function.Function;
- import org.apache.spark.sql.DataFrame;
- import org.apache.spark.sql.SQLContext;
- import org.apache.spark.sql.SaveMode;
- import org.apache.spark.streaming.api.java.JavaDStream;
- import org.apache.spark.streaming.api.java.JavaPairInputDStream;
- import org.apache.spark.streaming.api.java.JavaStreamingContext;
- import org.apache.spark.streaming.kafka.KafkaUtils;
- import kafka.serializer.StringDecoder;
- import scala.Tuple2;
- import scala.collection.Seq;
- /**
- * 运行程序,spark-submit --class "com.sectong.spark_to_parquet.AppMain" --master
- * yarn target/park_to_parquet-0.0.1-SNAPSHOT.jar --kafka_broker
- * hadoop1:6667,hadoop2:6667 --kafka_topic apache --parquet_file /user/spark/
- * --slide_interval 30
- */
- public class AppMain {
- public static final String WINDOW_LENGTH = "window_length";
- public static final String SLIDE_INTERVAL = "slide_interval";
- public static final String KAFKA_BROKER = "kafka_broker";
- public static final String KAFKA_TOPIC = "kafka_topic";
- public static final String PARQUET_FILE = "parquet_file";
- private static final Options THE_OPTIONS = createOptions();
- private static Options createOptions() {
- Options options = new Options();
- options.addOption(new Option(WINDOW_LENGTH, true, "The window length in seconds"));// 窗口大小
- options.addOption(new Option(SLIDE_INTERVAL, true, "The slide interval in seconds"));// 计算间隔
- options.addOption(new Option(KAFKA_BROKER, true, "The kafka broker list")); // Kafka队列
- options.addOption(new Option(KAFKA_TOPIC, true, "The kafka topic"));// TOPIC
- options.addOption(new Option(PARQUET_FILE, true, "The parquet file"));// 写入Parquet文件位置
- return options;
- }
- public static void main(String[] args) throws IOException {
- Flags.setFromCommandLineArgs(THE_OPTIONS, args);
- // 初始化Spark Conf.
- SparkConf conf = new SparkConf().setAppName("A SECTONG Application: Apache Log Analysis with Spark");
- JavaSparkContext sc = new JavaSparkContext(conf);
- JavaStreamingContext jssc = new JavaStreamingContext(sc, Flags.getInstance().getSlideInterval());
- SQLContext sqlContext = new SQLContext(sc);
- // 初始化参数
- HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(Flags.getInstance().getKafka_topic().split(",")));
- HashMap<String, String> kafkaParams = new HashMap<String, String>();
- kafkaParams.put("metadata.broker.list", Flags.getInstance().getKafka_broker());
- // 从Kafka Stream获取数据
- JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(jssc, String.class, String.class,
- StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);
- JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
- private static final long serialVersionUID = 5266880065425088203L;
- public String call(Tuple2<String, String> tuple2) {
- return tuple2._2();
- }
- });
- JavaDStream<ApacheAccessLog> accessLogsDStream = lines.flatMap(line -> {
- List<ApacheAccessLog> list = new ArrayList<>();
- try {
- // 映射每一行
- list.add(ApacheAccessLog.parseFromLogLine(line));
- return list;
- } catch (RuntimeException e) {
- return list;
- }
- }).cache();
- accessLogsDStream.foreachRDD(rdd -> {
- // rdd to DataFrame
- DataFrame df = sqlContext.createDataFrame(rdd, ApacheAccessLog.class);
- // 写入Parquet文件
- df.write().partitionBy("ipAddress", "method", "responseCode").mode(SaveMode.Append).parquet(Flags.getInstance().getParquetFile());
- return null;
- });
- // 启动Streaming服务器
- jssc.start(); // 启动计算
- jssc.awaitTermination(); // 等待终止
- }
- }
ApacheAccessLog.java
[java] view plain copy- package com.sectong.spark_to_parquet;
- import java.io.Serializable;
- import java.util.regex.Matcher;
- import java.util.regex.Pattern;
- /**
- * 解析Apache log
- */
- public class ApacheAccessLog implements Serializable {
- /**
- *
- */
- private static final long serialVersionUID = 6681372116317508248L;
- private String ipAddress;
- private String clientIdentd;
- private String userID;
- private String dateTimeString;
- private String method;
- private String endpoint;
- private String protocol;
- private int responseCode;
- private long contentSize;
- private ApacheAccessLog(String ipAddress, String clientIdentd, String userID, String dateTime, String method,
- String endpoint, String protocol, String responseCode, String contentSize) {
- this.ipAddress = ipAddress;
- this.clientIdentd = clientIdentd;
- this.userID = userID;
- this.dateTimeString = dateTime;
- this.method = method;
- this.endpoint = endpoint;
- this.protocol = protocol;
- this.responseCode = Integer.parseInt(responseCode);
- if (contentSize.equals("-")) {
- this.contentSize = 0;
- } else {
- this.contentSize = Long.parseLong(contentSize);
- }
- }
- public String getIpAddress() {
- return ipAddress;
- }
- public String getClientIdentd() {
- return clientIdentd;
- }
- public String getUserID() {
- return userID;
- }
- public String getDateTimeString() {
- return dateTimeString;
- }
- public String getMethod() {
- return method;
- }
- public String getEndpoint() {
- return endpoint;
- }
- public String getProtocol() {
- return protocol;
- }
- public int getResponseCode() {
- return responseCode;
- }
- public long getContentSize() {
- return contentSize;
- }
- public void setIpAddress(String ipAddress) {
- this.ipAddress = ipAddress;
- }
- public void setClientIdentd(String clientIdentd) {
- this.clientIdentd = clientIdentd;
- }
- public void setUserID(String userID) {
- this.userID = userID;
- }
- public void setDateTimeString(String dateTimeString) {
- this.dateTimeString = dateTimeString;
- }
- public void setMethod(String method) {
- this.method = method;
- }
- public void setEndpoint(String endpoint) {
- this.endpoint = endpoint;
- }
- public void setProtocol(String protocol) {
- this.protocol = protocol;
- }
- public void setResponseCode(int responseCode) {
- this.responseCode = responseCode;
- }
- public void setContentSize(long contentSize) {
- this.contentSize = contentSize;
- }
- // Example Apache log line:
- // 127.0.0.1 - - [21/Jul/2014:9:55:27 -0800] "GET /home.html HTTP/1.1" 200
- // 2048
- private static final String LOG_ENTRY_PATTERN =
- // 1:IP 2:client 3:user 4:date time 5:method 6:req 7:proto
- // 8:respcode 9:size
- "(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(\\S+) (\\S+) (\\S+)\" (\\d{3}) (\\S+)";
- private static final Pattern PATTERN = Pattern.compile(LOG_ENTRY_PATTERN);
- public static ApacheAccessLog parseFromLogLine(String logline) {
- Matcher m = PATTERN.matcher(logline);
- if (!m.find()) {
- // logger.log(Level.ALL, "Cannot parse logline" + logline);
- throw new RuntimeException("Error parsing logline");
- } else {
- return new ApacheAccessLog(m.group(1), m.group(2), m.group(3), m.group(4), m.group(5), m.group(6),
- m.group(7), m.group(8), m.group(9));
- }
- }
- }
Flags.java
[java] view plain copy- package com.sectong.spark_to_parquet;
- import org.apache.commons.cli.CommandLine;
- import org.apache.commons.cli.CommandLineParser;
- import org.apache.commons.cli.Options;
- import org.apache.commons.cli.ParseException;
- import org.apache.commons.cli.PosixParser;
- import org.apache.spark.streaming.Duration;
- public class Flags {
- private static Flags THE_INSTANCE = new Flags();
- private Duration windowLength;
- private Duration slideInterval;
- private String kafka_broker;
- private String kafka_topic;
- private String parquet_file;
- private boolean initialized = false;
- private Flags() {
- }
- public Duration getWindowLength() {
- return windowLength;
- }
- public Duration getSlideInterval() {
- return slideInterval;
- }
- public String getKafka_broker() {
- return kafka_broker;
- }
- public String getKafka_topic() {
- return kafka_topic;
- }
- public String getParquetFile() {
- return parquet_file;
- }
- public static Flags getInstance() {
- if (!THE_INSTANCE.initialized) {
- throw new RuntimeException("Flags have not been initalized");
- }
- return THE_INSTANCE;
- }
- public static void setFromCommandLineArgs(Options options, String[] args) {
- CommandLineParser parser = new PosixParser();
- try {
- CommandLine cl = parser.parse(options, args);
- // 参数默认值
- THE_INSTANCE.windowLength = new Duration(
- Integer.parseInt(cl.getOptionValue(AppMain.WINDOW_LENGTH, "30")) * 1000);
- THE_INSTANCE.slideInterval = new Duration(
- Integer.parseInt(cl.getOptionValue(AppMain.SLIDE_INTERVAL, "5")) * 1000);
- THE_INSTANCE.kafka_broker = cl.getOptionValue(AppMain.KAFKA_BROKER, "kafka:9092");
- THE_INSTANCE.kafka_topic = cl.getOptionValue(AppMain.KAFKA_TOPIC, "apache");
- THE_INSTANCE.parquet_file = cl.getOptionValue(AppMain.PARQUET_FILE, "/user/spark/");
- THE_INSTANCE.initialized = true;
- } catch (ParseException e) {
- THE_INSTANCE.initialized = false;
- System.err.println("Parsing failed. Reason: " + e.getMessage());
- }
- }
- }
三、代码实战
Java版本:
package com.dt.spark.SparkApps.sql;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
public class SparkSQLParquetOps {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkSQLParquetOps");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
DataFrame usersDF = sqlContext.read().parquet("E:\\Spark\\Sparkinstanll_package\\Big_Data_Software\\spark-1.6.0-bin-hadoop2.6\\examples\\src\\main\\resources\\users.parquet");
/**
* 注册成为临时表以供后续的SQL查询操作
*/
usersDF.registerTempTable("users");
/**
* 进行数据的多维度分析
*/
DataFrame result = sqlContext.sql("select * from users");
JavaRDD<String> resultRDD = result.javaRDD().map(new Function<Row, String>() {
@Override
public String call(Row row) throws Exception {
return "The name is : " + row.getAs("name");
}
});
/**
* 第六步:对结果进行处理,包括由DataFrame转换成为RDD<Row>,以及结构持久化
*/
List<String> listRow = resultRDD.collect();
for(String row : listRow){
System.out.println(row);
}
}
}
Schema Merging
Java版本:
package com.dt.spark.SparkApps.sql;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class SchemaOps {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrameByProgramatically");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
// Create a simple DataFrame, stored into a partition directory
JavaRDD<Integer> lines = sc.parallelize(Arrays.asList(1,2,3,4,5));
PairFunction<Integer,Integer,Integer> df2 = new PairFunction<Integer,Integer,Integer>() {
@Override
public Tuple2 call(Integer x) throws Exception {
return new Tuple2(x,x * 2);
}
};
JavaPairRDD<Integer,Integer> pairs = lines.mapToPair(df2);
/**
* 第一步:在RDD的基础上创建类型为Row的RDD
*/
JavaRDD<Row> personsRDD = pairs.map(new Function<Tuple2<Integer, Integer>, Row>() {
@Override
public Row call(Tuple2<Integer, Integer> integerIntegerTuple2) throws Exception {
return RowFactory.create(integerIntegerTuple2._1,integerIntegerTuple2._2);
}
});
/**
* 第二步:动态构造DataFrame的元数据,一般而言,有多少列,以及每列的具体类型可能来自于JSON文件
* 也可能来自于数据库。
* 指定类型
*/
List<StructField> structFields = new ArrayList<StructField>();
structFields.add(DataTypes.createStructField("single",DataTypes.IntegerType,true));
structFields.add(DataTypes.createStructField("double",DataTypes.IntegerType,true));
/**
* 构建StructType用于最后DataFrame元数据的描述
*/
StructType structType = DataTypes.createStructType(structFields);
/**
* 第三步:基于以后的MetaData以及RDD<Row>来构建DataFrame
*/
DataFrame personsDF = sqlContext.createDataFrame(personsRDD,structType);
personsDF.write().parquet("data/test_table/key=1");
// Create a simple DataFrame, stored into a partition directory
JavaRDD<Integer> lines1 = sc.parallelize(Arrays.asList(6,7,8,9,10));
PairFunction<Integer,Integer,Integer> df3 = new PairFunction<Integer,Integer,Integer>() {
@Override
public Tuple2 call(Integer x) throws Exception {
return new Tuple2(x,x * 2);
}
};
JavaPairRDD<Integer,Integer> pairs1 = lines.mapToPair(df2);
/**
* 第一步:在RDD的基础上创建类型为Row的RDD
*/
JavaRDD<Row> personsRDD1 = pairs1.map(new Function<Tuple2<Integer, Integer>, Row>() {
@Override
public Row call(Tuple2<Integer, Integer> integerIntegerTuple2) throws Exception {
return RowFactory.create(integerIntegerTuple2._1,integerIntegerTuple2._2);
}
});
/**
* 第二步:动态构造DataFrame的元数据,一般而言,有多少列,以及每列的具体类型可能来自于JSON文件
* 也可能来自于数据库。
* 指定类型
*/
List<StructField> structFields1 = new ArrayList<StructField>();
structFields.add(DataTypes.createStructField("single",DataTypes.IntegerType,true));
structFields.add(DataTypes.createStructField("triple",DataTypes.IntegerType,true));
/**
* 构建StructType用于最后DataFrame元数据的描述
*/
StructType structType1 = DataTypes.createStructType(structFields);
/**
* 第三步:基于以后的MetaData以及RDD<Row>来构建DataFrame
*/
DataFrame personsDF1 = sqlContext.createDataFrame(personsRDD1,structType1);
personsDF1.write().parquet("data/test_table/key=2");
DataFrame df4 = sqlContext.read().option("mergeSchema","true").parquet("data/test_table");
df4.printSchema();
}
}
输出结果如下:
root
|--single: integer (nullable = true)
|--double: integer (nullable = true)
|--single2: integer (nullable = true)
|--triple: integer (nullable = true)
|--key: integer (nullable = true)
Scala版本:
// sqlContext from the previous example is used in this example.
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
// Create a simple DataFrame, stored into a partition directory
val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.write.parquet("data/test_table/key=1")
// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
df2.write.parquet("data/test_table/key=2")
// Read the partitioned table
val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table")
df3.printSchema()
// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths.
// root
// |-- single: int (nullable = true)
// |-- double: int (nullable = true)
// |-- triple: int (nullable = true)
// |-- key : int (nullable = true)
一:Spark SQL下的Parquet使用最佳实践
1,过去整个业界对大数据的分析的技术栈的Pipeline一般分为一下两种方式:
A)Data Source -> HDFS -> MR/Hive/Spark(相当于ETL) -> HDFS Parquet -> SparkSQL/impala -> Result Service(可以放在DB中,也有可能被通过JDBC/ODBC来作为数据服务使用);
B)Data Source -> Real time update data to HBase/DB -> Export to Parquet -> SparkSQL/impala -> Result Service(可以放在DB中,也有可能被通过JDBC/ODBC来作为数据服务使用);
上述的第二种方式完全可以通过Kafka+Spark Streaming+Spark SQL(内部也强烈建议采用Parquet的方式来存储数据)的方式取代。
2,期待的方式:Data Source -> Kafka -> Spark Streaming -> Parquet -> Spark SQL(ML、Graphx等)-> Parquet -> 其他各种Data Mining等。
二:Parquet的精要介绍
摘自官网:
Apache Parquet is a columnar storage format available to any project in the Hadoop ecosystem, regardless of the choice of data processing framework, data model or programming language.
1,Parquet是列式存储格式的一种文件类型,列式存储有以下的核心:
A.可以跳过不符合条件的数据,只读取需要的数据,降低IO数据量。
B.压缩编码可以降低磁盘存储空间。由于同一列的数据类型是一样的,可以使用更高效的压缩编码(例如Run Length Encoding和Delta Encoding)进一步节约存储空间。
C.只读取需要的列,支持向量运算,能够获取更好的扫描性能。
三:Spark SQL下的Parquet意义再思考
1,如果说HDFS是大数据时代文件系统的事实标准的话,Parquet就是大数据时代存储格式的事实标准;
2,速度更快:从使用Spark SQL操作普通文件CSV和Parquet文件的速度对比上来看,绝大多数情况下使用Parquet会比使用CSV等普通文件速度提升10倍左右;(在一些普通文件系统无法再Spark上成功运行程序的情况下,使用Parquet很多时候都可以成功运行);
3,Parquet的压缩技术非常稳定出色,在Spark SQL中对压缩技术的处理可能无法正常的完成工作(例如会导致Lost Task,Lost Exexutor),但是此时如果使用Parquet就可以正常的完成;
4,极大的减少磁盘I/O,通常情况下能够减少75%的存储空间,由此可以极大地减少Spark SQL处理数据的时候的数据输入内容,尤其是在Spark 1.6.x中下推过滤器在一些情况下可以极大的进一步减少磁盘的I/O和内存的占用;
5,Spark 1.6.x+Parquet极大的提升了数据扫描的吞吐量,这极大的提高了数据的查找速度,Spark 1.6和Spark 1.5相比较而言提升了1倍的速度,在Spark 1.6.x中操作Parquet时候CPU的使用也进行了极大的优化,有效的降低了CPU的使用;
6,采用Parquet可以极大的优化Spark的调度和执行,我们测试表面Spark如果采用Parquet可以有效的减少Stage的执行消耗,同时可以优化执行路径;
四:Spark SQL下的Parquet内幕解密
1,列式存储是以什么基本格式来存储数据的?表现上是树状结构,在内部有元数据的Table;
2,在具体的Parquet文件存储的时候有三个组成部分:
A)Storage Format:Parquet定义了具体的数据内部的类型和存储格式;
B)Object Model Converters:Parquet中负责计算框架中数据对象和Parquet文件中具体数据类型的映射;
C)Object Models:在Parquet中具有自己的Object Model定义的存储格式,例如说Avro具有自己的Object Model,但是Parquet在处理相关的格式的数据的时候使用自己的Object Model来存储;
映射完成后Parquet会进行自己的Column Encoding,然后存储成为Parquet格式的文件
3,Modules
The parquet-format project contains format specifications and Thrift definitions of metadata required to properly read Parquet files.
The parquet-mr project contains multiple sub-modules, which implement the core components of reading and writing a nested, column-oriented data stream, map this core onto the parquet format, and provide Hadoop Input/Output Formats, Pig loaders, and other java-based utilities for interacting with Parquet.
The parquet-compatibility project contains compatibility tests that can be used to verify that implementations in different languages can read and write each other’s files.
4,举例说明:
message AddressBook {required string owner;repeated string ownerPhoneNumbers;repeated group contacts {required string name;optional string phoneNumber;}}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
required(出现1次),optional(出现0次或者1次),repeated(出现0次或者多次)
这个schema中每条记录表示一个人的AddressBook。有且只有一个owner,owner可以有0个或者多个ownerPhoneNumbers,owner可以有0个或者多个contacts。每个contact有且只有一个name,这个contact的phoneNumber可有可无。
第一点:就存储数据本身而言,只考虑叶子节点,我们的叶子节点owner、ownerPhoneNumber、name、phoneNumber;
第二点:在逻辑上而言Schema实质上是一个Table:
AddressBook owner ownerphonenumber contacts name phonenumber 第三点:对于一个Parquet文件而言,数据会被分成Row Group(里面包含很多Column,每个Column具有几个非常重要的特性例如Repetition Level、Definition Level);
第四点:Column在Parquet中是以Page的方式存在的,Page中有Repetition Level、Definition Level等内容;
第五点:Row Group在Parquet中是数据读写的缓存单元,所以对Row Group的设置会极大的影响Parquet的使用速度和效率,所以如果是分析日志的话,我们一般建议把Row Group的缓存大小配置成大约256MB,很多人的配置都是大约1G,如果想最大化的运行效率强烈建议HDFS的Block大小和Row Group一致;
第六点:在实际存储的把一个树状结构,通过巧妙的编码算法,转换成二维表结构
Repetition Level Definition Level Value 1 2 132990600 0 1 “spark” 0 0 NULL Parquet是面向分析型业务的列式存储格式,由Twitter和Cloudera合作开发,2015年5月从Apache的孵化器里毕业成为Apache顶级项目,最新的版本是1.8.0。
列式存储
列式存储和行式存储相比有哪些优势呢?
- 可以跳过不符合条件的数据,只读取需要的数据,降低IO数据量。
- 压缩编码可以降低磁盘存储空间。由于同一列的数据类型是一样的,可以使用更高效的压缩编码(例如Run Length Encoding和Delta Encoding)进一步节约存储空间。
- 只读取需要的列,支持向量运算,能够获取更好的扫描性能。
当时Twitter的日增数据量达到压缩之后的100TB+,存储在HDFS上,工程师会使用多种计算框架(例如MapReduce, Hive, Pig等)对这些数据做分析和挖掘;日志结构是复杂的嵌套数据类型,例如一个典型的日志的schema有87列,嵌套了7层。所以需要设计一种列式存储格式,既能支持关系型数据(简单数据类型),又能支持复杂的嵌套类型的数据,同时能够适配多种数据处理框架。
关系型数据的列式存储,可以将每一列的值直接排列下来,不用引入其他的概念,也不会丢失数据。关系型数据的列式存储比较好理解,而嵌套类型数据的列存储则会遇到一些麻烦。如图1所示,我们把嵌套数据类型的一行叫做一个记录(record),嵌套数据类型的特点是一个record中的column除了可以是Int, Long, String这样的原语(primitive)类型以外,还可以是List, Map, Set这样的复杂类型。在行式存储中一行的多列是连续的写在一起的,在列式存储中数据按列分开存储,例如可以只读取A.B.C这一列的数据而不去读A.E和A.B.D,那么如何根据读取出来的各个列的数据重构出一行记录呢?
图1 行式存储和列式存储
Google的Dremel系统解决了这个问题,核心思想是使用“record shredding and assembly algorithm”来表示复杂的嵌套数据类型,同时辅以按列的高效压缩和编码技术,实现降低存储空间,提高IO效率,降低上层应用延迟。Parquet就是基于Dremel的数据模型和算法实现的。
Parquet适配多种计算框架
Parquet是语言无关的,而且不与任何一种数据处理框架绑定在一起,适配多种语言和组件,能够与Parquet配合的组件有:
查询引擎: Hive, Impala, Pig, Presto, Drill, Tajo, HAWQ, IBM Big SQL
计算框架: MapReduce, Spark, Cascading, Crunch, Scalding, Kite
数据模型: Avro, Thrift, Protocol Buffers, POJOs
那么Parquet是如何与这些组件协作的呢?这个可以通过图2来说明。数据从内存到Parquet文件或者反过来的过程主要由以下三个部分组成:
1, 存储格式(storage format)
parquet-format项目定义了Parquet内部的数据类型、存储格式等。
2, 对象模型转换器(object model converters)
这部分功能由parquet-mr项目来实现,主要完成外部对象模型与Parquet内部数据类型的映射。
例如parquet-mr项目里的parquet-pig项目就是负责把内存中的Pig Tuple序列化并按列存储成Parquet格式,以及反过来把Parquet文件的数据反序列化成Pig Tuple。
Parquet数据模型
理解Parquet首先要理解这个列存储格式的数据模型。我们以一个下面这样的schema和数据为例来说明这个问题。
message AddressBook {required string owner;repeated string ownerPhoneNumbers;repeated group contacts {required string name;optional string phoneNumber;} }
这个schema中每条记录表示一个人的AddressBook。有且只有一个owner,owner可以有0个或者多个ownerPhoneNumbers,owner可以有0个或者多个contacts。每个contact有且只有一个name,这个contact的phoneNumber可有可无。这个schema可以用图3的树结构来表示。
每个schema的结构是这样的:根叫做message,message包含多个fields。每个field包含三个属性:repetition, type, name。repetition可以是以下三种:required(出现1次),optional(出现0次或者1次),repeated(出现0次或者多次)。type可以是一个group或者一个primitive类型。
Parquet格式的数据类型没有复杂的Map, List, Set等,而是使用repeated fields 和 groups来表示。例如List和Set可以被表示成一个repeated field,Map可以表示成一个包含有key-value 对的repeated field,而且key是required的。
图3 AddressBook的树结构表示
Parquet文件的存储格式
那么如何把内存中每个AddressBook对象按照列式存储格式存储下来呢?
在Parquet格式的存储中,一个schema的树结构有几个叶子节点,实际的存储中就会有多少column。例如上面这个schema的数据存储实际上有四个column,如图4所示。
图4 AddressBook实际存储的列
Parquet文件在磁盘上的分布情况如图5所示。所有的数据被水平切分成Row group,一个Row group包含这个Row group对应的区间内的所有列的column chunk。一个column chunk负责存储某一列的数据,这些数据是这一列的Repetition levels, Definition levels和values(详见后文)。一个column chunk是由Page组成的,Page是压缩和编码的单元,对数据模型来说是透明的。一个Parquet文件最后是Footer,存储了文件的元数据信息和统计信息。Row group是数据读写时候的缓存单元,所以推荐设置较大的Row group从而带来较大的并行度,当然也需要较大的内存空间作为代价。一般情况下推荐配置一个Row group大小1G,一个HDFS块大小1G,一个HDFS文件只含有一个块。
图5 Parquet文件格式在磁盘的分布
拿我们的这个schema为例,在任何一个Row group内,会顺序存储四个column chunk。这四个column都是string类型。这个时候Parquet就需要把内存中的AddressBook对象映射到四个string类型的column中。如果读取磁盘上的4个column要能够恢复出AddressBook对象。这就用到了我们前面提到的 “record shredding and assembly algorithm”。
Striping/Assembly算法
Definition Level
Repetition Level
下面用AddressBook的例子来说明Striping和assembly的过程。
对于每个column的最大的Repetion Level和 Definition Level如图6所示。
图6 AddressBook的Max Definition Level和Max Repetition Level
AddressBook {owner: "Julien Le Dem",ownerPhoneNumbers: "555 123 4567",ownerPhoneNumbers: "555 666 1337",contacts: {name: "Dmitriy Ryaboy",phoneNumber: "555 987 6543",},contacts: {name: "Chris Aniszczyk"} } AddressBook {owner: "A. Nonymous" }
下面我们拿掉其他三个column只看contacts.phoneNumber这个column,把上面的两条record简化成下面的样子:
AddressBook {contacts: {phoneNumber: "555 987 6543"}contacts: {} } AddressBook { }
如果我们要把这个column写到磁盘上,磁盘上会写入这样的数据(图8):
注意:NULL实际上不会被存储,如果一个column value的Definition Level小于该column最大Definition Level的话,那么就表示这是一个空值。
下面是从磁盘上读取数据并反序列化成AddressBook对象的过程:
1,读取第一个三元组R=0, D=2, Value=”555 987 6543”
R=0 表示是一个新的record,要根据schema创建一个新的nested record直到Definition Level=2。
R=1 表示不是一个新的record,是上一个record中一个新的contacts。
D=1 表示contacts定义了,但是contacts的下一个级别也就是phoneNumber没有被定义,所以创建一个空的contacts。
R=0 表示一个新的record,根据schema创建一个新的nested record直到Definition Level=0,也就是创建一个AddressBook根节点。
上面演示了一个column的写入和重构,那么在不同column之间是怎么跳转的呢,这里用到了有限状态机的知识,详细介绍可以参考Dremel。
数据压缩算法
列式存储给数据压缩也提供了更大的发挥空间,除了我们常见的snappy, gzip等压缩方法以外,由于列式存储同一列的数据类型是一致的,所以可以使用更多的压缩算法。
压缩算法
使用场景
Run Length Encoding
重复数据
Delta Encoding
有序数据集,例如timestamp,自动生成的ID,以及监控的各种metrics
Dictionary Encoding
小规模的数据集合,例如IP地址
Prefix Encoding
Delta Encoding for strings
性能
Parquet列式存储带来的性能上的提高在业内已经得到了充分的认可,特别是当你们的表非常宽(column非常多)的时候,Parquet无论在资源利用率还是性能上都优势明显。具体的性能指标详见参考文档。
Spark已经将Parquet设为默认的文件存储格式,Cloudera投入了很多工程师到Impala+Parquet相关开发中,Hive/Pig都原生支持Parquet。Parquet现在为Twitter至少节省了1/3的存储空间,同时节省了大量的表扫描和反序列化的时间。这两方面直接反应就是节约成本和提高性能。
如果说HDFS是大数据时代文件系统的事实标准的话,Parquet就是大数据时代存储格式的事实标准。
参考文档
- http://parquet.apache.org/
- https://blog.twitter.com/2013/dremel-made-simple-with-parquet
- http://blog.cloudera.com/blog/2015/04/using-apache-parquet-at-appnexus/
- http://blog.cloudera.com/blog/2014/05/using-impala-at-scale-at-allstate/
Spark Parquet使用相关推荐
- Spark 实战 - 3.一文搞懂 parquet
一.引用 parquet 文件常见于 Spark.Hive.Streamin.MapReduce 等大数据场景,通过列式存储和元数据存储的方式实现了高效的数据存储与检索,下面主要讲 parquet 文 ...
- Spark SQL 外部数据源
一.简介 1.1 多数据源支持 Spark 支持以下六个核心数据源,同时 Spark 社区还提供了多达上百种数据源的读取方式,能够满足绝大部分使用场景. CSV JSON Parquet ORC JD ...
- zeppelin spark mysql_Zeppelin0.5.6使用spark解释器
Zeppelin为0.5.6 Zeppelin默认自带本地spark,可以不依赖任何集群,下载bin包,解压安装就可以使用. 使用其他的spark集群在yarn模式下. 配置: vi zeppelin ...
- 【详谈 Delta Lake 】系列技术专题 之 特性(Features)
简介: 本文翻译自大数据技术公司 Databricks 针对数据湖 Delta Lake 的系列技术文章.众所周知,Databricks 主导着开源大数据社区 Apache Spark.Delta L ...
- 开源关系型数据库架构
前言 我们把主要的开源关系型数据库分为三类,来分别了解一下它们的架构和设计,并了解一下它们各自的优缺点. OLTP,在线事务处理,是传统的关系型数据库的主要应用场景 OLAP,在线分析处理,是当今大数 ...
- sparksql java_JAVASparkSQL
1.SparkSQL基础 import java.util.ArrayList; import java.util.List; import java.util.Arrays; import java ...
- Apache iceberg:Netflix 数据仓库的基石
Apache Iceberg 是一种用于跟踪超大规模表的新格式,是专门为对象存储(如S3)而设计的. 本文将介绍为什么 Netflix 需要构建 Iceberg,Apache Iceberg 的高层次 ...
- 2022 世界人工智能大会|人工智能与开源技术先锋论坛成功举办
2022 世界人工智能大会于9月1-3日在上海成功召开.由世界人工智能大会组委会办公室指导,上海市人工智能行业协会支持,Kyligence 承办的人工智能与开源技术先锋论坛于9月3日成功举办. Kyl ...
- Spark SQL与外部数据源的操作(Spark SQL ——> CSV/JSON/Parquet/hive/mysql)
目录 一.Spark SQL支持的外部数据源 二.Spark SQL -> CSV 2.1 读CSV文件 a.有列名 b.无列名 2.2 写CSV文件 三.Spark SQL -> JSO ...
最新文章
- 基于 abp vNext 和 .NET Core 开发博客项目 - Blazor 实战系列(三)
- 别再搜集面经啦!小夕教你斩下NLP算法岗offer!
- 论文浅尝 | 中科院百度微软等学者最新综述论文40+最新方法阐述知识图谱提升推荐系统准确性与可解释性...
- Flutter 开发应用第一个页面
- 海尔计算机类,海尔计算机类笔题
- STM32 SPI通信(读写flash)
- resnet10 从tk1 上移植到linux 上 上遇到的问题
- Vue 项目创建并发布
- 毕业七年 摸爬滚打的心得体会
- [ExtJS5学习笔记]第22 Extjs5正在使用beforeLabelTpl添加所需的配置选项标注星号标记...
- 网络状态检测的利器 - ss命令
- 新海诚没有参与制作的作品_还未开始!新海诚的下一部作品还是白纸
- 无设备云控系统(ipad协议)
- 动物网页html5设置思想,网站设计思想
- 变换Transformation(1)(笔记)
- Window: win10 如何更改管理员名称(修改登录用户名)
- 云计算学习笔记——计算机网络基础及计算机网络参考模型
- IT:成为服务经纪人的未来
- 手机损坏的数据导出方法
- 《RabbitMQ实战指南》读书笔记
热门文章
- css鼠标滑过按钮出现flash闪光效果
- IDEA创建maven项目没有src/main/java目录问题
- Pyinstaller将yolov5的detect.py封装成detect.exe,并用C++调用
- 聊一聊 C++ 中的 namespace
- 参会人员如何快速入场?高科技来帮忙
- 【人脸识别】Haar分类器五官定位【含GUI Matlab源码 686期】
- Git 入门教程,10000 字详解
- python repair修复功能在哪_linux下repair filesystem模式修复方法
- RT-Thread编程手册
- JavaScript shells