0、环境:Win7 x64,Python 2.7,APScheduler 2.1.2。

1、图:

2、代码:

(1)、中心节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#encoding=utf-8
#author: walker
#date: 2014-12-03
#summary: 中心节点(主要功能是分配任务)
import SocketServer, socket, Queue
CenterIP = '127.0.0.1'    #中心节点IP
CenterListenPort = 9999   #中心节点监听端口
CenterClient = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  #中心节点用于发送网络消息的socket
TaskQueue = Queue.Queue() #任务队列
#获取任务队列
def GetTaskQueue():
    for in range(111):
        TaskQueue.put(str(i))
#CenterServer的回调函数,在接受到udp报文是触发
class MyUDPHandler(SocketServer.BaseRequestHandler):
    def handle(self):
        data = self.request[0].strip()
        socket = self.request[1]
        print(data)
         
        if data.startswith('wait'):   
            vec = data.split(':')
            if len(vec) != 3:
                print('Error: len(vec) != 3')
            else:
                nodeIP = vec[1]
                nodeListenPort = vec[2]
                nodeID = nodeIP + ':' + nodeListenPort
                if not TaskQueue.empty():
                    task = TaskQueue.get()
                    print('send task ' + task + ' to ' + nodeID)
                    CenterClient.sendto('task:' + task, (nodeIP, int(nodeListenPort)))
                else:
                    print('TaskQueue is empty!')
GetTaskQueue()  #获取任务队列
CenterServer = SocketServer.UDPServer((CenterIP, CenterListenPort), MyUDPHandler)
print('Listen port ' + str(CenterListenPort) + ' ...')
CenterServer.serve_forever()

(2)、任务节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#encoding=utf-8
#author: walker
#date: 2014-12-03
#summary: 任务节点(请求/接收/执行任务)
import time, socket, SocketServer
from apscheduler.scheduler import Scheduler
CenterIP = '127.0.0.1'    #中心节点IP
CenterListenPort = 9999   #中心节点监听端口
NodeIP = socket.gethostbyname(socket.gethostname())   #任务节点自身IP
NodeClient = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)    #任务节点用于发送网络消息的socket
#任务:发送网络信息
def jobSendNetMsg():
    msg = ''
    if NodeServer.TaskState == 'wait':
        msg = 'wait:' + NodeIP + ':' + str(NodeListenPort)
    elif NodeServer.TaskState == 'exec':
        msg = 'exec:' + NodeIP + ':' + str(NodeListenPort)
     
    print(msg)
    NodeClient.sendto(msg, (CenterIP, CenterListenPort)) 
#添加并启动定时任务
def InitTimer():
    sched = Scheduler()
    sched.add_interval_job(jobSendNetMsg, seconds=1)
    sched.start()
     
#执行任务
def ExecTask(task):
    print('ExecTask ' + task + ' ...')
    time.sleep(2)
    print('ExecTask ' + task + ' over')
#NodeServer的回调函数,在接受到udp报文是触发
class MyUDPHandler(SocketServer.BaseRequestHandler):
    def handle(self):
        data = self.request[0].strip()
        socket = self.request[1]
        print('recv data: ' + data)
         
        if data.startswith('task'):
            vec = data.split(':')
            if len(vec) != 2:
                print('Error: len(vec) != 2')
            else:
                task = vec[1]
                self.server.TaskState = 'exec'
                ExecTask(task)
                self.server.TaskState = 'wait'
InitTimer()
                 
NodeServer = SocketServer.UDPServer(('', 0), MyUDPHandler)
NodeServer.TaskState = 'wait' #(exec/wait)
NodeListenPort = NodeServer.server_address[1]
print('NodeListenPort:' + str(NodeListenPort))
NodeServer.serve_forever()

*** walker * 2014-12-03 ***

本文转自walker snapshot博客51CTO博客,原文链接http://blog.51cto.com/walkerqt/1585826如需转载请自行联系原作者
RQSLT

