Pulsar IO 模块依赖 Pulsar function,部署及架构参考Apache Pulsar 官方链接:

Deploy Pulsar Functions

编译Pulsar-io-iotdb Sink插件

下载pulsar源码。

git clone https://github.com/apache/pulsar.git

将iotdb.tar sink源码解压至pulsar/pulsar-io目录下,在pulsar/pulsar-io/pom.xml文件添加iotdb项,如图所示:

在pulsar目录下 编译iotdb sink。

mvn clean package -pl pulsar-io/iotdb -am -Dmaven.test.skip=true

编译成功后,在pulsar/pulsar-io/iotdb目录下生成target目录,target目录下会有pulsar-io-iotdb-3.0.0.nar插件包。

创建IoTDB容器

# 创建 docker bridge 网络

docker network create --driver=bridge --subnet=172.18.0.0/16 --gateway=172.18.0.1 iotdb

dockerfile

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
version: "3"
services:iodb-confignode:image: apache/iotdb:1.1.0-confignodehostname: iotdb-confignodecontainer_name: iotdb-confignodeenvironment:- cn_internal_address=iotdb-confignode- cn_internal_port=10710- cn_consensus_port=10720- cn_target_config_node_list=iotdb-confignode:10710volumes:- ./data/confignode:/iotdb/data- ./logs/confignode:/iotdb/logsnetworks:iotdb:ipv4_address: 172.18.0.3iotdb-datanode1:image: apache/iotdb:1.1.0-datanodehostname: iotdb-datanode-1container_name: iotdb-datanode-1ports:- "6667:6667"environment:- dn_rpc_address=iotdb-datanode-1- dn_internal_address=iotdb-datanode-1- dn_target_config_node_list=iotdb-confignode:10710- dn_rpc_port=6667- dn_mpp_data_exchange_port=10740- dn_schema_region_consensus_port=10750- dn_data_region_consensus_port=10760volumes:- ./data/datanode1:/iotdb/data/- ./logs/datanode1:/iotdb/logs/networks:iotdb:ipv4_address: 172.18.0.6iotdb-datanode2:image: apache/iotdb:1.1.0-datanodehostname: iotdb-datanode-2container_name: iotdb-datanode-2environment:- dn_rpc_address=iotdb-datanode-2- dn_internal_address=iotdb-datanode-2- dn_target_config_node_list=iotdb-confignode:10710- dn_rpc_port=6667- dn_mpp_data_exchange_port=10740- dn_schema_region_consensus_port=10750- dn_data_region_consensus_port=10760volumes:- ./data/datanode2:/iotdb/data/- ./logs/datanode2:/iotdb/logs/networks:iotdb:ipv4_address: 172.18.0.4networks:iotdb:external: true

运行

docker-compose up –d

,成功会有拉起三个容器:

运行

docker exec -ti iotdb-datanode-1 /iotdb/sbin/start-cli.sh -h iotdb-datanode-1

进入iotdb容器并连接iotdb数据库

创建Pulsar proxy容器

运行

docker run -d --name proxy -p 6650:6650  -p 8080:8080 --network=iotdb apachepulsar/pulsar-all:latest bin/pulsar standalone

IoTDB sink 测试

  1. cd pulsar目录,使用

    docker cp pulsar-io/iotdb/target/pulsar-io-iotdb-3.0.0.nar proxy:/pulsar/connectors 将生成的pulsar-io-iotdb-3.0.0.nar

    包拷贝至pulsar proxy容器内。

  2. 运行
    docker exec -it proxy /bin/sh 

    进入pulsar proxy 容器内部

  3. 创建主题:
    bin/pulsar-admin topics create persistent://public/default/my-topic
  4. 运行sink:
bin/pulsar-admin sinks localrun \--archive connectors/pulsar-io-iotdb-3.0.0.nar \--tenant public \--namespace default \--name iotdb-sink \--sink-config '{"host": "172.18.0.6","port": 6667,"user": "root","password": "root","batchSize": 1,"storageGroup": "root.iotdb1.device1"}' \--inputs my-topic
  1. 另起一个客户端运行

    docker exec -it proxy /bin/sh

    进入pulsar proxy容器内部

  2. 分别发送两次消息:
