Kudu入门和API基础操作

文章目录

  • Kudu入门和API基础操作
    • 为什么使用Kudu作为存储介质
    • 1. Kudu介绍
      • 1.1 背景介绍
      • 1.2 新的硬件设备
      • 1.3 Kudu是什么
      • 1.4 Kudu的应用场景
      • 1.5 Kudu架构
    • 2. Java代码操作Kudu
      • 2.1 构建maven工程
      • 2.2 导入依赖
      • 2.3 创建包结构
      • 2.4 初始化方法
      • 2.5 创建表
      • 2.6 插入数据
      • 2.7 查询数据
      • 2.8 修改数据
      • 2.9 删除数据
      • 2.10 kudu的分区方式(结合Impala讲解)
        • 2.10.1 Hash Partitioning (哈希分区)
        • 2.10.2 Range Partitioning (范围分区)
        • 2.10.3 Multilevel Partitioning (多级分区)
      • 2.11 修改表
    • 3. Spark操作Kudu
      • 3.1 创建表
      • 3.2 DML操作
        • 3.2.1 插入数据insert操作
        • 3.2.2 删除数据delete操作
        • 3.2.3 更新数据upsert操作
      • 3.3 dataFrame操作kudu
        • 3.3.1 DataFrameApi读取kudu表中的数据
        • 3.3.2 DataFrameApi写数据到kudu表中
        • 3.3.3 使用sparksql操作kudu表
      • 3.4 Kudu Native RDD
      • 3.5 修改表

为什么使用Kudu作为存储介质

  • 数据库数据上的快速分析

目前很多业务使用事务型数据库(MySQL、Oracle)做数据分析,把数据写入数据库,然后使用 SQL 进行有效信息提取,当数据规模很小的时候,这种方式确实是立竿见影的,但是当数据量级起来以后,会发现数据库吃不消了或者成本开销太大了,此时就需要把数据从事务型数据库里拷贝出来或者说剥离出来,装入一个分析型的数据库里。发现对于实时性和变更性的需求,目前只有 Kudu 一种组件能够满足需求,所以就产生了这样的一种场景:

MySQL 数据库增、删、改的数据通过 Binlog 实时的被同步到 Kudu 里,同时在 Impala(或者其他计算引擎如 Spark、Hive、Presto、MapReduce)上可以实时的看到。 这种场景也是目前业界使用最广泛的,认可度最高。

  • 用户行为日志的快速分析

对于用户行为日志的实时性敏感的业务,比如电商流量、AB 测试、优惠券的点击反馈、广告投放效果以及秒级导入秒级查询等需求,按 Kudu 出现以前的架构基本上都是这张图的模式:

不仅链路长而且实时性得不到有力保障,有些甚至是 T + 1 的,极大的削弱了业务的丰富度。
引入 Kudu 以后,大家看,数据的导入和查询都是在线实时的:

这种场景目前也是网易考拉和hub在使用的,其中hub甚至把 Kudu 当 HBase 来作点查使用。

小结:

对比以往的大数据存储框架:Hbase、hdfs、redis

Hbase:优点:随机读写,可以对数据进行增删改操作,但是缺点:批量数据分析

Hdfs:优点:可以对批量数据进行分析,缺点:随机读写性能比较低

Redis:优点:数据可以存储到内存和磁盘,适合用于存储相对比较少的数据量,缺点:对内存依赖比较高,支持数据的增删改操作

Kudu是介于Hbase和Hdfs之间的一个框架,同时具备随机读写和批量数据分析,但是随机读写性能不如Hbase,同时批量数据分析性能不如hdfs

1. Kudu介绍

1.1 背景介绍

在Kudu之前,大数据主要以两种方式存储;

  • 静态数据:

    • 以 HDFS 引擎作为存储引擎,适用于高吞吐量的离线大数据分析场景。

    • 这类存储的局限性是数据无法进行随机的读写。

  • 动态数据:

    • 以 HBase、Cassandra 作为存储引擎,适用于大数据随机读写场景。

    • 这类存储的局限性是批量读取吞吐量远不如HDFS,不适用于批量数据分析的场景。

从上面分析可知,这两种数据在存储方式上完全不同,进而导致使用场景完全不同,但在真实的场景中,边界可能没有那么清晰,面对既需要随机读写,又需要批量分析的大数据场景,该如何选择呢?这个场景中,单种存储引擎无法满足业务需求,我们需要通过多种大数据工具组合来满足这一需求。

