文章目录

  • 概述
  • pyspider message_queue 源码解读
  • pyspider的message_queue的配置文件
    • 使用redis集群时的配置文件
    • 使用redis单点,带认证时的message_queue配置
  • pyspider实现redis集群带认证的支持

概述

最近使用pyspider作为调度部署一个项目,因为客户方提供需要使用redis集群作为消息队列。所以在网上搜索了好多,都说不支持redis集群。静下心来一想,这么常规的需求不应该不支持呀。本着一切都在源码中的宗旨,打开了pyspider的源码,果然让我发现了其实是支持redis集群的。但是如果redis集群需要认证的话,就不支持了。因此,需要对pyspider的代码做一个改造,让它支持redis集群带认证的方式。

pyspider message_queue 源码解读

首先看redis_queue.py文件中,我们可以看到RedisQueue类的构造函数中是有关于StrictRedisCluster的信息的,说明pyspider是支持redis集群模式的。和单点的redis连接相比,其中并没有关于redis集群的密码认证的代码。

class RedisQueue(object):"""A Queue like message built over redis"""Empty = BaseQueue.EmptyFull = BaseQueue.Fullmax_timeout = 0.3def __init__(self, name, host='localhost', port=6379, db=0,maxsize=0, lazy_limit=True, password=None, cluster_nodes=None):"""Constructor for RedisQueuemaxsize:    an integer that sets the upperbound limit on the number ofitems that can be placed in the queue.lazy_limit: redis queue is shared via instance, a lazy size limit is usedfor better performance."""self.name = nameif(cluster_nodes is not None):from rediscluster import StrictRedisClusterself.redis = StrictRedisCluster(startup_nodes=cluster_nodes)else:self.redis = redis.StrictRedis(host=host, port=port, db=db, password=password)self.maxsize = maxsizeself.lazy_limit = lazy_limitself.last_qsize = 0

首先抛开redis集群的密码认证不说,如果是没有密码认证的集群,我们应该怎么配置呢。我们知道,在运行pypsider的时候,使用的命令是:

pyspider -c  pyspider.json

这种方式。也就是说我们程序的入口就在pyspider命令中。
跟踪代码就会发现,与消息队列初始化有关的操作在run.py文件中,代码如下:

@cli.command()
@click.option('--fetcher-num', default=1, help='instance num of fetcher')kwargs['is_%s_default' % db] = True# create folder for counter.dumpif not os.path.exists(kwargs['data_path']):os.mkdir(kwargs['data_path'])# message queue, compatible with old versionif kwargs.get('message_queue'):passelif kwargs.get('amqp_url'):kwargs['message_queue'] = kwargs['amqp_url']elif os.environ.get('RABBITMQ_NAME'):kwargs['message_queue'] = ("amqp://guest:guest@%(RABBITMQ_PORT_5672_TCP_ADDR)s"":%(RABBITMQ_PORT_5672_TCP_PORT)s/%%2F" % os.environ)elif kwargs.get('beanstalk'):kwargs['message_queue'] = "beanstalk://%s/" % kwargs['beanstalk']for name in ('newtask_queue', 'status_queue', 'scheduler2fetcher','fetcher2processor', 'processor2result'):if kwargs.get('message_queue'):kwargs[name] = utils.Get(lambda name=name: connect_message_queue(name, kwargs.get('message_queue'), kwargs['queue_maxsize']))else:kwargs[name] = connect_message_queue(name, kwargs.get('message_queue'),kwargs['queue_maxsize'])

connect_message_queue 方法是非常关键的一个函数,继续跟踪,就会发现这个函数在message_queue包下面的__init__.py文件中,定义如下:

