1.基本介绍

通知/协调机制通常有两种方式。

  • 系统调度模式:操作人员发送通知实际是通过控制台改变某个节点的状态,然后Zookeeper将这些变化发送给注冊了这个节点的Watcher的全部client。
  • 工作汇报模式:这个情况是每一个工作进程都在某个文件夹下创建一个暂时节点,并携带工作的进度数据。

    这样汇总的进程能够监控文件夹子节点的变化获得工作进度的实时的全局情况。

总的来说,利用Zookeeper的watcher注冊和异步通知功能,通知的发送者创建一个节点。并将通知的数据写入的该节点;通知的接受者则对该节点注冊watch,当节点变化时,就算作通知的到来。

场景实践

通过上面的说明,事实上实现还是很easy的。看下关键的几个地方:

  • g_monitor_child:变量等于0标识仅仅监控节点,等于1标识监控全部子节点。
  • show_notify(zh,g_path);:打印接受到的通知
  • show_list(zh,g_path);:打印全部子节点的进度

再来看监控函数:

void zktest_watcher_g(zhandle_t* zh, int type, int state, const char* path, void* watcherCtx)
{  //监控节点数据变化if(type == ZOO_CHANGED_EVENT &&state == ZOO_CONNECTED_STATE &&g_monitor_child == 0){show_notify(zh,g_path);//监控子节点个数变化}else if(type == ZOO_CHILD_EVENT &&state == ZOO_CONNECTED_STATE &&g_monitor_child == 1){show_list(zh,g_path);//监控子节点数据变化}else if(type == ZOO_CHANGED_EVENT &&state == ZOO_CONNECTED_STATE &&g_monitor_child == 1){show_list(zh,g_path);}
}

以下是完整的代码:

