kafka streams

MapR生态系统软件包2.0(MEP)随附了一些与MapR流有关的新功能:

  • 用于MapR Streams的Kafka REST代理为MapR Streams和Kafka集群提供了RESTful接口,使其易于使用和产生消息以及执行管理操作。
  • Kafka Connect for MapR Streams是一个实用程序,用于在MapR Streams与Apache Kafka和其他存储系统之间流式传输数据。

MapR生态系统软件包(MEP)是一种与核心升级脱钩的生态系统升级工具,可让您独立于MapR融合数据平台升级工具。 您可以在本文中了解有关MEP 2.0的更多信息 。

在此博客中,我们描述了如何使用Kafka REST代理向MapR流发布消息和从MapR流中使用消息。 REST代理是MapR融合数据平台的重要补充,它允许任何编程语言使用MapR流。

MapR Streams工具随附的Kafka REST Proxy可以与MapR Streams(默认)以及Apache Kafka(在混合模式下)一起使用。 在本文中,我们将重点介绍MapR流。

先决条件

  • 具有MEP 2.0的MapR融合数据平台5.2

    • 使用MapR Streams工具
  • curl,wget或任何HTTP / REST客户端工具

创建MapR流和主题

流是主题的集合,您可以通过以下方式将其分组管理:

  1. 设置适用于该流中所有主题的安全策略
  2. 为流中创建的每个新主题设置默认的分区数
  3. 设置流中每个主题中消息的生存时间

您可以在文档中找到有关MapR Streams概念的更多信息。

在您的MapR群集或沙盒上,运行以下命令:

$ maprcli stream create -path /apps/iot-stream -produceperm p -consumeperm p -topicperm p
$ maprcli stream topic create -path /apps/iot-stream -topic sensor-json -partitions 3
$ maprcli stream topic create -path /apps/iot-stream -topic sensor-binary -partitions 3

启动Kafka控制台的生产者和消费者

打开两个终端窗口,并使用以下命令运行使用者的Kafka实用程序:

消费者

  • 主题传感器-json
$ /opt/mapr/kafka/kafka-0.9.0/bin/kafka-console-consumer.sh --new-consumer --bootstrap-server this.will.be.ignored:9092 --topic /apps/iot-stream:sensor-json
  • 主题传感器二进制
$ /opt/mapr/kafka/kafka-0.9.0/bin/kafka-console-consumer.sh --new-consumer --bootstrap-server this.will.be.ignored:9092 --topic /apps/iot-stream:sensor-binary

这两个终端窗口将允许您查看有关不同主题的消息。

使用Kafka REST代理

端点/ topics / [topic_name]允许您获取有关该主题的一些信息。 在MapR Streams中,主题是路径标识的流的一部分; 要通过REST API访问主题,您必须输入完整路径并在URL中进行编码; 例如:

  • / apps / iot-stream:sensor-json将使用%2Fapps%2Fiot-stream%3Asensor-json进行编码

运行以下命令,以获取有关sensor-json主题的信息:

$ curl -X GET  http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json

注意:为简单起见,我从运行Kafka REST代理的节点上运行命令,因此可以使用localhost

您可以通过添加Python命令,以漂亮的方式打印JSON,例如:

$ curl -X GET  http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json | python -m json.tool

默认流

如上所述,流路径是您必须在命令中使用的主题名称的一部分。 但是,可以将MapR Kafka REST代理配置为使用默认流。 对于此配置,您应该在/opt/mapr/kafka-rest/kafka-rest-2.0.1/config/kafka-rest.properties文件中添加以下属性:

  • stream.default.stream = / apps / iot-stream

更改Kafka REST代理配置时,必须使用maprcli或MCS重新启动服务。

使用streams.default.stream属性的主要原因是简化应用程序使用的URL。 例如:

  • 通过streams.default.stream ,可以使用curl -X GET http:// localhost:8082 / topics /
  • 如果没有此配置,或者要使用特定的流,则必须在URL中指定它: http:// localhost:8082 / topics /%2Fapps%2Fiot-stream%3Asensor-json

在本文中,所有URL都包含编码的流名称,因此您可以在不更改配置的情况下开始使用Kafka REST代理,也可以将其用于其他流。

用于MapR流的Kafka REST代理允许应用程序将消息发布到MapR流。 消息可以作为JSON或二进制内容(base64编码)发送。

要发送JSON消息:

  • 查询应该是HTTP POST
  • 内容类型应为:application / vnd.kafka.json.v1 + json
  • 身体:
{"records":[{"value":{"temp" : 10 ,"speed" : 40 ,"direction" : "NW"}  }]
}

完整的请求是:

curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" \--data '{"records":[{"value": {"temp" : 10 , "speed" : 40 , "direction" : "NW"}  }]}' \http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json

您应该在/ apps / iot-stream:sensor-json使用者正在运行的终端窗口中看到打印的消息。

发送二进制消息:

  • 查询应该是HTTP POST
  • 内容类型应为:application / vnd.kafka.binary.v1 + json
  • 身体:
{"records":[{"value":"SGVsbG8gV29ybGQ="}]
}

