Spark SQL下的Parquet使用最佳实践和代码实战

 分类:
spark-sql(1) 
  1. 一、Spark SQL下的Parquet使用最佳实践

    1)过去整个业界对大数据的分析的技术栈的Pipeline一般分为以下两种方式:

    a)Data Source -> HDFS -> MR/Hive/Spark(相当于ETL)-> HDFS Parquet -> Spark SQL/Impala -> ResultService(可以放在DB中,也有可能被通过JDBC/ODBC来作为数据服务使用);

    b)Data Source -> Real timeupdate data to HBase/DB -> Export to Parquet -> Spark SQL/Impala -> ResultService(可以放在DB中,也有可能被通过JDBC/ODBC来作为数据服务使用);

    上述的第二种方式完全可以通过Kafka+Spark Streaming+Spark SQL(内部也强烈建议采用Parquet的方式来存储数据)的方式取代

    2)期待的方式:DataSource -> Kafka -> Spark Streaming -> Parquet -> Spark SQL(ML、GraphX等)-> Parquet -> 其它各种Data Mining等。

  2. 二、Parquet的精要介绍

    Parquet是列式存储格式的一种文件类型,列式存储有以下的核心优势:

    a)可以跳过不符合条件的数据,只读取需要的数据,降低IO数据量。

    b)压缩编码可以降低磁盘存储空间。由于同一列的数据类型是一样的,可以使用更高效的压缩编码(例如RunLength Encoding和Delta Encoding)进一步节约存储空间。

    c)只读取需要的列,支持向量运算,能够获取更好的扫描性能。

  3. 设计蓝图

    以上分解似乎完美,一起来看看“设计框架”或“蓝图”。

    算了,不解释了,图,自己看。

    Coding Style

    从Kafka Stream获取数据

    // 从Kafka Stream获取数据JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(jssc, String.class, String.class,StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);
    

    写入Parquet

    accessLogsDStream.foreachRDD(rdd -> {// 如果DF不为空,写入(增加模式)到Parquet文件DataFrame df = sqlContext.createDataFrame(rdd, ApacheAccessLog.class);if (df.count() > 0) {df.write().mode(SaveMode.Append).parquet(Flags.getInstance().getParquetFile());}return null;});
    

    创建Hive表

    使用spark-shell,获取Parquet文件, 写入一个临时表;

    scala代码如下:

    import sqlContext.implicits._val parquetFile = sqlContext.read.parquet("/user/spark/apachelog.parquet")parquetFile.registerTempTable("logs")
    

    复制schema到新表链接到Parquet文件。

    在Hive中复制表,这里你会发现,文件LOCATION位置还是原来的路径,目的就是这个,使得新写入的文件还在Hive模型中。

    我总觉得这个方法有问题,是不是哪位Hive高人指点一下,有没有更好的办法来完成这个工作?

    CREATE EXTERNAL TABLE apachelog LIKE logs STORED AS PARQUET LOCATION '/user/spark/apachelog.parquet';
    

    启动你的SparkThriftServer

    当然,在集群中启用ThriftServer是必须的工作,SparkThriftServer其实暴露的是Hive2服务器,用JDBC驱动就可以访问了。

    我们都想要的结果

    本博客中使用的SQL查询工具是SQuirreL SQL,具体JDBC配置方法请参照前面说的向左向右转。

    结果看似简单,但是经历还是很有挑战的。

    至此,本例已完成。完成代码见 GitHub

    转自:https://blog.sectong.com/blog/spark_to_parquet.html

    APPMain.java

    [java] view plain copy
    1. package com.sectong.spark_to_parquet;
    2. import java.io.IOException;
    3. import java.util.ArrayList;
    4. import java.util.Arrays;
    5. import java.util.HashMap;
    6. import java.util.HashSet;
    7. import java.util.List;
    8. import org.apache.commons.cli.Option;
    9. import org.apache.commons.cli.Options;
    10. import org.apache.spark.SparkConf;
    11. import org.apache.spark.api.java.JavaSparkContext;
    12. import org.apache.spark.api.java.function.Function;
    13. import org.apache.spark.sql.DataFrame;
    14. import org.apache.spark.sql.SQLContext;
    15. import org.apache.spark.sql.SaveMode;
    16. import org.apache.spark.streaming.api.java.JavaDStream;
    17. import org.apache.spark.streaming.api.java.JavaPairInputDStream;
    18. import org.apache.spark.streaming.api.java.JavaStreamingContext;
    19. import org.apache.spark.streaming.kafka.KafkaUtils;
    20. import kafka.serializer.StringDecoder;
    21. import scala.Tuple2;
    22. import scala.collection.Seq;
    23. /**
    24. * 运行程序,spark-submit --class "com.sectong.spark_to_parquet.AppMain" --master
    25. * yarn target/park_to_parquet-0.0.1-SNAPSHOT.jar --kafka_broker
    26. * hadoop1:6667,hadoop2:6667 --kafka_topic apache --parquet_file /user/spark/
    27. * --slide_interval 30
    28. */
    29. public class AppMain {
    30. public static final String WINDOW_LENGTH = "window_length";
    31. public static final String SLIDE_INTERVAL = "slide_interval";
    32. public static final String KAFKA_BROKER = "kafka_broker";
    33. public static final String KAFKA_TOPIC = "kafka_topic";
    34. public static final String PARQUET_FILE = "parquet_file";
    35. private static final Options THE_OPTIONS = createOptions();
    36. private static Options createOptions() {
    37. Options options = new Options();
    38. options.addOption(new Option(WINDOW_LENGTH, true, "The window length in seconds"));// 窗口大小
    39. options.addOption(new Option(SLIDE_INTERVAL, true, "The slide interval in seconds"));// 计算间隔
    40. options.addOption(new Option(KAFKA_BROKER, true, "The kafka broker list")); // Kafka队列
    41. options.addOption(new Option(KAFKA_TOPIC, true, "The kafka topic"));// TOPIC
    42. options.addOption(new Option(PARQUET_FILE, true, "The parquet file"));// 写入Parquet文件位置
    43. return options;
    44. }
    45. public static void main(String[] args) throws IOException {
    46. Flags.setFromCommandLineArgs(THE_OPTIONS, args);
    47. // 初始化Spark Conf.
    48. SparkConf conf = new SparkConf().setAppName("A SECTONG Application: Apache Log Analysis with Spark");
    49. JavaSparkContext sc = new JavaSparkContext(conf);
    50. JavaStreamingContext jssc = new JavaStreamingContext(sc, Flags.getInstance().getSlideInterval());
    51. SQLContext sqlContext = new SQLContext(sc);
    52. // 初始化参数
    53. HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(Flags.getInstance().getKafka_topic().split(",")));
    54. HashMap<String, String> kafkaParams = new HashMap<String, String>();
    55. kafkaParams.put("metadata.broker.list", Flags.getInstance().getKafka_broker());
    56. // 从Kafka Stream获取数据
    57. JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(jssc, String.class, String.class,
    58. StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);
    59. JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
    60. private static final long serialVersionUID = 5266880065425088203L;
    61. public String call(Tuple2<String, String> tuple2) {
    62. return tuple2._2();
    63. }
    64. });
    65. JavaDStream<ApacheAccessLog> accessLogsDStream = lines.flatMap(line -> {
    66. List<ApacheAccessLog> list = new ArrayList<>();
    67. try {
    68. // 映射每一行
    69. list.add(ApacheAccessLog.parseFromLogLine(line));
    70. return list;
    71. } catch (RuntimeException e) {
    72. return list;
    73. }
    74. }).cache();
    75. accessLogsDStream.foreachRDD(rdd -> {
    76. // rdd to DataFrame
    77. DataFrame df = sqlContext.createDataFrame(rdd, ApacheAccessLog.class);
    78. // 写入Parquet文件
    79. df.write().partitionBy("ipAddress", "method", "responseCode").mode(SaveMode.Append).parquet(Flags.getInstance().getParquetFile());
    80. return null;
    81. });
    82. // 启动Streaming服务器
    83. jssc.start(); // 启动计算
    84. jssc.awaitTermination(); // 等待终止
    85. }
    86. }

    ApacheAccessLog.java

    [java] view plain copy
    1. package com.sectong.spark_to_parquet;
    2. import java.io.Serializable;
    3. import java.util.regex.Matcher;
    4. import java.util.regex.Pattern;
    5. /**
    6. * 解析Apache log
    7. */
    8. public class ApacheAccessLog implements Serializable {
    9. /**
    10. *
    11. */
    12. private static final long serialVersionUID = 6681372116317508248L;
    13. private String ipAddress;
    14. private String clientIdentd;
    15. private String userID;
    16. private String dateTimeString;
    17. private String method;
    18. private String endpoint;
    19. private String protocol;
    20. private int responseCode;
    21. private long contentSize;
    22. private ApacheAccessLog(String ipAddress, String clientIdentd, String userID, String dateTime, String method,
    23. String endpoint, String protocol, String responseCode, String contentSize) {
    24. this.ipAddress = ipAddress;
    25. this.clientIdentd = clientIdentd;
    26. this.userID = userID;
    27. this.dateTimeString = dateTime;
    28. this.method = method;
    29. this.endpoint = endpoint;
    30. this.protocol = protocol;
    31. this.responseCode = Integer.parseInt(responseCode);
    32. if (contentSize.equals("-")) {
    33. this.contentSize = 0;
    34. } else {
    35. this.contentSize = Long.parseLong(contentSize);
    36. }
    37. }
    38. public String getIpAddress() {
    39. return ipAddress;
    40. }
    41. public String getClientIdentd() {
    42. return clientIdentd;
    43. }
    44. public String getUserID() {
    45. return userID;
    46. }
    47. public String getDateTimeString() {
    48. return dateTimeString;
    49. }
    50. public String getMethod() {
    51. return method;
    52. }
    53. public String getEndpoint() {
    54. return endpoint;
    55. }
    56. public String getProtocol() {
    57. return protocol;
    58. }
    59. public int getResponseCode() {
    60. return responseCode;
    61. }
    62. public long getContentSize() {
    63. return contentSize;
    64. }
    65. public void setIpAddress(String ipAddress) {
    66. this.ipAddress = ipAddress;
    67. }
    68. public void setClientIdentd(String clientIdentd) {
    69. this.clientIdentd = clientIdentd;
    70. }
    71. public void setUserID(String userID) {
    72. this.userID = userID;
    73. }
    74. public void setDateTimeString(String dateTimeString) {
    75. this.dateTimeString = dateTimeString;
    76. }
    77. public void setMethod(String method) {
    78. this.method = method;
    79. }
    80. public void setEndpoint(String endpoint) {
    81. this.endpoint = endpoint;
    82. }
    83. public void setProtocol(String protocol) {
    84. this.protocol = protocol;
    85. }
    86. public void setResponseCode(int responseCode) {
    87. this.responseCode = responseCode;
    88. }
    89. public void setContentSize(long contentSize) {
    90. this.contentSize = contentSize;
    91. }
    92. // Example Apache log line:
    93. // 127.0.0.1 - - [21/Jul/2014:9:55:27 -0800] "GET /home.html HTTP/1.1" 200
    94. // 2048
    95. private static final String LOG_ENTRY_PATTERN =
    96. // 1:IP 2:client 3:user 4:date time 5:method 6:req 7:proto
    97. // 8:respcode 9:size
    98. "(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(\\S+) (\\S+) (\\S+)\" (\\d{3}) (\\S+)";
    99. private static final Pattern PATTERN = Pattern.compile(LOG_ENTRY_PATTERN);
    100. public static ApacheAccessLog parseFromLogLine(String logline) {
    101. Matcher m = PATTERN.matcher(logline);
    102. if (!m.find()) {
    103. // logger.log(Level.ALL, "Cannot parse logline" + logline);
    104. throw new RuntimeException("Error parsing logline");
    105. } else {
    106. return new ApacheAccessLog(m.group(1), m.group(2), m.group(3), m.group(4), m.group(5), m.group(6),
    107. m.group(7), m.group(8), m.group(9));
    108. }
    109. }
    110. }

    Flags.java

    [java] view plain copy
    1. package com.sectong.spark_to_parquet;
    2. import org.apache.commons.cli.CommandLine;
    3. import org.apache.commons.cli.CommandLineParser;
    4. import org.apache.commons.cli.Options;
    5. import org.apache.commons.cli.ParseException;
    6. import org.apache.commons.cli.PosixParser;
    7. import org.apache.spark.streaming.Duration;
    8. public class Flags {
    9. private static Flags THE_INSTANCE = new Flags();
    10. private Duration windowLength;
    11. private Duration slideInterval;
    12. private String kafka_broker;
    13. private String kafka_topic;
    14. private String parquet_file;
    15. private boolean initialized = false;
    16. private Flags() {
    17. }
    18. public Duration getWindowLength() {
    19. return windowLength;
    20. }
    21. public Duration getSlideInterval() {
    22. return slideInterval;
    23. }
    24. public String getKafka_broker() {
    25. return kafka_broker;
    26. }
    27. public String getKafka_topic() {
    28. return kafka_topic;
    29. }
    30. public String getParquetFile() {
    31. return parquet_file;
    32. }
    33. public static Flags getInstance() {
    34. if (!THE_INSTANCE.initialized) {
    35. throw new RuntimeException("Flags have not been initalized");
    36. }
    37. return THE_INSTANCE;
    38. }
    39. public static void setFromCommandLineArgs(Options options, String[] args) {
    40. CommandLineParser parser = new PosixParser();
    41. try {
    42. CommandLine cl = parser.parse(options, args);
    43. // 参数默认值
    44. THE_INSTANCE.windowLength = new Duration(
    45. Integer.parseInt(cl.getOptionValue(AppMain.WINDOW_LENGTH, "30")) * 1000);
    46. THE_INSTANCE.slideInterval = new Duration(
    47. Integer.parseInt(cl.getOptionValue(AppMain.SLIDE_INTERVAL, "5")) * 1000);
    48. THE_INSTANCE.kafka_broker = cl.getOptionValue(AppMain.KAFKA_BROKER, "kafka:9092");
    49. THE_INSTANCE.kafka_topic = cl.getOptionValue(AppMain.KAFKA_TOPIC, "apache");
    50. THE_INSTANCE.parquet_file = cl.getOptionValue(AppMain.PARQUET_FILE, "/user/spark/");
    51. THE_INSTANCE.initialized = true;
    52. } catch (ParseException e) {
    53. THE_INSTANCE.initialized = false;
    54. System.err.println("Parsing failed.  Reason: " + e.getMessage());
    55. }
    56. }
    57. }

