分布式任务调度系统V1目标

初步目标实现,实现任务的下发分配,分布式任务执行,支持任务分片(在代码上支持),任务执行记录。

任务调度系统构思

基于C/S架构实现,基于长连接来管理实现,当前版本的逻辑架构图如下;

注册等
注册等
注册等
下发任务等
下发任务等
下发任务等
客户端A
服务端
客户端B
客户端C

系统主要是通过自有协议进行通信,采用单个长连接来进行数据交互,Server端主要参考早期版本tornado的设计思路,Client端实现主要参考Kazoo的设计思路。

协议

实现自有的协议,在选择协议时,二进制协议相对而言性能较高,但由于精力所限还是采用的字节流解析,首先将接受的数据解析成字符串,再根据字符串来进一步解析。基本的消息格式如下;

16\r\ntest\nstatus\t20016:代表test\nstatus\t200的长度
\r\n: 表示头部长度信息分隔符
test:表示消息的类型
\n: 表示消息类的内容信息
status: 表示消息里面的key
\t: 表示消息体里面的分隔符,分割key与value
200: 表示解析出来的key对应的value

根据该消息格式可进一步解析为;

length\r\ntype\nkey1\tvalue2\tkey2\tvalue2....

根据不同的类型去定义不同的解析方式,并可灵活扩展不同的参数。

Server端

Server端的实现,主要参考了tornado的早期版本的架构实现,采用了单线程IO复用的架构模式来实现服务端较好的处理性能,在早期版本中该模式可以减少部分高并发模式下一些资源竞争的问题。其中也是分层的抽象为Server层,数据流Stream层,Application处理层,并通过注册handler的形式,将不同的处理条件注册到Application中。

有事件处理
无事件触发
启动
服务端
单线程IO复用循环
接受数据进行业务处理
Client端

Client端主要参考了Kazoo的架构的实现的思路,借鉴了其实现的基本思路,即开启单独的线程来进行事件监听,来监听服务端发送过来的数据,并通过条件变量来实现与主线程之间的数据交互。

响应客户端请求
接受任务处理
客户端启动
监听事件线程
通过api接口发送数据服务端并等待
处理任务
任务执行完成并汇报结果
继续执行

时序图

用户任务调度中心agent_Aagent_B注册agent_A信息和Task1,Task3任务注册成功注册agent_B信息和Task2,Task3任务注册成功汇报当前agent_A库中自行编写的任务任务汇报成功汇报当前agent_B库中自行编写的任务任务汇报成功下发实时任务只执行一次接受任务Task1并执行执行结果返回下发定时任务给所有客户端Task3任务下发任务Task3下发任务Task3执行结果返回执行结果返回下发定时任务给所有客户端Task3任务分片任务为5下发任务Task3,item=0下发任务Task3,item=1执行结果返回执行结果返回下发任务Task3,item=2下发任务Task3,item=3执行结果返回执行结果返回下发任务Task3,item=4执行结果返回用户任务调度中心agent_Aagent_B

系统演示

项目穿梭们

首先,开启一个server端,也可以通过taskserver启动,会监听默认的端口4546,ip为127.0.0.1

(venv) wuzideMacBook-Pro:local_work wuzi$ taskserver
server start listen at 127.0.0.1:4546

此时,编写两个文件分别为task1.py,与task2.py

# task1.py
import syssys.path.append("/Users/wuzi/PycharmProjects/distributed_schedule")from distributed_schedule.tasks import Task, register_task
from distributed_schedule.client import mainimport timeclass TestTask(Task):def execute(self, *args, **kwargs):print("TestTask task {0}  {1}".format(args, kwargs))time.sleep(10)print("TestTask over ")class TestDTask(Task):def execute(self, *args, **kwargs):print("TestDTask task {0}  {1}".format(args, kwargs))time.sleep(1)raiseprint("TestDTask over ")register_task(TestTask("test_task"))
register_task(TestDTask("testd_task"))main()# task2.py
import syssys.path.append("/Users/wuzi/PycharmProjects/distributed_schedule")from distributed_schedule.tasks import Task, register_task
from distributed_schedule.client import mainimport timeclass Test2Task(Task):def execute(self, *args, **kwargs):print("Test2Task task {0}  {1}".format(args, kwargs))time.sleep(1)print("Test2Task over ")class TestD2Task(Task):def execute(self, *args, **kwargs):print("TestD2Task task {0}  {1}".format(args, kwargs))time.sleep(1)print("TestD2Task over ")register_task(Test2Task("test2_task"))
register_task(TestD2Task("test2d_task"))main()