请注意, SGVsbG8gV29ybGQ =是在Base64中编码的字符串“ Hello World”。

完整的请求是:

curl -X POST -H "Content-Type: application/vnd.kafka.binary.v1+json" \--data '{"records":[{"value":"SGVsbG8gV29ybGQ="}]}' \http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-binary

您应该在/ apps / iot-stream:sensor-binary使用者正在运行的终端窗口中看到打印的消息。

发送多个消息:

HTTP正文的记录字段允许您发送多个消息; 例如,您可以发送:

curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" \--data '{"records":[{"value": {"temp" : 12 , "speed" : 42 , "direction" : "NW"}  }, {"value": {"temp" : 10 , "speed" : 37 , "direction" : "N"}  } ]}' \http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json

该命令将发送2条消息,并将偏移量增加2。您可以通过在JSON数组中添加新元素来对二进制内容执行相同的操作; 例如:

curl -X POST -H "Content-Type: application/vnd.kafka.binary.v1+json" \--data '{"records":[{"value":"SGVsbG8gV29ybGQ="}, {"value":"Qm9uam91cg=="}]}' \http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-binary

您可能知道,可以将密钥绑定到消息,以确保所有具有相同密钥的消息都将到达同一分区。 为此,将key属性添加到消息中,如下所示:

{"records":[{"key": "K001","value":{"temp" : 10 ,"speed" : 40 ,"direction" : "NW"}  }]
}

现在,您知道如何使用REST代理将消息发布到MapR Streams主题,让我们看看如何使用消息。

消费信息

REST代理还可以用于消费主题消息。 为此,您需要:

  1. 创建使用者实例。
  2. 使用第一个调用返回的URL来阅读消息。
  3. 如果需要,请删除使用者实例。

创建使用者实例

以下请求创建使用者实例:

curl -X POST -H "Content-Type: application/vnd.kafka.v1+json" \--data '{"name": "iot_json_consumer", "format": "json", "auto.offset.reset": "earliest"}' \http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-json

服务器的响应如下所示:

{"instance_id":"iot_json_consumer","base_uri":"http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-json/instances/iot_json_consumer"
}

请注意,我们已经使用/ consumers / [topic_name]创建使用者。 后续请求将使用base_uri从主题获取消息。 与任何MapR Streams / Kafka使用者一样, auto.offset.reset定义其行为。 在此示例中,该值设置为最早 ,这意味着使用者将从头开始阅读消息。 您可以在MapR Streams文档中找到有关使用者配置的更多信息。

消费信息

要使用消息,只需将MapR Streams主题添加到使用者实例的URL。

以下请求使用了该主题的消息:

curl -X GET -H "Accept: application/vnd.kafka.json.v1+json" \
http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-json/instances/iot_json_consumer/topics/%2Fapps%2Fiot-stream%3Asensor-json

此调用返回JSON文档中的消息:

[{"key":null,"value":{"temp":10,"speed":40,"direction":"NW"},"topic":"/apps/iot-stream:sensor-json","partition":1,"offset":1},{"key":null,"value":{"temp":12,"speed":42,"direction":"NW"},"topic":"/apps/iot-stream:sensor-json","partition":1,"offset":2},{"key":null,"value":{"temp":10,"speed":37,"direction":"N"},"topic":"/apps/iot-stream:sensor-json","partition":1,"offset":3}
]

每次对API的调用都会根据上一次调用的偏移量返回发布的新消息。

请注意,消费者将被销毁:

  • Consumer.instance.timeout.ms设置了一些空闲时间(默认值设置为300000ms / 5分钟)后,使用REST API调用销毁了该空闲时间(请参见下文)。

消费二进制格式的消息

如果需要使用二进制消息,则方法是相同的:您需要更改格式和Accept标头。

调用此URL为二进制主题创建使用者实例:

curl -X POST -H "Content-Type: application/vnd.kafka.v1+json" \--data '{"name": "iot_binary_consumer", "format": "binary", "auto.offset.reset": "earliest"}' \http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-binary

然后使用消息,accept标头设置为application / vnd.kafka.binary.v1 + json

curl -X GET -H "Accept: application/vnd.kafka.binary.v1+json" \
http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-binary/instances/iot_binary_consumer/topics/%2Fapps%2Fiot-stream%3Asensor-binary

该调用返回JSON文档中的消息,并且该值在Base64中编码:

[{"key":null,"value":"SGVsbG8gV29ybGQ=","topic":"/apps/iot-stream:sensor-binary","partition":1,"offset":1},{"key":null,"value":"Qm9uam91cg==","topic":"/apps/iot-stream:sensor-binary","partition":1,"offset":2}
]

删除使用者实例

如前所述,将根据REST Proxy的consumer.instance.timeout.ms配置自动销毁使用者 。 也可以使用使用者实例URI和HTTP DELETE调用销毁实例,如下所示:

curl -X DELETE http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-binary/instances/iot_binary_consumer

结论

在本文中,您学习了如何将Kafka REST代理用于MapR流,该代理允许任何应用程序使用在MapR融合数据平台中发布的消息。