bin/pulsar-client produce my-topic -s "\n" -m {"temperature":18,"timestamp":1686021685001}
bin/pulsar-client produce my-topic -s "\n" -m {"temperature":23,"timestamp":1686021685005}
  1. 另起客户端,运行

    docker exec -ti iotdb-datanode-1 /iotdb/sbin/start-cli.sh -h iotdb-datanode-1

    进入iotdb容器并连接iotdb数据库

  2. 运行
    select * from root.iotdb1.device1

可查阅iotdb数据已存在,测试OK.

Pulsar sink 常见命令

Create

创建 sink

Go
$ bin/pulsar-admin sink create <options>

常用参数

  • -a,--archive : 指定 sink 的 NAR 包
  • --classname : 指定 sink 的类名称
  • -i,--inputs : 指定 sink 的 topic,多个 topic 用逗号隔开
  • --name : 指定 sink 的名称
  • --namespace : 指定 sink 的命名空间
  • --parallelism : 指定 sink 的并发数
  • --sink-config-file : 指定 sink 的 yaml 配置文件
  • --tenant : 指定 sink 的租户

Update

更新 sink

Go
$ bin/pulsar-admin sink update <options>

常用参数

  • -a,--archive : 指定 sink 的 NAR 包
  • --classname : 指定 sink 的类名称
  • -i,--inputs : 指定 sink 的 topic,多个 topic 用逗号隔开
  • --name : 指定 sink 的名称
  • --namespace : 指定 sink 的命名空间
  • --parallelism : 指定 sink 的并发数
  • --sink-config-file : 指定 sink 的 yaml 配置文件
  • --tenant : 指定 sink 的租户

Delete

删除 sink

Go
$ bin/pulsar-admin sink delete <options>

常用参数

  • --name : 指定 sink 的名称
  • --namespace : 指定 sink 的命名空间
  • --tenant : 指定 sink 的租户

List

显示所有 sink

Go
$ bin/pulsar-admin sink list <options>

常用参数

  • --namespace : 指定 sink 的命名空间
  • --tenant : 指定 sink 的租户

Get

显示 sink 的信息

Go
$ bin/pulsar-admin sink get <options>

常用参数

  • --name : 指定 sink 的名称
  • --namespace : 指定 sink 的命名空间
  • --tenant : 指定 sink 的租户

Status

显示 sink 的状态

Go
$ bin/pulsar-admin sink status <options>

常用参数

  • --instance-id :  指定 sink 的实例 ID
  • 如果未指定,则获取所有实例的状态
  • --name : 指定 sink 的名称
  • --namespace : 指定 sink 的命名空间
  • --tenant : 指定 sink 的租户

Stop

停止 sink

Go
$ bin/pulsar-admin sink stop <options>

常用参数

  • --instance-id :  指定 sink 的实例 ID
  • 如果未指定,则停止所有实例的状态
  • --name : 指定 sink 的名称
  • --namespace : 指定 sink 的命名空间
  • --tenant : 指定 sink 的租户

Start

启动 sink

Go
$ bin/pulsar-admin sink start <options>

常用参数

  • --instance-id :  指定 sink 的实例 ID
  • 如果未指定,则启动所有实例
  • --name : 指定 sink 的名称
  • --namespace : 指定 sink 的命名空间
  • --tenant : 指定 sink 的租户

Restart

重启 sink

Go
$ bin/pulsar-admin sink restart <options>

常用参数

  • --instance-id :  指定 sink 的实例 ID
  • 如果未指定,则获取所有实例的状态
  • --name : 指定 sink 的名称
  • --namespace : 指定 sink 的命名空间
  • --tenant : 指定 sink 的租户

Localrun

本地运行

在本地运行一个 Pulsar IO sink connector,方便调试。

Go
$ bin/pulsar-admin sink localrun <options>

常用参数

  • -a,--archive : 指定 source 的 NAR 包
  • --classname : 指定 sink 的类名称
  • -i,--inputs : 指定 sink 的 topic,多个 topic 用逗号隔开
  • --name : 指定 sink 的名称
  • --namespace : 指定 sink 的命名空间
  • --parallelism : 指定 sink 的并发数
  • --sink-config-file : 指定 sink 的 yaml 配置文件
  • --tenant : 指定 sink 的租户