此时我们通过类似与api的形式来调用该任务client_api.py

import sysfrom distributed_schedule.client import TaskClient, Client
from distributed_schedule.config import set_logging_levelsys.path.append("/Users/wuzi/PycharmProjects/distributed_schedule")set_logging_level(20)c = Client("client")
c.start()tclient = TaskClient(c)print("start")
res = tclient.submit("test_task", shard=5)
#
print(res)res = tclient.submit("test_task", mode="all")
print(res)res = tclient.submit("test2d_task", mode="once")
print(res)

此时我们先启动task1.py和task2.py

python task1.py --name clienttask1  --role workerpython task2.py --name clienttask2 --role workerpython task1.py  --name="clienttask3" --role worker

此时,就有两个客户端执行了task1.py中的任务,即test_task,testd_task任务有两个可以执行的客户端即clienttask1和clienttask3。此时执行client_api.py文件。

python client_api.pystart
('broad', 'status\t200\t')
('broad', 'status\t200\t')
('broad', 'status\t200\t')

此时终端任务的展示如下;

(venv) wuzideMacBook-Pro:local_work wuzi$ python task1.py --name clienttask1  --role worker
response ok
response ok
TestTask task ()  {'event_id': '06ac4d14-b1eb-47c7-ab00-535a7587f6eb', 'status': 'doing', 'client_event_id': 'a677403a-a5e2-42b1-b61b-00940388917d', 'item': '0'}
TestTask task ()  {'event_id': '06ac4d14-b1eb-47c7-ab00-535a7587f6eb', 'status': 'doing', 'client_event_id': 'a677403a-a5e2-42b1-b61b-00940388917d', 'item': '2'}
TestTask task ()  {'event_id': '06ac4d14-b1eb-47c7-ab00-535a7587f6eb', 'status': 'doing', 'client_event_id': 'a677403a-a5e2-42b1-b61b-00940388917d', 'item': '4'}
TestTask task ()  {'event_id': '4e8297c1-80ba-4c40-bb84-392849169efa', 'status': 'doing', 'client_event_id': '79d3a627-e5eb-4869-8136-c995b502c1f9'}
TestTask over
TestTask over
TestTask over
TestTask over 
(venv) wuzideMacBook-Pro:local_work wuzi$ python task2.py --name clienttask2 --role worker
response ok
response ok
TestD2Task task ()  {'event_id': 'bd874874-7e44-4ea0-9cfd-0d41024d807a', 'status': 'doing', 'client_event_id': 'eb54c8bf-c979-4f2f-912b-03edbb6c7b14'}
TestD2Task over 
(venv) wuzideMacBook-Pro:local_work wuzi$ python task1.py  --name="clienttask3" --role worker
response ok
response ok
TestTask task ()  {'event_id': '06ac4d14-b1eb-47c7-ab00-535a7587f6eb', 'status': 'doing', 'client_event_id': 'e31ccc68-f0dc-47e0-bfa8-cdf4a9de0003', 'item': '1'}
TestTask task ()  {'event_id': '06ac4d14-b1eb-47c7-ab00-535a7587f6eb', 'status': 'doing', 'client_event_id': 'e31ccc68-f0dc-47e0-bfa8-cdf4a9de0003', 'item': '3'}
TestTask task ()  {'event_id': '4e8297c1-80ba-4c40-bb84-392849169efa', 'status': 'doing', 'client_event_id': 'd804769d-0844-4cff-aa41-beb254bb84e7'}
TestTask over
TestTask over
TestTask over 

从终端输出结果可以看出,test_task被分片执行了五次,分别为clienttask1中的分片0,2,4和clienttask3中的分片1,3;在test_task通知所有执行的过程中,clienttask1和clienttask3分别执行了一次,最后调用了一次test2d_task任务,该任务就执行了一次。

总结

本文只是简单编写的第一版本的分布式任务,项目上传到了github传送门,由于本人水平有限,编写过程中会出现不少错误与问题,并且代码设计与编写并不算规范,希望多多批评。本项目测试用例并没有编写完成,所以后续会继续补上,并且第一版架构相对简单,存在服务单点问题,并且数据并没有做持久化,后续会会慢慢规划上。由于本人才疏学浅,如有错误请批评指正。

