主要有两个类,一个队列类和一个job的抽象类。

保证队列类中的key的唯一性,就可以用spring配置多个实例。水平有限,欢迎吐槽。

上代码:

1、队列类

import net.spy.memcached.MemcachedClient;

import net.spy.memcached.internal.OperationFuture;

import org.apache.commons.logging.Log;

import org.apache.commons.logging.LogFactory;

import org.springframework.beans.BeansException;

import org.springframework.beans.factory.DisposableBean;

import org.springframework.beans.factory.InitializingBean;

import org.springframework.context.ApplicationContext;

import org.springframework.context.ApplicationContextAware;

import com.izx.services.common.Constant;

/**

*

* @ClassName: MemCacheQueue

* @Description: 基于memcache的消息队列的实现

* @author hai.zhu

* @date 2016-3-31 下午3:29:00

*

*/

public class MemCacheQueue implements InitializingBean, DisposableBean,ApplicationContextAware {

private static final Log log = LogFactory.getLog(MemCacheQueue.class);

/**

* 队列名

*/

private String key;

/**

* 队列锁失效分钟

*/

private Integer lockExpireMinite = 3;

private MemcachedClient memcachedClient;

private ApplicationContext applicationContext;

ListenerThread listenerThread = new ListenerThread();

public void setKey(String key) {

this.key = key;

}

public void setMemcachedClient(MemcachedClient memcachedClient) {

this.memcachedClient = memcachedClient;

}

@Override

public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

this.applicationContext = applicationContext;

}

@Override

public void destroy() throws Exception {

try {

this.sign = false;

listenerThread.interrupt();

} catch (Exception e) {

log.error(e);

}

}

@Override

public void afterPropertiesSet() throws Exception {

//初始化队列,用add防止重启覆盖

memcachedClient.add(Constant.MEMCACHE_GLOBAL_QUEUE_STARTKEY + key, 0, "0");

memcachedClient.add(Constant.MEMCACHE_GLOBAL_QUEUE_ENDKEY + key, 0, "0");

//设置任务线程

listenerThread.setDaemon(true);

listenerThread.start();

}

/**

*

* @Title: push

* @Description: 唯一对外方法,放入要执行的任务

* @param @param value

* @param @throws Exception    设定文件

* @return void    返回类型

* @throws

*/

public synchronized void push(MemCacheQueueJobAdaptor value) throws Exception {

//分布加锁

queuelock();

//放入队列

memcachedClient.incr(Constant.MEMCACHE_GLOBAL_QUEUE_ENDKEY + key, 1);

Object keyorder = memcachedClient.get(Constant.MEMCACHE_GLOBAL_QUEUE_ENDKEY + key);

memcachedClient.set(Constant.MEMCACHE_GLOBAL_QUEUE_VARIABLE + key + "_" + keyorder, 0, value);

//分布解锁

queueUnLock();

}

/**

*

* @Title: pop

* @Description: 取出要执行的任务

* @param @return

* @param @throws Exception    设定文件

* @return MemCacheQueueJobAdaptor    返回类型

* @throws

*/

private synchronized MemCacheQueueJobAdaptor pop() throws Exception {

Object keyorderstart = memcachedClient.get(Constant.MEMCACHE_GLOBAL_QUEUE_STARTKEY + key);

Object keyorderend = memcachedClient.get(Constant.MEMCACHE_GLOBAL_QUEUE_ENDKEY + key);

if(keyorderstart.equals(keyorderend)){

return null;

}

MemCacheQueueJobAdaptor adaptor = (MemCacheQueueJobAdaptor)memcachedClient.get(Constant.MEMCACHE_GLOBAL_QUEUE_VARIABLE + key + "_" + keyorderstart);

memcachedClient.incr(Constant.MEMCACHE_GLOBAL_QUEUE_STARTKEY + key, 1);

memcachedClient.delete(Constant.MEMCACHE_GLOBAL_QUEUE_VARIABLE + key + "_" + keyorderstart);

return adaptor;

}

/**

*

* @Title: queuelock

* @Description: 加锁

* @param @throws InterruptedException    设定文件

* @return void    返回类型

* @throws

*/