Python自定义主从分布式架构相关推荐

  1. 游戏开发--开源软件11--Firefly(python 服务端分布式框架)||pygame

    2019独角兽企业重金招聘Python工程师标准>>> Firefly是免费.开源.稳定.快速扩展.能 "热更新"的分布式游戏服务器端框架,采用Python编写, ...

  2. 软件开发随笔系列一——分布式架构实现

    软件开发随笔系列一--分布式架构实现 文章目录 软件开发随笔系列一--分布式架构实现 理论基础 分布式架构的实现 内核框架 应用开发 基础设施 服务接入 监控 日志监控 调用链监控 度量指标监控 健康 ...

  3. 记一次自定义 Redis 分布式锁导致的故障

    点击上方关注 "终端研发部" 设为"星标",和你一起掌握更多数据库知识 背景 企微报警群里连续发出生产环境报错警告,报错核心信息如下: redis setNX ...

  4. 一文了解四种软件架构:Serverless架构、微服务架构、分布式架构、单体架构

    如果一个软件开发人员,不了解软件架构的演进,会制约技术的选型和开发人员的生存.晋升空间.这里我列举了目前主要的四种软件架构以及他们的优缺点,希望能够帮助软件开发人员拓展知识面. 一.单体架构 单体架构 ...

  5. Elasticsearch 分布式架构原理

    前言 前面介绍了很多ES使用过程中的具体实战知识点,本文主要是谈谈ES分布式架构原理. 一.Elasticsearch特点 elasticsearch是近实时的分布式搜索分析引擎,底层实现基于Luce ...

  6. 【秒杀购物商城业务服务】「分布式架构服务」盘点中间件服务的高可用模式及集群技术的方案分析

    秒杀购物商城业务服务-分布式架构介绍 基于MySQL数据库集群技术实现服务的高可用 基于Tomcat的集群负载机制实现Tomcat服务器的高可用 基于Nginx负载均衡机制实现负载均衡(介绍和配置) ...

  7. 「秒杀购物商城业务服务」「分布式架构服务」盘点中间件服务

    ​ 秒杀购物商城业务服务-分布式架构介绍 基于MySQL数据库集群技术实现服务的高可用 基于Tomcat的集群负载机制实现Tomcat服务器的高可用 基于Nginx负载均衡机制实现负载均衡(介绍和配置 ...

  8. 记一次自定义Redis分布式锁导致的生产事件时间

    背景 企微报警群里连续发出生产环境报错警告,报错核心信息如下: redis setNX error java.lang.NumberFormatException: For input string: ...

  9. 记一次自定义Redis分布式锁导致的生产事件

    背景 企微报警群里连续发出生产环境报错警告,报错核心信息如下: redis setNX error java.lang.NumberFormatException: For input string: ...

最新文章

  1. Illustrator+FontLab 进行字体设计教程
  2. linux创建多个子进程,[Linux进程]使用fork函数创建多个子进程
  3. 非法ip通过ssh成功登录,自动结束会话
  4. System.Object 是 .NET 中所有类型的根吗?
  5. 一个 Dubbo 服务启动要两个小时
  6. 每日英语:The First Day On A Job Is Tough Work
  7. 《钢铁神兵》里的较量的数学题,都是什么级别的难题?
  8. 08、单链表编程考点
  9. Vmware安装CentOS7后访问不了外网
  10. vue结合vue-amap调用高德地图行政区划分并添加标记点
  11. 中文拼音表,完全包括GB2312字库中的字(除极少数生僻字)
  12. 云流化方案为水利数字孪生带来哪些新变化?
  13. WTL 自绘控件库 (CQsRadioBox)
  14. Android修改分区格式为F2FS
  15. typescript环境配置
  16. java游戏回转贝贝龙2下载,崩坏3:暴雨将至最后的剧情,为何贝贝龙拼死保护琪亚娜...
  17. 假新闻遇上AI,祸兮福兮?
  18. ​20X44 FCPX模板电影胶片滚动回忆照片相册图文展示动画 Move Time
  19. linux ps aux 命令解释
  20. 明星讲师心石闪耀ArchSummit大会 | 手机淘宝构架演化实践

热门文章

  1. MapReduce编程实例之自定义排序
  2. 如果在BackgroundWorker运行过程中关闭窗体…
  3. 在VMware ESX Server使用华为存储
  4. python conda虚拟环境
  5. python二分查找
  6. setleft android,android TextView的setCompoundDrawables()方法
  7. 南京晓庄学院计算机网络试卷,南京晓庄学院计算机网络8套卷(完整含答案).doc...
  8. python中常见的数据类型_Python中常见的数据类型总结(四)
  9. Docker虚拟化解析
  10. linux java 栈_关于Java中栈与堆的思考