def connect_message_queue(name, url=None, maxsize=0, lazy_limit=True):"""create connection to message queuename:name of message queuerabbitmq:amqp://username:password@host:5672/%2Fsee https://www.rabbitmq.com/uri-spec.htmlbeanstalk:beanstalk://host:11300/redis:redis://host:6379/dbredis://host1:port1,host2:port2,...,hostn:portn (for redis 3.x in cluster mode)kombu:kombu+transport://userid:password@hostname:port/virtual_hostsee http://kombu.readthedocs.org/en/latest/userguide/connections.html#urlsbuiltin:None"""if not url:from pyspider.libs.multiprocessing_queue import Queuereturn Queue(maxsize=maxsize)parsed = urlparse.urlparse(url)if parsed.scheme == 'amqp':from .rabbitmq import Queuereturn Queue(name, url, maxsize=maxsize, lazy_limit=lazy_limit)elif parsed.scheme == 'beanstalk':from .beanstalk import Queuereturn Queue(name, host=parsed.netloc, maxsize=maxsize)elif parsed.scheme == 'redis':from .redis_queue import Queueif ',' in parsed.netloc:"""redis in cluster mode (there is no concept of 'db' in cluster mode)ex. redis://host1:port1,host2:port2,...,hostn:portn"""cluster_nodes = []for netloc in parsed.netloc.split(','):cluster_nodes.append({'host': netloc.split(':')[0], 'port': int(netloc.split(':')[1])})return Queue(name=name, maxsize=maxsize, lazy_limit=lazy_limit, cluster_nodes=cluster_nodes)else:db = parsed.path.lstrip('/').split('/')try:db = int(db[0])except:logging.warning('redis DB must zero-based numeric index, using 0 instead')db = 0password = parsed.password or Nonereturn Queue(name=name, host=parsed.hostname, port=parsed.port, db=db, maxsize=maxsize, password=password, lazy_limit=lazy_limit)elif url.startswith('kombu+'):url = url[len('kombu+'):]from .kombu_queue import Queuereturn Queue(name, url, maxsize=maxsize, lazy_limit=lazy_limit)else:raise Exception('unknown connection url: %s', url)

pyspider的message_queue的配置文件

使用redis集群时的配置文件

由上面的代码我们就可以非常清楚的知道,如果使用redis集群模式时,首先要做的就是两件事:

  • 安装redis集群python工具包
pip install rediscluster
  • pyspider.json中配置如下:
{"message_queue": "redis://192.168.0.1:6379,192.168.0.1:6380,192.168.0.2:6380,192.168.0.2:6379"
}

节点与节点之间使用“,”分隔开。

在上面的代码中,我们可以看到如果是redis的单节点带认证的方式,配置文件应该是如下所示:

使用redis单点,带认证时的message_queue配置

redis单节点有认证,则message_queue的url为:
冒号+密码+@+host:port

{"message_queue": redis://:redispass@192.168.0.1:5000/db0
}

pyspider实现redis集群带认证的支持

因此,我们可以模仿,如果使用redis集群带认证的方式按照redis单点类似的配置要怎么实现呢?
即配置文件如下:

{"message_queue": "redis://:redispass@192.168.0.1:6379,192.168.0.1:6380,192.168.0.2:6380,192.168.0.2:6379"
}

如果要实现上面这种配置方式,显然我们是需要改代码的,首先要改的就是解析配置文件的地方。