#include<stdio.h>
#include<string.h>
#include<unistd.h>
#include"zookeeper.h"
#include"zookeeper_log.h"  char g_host[512]= "172.17.0.36:2181";
char g_path[512]= "/Notify";
int g_monitor_child = 0;//watch function when child list changed
void zktest_watcher_g(zhandle_t* zh, int type, int state, const char* path, void* watcherCtx);
void show_notify(zhandle_t *zkhandle,const char *path);
//show all process ip:pid
void show_list(zhandle_t *zkhandle,const char *path);void print_usage();
void get_option(int argc,const char* argv[]);/**********unitl*********************/
void print_usage()
{printf("Usage : [notify] [-h] [-c] [-p path][-s ip:port] \n");printf("        -h Show help\n");printf("        -p path\n");printf("        -c monitor the child nodes\n");printf("        -s zookeeper server ip:port\n");printf("For example:\n");printf("notify -s172.17.0.36:2181 -p /Notify\n");
}void get_option(int argc,const char* argv[])
{extern char    *optarg;int            optch;int            dem = 1;const char    optstring[] = "hcp:s:";while((optch = getopt(argc , (char * const *)argv , optstring)) != -1 ){switch( optch ){case 'h':print_usage();exit(-1);case '?':print_usage();printf("unknown parameter: %c\n", optopt);exit(-1);case ':':print_usage();printf("need parameter: %c\n", optopt);exit(-1);case 'c':g_monitor_child = 1;break;case 's':strncpy(g_host,optarg,sizeof(g_host));break;case 'p':strncpy(g_path,optarg,sizeof(g_path));break;default:break;}}
}
void zktest_watcher_g(zhandle_t* zh, int type, int state, const char* path, void* watcherCtx)
{  /*printf("watcher event\n");  printf("type: %d\n", type);  printf("state: %d\n", state);  printf("path: %s\n", path);  printf("watcherCtx: %s\n", (char *)watcherCtx);  */if(type == ZOO_CHANGED_EVENT &&state == ZOO_CONNECTED_STATE &&g_monitor_child == 0){show_notify(zh,g_path);}else if(type == ZOO_CHILD_EVENT &&state == ZOO_CONNECTED_STATE &&g_monitor_child == 1){show_list(zh,g_path);}else if(type == ZOO_CHANGED_EVENT &&state == ZOO_CONNECTED_STATE &&g_monitor_child == 1){show_list(zh,g_path);}
}
void show_notify(zhandle_t *zkhandle,const char *path)
{char notify_buffer[1024]={0};int  notify_len = sizeof(notify_buffer);int ret = zoo_get(zkhandle,g_path,1,notify_buffer,¬ify_len,NULL);if(ret != ZOK){fprintf(stderr,"failed to get the data of path %s!\n",g_path);}else{printf("Notice:%s\n",notify_buffer);}
}
void show_list(zhandle_t *zkhandle,const char *path)
{struct String_vector children;int i = 0;int ret = zoo_get_children(zkhandle,path,1,&children);if(ret == ZOK){char child_path[512] ={0};char notify_buffer[1024] = {0};int notify_len = sizeof(notify_buffer);printf("--------------\n");for(i = 0; i < children.count; ++i){sprintf(child_path,"%s/%s",g_path,children.data[i]);ret = zoo_get(zkhandle,child_path,1,notify_buffer,¬ify_len,NULL);if(ret != ZOK){fprintf(stderr,"failed to get the data of path %s!\n",child_path);}else{printf("%s:%s\n",children.data[i],notify_buffer);}}}else{fprintf(stderr,"failed to get the children of path %s!\n",path);}for(i = 0; i < children.count; ++i){free(children.data[i]);children.data[i] = NULL;}
}int main(int argc, const char *argv[])
{  int timeout = 30000;  char path_buffer[512];  int bufferlen=sizeof(path_buffer);  zoo_set_debug_level(ZOO_LOG_LEVEL_WARN); //设置日志级别,避免出现一些其它信息  get_option(argc,argv);zhandle_t* zkhandle = zookeeper_init(g_host,zktest_watcher_g, timeout, 0, (char *)"Notify Test", 0);  if (zkhandle ==NULL)  {  fprintf(stderr, "Error when connecting to zookeeper servers...\n");  exit(EXIT_FAILURE);  }  int ret = zoo_exists(zkhandle,g_path,0,NULL); if(ret != ZOK){ret = zoo_create(zkhandle,g_path,"1.0",strlen("1.0"),  &ZOO_OPEN_ACL_UNSAFE,0,  path_buffer,bufferlen);  if(ret != ZOK){fprintf(stderr,"failed to create the path %s!\n",g_path);}else{printf("create path %s successfully!\n",g_path);}}if(ret == ZOK && g_monitor_child == 0){show_notify(zkhandle,g_path);}else if(ret == ZOK && g_monitor_child == 1){show_list(zkhandle,g_path);}getchar();zookeeper_close(zkhandle); return 0;
}

程序由3个參数选项

  • -s:指定Zookeeper的server的ip:port
  • -p:指定要监控的路径,默觉得/Notify
  • -c:使用此项表示监控子节点列表

notify -s172.17.0.36:2181 -p /Notify
当你在client改动数据的时候。程序就能收到相应的通知了。

转载于:https://www.cnblogs.com/jzdwajue/p/6772590.html

