<?php
/*
* memcache队列类
* 支持多进程并发写入、读取
* 边写边读,AB面轮值替换
* @author lkk/lianq.net
* @create on 9:25 2012-9-28
*
* @example:
$obj = new memcacheQueue('duilie');
$obj->add('1asdf');
$obj->getQueueLength();
$obj->read(11);
$obj->get(8);
*/
class memcacheQueue{
public static   $client;            //memcache客户端连接
public          $access;            //队列是否可更新
private         $currentSide;       //当前轮值的队列面:A/B
private         $lastSide;          //上一轮值的队列面:A/B
private         $sideAHead;         //A面队首值
private         $sideATail;         //A面队尾值
private         $sideBHead;         //B面队首值
private         $sideBTail;         //B面队尾值
private         $currentHead;       //当前队首值
private         $currentTail;       //当前队尾值
private         $lastHead;          //上轮队首值
private         $lastTail;          //上轮队尾值
private         $expire;            //过期时间,秒,1~2592000,即30天内;0为永不过期
private         $sleepTime;         //等待解锁时间,微秒
private         $queueName;         //队列名称,唯一值
private         $retryNum;          //重试次数,= 10 * 理论并发数
const   MAXNUM      = 10;                 //(单面)最大队列数,建议上限10K
const   HEAD_KEY    = '_lkkQueueHead_';     //队列首key
const   TAIL_KEY    = '_lkkQueueTail_';     //队列尾key
const   VALU_KEY    = '_lkkQueueValu_';     //队列值key
const   LOCK_KEY    = '_lkkQueueLock_';     //队列锁key
const   SIDE_KEY    = '_lkkQueueSide_';     //轮值面key
/*
* 构造函数
* @param   [config]    array   memcache服务器参数
* @param   [queueName] string  队列名称
* @param   [expire]    string  过期时间
* @return  NULL
*/
public function __construct($queueName ='',$expire='',$config =''){
if(empty($config)){
self::$client = memcache_pconnect('localhost',11211);
}elseif(is_array($config)){//array('host'=>'127.0.0.1','port'=>'11211')
self::$client = memcache_pconnect($config['host'],$config['port']);
}elseif(is_string($config)){//"127.0.0.1:11211"
$tmp = explode(':',$config);
$conf['host'] = isset($tmp[0]) ? $tmp[0] : '127.0.0.1';
$conf['port'] = isset($tmp[1]) ? $tmp[1] : '11211';
self::$client = memcache_pconnect($conf['host'],$conf['port']);
}
if(!self::$client) return false;
ignore_user_abort(TRUE);//当客户断开连接,允许继续执行
set_time_limit(0);//取消脚本执行延时上限
$this->access = false;
$this->sleepTime = 1000;
$expire = (empty($expire) && $expire!=0) ? 3600 : (int)$expire;
$this->expire = $expire;
$this->queueName = $queueName;
$this->retryNum = 10000;
$side = memcache_add(self::$client, $queueName . self::SIDE_KEY, 'A',false, $expire);
$this->getHeadNTail($queueName);
if(!isset($this->sideAHead) || empty($this->sideAHead)) $this->sideAHead = 0;
if(!isset($this->sideATail) || empty($this->sideATail)) $this->sideATail = 0;
if(!isset($this->sideBHead) || empty($this->sideBHead)) $this->sideBHead = 0;
if(!isset($this->sideBHead) || empty($this->sideBHead)) $this->sideBHead = 0;
}
/*
* 获取队列首尾值
* @param   [queueName] string  队列名称
* @return  NULL
*/
private function getHeadNTail($queueName){
$this->sideAHead = (int)memcache_get(self::$client, $queueName.'A'. self::HEAD_KEY);
$this->sideATail = (int)memcache_get(self::$client, $queueName.'A'. self::TAIL_KEY);
$this->sideBHead = (int)memcache_get(self::$client, $queueName.'B'. self::HEAD_KEY);
$this->sideBTail = (int)memcache_get(self::$client, $queueName.'B'. self::TAIL_KEY);
}
/*
* 获取当前轮值的队列面
* @return  string  队列面名称
*/
public function getCurrentSide(){
$currentSide = memcache_get(self::$client, $this->queueName . self::SIDE_KEY);
if($currentSide == 'A'){
$this->currentSide = 'A';
$this->lastSide = 'B';
$this->currentHead   = $this->sideAHead;
$this->currentTail   = $this->sideATail;
$this->lastHead      = $this->sideBHead;
$this->lastTail      = $this->sideBTail;
}else{
$this->currentSide = 'B';
$this->lastSide = 'A';
$this->currentHead   = $this->sideBHead;
$this->currentTail   = $this->sideBTail;
$this->lastHead      = $this->sideAHead;
$this->lastTail      = $this->sideATail;
}
return $this->currentSide;
}
/*
* 队列加锁
* @return boolean
*/
private function getLock(){
if($this->access === false){
while(!memcache_add(self::$client, $this->queueName .self::LOCK_KEY, 1, false, $this->expire) ){
usleep($this->sleepTime);
@$i++;
if($i > $this->retryNum){//尝试等待N次
return false;
break;
}
}
return $this->access = true;
}
return false;
}
/*
* 队列解锁
* @return NULL
*/
private function unLock(){
memcache_delete(self::$client, $this->queueName .self::LOCK_KEY);
$this->access = false;
}
/*
* 添加数据
* @param   [data]  要存储的值
* @return  boolean
*/
public function add($data){
$result = false;
if(!$this->getLock()){
return $result;
}
$this->getHeadNTail($this->queueName);
$this->getCurrentSide();
if($this->isFull()){
$this->unLock();
return false;
}
if($this->currentTail < self::MAXNUM){
$value_key = $this->queueName .$this->currentSide . self::VALU_KEY . $this->currentTail;
if(memcache_add(self::$client, $value_key, $data, false, $this->expire)){
$this->changeTail();
$result = true;
}
}else{//当前队列已满,更换轮值面
$this->unLock();
$this->changeCurrentSide();
return $this->add($data);
}
$this->unLock();
return $result;
}
/*
* 取出数据
* @param   [length]    int 数据的长度
* @return  array
*/
public function get($length=0){
if(!is_numeric($length)) return false;
if(empty($length)) $length = self::MAXNUM * 2;//默认读取所有
if(!$this->getLock()) return false;
if($this->isEmpty()){
$this->unLock();
return false;
}
$keyArray   = $this->getKeyArray($length);
$lastKey    = $keyArray['lastKey'];
$currentKey = $keyArray['currentKey'];
$keys       = $keyArray['keys'];
$this->changeHead($this->lastSide,$lastKey);
$this->changeHead($this->currentSide,$currentKey);
$data   = @memcache_get(self::$client, $keys);
foreach($keys as $v){//取出之后删除
@memcache_delete(self::$client, $v, 0);
}
$this->unLock();
return $data;
}
/*
* 读取数据
* @param   [length]    int 数据的长度
* @return  array
*/
public function read($length=0){
if(!is_numeric($length)) return false;
if(empty($length)) $length = self::MAXNUM * 2;//默认读取所有
$keyArray   = $this->getKeyArray($length);
$data   = @memcache_get(self::$client, $keyArray['keys']);
return $data;
}
/*
* 获取队列某段长度的key数组
* @param   [length]    int 队列长度
* @return  array
*/
private function getKeyArray($length){
$result = array('keys'=>array(),'lastKey'=>array(),'currentKey'=>array());
$this->getHeadNTail($this->queueName);
$this->getCurrentSide();
if(empty($length)) return $result;
//先取上一面的key
$i = $result['lastKey'] = 0;
for($i=0;$i<$length;$i++){
$result['lastKey'] = $this->lastHead + $i;
if($result['lastKey'] >= $this->lastTail) break;
$result['keys'][] = $this->queueName .$this->lastSide . self::VALU_KEY . $result['lastKey'];
}
//再取当前面的key
$j = $length - $i;
$k = $result['currentKey'] = 0;
for($k=0;$k<$j;$k++){
$result['currentKey'] = $this->currentHead + $k;
if($result['currentKey'] >= $this->currentTail) break;
$result['keys'][] = $this->queueName .$this->currentSide . self::VALU_KEY . $result['currentKey'];
}
return $result;
}
/*
* 更新当前轮值面队列尾的值
* @return  NULL
*/
private function changeTail(){
$tail_key = $this->queueName .$this->currentSide . self::TAIL_KEY;
memcache_add(self::$client, $tail_key, 0,false, $this->expire);//如果没有,则插入;有则false;
//memcache_increment(self::$client, $tail_key, 1);//队列尾+1
$v = memcache_get(self::$client, $tail_key) +1;
memcache_set(self::$client, $tail_key,$v,false,$this->expire);
}
/*
* 更新队列首的值
* @param   [side]      string  要更新的面
* @param   [headValue] int     队列首的值
* @return  NULL
*/
private function changeHead($side,$headValue){
if($headValue < 1) return false;
$head_key = $this->queueName .$side . self::HEAD_KEY;
$tail_key = $this->queueName .$side . self::TAIL_KEY;
$sideTail = memcache_get(self::$client, $tail_key);
if($headValue < $sideTail){
memcache_set(self::$client, $head_key,$headValue+1,false,$this->expire);
}elseif($headValue >= $sideTail){
$this->resetSide($side);
}
}
/*
* 重置队列面,即将该队列面的队首、队尾值置为0
* @param   [side]  string  要重置的面
* @return  NULL
*/
private function resetSide($side){
$head_key = $this->queueName .$side . self::HEAD_KEY;
$tail_key = $this->queueName .$side . self::TAIL_KEY;
memcache_set(self::$client, $head_key,0,false,$this->expire);
memcache_set(self::$client, $tail_key,0,false,$this->expire);
}
/*
* 改变当前轮值队列面
* @return  string
*/
private function changeCurrentSide(){
$currentSide = memcache_get(self::$client, $this->queueName . self::SIDE_KEY);
if($currentSide == 'A'){
memcache_set(self::$client, $this->queueName . self::SIDE_KEY,'B',false,$this->expire);
$this->currentSide = 'B';
}else{
memcache_set(self::$client, $this->queueName . self::SIDE_KEY,'A',false,$this->expire);
$this->currentSide = 'A';
}
return $this->currentSide;
}
/*
* 检查当前队列是否已满
* @return  boolean
*/
public function isFull(){
$result = false;
if($this->sideATail == self::MAXNUM && $this->sideBTail == self::MAXNUM){
$result = true;
}
return $result;
}
/*
* 检查当前队列是否为空
* @return  boolean
*/
public function isEmpty(){
$result = true;
if($this->sideATail > 0 || $this->sideBTail > 0){
$result = false;
}
return $result;
}
/*
* 获取当前队列的长度
* 该长度为理论长度,某些元素由于过期失效而丢失,真实长度小于或等于该长度
* @return  int
*/
public function getQueueLength(){
$this->getHeadNTail($this->queueName);
$this->getCurrentSide();
$sideALength = $this->sideATail - $this->sideAHead;
$sideBLength = $this->sideBTail - $this->sideBHead;
$result = $sideALength + $sideBLength;
return $result;
}
/*
* 清空当前队列数据,仅保留HEAD_KEY、TAIL_KEY、SIDE_KEY三个key
* @return  boolean
*/
public function clear(){
if(!$this->getLock()) return false;
for($i=0;$i<self::MAXNUM;$i++){
@memcache_delete(self::$client, $this->queueName.'A'. self::VALU_KEY .$i, 0);
@memcache_delete(self::$client, $this->queueName.'B'. self::VALU_KEY .$i, 0);
}
$this->unLock();
$this->resetSide('A');
$this->resetSide('B');
return true;
}
/*
* 清除所有memcache缓存数据
* @return  NULL
*/
public function memFlush(){
memcache_flush(self::$client);
}
}
?>

