Background

A colleague came to me out of the blue wanting the size of every Redis key in production — that is, for each key, how much memory it occupies (what we store in Redis is fairly uniform and doesn't involve the more complex structures such as sets) — and asked me to help sort it out. On hearing this I took a deep breath at my screen and put on an expression that said: hmm, this is a bit involved, I need to collect my thoughts.

Inwardly I was already thinking: this doesn't look hard at all — grab the laptop, hammer out a few lines of code, done. Ha.
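For context, the primitive the whole exercise rests on is Redis's MEMORY USAGE command (available since Redis 4.0), which redis-py exposes as memory_usage(): it reports roughly how many bytes a key and its value occupy, bookkeeping overhead included. A minimal sketch, with placeholder connection details:

import redis

# Placeholder host/port; point this at a Redis 4.0+ instance.
r = redis.Redis(host="192.168.10.205", port=6379)

r.set("demo_key", "hello")
# Approximate size in bytes of the key, its value, and per-key overhead.
print(r.memory_usage("demo_key"))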

Redis, version 1: pure joy

import redis
import time

rip = "192.168.10.205"
rport = "6379"


def wrapper_time(func):
    def wrapper(*args, **kwargs):
        start = time.time()
        r = func(*args, **kwargs)
        end = time.time()
        print("finish use time {0} second".format(end - start))
        return r
    return wrapper


@wrapper_time
def t_redis():
    pool = redis.ConnectionPool(host=rip, port=rport, socket_connect_timeout=3)
    r = redis.Redis(connection_pool=pool)
    total_count = 0
    for key in r.scan_iter(count=5000):
        total_count += 1
        keyMemUsage = int(r.memory_usage(key))
        print(key, keyMemUsage)
    print("total  count  {0}".format(total_count))


if __name__ == '__main__':
    t_redis()

Happiness really is this simple; with Python you just point at the problem and poke it. I was quietly pleased — sometimes a problem really is this easy. A local run worked fine, with the following output:

('mylist', 421132)
('key_9fe99777-97df-4bb8-b2e9-c1a7f525d5d7', 122)
('a', 48)
('_kombu.binding.celeryev', 907)
('_kombu.binding.celery.pidbox', 276)
('myset:__rand_int__', 100)
('key_e76a94ac-cd9f-438c-9459-ca4711260451', 122)
('key_66afcfba-fccd-4bf4-8dc2-fb0d99ba532d', 122)
('_kombu.binding.celery', 235)
('key:__rand_int__', 65)
('counter:__rand_int__', 64)
('key_31a63d80-a029-400f-8634-4c64fa679b2d', 122)
('key_c3cea6f9-2e29-4bce-a31b-d58c36fb597d', 122)
total  count  13
finish use time 0.0105438232422 second

When I handed the code over, my colleague pointed out that there are not 13 keys: across all the instances combined there are well over a billion. What???

However shaken, I kept my confident smile, took the code back, and went off to optimize.

First a local experiment: take the code above, skip the r.memory_usage call, and see how long it takes just to walk all the keys. By this point the test Redis held roughly 5.2 million keys.

import redis
import time

rip = "192.168.10.205"
rport = "6379"


def wrapper_time(func):
    def wrapper(*args, **kwargs):
        start = time.time()
        r = func(*args, **kwargs)
        end = time.time()
        print("finish use time {0} second".format(end - start))
        return r
    return wrapper


@wrapper_time
def t_redis():
    pool = redis.ConnectionPool(host=rip, port=rport, socket_connect_timeout=3)
    r = redis.Redis(connection_pool=pool)
    total_count = 0
    for key in r.scan_iter(count=5000):
        total_count += 1
        # keyMemUsage = int(r.memory_usage(key))
        # print(key, keyMemUsage)
    print("total  count  {0}".format(total_count))


if __name__ == '__main__':
    t_redis()

The terminal output:

total  count  5271392
finish use time 82.052093029 second

Walking all of the roughly 5.2 million keys took about 82 seconds in total. With count=5000 that is on the order of 5,271,392 / 5,000 ≈ 1,000 SCAN round trips, so the scan itself is cheap.

Now let's run the unmodified version 1 code against the same 5.2 million keys and see how long a full pass takes.

When it had only just gotten started...

...after I had skimmed the Two Sessions highlights and read the latest tech articles...

...after hearing "You Look So Lovely When You Smile" for the umpteenth time...

...I have no idea how much later, I finally killed the still-churning script. Where is the problem? Where exactly did it go wrong?

There are only a few lines of code here, and only two plausible performance suspects in them: scan_iter and r.memory_usage.

# scan_iter
def scan_iter(self, match=None, count=None):
    """
    Make an iterator using the SCAN command so that the client doesn't
    need to remember the cursor position.

    ``match`` allows for filtering the keys by pattern

    ``count`` allows for hint the minimum number of returns
    """
    cursor = '0'
    while cursor != 0:
        cursor, data = self.scan(cursor=cursor, match=match, count=count)
        for item in data:
            yield item


def scan(self, cursor=0, match=None, count=None):
    """
    Incrementally return lists of key names. Also return a cursor
    indicating the scan position.

    ``match`` allows for filtering the keys by pattern

    ``count`` allows for hint the minimum number of returns
    """
    pieces = [cursor]
    if match is not None:
        pieces.extend([b'MATCH', match])
    if count is not None:
        pieces.extend([b'COUNT', count])
    return self.execute_command('SCAN', *pieces)

After reading this code and double-checking how the SCAN command works, I could largely rule out SCAN as a serious performance problem, so I turned to memory_usage:

def memory_usage(self, key, samples=None):
    """
    Return the total memory usage for key, its value and associated
    administrative overheads.

    For nested data structures, ``samples`` is the number of elements to
    sample. If left unspecified, the server's default is 5. Use 0 to sample
    all elements.
    """
    args = []
    if isinstance(samples, int):
        args.extend([b'SAMPLES', samples])
    return self.execute_command('MEMORY USAGE', key, *args)

Reading this, the picture is clear: every size lookup sends one command to the Redis server and then blocks waiting for the reply. One network round trip per key — that's the whole story.

The first fix that comes to mind is to batch those commands with a pipeline. Time to revise.
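Before the full rewrite, a minimal sketch of the idea (illustrative only; the 1000-key batch size is arbitrary): a pipeline buffers the MEMORY USAGE commands client-side and ships each batch in a single round trip instead of one per key. Version 2 below is the same idea with explicit counter bookkeeping.

import redis

r = redis.Redis(host="192.168.10.205", port=6379)

pipe = r.pipeline()
batch = []
for key in r.scan_iter(count=5000):
    batch.append(key)
    pipe.memory_usage(key)      # buffered client-side, nothing sent yet
    if len(batch) == 1000:      # one round trip per 1000 keys
        for k, size in zip(batch, pipe.execute()):
            print(k, int(size))
        batch = []
if batch:                       # flush the final partial batch
    for k, size in zip(batch, pipe.execute()):
        print(k, int(size))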

Redis, version 2: a rude awakening

import redis
import time

rip = "192.168.10.205"
rport = "6379"


def wrapper_time(func):
    def wrapper(*args, **kwargs):
        start = time.time()
        r = func(*args, **kwargs)
        end = time.time()
        print("finish use time {0} second".format(end - start))
        return r
    return wrapper


@wrapper_time
def t_redis(pipe_size=1000):
    pool = redis.ConnectionPool(host=rip, port=rport, socket_connect_timeout=3)
    r = redis.Redis(connection_pool=pool)
    pipe = r.pipeline()
    pipe_count = 0
    total_count = 0
    keys = []
    for key in r.scan_iter(count=5000):
        pipe_count += 1
        total_count += 1
        if pipe_count < pipe_size:
            pipe.memory_usage(key)
            keys.append(key)
            continue
        else:
            pipe.memory_usage(key)
            result = pipe.execute()
            pipe_count = 0
            for i, v in enumerate(result):
                keyMemUsage = int(v)
            keys = []
    if keys:
        result = pipe.execute()
        pipe_count = 0
        for i, v in enumerate(result):
            keyMemUsage = int(v)
    print("total  count  {0}".format(total_count))


if __name__ == '__main__':
    t_redis()

After a long wait, the output:

total  count  5271392
finish use time 254.994492054 second

Measuring all five-million-plus keys now takes about 254 seconds, against the roughly 82 seconds needed merely to scan them. Surely this can still be rescued: scan_iter is fast, and it is the memory_usage side that drags. So the next attempt hands the pipelined memory_usage work to a background worker thread fed from a queue, letting the scan and the measurement overlap:

import redis
import time
import Queue
import threading

rip = "192.168.10.205"
rport = "6379"


def wrapper_time(func):
    def wrapper(*args, **kwargs):
        start = time.time()
        r = func(*args, **kwargs)
        end = time.time()
        print("finish use time {0} second".format(end - start))
        return r
    return wrapper


@wrapper_time
def t_redis(pipe_size=1000):
    pool = redis.ConnectionPool(host=rip, port=rport, socket_connect_timeout=3)
    r = redis.Redis(connection_pool=pool)
    pipe_count = 0
    total_count = 0
    keys = []
    start_time = time.time()
    workQueue = Queue.Queue()

    def work_execute():
        work_count = 0
        while True:
            try:
                keys = workQueue.get(timeout=3)
            except Exception as e:
                print("get exeption  {0}".format(e))
                continue
            if keys is None or not keys:
                end_time = time.time()
                print("exist {0}  count {1}".format(end_time - start_time, work_count))
                return
            pipe = r.pipeline()
            for k in keys:
                pipe.memory_usage(k)
            try:
                result = pipe.execute()
            except Exception as e:
                print("get execute error {0}".format(e))
                return
            for key_usage in result:
                work_count += 1
                keyMemUsage = int(key_usage)

    t = threading.Thread(target=work_execute)
    t.start()

    for key in r.scan_iter(count=5000):
        pipe_count += 1
        total_count += 1
        if pipe_count < pipe_size:
            keys.append(key)
            continue
        else:
            keys.append(key)
            workQueue.put([k for k in keys])
            pipe_count = 0
            keys = []
    if keys:
        workQueue.put([k for k in keys])
    workQueue.put("")
    print("total  count  {0}".format(total_count))


if __name__ == '__main__':
    t_redis()

Then the quiet wait for results:

total  count  5271392
finish use time 90.5585868359 second
exist 243.682749033  count 5271392

scan_iter keeps up its usual pace and finishes in about 80 seconds, with a single thread consuming the queue and running the memory_usage pipelines. But that lone consumer is only ten-odd seconds faster than the unthreaded version — which makes sense, since it still executes each 1000-key pipeline one after another and therefore dominates the total time. Slightly awkward.

Let's keep tweaking — this time with five consumer threads:

import redis
import time
import Queue
import threading

rip = "192.168.10.205"
rport = "6379"


def wrapper_time(func):
    def wrapper(*args, **kwargs):
        start = time.time()
        r = func(*args, **kwargs)
        end = time.time()
        print("finish use time {0} second".format(end - start))
        return r
    return wrapper


@wrapper_time
def t_redis(pipe_size=1000):
    pool = redis.ConnectionPool(host=rip, port=rport, socket_connect_timeout=3)
    r = redis.Redis(connection_pool=pool)
    pipe_count = 0
    total_count = 0
    keys = []
    start_time = time.time()
    workQueue = Queue.Queue()
    numThreads = 5
    threads = []

    def work_execute():
        work_count = 0
        while True:
            try:
                keys = workQueue.get(timeout=3)
            except Exception as e:
                print("get exeption  {0}".format(e))
                continue
            if keys is None or not keys:
                end_time = time.time()
                print("exist {0}  count {1}".format(end_time - start_time, work_count))
                return
            pipe = r.pipeline()
            for k in keys:
                pipe.memory_usage(k)
            try:
                result = pipe.execute()
            except Exception as e:
                print("get execute error {0}".format(e))
                return
            for key_usage in result:
                work_count += 1
                keyMemUsage = int(key_usage)

    for i in range(numThreads):
        t = threading.Thread(target=work_execute)
        t.start()
        threads.append(t)

    for key in r.scan_iter(count=5000):
        pipe_count += 1
        total_count += 1
        if pipe_count < pipe_size:
            keys.append(key)
            continue
        else:
            keys.append(key)
            workQueue.put([k for k in keys])
            pipe_count = 0
            keys = []
    if keys:
        workQueue.put([k for k in keys])

    RUNNING = True
    while RUNNING:
        RUNNING = False
        for t in threads:
            if t.isAlive():
                workQueue.put("")
                RUNNING = True
        time.sleep(0.5)
    print("total  count  {0}".format(total_count))


if __name__ == '__main__':
    t_redis()

The output:

exist 218.543583155  count 1058000
exist 218.544660091  count 1049000
exist 218.566720009  count 1054392
exist 218.714774132  count 1056000
exist 218.821619987  count 1054000
total  count  5271392
finish use time 218.969571114 second

About 218 seconds — an improvement of twenty-odd seconds. Modest, but I'll take it.

Looking back over this stretch of road, every step has followed the conventional performance playbook. But the dominant cost in this scenario is network I/O: millions of MEMORY USAGE commands still have to cross the wire, batch by batch, and be answered. That thought sent me into a long silence.

Redis, version 3: the winding async path

Let's try attacking the problem with Python's coroutine support. aioredis, an asynchronous Redis client for Python 3, deserves a recommendation here. Onward.

import asyncio
import aioredis
import time


def t_redis():
    async def go():
        redis = await aioredis.create_redis_pool('redis://192.168.10.205')
        cursor = '0'
        work_count = 0

        async def scan_iter(count=5000):
            nonlocal work_count
            cursor = "0"
            while cursor != 0:
                cursor, data = await redis.scan(cursor=cursor, count=count)
                if len(data):
                    work_count += len(data)

        await scan_iter()
        print("total count key {0}".format(work_count))
        redis.close()
        await redis.wait_closed()

    start = time.time()
    asyncio.run(go())
    end = time.time()
    print("finish use time {0} second".format(end - start))


if __name__ == '__main__':
    t_redis()

The output:

total count key 5271392
finish use time 28.971981048583984 second

Compared with the roughly 80 seconds the first version needed just to walk all the keys, this is a big step forward. So let's put memory_usage back in and see whether the gain survives.

However, after a somewhat bewildered read through the docs, I could not find any built-in support for the MEMORY USAGE command in aioredis.

No surrendering now — not after coming this far. After comparing how the redis library assembles and parses MEMORY USAGE, and studying how aioredis's execute() turns a command and its arguments into bytes on the wire, I changed the code as follows:

import asyncio
import aioredis
import time


def t_redis():
    async def go():
        redis = await aioredis.create_redis_pool('redis://192.168.10.205')
        cursor = '0'
        work_count = 0

        async def scan_iter(count=5000):
            nonlocal work_count
            cursor = "0"
            while cursor != 0:
                cursor, data = await redis.scan(cursor=cursor, count=count)
                if len(data):
                    work_count += len(data)
                    for k in data:
                        r = await redis.execute(b"MEMORY", *["USAGE", k])
                        # print(k, r)

        await scan_iter()
        print("total count key {0}".format(work_count))
        redis.close()
        await redis.wait_closed()

    start = time.time()
    asyncio.run(go())
    end = time.time()
    print("finish use time {0} second".format(end - start))


if __name__ == '__main__':
    t_redis()

I ran it — and never did see it finish.

Most likely each execute() awaits its own round trip, generating far too many tiny events for the loop to work through efficiently. The next improvements would be either to pipeline the commands or to spin up a subprocess running its own event loop for the lookups. Since aioredis has no native memory_usage, and a first look suggested its pipeline support is still just coroutine wrapping underneath, I did not go down that road and switched to a worker thread instead:

import asyncio
import aioredis
import time
import queue
import threading

work_queue = queue.Queue()
start_time = time.time()


def worker():
    async def go():
        redis = await aioredis.create_redis('redis://192.168.10.205')
        while True:
            try:
                keys = work_queue.get(timeout=3)
            except Exception as e:
                print("get exeption  {0}".format(e))
                continue
            if keys is None or not keys:
                end_time = time.time()
                print("exist {0}".format(end_time - start_time))
                return
            r = await redis.execute(b"MEMORY", *["USAGE", keys])
            print(r)

    asyncio.run(go())


t = threading.Thread(target=worker)
t.start()


def t_redis():
    async def go():
        redis = await aioredis.create_redis('redis://192.168.10.205')
        cursor = '0'
        work_count = 0

        async def scan_iter(count=5000):
            nonlocal work_count
            cursor = "0"
            pipe_count = 0
            while cursor != 0:
                cursor, data = await redis.scan(cursor=cursor, count=count)
                if len(data):
                    work_count += len(data)
                    for k in data:
                        work_queue.put(k)
                        pipe_count += 1
                print(work_count)

        await scan_iter()
        print("total count key {0}".format(work_count))
        work_queue.put("")
        redis.close()
        await redis.wait_closed()

    start = time.time()
    asyncio.run(go())
    end = time.time()
    print("finish use time {0} second".format(end - start))


if __name__ == '__main__':
    t_redis()

Fine — I clearly had not learned my lesson, and this code too was doomed to an endless wait. At this point the choices were either to dig properly into how aioredis could pipeline MEMORY USAGE, or to accept that even several threads, each with its own loop, cannot chew through this many one-at-a-time memory queries. A sketch of the untried batching direction is below.
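For the record, the batching direction I never tested might look roughly like this: issue the per-key commands without awaiting each one, then gather the replies in bulk. This is only a sketch, under the assumption that in aioredis 1.x execute() on a single connection queues the command and returns a future, so firing many before waiting effectively batches them on the wire; I did not measure it.

import asyncio
import aioredis


async def sizes_for(keys):
    # Assumption: aioredis 1.x single connection; execute() returns a future.
    redis = await aioredis.create_redis('redis://192.168.10.205')
    try:
        # Fire every MEMORY USAGE first, then wait for all replies at once.
        return await asyncio.gather(
            *[redis.execute(b"MEMORY", "USAGE", k) for k in keys]
        )
    finally:
        redis.close()
        await redis.wait_closed()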

Was this love-hate entanglement between Redis and me really going to end right here?!

Redis, version 4: a final push

After the first three installments I set my stubbornness aside. With the limited time on hand, the only remaining move I could see to defuse this awkward problem was the following.

import os
import time
from multiprocessing import Pool, Manager
import asyncio

import aioredis
import redis

start_time = time.time()


def consumer(queue):
    pipe_count = 0
    work_count = 0
    pipe_size = 1000
    pool = redis.ConnectionPool(host="192.168.10.205", port="6379", socket_connect_timeout=3)
    r = redis.Redis(connection_pool=pool)
    pipe = r.pipeline()
    print('Run task (%s)...' % (os.getpid()))
    while True:
        try:
            keys = queue.get(timeout=3)
        except Exception as e:
            print(" queue get error  {0}".format(e))
            time.sleep(3)
            if queue.qsize() == 0:
                end_time = time.time()
                print("exist {0}  count {1}  ".format(end_time - start_time, work_count))
                return
            continue
        if keys is None or not keys:
            end_time = time.time()
            print("exist {0}  count {1}".format(end_time - start_time, work_count))
            return
        store_keys = []
        for key in keys:
            pipe_count += 1
            key_decode = key.decode("utf-8")
            store_keys.append(key_decode)
            pipe.memory_usage(key_decode)
            if pipe_count < pipe_size:
                if key != keys[-1]:
                    continue
            try:
                result = pipe.execute()
            except Exception as e:
                print("get execute error {0}".format(e))
                return
            pipe_count = 0
            for i, key_usage in enumerate(result):
                work_count += 1
                keyMemUsage = int(key_usage)
                # print(store_keys[i], keyMemUsage)
            store_keys = []


def producer(queue):
    async def go():
        redis = await aioredis.create_redis('redis://192.168.10.205')
        work_count = 0

        async def scan_iter(count=5000):
            nonlocal work_count
            cursor = "0"
            while cursor != 0:
                cursor, data = await redis.scan(cursor=cursor, count=count)
                if len(data):
                    work_count += len(data)
                    queue.put(data)
                    # print(work_count)

        await scan_iter()
        print("total count key {0}".format(work_count))
        redis.close()
        await redis.wait_closed()

    start = time.time()
    asyncio.run(go())
    end = time.time()
    print("finish use time {0} second".format(end - start))
    while queue.qsize():
        print(queue.qsize())
        time.sleep(1)
    print("produce end")


if __name__ == '__main__':
    queue = Manager().Queue()
    print('Parent process %s.' % os.getpid())
    p = Pool(6)
    num_worker = 5
    for i in range(num_worker):
        p.apply_async(consumer, args=(queue,))
    p.apply_async(producer, args=(queue,))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')

A process pool does the work: one producer uses aioredis to scan all the keys and push batches onto a shared queue, while the consumer processes drain the queue and fetch key sizes through redis-py pipelines.

Parent process 53200.
Waiting for all subprocesses done...
Run task (53204)...
Run task (53203)...
Run task (53202)...
Run task (53205)...
Run task (53206)...
total count key 5271392
finish use time 84.6968948841095 second
150
138
120
101
86
68
53
37
20
4
produce end
 queue get error
 queue get error
 queue get error
 queue get error
 queue get error
exist 99.95775818824768  count 1060055
exist 100.00523614883423  count 961136
exist 100.05303382873535  count 1075066
exist 100.10038113594055  count 1065064
exist 100.10428404808044  count 1110071
All subprocesses done.

The whole run now finishes in roughly 100 seconds. The machine that will actually run the script has more than six cores and memory to spare, so let's stop here and settle for this compromise.

Summary

This post is just a quick record of the journey; there is plenty left to dig into. Pressed for time, I never went back to explore how aioredis could drive MEMORY USAGE through a pipeline, even though the async approach is clearly faster for the plain key traversal. Everything here also stays within the Python stack; it would be worth implementing the same thing in Golang to see how it compares. My knowledge is limited, so corrections are welcome.