如上图所示,数据实时写入 HBase,实时的数据更新也在 HBase 完成,为了应对 OLAP需求,我们定时(通常是 T+1 或者 T+H)将 HBase数据写成静态的文件(如:Parquet)导入到 OLAP引擎(如:HDFS)。这一架构能满足既需要随机读写,又可以支持 OLAP分析的场景,但它有如下缺点:

  • 架构复杂。从架构上看,数据在HBase、消息队列、HDFS间流转,涉及环节太多,运维成本很高。并且每个环节需要保证高可用,都需要维护多个副本,存储空间也有一定的浪费。最后数据在多个系统上,对数据安全策略、监控等都提出了挑战。

  • 时效性低。数据从HBase导出成静态文件是周期性的,一般这个周期是一天(或一小时),在时效性上不是很高。

  • **难以应对后续的更新。**真实场景中,总会有数据是延迟到达的。如果这些数据之前已经从HBase导出到HDFS,新到的变更数据就难以处理了,一个方案是把原有数据应用上新的变更后重写一遍,但这代价又很高。

为了解决上述架构的这些问题,Kudu应运而生。Kudu的定位是Fast Analytics on FastData,是一个既支持随机读写、又支持 OLAP 分析的大数据存储引擎

从上图可以看出,KUDU 是一个折中的产品,在 HDFS 和 HBase这两个偏科生中平衡了随机读写和批量分析的性能。从 KUDU的诞生可以说明一个观点:底层的技术发展很多时候都是上层的业务推动的,脱离业务的技术很可能是空中楼阁

1.2 新的硬件设备

内存(RAM)的技术发展非常快,它变得越来越便宜,容量也越来越大。Cloudera的客户数据显示,他们的客户所部署的服务器,2012年每个节点仅有32GBRAM,现如今增长到每个节点有128GB或256GBRAM。存储设备上更新也非常快,在很多普通服务器中部署SSD也是屡见不鲜。HBase、HDFS、以及其他的Hadoop工具都在不断自我完善,从而适应硬件上的升级换代。然而,从根本上,HDFS基于03年GFS,HBase基于05年BigTable,在当时系统瓶颈主要取决于底层磁盘速度。当磁盘速度较慢时,CPU利用率不足的根本原因是磁盘速度导致的瓶颈,当磁盘速度提高了之后,CPU利用率提高,这时候CPU往往成为系统的瓶颈。HBase、HDFS由于年代久远,已经很难从基本架构上进行修改,而Kudu是基于全新的设计,因此可以更充分地利用RAM、I/O资源,并优化CPU利用率。

我们可以理解为:Kudu相比与以往的系统,CPU使用降低了,I/O的使用提高了,RAM的利用更充分了

1.3 Kudu是什么

Apache Kudu是由Cloudera开源的存储引擎,可以同时提供低延迟的随机读写和高效的数据分析能力。它是一个融合HDFS和HBase的功能的新组件,具备介于两者之间的新存储组件。

Kudu支持水平扩展,并且与Cloudera Impala和ApacheSpark等当前流行的大数据查询和分析工具结合紧密。

1.4 Kudu的应用场景