private void queuelock() throws Exception {

do {

OperationFuture sign = memcachedClient.add(Constant.MEMCACHE_GLOBAL_QUEUE_LOCK + key, lockExpireMinite * 60, key);

if(sign.get()){

return;

} else {

log.debug("key: " + key + " locked by another business");

}

Thread.sleep(300);

} while (true);

}

/**

*

* @Title: queueUnLock

* @Description: 解锁

* @param     设定文件

* @return void    返回类型

* @throws

*/

private void queueUnLock() {

memcachedClient.delete(Constant.MEMCACHE_GLOBAL_QUEUE_LOCK + key);

}

private boolean sign = true;

private long THREAD_SLEEP = 10;

class ListenerThread extends Thread {

@Override

public void run(){

log.error("队列["+key+"]开始执行");

while(sign){

try {

Thread.sleep(THREAD_SLEEP);

dojob();

} catch (Exception e) {

log.error(e);

}

}

}

private void dojob(){

try{

queuelock();

MemCacheQueueJobAdaptor adaptor = pop();

//逐个执行

if(adaptor != null){

THREAD_SLEEP = 10;

try {

adaptor.setApplicationContext(applicationContext);

adaptor.onMessage();

} catch (Exception e) {

log.error(e);

}

}else{

THREAD_SLEEP = 5000;

}

}catch(Exception e){

log.error(e);

}finally{

queueUnLock();

}

}

}

}[/code]

2、job抽象类

import org.springframework.context.ApplicationContext;

import java.io.Serializable;

/**

*

* @ClassName: MemCacheQueueJobAdaptor

* @Description: 基于memcache队列的任务适配器

* @author hai.zhu

* @date 2015-12-11 上午11:48:26

* @param

*/

public abstract class MemCacheQueueJobAdaptor implements Serializable{

private static final long serialVersionUID = -5071415952097756327L;

private ApplicationContext applicationContext;

public ApplicationContext getApplicationContext() {

return applicationContext;

}

public void setApplicationContext(ApplicationContext applicationContext) {

this.applicationContext = applicationContext;

}

/**

*

* @Title: onMessage

* @Description: 异步执行任务接口

* @author hai.zhu

* @param @param value 设定文件

* @return void 返回类型

* @throws

*/

public abstract void onMessage();

}[/code]

3、部分放在constant的常量

/**

* 基于memcache的队列存放前缀

*/

public static String MEMCACHE_GLOBAL_QUEUE_VARIABLE = "MEMCACHE_GLOBAL_QUEUE_VARIABLE_";

/**

* 基于memcache的队列锁的前缀

*/

public static String MEMCACHE_GLOBAL_QUEUE_LOCK = "MEMCACHE_GLOBAL_QUEUE_LOCK_";

/**

* 基于memcache的队列锁的开始元素

*/

public static String MEMCACHE_GLOBAL_QUEUE_STARTKEY = "MEMCACHE_GLOBAL_QUEUE_STARTKEY_";

/**

* 基于memcache的队列锁的结束元素

*/

public static String MEMCACHE_GLOBAL_QUEUE_ENDKEY = "MEMCACHE_GLOBAL_QUEUE_ENDKEY_";[/code]

4、spring配置,保证队列名的唯一性就可以配置多个队列

转载于:https://my.oschina.net/zhuxuan/blog/650935

