概述

beanstalk是多年前使用过的一个分布式任务队列,通过C实现,十分高效。和Redis(默认)的事件驱动框架一样,都是通过异步的epoll来实现,所以,能够高效的处理大量请求。
但不知什么原因,作者几年前已经不再维护其代码了。但我发现国内还是有一些人在使用该软件,为了能够更好的理解其运行机制,几年前对其代码进行了一些研究。先把分析的几篇发出来。
另外,我对其代码进行了fork,若有问题可以一起探讨解决之道。

beanstalk数据结构概览

基本数据结构解析

socket结构

beanstalk 对底层socket做了一层封装,通过面向对象的思想把操作封装在一起。

struct Socket {int    fd;   //监听socket fdHandle f;void   *x;int    added;
};

Handle是一个函数指针,参数是void* 和一个整数(这里是以完成三次握手的socket的fd)

typedef void(*Handle)(void*, int rw);

tube结构

该结构定义一个有名的任务队列,用来存储统一类型的job,是producer和consumer操作的对象。

struct tube {uint refs;char name[MAX_TUBE_NAME_LEN];    //队列名字,最大长度为201个字节Heap ready; //最小堆,保存job指针的地方。ready堆用来保存已经状态是Ready的jobHeap delay; //最小堆。delay堆用来保存延迟消费的jobstruct ms waiting;  /* set of conns */ //用来保存所有等待的Conn实体指针struct stats stat;uint using_ct; uint watching_ct;int64 pause;int64 deadline_at;struct job buried;
};

// 队列名的最大长度为201个字符?作者为什么要限制到201?

#define MAX_TUBE_NAME_LEN 201
  • 状态信息结构
struct stats {uint urgent_ct;uint waiting_ct;uint buried_ct;uint reserved_ct;uint pause_ct;uint64   total_delete_ct;uint64   total_jobs_ct;
};
  • ms结构定义

该结构是一个顶层结构,系统定义了一个struct ms tubes变量,用来管理所有的tube实体。
也就是说,beanstalkd把所有的tube都保存在items的指针数组中。

struct ms {size_t used, cap, last;void **items;ms_event_fn oninsert, onremove;
};
typedef void(*ms_event_fn)(ms a, void *item, size_t i);
  • Heap(堆)结构定义

该结构用来保存job,每个tube有两个

struct Heap {int     cap;int     len;void    **data;Less    less;    // 函数指针,用来操作dataRecord  rec;    // 函数指针,对data进行操作
};
typedef int(*Less)(void*, void*);
typedef void(*Record)(void*, int);

job结构

该结构定义一个需要异步处理的任务,是Beanstalkd中的基本单元,需要放在一个tube中。

struct job {Jobrec r; // persistent fields; these get written to the wal/* bookeeping fields; these are in-memory only */char pad[6];  // 该字段不保存任何数据,为内存对齐而使用的填充位tube tube;  //该job所属的tube的指针job prev, next; /* linked list of jobs */ // 每个job都被保存到一个双向链表中job ht_next; /* Next job in a hash table list */ //job还被添加到hash表中size_t heap_index; /* where is this job in its current heap */ //该job所在tube的堆的索引值File *file;job  fnext; job  fprev;void *reserver;int walresv;int walused;char body[]; // written separately to the wal
};
  • job描述结构
// if you modify this struct, you must increment Walver above
struct Jobrec {uint64 id;uint32 pri;int64  delay;int64  ttr;int32  body_size;int64  created_at;int64  deadline_at;uint32 reserve_ct;uint32 timeout_ct;uint32 release_ct;uint32 bury_ct;uint32 kick_ct;byte   state;
};
参数名 说明
deadline_at job的处理时间,初始值是ttr+now(),若是越大job处于ready状态的时间就越长,否则job的状态会转换成其他状态。
ttr job的过期时间
delay job的delay时间
pri job的优先级
id job的id
body_size job的body的大小
*_ct 这些参数job的统计数据
state job的状态
  • ms结构
struct ms {size_t used, cap, last;void **items;ms_event_fn oninsert, onremove;
};

Server结构

该结构保存服务器的一些配置信息。

struct Server {char *port;    // 服务的端口char *addr;    // 服务的绑定地址char *user;    // 服务的启动用户Wal    wal;Socket sock;   //服务的socket信息Heap   conns;
};

Wal结构

struct Wal {int    filesize;int    use;char   *dir;File   *head;File   *cur;File   *tail;int    nfile;int    next;int    resv;  // bytes reservedint    alive; // bytes in useint64  nmig;  // migrationsint64  nrec;  // records written everint    wantsync;int64  syncrate;int64  lastsync;int    nocomp; // disable binlog compaction?
};

File结构

该结构维护了打开的文件和job的关系。

struct File {File *next;uint refs;int  seq;int  iswopen; // is open for writingint  fd;int  free;int  resv;char *path;Wal  *w;struct job jlist; // jobs written in this file
};

连接结构Conn

