目录

1.背景

2.中间件

1)zookeeper

2)kafka

3)elasticsearch

3.参考资料


1.背景

最近的脚本中需要使用Python操作中间件(zookeeper/ kafka/ elastichsearch),之前没有使用过,所以度娘上到处查资料,这里记录一下常用方法,方便以后使用,也希望其他人遇到时能方便查找

2.中间件

1)zookeeper

lib version
zookeeper 3.7.0
kazoo 2.8.0
# 连接
try:zk = KazooClient(hosts=['127.0.0.1:2181'])zk.start()# 获取节点# znode_path 节点。例如,"/"result = zk.get_children(znode_path)zk.stop()
except KazooTimeoutError as e:print(e.args[0])

2)kafka

lib version
kafka 2.3.0
zookeeper 3.4.5
kafka-python 2.0.2
  • 连接
client = KafkaAdminClient(bootstrap_servers=['192.168.1.5:6667', '192.168.1.6:6667', '192.168.1.7:6667'])
  • 查询TOPIC列表
topics_list = client.list_topics()
  • 查询TOPIC详情
topic_dict = client.describe_topics()
  • 生产消息
# 连接
producer = KafkaProducer(bootstrap_servers=['192.168.1.5:6667', '192.168.1.6:6667', '192.168.1.7:6667'], retries=3, api_version=(0, 10, 2))# 发送多条消息
for i in range(0, 5):k = bytes("k" + str(i), encoding='utf-8')v = bytes("v" + str(i), encoding='utf-8')    producer.send(topic_name, key=k, value=v)# 刷新
producer.flush()# 关闭连接
producer.close()
  • 消费消息
# 连接
# group_id随机产生一串英文+数字的字符串即可consumer = KafkaConsumer(topic_name, group_id=group_id, bootstrap_servers=['192.168.1.5:6667', '192.168.1.6:6667', '192.168.1.7:6667'], consumer_timeout_ms=1000)# 订阅消息
consumer.subscribe(topics=[topic_name])tp = TopicPartition(topic_name, 0)# 消费到最后一条消息则退出,否则代码中会一直等待
for message in consumer:if message.offset == consumer.end_offsets([tp])[tp] - 1:break# 关闭连接
consumer.close()

3)elasticsearch

lib version
ElasticSearch 7.4.2& 7.6.0
elasticsearch 7.6.0
  • 连接
conn = Elasticsearch(["http://127.0.0.1:9200"], http_auth=('elastic', '123456'))
  • 测试连接
conn.ping()
  • 索引的操作
