java jqc_GitHub - autowang/jqc: 消息中间件
项目简介
JQC是一个Java写的分布式的拉取式的消息队列中间件,具备高并发性和可扩展性,主要用于解决C/S架构消息传递的问题。以HTTP接口的形式提供调用,最大限度的降低开发成本。
功能介绍
角色
JQC中存在4种角色
Producer:消息产生者,也就是用户。使用接口来产生消息。
JobTracker:调度中心,用于处理各种请求,包括创建消息、消费消息等等。JobTracker是平等的,可存在多个,支持任意扩展。
-** Admin**:管理员,用于监控集群状态,处理消息超时和回收机制、节点宕机报警等。当前版本Admin只能存在一个。
-TaskTracker:消费者,通过接口来拉取消息和反馈结果。
组件
JQC用到了3个组件:Nginx、Redis、ElasticSearch
Nginx:当前版本使用Nginx作为集群入口和请求转发、负载均衡。
Redis:Redis用于存储消息队列和节点信息,当前版本Redis采用了单节点的简单模式。
ElasticSearch:用于存储历史消息和查询。
支持功能
消息拉取模式:单向通信,TaskTracker根据自身需要主动向JobTracker请求拉取消息。这样TaskTracker可自己维护自己的负载情况。
异步请求:消息被创建后,Producer得到消息的id,并通过id来轮询消息的状态和结果。
优先级:支持0-4共5个不同的优先级(0最高)。
先进先出:同一优先级的消息先进先出。
自定义消息队列(Point类型):Producer指定消息存在在某个名称的队列中,TaskTracker可去该名称的队列中拉取消息。
定向分配(Compete类型):即TaskTracker的群组功能。举例:TaskTracker1和TaskTracker2属于GroupA,TaskTracker3和TaskTracker4属于GroupB。Producer在产生消息的时候可以指定该消息能被GroupA(支持多个Group)消费,则Tasktracker1和TaskTracker2都可以获取该消息,谁先来拉取,谁获得(消息仅能被消费1次)。
超时机制:JQC采取了最简单的消息超时机制:每条消息被消费后必须在指定时间(消息属性可设置)反馈结果,否则将被Admin检测之后回收处理(超时处理或重试处理)。
-** 重试机制**:消息可以设置重试次数,当超时后重新放回队列中等待被取走。
有效时间机制:消息被创建后可以设置有效时间,超过有效时间仍未被取走则会被废弃。
节点监控:TaskTracker有注册机制,并通过心跳信息维持自身可用性信息、JobTracker、Admin也会周期性的刷新自身状态。Admin可以监控到整个集群的状态,感知到节点宕机并通知。
使用方法
##1.创建JOB
前提说明
目前JQC创建JOB支持API方式直接创建,无需其他前提操作如登录,验证密码,token等。
Path :/job/create
Method:post
ParamType:FormParam(ContentType:application/x-www-form-urlencoded)
Params:
product:产品线名称,必填(比如:mvp,pluto)。
topic: job放置的队列名称,必填(比如:sports,music)。
type: job类型,缺省值为POINT(比如:POINT,COMPETE)。
priority:任务优先级,缺省值为1(0为最高优先级),范围0~4。
name:job名。
data:job携带的具体数据,必填.
expiration:过期时间(单位秒),从Job进入待分配队列开始算起,若被取走时已经超过了设置的过期时间,则不会被取走,被认定为已过期失效。缺省值为0,代表永不过期。
timeout:超时时间(单位秒),从Job被取走开始算起,若超过了超时时间仍未返回结果,则认定为执行超时,配合retryCount进行重试。缺省值为300秒。
sender:发送者(比如:mayaming)
topicGroup:当type为PUBLISH时的必填项,指定Job复制和放置到哪些topic群组所包含的topic队列里。
consumeGroup:当type为COMPETE时的必填项,指定Job定向分配给哪些消费者群组。
retryCount:当Job执行超时时,可重试的次数。缺省值为0,不重试。
info:用户自定义字段,留待后续开发使用。
Response:
{
"message": "OK",(请求的结果)
"status": true,(请求结果的状态)
"data": {
"jobId": "2a0988b7965d4d428eac08f98b33086e"(请求的结果中的数据)
}
}
返回值说明
status: 接口调用成功返回true,否则返回false
message: 接口调用请求的结果
data: 创建的JOB的uuid,也是该JOB的唯一标志字段
2.TASKTRACKER与JOBTRACKER通信
前提说明
拉取JOB和返回JOB结果之前,TASKTRACKER需要使用账号和密码进行注册操作(详见接口2.1)。
TASKTRACKER通过控制心跳中的参数来实现拉取JOB和返回JOB结果(详见接口2.2)
2.1 注册TASKTRACKER
Path :/taskTracker/register
Method:post
ParamType:json格式的String
Params:
{
"identity":"EPtest", TASKTRACKER账号,唯一
"passwd":"0385B200C56Fxxx", 密码
"version":"1.0.0", 该TASKTRACKER的版本号
"consumeGroup":"baidu", TASKTRACKER所属群组(拉取COMPETE类型JOB的话,是必填参数)。
"ip":"172.18.23.xx", 节点所属IP地址
"hostname":"BDSY00540", 节点所属hostname
"info":"" 节点的附加信息,注册后可修改
}
Response:
{
"message":"OK",
"status":true,
"data":"e71f4a81c0ee4641913c183e84a18347"
}
返回值说明
status: 接口调用成功返回true,否则返回false
message: 接口调用请求的结果
data: token信息,每次注册后JOBTRACKER将返回token信息,此后与JOBTRACKER通信只需要携带账号和token信息。但是如果超时未与JOBTRACKER通信,该TASKTRACKER将会被判定为掉线,此token也将会失效,即下次通信需要再次执行注册动作,获取新的token。
2.2 发送心跳,拉取任务与返回任务结果
Path :/taskTracker/heartbeat
Method:post
ParamType:json格式的String
Params: 目的不同,以下参数略有不同
心跳说明
在无需拉取任务也无需返回任务结果时,通过下面的参数与JOBTRACKER通信,保证该TASKTRACKER的在线状态。
{
"identity":"EPtest", 账号,必填
"passwd":"0385B200C56Fxxx",
"token":"5f5eac64b93d4df3a30599e602fae9e2",注册后返回的token信息,必填
"version":"1.0.0",
"consumeGroup":"baidu", 注册时填写的TASKTRACKER所属群组参数
"ip":"172.18.23.xx",
"hostname":"BDSY0054xxx",
"ackJobs":[],
"info":"xxx"
}
Response:
{
"message":"OK",
"status":true,
"data":[]
}
返回值说明
status: 接口调用成功返回true,否则返回false
message: 接口调用请求的结果
data: 如果没有请求拉取JOB,该字段为空,否则为请求的JOB数组。
拉取JOB说明
TASKTRACKER注册后,才可以拉取任务。
{
"identity":"EPtest", 账号,必填
"passwd":"0385B200C56Fxxx",
"token":"5f5eac64b93d4df3a30599e602fae9e2",注册后返回的token信息,必填
"product":"mvp", 期望从这个业务线拉取任务,必填
"topic":"sports", 获取的任务的topic,必填
"version":"1.0.0",
"consumeGroup":"baidu", 注册时填写的TASKTRACKER所属群组参数,拉取COMPETE类型的任务时候为必填
"ip":"172.18.23.xx",
"hostname":"BDSY0054xxx",
"isExclusive":false, 如果为true,则只请求属于该群组的任务;否则不限制。
"consumeType":"POINT", 本次请求的任务的类型,POINT(点到点)或COMPETE(竞争)
"consumeNum":2 期望拿到的JOB量
}
Response:
{
"message": "OK",
"status": true,
"data": [
{
"product": "mvp", 创建JOB时指定的产品线
"topic": "CompatibilityTest", 创建JOB时指定的topic
"priority": 1, 创建JOB时指定的优先级
"uuid": "d171d0966d7e4xxxccc6074daed", 即jobId
"type": "POINT", JOB的类型
"status": "consumed", JOB的状态。ready-已创建,还未被拉取;consumed-已被拉取;acknowledged-已经返回结果
"sendTimestamp": 1510300995173, 创建JOB的时间戳
"consumeTimestamp": 1510736526967, 拉取JOB的时间戳
"name": "test", 创建JOB时指定的名字
"data": "{xxxxxx}", 存储需要被执行的详情
"expiration": 0,
"timeout": 300000,
"sender": "xulinwei", 创建JOB的人
"consumer": "EPtest", 拉取JOB的人
"retryCount": 0,
"isDeleted": 0,
"oriTimeout": 300000
},{
"product": "mvp",
"topic": "CompatibilityTest",
"priority": 1,
"uuid": "d171d0966xxx3975ccc6074daed",
"type": "POINT",
"status": "consumed",
"sendTimestamp": 1510300995173,
"consumeTimestamp": 1510736526967,
"name": "test",
"data": "{xxxxxx}",
"expiration": 0,
"timeout": 300000,
"sender": "xulinwei",
"consumer": "EPtest",
"retryCount": 0,
"isDeleted": 0,
"oriTimeout": 300000
}
]
}
返回值说明
status: 接口调用成功返回true,否则返回false
message: 接口调用请求的结果
data: 如果通过心跳拉取JOB,则data字段是拉取的JOB清单,以数组方式存储。
返回JOB结果说明
TASKTRACKER注册后,才可以返回任务结果。
{
"identity":"EPtest", 账号,必填
"passwd":"0385B200C56Fxxx",
"token":"5f5eac64b93d4df3a30599e602fae9e2",注册后返回的token信息,必填
"version":"1.0.0",
"consumeGroup":"baidu", 注册时填写的TASKTRACKER所属群组参数
"ip":"172.18.23.xx",
"hostname":"BDSY0054xxx",
"ackJobs":[
{"uuid":"1721f436131e4xxxa1888326eae","result":"xxxxxxxx"},
{"uuid":"835ef436fbe04xxx988c25732f3","result":"12345654321"}
],
"info":"xxx"
}
Response:
{
"message":"OK",
"status":true,
"data":[]
}
返回值说明
status: 接口调用成功返回true,否则返回false
message: 接口调用请求的结果
data: 如果没有请求拉取JOB,该字段为空,否则为请求的JOB数组。
代码结构
com.baidu.ecomqaep.schedule.admin
AdminHeartbeatThread:Admin节点专用发送心跳线程
CheckJobThread:监控是否有超时JOB
CheckNodeThread:监控是否有超时TASKTRACKER和JOBTRACKER
InsertEsThread:将已完成JOB从Redis刷进Es
com.baidu.ecomqaep.schedule.base
AckEntity: JOB返回结果中ackJobs字段的基本单位
ConsumeEntity:拉取任务请求的基本单位
JobEntity:JOB的数据结构
JobResultEntity:查询JOB接口的返回的数据结构
NodeEntity:节点(包括TASKTRACKER,JOBTRACKER,ADMIN)的数据结构
ResultType:接口返回的数据结构。
StatusCode:ResultType中statusCode字段的库。
TaskTracker2JobTrackerModel:TASKTRACKER与JOBTRACKER通信的数据结构
TopicGroupEntity:订阅topicGroup所需的数据结构
com.baidu.ecomqaep.schedule.config
BaseConfiguration:参数相关类
Config:固定参数配置,包括redis地址,es地址等
ConfigInterface: 参数相关类
Constants:固定参数
com.baidu.ecomqaep.schedule.jobTracker
JobTrackerHeartBeatThread:节点专用发送心跳线程
com.baidu.ecomqaep.schedule.listner
ApplicationContextAwareListner:spring启动类
com.baidu.ecomqaep.schedule.log
LogFormat:日志模板文件
com.baidu.ecomqaep.schedule.manager
AdminManager:Admin逻辑处理类
EsManager:ES处理类
JobManager:消息处理类
NodeManagre:节点处理类
RedisManager:redis处理类
TopicManager:群组处理类
com.baidu.ecomqaep.schedule.util
IdUtil:唯一性id生成工具类
MailUtil:邮件工具类
Md5Util:MD5工具类
ShortMessageUtil:短信工具类
com.baidu.ecomqaep.schedule.web.action
AdminAction: Admin角色相关的接口
BaseAction: 接口基类
BulletinAction: Redis存储能力开放接口
JobAction:消息处理相关接口
TaskTrackAction:taskTracker相关接口
com.baidu.ecomqaep.schedule.web.filter
RequestFilter:请求过滤类
ResponseFilter:响应过滤类
版本记录
发布时间
版本号
发布内容
20171115
1.0.0
开源
java jqc_GitHub - autowang/jqc: 消息中间件相关推荐
- Java Review - Java进程内部的消息中间件_Event Bus设计模式
文章目录 概述 EventBus架构类图 Code Bus接口 (定义注册topic以及发送event接口) 自定义注解-回调方法及topic 同步EventBus 异步EventBus Subscr ...
- java开发培训中消息中间件的优势有哪些
系统解耦 交互系统之间没有直接的调用关系,只是通过消息传输,故系统侵入性不强,耦合度低. 提高系统响应时间 例如原来的一套逻辑,完成支付可能涉及先修改订单状态.计算会员积分.通知物流配送几个逻辑才能完 ...
- java 中间件介绍_java消息中间件的使用与简介
一.为什么要使用消息中间件 消息中间件就是可以省去繁琐的步骤,直达目的,怎么讲呢,就是比如你想很多人,知道你的动态,而知道的人可能手机没电,可能手机信号不好,可能手机不在服务区,或者看的人比较忙,看的 ...
- JAVA面试系列(二)消息中间件
本人目前项目中有用到rocketMQ作为数据传输过程中的中间件,今天整理了一下资料,记录一下自己对消息中间件的理解. 一.概念 1.MQ message queue消息队列,消息队列是就是队列,是一种 ...
- java分布式 mq_分布式系统消息中间件—RabbitMQ的使用进阶篇
前言: 这篇文章主要总结一下RabbitMQ在日常项目开发中比较常用的几个特性. 一. mandatory 参数 上一篇文章中我们知道,生产者将消息发送到RabbitMQ的交换器中通过RoutingK ...
- java rabbitmq 并发_RabbitMQ消息中间件 高级篇二 高并发情况下保障消息投递可靠性...
RabbitMQ消息中间件技术精讲9 高级篇二 高并发场景下,消息的延迟投递做二次确认进行回调检查来保障生产者消息投递成功的可靠性 在上一篇文章中,我们介绍了BAT大厂中一种方式保障生成者消息投递可靠 ...
- Java消息中间件(activeMQ)
文章目录 **第一章 消息中间件概述** 1. 消息中间件的好处 2. 什么是消息中间件 3. 什么是JMS(规范) 4. 什么是AMQP(协议) 5. 几个常用消息中间对比 **第二章 初始JMS* ...
- Github 一夜爆火:这份金九银十 Java 面试手册我给跪了
这几天给筒子们整理了一份<Java面试手册>,106页,目前大约6万字左右,初衷也很简单,就是希望在面试的时候能够帮助到大家,减轻大家的负担和节省时间. 废话不多说,本手册目前为第一版,后 ...
- 如何基于消息中间件实现分布式事务?万字长文给你答案!!
写在前面 最近小伙伴们的要求越来越高,学完设计模式学高并发,学完高并发又想学Java8新特性,学完Java8新特性又要学Spring,学着Spring又让我整理一篇关于分布式事务的文章,而且还提出了要 ...
最新文章
- Jsp—02—项目:登录案例
- C语言再学习 -- 数组和指针
- Maven发布工程到私服
- Python获取两个列表list的不同之处
- ++ba--运算结果解析
- c语言较大的整型相加,二个超长正整数的相加
- 【kafka】kafka 新增节点 报错 InconsistentBrokerIdException Configured broker.id doesn‘t match
- StakeDAO新增Sushiswap流动性奖励计划
- php memcache技术,Memcache操作类如何在PHP中使用
- Illustrator 教程,了解 AI 中的绘图工具
- thinkphp5.x之Collection(集合)解析 php集合
- Axure 9 实战案例,动态面板的应用 3,京东的拖动拼图登录验证
- 文件夹批量重命名排序递增的方法
- 解决宿舍路由器校园网共享登陆问题
- Google创始人---谢尔盖-布林
- 微信转盘抽奖前端源码(三):移动端浏览器兼容性(12个奖品,指针开始时指向奖品)
- 如何用Python搭建一个36Kr网站|高手4步带你入门
- 【Azure】微软 Azure 基础解析(三)描述云计算运营中的 CapEx 与 OpEx,如何区分 CapEx 与 OpEx
- 传统货币支付与电子支付的简要比较
- AD936x+ZYNQ搭建收音机(二)含视频演示
热门文章
- 企业高管该看什么书?《CEO必读12篇》了解下!
- pycharm编译的时候报错KeyError: ‘XXX_XXX_XXX‘
- gitlab hostname修改
- PyCharm社区版搭建Django框架
- C语言:使用for语句计算n的阶乘
- 异步fifo的设计与验证
- 有哪些真实的兵棋推演模拟训练仿真系统软件
- 利用GEE(Google Earth Engine)在线处理NDVI、EVI、SAVI、NDMI等指数归一化教程!
- 王者荣耀4月3号服务器维护,王者荣耀新英雄云中君上线 体验服4月3日更新公告...
- Win10基于python,spleeter 人声提取工具安装和使用(全网最全,超详细)