Pulsar-io-iotdb Sink 开发步骤相关推荐

  1. Pulsar IO 简介

    翻译:StreamNative--Sijia Apache Pulsar 是业界领先的消息系统.使用消息系统时,一个较为常见的问题就是:将数据移入或移出消息平台的最佳方法是什么?当然,用户可以使用 P ...

  2. hadoop日志数据分析开发步骤及代码

    日志数据分析: 1.背景 1.1 hm论坛日志,数据分为两部分组成,原来是一个大文件,是56GB:以后每天生成一个文件,大约是150-200MB之间: 1.2 日志格式是apache common日志 ...

  3. EOS开发步骤(1) 开发说明

    1. 开发步骤 创建钱包 创建帐户 部署token合约,以便区块链准备好创建新的token. 创建新token. 将新token分配给创世帐户(eosio). 在用户之间转移token.(创建交易.创 ...

  4. Dropwizard入门及开发步骤

    Dropwizard介绍 Dropwizard结构的服务组成 开发步骤 Dropwizard介绍 Dropwizard是一个微服务框架, 是各项技术的一个集成封装.它包含了以下组件: 嵌入式Jetty ...

  5. 数据库MySQL基础---JDBC开发步骤--JDBC封装工具类--PreparedStatement实现CRUD操作

    JDBC简介 1.JDBC定义Java数据库连接(Java Database Connectivity,简称JDBC):是Java语言中用来规范客户端程序如何来访问数据库的应用程序接口,提供了诸如查询 ...

  6. EJB3.0高速入门项目开发步骤

    EJB3.0开发步骤 1.   开发环境 IDE开发工具:Eclipse Java EE IDE for Web Developers EJB容器:jboss-4.2.3.GA 后台数据库:MysQL ...

  7. 谷歌了java集成开发_Spring整合Kaptcha谷歌验证码工具的开发步骤

    开发步骤: 1.加入依赖 com.google.code.kaptcha kaptcha 2.3 国内镜像无法下载该依赖,需要手动通过jar包在本地仓库安装一个依赖. 安装命令: mvn instal ...

  8. 05:JDBC的开发步骤,及其抽取的JDBCUtils工具类

    1.1.JDBC是什么? JDBC是一种用于执行SQL语句的Java API.(Java Data Base Connectivity,java数据库连接),是Java访问数据库的标准规范,可以为不同 ...

  9. dropwizard 连接mysql_Dropwizard入门及开发步骤

    Dropwizard介绍 Dropwizard是一个微服务框架, 是各项技术的一个集成封装.它包含了以下组件: - 嵌入式Jetty,一个应用程序被打包成一个Jar文件,并开始自已嵌入的Jetty容器 ...

最新文章

  1. Opencv中Homography
  2. excel中日期转成java_用Java程序将日期转换为序列号,就像在Excel中一样
  3. linux xampp常见问题
  4. iphone 使用委托(delegate)在不同的窗口之间传递数据
  5. t-mobile频段_T-Mobile再次被黑客入侵:超过200万个帐号和地址可能泄漏
  6. Java 8 Friday:不再需要ORM
  7. python登录网页账号密码_遇到需要登录的网站怎么办?学好python,用这3招轻松搞定...
  8. L2-007 家庭房产(并查集)
  9. ***基于协同过滤,NMF和Baseline的推荐算法
  10. 计算机涉及数学知识点,2019计算机考研数学知识点解读:一元函数积分学
  11. SDAU信息学院LaTeX模板使用指南
  12. 维和医疗分队患者信息管理系统的开发与研究
  13. HTML+CSS+JavaScript实现放大镜效果
  14. 陕西师范大学第七届程序设计竞赛网络同步赛 D ZQ的睡前故事(java)
  15. Linux内核信号杀死内核线程,linux内核线程对信号的处理过程.
  16. iOS - 一份参考简历,请注意查收!
  17. UBOOT源码分析的第一阶段start.S分析(3)
  18. JS中的数组转变成JSON格式字符串的方法
  19. DIN卡轨式安装工业宽温8口百兆工业级以太网交换机
  20. easyexcel的使用-个人笔记

热门文章

  1. 使用小程序实现图表(圆饼图、柱状图、折线图)
  2. 数据度量消费贷款—消费者的考虑因素
  3. 微信小程序自定义导航栏机型适配--底部Tabbar--view高度--底部按钮适配
  4. TracePro | 建立光源
  5. Linux7安装硬盘显示错误,从硬盘安装linux7.3出现的问题...
  6. 双目相机计算稠密深度点云(二)
  7. 制作一张支持中文的Knoppix CD
  8. 锁存器、触发器和寄存器区别对比-基础小知识(十)
  9. Java开发面试简历这么写,提高命中率
  10. OpenCV模板匹配算法详解