Spark Parquet使用相关推荐

  1. Spark 实战 - 3.一文搞懂 parquet

    一.引用 parquet 文件常见于 Spark.Hive.Streamin.MapReduce 等大数据场景,通过列式存储和元数据存储的方式实现了高效的数据存储与检索,下面主要讲 parquet 文 ...

  2. Spark SQL 外部数据源

    一.简介 1.1 多数据源支持 Spark 支持以下六个核心数据源,同时 Spark 社区还提供了多达上百种数据源的读取方式,能够满足绝大部分使用场景. CSV JSON Parquet ORC JD ...

  3. zeppelin spark mysql_Zeppelin0.5.6使用spark解释器

    Zeppelin为0.5.6 Zeppelin默认自带本地spark,可以不依赖任何集群,下载bin包,解压安装就可以使用. 使用其他的spark集群在yarn模式下. 配置: vi zeppelin ...

  4. 【详谈 Delta Lake 】系列技术专题 之 特性(Features)

    简介: 本文翻译自大数据技术公司 Databricks 针对数据湖 Delta Lake 的系列技术文章.众所周知,Databricks 主导着开源大数据社区 Apache Spark.Delta L ...

  5. 开源关系型数据库架构

    前言 我们把主要的开源关系型数据库分为三类,来分别了解一下它们的架构和设计,并了解一下它们各自的优缺点. OLTP,在线事务处理,是传统的关系型数据库的主要应用场景 OLAP,在线分析处理,是当今大数 ...

  6. sparksql java_JAVASparkSQL

    1.SparkSQL基础 import java.util.ArrayList; import java.util.List; import java.util.Arrays; import java ...

  7. Apache iceberg:Netflix 数据仓库的基石

    Apache Iceberg 是一种用于跟踪超大规模表的新格式,是专门为对象存储(如S3)而设计的. 本文将介绍为什么 Netflix 需要构建 Iceberg,Apache Iceberg 的高层次 ...

  8. 2022 世界人工智能大会|人工智能与开源技术先锋论坛成功举办

    2022 世界人工智能大会于9月1-3日在上海成功召开.由世界人工智能大会组委会办公室指导,上海市人工智能行业协会支持,Kyligence 承办的人工智能与开源技术先锋论坛于9月3日成功举办. Kyl ...

  9. Spark SQL与外部数据源的操作(Spark SQL ——> CSV/JSON/Parquet/hive/mysql)

    目录 一.Spark SQL支持的外部数据源 二.Spark SQL -> CSV 2.1 读CSV文件 a.有列名 b.无列名 2.2 写CSV文件 三.Spark SQL -> JSO ...

