Pyspider 使用带认证redis集群作为消息队列
文章目录
- 概述
- pyspider message_queue 源码解读
- pyspider的message_queue的配置文件
- 使用redis集群时的配置文件
- 使用redis单点,带认证时的message_queue配置
- pyspider实现redis集群带认证的支持
概述
最近使用pyspider作为调度部署一个项目,因为客户方提供需要使用redis集群作为消息队列。所以在网上搜索了好多,都说不支持redis集群。静下心来一想,这么常规的需求不应该不支持呀。本着一切都在源码中的宗旨,打开了pyspider的源码,果然让我发现了其实是支持redis集群的。但是如果redis集群需要认证的话,就不支持了。因此,需要对pyspider的代码做一个改造,让它支持redis集群带认证的方式。
pyspider message_queue 源码解读
首先看redis_queue.py文件中,我们可以看到RedisQueue类的构造函数中是有关于StrictRedisCluster的信息的,说明pyspider是支持redis集群模式的。和单点的redis连接相比,其中并没有关于redis集群的密码认证的代码。
class RedisQueue(object):"""A Queue like message built over redis"""Empty = BaseQueue.EmptyFull = BaseQueue.Fullmax_timeout = 0.3def __init__(self, name, host='localhost', port=6379, db=0,maxsize=0, lazy_limit=True, password=None, cluster_nodes=None):"""Constructor for RedisQueuemaxsize: an integer that sets the upperbound limit on the number ofitems that can be placed in the queue.lazy_limit: redis queue is shared via instance, a lazy size limit is usedfor better performance."""self.name = nameif(cluster_nodes is not None):from rediscluster import StrictRedisClusterself.redis = StrictRedisCluster(startup_nodes=cluster_nodes)else:self.redis = redis.StrictRedis(host=host, port=port, db=db, password=password)self.maxsize = maxsizeself.lazy_limit = lazy_limitself.last_qsize = 0
首先抛开redis集群的密码认证不说,如果是没有密码认证的集群,我们应该怎么配置呢。我们知道,在运行pypsider的时候,使用的命令是:
pyspider -c pyspider.json
这种方式。也就是说我们程序的入口就在pyspider命令中。
跟踪代码就会发现,与消息队列初始化有关的操作在run.py文件中,代码如下:
@cli.command()
@click.option('--fetcher-num', default=1, help='instance num of fetcher')kwargs['is_%s_default' % db] = True# create folder for counter.dumpif not os.path.exists(kwargs['data_path']):os.mkdir(kwargs['data_path'])# message queue, compatible with old versionif kwargs.get('message_queue'):passelif kwargs.get('amqp_url'):kwargs['message_queue'] = kwargs['amqp_url']elif os.environ.get('RABBITMQ_NAME'):kwargs['message_queue'] = ("amqp://guest:guest@%(RABBITMQ_PORT_5672_TCP_ADDR)s"":%(RABBITMQ_PORT_5672_TCP_PORT)s/%%2F" % os.environ)elif kwargs.get('beanstalk'):kwargs['message_queue'] = "beanstalk://%s/" % kwargs['beanstalk']for name in ('newtask_queue', 'status_queue', 'scheduler2fetcher','fetcher2processor', 'processor2result'):if kwargs.get('message_queue'):kwargs[name] = utils.Get(lambda name=name: connect_message_queue(name, kwargs.get('message_queue'), kwargs['queue_maxsize']))else:kwargs[name] = connect_message_queue(name, kwargs.get('message_queue'),kwargs['queue_maxsize'])
connect_message_queue 方法是非常关键的一个函数,继续跟踪,就会发现这个函数在message_queue包下面的__init__.py文件中,定义如下:
def connect_message_queue(name, url=None, maxsize=0, lazy_limit=True):"""create connection to message queuename:name of message queuerabbitmq:amqp://username:password@host:5672/%2Fsee https://www.rabbitmq.com/uri-spec.htmlbeanstalk:beanstalk://host:11300/redis:redis://host:6379/dbredis://host1:port1,host2:port2,...,hostn:portn (for redis 3.x in cluster mode)kombu:kombu+transport://userid:password@hostname:port/virtual_hostsee http://kombu.readthedocs.org/en/latest/userguide/connections.html#urlsbuiltin:None"""if not url:from pyspider.libs.multiprocessing_queue import Queuereturn Queue(maxsize=maxsize)parsed = urlparse.urlparse(url)if parsed.scheme == 'amqp':from .rabbitmq import Queuereturn Queue(name, url, maxsize=maxsize, lazy_limit=lazy_limit)elif parsed.scheme == 'beanstalk':from .beanstalk import Queuereturn Queue(name, host=parsed.netloc, maxsize=maxsize)elif parsed.scheme == 'redis':from .redis_queue import Queueif ',' in parsed.netloc:"""redis in cluster mode (there is no concept of 'db' in cluster mode)ex. redis://host1:port1,host2:port2,...,hostn:portn"""cluster_nodes = []for netloc in parsed.netloc.split(','):cluster_nodes.append({'host': netloc.split(':')[0], 'port': int(netloc.split(':')[1])})return Queue(name=name, maxsize=maxsize, lazy_limit=lazy_limit, cluster_nodes=cluster_nodes)else:db = parsed.path.lstrip('/').split('/')try:db = int(db[0])except:logging.warning('redis DB must zero-based numeric index, using 0 instead')db = 0password = parsed.password or Nonereturn Queue(name=name, host=parsed.hostname, port=parsed.port, db=db, maxsize=maxsize, password=password, lazy_limit=lazy_limit)elif url.startswith('kombu+'):url = url[len('kombu+'):]from .kombu_queue import Queuereturn Queue(name, url, maxsize=maxsize, lazy_limit=lazy_limit)else:raise Exception('unknown connection url: %s', url)
pyspider的message_queue的配置文件
使用redis集群时的配置文件
由上面的代码我们就可以非常清楚的知道,如果使用redis集群模式时,首先要做的就是两件事:
- 安装redis集群python工具包
pip install rediscluster
- pyspider.json中配置如下:
{"message_queue": "redis://192.168.0.1:6379,192.168.0.1:6380,192.168.0.2:6380,192.168.0.2:6379"
}
节点与节点之间使用“,”分隔开。
在上面的代码中,我们可以看到如果是redis的单节点带认证的方式,配置文件应该是如下所示:
使用redis单点,带认证时的message_queue配置
redis单节点有认证,则message_queue的url为:
冒号+密码+@+host:port
{"message_queue": redis://:redispass@192.168.0.1:5000/db0
}
pyspider实现redis集群带认证的支持
因此,我们可以模仿,如果使用redis集群带认证的方式按照redis单点类似的配置要怎么实现呢?
即配置文件如下:
{"message_queue": "redis://:redispass@192.168.0.1:6379,192.168.0.1:6380,192.168.0.2:6380,192.168.0.2:6379"
}
如果要实现上面这种配置方式,显然我们是需要改代码的,首先要改的就是解析配置文件的地方。
def connect_message_queue(name, url=None, maxsize=0, lazy_limit=True):"""create connection to message queuename:name of message queuerabbitmq:amqp://username:password@host:5672/%2Fsee https://www.rabbitmq.com/uri-spec.htmlbeanstalk:beanstalk://host:11300/redis:redis://host:6379/dbredis://host1:port1,host2:port2,...,hostn:portn (for redis 3.x in cluster mode)kombu:kombu+transport://userid:password@hostname:port/virtual_hostsee http://kombu.readthedocs.org/en/latest/userguide/connections.html#urlsbuiltin:None"""if not url:from pyspider.libs.multiprocessing_queue import Queuereturn Queue(maxsize=maxsize)parsed = urlparse.urlparse(url)if parsed.scheme == 'amqp':from .rabbitmq import Queuereturn Queue(name, url, maxsize=maxsize, lazy_limit=lazy_limit)elif parsed.scheme == 'beanstalk':from .beanstalk import Queuereturn Queue(name, host=parsed.netloc, maxsize=maxsize)elif parsed.scheme == 'redis':from .redis_queue import Queue# 从URL中解析出passwordpassword = parsed.password or Noneif ',' in parsed.netloc:"""redis in cluster mode (there is no concept of 'db' in cluster mode)ex. redis://host1:port1,host2:port2,...,hostn:portn"""cluster_nodes = []for netloc in parsed.netloc.split(','):#拿到了每一个节点,因为第一个节点中有password信息,所以需要做处理if password is not None:prefix = ":"+password+"@"netloc =netloc.replace(prefix,"")cluster_nodes.append({'host': netloc.split(':')[0], 'port': int(netloc.split(':')[1])})# 在集群的构造函数中加入passwordreturn Queue(name=name, maxsize=maxsize, lazy_limit=lazy_limit, cluster_nodes=cluster_nodes,password=password)else:db = parsed.path.lstrip('/').split('/')try:db = int(db[0])except:logging.warning('redis DB must zero-based numeric index, using 0 instead')db = 0password = parsed.password or Nonereturn Queue(name=name, host=parsed.hostname, port=parsed.port, db=db, maxsize=maxsize, password=password, lazy_limit=lazy_limit)elif url.startswith('kombu+'):url = url[len('kombu+'):]from .kombu_queue import Queuereturn Queue(name, url, maxsize=maxsize, lazy_limit=lazy_limit)else:raise Exception('unknown connection url: %s', url)
由上述代码可知,我们改动的地方很少,所以还是非常简单的。
- 第二处需要改动的则是RedisQueue构造函数,需要在连接RedisCluster时加入密码参数。
class RedisQueue(object):"""A Queue like message built over redis"""Empty = BaseQueue.EmptyFull = BaseQueue.Fullmax_timeout = 0.3def __init__(self, name, host='localhost', port=6379, db=0,maxsize=0, lazy_limit=True, password=None, cluster_nodes=None):"""Constructor for RedisQueuemaxsize: an integer that sets the upperbound limit on the number ofitems that can be placed in the queue.lazy_limit: redis queue is shared via instance, a lazy size limit is usedfor better performance."""self.name = nameif(cluster_nodes is not None):# StrictRedisCluster 方法在新的版本中已经被移除了,所以需要更改成RedisClusterfrom rediscluster import RedisClusterself.redis = # 在构造函数中加入密码RedisCluster(startup_nodes=cluster_nodes,password=password)else:self.redis = redis.StrictRedis(host=host, port=port, db=db, password=password)self.maxsize = maxsizeself.lazy_limit = lazy_limitself.last_qsize = 0
整个代码的改动也就不超过10行,就可以完美的实现我们的需求。
当然,因为rediscluster工具包不再维护了,我们需要更换新的redis集群工具包。
pip install redis==3.5.3
pip install redis-py-cluster==2.1.3
Pyspider 使用带认证redis集群作为消息队列相关推荐
- SpringBoot2.0 整合 Redis集群 ,实现消息队列场景
本文源码 GitHub地址:知了一笑 https://github.com/cicadasmile/middle-ware-parent 一.Redis集群简介 1.RedisCluster概念 Re ...
- flask模拟集群实现消息队列和简单高并发支持
思路: 1.一个总端口实现服务的代理和分发--使用gevent做协程,解决高并发: 2.多个子端口实现集群构建业务层--使用make_server,构成消息队列: 3.总端口/子端口--增加延迟启动/ ...
- 理想化的 Redis 集群
摘要: 豁达是正确乐观的面对失败的系统.不需要过多的担心,需要一种去说那又怎样的能力.因此架构的设计是如此的重要.许多优秀的系统没有进一步成长的能力,我们应该做的是使用其他的系统去共同分担工作. Re ...
- redis 集群master slave飘逸,造成订阅消息失败问题解决
一.问题 之前的代码,通过redis消息订阅发布功能,来获取最新的设备信息,运行一段时间后发现,从某个点开始,无法订阅新发布的消息 二.调研 1.在本地debug模式启动功能,连接到redis集群订阅 ...
- Redis集群读写分离架构搭建以及主从数据连通验证(附加集群口令认证以及Redis端口6379释放)
1. 先在两台主机上装好Redis 如果这部分工作还没有准备好的话,可以看我的另一篇博客. 2. 设置主从节点以及从节点只读(实现读写分离) 2.1 配置slave节点作为master的从机,打开/e ...
- 搭建高可用的redis集群,避免standalone模式带给你的苦难
现在项目上用redis的话,很少说不用集群的情况,毕竟如果生产上只有一台redis会有极大的风险,比如机器挂掉,或者内存爆掉,就比如我们生产环境 曾今也遭遇到这种情况,导致redis内存不够挂掉的情况 ...
- 带你搭建一下虚拟机和Redis集群,记得收藏
前言: 我们看到分析 Redis 使用或原理的文章不少,但是完整搭建一套独立的 redis 集群环境的介绍,并不是很多或者说还不够详细. 那么,本文会手把手带着大家搭建一套 Redis 集群环境,Re ...
- 带你来搭建虚拟机和Redis集群,记得收藏
1.前言 我们看到分析 Redis 使用或原理的文章不少,但是完整搭建一套独立的 Redis 集群环境的介绍,并不是很多或者说还不够详细. 那么,本文会手把手带着大家搭建一套 Redis 集群环境,R ...
- centos redis 升级版本_带你来搭建虚拟机和Redis集群,记得收藏
来源于公众号Java爱好者社区 , 作者东升的思考 1.前言 我们看到分析 Redis 使用或原理的文章不少,但是完整搭建一套独立的 Redis 集群环境的介绍,并不是很多或者说还不够详细. 那么,本 ...
最新文章
- 几个cvebase_ifo基础信息融合在一起
- 无人驾驶产业发展现状及影响
- mysql类 php100_PHP100视频教程26:制作自己的PHP+MYSQL的类
- set的使用03(较多的操作函数)
- 使用PHP得到所有的HTTP请求头
- View Flash AS3 and AVM2
- 零基础学编程,如何区分C语言和Java?我们到底如何怎么进行选择!
- WORD 如何在方框里打勾?
- 小狼毫(Rime)输入法设置Shift直接上屏英文字符并切换为英文状态方法
- 如何将html转为report,如何把Html5 Report Viewer添加到Web项目
- 编译WINDOWS版FFmpeg:msys2环境准备
- InnoDB之锁机制
- 腾讯云服务器公网ip无法访问
- SpringAOP简单案例
- java 读取 excel 科学计数
- 魔法才能打败魔法?银行现身说法
- Redis底层数据结构——跳跃列表
- NYOJ - [第八届河南省程序设计大赛]Distribution(水题)
- 增设区域分销商:APC渠道变革拒绝“扁平化”
- doolittle分解法解线性方程