分布式任务调度系统V1相关推荐

  1. 【Python】轻量级分布式任务调度系统-RQ

    一 前言       Redis Queue 一款轻量级的P分布式异步任务队列,基于Redis作为broker,将任务存到redis里面,然后在后台执行指定的Job.就目前而言有三套成熟的工具cele ...

  2. 这些优秀的国产分布式任务调度系统,你用过几个?

    2019独角兽企业重金招聘Python工程师标准>>> 分布式调度在互联网企业中占据着十分重要的作用,尤其是电子商务领域,由于存在数据量大.高并发的特点,对数据处理的要求较高,既要保 ...

  3. 探寻繁杂定时任务的解决方案:分布式任务调度系统

    导语:本文我们从架构和技术实现上来为大家讲解腾讯云分布式任务调度系统TCT(Tencent Cloud Task)如何实现任务调度的精准实时.稳定高效,以及任务的切分和编排.(编辑:中间件小Q妹) 0 ...

  4. 分布式任务调度系统-定时任务的解决方案

    导语:在前面我们讲过了阿里云分布式任务调度平台,今天我们从架构和技术实现上来为大家讲解腾讯云分布式任务调度系统TCT(Tencent Cloud Task)如何实现任务调度的精准实时.稳定高效,以及任 ...

  5. 赫拉(hera)分布式任务调度系统之项目启动(二)

    文章目录 赫拉 创建表 打包部署 测试 TIPS 加入群聊 赫拉 大数据平台,随着业务发展,每天承载着成千上万的ETL任务调度,这些任务集中在hive,shell脚本调度.怎么样让大量的ETL任务准确 ...

  6. 赫拉(hera)分布式任务调度系统

    相关介绍 赫拉(hera)分布式任务调度系统之架构,基本功能(一) 赫拉(hera)分布式任务调度系统之项目启动(二) 赫拉(hera)分布式任务调度系统之开发中心(三) 赫拉(hera)分布式任务调 ...

  7. 开源分布式任务调度系统就选它!

    分布式任务调度这个话题是每个后端开发和大数据开发都会接触的话题.因为应用场景的广泛,所以有很多开源项目专注于解决这类问题,比如我们熟知的xxl-job. 那么今天要给大家推荐的则是另一个更为强大的开源 ...

  8. 一款你不得不了解的轻量级分布式任务调度系统

    CronMan 分布式任务调度/定时任务系统 github地址:CronMan, 欢迎star 欢迎朋友们站内私信交流~ 简介 CronMan是一款轻量级的分布式任务调度系统.随着微服务化架构的逐步演 ...

  9. 赫拉(hera)分布式任务调度系统之开发中心(三)

    文章目录 赫拉 简介 目录介绍 创建一个脚本 执行选中的代码 上传资源 同步任务 脚本自动保存 加入群聊 赫拉 大数据平台,随着业务发展,每天承载着成千上万的ETL任务调度,这些任务集中在hive,s ...

最新文章

  1. Nacos源码心跳异常检测
  2. 六种常用的物联网通信协议
  3. 挖矿区块链_使用Java语言从零开始创建区块链
  4. 迪捷软件团队研发的国产替代MBSE系统建模仿真软件
  5. 面试官:高并发下重启服务,接口调用老是超时,你有什么解决办法?
  6. Oracle加快终止对以往Java版本的免费支持期
  7. spring boot入门学习---热部署
  8. 程序在发布前就应该发现的一些错误
  9. Activity是如何挂载Pargment的Day35
  10. bzoj 1603 打谷机
  11. Eclipse自动生成返回值对象
  12. 简述几种常用编码器协议
  13. smplayer变成电视操作步骤
  14. 全国地区 mysql表_2017全国省市区数据库-2017全国省市区数据库下载 官方版 - 河东下载站...
  15. 博微三维技术篇【八】——智能CAD识别
  16. DTOI 10.25 测试 T3 雪人
  17. 能上QQ不能上网的解决方法
  18. 监控摄像机安装的正确位置是哪里
  19. Secret的三种形式
  20. UndoManager教程

热门文章

  1. 深度学习上的又一重点发现——利用MSCNN实现人群密度监测
  2. 超星未来发布新一代高级别自动驾驶车载计算平台
  3. 360数科张家兴:如何突破三大瓶颈,破解金融科技发展难题?
  4. 用 Java 训练深度学习模型,原来这么简单
  5. 刘群:华为诺亚方舟NLP预训练模型工作的研究与应用 | AI ProCon 2019
  6. 痛!“做C#半年,挣的不如做AI 1个月?”看到第二句泪目……
  7. 70个NumPy分级练习:用Python一举搞定机器学习矩阵运算
  8. 监控告警满飞天,运维在家睡到自然醒...
  9. 搞了半天,终于弄懂了TCP Socket数据的接收和发送,太难~
  10. 深入理解Spring的ImportSelector接口