最新文章

  1. 基于 abp vNext 和 .NET Core 开发博客项目 - Blazor 实战系列(三)
  2. 别再搜集面经啦!小夕教你斩下NLP算法岗offer!
  3. 论文浅尝 | 中科院百度微软等学者最新综述论文40+最新方法阐述知识图谱提升推荐系统准确性与可解释性...
  4. Flutter 开发应用第一个页面
  5. 海尔计算机类,海尔计算机类笔题
  6. STM32 SPI通信(读写flash)
  7. resnet10 从tk1 上移植到linux 上 上遇到的问题
  8. Vue 项目创建并发布
  9. 毕业七年 摸爬滚打的心得体会
  10. [ExtJS5学习笔记]第22 Extjs5正在使用beforeLabelTpl添加所需的配置选项标注星号标记...
  11. 网络状态检测的利器 - ss命令
  12. 新海诚没有参与制作的作品_还未开始!新海诚的下一部作品还是白纸
  13. 无设备云控系统(ipad协议)
  14. 动物网页html5设置思想,网站设计思想
  15. 变换Transformation(1)(笔记)
  16. Window: win10 如何更改管理员名称(修改登录用户名)
  17. 云计算学习笔记——计算机网络基础及计算机网络参考模型
  18. IT:成为服务经纪人的未来
  19. 手机损坏的数据导出方法
  20. 《RabbitMQ实战指南》读书笔记

热门文章

  1. css鼠标滑过按钮出现flash闪光效果
  2. IDEA创建maven项目没有src/main/java目录问题
  3. Pyinstaller将yolov5的detect.py封装成detect.exe,并用C++调用
  4. 聊一聊 C++ 中的 namespace
  5. 参会人员如何快速入场?高科技来帮忙
  6. 【人脸识别】Haar分类器五官定位【含GUI Matlab源码 686期】
  7. Git 入门教程,10000 字详解
  8. python repair修复功能在哪_linux下repair filesystem模式修复方法
  9. RT-Thread编程手册
  10. JavaScript shells