Pulsar-io-iotdb Sink 开发步骤
Pulsar IO 模块依赖 Pulsar function,部署及架构参考Apache Pulsar 官方链接:
Deploy Pulsar Functions
编译Pulsar-io-iotdb Sink插件
下载pulsar源码。
git clone https://github.com/apache/pulsar.git
将iotdb.tar sink源码解压至pulsar/pulsar-io目录下,在pulsar/pulsar-io/pom.xml文件添加iotdb项,如图所示:
在pulsar目录下 编译iotdb sink。
mvn clean package -pl pulsar-io/iotdb -am -Dmaven.test.skip=true
编译成功后,在pulsar/pulsar-io/iotdb目录下生成target目录,target目录下会有pulsar-io-iotdb-3.0.0.nar插件包。
创建IoTDB容器
# 创建 docker bridge 网络
docker network create --driver=bridge --subnet=172.18.0.0/16 --gateway=172.18.0.1 iotdb
dockerfile
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
version: "3"
services:iodb-confignode:image: apache/iotdb:1.1.0-confignodehostname: iotdb-confignodecontainer_name: iotdb-confignodeenvironment:- cn_internal_address=iotdb-confignode- cn_internal_port=10710- cn_consensus_port=10720- cn_target_config_node_list=iotdb-confignode:10710volumes:- ./data/confignode:/iotdb/data- ./logs/confignode:/iotdb/logsnetworks:iotdb:ipv4_address: 172.18.0.3iotdb-datanode1:image: apache/iotdb:1.1.0-datanodehostname: iotdb-datanode-1container_name: iotdb-datanode-1ports:- "6667:6667"environment:- dn_rpc_address=iotdb-datanode-1- dn_internal_address=iotdb-datanode-1- dn_target_config_node_list=iotdb-confignode:10710- dn_rpc_port=6667- dn_mpp_data_exchange_port=10740- dn_schema_region_consensus_port=10750- dn_data_region_consensus_port=10760volumes:- ./data/datanode1:/iotdb/data/- ./logs/datanode1:/iotdb/logs/networks:iotdb:ipv4_address: 172.18.0.6iotdb-datanode2:image: apache/iotdb:1.1.0-datanodehostname: iotdb-datanode-2container_name: iotdb-datanode-2environment:- dn_rpc_address=iotdb-datanode-2- dn_internal_address=iotdb-datanode-2- dn_target_config_node_list=iotdb-confignode:10710- dn_rpc_port=6667- dn_mpp_data_exchange_port=10740- dn_schema_region_consensus_port=10750- dn_data_region_consensus_port=10760volumes:- ./data/datanode2:/iotdb/data/- ./logs/datanode2:/iotdb/logs/networks:iotdb:ipv4_address: 172.18.0.4networks:iotdb:external: true
运行
docker-compose up –d
,成功会有拉起三个容器:
运行
docker exec -ti iotdb-datanode-1 /iotdb/sbin/start-cli.sh -h iotdb-datanode-1
进入iotdb容器并连接iotdb数据库
创建Pulsar proxy容器
运行
docker run -d --name proxy -p 6650:6650 -p 8080:8080 --network=iotdb apachepulsar/pulsar-all:latest bin/pulsar standalone
IoTDB sink 测试
- cd pulsar目录,使用
docker cp pulsar-io/iotdb/target/pulsar-io-iotdb-3.0.0.nar proxy:/pulsar/connectors 将生成的pulsar-io-iotdb-3.0.0.nar
包拷贝至pulsar proxy容器内。
- 运行
docker exec -it proxy /bin/sh
进入pulsar proxy 容器内部
- 创建主题:
bin/pulsar-admin topics create persistent://public/default/my-topic
- 运行sink:
bin/pulsar-admin sinks localrun \--archive connectors/pulsar-io-iotdb-3.0.0.nar \--tenant public \--namespace default \--name iotdb-sink \--sink-config '{"host": "172.18.0.6","port": 6667,"user": "root","password": "root","batchSize": 1,"storageGroup": "root.iotdb1.device1"}' \--inputs my-topic
- 另起一个客户端运行
docker exec -it proxy /bin/sh
进入pulsar proxy容器内部
- 分别发送两次消息:
bin/pulsar-client produce my-topic -s "\n" -m {"temperature":18,"timestamp":1686021685001}
bin/pulsar-client produce my-topic -s "\n" -m {"temperature":23,"timestamp":1686021685005}
- 另起客户端,运行
docker exec -ti iotdb-datanode-1 /iotdb/sbin/start-cli.sh -h iotdb-datanode-1
进入iotdb容器并连接iotdb数据库
- 运行
select * from root.iotdb1.device1
可查阅iotdb数据已存在,测试OK.
Pulsar sink 常见命令
Create
创建 sink
Go |
常用参数
- -a,--archive : 指定 sink 的 NAR 包
- --classname : 指定 sink 的类名称
- -i,--inputs : 指定 sink 的 topic,多个 topic 用逗号隔开
- --name : 指定 sink 的名称
- --namespace : 指定 sink 的命名空间
- --parallelism : 指定 sink 的并发数
- --sink-config-file : 指定 sink 的 yaml 配置文件
- --tenant : 指定 sink 的租户
Update
更新 sink
Go |
常用参数
- -a,--archive : 指定 sink 的 NAR 包
- --classname : 指定 sink 的类名称
- -i,--inputs : 指定 sink 的 topic,多个 topic 用逗号隔开
- --name : 指定 sink 的名称
- --namespace : 指定 sink 的命名空间
- --parallelism : 指定 sink 的并发数
- --sink-config-file : 指定 sink 的 yaml 配置文件
- --tenant : 指定 sink 的租户
Delete
删除 sink
Go |
常用参数
- --name : 指定 sink 的名称
- --namespace : 指定 sink 的命名空间
- --tenant : 指定 sink 的租户
List
显示所有 sink
Go |
常用参数
- --namespace : 指定 sink 的命名空间
- --tenant : 指定 sink 的租户
Get
显示 sink 的信息
Go |
常用参数
- --name : 指定 sink 的名称
- --namespace : 指定 sink 的命名空间
- --tenant : 指定 sink 的租户
Status
显示 sink 的状态
Go |
常用参数
- --instance-id : 指定 sink 的实例 ID
- 如果未指定,则获取所有实例的状态
- --name : 指定 sink 的名称
- --namespace : 指定 sink 的命名空间
- --tenant : 指定 sink 的租户
Stop
停止 sink
Go |
常用参数
- --instance-id : 指定 sink 的实例 ID
- 如果未指定,则停止所有实例的状态
- --name : 指定 sink 的名称
- --namespace : 指定 sink 的命名空间
- --tenant : 指定 sink 的租户
Start
启动 sink
Go |
常用参数
- --instance-id : 指定 sink 的实例 ID
- 如果未指定,则启动所有实例
- --name : 指定 sink 的名称
- --namespace : 指定 sink 的命名空间
- --tenant : 指定 sink 的租户
Restart
重启 sink
Go |
常用参数
- --instance-id : 指定 sink 的实例 ID
- 如果未指定,则获取所有实例的状态
- --name : 指定 sink 的名称
- --namespace : 指定 sink 的命名空间
- --tenant : 指定 sink 的租户
Localrun
本地运行
在本地运行一个 Pulsar IO sink connector,方便调试。
Go |
常用参数
- -a,--archive : 指定 source 的 NAR 包
- --classname : 指定 sink 的类名称
- -i,--inputs : 指定 sink 的 topic,多个 topic 用逗号隔开
- --name : 指定 sink 的名称
- --namespace : 指定 sink 的命名空间
- --parallelism : 指定 sink 的并发数
- --sink-config-file : 指定 sink 的 yaml 配置文件
- --tenant : 指定 sink 的租户
Pulsar-io-iotdb Sink 开发步骤相关推荐
- Pulsar IO 简介
翻译:StreamNative--Sijia Apache Pulsar 是业界领先的消息系统.使用消息系统时,一个较为常见的问题就是:将数据移入或移出消息平台的最佳方法是什么?当然,用户可以使用 P ...
- hadoop日志数据分析开发步骤及代码
日志数据分析: 1.背景 1.1 hm论坛日志,数据分为两部分组成,原来是一个大文件,是56GB:以后每天生成一个文件,大约是150-200MB之间: 1.2 日志格式是apache common日志 ...
- EOS开发步骤(1) 开发说明
1. 开发步骤 创建钱包 创建帐户 部署token合约,以便区块链准备好创建新的token. 创建新token. 将新token分配给创世帐户(eosio). 在用户之间转移token.(创建交易.创 ...
- Dropwizard入门及开发步骤
Dropwizard介绍 Dropwizard结构的服务组成 开发步骤 Dropwizard介绍 Dropwizard是一个微服务框架, 是各项技术的一个集成封装.它包含了以下组件: 嵌入式Jetty ...
- 数据库MySQL基础---JDBC开发步骤--JDBC封装工具类--PreparedStatement实现CRUD操作
JDBC简介 1.JDBC定义Java数据库连接(Java Database Connectivity,简称JDBC):是Java语言中用来规范客户端程序如何来访问数据库的应用程序接口,提供了诸如查询 ...
- EJB3.0高速入门项目开发步骤
EJB3.0开发步骤 1. 开发环境 IDE开发工具:Eclipse Java EE IDE for Web Developers EJB容器:jboss-4.2.3.GA 后台数据库:MysQL ...
- 谷歌了java集成开发_Spring整合Kaptcha谷歌验证码工具的开发步骤
开发步骤: 1.加入依赖 com.google.code.kaptcha kaptcha 2.3 国内镜像无法下载该依赖,需要手动通过jar包在本地仓库安装一个依赖. 安装命令: mvn instal ...
- 05:JDBC的开发步骤,及其抽取的JDBCUtils工具类
1.1.JDBC是什么? JDBC是一种用于执行SQL语句的Java API.(Java Data Base Connectivity,java数据库连接),是Java访问数据库的标准规范,可以为不同 ...
- dropwizard 连接mysql_Dropwizard入门及开发步骤
Dropwizard介绍 Dropwizard是一个微服务框架, 是各项技术的一个集成封装.它包含了以下组件: - 嵌入式Jetty,一个应用程序被打包成一个Jar文件,并开始自已嵌入的Jetty容器 ...
最新文章
- Opencv中Homography
- excel中日期转成java_用Java程序将日期转换为序列号,就像在Excel中一样
- linux xampp常见问题
- iphone 使用委托(delegate)在不同的窗口之间传递数据
- t-mobile频段_T-Mobile再次被黑客入侵:超过200万个帐号和地址可能泄漏
- Java 8 Friday:不再需要ORM
- python登录网页账号密码_遇到需要登录的网站怎么办?学好python,用这3招轻松搞定...
- L2-007 家庭房产(并查集)
- ***基于协同过滤,NMF和Baseline的推荐算法
- 计算机涉及数学知识点,2019计算机考研数学知识点解读:一元函数积分学
- SDAU信息学院LaTeX模板使用指南
- 维和医疗分队患者信息管理系统的开发与研究
- HTML+CSS+JavaScript实现放大镜效果
- 陕西师范大学第七届程序设计竞赛网络同步赛 D	ZQ的睡前故事(java)
- Linux内核信号杀死内核线程,linux内核线程对信号的处理过程.
- iOS - 一份参考简历,请注意查收!
- UBOOT源码分析的第一阶段start.S分析(3)
- JS中的数组转变成JSON格式字符串的方法
- DIN卡轨式安装工业宽温8口百兆工业级以太网交换机
- easyexcel的使用-个人笔记