struct Conn {Server *srv;  // 该Conn结构对应的Server实体指针Socket sock;char   state;char   type;  //连接的类型:消费者,还是生产者Conn   *next; // 下一个Conn的指针tube   use; //该连接使用的tube指针int64  tickat;      // time at which to do more workint    tickpos;     // position in srv->connsjob    soonest_job; // memoization of the soonest jobint    rw;          // currently want: 'r', 'w', or 'h'int    pending_timeout;char   halfclosed;char cmd[LINE_BUF_SIZE]; // this string is NOT NUL-terminatedint  cmd_len;int  cmd_read;char *reply;int  reply_len;int  reply_sent;char reply_buf[LINE_BUF_SIZE]; // this string IS NUL-terminated// How many bytes of in_job->body have been read so far. If in_job is NULL// while in_job_read is nonzero, we are in bit bucket mode and// in_job_read's meaning is inverted -- then it counts the bytes that// remain to be thrown away.int in_job_read;job in_job;  // a job to be read from the clientjob out_job;int out_job_sent;struct ms  watch;struct job reserved_jobs; // 该连接保持的reserved的job链表头指针
};

总结

本文描述了beanstalk的基本数据结构,要理解系统的设计原理,首先要理解其数据结构的设计。

Beanstalk源码分析--数据结构设计相关推荐

  1. 【VUE】源码分析 - 数据劫持的基本原理

    tips:本系列博客的代码部分(示例等除外),均出自vue源码内容,版本为2.6.14.但是为了增加易读性,会对不相关内容做选择性省略.如果大家想了解完整的源码,建议自行从官方下载.https://g ...

  2. 风讯dotNETCMS源码分析—数据存取篇

    前几天突然对CMS感兴趣,就去下载了风讯dotNETCMS源码.当前版本是dotnetcms1.0 sp5免费版,风讯的官方主页上可以下载. 用Visual Studio 2008打开后,初步分析了它 ...

  3. android+小米文件管理器源码,[MediaStore]小米文件管理器android版源码分析——数据来源...

    打开小米的文件管理器,我们很快会看到如下图所示的界面: 其中,会把各种文件分类显示.并且显示出每种文件的个数. 这是怎么做到的呢?当然不是每次启动都查询sdcard和应用程序data目录文件啦,那样实 ...

  4. Nginx源码分析--数据对齐posix_memalign和memalign函数

    posix_memalign函数() /*  * 背景:  *      1)POSIX 1003.1d  *      2)POSIX 标明了通过malloc( ), calloc( ), 和 re ...

  5. galler3d的源码分析——数据来源

    我们这里主要讲本地sd卡的数据,pisaca看情况后续再作分析. 数据操作设计的类包括:CacheService,MediaFeed,LocalDataSource,DiskCache,MediaIt ...

  6. ConcurrentHashMap的源码分析-数据迁移阶段的实现分析

    通过分配好迁移的区间之后,开始对数据进行迁移.在看这段代码之前,先来了解一下原理 synchronized (f) {//对数组该节点位置加锁,开始处理数组该位置的迁移工作 if (tabAt(tab ...

  7. Memcached源码分析 - 内存存储机制Slabs(5)

    Memcached源码分析 - 网络模型(1) Memcached源码分析 - 命令解析(2) Memcached源码分析 - 数据存储(3) Memcached源码分析 - 增删改查操作(4) Me ...

  8. Influxdb源码分析-TSM Engine WAL

    前言 influxdb安装和使用 influxdb概念详解1 influxdb概念详解2 influxdb源码编译 influxdb启动分析 influxdb源码分析-meta部分 infludb源码 ...

  9. influxdb源码解析-数据写入细节

    前言 ~~  这是一个分析inlfuxdb源码的系列.在此之前,已经分析了数据的基本模型,以及写入流程.在上一章数据写入部分,我们分析的是数据写入的基本流程,怎么从一个http的请求解析数据,然后计算 ...

最新文章

  1. 2022-2028年中国石油钻井井下工具行业市场研究及前瞻分析报告
  2. java中Collections.sort排序详解
  3. JVM之JVM内存区域与内存分配(转载)
  4. SimpleDateFormat 格式图
  5. SAP Cloud for Customer里如何根据产品ID拿到其UUID
  6. vim编辑和命令模式、实践
  7. WTM系列视频教程:View和Taghelper
  8. javaweb简要介绍,虚拟路径,虚拟主机
  9. Go Concurrency Patterns: Pipelines and cancellation
  10. 大数据之-Hadoop完全分布式_完全分布式配置总结---大数据之hadoop工作笔记0040
  11. BeanUtils笔记
  12. pandas求协方差、相关系数、显著性检验
  13. 互联网公司面试流程面试技巧(附被无良HR欺骗的经历)
  14. vue项目中对于Scroll事件的节流优化
  15. 亲历医院蹩脚程序(项目)的糟糕
  16. (附源码)mysql+ssm学生选课系统 毕业设计 170920
  17. 6.10 通过屏幕截图功能快速插入网页图片 [原创Excel教程]
  18. PullScrollView详解(三)——PullScrollView实现
  19. 关于接口连续调用,查询数据库数据不一致的情况
  20. Python浪漫520表白代码

热门文章

  1. 统计机器翻译与神经机器翻译区别_如果每个人都献出一点爱,就会拥有一套超级牛的机器翻译系统...
  2. 【语音识别】基于功率谱和倍频法实现男女生声音识别含Matlab源码
  3. ArcGIS 10.1 如何连接数据库(转)
  4. 2013年数学建模国赛B题(碎纸片拼接复原的设计与实现)优秀论文.doc
  5. 算法竞赛:Online Judge介绍
  6. Java对接大华摄像头SDK
  7. matlab中scope是什么,关于simulink中scope参数设置的总结
  8. CSS中content属性的妙用
  9. 中本聪更新个人状态;中国区块链技术应用反诈骗中心正式成立
  10. 对如何分析项目的思考