Zookeeper场景实践:(5)分布式通知/协调相关推荐

  1. ZooKeeper场景实践:(6)集群监控和Master选举

    1. 集群机器监控 这通经常使用于那种对集群中机器状态,机器在线率有较高要求的场景,可以高速对集群中机器变化作出响应.这种场景中,往往有一个监控系统,实时检測集群机器是否存活. 利用ZooKeeper ...

  2. ZooKeeper分布式应用程序的分布式协调服务:概述,入门,发布版本

    ZooKeeper概述适用于客户端开发人员,管理员和贡献者的技术概述文档 概述 -ZooKeeper的鸟瞰图,包括设计概念和体系结构 入门 -教程风格的指南,供开发人员安装,运行和编程到ZooKeep ...

  3. 每秒上千订单场景下的分布式锁高并发优化实践!

    本文授权转自石杉的架构笔记 背景引入 首先,我们一起来看看这个问题的背景? 前段时间有个朋友在外面面试,然后有一天找我聊说:有一个国内不错的电商公司,面试官给他出了一个场景题: 假如下单时,用分布式锁 ...

  4. etcd分布式之分布式通知与协调

    分布式之分布式通知与协调:        通过etcd进行低耦合的心跳检测        通过etcd完成系统调度        通过etcd完成工作汇报

  5. Java架构-每秒上千订单场景下的分布式锁高并发优化实践!

    "上一篇文章我们聊了聊Redisson这个开源框架对Redis分布式锁的实现原理,如果有不了解的兄弟可以看一下:<拜托,面试请不要再问我Redis分布式锁实现原理>. 今天就给大 ...

  6. 金融业分布式数据库选型及HTAP场景实践

    作为数据基础设施的重要组成部分,数据库在其中扮演着重要的角色.近些年来,数据库整体发展也呈现出较之以往很大的不同.其一.是开源数据库受到更为广泛的关注,从多家机构的最新报告来看,开源数据库无论从产品数 ...

  7. 编程狂人|金融业分布式数据库选型及HTAP场景实践

    作为数据基础设施的重要组成部分,数据库在其中扮演着重要的角色.近些年来,数据库整体发展也呈现出较之以往很大的不同.其一.是开源数据库受到更为广泛的关注,从多家机构的最新报告来看,开源数据库无论从产品数 ...

  8. Zookeeper和Redis实现分布式锁,附我的可靠性分析

    作者:今天你敲代码了吗 链接:https://www.jianshu.com/p/b6953745e341 在分布式系统中,为保证同一时间只有一个客户端可以对共享资源进行操作,需要对共享资源加锁来实现 ...

  9. Zookeeper概念学习系列之分布式事务

    不多说,直接上干货! 初学者来说,肯定会有这么一个疑问.为什么会在zookeeper里牵扯到分布式事务? zookeeper到底是什么? zookeeper实际上是yahoo开发的,用于分布式中一致性 ...

最新文章

  1. json字段顺序读取 python_如何利用Python批量读取视频文件的时间长度?
  2. unicode字符串 转 中文
  3. java位运算符取反_java运算符
  4. php函数有哪三种,【后端开发】php函数可以分为哪三种
  5. Windows 系统如何查看本机的 IP 地址
  6. STM32——库函数开发小结
  7. TCP/ITX协议面试总结
  8. python-运算符-比较运算符
  9. python中的序列类型数据结构元素的切片_第四章 Python字符串以及(split,rsplit,replace,strip.....)...
  10. 静态方法中调用spring容器中的对象
  11. 八个小技巧教你做出舒服的MG动画
  12. Mysql查询去空格方法汇总
  13. 计算机辅助设计与制造实习周记,计算机辅助设计与制造专业毕业实习周记范文原创全套.pdf...
  14. 中兴交换机8912E配置
  15. Hdu 5064 Find Sequence 解题报告
  16. JAVA核心基础笔记(上)
  17. linux 批量监控软件,Linux/Unix/Windows批量管理监控服务器软件
  18. 大数据IMF传奇行动绝密课程第91课:SparkStreaming基于Kafka Direct案例实战和内幕源码解密
  19. 大家好,我是浪啦啦啦啦啦!
  20. wincc c 语言改颜色,wincc常用c脚本小草设置

热门文章

  1. php中array怎么用,php array函数怎么用
  2. mysql my.cnf 官网_MySQL my.cnf 的配置
  3. kafka 出现Java heap space的解决方法
  4. C++ 11 创建和使用共享 weak_ptr
  5. 沙老师的作业系列——Crackme3
  6. java类结构图_java 集合类结构图
  7. LTE各场景下的密钥处理
  8. vlookup查找值不唯一时怎么办
  9. linux用户带密码迁移 LDAP
  10. Tomcat Server.xml 标签详解 .