I. Test Environment

Item Value
CPU Intel® Core™ i5-1035G1 CPU @ 1.00GHz
OS CentOS Linux release 7.9.2009 (Core)
Memory 4 GB
Logical cores 3
GBase 8a node IP 192.168.142.11
ZooKeeper node IP 192.168.142.10
Kafka node IP 192.168.142.10
GBase 8a database version 8.6.2.43-R33.132743
ZooKeeper version 3.4.9
Kafka version 2.11-0.10.2.1
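
The figures in the table can be reproduced on the test machine with standard Linux commands; a quick sketch (output omitted):

[root@czg0 ~]# lscpu | grep 'Model name'   # CPU model
[root@czg0 ~]# cat /etc/redhat-release     # OS release
[root@czg0 ~]# free -h | grep Mem          # total memory
[root@czg0 ~]# nproc                       # number of logical cores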

II. Installing ZooKeeper

1. Extract the archive

[root@czg0 install]# tar -xvf zookeeper-3.4.9.tar.gz

2. zoo.cfg

[root@czg0 zookeeper-3.4.9]# mkdir log
[root@czg0 zookeeper-3.4.9]# mkdir data
[root@czg0 zookeeper-3.4.9]# cd conf/
[root@czg0 conf]# ll
total 12
-rw-rw-r-- 1 gbase gbase  535 Aug 23 2016 configuration.xsl
-rw-rw-r-- 1 gbase gbase 2161 Aug 23 2016 log4j.properties
-rw-rw-r-- 1 gbase gbase  922 Aug 23 2016 zoo_sample.cfg
[root@czg0 conf]# cp zoo_sample.cfg zoo.cfg
[root@czg0 conf]# cat zoo.cfg
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/opt/install/zookeeper-3.4.9/data
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
dataLogDir=/opt/install/zookeeper-3.4.9/log
server.0=czg0:2888:3888
Parameter Description
dataDir Directory where ZooKeeper stores its data.
clientPort Port on which the ZooKeeper service listens.
server.N Host/IP of each node in the ZooKeeper cluster deployment.
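
The configuration above runs a single standalone node (server.0). For reference, a multi-node ensemble would simply list one server.N line per node in every node's zoo.cfg, with N matching that node's myid; a hypothetical three-node sketch (host names czg1 and czg2 are invented for illustration):

server.0=czg0:2888:3888
server.1=czg1:2888:3888
server.2=czg2:2888:3888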

3. myid

[root@czg0 conf]# cat /opt/install/zookeeper-3.4.9/data/myid
0
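
The myid file is not shipped with the archive; it has to be created by hand, and its value must match the N of this node's server.N entry in zoo.cfg (server.0 above, hence 0). For example:

[root@czg0 conf]# echo 0 > /opt/install/zookeeper-3.4.9/data/myid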

4. Start the service

[root@czg0 bin]# ./zkServer.sh start
[root@czg0 bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/install/zookeeper-3.4.9/bin/../conf/zoo.cfg
Mode: standalone
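
As an optional extra check, the bundled CLI client can connect to the running service and list the root znode (assuming the standalone server is reachable on czg0:2181):

[root@czg0 bin]# ./zkCli.sh -server czg0:2181 ls /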

III. Installing Kafka

1. Extract the archive

[root@czg0 install]# tar -xvf kafka_2.11-0.10.2.1.tgz

2. Edit the configuration files

(1) server.properties

[root@czg0 config]# cat server.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

# Switch to enable topic deletion or not, default value is false
#delete.topic.enable=true

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://czg0:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads handling network requests
num.network.threads=3

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma seperated list of directories under which to store log files
log.dirs=/opt/install/kafka_2.11-0.10.2.1/log

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=czg0:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
Parameter Description
broker.id Id of this broker node within the cluster. In a multi-node deployment the ids do not need to be consecutive, but each one must be unique.
log.dirs Directory in which the broker stores its data. Several directories can be given, separated by commas, with no spaces around them. For multi-node deployments, preferably use a single data directory.
zookeeper.connect ZooKeeper connection string used by the broker. Multiple ZooKeeper servers are separated by commas with no spaces in between, otherwise the connection fails. The port must also match the ZooKeeper clientPort configured in the previous chapter.

(2) zookeeper.properties

[root@czg0 config]# cat zookeeper.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
dataDir=/opt/install/zookeeper-3.4.9/data
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
initLimit=5
syncLimit=2
server.1=czg0:2888:3888
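
This file is only consumed by Kafka's bundled ZooKeeper start script; since a standalone ZooKeeper was already installed and started in chapter II, it is not used here. If you wanted to rely on the bundled ZooKeeper instead, it would be started roughly like this:

[root@czg0 bin]# ./zookeeper-server-start.sh -daemon ../config/zookeeper.properties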

3. Start Kafka

[root@czg0 bin]# ./kafka-server-start.sh -daemon ../config/server.properties

Check that the process is running

[root@czg0 kafka_2.11-0.10.2.1]# ps -ef|grep kafka

Check the log for errors

[root@czg0 kafka_2.11-0.10.2.1]# cat logs/server.log
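
Two quicker checks (both optional): verify that the broker is listening on its PLAINTEXT port, and that broker.id=0 has registered itself in ZooKeeper. The second command is run from the ZooKeeper bin directory:

[root@czg0 kafka_2.11-0.10.2.1]# ss -ntlp | grep 9092
[root@czg0 bin]# ./zkCli.sh -server czg0:2181 ls /brokers/ids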

4. Create a topic

[root@czg0 bin]# ./kafka-topics.sh --create --zookeeper czg0:2181 -partitions 1 --replication-factor 1 --topic sun
Created topic "sun".

5. List topics

[root@czg0 bin]# ./kafka-topics.sh --list --zookeeper czg0:2181
sun
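
To see the partition count, replication factor and leader assignment of a topic, kafka-topics.sh also offers --describe:

[root@czg0 bin]# ./kafka-topics.sh --describe --zookeeper czg0:2181 --topic sun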

6. Test publishing and subscribing

(1) Produce to the topic

Session 1

[root@czg0 bin]# ./kafka-console-producer.sh --broker-list czg0:9092 --topic sun
Hello !!!
Sun

(2) Consume from the topic

Session 2

[root@czg0 bin]# ./kafka-console-consumer.sh --bootstrap-server czg0:9092 --topic sun
Hello !!!
Sun

To read the topic from the very first message:

[root@czg0 bin]# ./kafka-console-consumer.sh --bootstrap-server czg0:9092 --topic sun --from-beginning
czg
zxj
haha
Hello!!!
Sun
Czg
Zxj
Hello !!!
Sun

IV. Consuming Kafka Topic Data with LOAD

1. LOAD syntax

For the full syntax tree, see the earlier article 南大通用数据库-Gbase-8a-学习-14-LOAD加载数据.

kafka://broker/topic[?[duration=XX][&][partition=partitionid|offset]][#frombeginning]
Parameter Description
broker Kafka IP and port, e.g. 10.0.2.201:9092.
topic Name of the Kafka source topic. Avoid hyphens and other special characters; use only ordinary letters, digits, and underscores.
duration Commit interval for fetched data. When this interval elapses, the batch fetched so far is committed and saved to the database. For ordinary loads, anything from 30 to 300 seconds is suitable; intervals that are too short increase the disk load on the database.
partition Kafka partition to read: partitionid is the partition number and offset is the starting offset. The earliest data starts at offset 0, but older records may already have been purged by retention.
#frombeginning Read the whole topic from the beginning. Note that the beginning is not necessarily offset 0, because of retention. Also note the difference from partition: partition makes a single partition start from a given position, while #frombeginning makes the entire topic start from the beginning. It must not be specified twice, as that would be semantically contradictory.
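
Putting these parameters together, the three URL forms used later in this article look roughly as follows (broker czg0:9092 and topic moon as in the examples below; the offset 100 is invented for illustration, and the duration value appears to be in milliseconds, judging from the elapsed times of the loads):

kafka://czg0:9092/moon?duration=30000#frombeginning      -- whole topic from the beginning, committing every 30 s
kafka://czg0:9092/moon?duration=30000&partition=0|100    -- partition 0 only, starting at offset 100
kafka://czg0:9092/moon?duration=30000                    -- incremental load, continuing from the last committed offsets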

2. Create a test table

gbase> create table sunczg (a int, b varchar(10));
Query OK, 0 rows affected (Elapsed: 00:00:01.85)
gbase> desc sunczg;
+-------+-------------+------+-----+---------+-------+
| Field | Type        | Null | Key | Default | Extra |
+-------+-------------+------+-----+---------+-------+
| a     | int(11)     | YES  |     | NULL    |       |
| b     | varchar(10) | YES  |     | NULL    |       |
+-------+-------------+------+-----+---------+-------+
2 rows in set (Elapsed: 00:00:00.00)

3. Publish to a test topic

[root@czg0 bin]# ./kafka-topics.sh --create --zookeeper czg0:2181 -partitions 1 --replication-factor 1 --topic moon
Created topic "moon".
[root@czg0 bin]# ./kafka-console-producer.sh --broker-list czg0:9092 --topic moon
1,czg
2,zxj
3,moon
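
The console producer simply reads lines from stdin, so an existing CSV file could also be pushed into the topic in one go (the file name below is hypothetical):

[root@czg0 bin]# ./kafka-console-producer.sh --broker-list czg0:9092 --topic moon < /tmp/sunczg.csv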

4. Load the data

(1) Load from the beginning

gbase> load data infile 'kafka://czg0:9092/moon?duration=1000#frombeginning' into table czg.sunczg DATA_FORMAT 3 FIELDS TERMINATED BY ',' MAX_BAD_RECORDS 0;
Query OK, 3 rows affected (Elapsed: 00:00:02.81)
Task 2621747 finished, Loaded 3 records, Skipped 0 records
gbase> select * from sunczg;
+------+------+
| a    | b    |
+------+------+
|    1 | czg  |
|    2 | zxj  |
|    3 | moon |
+------+------+
3 rows in set (Elapsed: 00:00:00.00)

(2) Load by partition

I published one more record: 4,sun.
Here I load partition 0, which contains data.

gbase> load data infile 'kafka://czg0:9092/moon?duration=10000&partition=0|0' into table czg.sunczg DATA_FORMAT 3 FIELDS TERMINATED BY ',' MAX_BAD_RECORDS 0;
Query OK, 4 rows affected (Elapsed: 00:00:10.91)
Task 2621748 finished, Loaded 4 records, Skipped 0 records
gbase> select * from sunczg;
+------+------+
| a    | b    |
+------+------+
|    1 | czg  |
|    2 | zxj  |
|    3 | moon |
|    1 | czg  |
|    2 | zxj  |
|    3 | moon |
|    4 | sun  |
+------+------+
7 rows in set (Elapsed: 00:00:00.12)

Loading partition 1, which contains no data:

gbase> load data infile 'kafka://czg0:9092/moon?duration=10000&partition=1|0' into table czg.sunczg DATA_FORMAT 3 FIELDS TERMINATED BY ',' MAX_BAD_RECORDS 0;
Query OK, 0 rows affected (Elapsed: 00:00:10.70)
Task 2621750 finished, Loaded 0 records, Skipped 0 records

(3) Partition offset test

gbase> load data infile 'kafka://czg0:9092/moon?duration=10000&partition=0|1' into table czg.sunczg DATA_FORMAT 3 FIELDS TERMINATED BY ',' MAX_BAD_RECORDS 0;
Query OK, 3 rows affected (Elapsed: 00:00:11.53)
Task 2621749 finished, Loaded 3 records, Skipped 0 records
gbase>
gbase> select * from sunczg;
+------+------+
| a    | b    |
+------+------+
|    1 | czg  |
|    2 | zxj  |
|    3 | moon |
|    1 | czg  |
|    2 | zxj  |
|    3 | moon |
|    4 | sun  |
|    2 | zxj  |
|    3 | moon |
|    4 | sun  |
+------+------+
10 rows in set (Elapsed: 00:00:00.27)

With an offset of 1, the first record is skipped and loading starts from the second record, i.e. from 2,zxj onward.

(4) Incremental load

Publish a new record: 8,lulu.
Note: if no offset is specified, data that has already been consumed is not consumed again.

[root@czg0 bin]# ./kafka-console-producer.sh --broker-list czg0:9092 --topic moon
1,czg
2,zxj
3,moon
4,sun
5,haha
5,haha
6,test
7,happy
8,lulu
gbase> select * from sunczg;
+------+-------+
| a    | b     |
+------+-------+
|    1 | czg   |
|    2 | zxj   |
|    3 | moon  |
|    1 | czg   |
|    2 | zxj   |
|    3 | moon  |
|    4 | sun   |
|    2 | zxj   |
|    3 | moon  |
|    4 | sun   |
|    1 | czg   |
|    2 | zxj   |
|    3 | moon  |
|    4 | sun   |
|    5 | haha  |
|    5 | haha  |
|    6 | test  |
|    7 | happy |
+------+-------+
18 rows in set (Elapsed: 00:00:00.11)
gbase> load data infile 'kafka://czg0:9092/moon?duration=10000' into table czg.sunczg DATA_FORMAT 3 FIELDS TERMINATED BY ',' MAX_BAD_RECORDS 0;
Query OK, 1 row affected (Elapsed: 00:00:11.14)
Task 2621765 finished, Loaded 1 records, Skipped 0 records
gbase> select * from sunczg;
+------+-------+
| a    | b     |
+------+-------+
|    1 | czg   |
|    2 | zxj   |
|    3 | moon  |
|    1 | czg   |
|    2 | zxj   |
|    3 | moon  |
|    4 | sun   |
|    2 | zxj   |
|    3 | moon  |
|    4 | sun   |
|    1 | czg   |
|    2 | zxj   |
|    3 | moon  |
|    4 | sun   |
|    5 | haha  |
|    5 | haha  |
|    6 | test  |
|    7 | happy |
|    8 | lulu  |
+------+-------+
19 rows in set (Elapsed: 00:00:00.00)

5. Check the load metadata

Look at the table in the gclusterdb database named <topic>_<database>_<table> (here: moon_czg_sunczg).

gbase> select * from gclusterdb.moon_czg_sunczg;
+---------+------------------+---------------------+
| scn     | partition_offset | commit_time         |
+---------+------------------+---------------------+
| 2621747 | 0:3              | 2022-08-26 15:04:03 |
| 2621748 | 0:4              | 2022-08-26 15:06:28 |
| 2621749 | 0:4              | 2022-08-26 15:07:03 |
| 2621750 | 1:0              | 2022-08-26 15:07:39 |
| 2621760 | 0:6              | 2022-08-26 15:25:09 |
| 2621762 | 0:7              | 2022-08-26 15:27:05 |
| 2621764 | 0:8              | 2022-08-26 15:28:02 |
| 2621765 | 0:9              | 2022-08-26 15:29:10 |
+---------+------------------+---------------------+
8 rows in set (Elapsed: 00:00:00.00)

Note:
If one table subscribes to identically named topics from more than one Kafka source, the metadata table above gets polluted; it is recommended to keep topic names unique.
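
Before an incremental load it can be useful to check where the next LOAD will continue from; the row with the highest scn records the most recently committed partition offsets. A minimal query sketch, assuming the naming rule above:

gbase> select * from gclusterdb.moon_czg_sunczg order by scn desc limit 1;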

6. Consequences of dropping the metadata table (not recommended)

(1) Drop the table

gbase> drop table gclusterdb.moon_czg_sunczg;
Query OK, 0 rows affected (Elapsed: 00:00:00.37)

(2) Load

All previously published data is loaded again.

gbase> load data infile 'kafka://czg0:9092/moon?duration=10000' into table czg.sunczg DATA_FORMAT 3 FIELDS TERMINATED BY ',' MAX_BAD_RECORDS 0;
Query OK, 9 rows affected (Elapsed: 00:00:12.86)
Task 2621768 finished, Loaded 9 records, Skipped 0 records

(3) Check the metadata table

The metadata table is recreated automatically, following the naming rule described earlier.

gbase> select * from gclusterdb.moon_czg_sunczg;
+---------+------------------+---------------------+
| scn     | partition_offset | commit_time         |
+---------+------------------+---------------------+
| 2621768 | 0:9              | 2022-08-26 15:40:44 |
+---------+------------------+---------------------+
1 row in set (Elapsed: 00:00:00.09)

V. Subscribing to a Kafka Topic on a Schedule

1. Create a scheduled Kafka consumer task

(1) Task syntax

create kafka consumer CONSUMER_NAME loader topic TOPIC_NAME brokers  'BROKER[,BROKERn..]' duration DURATION_TIME_MS into table [DBNAME.]TABLENAME;
Parameter Description
CONSUMER_NAME User-defined consumer name.
TOPIC_NAME Kafka topic.
BROKER Kafka broker IP (and port); separate multiple brokers with commas.
DURATION_TIME_MS Interval between Kafka consume cycles, in milliseconds; 30-300 seconds or more is recommended.
DBNAME Database name.
TABLENAME Table name.

(2) Create the task

gbase> create kafka consumer kc_gbase8a loader topic moon brokers  'czg0:9092' duration 3000 into table czg.sunczg;
Query OK, 0 rows affected (Elapsed: 00:00:03.33)
create consumer done.

(3) Start the task (fails with an error)

If I find a solution later, I will share it in a follow-up.

gbase> start kafka consumer kc_gbase8a;
ERROR 1707 (HY000): gcluster command error: leader thread not started.
