背景

在完成脚本Redis的key的遍历脚本之后,原以为事情就这么过去了,在同事试用脚本之后,拿了一个线上的集群做了测试,响应速度非常满意,觉得不错但是qps过高担心影响线上业务。于是我查看了测试环境的qps之后发现遍历五百万key的时候,qps会非常高。

redis-cli -p 6379 -r 100 -i 1 info|grep ops
instantaneous_ops_per_sec:104245
instantaneous_ops_per_sec:106227
instantaneous_ops_per_sec:109910
instantaneous_ops_per_sec:105947
instantaneous_ops_per_sec:107111
instantaneous_ops_per_sec:104354
instantaneous_ops_per_sec:103974
instantaneous_ops_per_sec:106596
instantaneous_ops_per_sec:97469
instantaneous_ops_per_sec:99605
instantaneous_ops_per_sec:98182
instantaneous_ops_per_sec:100320
instantaneous_ops_per_sec:99811
instantaneous_ops_per_sec:94457
instantaneous_ops_per_sec:103132
instantaneous_ops_per_sec:110565
instantaneous_ops_per_sec:104259
instantaneous_ops_per_sec:93459
instantaneous_ops_per_sec:95340
instantaneous_ops_per_sec:99658
instantaneous_ops_per_sec:106938
instantaneous_ops_per_sec:109205

从采样的数据可用看见qps最高大概有十万左右,在测试环境中大概整个脚本跑完花了八十秒左右,测试环境(还是那个八核,6G的虚拟机),但是此时来查看一下虚拟机的性能指标,

 5556 root      20   0 1527188  41608   2144 R  95.7  0.3   0:31.81 python5559 root      20   0 1527180  41556   2144 R  95.3  0.3   0:31.79 python5558 root      20   0 1527184  41576   2144 R  95.0  0.3   0:31.73 python5561 root      20   0 1527212  41592   2144 R  95.0  0.3   0:31.90 python5554 root      20   0 1527112  41580   2144 R  94.7  0.3   0:31.89 python4224 root      20   0  778136 501036   1032 R  38.5  3.1 182:47.31 redis-server

运行过程中,redis-server的cpu利用率其实都不算高,内存利用率也不算高,但是qps确非常的高。根据同事的反馈如果线上的qps在三四万的时候,运行qps的采样程序也会有些许卡顿并且CPU利用率是飙升,并且使用方就会把redis的访问时间太长的告警。但是在测试环境测试的时候,采样程序并无卡顿,cpu占用率也不算高,这不仅引起了我的思考,这是为什么。

追根问底-Redis的qps计数机制

本文就讲究看着5.0.4的代码吧,首先找到了info命令的具体过程;