def connect_message_queue(name, url=None, maxsize=0, lazy_limit=True):"""create connection to message queuename:name of message queuerabbitmq:amqp://username:password@host:5672/%2Fsee https://www.rabbitmq.com/uri-spec.htmlbeanstalk:beanstalk://host:11300/redis:redis://host:6379/dbredis://host1:port1,host2:port2,...,hostn:portn (for redis 3.x in cluster mode)kombu:kombu+transport://userid:password@hostname:port/virtual_hostsee http://kombu.readthedocs.org/en/latest/userguide/connections.html#urlsbuiltin:None"""if not url:from pyspider.libs.multiprocessing_queue import Queuereturn Queue(maxsize=maxsize)parsed = urlparse.urlparse(url)if parsed.scheme == 'amqp':from .rabbitmq import Queuereturn Queue(name, url, maxsize=maxsize, lazy_limit=lazy_limit)elif parsed.scheme == 'beanstalk':from .beanstalk import Queuereturn Queue(name, host=parsed.netloc, maxsize=maxsize)elif parsed.scheme == 'redis':from .redis_queue import Queue# 从URL中解析出passwordpassword = parsed.password or Noneif ',' in parsed.netloc:"""redis in cluster mode (there is no concept of 'db' in cluster mode)ex. redis://host1:port1,host2:port2,...,hostn:portn"""cluster_nodes = []for netloc in parsed.netloc.split(','):#拿到了每一个节点,因为第一个节点中有password信息,所以需要做处理if password is not None:prefix = ":"+password+"@"netloc =netloc.replace(prefix,"")cluster_nodes.append({'host': netloc.split(':')[0], 'port': int(netloc.split(':')[1])})# 在集群的构造函数中加入passwordreturn Queue(name=name, maxsize=maxsize, lazy_limit=lazy_limit, cluster_nodes=cluster_nodes,password=password)else:db = parsed.path.lstrip('/').split('/')try:db = int(db[0])except:logging.warning('redis DB must zero-based numeric index, using 0 instead')db = 0password = parsed.password or Nonereturn Queue(name=name, host=parsed.hostname, port=parsed.port, db=db, maxsize=maxsize, password=password, lazy_limit=lazy_limit)elif url.startswith('kombu+'):url = url[len('kombu+'):]from .kombu_queue import Queuereturn Queue(name, url, maxsize=maxsize, lazy_limit=lazy_limit)else:raise Exception('unknown connection url: %s', url)

由上述代码可知,我们改动的地方很少,所以还是非常简单的。

  • 第二处需要改动的则是RedisQueue构造函数,需要在连接RedisCluster时加入密码参数。
class RedisQueue(object):"""A Queue like message built over redis"""Empty = BaseQueue.EmptyFull = BaseQueue.Fullmax_timeout = 0.3def __init__(self, name, host='localhost', port=6379, db=0,maxsize=0, lazy_limit=True, password=None, cluster_nodes=None):"""Constructor for RedisQueuemaxsize:    an integer that sets the upperbound limit on the number ofitems that can be placed in the queue.lazy_limit: redis queue is shared via instance, a lazy size limit is usedfor better performance."""self.name = nameif(cluster_nodes is not None):# StrictRedisCluster 方法在新的版本中已经被移除了,所以需要更改成RedisClusterfrom rediscluster import RedisClusterself.redis = # 在构造函数中加入密码RedisCluster(startup_nodes=cluster_nodes,password=password)else:self.redis = redis.StrictRedis(host=host, port=port, db=db, password=password)self.maxsize = maxsizeself.lazy_limit = lazy_limitself.last_qsize = 0

整个代码的改动也就不超过10行,就可以完美的实现我们的需求。

当然,因为rediscluster工具包不再维护了,我们需要更换新的redis集群工具包。

pip install redis==3.5.3
pip install redis-py-cluster==2.1.3

