阿里云kafka使用记录(python版本)
import socket from kafka import KafkaConsumer from kafka.errors import KafkaError# context.check_hostname = True consumer = KafkaConsumer(bootstrap_servers=['192.168.xx.xx:9092'],group_id='xx',api_version = (0,10))print('consumer start to consuming...') consumer.subscribe(('xx',)) for message in consumer:print(message.topic)print(message.offset)print(message.key)print(message.value)print(message.partition)
producer vpc版代码
#!/usr/bin/env python # encoding: utf-8import socket from kafka import KafkaProducer from kafka.errors import KafkaErrorproducer = KafkaProducer(bootstrap_servers=['192.168.xx.xx:9092'],api_version = (0,10),retries=5)partitions = producer.partitions_for('xx') print('Topic下分区: %s' % partitions)try:future = producer.send(topic='xx', value=b'hello aliyun-kafka!')future.get()print('send message succeed.') except KafkaError as e:print('send message failed.')print(e)
consumer公网版代码
import ssl import socket from kafka import KafkaConsumer from kafka.errors import KafkaErrorcontext = ssl.create_default_context() context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) context.verify_mode = ssl.CERT_REQUIRED # context.check_hostname = True context.load_verify_locations("/tmp/ca-cert")consumer = KafkaConsumer(bootstrap_servers=['kafka-ons-internet.aliyun.com:8080'],group_id='xxx',sasl_mechanism="PLAIN",ssl_context=context,security_protocol='SASL_SSL',api_version = (0,10),sasl_plain_username='xxx',sasl_plain_password='1234567890')print('consumer start to consuming...') consumer.subscribe(('xxx', )) for message in consumer:print(message.topic)print(message.offset)print(message.value)break
#!/usr/bin/env python # encoding: utf-8import ssl import socket from kafka import KafkaProducer from kafka.errors import KafkaErrorcontext = ssl.create_default_context() context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) context.verify_mode = ssl.CERT_REQUIRED # context.check_hostname = True context.load_verify_locations("/tmp/ca-cert") #这个文件参考https://github.com/AliwareMQ/aliware-kafka-demos/tree/master/kafka-python-demo producer = KafkaProducer(bootstrap_servers=['kafka-ons-internet.aliyun.com:8080'],sasl_mechanism="PLAIN",ssl_context=context,security_protocol='SASL_SSL',api_version = (0,10),retries=5,sasl_plain_username='xx',sasl_plain_password='1234567890'#注意是access-key的最后十位) partitions = producer.partitions_for('xxx') print ('Topic下分区: %s' % partitions)try:future = producer.send('xxx', b'hello aliyun-kafka!')future.get()print('send message succeed.') except KafkaError as e:print('send message failed.')print(e)
从阿里云控台获得连接信息
转载于:https://www.cnblogs.com/castlevania/p/10370803.html
阿里云kafka使用记录(python版本)相关推荐
- 动态更新阿里云DDNS解析记录的IPv6地址,随时随地用域名远程访问自己的电脑【如何远程访问家里的电脑】
远程访问电脑 日志 简介 要求 1. 获取两台电脑 2.IPv6网络 2.1检查光猫 2.2检查路由器 2.3配置电脑防火墙 2.3.1添加ICMPv6协议 2.3.2配置SMB协议 2.4配置远程桌 ...
- 利用MQTT协议与阿里云数据交互的python程序
利用MQTT协议与阿里云数据交互的python程序 设计目的 功能要求和关键问题 环境配置问题 本地程序如何连接云上设备 云上的数据交互问题 界面设计问题 阿里云相关操作 本地程序 设计目的 设计开发 ...
- python海龟图画龙珠_阿里云天池龙珠计划——Python基础入门第2课:变量和赋值...
阿里云天池龙珠计划--Python基础入门第2课:变量和赋值 [我是测试题2]下面这段代码的运行结果是什么? # 运行一下结果就出来了 a = "hello" b = " ...
- 关于阿里云服务器配置Ubuntu18.04+python环境
关于阿里云服务器配置Ubuntu18.04+python环境 需要准备的软件有以下 Xshell Xftp 云服务器(这里以阿里云为例) 环境配置文件(附上下载链接) 点击前往git下载环境配置文件 ...
- 阿里云服务器centos7配置Python Django项目
阿里云服务器centos7配置Python Django项目 2020-3-25更新更换服务器后还原系统+相应配置 2019-12-13更新 Nginx域名绑定 2019-12-12更新 静态文件加载 ...
- 阿里云SMTP邮件发送Python demon实践
阿里云SMTP邮件发送Python & Django实践 说明: python 3.6 Django 2.2 阿里云轻应用服务器 注意:轻应用服务器上80 端口和25端口是被占用,465端口被 ...
- python实现批量变更阿里云DNS解析记录状态
包含两个脚本,一个是阿里云和维格表的同步脚本ali_dns.py,另一个是执行阿里云解析记录的开启暂停脚本ali_modify.py. 使用阿里云的sdk,ali_dns.py将所有域名解析拉取后写入 ...
- 技术干货 | 阿里云数据库PostgreSQL 13大版本揭秘
简介:阿里云RDS PostgreSQL是一款兼容开源PostgreSQL的全托管云数据库产品,自2015年首次发布以来,根据用户需求不断升级迭代,已支持9.4.10.11.12等多个版本,覆盖了高可 ...
- Knative 实战:基于阿里云 Kafka 实现消息推送
在 Knative 中已经提供了对 Kafka 事件源的支持,那么如何在阿里云上基于 Kafka 实现消息推送,本文给大家解锁这一新的姿势. 背景 消息队列 for Apache Kafka 是阿里云 ...
最新文章
- mysql replication延迟_MySQL Replication--复制延迟01--源码瞎猜
- Luogu P1087 FBI树
- 非法ip通过ssh成功登录,自动结束会话
- 项目: 图片放大缩小。
- Android 源码查看和解析
- corosync和pacemaker实现httpd和mysql双集群
- char添加一个字符_给你五十行代码把图片变成字符画!程序:太多了,一半都用不完...
- linux 进程通信 消息队列
- 本地运行vue.js项目,如何更改调试的默认端口?
- 漫步线性代数二十三——行列式公式
- (一)使用MLOps自动训练、测试和部署AI:概述
- hbase 核心知识
- U盘引导启动LINUX
- 国稻种芯百团计划行动 丰收节贸促会·黎志康:惠及亚非18国家
- Vue(狂神学习笔记)2021-10-8
- wb在计算机知识里是什么意思,计算机二级考试内容是什么?
- php中的 notice,PHP中出现Notice: Undefined index的三种解决办法
- oppo手机忘记解锁图案怎么办
- Axure8下载和安装
- 如何在5天内学会Vue?聊聊我的学习方法!
热门文章
- Windows Tftpd32 DHCP服务器 使用
- [LeetCode]题解(python):086-Partition List
- 找DB2存储过程的package
- 精美jQuery插件及源码 前端开发福利
- 登录过gnome主题后无法再登录xfce主题
- Java常用spark的pom.xml与读取csv为rdd到最终join操作+java常用pom.xml文件
- 真实集群中Flink命令行各种模式提交wordcount
- 模态识别分析:随机减量法/ITD法
- StarUML 逆向工程插件加载失败问题解决
- C++自定义非极大值抑制(Canny边缘检测,亚像素方法)