void infoCommand(client *c) {char *section = c->argc == 2 ? c->argv[1]->ptr : "default";if (c->argc > 2) {addReply(c,shared.syntaxerr);return;}addReplyBulkSds(c, genRedisInfoString(section));  // 直接就是获取redis的info信息
}sds genRedisInfoString(char *section) {.../* Stats */if (allsections || defsections || !strcasecmp(section,"stats")) {if (sections++) info = sdscat(info,"\r\n");info = sdscatprintf(info,"# Stats\r\n""total_connections_received:%lld\r\n""total_commands_processed:%lld\r\n""instantaneous_ops_per_sec:%lld\r\n""total_net_input_bytes:%lld\r\n""total_net_output_bytes:%lld\r\n""instantaneous_input_kbps:%.2f\r\n""instantaneous_output_kbps:%.2f\r\n""rejected_connections:%lld\r\n""sync_full:%lld\r\n""sync_partial_ok:%lld\r\n""sync_partial_err:%lld\r\n""expired_keys:%lld\r\n""expired_stale_perc:%.2f\r\n""expired_time_cap_reached_count:%lld\r\n""evicted_keys:%lld\r\n""keyspace_hits:%lld\r\n""keyspace_misses:%lld\r\n""pubsub_channels:%ld\r\n""pubsub_patterns:%lu\r\n""latest_fork_usec:%lld\r\n""migrate_cached_sockets:%ld\r\n""slave_expires_tracked_keys:%zu\r\n""active_defrag_hits:%lld\r\n""active_defrag_misses:%lld\r\n""active_defrag_key_hits:%lld\r\n""active_defrag_key_misses:%lld\r\n",server.stat_numconnections,server.stat_numcommands,getInstantaneousMetric(STATS_METRIC_COMMAND),  // 获取qpsserver.stat_net_input_bytes,server.stat_net_output_bytes,(float)getInstantaneousMetric(STATS_METRIC_NET_INPUT)/1024,(float)getInstantaneousMetric(STATS_METRIC_NET_OUTPUT)/1024,server.stat_rejected_conn,server.stat_sync_full,server.stat_sync_partial_ok,server.stat_sync_partial_err,server.stat_expiredkeys,server.stat_expired_stale_perc*100,server.stat_expired_time_cap_reached_count,server.stat_evictedkeys,server.stat_keyspace_hits,server.stat_keyspace_misses,dictSize(server.pubsub_channels),listLength(server.pubsub_patterns),server.stat_fork_time,dictSize(server.migrate_cached_sockets),getSlaveKeyWithExpireCount(),server.stat_active_defrag_hits,server.stat_active_defrag_misses,server.stat_active_defrag_key_hits,server.stat_active_defrag_key_misses);}...return info;
}/* Return the mean of all the samples. */
long long getInstantaneousMetric(int metric) {int j;long long sum = 0;for (j = 0; j < STATS_METRIC_SAMPLES; j++)sum += server.inst_metric[metric].samples[j];  // 统计最近十六次的平均值return sum / STATS_METRIC_SAMPLES;
}

现在代码看到这里了就大致知道server保存了最近一个十六的数组,然后把所有值取平均就是qps,那这个inst_metric是在哪里生成的呢?就在redis自带的定时任务中执行每一百毫秒执行一次;

 // 位于server.c的sererCron函数中run_with_period(100) {                                                         // 通过loop执行的次数来判断是否执行或者小于一个执行hz的时间,从而扩展到指定的时间长度饿回调trackInstantaneousMetric(STATS_METRIC_COMMAND,server.stat_numcommands);    // 每次都运行采样数据并保存trackInstantaneousMetric(STATS_METRIC_NET_INPUT,server.stat_net_input_bytes);trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT,server.stat_net_output_bytes);}/* Add a sample to the operations per second array of samples. */
void trackInstantaneousMetric(int metric, long long current_reading) {long long t = mstime() - server.inst_metric[metric].last_sample_time;long long ops = current_reading -server.inst_metric[metric].last_sample_count;long long ops_sec;ops_sec = t > 0 ? (ops*1000/t) : 0;server.inst_metric[metric].samples[server.inst_metric[metric].idx] =ops_sec;server.inst_metric[metric].idx++;server.inst_metric[metric].idx %= STATS_METRIC_SAMPLES;server.inst_metric[metric].last_sample_time = mstime();server.inst_metric[metric].last_sample_count = current_reading;
}

原来就是通过stat_numcommands来计数100ms中执行了多少个命令,然后获取qps,最后将最近16次的qps再取平均即得到终端输出的qps的值,现在看到这里好像一切都是那么顺利成章,通过统计单位时间内的命令执行数来,此时我闭起来双眼,如果没擦错的话,redis把我的5000个的memusge的pipeline命令的请求就计数为5000次了。

void call(client *c, int flags) {...server.stat_numcommands++;
}int processCommand(client *c) {.../* Exec the command */if (c->flags & CLIENT_MULTI &&c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&c->cmd->proc != multiCommand && c->cmd->proc != watchCommand){queueMultiCommand(c);addReply(c,shared.queued);} else {call(c,CMD_CALL_FULL);                      // 执行命令c->woff = server.master_repl_offset;if (listLength(server.ready_keys))handleClientsBlockedOnKeys();}return C_OK;
}void processInputBuffer(client *c) {server.current_client = c;                          // 设置当前处理的连接的客户端/* Keep processing while there is something in the input buffer */while(c->qb_pos < sdslen(c->querybuf)) {            // 检查长度是否符合要求, 循环执行 因为缓冲区可能会接受多个命令/* Return if clients are paused. */if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;/* Immediately abort if the client is in the middle of something. */if (c->flags & CLIENT_BLOCKED) break;/* Don't process input from the master while there is a busy script* condition on the slave. We want just to accumulate the replication* stream (instead of replying -BUSY like we do with other clients) and* later resume the processing. */if (server.lua_timedout && c->flags & CLIENT_MASTER) break;/* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is* written to the client. Make sure to not let the reply grow after* this flag has been set (i.e. don't process more commands).** The same applies for clients we want to terminate ASAP. */if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;/* Determine request type when unknown. */if (!c->reqtype) {                                                      // 通过传入的第一个数据来检查是否是多个命令参数if (c->querybuf[c->qb_pos] == '*') {c->reqtype = PROTO_REQ_MULTIBULK;} else {c->reqtype = PROTO_REQ_INLINE;}}if (c->reqtype == PROTO_REQ_INLINE) {                                   // 检查接受数据的参数类型if (processInlineBuffer(c) != C_OK) break;                          // 解析一个命令参数} else if (c->reqtype == PROTO_REQ_MULTIBULK) {if (processMultibulkBuffer(c) != C_OK) break;                       // 解析多个命令参数} else {serverPanic("Unknown request type");}/* Multibulk processing could see a <= 0 length. */if (c->argc == 0) {                                             resetClient(c);                                                     // 重置标志位和参数} else {/* Only reset the client when the command was executed. */      // if (processCommand(c) == C_OK) {                                    // 处理参数查找命令if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {/* Update the applied replication offset of our master. */c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;     }/* Don't reset the client structure for clients blocked in a* module blocking command, so that the reply callback will* still be able to access the client argv and argc field.* The client will be reset in unblockClientFromModule(). */if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)resetClient(c);}/* freeMemoryIfNeeded may flush slave output buffers. This may* result into a slave, that may be the active client, to be* freed. */if (server.current_client == NULL) break;}}/* Trim to pos */if (server.current_client != NULL && c->qb_pos) {sdsrange(c->querybuf,c->qb_pos,-1);c->qb_pos = 0;}server.current_client = NULL;                       // 处理完成置为空
}void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {client *c = (client*) privdata;int nread, readlen;size_t qblen;UNUSED(el);UNUSED(mask);readlen = PROTO_IOBUF_LEN;/* If this is a multi bulk request, and we are processing a bulk reply* that is large enough, try to maximize the probability that the query* buffer contains exactly the SDS string representing the object, even* at the risk of requiring more read(2) calls. This way the function* processMultiBulkBuffer() can avoid copying buffers to create the* Redis Object representing the argument. */if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1&& c->bulklen >= PROTO_MBULK_BIG_ARG)                                       // 检查该命令是否是多个命令的请求{ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);/* Note that the 'remaining' variable may be zero in some edge case,* for example once we resume a blocked client after CLIENT PAUSE. */if (remaining > 0 && remaining < readlen) readlen = remaining;}qblen = sdslen(c->querybuf);                                                    // 获取保存的缓冲区大小if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);                             // 扩充内存来容纳剩余需要读到的数据nread = read(fd, c->querybuf+qblen, readlen);                                   // 从连接中读取数据if (nread == -1) {                                                              // 如果读取的长度为空则检查是否连接出错if (errno == EAGAIN) {return;} else {serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));       // 释放连接并打印错误freeClient(c);return;}} else if (nread == 0) {                                                        // 如果为0 则表示连接关闭 释放客户端serverLog(LL_VERBOSE, "Client closed connection");freeClient(c);return;} else if (c->flags & CLIENT_MASTER) {/* Append the query buffer to the pending (not applied) buffer* of the master. We'll use this buffer later in order to have a* copy of the string applied by the last command executed. */c->pending_querybuf = sdscatlen(c->pending_querybuf,c->querybuf+qblen,nread);}sdsIncrLen(c->querybuf,nread);                                                  // 提升保存缓存区的大小c->lastinteraction = server.unixtime;                                           // 获取时间if (c->flags & CLIENT_MASTER) c->read_reploff += nread;                 server.stat_net_input_bytes += nread;                                           // 保存新加入的数据处理长度if (sdslen(c->querybuf) > server.client_max_querybuf_len) {                     // 如果超过了缓冲区最大的接受长度则答应错误释放内存关闭客户端sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty(); bytes = sdscatrepr(bytes,c->querybuf,64);serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);sdsfree(ci);sdsfree(bytes);freeClient(c);return;}/* Time to process the buffer. If the client is a master we need to* compute the difference between the applied offset before and after* processing the buffer, to understand how much of the replication stream* was actually applied to the master state: this quantity, and its* corresponding part of the replication stream, will be propagated to* the sub-slaves and to the replication backlog. */processInputBufferAndReplicate(c);                                      // 尝试去解析输入数据并根据解析的数据执行对应命令
}

代码看到这里,一切就清晰了,调用的逻辑如下

readQueryFromClient -> processInputBufferAndReplicate -> processInputBuffer -> processCommand -> call

从processInputBuffer一次接受的buff中依次挨个解析每一个命令,如果buffer一次性接受了5000个memusage命令,则会调用call5000次,即统计计数也会增加,但是为什么qps这么高却跟线上实际的现象不同呢。其实再想一想,线上的环境会被多个client连接,常用的命令基本上面没有一次5000的pipeline,并且每个客户端基本上都响应的是不同的短命令,而测试环境的redis其实只有几个client,处理的事件响应数比较小,而且每次处理的时候都是处理很多命令组成一起的buffer一起发送,故qps高但是性能指标没有明显飙升。

线上环境:
A1 -- get命令 -->
A2 -- set命令 -->    Redis事件(计数加1)
A3 -- get命令 -->
A4 -- get命令 -->测试环境
A1 -- get命令/get命令/get命令 Redis事件(计数加3)

这样一对比相比大家也都能看出区别吧。

重构-轻量简洁的访问

此时为了不让一下把所有的请求都打入到redis上面,并且在一定程度上能够监控Redis的qps的值,以免压力太大而影响线上业务。思来想去,好吧,直接解析协议吧,全部改为异步IO来实现。

import asyncio
from asyncio import events
import timeimport aioredisredis_ip = "192.168.10.205"STATUS = ["running", "stop", "waiting"]RUNNING_STATUS = "running"REDIS_MAX_QPS = 1000000def change_status(status):global RUNNING_STATUSRUNNING_STATUS = statusdef encode_command(*args, buf=None):if buf is None:buf = bytearray()buf.extend(b'*%d\r\n' % len(args))try:for arg in args:if isinstance(arg, str):arg = arg.encode("utf-8")buf.extend(b'$%d\r\n%s\r\n' % (len(arg), arg))except KeyError:raise TypeError("Argument {!r} expected to be of bytearray, bytes,"" float, int, or str type".format(arg))return bufdef memory_usage_muti_t(keys=None):buf = bytearray()buf.extend(b'*%d\r\n$%d\r\n%s\r\n' % (1, len(b"MULTI"), b"MULTI"))for k in keys:buf.extend(encode_command(b"MEMORY", *["USAGE", k]))buf.extend(b'*%d\r\n$%d\r\n%s\r\n' % (1, len(b"EXEC"), b"EXEC"))return bufdef parse_memory_muti_i(buf, length):vals = buf.split("\r\n")if length > 0:length = -1 - lengthfor v in vals[length:-1]:yield int(v[1:])async def redis_pipeline(reader, writer, data):message = memory_usage_muti_t(data)length = len(data)writer.write(message)recv_buf = ""counts = length*2 + 2while True:try:recv = await reader.read(1024)if not recv:returnexcept Exception as e:writer.close()print(e)raise erecv_buf += recv.decode()if counts == recv_buf.count("\r\n"):i = 0for v in parse_memory_muti_i(recv_buf, length):# print(data[i], v)i += 1breakdef t_redis():loop = events.new_event_loop()async def monitor_qps():redis = await aioredis.create_redis('redis://{0}'.format(redis_ip))while RUNNING_STATUS in ["running", "waiting"]:await asyncio.sleep(0.1)redis_stats = await redis.info("stats")redis_qps = int(redis_stats["stats"]["instantaneous_ops_per_sec"])if redis_qps >= REDIS_MAX_QPS:print("qps  {0}".format(redis_qps))# RUNNING_STATUS = "waiting"if RUNNING_STATUS == "running":change_status("waiting")else:if RUNNING_STATUS == "waiting":change_status("running")async def go():redis = await aioredis.create_redis('redis://{0}'.format(redis_ip))reader, writer = await asyncio.open_connection(redis_ip, 6379,loop=loop)work_count = 0async def scan_iter(count=5000):nonlocal work_countcursor = "0"pipe_count = 0while cursor != 0:cursor, data = await redis.scan(cursor=cursor, count=count)if len(data):work_count += len(data)pipe_count += 1await redis_pipeline(reader, writer, data)print(work_count)# 判断是否需要进行等待while RUNNING_STATUS == "waiting":await asyncio.sleep(1)await scan_iter()print("total count key {0}".format(work_count))redis.close()# 关闭检查qps的协程change_status("stop")print(RUNNING_STATUS)await redis.wait_closed()start = time.time()events.set_event_loop(loop)loop.set_debug(False)loop.run_until_complete(asyncio.gather(go(), monitor_qps()))end = time.time()print("finish use time {0} second".format(end-start))if __name__ == '__main__':t_redis()

如果设置REDIS_MAX_QPS为1000000(相当于对qps不加限制),此时跑完520万左右的key大约需要67秒,性能数据如下;

  PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND
11352 root      20   0  191168  14672   4556 R  74.4  0.1   0:20.43 python4224 root      20   0  778136 501504   1032 S  35.2  3.1 185:20.51 redis-server

如果此时我修改REDIS_MAX_QPS为20000的话,当检测到qps大于20000的时候就会停止访问,此时的执行时间大概需要300秒,此时的性能数据;

  PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND
10865 root      20   0  190904  14652   4556 S  13.2  0.1   0:44.06 python4224 root      20   0  778136 501504   1032 S   7.9  3.1 185:07.18 redis-server

通过两次对比来看的话,确实限速之后的资源占用率要少一些,但是对应的响应请求的时间就会长一些。事情到了这个地方,其实qps来进行访问的限制还是有一定的问题的,如果某个集群的qps一直大于设置的qps的值,那个这个redis基本上不会被执行一直在那里检查是否可以被允许查询。鱼和熊掌不可兼得。

重来-通过主从复制的原理来访问

因为redis支持主从复制,假如我伪装成一个redis的从,然后接受redis节点发送的同步数据,通过同步数据来进行key大小的判断,这个就变成了一个client端只需要被动的收数据就行不进行主动的访问。

有了这个想法,就需要思考一下如何才能实现,redis的主从复制的原理查看代码之后发现其实还是比较简单的就是连接之后发送一个sync同步命令,同步对应的rdb文件,等同步完成之后就发送最新redis接受的操作命令从而达成主从数据一致性。

在完成主从复制之后,就需要解析rdb文件,当我查看了rdb文件的格式之后发现短期去实现一个解析工具还是有难度,那就使用现成的rdbtool这个工具来吧。

现在就需要组合这两个想法,于是就有了如下代码(代码风格确实不好,而且有些逻辑处理不完善,大家仅供查看一下功能而已)。

import socket
import logging
import time
import threadingfrom rdbtools import RdbParser, KeyValsOnlyCallback
from rdbtools.encodehelpers import ESCAPE_CHOICESlogger = logging.getLogger(__package__)start = time.time()redis_ip = "192.168.10.205"
redis_port = 6379
key_size = 42def encode_command(*args, buf=None):if buf is None:buf = bytearray()buf.extend(b'*%d\r\n' % len(args))try:for arg in args:if isinstance(arg, str):arg = arg.encode("utf-8")buf.extend(b'$%d\r\n%s\r\n' % (len(arg), arg))except KeyError:raise TypeError("Argument {!r} expected to be of bytearray, bytes,"" float, int, or str type".format(arg))return bufclass RecvBuff(object):def __init__(self):self.buff = b""self.cond = threading.Condition()self.length = 0self.total_length = 0self.is_done = Falsedef add(self, data):self.buff += datadef acquire(self):self.cond.acquire()def release(self):self.cond.release()def wait(self, timeout=None):self.cond.wait(timeout=timeout)def notify(self):self.cond.notify_all()def consumer_length(self, n):if len(self.buff) >= n:r = self.buff[:n]self.buff = self.buff[n:]self.length += nif self.length == self.total_length:self.is_done = Trueraisereturn relse:while True:self.notify()self.wait()if len(self.buff) >= n:r = self.buff[:n]self.buff = self.buff[n:]self.length += nif self.length == self.total_length:self.is_done = Trueraisereturn rrecv_buff = RecvBuff()def rdb_work():# out_file_obj = os.fdopen(sys.stdout.fileno(), 'wb')class Writer(object):def write(self, value):if b" " in value:index = value.index(b" ")length = len(value)if length - index - 1 >= key_size:print(value, index, length)out_file_obj = Writer()callback = {'justkeyvals': lambda f: KeyValsOnlyCallback(f, string_escape=ESCAPE_CHOICES[0]),}["justkeyvals"](out_file_obj)parser = RdbParser(callback)def parse(self, filename=None):class Reader(object):def __init__(self, buff):self.buff = buffdef __enter__(self):return selfdef __exit__(self, exc_type, exc_val, exc_tb):passdef read(self, n):if n <= 0:returnres = self.buff.consumer_length(n)return resdef close(self):passf = Reader(recv_buff)self.parse_fd(f)setattr(parser, "parse", parse)recv_buff.acquire()parser.parse(parser)recv_buff.is_done = Truerecv_buff.notify()recv_buff.release()class RedisServer(object):def __init__(self, host=None, port=None):self.host = host or "127.0.0.1"self.port = port or 6379self.conn = Noneself.recv_buff = recv_buffdef init(self):try:self.conn = socket.socket()self.conn.connect((self.host, self.port))except Exception as e:logger.exception(e)self.conn = Nonereturnself.slave_sync()def slave_sync(self):self.send_sync()self.recv_buff.acquire()total_read_length = 0while True:data = self.conn.recv(1024 * 1)print("check ", data)if b"$" == data[:1]:length = len(data)for i in range(length-1):if b"\r\n" == data[i:(i + 2)]:breakself.recv_buff.total_length = int(data[1:(i-2)].decode())left_data = data[(i+2):]total_read_length += len(left_data)if left_data:self.recv_buff.add(left_data)self.recv_buff.notify()self.recv_buff.wait()breakif b"\n" == data:continuewhile True:try:data = self.conn.recv(1024 * 8)except Exception as e:print("recv error : {0}".format(e))returnif data:total_read_length += len(data)self.recv_buff.add(data)self.recv_buff.notify()self.recv_buff.wait()if self.recv_buff.is_done:returndef send_sync(self):data = encode_command("SYNC")try:self.conn.send(data)except Exception as e:returnif __name__ == '__main__':rs = RedisServer(redis_ip, redis_port)t = threading.Thread(target=rs.init)t.start()t1 = threading.Thread(target=rdb_work)t1.start()t.join()t1.join()end = time.time()print("finish use time {0}  second  ".format(end - start))

运行之后系统的监控如下;

  PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND1608 root      20   0  334428  10560   4368 S  99.7  0.1   0:30.99 python4224 root      20   0  778136 504568   1032 S   0.3  3.1 204:00.90 redis-server

可以看出,脚本的cpu利用率较高,但是redis-server的各项性能指标都比较平稳,cpu利用率也比异步执行的要低一些。并且该方法在分析比较大的redis的key时并不会将rdb文件落盘,从而节省了空间,但是该脚本的执行性能相对一般。

....
b'test_71c275a0-91d4-4745-b8d8-864ca37a63d0 test_71c275a0-91d4-4745-b8d8-864ca37a63d0' 41 83
b'test_d36ab798-2ee2-435a-8157-cf15cb42a5ce test_d36ab798-2ee2-435a-8157-cf15cb42a5ce' 41 83
b'test_29839173-db14-4c2c-8900-5ed139d7b7da test_29839173-db14-4c2c-8900-5ed139d7b7da' 41 83
b'test_c8636449-f06b-41c7-862a-3ae40c98e5f6 test_c8636449-f06b-41c7-862a-3ae40c98e5f6' 41 83
b'test_3f02114f-06f8-4f85-8133-415bd695137c test_3f02114f-06f8-4f85-8133-415bd695137c' 41 83
finish use time 149.25378394126892  second

五百二十万左右的key,大约需要149秒访问成功。不过该方法目前来看对redis集群影响相对较小。

总结

本文只是对上一次大量redis的key的节点访问的一个继续探索,因为使用过程中发现有qps打满cpu压力过大等问题,就继续做了改进,通过换成全部的异步IO和通过主从复制的原理来进行解析,各有优缺点,不过最后的通过主从复制的原理来使用的话,对redis的节点的影响相对较小,但是该方法需要连接的redis的节点的集群状态正确,不能是一个断了主节点的从节点。由于本人才疏学浅,如有错误请批评指正。

遍历百万级Redis的键值的续集相关推荐

  1. 遍历百万级Redis的键值的大结局

    背景 上次改完利用条件变量的形式来进行rdbtool和socket接受的数据联合分析,我再想能不能通过协程来实现避免条件变量这种调用系统调用的方式,当然如果算一下因为每一次接受的socket的数据都尽 ...

  2. 遍历百万级Redis的键值的曲折经历

    背景 暖心同学突然跟我说想要获取线上所有的Redis的key的大小信息,就是想知道redis中所有对应Key的大小信息(线上使用的redis存储的信息基本统一而且没有其他复杂的如set等数据结构),让 ...

  3. redis中键值出现 \xAC\xED\x00\x05t\x00\x11的原因和解决方法

    一.redis中键值出现乱码情况 1.1 问题描述 1.1.1 使用SpringBoot项目结合redis做缓存,发现redis客户端工具中db0库key为USER_USER_ID_1000的前缀出现 ...

  4. Redis 大键值对 Big Key 懒惰删除机制

    一.懒惰删除介绍 在删除元素数量很多的集合(set/hash/list/sortedSet)时,无论是使用DEL命令删除还是redis为了释放内存空间而进行的删除,在删除这些big key的时候,会导 ...

  5. java redis存储键值包含\xac\xed\x00\x05t\x00\特殊字符

    java RedisTemplate操作redis后,想看一下是否成功, 就redis-cli执行:keys * "\xac\xed\x00\x05t\x00\x04name" & ...

  6. redis常用操作2, redis操作键值, redis安全设置

    string数据 127.0.0.1:6379> setnx k1 888 #键存在,setnx检测到,不会覆盖: (integer) 0 127.0.0.1:6379> get k1 & ...

  7. 03,redis多键值对,哈希散列hset

    // 客户端Jedis连接到服务端,并选择第2个数据库Jedis jedis = new Jedis("127.0.0.1",6379);jedis.select(1);jedis ...

  8. JS 遍历JSON对象中的键值对

    对象:一组无序属性的集合,属性的值可以是任意的类型: json也是对象,数据都是成对的,也就是键值对: json实际上就是一组格式化后的字符串数据. 遍历JSON对象中的数据,可通过for-in循环实 ...

  9. 5.1.8 NoSQL数据库-Redis(键值key-value)-Redis配置详解

    目录 1.写在前面 2.具体信息 2.1 单位 2.2 包含 2.3 网络 2.4 通用 GENERAL 2.5 快照 2.6 REPLICATION 主从复制 2.7 SECURITY 安全 2.8 ...

最新文章

  1. c语言时间库函数#includetime.h
  2. 腾讯 AI Lab Robotics X 主任张正友博士:计算机视觉的三生三世 | CCF-GAIR 2019
  3. 百度大脑“乘风”新基建,“破浪”产业智能化落地
  4. 2018数据库流行度12月排行:Oracle续跌至年内低位,PostgreSQL激增创新高
  5. MySQL创建价格_mysql 建表时的价格用什么类型定义?
  6. Tomcat错误解决
  7. java序列化原理_Java序列化机制和原理
  8. 有时,你离成功的 exploit 只差一个信息泄漏:通过 ID tag 找到它
  9. Python返回Json格式定义的例子
  10. 实验5 振幅调制(集成乘法器幅度调制电路)
  11. python爬图mzitu_换个框架爬妹子图mzitu解决直接访问的403
  12. 电脑断网的解决办法(方法不会对电脑或网络造成负面影响)
  13. 如何添加xp操作系统的打印服务器,xp系统添加局域网打印机的简单方法
  14. Nginx 安全漏洞
  15. 计算机系统中三级存储器结构,计算机三级存储体系(1)
  16. mysql引用表无效列_Mysql使用索引可能失效的场景
  17. win10安装onnx、tensorrt(python用,超简单安装版)
  18. MacPro系统重装操作步骤(U盘重装)
  19. 在标准ASCII码表中,已知英文字母K的十六进制码值是4B,则二进制ASCII码1001000对应的字符是( )
  20. VTP:Cisco VLAN Trunking Protocol

热门文章

  1. 这个 AI 模型火上 GitHub 热榜第一,在线修复照片
  2. 让大规模深度学习训练线性加速、性能无损,基于BMUF的Adam优化器并行化实践...
  3. 在商业中,如何与人工智能建立共生关系?
  4. 北大教授张大庆:无线感知,让你变老也优雅
  5. 实战:基于技术分析的Python算法交易
  6. T5,一个探索迁移学习边界的模型
  7. 深度学习难,这本书让你轻松学深度学习
  8. AI芯片的“战国时代”:计算力将会驶向何方?
  9. 机器学习,就用Python!五大专家详解其优势何在
  10. AI一分钟 | 今天,百度又多了一个好基友华为,还互赠了信物;腾讯AI Lab“肢体动作追踪”技术造出了个“AI 尬舞机”