Pyspider 使用带认证redis集群作为消息队列相关推荐

  1. SpringBoot2.0 整合 Redis集群 ,实现消息队列场景

    本文源码 GitHub地址:知了一笑 https://github.com/cicadasmile/middle-ware-parent 一.Redis集群简介 1.RedisCluster概念 Re ...

  2. flask模拟集群实现消息队列和简单高并发支持

    思路: 1.一个总端口实现服务的代理和分发--使用gevent做协程,解决高并发: 2.多个子端口实现集群构建业务层--使用make_server,构成消息队列: 3.总端口/子端口--增加延迟启动/ ...

  3. 理想化的 Redis 集群

    摘要: 豁达是正确乐观的面对失败的系统.不需要过多的担心,需要一种去说那又怎样的能力.因此架构的设计是如此的重要.许多优秀的系统没有进一步成长的能力,我们应该做的是使用其他的系统去共同分担工作. Re ...

  4. redis 集群master slave飘逸,造成订阅消息失败问题解决

    一.问题 之前的代码,通过redis消息订阅发布功能,来获取最新的设备信息,运行一段时间后发现,从某个点开始,无法订阅新发布的消息 二.调研 1.在本地debug模式启动功能,连接到redis集群订阅 ...

  5. Redis集群读写分离架构搭建以及主从数据连通验证(附加集群口令认证以及Redis端口6379释放)

    1. 先在两台主机上装好Redis 如果这部分工作还没有准备好的话,可以看我的另一篇博客. 2. 设置主从节点以及从节点只读(实现读写分离) 2.1 配置slave节点作为master的从机,打开/e ...

  6. 搭建高可用的redis集群,避免standalone模式带给你的苦难

    现在项目上用redis的话,很少说不用集群的情况,毕竟如果生产上只有一台redis会有极大的风险,比如机器挂掉,或者内存爆掉,就比如我们生产环境 曾今也遭遇到这种情况,导致redis内存不够挂掉的情况 ...

  7. 带你搭建一下虚拟机和Redis集群,记得收藏

    前言: 我们看到分析 Redis 使用或原理的文章不少,但是完整搭建一套独立的 redis 集群环境的介绍,并不是很多或者说还不够详细. 那么,本文会手把手带着大家搭建一套 Redis 集群环境,Re ...

  8. 带你来搭建虚拟机和Redis集群,记得收藏

    1.前言 我们看到分析 Redis 使用或原理的文章不少,但是完整搭建一套独立的 Redis 集群环境的介绍,并不是很多或者说还不够详细. 那么,本文会手把手带着大家搭建一套 Redis 集群环境,R ...

  9. centos redis 升级版本_带你来搭建虚拟机和Redis集群,记得收藏

    来源于公众号Java爱好者社区 , 作者东升的思考 1.前言 我们看到分析 Redis 使用或原理的文章不少,但是完整搭建一套独立的 Redis 集群环境的介绍,并不是很多或者说还不够详细. 那么,本 ...

最新文章

  1. 几个cvebase_ifo基础信息融合在一起
  2. 无人驾驶产业发展现状及影响
  3. mysql类 php100_PHP100视频教程26:制作自己的PHP+MYSQL的类
  4. set的使用03(较多的操作函数)
  5. 使用PHP得到所有的HTTP请求头
  6. View Flash AS3 and AVM2
  7. 零基础学编程,如何区分C语言和Java?我们到底如何怎么进行选择!
  8. WORD 如何在方框里打勾?
  9. 小狼毫(Rime)输入法设置Shift直接上屏英文字符并切换为英文状态方法
  10. 如何将html转为report,如何把Html5 Report Viewer添加到Web项目
  11. 编译WINDOWS版FFmpeg:msys2环境准备
  12. InnoDB之锁机制
  13. 腾讯云服务器公网ip无法访问
  14. SpringAOP简单案例
  15. java 读取 excel 科学计数
  16. 魔法才能打败魔法?银行现身说法
  17. Redis底层数据结构——跳跃列表
  18. NYOJ - [第八届河南省程序设计大赛]Distribution(水题)
  19. 增设区域分销商:APC渠道变革拒绝“扁平化”
  20. doolittle分解法解线性方程

热门文章

  1. 案例:990万次骑行:纽约自行车共享系统分析
  2. WICC 2021来袭 融云领衔探索互联网通信云技术新方向
  3. 多重背包模板 C++
  4. javascript 删除节点问题
  5. 社交需求和社交产品的更替
  6. 初始化56个民族下拉框
  7. Android CTA认证设定首选网络类型
  8. Tableau(5):符号地图、仪表盘、自定义显示
  9. 精选『捷径』干货,反正我是都收藏了!
  10. JAVA 阶段测试题-SQL