您可以在MapR文档和以下资源中找到有关Kafka REST代理的更多信息:

  • MapR Streams入门
  • Ted Dunning和Ellen Friedman撰写的“流传输体系结构:使用Apache Kafka和MapR流的新设计”电子书

翻译自: https://www.javacodegeeks.com/2017/01/getting-started-kafka-rest-proxy-mapr-streams-2.html

kafka streams

kafka streams_Kafka REST Proxy MapR Streams入门相关推荐

  1. kafka streams_Kafka REST Proxy for MapR Streams入门

    kafka streams 介绍 MapR生态系统软件包2.0(MEP)随附了一些与MapR流有关的新功能: 用于MapR Streams的Kafka REST代理为MapR Streams和Kafk ...

  2. Kafka REST Proxy for MapR Streams入门

    介绍 MapR生态系统软件包2.0(MEP)随附了一些与MapR流有关的新功能: MapR Streams的Kafka REST代理为MapR Streams和Kafka集群提供RESTful接口,以 ...

  3. kafka connect_Kafka Connect在MapR上

    kafka connect 在本周的白板演练中,MapR的高级产品营销经理Ankur Desai描述了Apache Kafka Connect和REST API如何简化和提高在处理来自包括旧数据库或数 ...

  4. Kafka教程(一)基础入门:基本概念、安装部署、运维监控、命令行使用

    Kafka教程(一)基础入门 1.基本概念 背景 领英->Apache 分布式.消息发布订阅系统 角色 存储系统 消息系统 流处理平台-Kafka Streaming 特点 高吞吐.低延迟 cg ...

  5. 【python+flume+kafka+spark streaming】编写word_count入门示例

    一. 整体架构的一些理解 1.整体架构的理解: 架构中的角色分为了数据采集,数据缓冲,还有数据处理. flume由于输入和输出的接口众多,于是利用这特点来实现无编程的数据采集. 无编程的数据采集,我是 ...

  6. 如何使用Kafka API入门Spark流和MapR流

    这篇文章将帮助您开始使用Apache Spark Streaming通过MapR Streams和Kafka API消费和发布消息. Spark Streaming是核心Spark API的扩展,可实 ...

  7. spark和kafka_如何使用Kafka API入门Spark流和MapR流

    spark和kafka 这篇文章将帮助您开始使用Apache Spark Streaming通过MapR Streams和Kafka API消费和发布消息. Spark Streaming是核心Spa ...

  8. Kafka Connect在MapR上

    在本周的白板演练中,MapR的高级产品营销经理Ankur Desai描述了Apache Kafka Connect和REST API如何简化和提高在处理来自包括旧数据库或数据仓库在内的各种数据源的流数 ...

  9. ip integrator_使用Oracle Data Integrator(和Kafka / MapR流)完善Lambda体系结构

    ip integrator " Lambda体系结构是一种数据处理体系结构,旨在通过利用批处理和流处理方法来处理大量数据. 这种体系结构方法尝试通过使用批处理提供批处理数据的全面而准确的视图 ...

最新文章

  1. iOS一些实用的技巧
  2. R语言基本描述性统计量函数
  3. Linux IPv6 地址配置
  4. 软考程序员Java答题速成_软考程序员考试试题解答方法与技巧
  5. Canny边缘检测原理及C#程序实现
  6. 小鑫の日常系列故事(六)——奇遇记_JAVA
  7. js发送get、post请求的方法简介
  8. 玩转oracle 11g(7):导出导入数据库
  9. 使用BIND安装智能DNS服务器(一)---基本的主从DNS服务器搭建
  10. python匿名函数里用for_请问这段Python代码如何用匿名函数简化?
  11. k8s ubuntu cni_手把手教你使用RKE快速部署K8S集群并部署Rancher HA
  12. Win10本地账户怎么更改为Microsoft账户
  13. [翻译]在jQuery 1.5中使用deferred对象
  14. finally 嵌套_学习 Rust【2】减少代码嵌套
  15. 树莓派3代linux,树莓派 3B 入门 ARMv8 Arch Linux
  16. 三容水箱液位控制系统_光电液位传感器在饮水机中的应用解决方案
  17. hfss螺旋平面_微波射频网HFSS平面螺旋天线设计
  18. 中国土地市场网数据爬取
  19. 云呐IT服务台在企业IT管理中的作用
  20. 线性代数中解方程组的加减消元和求特征向量的加减消元的区别

热门文章

  1. CodeForces:54
  2. CF1137F-Matches Are Not a Child‘s Play【LCT】
  3. jzoj6287-扭动的树【区间dp】
  4. P2473-[SCOI2008]奖励关【数学期望,状压dp】
  5. 【2018.4.7】模拟赛之五-ssl2386 序列【dp】
  6. 2021“MINIEYE杯”中国大学生算法设计超级联赛(2)I love counting(Trie树)
  7. 【矩阵乘法】【倍增】美食家(luogu 6772)
  8. 基于SSM+JBPM的智能化OA办公平台
  9. 一次惊险的跳槽面试经历(阿里/美团/头条/网易/有赞...)
  10. Java并发包:ConcurrentMap