简单实现memcache队列函数,A/B块切换

<?php
include_once("memcacheQueue.php");
$obj = new memcacheQueue('SSS');
var_dump($obj->getCurrentSide());
//exit;
for($i=0;$i<30;$i++){
$v1=$obj->add('中国'."**".$i);
}
//var_dump($v1);
$v2=$obj->getQueueLength();
var_dump($v2);
$v3=$obj->read(30);
var_dump($v3);
$v4=$obj->get(8);
var_dump($v4);
$v5=$obj->read(15);
var_dump($v5);
$obj->clear();
?>

结果:

string 'A' (length=1)
int 20
array
'SSSA_lkkQueueValu_0' => string '中国**0' (length=7)
'SSSA_lkkQueueValu_1' => string '中国**1' (length=7)
'SSSA_lkkQueueValu_2' => string '中国**2' (length=7)
'SSSA_lkkQueueValu_3' => string '中国**3' (length=7)
'SSSA_lkkQueueValu_4' => string '中国**4' (length=7)
'SSSA_lkkQueueValu_5' => string '中国**5' (length=7)
'SSSA_lkkQueueValu_6' => string '中国**6' (length=7)
'SSSA_lkkQueueValu_7' => string '中国**7' (length=7)
'SSSA_lkkQueueValu_8' => string '中国**8' (length=7)
'SSSA_lkkQueueValu_9' => string '中国**9' (length=7)
'SSSB_lkkQueueValu_0' => string '中国**10' (length=8)
'SSSB_lkkQueueValu_1' => string '中国**11' (length=8)
'SSSB_lkkQueueValu_2' => string '中国**12' (length=8)
'SSSB_lkkQueueValu_3' => string '中国**13' (length=8)
'SSSB_lkkQueueValu_4' => string '中国**14' (length=8)
'SSSB_lkkQueueValu_5' => string '中国**15' (length=8)
'SSSB_lkkQueueValu_6' => string '中国**16' (length=8)
'SSSB_lkkQueueValu_7' => string '中国**17' (length=8)
'SSSB_lkkQueueValu_8' => string '中国**18' (length=8)
'SSSB_lkkQueueValu_9' => string '中国**19' (length=8)
array
'SSSA_lkkQueueValu_0' => string '中国**0' (length=7)
'SSSA_lkkQueueValu_1' => string '中国**1' (length=7)
'SSSA_lkkQueueValu_2' => string '中国**2' (length=7)
'SSSA_lkkQueueValu_3' => string '中国**3' (length=7)
'SSSA_lkkQueueValu_4' => string '中国**4' (length=7)
'SSSA_lkkQueueValu_5' => string '中国**5' (length=7)
'SSSA_lkkQueueValu_6' => string '中国**6' (length=7)
'SSSA_lkkQueueValu_7' => string '中国**7' (length=7)
array
'SSSA_lkkQueueValu_8' => string '中国**8' (length=7)
'SSSA_lkkQueueValu_9' => string '中国**9' (length=7)
'SSSB_lkkQueueValu_0' => string '中国**10' (length=8)
'SSSB_lkkQueueValu_1' => string '中国**11' (length=8)
'SSSB_lkkQueueValu_2' => string '中国**12' (length=8)
'SSSB_lkkQueueValu_3' => string '中国**13' (length=8)
'SSSB_lkkQueueValu_4' => string '中国**14' (length=8)
'SSSB_lkkQueueValu_5' => string '中国**15' (length=8)
'SSSB_lkkQueueValu_6' => string '中国**16' (length=8)
'SSSB_lkkQueueValu_7' => string '中国**17' (length=8)
'SSSB_lkkQueueValu_8' => string '中国**18' (length=8)
'SSSB_lkkQueueValu_9' => string '中国**19' (length=8)

