kafka端
consumer vpc版代码
import socket
from kafka import KafkaConsumer
from kafka.errors import KafkaError# context.check_hostname = True

consumer = KafkaConsumer(bootstrap_servers=['192.168.xx.xx:9092'],group_id='xx',api_version = (0,10))print('consumer start to consuming...')
consumer.subscribe(('xx',))
for message in consumer:print(message.topic)print(message.offset)print(message.key)print(message.value)print(message.partition)

producer vpc版代码

#!/usr/bin/env python
# encoding: utf-8import socket
from kafka import KafkaProducer
from kafka.errors import KafkaErrorproducer = KafkaProducer(bootstrap_servers=['192.168.xx.xx:9092'],api_version = (0,10),retries=5)partitions = producer.partitions_for('xx')
print('Topic下分区: %s' % partitions)try:future = producer.send(topic='xx', value=b'hello aliyun-kafka!')future.get()print('send message succeed.')
except KafkaError as e:print('send message failed.')print(e)

consumer公网版代码

import ssl
import socket
from kafka import KafkaConsumer
from kafka.errors import KafkaErrorcontext = ssl.create_default_context()
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.verify_mode = ssl.CERT_REQUIRED
# context.check_hostname = True
context.load_verify_locations("/tmp/ca-cert")consumer = KafkaConsumer(bootstrap_servers=['kafka-ons-internet.aliyun.com:8080'],group_id='xxx',sasl_mechanism="PLAIN",ssl_context=context,security_protocol='SASL_SSL',api_version = (0,10),sasl_plain_username='xxx',sasl_plain_password='1234567890')print('consumer start to consuming...')
consumer.subscribe(('xxx', ))
for message in consumer:print(message.topic)print(message.offset)print(message.value)break

producer 公网版代码
#!/usr/bin/env python
# encoding: utf-8import ssl
import socket
from kafka import KafkaProducer
from kafka.errors import KafkaErrorcontext = ssl.create_default_context()
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.verify_mode = ssl.CERT_REQUIRED
# context.check_hostname = True
context.load_verify_locations("/tmp/ca-cert")
#这个文件参考https://github.com/AliwareMQ/aliware-kafka-demos/tree/master/kafka-python-demo

producer = KafkaProducer(bootstrap_servers=['kafka-ons-internet.aliyun.com:8080'],sasl_mechanism="PLAIN",ssl_context=context,security_protocol='SASL_SSL',api_version = (0,10),retries=5,sasl_plain_username='xx',sasl_plain_password='1234567890'#注意是access-key的最后十位)

partitions = producer.partitions_for('xxx')
print ('Topic下分区: %s' % partitions)try:future = producer.send('xxx', b'hello aliyun-kafka!')future.get()print('send message succeed.')
except KafkaError as e:print('send message failed.')print(e)

从阿里云控台获得连接信息

转载于:https://www.cnblogs.com/castlevania/p/10370803.html

阿里云kafka使用记录(python版本)相关推荐

  1. 动态更新阿里云DDNS解析记录的IPv6地址,随时随地用域名远程访问自己的电脑【如何远程访问家里的电脑】

    远程访问电脑 日志 简介 要求 1. 获取两台电脑 2.IPv6网络 2.1检查光猫 2.2检查路由器 2.3配置电脑防火墙 2.3.1添加ICMPv6协议 2.3.2配置SMB协议 2.4配置远程桌 ...

  2. 利用MQTT协议与阿里云数据交互的python程序

    利用MQTT协议与阿里云数据交互的python程序 设计目的 功能要求和关键问题 环境配置问题 本地程序如何连接云上设备 云上的数据交互问题 界面设计问题 阿里云相关操作 本地程序 设计目的 设计开发 ...

  3. python海龟图画龙珠_阿里云天池龙珠计划——Python基础入门第2课:变量和赋值...

    阿里云天池龙珠计划--Python基础入门第2课:变量和赋值 [我是测试题2]下面这段代码的运行结果是什么? # 运行一下结果就出来了 a = "hello" b = " ...

  4. 关于阿里云服务器配置Ubuntu18.04+python环境

    关于阿里云服务器配置Ubuntu18.04+python环境 需要准备的软件有以下 Xshell Xftp 云服务器(这里以阿里云为例) 环境配置文件(附上下载链接) 点击前往git下载环境配置文件 ...

  5. 阿里云服务器centos7配置Python Django项目

    阿里云服务器centos7配置Python Django项目 2020-3-25更新更换服务器后还原系统+相应配置 2019-12-13更新 Nginx域名绑定 2019-12-12更新 静态文件加载 ...

  6. 阿里云SMTP邮件发送Python demon实践

    阿里云SMTP邮件发送Python & Django实践 说明: python 3.6 Django 2.2 阿里云轻应用服务器 注意:轻应用服务器上80 端口和25端口是被占用,465端口被 ...

  7. python实现批量变更阿里云DNS解析记录状态

    包含两个脚本,一个是阿里云和维格表的同步脚本ali_dns.py,另一个是执行阿里云解析记录的开启暂停脚本ali_modify.py. 使用阿里云的sdk,ali_dns.py将所有域名解析拉取后写入 ...

  8. 技术干货 | 阿里云数据库PostgreSQL 13大版本揭秘

    简介:阿里云RDS PostgreSQL是一款兼容开源PostgreSQL的全托管云数据库产品,自2015年首次发布以来,根据用户需求不断升级迭代,已支持9.4.10.11.12等多个版本,覆盖了高可 ...

  9. Knative 实战:基于阿里云 Kafka 实现消息推送

    在 Knative 中已经提供了对 Kafka 事件源的支持,那么如何在阿里云上基于 Kafka 实现消息推送,本文给大家解锁这一新的姿势. 背景 消息队列 for Apache Kafka 是阿里云 ...

最新文章

  1. mysql replication延迟_MySQL Replication--复制延迟01--源码瞎猜
  2. Luogu P1087 FBI树
  3. 非法ip通过ssh成功登录,自动结束会话
  4. 项目: 图片放大缩小。
  5. Android 源码查看和解析
  6. corosync和pacemaker实现httpd和mysql双集群
  7. char添加一个字符_给你五十行代码把图片变成字符画!程序:太多了,一半都用不完...
  8. linux 进程通信 消息队列
  9. 本地运行vue.js项目,如何更改调试的默认端口?
  10. 漫步线性代数二十三——行列式公式
  11. (一)使用MLOps自动训练、测试和部署AI:概述
  12. hbase 核心知识
  13. U盘引导启动LINUX
  14. 国稻种芯百团计划行动 丰收节贸促会·黎志康:惠及亚非18国家
  15. Vue(狂神学习笔记)2021-10-8
  16. wb在计算机知识里是什么意思,计算机二级考试内容是什么?
  17. php中的 notice,PHP中出现Notice: Undefined index的三种解决办法
  18. oppo手机忘记解锁图案怎么办
  19. Axure8下载和安装
  20. 如何在5天内学会Vue?聊聊我的学习方法!

热门文章

  1. Windows Tftpd32 DHCP服务器 使用
  2. [LeetCode]题解(python):086-Partition List
  3. 找DB2存储过程的package
  4. 精美jQuery插件及源码 前端开发福利
  5. 登录过gnome主题后无法再登录xfce主题
  6. Java常用spark的pom.xml与读取csv为rdd到最终join操作+java常用pom.xml文件
  7. 真实集群中Flink命令行各种模式提交wordcount
  8. 模态识别分析:随机减量法/ITD法
  9. StarUML 逆向工程插件加载失败问题解决
  10. C++自定义非极大值抑制(Canny边缘检测,亚像素方法)