java memcache 队列_基于memcache的java分布式队列实现。相关推荐

  1. java arp 攻击_基于Jpcap的Java ARP断网攻击

    这是大二学习计算机网络的时候写的一个小程序,可实现局域网内断网攻击.这也作为学习网络层.数据链路层(在OSI模型中ARP协议属于链路层:而在TCP/IP模型中,ARP协议属于网络层)的其中一个小实验吧 ...

  2. java 打印 发票_基于Excel和Java自动化:发票生成器

    对于销售人员,使用Excel创建发票是很常见的.但是该过程通常涉及许多容易出错的手动操作,例如输入数据,复制/粘贴等.如何实现一个可以将数据从数据库自动填充到发票Excel模板中,而无需再辛苦手动输入 ...

  3. java动物乐园_基于jsp的动物园管理系统-JavaEE实现动物园管理系统 - java项目源码...

    基于jsp+servlet+pojo+mysql实现一个javaee/javaweb的动物园管理系统, 该项目可用各类java课程设计大作业中, 动物园管理系统的系统架构分为前后台两部分, 最终实现在 ...

  4. java员工信息管理_基于jsp的员工信息管理-JavaEE实现员工信息管理 - java项目源码...

    基于jsp+servlet+pojo+mysql实现一个javaee/javaweb的员工信息管理, 该项目可用各类java课程设计大作业中, 员工信息管理的系统架构分为前后台两部分, 最终实现在线上 ...

  5. 基于java家教管理系统_基于jsp的家教信息管理-JavaEE实现家教信息管理 - java项目源码...

    基于jsp+servlet+pojo+mysql实现一个javaee/javaweb的家教信息管理, 该项目可用各类java课程设计大作业中, 家教信息管理的系统架构分为前后台两部分, 最终实现在线上 ...

  6. java订单类_基于Java创建一个订单类代码实例

    这篇文章主要介绍了基于Java创建一个订单类代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 需求描述 定义一个类,描述订单信息 订单id 订 ...

  7. java固定资产管理系统_基于jsp的固定资产管理系统-JavaEE实现固定资产管理系统 - java项目源码...

    基于jsp+servlet+pojo+mysql实现一个javaee/javaweb的固定资产管理系统, 该项目可用各类java课程设计大作业中, 固定资产管理系统的系统架构分为前后台两部分, 最终实 ...

  8. java udp 流量控制_基于UDP传输协议的实现分析之流量和拥塞控制

    UDP的概念 UDP 是User Datagram Protocol的简称, 中文名是用户数据报协议,是OSI(Open System Interconnection,开放式系统互联) 参考模型中一种 ...

  9. java数组实现队列_使用数组在Java中进行队列实现

    java数组实现队列 什么是队列? (What is a Queue?) Queue is a special type of data structure, which is designed to ...

最新文章

  1. Storybook 5.0正式发布:有史以来变化最大的版本\n
  2. 基于ONOS的T-SDN Super控制器
  3. 【PC工具】更新chrome谷歌浏览器最新离线安装版各种版本,最好用的浏览器没有之一...
  4. 最近总结——关于自己的基础问题
  5. html5学习笔记---03. Canvas简介,Canvas的使用方法
  6. centos7 文本编辑 不能移动光标_【200905】Linux系统的使用基础(CentOS 7)
  7. 用jquery给Struts2的s:radio /标签添加change事件
  8. ExtJS4系列目录
  9. 携程酒店自动化360度质量保障体系
  10. 机器人操作系统ROS—深度相机+激光雷达实现vSLAM建图与导航
  11. 百度云网页视频加速播放
  12. 基于机器学习的电信套餐个性化推荐模型的设计与实现
  13. 记一次App异常kill分析处理
  14. BadEncoder: Backdoor Attacks to Pre-trained Encoders in Self-Supervised Learning 论文笔记
  15. Linux服务器git clone卡住不动
  16. 转载自科技猿人:联想5G投票这事,我们就来彻底讲清楚
  17. SMBus与I2C的区别
  18. 【计算机视觉与深度学习】全连接神经网络(二)
  19. Vivado2021.2版本安装教程
  20. 二维离散变换由c语言编写,离散余弦变换(DCT)的DSP程序设计与实现

热门文章

  1. 蓝桥杯 2011年第二届C语言初赛试题(3)
  2. 汽车电子专业知识篇(三十二)-整车电控系统及架构设计技术
  3. excel实战应用案例100讲(十)-下载的文件显示“文件已损坏,无法打开”?
  4. mac设置首页访问php,mac系统下php项目除了首页全访问不了
  5. JSP + Struts + Hibernate + Spring+MySQL+Myeclipse实现固定资产管理系统
  6. 程序员赚钱资源汇总,结合自己亲身经历
  7. Error: Module “xxx“ does not exist in container. / antd pro v5启用qiankun报错 / 同时使用mfsu和qiankun报错
  8. 谈一谈Http Request 与 Http Response
  9. CSS光标cursor
  10. C#中用WebClient.UploadData 方法上载文件数据