转载于:https://blog.51cto.com/taoshi/1254372

memcacheQueue队列相关推荐

  1. java memcache 队列_基于memcache的java分布式队列实现。

    主要有两个类,一个队列类和一个job的抽象类. 保证队列类中的key的唯一性,就可以用spring配置多个实例.水平有限,欢迎吐槽. 上代码: 1.队列类 import net.spy.memcach ...

  2. RabbitMQ 入门系列(2)— 生产者、消费者、信道、代理、队列、交换器、路由键、绑定、交换器

    本系列是「RabbitMQ实战:高效部署分布式消息队列」和 「RabbitMQ实战指南」书籍的读书笔记. RabbitMQ 中重要概念 1. 生产者 生产者(producer)创建消息,然后发送到代理 ...

  3. Redis 笔记(04)— list类型(作为消息队列使用、在列表头部添加元素、尾部删除元素、查看列表长度、遍历指定列表区间元素、获取指定区间列表元素、阻塞式获取列表元素)

    Redis 的列表是链表而不是数组.这意味着 list 的插入和删除操作非常快,时间复杂度为 O(1),但是索引定位很慢,时间复杂度为 O(n). 当列表弹出了最后一个元素之后,该数据结构自动被删除, ...

  4. 翻转二叉树 c语言实现 递归 栈 队列

    前言 题目比较好理解,就是翻转二叉树 代码 c语言实现 #include<stdio.h> #include<stdlib.h> #include<string.h> ...

  5. 队列:实用程序服务和数据结构

    队列:实用程序服务和数据结构 Queues: utility services and data structures 队列实用程序服务 Nucleus RTOS有四个API调用,它们提供与队列相关的 ...

  6. 2021年大数据Kafka(一):❤️消息队列和Kafka的基本介绍❤️

    全网最详细的大数据Kafka文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 消息队列和Kafka的基本介绍 一.什么是消息队列 二.消息队列的应用场景 ...

  7. Laravel/Lumen 使用 Redis队列

    一.概述 在Web开发中,我们经常会遇到需要批量处理任务的场景,比如群发邮件.秒杀资格获取等,我们将这些耗时或者高并发的操作放到队列中异步执行可以有效缓解系统压力.提高系统响应速度和负载能力. 二.配 ...

  8. Laravel7中Redis队列的使用

    一.配置文件 首先我们需要在配置文件中配置默认队列驱动为Redis,队列配置文件是config/queue.php: return ['default' => env('QUEUE_DRIVER ...

  9. java多线程消息队列_java多线程消息队列的实现

    1.定义一个队列缓存池: private static List queueCache = new LinkedList(); 2.定义队列缓冲池最大消息数,如果达到该值,那么队列检入将等待检出低于该 ...

  10. C++ 双端队列(deque)的使用

    双端队列(deque)是什么 双端队列(deque)是一种随机访问的数据类型,提供了在序列两端快速插入和删除的功能,deque类似于vector, 双端队列(deque)属于STL(Standard ...

最新文章

  1. 虚拟机生命周期八招巧管理
  2. 机房承重标准及承重计算方法
  3. 浅谈Android系统进程间通信(IPC)机制Binder中的Server和Client获得Service Manager接口之路
  4. oracle日志表设计,数据库设计 – 数据库日志表结构
  5. layui轮播图切换会有跳动_Layui中轮播图切换函数说明
  6. 厉害了!牛顿法深度学习优化器,效果比肩SGD和Adam
  7. 我的设计模式之旅(1)——学习的原则和一些笔记
  8. apscheduler 任务管理
  9. 剑指offer系列48---左旋转字符串
  10. hdu 1693 Eat the Trees 插头dp
  11. Java的结构之美【2】——销毁对象
  12. Google docs/slides的下载
  13. keil+proteus 制作计算器_设计费 | 工程设计费计算器使用指南
  14. 配置文件 ini toml yaml 以及 json对比
  15. 天正坐标标注怎么不显示_cad中坐标标注怎么显示不了xy的
  16. JAVA空间换时间以及时间换空间的例子
  17. 【.NET IoT】把达特甲醛传感器DART WZ-S接到树莓派RaspberryPi 3 b+上
  18. 2022年氧化工艺考试练习题模拟考试平台操作
  19. ERP中Bom的替代料
  20. ADC0809芯片简介

热门文章

  1. 应用泛函分析—距离空间
  2. 新版edge找不到internet选项
  3. 【Python蒙特卡罗算法】
  4. 深度学习之 7 深度前馈网络
  5. 蝙蝠聊天软件显示无法连接服务器失败,蝙蝠聊天软件为什么没有来信息提示音?...
  6. vue 自动播放视频
  7. html 判断undefined,JS中 “is not defined” 如何判断defined,defined和undefined 的区别
  8. win10文件资源管理器默认打开我的电脑及左侧导航设置
  9. 路由器,猫,交换机的区别
  10. 第15届“开源中国开源世界”高峰论坛成功举办,腾讯获得重要奖项