# 判断索引是否存在
conn.indics.exists(index_name)# 获取索引信息
conn.indics.get(index_name)# 写入数据
for i in range(0, 5):data = {"key": "k" + str(i),"value": i}conn.index(index=index_name, body=data)# 查询数据
1)
body = {'query': {'prefix': {'key.keyword': 'k'        # 匹配前缀}},'size': 10
}filter_path = ['hits.hits._source.key','hits.hits._source.value']    # 展示出的字段
result = conn.search(index=index_name, filter_path=filter_path, body=body)2)
body = {'query': {'term': {        # 匹配整个字串'key.keyword': 'k1'}},'size': 10
}result = conn.search(index=index_name, filter_path=filter_path, body=body)# result:找到(True)/ 未找到(False)# 修改数据
body = {'doc': {'key': 'k2','value': 2}
}conn.update(index=index_name, id=1, body=body)# 删除数据
all_data = conn.search(index=index_name)
hits = all_data['hits']['hits']
for item in hits:conn.delete(index=index_name, id=item['_id'])# 批量写入数据
data_list = []
for i in range(100):body = {'_op_type': 'create','_index': index_name,'_type': 'doc','_id': str(i),'_source': {'key': 'k' + str(i), 'value': i}}data_list.append(body)
result = helpers.buld(conn, data_list)# 一种批量修改数据
data_list = []
for i in range(100):body = {'_op_type': 'index','_index': index_name,'_type': 'doc','_id': str(i),'_source': {'key': 'k' + str(i), 'value': int(i+1)}}data_list.append(body)
result = helpers.buld(conn, data_list)# 另一种批量修改数据
data_list = []
for i in range(100):body = {'_op_type': 'update','_index': index_name,'_type': 'doc','_id': str(i),'_source': {'key': 'k' + str(i), 'value': int(i+2)}}data_list.append(body)
result = helpers.buld(conn, data_list)# 批量删除数据
data_list = []
for i in range(100):body = {'_op_type': 'delete','_index': index_name,'_type': 'doc','_id': str(i),'_source': {'key': 'k' + str(i), 'value': int(i+2)}}data_list.append(body)
result = helpers.buld(conn, data_list)

3.参考资料

zookeeper:使用python操作zookeeper_Lucky@Dong的博客-CSDN博客_python zookeeper

python操作kafka - 是阿凯啊 - 博客园

Python3操作Kafka - 慕夏一缕风 - 博客园

python操作elasticsearch_Python热爱者的博客-CSDN博客_python 操作elasticsearch

Pytest操作中间件相关推荐

  1. 基于RMI技术的数据库操作中间件设计 综合实践报告

    前言 1.1  实践目的和要求 为了将理论用于实践,巩固所学知识,提高自己发现问题并用所学知识分析问题和解决问题的能力,锻炼自己的工作能力,适应社会能力,自我管理能力,了解目前软件的应用情况,需求情况 ...

  2. 太强了!这款轻量级的数据库中间件完美解决了SpringBoot中分库分表问题

    点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试文章 作者:Macky_He blog.csdn.net/Macky_H ...

  3. php实现中间件6,说一说ThinkPHP6中五花八门的中间件_PHP开发框架教程

    thinkphp配置配置多应用多配置的方法_PHP开发框架教程 一般的thinkphp框架一般都是单模块开发的,但有时候我们可能需要进行多模块开发,本文就来为大家介绍一下thinkphp配置多模块.多 ...

  4. 数据库——MySQL分库分表的演进和实践以及中间件的比较

    1.了解几个问题? 1.分库分表相关术语 读写分离: 不同的数据库,同步相同的数据,分别只负责数据的读和写: 分区: 指定分区列表达式,把记录拆分到不同的区域中(必须是同一服务器,可以是不同硬盘),应 ...

  5. Spring Boot集成Sharding-jdbc + Mybatis-Plus实现分库分表

    来源:https://blog.csdn.net/Macky_He/article/details/95754402 作者:Macky_He 一. Sharding-jdbc简介 Sharding-j ...

  6. jdbc与hibernate的优缺点比较

    jdbc与hibernate的优缺点比较 一. Hibernate是JDBC的轻量级的对象封装,它是一个独立的对象持久层框架,和App Server,和EJB没有什么必然的联系.Hibernate可以 ...

  7. idea启动springboot卡_写给新手看的 Spring Boot 入门学习指南

    什么是 Spring Boot ? 解释一下:Spring Boot 可以构建一切.Spring Boot 设计之初就是为了最少的配置,最快的速度来启动和运行 Spring 项目.Spring Boo ...

  8. 详解JDBC与Hibernate区别

    详解JDBC与Hibernate区别 引用地址:http://www.cnblogs.com/JemBai/archive/2011/04/13/2014940.html 刚开始学习JAVA时,认为H ...

  9. nodejs 框架 中文express 4.xxx中文API手册

       介于最近express 中文文档比较难找的现状,特地找了一个,供大家学习思考 Express 4.x API express 翻译 api文档 中文 -- express() express() ...

最新文章

  1. 程序员javascript写乒乓球,和机器人练技术!没赢过!
  2. 知识库问答中的关系识别研究回顾
  3. mybatis plus实现多表分页条件查询
  4. python小测试1答案_测试1:Python 基本语法(选择题
  5. 小P寻宝记——好基友一起走
  6. maven 打包失败 提示找不到jar的问题
  7. 概率就是个冷冰冰的坑
  8. 计算机毕业设计JAVA房屋租赁系统mybatis
  9. C/C++经典项目开发:教你破解Windows系统密码,手把手教你做解密项目
  10. php怎么发ddos包,解决服务器上通过PHP代码DDOS的方法
  11. 基于STM32F4实现FOC(磁场定向控制)一:电流采样和波形产生
  12. java deff_java – 在配置中添加时出现Spring NoClassDefF...
  13. 帆软按钮控件变查询_JS使用填报页面的控件查询
  14. CSS属性前的 -webkit, -moz
  15. PESniffer4PEiD plugin from NEOx's 0depts by Slip
  16. 关于一个简单函数方程问题的深入探究
  17. Vue之鼠标悬停显示页面加载时间
  18. 用Python做雷霆战机小游戏【附素材+源码】
  19. 电涡流传感器线性灵敏度
  20. 怎么让手机和电脑处于一个局域网 电脑本机发布的项目(非阿里云发布),怎么让手机也可以访问电脑发布的项目

热门文章

  1. [GBase 8s 教程]GBase 8s 运算符/函数
  2. Cent OS 7.7 搭建蓝鲸智云社区版5.1.27(2)——标准部署
  3. 蓝鲸智云-腾讯给广大运维工作者的福利
  4. Apache Camel 了解一下?
  5. Linux 中用 dd 命令来测试硬盘读写速度
  6. 用isalpha函数来判断一个字符串中的字符是否是字母
  7. Python发送邮件的类
  8. OPenCV4-颜色识别(一)调色板和简单的颜色识别
  9. 转:使用DOS命令chcp查看windows操作系统的默认编码以及编码和语言的对应关系
  10. Win7 英文专业版安装中文包汉化后部分软件中文乱码问题处理