Kudu的很多特性跟HBase很像,它支持索引键的查询和修改。Cloudera曾经想过基于Hbase进行修改,然而结论是对HBase的改动非常大,Kudu的数据模型和磁盘存储都与Hbase不同。HBase本身成功的适用于大量的其它场景,因此修改HBase很可能吃力不讨好。最后Cloudera决定开发一个全新的存储系统。

  • Strong performance for both scan and random access to help customerssimplify complex hybrid architectures(适用于那些既有随机访问,也有批量数据扫描的复合场景

  • High CPU efficiency in order to maximize the return on investment that our customers are making in modern processors(高计算量的场景

  • High IO efficiency in order to leverage modern persistent storage(使用了高性能的存储设备,包括使用更多的内存

  • The ability to upDATE data in place, to avoid extraneous processing and data movement(支持数据更新,避免数据反复迁移

  • The ability to support active-active replicated clusters that span multiple data centers in geographically distant locations(支持跨地域的实时数据备份和查询

1.5 Kudu架构

下图显示了一个具有三个 master 和多个 tablet server 的 Kudu 集群,每个服务器都支持多个 tablet。

它说明了如何使用 Raft 共识来允许 master 和 tablet server 的 leader 和follow。

此外,tabletserver 可以成为某些 tablet 的 leader,也可以是其他 tablet 的 follower。leader 以金色显示,而 follower 则显示为蓝色。

下面是一些基本概念:

角色 作用
Master 集群中的老大,负责集群管理、元数据管理等功能
Tablet Server 集群中的小弟,负责数据存储,并提供数据读写服务 一个 tablet server 存储了table表的tablet 和为 tablet 向 client 提供服务。对于给定的 tablet,一个tablet server 充当 leader,其他 tablet server 充当该 tablet 的 follower 副本。 只有 leader服务写请求,然而 leader 或 followers 为每个服务提供读请求 。一个 tablet server 可以服务多个 tablets ,并且一个 tablet 可以被多个 tablet servers 服务着。
Table(表) 一张table是数据存储在Kudu的tablet server中。表具有 schema 和全局有序的primary key(主键)。table 被分成称为 tablets 的 segments。
Tablet 一个 tablet 是一张 table连续的segment,tablet是kudu表的水平分区,类似于google Bigtable的tablet,或者HBase的region。每个tablet存储着一定连续range的数据(key),且tablet两两间的range不会重叠。一张表的所有tablet包含了这张表的所有key空间。与其它数据存储引擎或关系型数据库中的 partition(分区)相似。给定的tablet 冗余到多个 tablet 服务器上,并且在任何给定的时间点,其中一个副本被认为是leader tablet。任何副本都可以对读取进行服务,并且写入时需要在为 tablet 服务的一组 tablet server之间达成一致性。

2. Java代码操作Kudu

2.1 构建maven工程

2.2 导入依赖

<repositories><repository><id>cloudera</id><url>https://repository.cloudera.com/artifactory/cloudera-repos/</url></repository>
</repositories><dependencies><dependency><groupId>org.apache.kudu</groupId><artifactId>kudu-client</artifactId><version>1.9.0-cdh6.2.1</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency><dependency><groupId>org.apache.kudu</groupId><artifactId>kudu-client-tools</artifactId><version>1.9.0-cdh6.2.1</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.kudu/kudu-spark2 --><dependency><groupId>org.apache.kudu</groupId><artifactId>kudu-spark2_2.11</artifactId><version>1.9.0-cdh6.2.1</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>2.1.0</version></dependency>
</dependencies>

2.3 创建包结构

包名 说明
cn.qike 代码所在的包目录

2.4 初始化方法

package cn.itcast;import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Type;
import org.apache.kudu.client.KuduClient;
import org.junit.Before;public class TestKudu {//定义KuduClient客户端对象private static KuduClient kuduClient;//定义表名private static String tableName = "person";/*** 初始化方法*/@Beforepublic void init() {//指定master地址String masterAddress = "node2.itcast.cn";//创建kudu的数据库连接kuduClient = new KuduClient.KuduClientBuilder(masterAddress).defaultSocketReadTimeoutMs(6000).build();}//构建表schema的字段信息//字段名称   数据类型     是否为主键public ColumnSchema newColumn(String name, Type type, boolean isKey) {ColumnSchema.ColumnSchemaBuilder column = new ColumnSchema.ColumnSchemaBuilder(name, type);column.key(isKey);return column.build();}
}

2.5 创建表

/**  使用junit进行测试** 创建表* @throws KuduException*/
@Test
public void createTable() throws KuduException {//设置表的schemaList<ColumnSchema> columns = new LinkedList<ColumnSchema>();columns.add(newColumn("CompanyId", Type.INT32, true));columns.add(newColumn("WorkId", Type.INT32, false));columns.add(newColumn("Name", Type.STRING, false));columns.add(newColumn("Gender", Type.STRING, false));columns.add(newColumn("Photo", Type.STRING, false));Schema schema = new Schema(columns);//创建表时提供的所有选项CreateTableOptions tableOptions = new CreateTableOptions();//设置表的副本和分区规则LinkedList<String> list = new LinkedList<String>();list.add("CompanyId");//设置表副本数tableOptions.setNumReplicas(1);//设置range分区//tableOptions.setRangePartitionColumns(list);//设置hash分区和分区的数量tableOptions.addHashPartitions(list, 3);try {kuduClient.createTable("person", schema, tableOptions);} catch (Exception e) {e.printStackTrace();}kuduClient.close();
}

2.6 插入数据

/*** 向表中加载数据* @throws KuduException*/
@Test
public void loadData() throws KuduException {//打开表KuduTable kuduTable = kuduClient.openTable(tableName);//创建KuduSession对象 kudu必须通过KuduSession写入数据KuduSession kuduSession = kuduClient.newSession();//采用flush方式 手动刷新kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);kuduSession.setMutationBufferSpace(3000);//准备数据for(int i=1; i<=10; i++){Insert insert = kuduTable.newInsert();//设置字段的内容insert.getRow().addInt("CompanyId",i);insert.getRow().addInt("WorkId",i);insert.getRow().addString("Name","lisi"+i);insert.getRow().addString("Gender","male");insert.getRow().addString("Photo","person"+i);kuduSession.flush();kuduSession.apply(insert);}kuduSession.close();kuduClient.close();
}

2.7 查询数据

 /*** 查询表数据* @throws KuduException*/
@Test
public void queryData() throws KuduException {//打开表KuduTable kuduTable = kuduClient.openTable(tableName);//获取scanner扫描器KuduScanner.KuduScannerBuilder scannerBuilder = kuduClient.newScannerBuilder(kuduTable);KuduScanner scanner = scannerBuilder.build();//遍历while(scanner.hasMoreRows()){RowResultIterator rowResults = scanner.nextRows();while (rowResults.hasNext()){RowResult result = rowResults.next();int companyId = result.getInt("CompanyId");int workId = result.getInt("WorkId");String name = result.getString("Name");String gender = result.getString("Gender");String photo = result.getString("Photo");System.out.print("companyId:"+companyId+" ");System.out.print("workId:"+workId+" ");System.out.print("name:"+name+" ");System.out.print("gender:"+gender+" ");System.out.println("photo:"+photo);}}//关闭scanner.close();kuduClient.close();
}

2.8 修改数据

/*** 修改数据* @throws KuduException*/
@Test
public void upDATEData() throws KuduException {//打开表KuduTable kuduTable = kuduClient.openTable(tableName);//构建kuduSession对象KuduSession kuduSession = kuduClient.newSession();//设置刷新数据模式,自动提交kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);//更新数据需要获取UpDATE对象UpDATE upDATE = kuduTable.newUpDATE();//获取row对象PartialRow row = upDATE.getRow();//设置要更新的数据信息row.addInt("CompanyId",1);row.addString("Name","kobe");//操作这个upDATE对象kuduSession.apply(upDATE);kuduSession.close();
}

2.9 删除数据

/*** 删除表中的数据*/
@Test
public void deleteData() throws KuduException {//打开表KuduTable kuduTable = kuduClient.openTable(tableName);KuduSession kuduSession = kuduClient.newSession();//获取Delete对象Delete delete = kuduTable.newDelete();//构建要删除的行对象PartialRow row = delete.getRow();//设置删除数据的条件row.addInt("CompanyId",2);kuduSession.flush();kuduSession.apply(delete);kuduSession.close();kuduClient.close();
}

2.10 kudu的分区方式(结合Impala讲解)

为了提供可扩展性,Kudu 表被划分为称为 tablets 的单元,并分布在许多 tabletservers 上。行总是属于单个tablet 。将行分配给 tablet的方法由在表创建期间设置的表的分区决定。

kudu提供了3种分区方式。

2.10.1 Hash Partitioning (哈希分区)

哈希分区通过哈希值将行分配到许多 buckets ( 存储桶 )之一;哈希分区是一种有效的策略,当不需要对表进行有序访问时。哈希分区对于在 tablet之间随机散布这些功能是有效的,这有助于减轻热点和 tablet 大小不均匀。

/*** 测试分区:* hash分区*/
@Test
public void testHashPartition() throws KuduException {//设置表的schemaLinkedList<ColumnSchema> columnSchemas = new LinkedList<ColumnSchema>();columnSchemas.add(newColumn("CompanyId", Type.INT32,true));columnSchemas.add(newColumn("WorkId", Type.INT32,false));columnSchemas.add(newColumn("Name", Type.STRING,false));columnSchemas.add(newColumn("Gender", Type.STRING,false));columnSchemas.add(newColumn("Photo", Type.STRING,false));//创建schemaSchema schema = new Schema(columnSchemas);//创建表时提供的所有选项CreateTableOptions tableOptions = new CreateTableOptions();//设置副本数tableOptions.setNumReplicas(1);//设置范围分区的规则LinkedList<String> parcols = new LinkedList<String>();parcols.add("CompanyId");//设置按照那个字段进行range分区tableOptions.addHashPartitions(parcols,6);try {kuduClient.createTable("dog",schema,tableOptions);} catch (KuduException e) {e.printStackTrace();}kuduClient.close();
}
2.10.2 Range Partitioning (范围分区)

范围分区可以根据存入数据的数据量,均衡的存储到各个机器上,防止机器出现负载不均衡现象.

/*** 测试分区:* RangePartition*/
@Test
public void testRangePartition() throws KuduException {//设置表的schemaLinkedList<ColumnSchema> columnSchemas = new LinkedList<ColumnSchema>();columnSchemas.add(newColumn("CompanyId", Type.INT32,true));columnSchemas.add(newColumn("WorkId", Type.INT32,false));columnSchemas.add(newColumn("Name", Type.STRING,false));columnSchemas.add(newColumn("Gender", Type.STRING,false));columnSchemas.add(newColumn("Photo", Type.STRING,false));//创建schemaSchema schema = new Schema(columnSchemas);//创建表时提供的所有选项CreateTableOptions tableOptions = new CreateTableOptions();//设置副本数tableOptions.setNumReplicas(1);//设置范围分区的规则LinkedList<String> parcols = new LinkedList<String>();parcols.add("CompanyId");//设置按照那个字段进行range分区tableOptions.setRangePartitionColumns(parcols);/*** range*  0 < value < 10* 10 <= value < 20* 20 <= value < 30* ........* 80 <= value < 90* */int count=0;for(int i =0;i<10;i++){//范围开始PartialRow lower = schema.newPartialRow();lower.addInt("CompanyId",count);//范围结束PartialRow upper = schema.newPartialRow();count +=10;upper.addInt("CompanyId",count);//设置每一个分区的范围tableOptions.addRangePartition(lower,upper);}try {kuduClient.createTable("student",schema,tableOptions);} catch (KuduException e) {e.printStackTrace();}kuduClient.close();
}
2.10.3 Multilevel Partitioning (多级分区)

Kudu 允许一个表在单个表上组合多级分区。

当正确使用时,多级分区可以保留各个分区类型的优点,同时减少每个分区的缺点 需求.

/*** 测试分区:* 多级分区* Multilevel Partition* 混合使用hash分区和range分区** 哈希分区有利于提高写入数据的吞吐量,而范围分区可以避免tablet无限增长问题,* hash分区和range分区结合,可以极大的提升kudu的性能*/
@Test
public void testMultilevelPartition() throws KuduException {//设置表的schemaLinkedList<ColumnSchema> columnSchemas = new LinkedList<ColumnSchema>();columnSchemas.add(newColumn("CompanyId", Type.INT32,true));columnSchemas.add(newColumn("WorkId", Type.INT32,false));columnSchemas.add(newColumn("Name", Type.STRING,false));columnSchemas.add(newColumn("Gender", Type.STRING,false));columnSchemas.add(newColumn("Photo", Type.STRING,false));//创建schemaSchema schema = new Schema(columnSchemas);//创建表时提供的所有选项CreateTableOptions tableOptions = new CreateTableOptions();//设置副本数tableOptions.setNumReplicas(1);//设置范围分区的规则LinkedList<String> parcols = new LinkedList<String>();parcols.add("CompanyId");//hash分区tableOptions.addHashPartitions(parcols,5);//range分区int count=0;for(int i=0;i<10;i++){PartialRow lower = schema.newPartialRow();lower.addInt("CompanyId",count);count+=10;PartialRow upper = schema.newPartialRow();upper.addInt("CompanyId",count);tableOptions.addRangePartition(lower,upper);}try {kuduClient.createTable("cat",schema,tableOptions);} catch (KuduException e) {e.printStackTrace();}kuduClient.close();
}

2.11 修改表

package cn.itcast.kudu;import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;
import org.junit.Before;
import org.junit.Test;import java.util.List;/*** 修改表操作*/
public class AlterTable {//定义kudu的客户端对象private static KuduClient kuduClient;//定义一张表名称private static String tableName = "person";/*** 初始化操作*/@Beforepublic void init() {//指定kudu的master地址String masterAddress = "node2.itcast.cn";//创建kudu的数据库连接kuduClient = new KuduClient.KuduClientBuilder(masterAddress).defaultSocketReadTimeoutMs(6000).build();}/*** 添加列*/@Testpublic void alterTableAddColumn() {AlterTableOptions alterTableOptions = new AlterTableOptions();alterTableOptions.addColumn(new ColumnSchema.ColumnSchemaBuilder("Address", Type.STRING).nullable(true).build());try {kuduClient.alterTable(tableName, alterTableOptions);} catch (KuduException e) {e.printStackTrace();}}/*** 删除列*/@Testpublic void alterTableDeleteColumn(){AlterTableOptions alterTableOptions = new AlterTableOptions().dropColumn("Address");try {kuduClient.alterTable(tableName, alterTableOptions);} catch (KuduException e) {e.printStackTrace();}}/*** 添加分区列*/@Testpublic void alterTableAddRangePartition(){int lowerValue = 110;int upperValue = 120;try {KuduTable kuduTable = kuduClient.openTable(tableName);List<Partition> rangePartitions = kuduTable.getRangePartitions(6000);boolean flag = true;for (Partition rangePartition : rangePartitions) {int startKey = rangePartition.getDecodedRangeKeyStart(kuduTable).getInt("Id");if(startKey == lowerValue){flag = false;}}if(flag) {PartialRow lower = kuduTable.getSchema().newPartialRow();lower.addInt("Id", lowerValue);PartialRow upper = kuduTable.getSchema().newPartialRow();upper.addInt("Id", upperValue);kuduClient.alterTable(tableName,new AlterTableOptions().addRangePartition(lower, upper));}else{System.out.println("分区已经存在,不能重复创建!");}} catch (KuduException e) {e.printStackTrace();} catch (Exception exception) {exception.printStackTrace();}}/*** 删除表* @throws KuduException*/@Testpublic void dropTable() throws KuduException {kuduClient.deleteTable(tableName);}
}

3. Spark操作Kudu

  • Spark与KUDU集成支持:

    • DDL操作(创建/删除)

    • 本地Kudu RDD

    • Native Kudu数据源,用于DataFrame集成

    • 从kudu读取数据

    • 从Kudu执行插入/更新/ upsert /删除

    • 谓词下推

    • Kudu和Spark SQL之间的模式映射

    • 到目前为止,我们已经听说过几个上下文,例如SparkContext,SQLContext,HiveContext,SparkSession,现在,我们将使用Kudu引入一个KuduContext。这是可以在Spark应用程序中广播的主要可序列化对象。此类代表在Spark执行程序中与Kudu Java客户端进行交互。

    • KuduContext提供执行DDL操作所需的方法,与本机Kudu RDD的接口,对数据执行更新/插入/删除,将数据类型从Kudu转换为Spark等。

3.1 创建表

  • 定义kudu的表需要分成5个步骤:

    • 提供表名

    • 提供schema

    • 提供主键

    • 定义重要选项;例如:定义分区的schema

    • 调用create Table api

  • 代码开发

package cn.itcastimport java.util
import cn.itcast.SparkKuduDemo.TABLE_NAME
import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}object SparkKuduTest {def main(args: Array[String]): Unit = {//构建sparkConf对象val sparkConf: SparkConf = new SparkConf().setAppName("SparkKuduTest").setMaster("local[2]")//构建SparkSession对象val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()//获取sparkContext对象val sc: SparkContext = sparkSession.sparkContextsc.setLogLevel("warn")//构建KuduContext对象val kuduContext = new KuduContext("node2.itcast.cn:7051", sc)//1.创建表操作createTable(kuduContext)/*** 创建表** @param kuduContext* @return*/def createTable(kuduContext: KuduContext) = {//如果表不存在就去创建if (!kuduContext.tableExists(TABLE_NAME)) {//构建创建表的表结构信息,就是定义表的字段和类型val schema: StructType = StructType(StructField("userId", StringType, false) ::StructField("name", StringType, false) ::StructField("age", IntegerType, false) ::StructField("sex", StringType, false) :: Nil)//指定表的主键字段val keys = List("userId")//指定创建表所需要的相关属性val options: CreateTableOptions = new CreateTableOptions//定义分区的字段val partitionList = new util.ArrayList[String]partitionList.add("userId")//添加分区方式为hash分区options.addHashPartitions(partitionList, 6)//创建表kuduContext.createTable(TABLE_NAME, schema, keys, options)}}}
}
}
定义表时要注意的是Kudu表选项值。你会注意到在指定组成范围分区列的列名列表时我们调用“asJava”方 法。这是因为在这里,我们调用了Kudu Java客户端本身,它需要Java对象(即java.util.List)而不是Scala的List对 象;(要使“asJava”方法可用,请记住导入JavaConverters库。) 创建表后,通过将浏览器指向http//master主机名:8051/tables
  • 来查看Kudu主UI可以找到创建的表,通过单击表ID,能够看到表模式和分区信息。
./media/image5.png
~   1550729068964
点击Table id 可以观察到表的schema等信息:
./media/image6.png

~ 1550729141209

3.2 DML操作

Kudu支持许多DML类型的操作,其中一些操作包含在Spark on Kudu集成. 包括:

  • INSERT -将DataFrame的行插入Kudu表。请注意,虽然API完全支持INSERT,但不鼓励在Spark中使用它。
    使用INSERT是有风险的,因为Spark任务可能需要重新执行,这意味着可能要求再次插入已插入的行。这样做会导致失败,因为如果行已经存在,INSERT将不允许插入行(导致失败)。相反,我们鼓励使用下面描述
    的INSERT_IGNORE。

  • INSERT-IGNORE - 将DataFrame的行插入Kudu表。如果表存在,则忽略插入动作。

  • DELETE - 从Kudu表中删除DataFrame中的行

  • UPSERT - 如果存在,则在Kudu表中更新DataFrame中的行,否则执行插入操作。

  • UPDATE - 更新dataframe中的行

3.2.1 插入数据insert操作

先创建一张表,然后把数据插入到表中

package cn.itcastimport java.utilimport cn.itcast.SparkKuduDemo.{TABLE_NAME, itcast}
import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}object SparkKuduTest {//定义样例类case class itcast(id:Int, name:String, age:Int, sex:Int)def main(args: Array[String]): Unit = {//构建sparkConf对象val sparkConf: SparkConf = new SparkConf().setAppName("SparkKuduTest").setMaster("local[2]")//构建SparkSession对象val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()//获取sparkContext对象val sc: SparkContext = sparkSession.sparkContextsc.setLogLevel("warn")//构建KuduContext对象val kuduContext = new KuduContext("node2.itcast.cn:7051", sc)//1.创建表操作createTable(kuduContext)/*** 创建表** @param kuduContext* @return*/def createTable(kuduContext: KuduContext) = {//如果表不存在就去创建if (!kuduContext.tableExists(TABLE_NAME)) {//构建创建表的表结构信息,就是定义表的字段和类型val schema: StructType = StructType(StructField("userId", StringType, false) ::StructField("name", StringType, false) ::StructField("age", IntegerType, false) ::StructField("sex", StringType, false) :: Nil)//指定表的主键字段val keys = List("userId")//指定创建表所需要的相关属性val options: CreateTableOptions = new CreateTableOptions//定义分区的字段val partitionList = new util.ArrayList[String]partitionList.add("userId")//添加分区方式为hash分区options.addHashPartitions(partitionList, 6)//创建表kuduContext.createTable(TABLE_NAME, schema, keys, options)}}/*** 2)加载数据* @param session* @param sc* @param kuduContext*/def inserData(session: SparkSession, sc: SparkContext, kuduContext: KuduContext): Unit = {//定义数据val data = List(itcast(1, "tom", 30, 1), itcast(2, "mark", 26, 0))val itcastRDD = sc.makeRDD(data)import session.implicits._val dataFrame: DataFrame = itcastRDD.toDFkuduContext.insertRows(dataFrame, TABLE_NAME)}}
}
3.2.2 删除数据delete操作
/*** 4)删除数据* @param session* @param kuduContext*/
def deleteData(session: SparkSession, kuduContext: KuduContext): Unit = {//定义数据val data = List(itcast(1, "tom", 50, 1), itcast(2, "mark", 30, 0))import session.implicits._val dataFrame: DataFrame = data.toDF().select("id")kuduContext.deleteRows(dataFrame, TABLE_NAME)
}
3.2.3 更新数据upsert操作
/*** 3)修改数据* @param session* @param kuduContext*/
def upDATEData(session: SparkSession, kuduContext: KuduContext): Unit = {//定义数据val data = List(itcast(1, "tom", 50, 1), itcast(2, "mark", 30, 0))import session.implicits._val dataFrame: DataFrame = data.toDF()kuduContext.upDATERows(dataFrame, TABLE_NAME)
}

3.3 dataFrame操作kudu

3.3.1 DataFrameApi读取kudu表中的数据

虽然我们可以通过上面显示的KuduContext执行大量操作,但我们还可以直接从默认数据源本身调用读/写API。要设置读取,我们需要为Kudu表指定选项,命名我们要读取的表以及为表提供服务的Kudu集群的Kudu主服务器列表。

  • 代码示例
/*** 使用DataFrameApi读取kudu表中的数据* @param sparkSession* @param kuduMaster* @param tableName*/
def getTableData(sparkSession: SparkSession, kuduMaster: String, tableName: String): Unit = {//定义map集合,封装kudu的master地址和要读取的表名val options = Map("kudu.master" -> kuduMaster,"kudu.table" -> tableName)sparkSession.read.options(options).kudu.show()
}
3.3.2 DataFrameApi写数据到kudu表中

在通过DataFrame API编写时,目前只支持一种模式“append”。尚未实现的“覆盖”模式。

  • 代码示例
/*** 6)DataFrameApi写数据到kudu表中*/
def dataFrame2Kudu(session: SparkSession, kuduContext: KuduContext): Unit ={val data = List(itcast(3, "canglaoshi", 14, 0), itcast(4, "xiaowang", 18, 1))import  session.implicits._val dataFrame = data.toDF//目前,在kudu中,数据的写入只支持append追加dataFrame.write.mode("append").options(kuduOptions).kudu//查看结果//导包import org.apache.kudu.spark.kudu._//加载表的数据,导包调用kudu方法,转换为dataFrame,最后在使用show方法显示结果sparkSession.read.options(kuduOptions).kudu.show()
}
3.3.3 使用sparksql操作kudu表

可以选择使用SparkSQL直接使用INSERT语句写入Kudu表;与’append’类似,INSERT语句实际上将默认使用
UPSERT语义处理;

  • 代码示例
/*** 使用sparksql操作kudu表* @param sparkSession* @param sc* @param kuduMaster* @param tableName*/
def SparkSql2Kudu(sparkSession: SparkSession, sc: SparkContext, kuduMaster: String, tableName: String): Unit = {//定义map集合,封装kudu的master地址和表名val options = Map("kudu.master" -> kuduMaster,"kudu.table" -> tableName)val data = List(itcast(10, "小张", 30, 0), itcast(11, "小王", 40, 0))import sparkSession.implicits._val dataFrame: DataFrame = sc.parallelize(data).toDF//把dataFrame注册成一张表dataFrame.createTempView("temp1")//获取kudu表中的数据,然后注册成一张表sparkSession.read.options(options).kudu.createTempView("temp2")//使用sparkSQL的insert操作插入数据sparkSession.sql("insert into table temp2 select * from temp1")sparkSession.sql("select * from temp2 where age >30").show()
}

3.4 Kudu Native RDD

Spark与Kudu的集成同时提供了kudu RDD.

  • 代码示例
val columnsList = List("id", "name", "age", "sex")
val rowRDD: RDD[Row] = kuduContext.kuduRDD(sc, TABLE_NAME, columnsList)
rowRDD.foreach(println(_))
sc.stop()
//session.read.options(kuduOptions).kudu.show()

3.5 修改表

/*** 添加列* @param kuduContext*/
def addColumn(kuduContext: KuduContext): Unit ={val alterTableOptions: AlterTableOptions = new AlterTableOptionsalterTableOptions.addColumn(new ColumnSchema.ColumnSchemaBuilder("Address", Type.STRING).nullable(true).build)try {kuduContext.syncClient.alterTable(tableName, alterTableOptions)} catch {case ex:Exception => ex.printStackTrace()}
}

Kudu入门和API基础操作相关推荐

  1. 计算机基础入门操,计算机基础操作入门

    <计算机基础操作入门>由会员分享,可在线阅读,更多相关<计算机基础操作入门(40页珍藏版)>请在人人文库网上搜索. 1.1 计计 算算 机机 操操 作作 入入 门门 课堂主要内 ...

  2. Shader Forge 入门学习(一) 基础操作

    引言:失踪人口回归,最近几个月刚刚毕业,进入社会,对着未来有着些许迷茫,但起风了,唯有努力生存!近日学习Shader Forge,记录下来,共同进步!内容主要包括ShaderForge的操作设置,并配 ...

  3. 王者荣耀五周年,带你入门Python爬虫基础操作(102个英雄+326款皮肤)

    简单的目录 1.概述 2.网页分析 2.1.html页面源数据 2.2.json源数据 3.数据请求 4.数据解析 4.1.html数据解析 4.1.1.bs4 4.1.2.xpath 4.2.jso ...

  4. 王者荣耀五周年,带你入门Python爬虫基础操作!

    1.概述 <王者荣耀>上线至今5个年头了,作为这些年国内最热门的手游(没有之一),除了带来游戏娱乐之外,我们在这五周年之际,试着从他们的官网找点乐趣,学习一下Python爬虫的一些简单基础 ...

  5. 百度地图API基础操作--百度鹰眼篇

    久等了,鹰眼教程差点忘了写,嘿嘿,现在补上,其实鹰眼在深度运用时还是挺难搞的,会遇到很多坑,这次由于时间有限,忙里偷闲只出了一个基本功能,包含轨迹上传.历史轨迹获取及绘制,深度运用的话,如果大家有需要 ...

  6. Bamboo入门教程及基础操作

    买Bamboo上CSDN,特殊折扣购买通道:http://bss.csdn.net/module/btc/atlassian/prduct_detail?project=445&module= ...

  7. Linux入门学习——ssh基础操作

    接下来的学习中我们就会需要用到两个虚拟机互相的互动,如果想让两个不同的主机能相互连接互动的话,首先就需要调整两个主机的ip地址,那什么是ip呢,ip也叫网络之间互联的协议.也就是为计算机网络相互连接进 ...

  8. java 百度鹰眼sdk,百度地图API基础操作--百度鹰眼

    [实例简介] 博文地址:http://blog.csdn.net/qq_23931287/article/details/77684033 [实例截图] [核心代码] 6bd90028-b72d-48 ...

  9. JMX 入门(一)基础操作

    JMX 官方教程:http://docs.oracle.com/javase/tutorial/jmx/index.html 这篇博客参考官方教程以及个人的理解,通过实际的代码和操作来学会使用 JMX ...

最新文章

  1. 编译器错误信息: CS0016
  2. Oracle INTERVAL DAY TO SECOND数据类型
  3. VC的若干实用小技巧
  4. sql 2005判断某个表或某个表中的列是否存在
  5. 稀疏自编码器_基于tensorflow实现稀疏自编码和在推荐中的应用
  6. 【python】抄写大神的百度贴吧代码
  7. bootstrap 模态框无法使用_模态窗 Modal Window - 产品中的??注意力设计
  8. 28 CO配置-控制-产品成本控制-成本对象控制-期末结算-定义行标识
  9. 报告:中国人对AI的乐观程度远超其他国家
  10. 南昌大学计算机接收调剂的条件,关于2018年河南昌大学学硕士研究生接收调剂程序及要求的须知详情...
  11. ubuntu安装最新版apktool(最新版)反编译工具
  12. sun键盘没有stop键_【转帖】SUN基础知识
  13. Python利用结巴分词进行中文分词
  14. 基于 Flink 的 PB 级数据即席查询实践
  15. 职场7条小tips,一定有一条说到你心坎里
  16. 【Python量化】蒙特卡洛模拟法进行期权定价
  17. 关于解决无线拨号(PPPOE)上网的若干问题(WISP)
  18. 青岛科技大学和青岛大学计算机专业,青岛科技大学和青岛大学哪个好呢?优势的专业分别是什么?...
  19. uniapp开发微信公众号(h5)项目如何引入微信jssdk,分享,扫一扫
  20. WINDOWS更改无线网卡MAC地址的方法

热门文章

  1. 开发平台(ISV接入)
  2. 2019年10月8日股市走势预测——02
  3. SSM 企业权限管理系统 项目实战
  4. 为什么很多人都不喜欢做程序员?
  5. 坐拥上亿流量,星空华文能否在港股舞台上脱颖而出?
  6. linux进阶14——僵尸进程和孤儿进程
  7. 其他干货——大气人计算机必备技能——不同编程语言比较(Matlab, python, fortran, NCL, IDL)
  8. JAVA类型与JDBC类型对应表
  9. 基于Flink+Alink构建全端亿级实时用户画像系统
  10. 用Swift实现淘宝和大众点评的下拉刷新