现在,memcache于server缓存广泛应用。下面我来介绍一下memcache消息队列中等待的样本实现,有需要了解的朋友可以参考。

memche消息队列原则key上做文章。后消息或者日志。

然后通过定时程序将内容落地到文件或者数据库。

php实现消息队列的用处比方在做发送邮件时发送大量邮件非常费时间的问题。那么能够採取队列。

方便实现队列的轻量级队列server是:
starling支持memcache协议的轻量级持久化server
https://github.com/starling/starling
Beanstalkd轻量、高效,支持持久化,每秒可处理3000左右的队列
http://kr.github.com/beanstalkd/
php中也能够使用memcache/memcached来实现消息队列。

<?

php /** * Memcache 消息队列类 */ class QMC { const PREFIX = 'ASDFASDFFWQKE'; /** * 初始化mc * @staticvar string $mc * @return Memcache */ static private function mc_init() { static $mc = null; if (is_null($mc)) { $mc = new Memcache; $mc->connect('127.0.0.1', 11211); } return $mc; } /** * mc 计数器,添加计数并返回新的计数 * @param string $key 计数器 * @param int $offset 计数增量,可为负数.0为不改变计数 * @param int $time 时间 * @return int/false 失败是返回false,成功时返回更新计数器后的计数 */ static public function set_counter( $key, $offset, $time=0 ){ $mc = self::mc_init(); $val = $mc->get($key); if( !is_numeric($val) || $val < 0 ){ $ret = $mc->set( $key, 0, $time ); if( !$ret ) return false; $val = 0; } $offset = intval( $offset ); if( $offset > 0 ){ return $mc->increment( $key, $offset ); }elseif( $offset < 0 ){ return $mc->decrement( $key, -$offset ); } return $val; } /** * 写入队列 * @param string $key * @param mixed $value * @return bool */ static public function input( $key, $value ){ $mc = self::mc_init(); $w_key = self::PREFIX.$key.'W'; $v_key = self::PREFIX.$key.self::set_counter($w_key, 1); return $mc->set( $v_key, $value ); } /** * 读取队列里的数据 * @param string $key * @param int $max 最多读取条数 * @return array */ static public function output( $key, $max=100 ){ $out = array(); $mc = self::mc_init(); $r_key = self::PREFIX.$key.'R'; $w_key = self::PREFIX.$key.'W'; $r_p = self::set_counter( $r_key, 0 );//读指针 $w_p = self::set_counter( $w_key, 0 );//写指针 if( $r_p == 0 ) $r_p = 1; while( $w_p >= $r_p ){ if( --$max < 0 ) break; $v_key = self::PREFIX.$key.$r_p; $r_p = self::set_counter( $r_key, 1 ); $out[] = $mc->get( $v_key ); $mc->delete($v_key); } return $out; } } /** 用法: QMC::input($key, $value );//写入队列 $list = QMC::output($key);//读取队列 */ ?>

 
 

基于PHP共享内存实现的消息队列:

<?php
/**
* 使用共享内存的PHP循环内存队列实现
* 支持多进程, 支持各种数据类型的存储
* 注: 完毕入队或出队操作,尽快使用unset(), 以释放临界区
*
* @author wangbinandi@gmail.com
* @created 2009-12-23
*/
class ShmQueue
{
private $maxQSize = 0; // 队列最大长度
private $front = 0; // 队头指针
private $rear = 0;  // 队尾指针
private $blockSize = 256;  // 块的大小(byte)
private $memSize = 25600;  // 最大共享内存(byte)
private $shmId = 0;
private $filePtr = './shmq.ptr';
private $semId = 0;
public function __construct()
{
$shmkey = ftok(__FILE__, 't');
$this->shmId = shmop_open($shmkey, "c", 0644, $this->memSize );
$this->maxQSize = $this->memSize / $this->blockSize;
// 申?一个信号量
$this->semId = sem_get($shmkey, 1);
sem_acquire($this->semId); // 申请进入临界区
$this->init();
}
private function init()
{
if ( file_exists($this->filePtr) ){
$contents = file_get_contents($this->filePtr);
$data = explode( '|', $contents );
if ( isset($data[0]) && isset($data[1])){
$this->front = (int)$data[0];
$this->rear  = (int)$data[1];
}
}
}
public function getLength()
{
return (($this->rear - $this->front + $this->memSize) % ($this->memSize) )/$this->blockSize;
}
public function enQueue( $value )
{
if ( $this->ptrInc($this->rear) == $this->front ){ // 队满
return false;
}
$data = $this->encode($value);
shmop_write($this->shmId, $data, $this->rear );
$this->rear = $this->ptrInc($this->rear);
return true;
}
public function deQueue()
{
if ( $this->front == $this->rear ){ // 队空
return false;
}
$value = shmop_read($this->shmId, $this->front, $this->blockSize-1);
$this->front = $this->ptrInc($this->front);
return $this->decode($value);
}
private function ptrInc( $ptr )
{
return ($ptr + $this->blockSize) % ($this->memSize);
}
private function encode( $value )
{
$data = serialize($value) . "__eof";
echo '';
echo strlen($data);
echo '';
echo $this->blockSize -1;
echo '';
if ( strlen($data) > $this->blockSize -1 ){
throw new Exception(strlen($data)." is overload block size!");
}
return $data;
}
private function decode( $value )
{
$data = explode("__eof", $value);
return unserialize($data[0]);
}
public function __destruct()
{
$data = $this->front . '|' . $this->rear;
file_put_contents($this->filePtr, $data);
sem_release($this->semId); // 出临界区, 释放信号量
}
}
/*
// 进队操作
$shmq = new ShmQueue();
$data = 'test data';
$shmq->enQueue($data);
unset($shmq);
// 出队操作
$shmq = new ShmQueue();
$data = $shmq->deQueue();
unset($shmq);
*/
?>
   
 

对于一个非常大的消息队列,频繁进行进行大数据库的序列化 和 反序列化。有太耗费。

以下是我用PHP 实现的一个消息队列。仅仅须要在尾部插入一个数据,就操作尾部,不用操作整个消息队列进行读取,与操作。可是,这个消息队列不是线程安全的,我仅仅是尽量的避免了冲突的可能性。假设消息不是非常的密集,比方几秒钟才一个。还是能够考虑这样使用的。 
假设你要实现线程安全的,一个建议是通过文件进行锁定,然后进行操作。以下是代码: 
代码例如以下:

class Memcache_Queue
{
private $memcache;
private $name;
private $prefix;
function __construct($maxSize, $name, $memcache, $prefix = "__memcache_queue__")
{
if ($memcache == null) {
throw new Exception("memcache object is null, new the object first.");
}
$this->memcache = $memcache;
$this->name = $name;
$this->prefix = $prefix;
$this->maxSize = $maxSize;
$this->front = 0;
$this->real = 0;
$this->size = 0;
}
function __get($name)
{
return $this->get($name);
}
function __set($name, $value)
{
$this->add($name, $value);
return $this;
}
function isEmpty()
{
return $this->size == 0;
}
function isFull()
{
return $this->size == $this->maxSize;
}
function enQueue($data)
{
if ($this->isFull()) {
throw new Exception("Queue is Full");
}
$this->increment("size");
$this->set($this->real, $data);
$this->set("real", ($this->real + 1) % $this->maxSize);
return $this;
}
function deQueue()
{
if ($this->isEmpty()) {
throw new Exception("Queue is Empty");
}
$this->decrement("size");
$this->delete($this->front);
$this->set("front", ($this->front + 1) % $this->maxSize);
return $this;
}
function getTop()
{
return $this->get($this->front);
}
function getAll()
{
return $this->getPage();
}
function getPage($offset = 0, $limit = 0)
{
if ($this->isEmpty() || $this->size < $offset) {
return null;
}
$keys[] = $this->getKeyByPos(($this->front + $offset) % $this->maxSize);
$num = 1;
for ($pos = ($this->front + $offset + 1) % $this->maxSize; $pos != $this->real; $pos = ($pos + 1) % $this->maxSize)
{
$keys[] = $this->getKeyByPos($pos);
$num++;
if ($limit > 0 && $limit == $num) {
break;
}
}
return array_values($this->memcache->get($keys));
}
function makeEmpty()
{
$keys = $this->getAllKeys();
foreach ($keys as $value) {
$this->delete($value);
}
$this->delete("real");
$this->delete("front");
$this->delete("size");
$this->delete("maxSize");
}
private function getAllKeys()
{
if ($this->isEmpty())
{
return array();
}
$keys[] = $this->getKeyByPos($this->front);
for ($pos = ($this->front + 1) % $this->maxSize; $pos != $this->real; $pos = ($pos + 1) % $this->maxSize)
{
$keys[] = $this->getKeyByPos($pos);
}
return $keys;
}
private function add($pos, $data)
{
$this->memcache->add($this->getKeyByPos($pos), $data);
return $this;
}
private function increment($pos)
{
return $this->memcache->increment($this->getKeyByPos($pos));
}
private function decrement($pos)
{
$this->memcache->decrement($this->getKeyByPos($pos));
}
private function set($pos, $data)
{
$this->memcache->set($this->getKeyByPos($pos), $data);
return $this;
}
private function get($pos)
{
return $this->memcache->get($this->getKeyByPos($pos));
}
private function delete($pos)
{
return $this->memcache->delete($this->getKeyByPos($pos));
}
private function getKeyByPos($pos)
{
return $this->prefix . $this->name . $pos;
}
} 
   
 

版权声明:本文博主原创文章,博客,未经同意不得转载。

PHP memcache实现消息队列实例相关推荐

  1. PHP使用topthink/think-queue消息队列实例

    ​ 常住队列消费命令 sudo nohup php7.2 think queue:work --daemon --queue createAdminLogQueue --tries 2 > ou ...

  2. PHP下用Memcache 实现消息队列

    Memcache 一般用于缓存服务.但是很多时候,比如一个消息广播系统,需要一个消息队列.直接从数据库取消息,负载往往不行.如果将整个消息队列用一个key缓存到memcache里面, 对于一个很大的消 ...

  3. linux 消息队列实例

    前言: 消息队列就是一个消息的链表.可以把消息看作一个记录,具有特定的格式以及特定的优先级.对消息队列有写权限的进程可以向其中按照一定的规则添加新消息:对消息队列有读权限的进程则可以从消息队列中读走消 ...

  4. 消息队列服务器 轻量,PHP的轻量消息队列php-resque使用说明

    消息队列处理后台任务带来的问题 项目中经常会有后台运行任务的需求,比如发送邮件时,因为要连接邮件服务器,往往需要5-10秒甚至更长时间,如果能先给用户一个成功的提示信息,然后在后台慢慢处理发送邮件的操 ...

  5. php redis消息队列用哪种好,phpredis提高消息队列的实时性方法(推荐)

    搜索热词 数据库存贮都用list形式 要存2个队列 1个用作消息队列保存到数据 还有个 就是用来实时读取数据在redis $redis->lpush($queenkey,json_encode( ...

  6. jmstemplate 获取队列id_学习Linux(38)消息队列

    消息队列.共享内存 和 信号量 被统称为 system-V IPC,V 是罗马数字 5,是 Unix 的AT&T 分支的其中一个版本,一般习惯称呼他们为 IPC 对象,这些对象的操作接口都比较 ...

  7. RabbitMQ 消息队列

    背景:python的Queue消息队列,只能python自己用. --线程threading Queue 只能在一个进程间交互数据 --进程Queue最多可以父进程和多个子进程进行交互 常见队列工具: ...

  8. Linux 环境进程间通信(三):消息队列

    本系列文章中的前两部分,我们探讨管道及信号两种通信机制,本文将深入第三部分,介绍系统 V 消息队列及其相应 API. 消息队列(也叫做报文队列)能够克服早期unix通信机制的一些缺点.作为早期unix ...

  9. php 内存队列,memcache构建简单的内存消息队列_PHP教程

    本文章来给各位同学介绍使用memcache构建简单的内存消息队列,用一个比较不错的实例来给大家介绍,希望此方法对大家有帮助哦. memcache功能太简单了,只能 set get 和delete, 只 ...

最新文章

  1. 企业Shell实战-MySQL分库分表备份脚本
  2. JavaScript中的常量:什么时候使用它,有必要吗?
  3. JavaScript 操作 COM 控件
  4. Zookeeper集群搭建分布式
  5. subclipse用法
  6. 腾讯吃鸡 android,腾讯吃鸡手游《光荣使命》正式上线:安卓/iOS不限号测试
  7. mysql 多项式_mysql主从复制原理及实现
  8. linux内核数据结构实现--链表、队列和哈希
  9. 记一次为公司搭建maven私服的过程
  10. Gin Web框架简单介绍
  11. 怎么用光驱给服务器装系统,如何用光驱重装系统?
  12. pp助手苹果版本_腾讯桌球安卓和苹果系统如何进28以及怎么解决没有金币时的烦恼...
  13. java毕业设计宠物店管理系统设计与实现源码+系统+数据库+lw文档+调试运行
  14. 二项分布期望和方差的公式推导
  15. js table 生成序号_JS自动为表格增加序号
  16. Linux 重定向和追加(、 指令)
  17. 第五十九章 CSP的常见问题 - 会话和许可证,为什么我要经常登录?
  18. 调取python背景减法库:MOG2和KNN,非常好用
  19. Python绘图记录专栏
  20. 芮瑶学编程05-画七彩虹

热门文章

  1. jQuery数组处理详解(含实例演示)
  2. 用.NET调用oracle的存储过程返回记录集
  3. Git提交到多个远程仓库(多看两个文档)
  4. Cocos Creator 为Button添加事件的两种方法
  5. Python命令行补全设置
  6. 一次性搞定Session
  7. 防止******ADSL的一些技巧
  8. 转义符,re模块,rangdom随机数模块,
  9. 笔记JavaScript基本概念
  10. percona xtrabackupd定期做全备,增量备份shell脚本