emqttd集群设置管理

一、先来看EMQ的文档定义:http://emqtt.com/docs/v1/cluster.html

emqttd集群设置管理

假设部署两台服务器s1.emqtt.io, s2.emqtt.io上部署集群:

节点名 主机名(FQDN) IP地址

emqttd@s1.emqtt.io 或emqttd@192.168.0.10    s1.emqtt.io 192.168.0.10
emqttd@s2.emqtt.io 或emqttd@192.168.0.20   s2.emqtt.io 192.168.0.20

Warning

节点名格式: Name@Host, Host必须是IP地址或FQDN(主机名.域名)

emqttd@s1.emqtt.io节点设置

emqttd/etc/vm.args:

-name emqttd@s1.emqtt.io

-name emqttd@192.168.0.10

节点加入集群

启动两台节点后,emqttd@s2.emqtt.io上执行:

$ ./bin/emqttd_ctl cluster join emqttd@s1.emqtt.io

Join the cluster successfully.Cluster status: [{running_nodes,['emqttd@s1.emqtt.io','emqttd@s2.emqtt.io']}]

或,emqttd@s1.emqtt.io上执行:

$ ./bin/emqttd_ctl cluster join emqttd@s2.emqtt.ioJoin the cluster successfully.
Cluster status: [{running_nodes,['emqttd@s1.emqtt.io','emqttd@s2.emqtt.io']}]

  

任意节点上查询集群状态:

$ ./bin/emqttd_ctl cluster statusCluster status: [{running_nodes,['emqttd@s1.emqtt.io','emqttd@s2.emqtt.io']}]

节点退出集群

节点退出集群,两种方式:

  • leave: 本节点退出集群
  • remove: 从集群删除其他节点

emqttd@s2.emqtt.io主动退出集群:

$ ./bin/emqttd_ctl cluster leave

或emqttd@s1.emqtt.io节点上,从集群删除emqttd@s2.emqtt.io节点:

$ ./bin/emqttd_ctl cluster remove emqttd@s2.emqtt.io

emqttd_ctl是怎么使用的?

-module(emqttd_cli).有定义要加载的命令

-export([status/1, broker/1, cluster/1, users/1, clients/1, sessions/1,routes/1, topics/1, subscriptions/1, plugins/1, bridges/1,listeners/1, vm/1, mnesia/1, trace/1]).
load() ->Cmds = [Fun || {Fun, _} <- ?MODULE:module_info(exports), is_cmd(Fun)],[emqttd_ctl:register_cmd(Cmd, {?MODULE, Cmd}, []) || Cmd <- Cmds].

加入集群后,子节点mnesia数据库怎么办?

mnesia数据库天然支持分布式集群。子节点加入之后就类似MySQL数据库主从备份一样,主节点和子节点mnesia会保持同步。来看源码:

-module(emqttd_mnesia).

%% @doc Join the mnesia cluster
-spec(join_cluster(node()) -> ok).
join_cluster(Node) when Node =/= node() ->%% Stop mnesia and delete schema firstensure_ok(ensure_stopped()),ensure_ok(delete_schema()),%% Start mnesia and cluster to nodeensure_ok(ensure_started()),ensure_ok(connect(Node)),ensure_ok(copy_schema(node())),%% Copy tablescopy_tables(),ensure_ok(wait_for(tables)).

子节点加入之后,会先删除自己的mnesia数据库和各个表,然后copy一份主节点的库,再copy各个表数据。

%% @doc Cluster with node.
-spec(connect(node()) -> ok | {error, any()}).
connect(Node) ->case mnesia:change_config(extra_db_nodes, [Node]) of{ok, [Node]} -> ok;{ok, []}     -> {error, {failed_to_connect_node, Node}};Error        -> Errorend.%% @doc Copy schema.
copy_schema(Node) ->case mnesia:change_table_copy_type(schema, Node, disc_copies) of{atomic, ok} ->ok;{aborted, {already_exists, schema, Node, disc_copies}} ->ok;{aborted, Error} ->{error, Error}end.%% @doc Copy mnesia tables.
copy_tables() ->emqttd_boot:apply_module_attributes(copy_mnesia).

函数copy_tables(),会检索和执行emq工程目录下所有erl模块里面的mnesia(copy)函数,模块要求含有"-copy_mnesia({mnesia, [copy]})."关键字

例如:

-module(emqttd_backend).

mnesia(copy) ->ok = emqttd_mnesia:copy_table(retained_message),ok = emqttd_mnesia:copy_table(backend_subscription).

-module(emqttd_router).

-copy_mnesia({mnesia, [copy]}).mnesia(copy) ->ok = emqttd_mnesia:copy_table(route, ram_copies).

EMQ工程目录下,有关键字-boot_mnesia({mnesia, [boot]}).和-copy_mnesia({mnesia, [copy]}).的模块是:

-module(emqttd_backend).
-module(emqttd_pubsub).
-module(emqttd_router).
-module(emqttd_server).
-module(emqttd_sm).
-module(emqttd_trie).

其中,emqttd_backend模块新建的数据库retained_message和backend_subscription是disc_copies类型的,其他模块是ram_copies类型的。

emqttd的mnesia初始化

1、-module(emqttd_app).

start(_StartType, _StartArgs) ->
print_banner(),
emqttd_mnesia:start(),

  

2、-module(emqttd_mnesia)

start() ->
ensure_ok(ensure_data_dir()),
ensure_ok(init_schema()),
ok = mnesia:start(),
init_tables(),
wait_for(tables).%% @doc Init mnesia schema or tables.
init_schema() ->
case mnesia:system_info(extra_db_nodes) of
[] -> mnesia:create_schema([node()]);
[_|_] -> ok
end.%% @private
%% @doc Init mnesia tables.
init_tables() ->
case mnesia:system_info(extra_db_nodes) of
[] -> create_tables();
[_|_] -> copy_tables()
end.

3、数据库启动和拷贝的例子-module(emqttd_backend).

-boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).%% Mnesia callbacks
%%--------------------------------------------------------------------mnesia(boot) ->
ok = emqttd_mnesia:create_table(retained_message, [
{type, ordered_set},
{disc_copies, [node()]},
{record_name, retained_message},
{attributes, record_info(fields, retained_message)},
{storage_properties, [{ets, [compressed]},
{dets, [{auto_save, 1000}]}]}]),
ok = emqttd_mnesia:create_table(backend_subscription, [
{type, bag},
{disc_copies, [node()]},
{record_name, mqtt_subscription},
{attributes, record_info(fields, mqtt_subscription)},
{storage_properties, [{ets, [compressed]},
{dets, [{auto_save, 5000}]}]}]);mnesia(copy) ->
ok = emqttd_mnesia:copy_table(retained_message),
ok = emqttd_mnesia:copy_table(backend_subscription).
4、数据库启动和拷贝的例子-module(emqttd_router).-boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).mnesia(boot) ->
ok = emqttd_mnesia:create_table(route, [
{type, bag},
{ram_copies, [node()]},
{record_name, mqtt_route},
{attributes, record_info(fields, mqtt_route)}]);mnesia(copy) ->
ok = emqttd_mnesia:copy_table(route, ram_copies).

4、数据库启动和拷贝的例子-module(emqttd_router).

-boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).mnesia(boot) ->ok = emqttd_mnesia:create_table(route, [{type, bag},{ram_copies, [node()]},{record_name, mqtt_route},{attributes, record_info(fields, mqtt_route)}]);mnesia(copy) ->ok = emqttd_mnesia:copy_table(route, ram_copies).

注意事项

1、如果EMQ所在服务器的IP地址是192.168.0.10

那么节点名称A:emqttd@192.168.0.10和节点名称B:emqttd@127.0.0.1是相同的意思,如果EMQ以节点A启动服务器,那么再以节点B启动是会失败的。

此时只能把A或B其中一个更名一下。即节点名格式: Name@Host里面的Name要加以区分。

2、集群的信息会记录在工程目录下,/rel/emqttd/data/mnesia/emqttd@172.16.6.161/schema.DAT

即,当子节点A连接了主节点B,集群信息会分别记录在schema.DAT。如果子节点A没有主动断开集群,下次重启时,仍然会主动连接主节点B。

★有几个遗留问题待确认,不知道EMQ V2版本有无修正:

问题(1)如果子节点A没有主动断开集群,下次重启时,如果B不存在,那么A就会启动失败!好可怕!

问题(2)A连接上B之后。A目录下的文件/rel/emqttd/data/mnesia/emqttd@172.16.6.161/retained_message.DCD和backend_subscription.DCD就自我删除了。以后也见不着了,彻底消失了,奇怪!请注意,这两个Mnesia数据库表类型是持久化,disc_copies。

★2018/05/17实测emq2.3.7,A主B从,结论如下:

(1)B join A之后,会主动删除B的Mnesia表,然后从A拷贝一份过来。B leave A之后,也会删除B的Mnesia表。

(2)B join A之后,A和B的Mnesia表会始终保持一致性。

添加或删除或更新A的表数据,B会同步。

添加或删除或更新B的表数据,A会同步。

(3)集群的信息会记录在工程目录下,/rel/emqttd/data/mnesia/emqttd@172.16.6.161/schema.DAT。A或B进程退出后,再次启动时,仍然保持集群状态。

3、常用命令

./emqttd console
./emqttd start
./emqttd stop
./emqttd_ctl cluster join emqttd@172.16.6.161
./emqttd_ctl cluster status

./emqttd_ctl cluster leave

werl -name firecat@127.0.0.1 -setcookie emqsecretcookie
observer:start().
./_rel/emqttd/bin/emqttd console
./_rel/emqttd/bin/emqttd start
./_rel/emqttd/bin/emqttd_ctl status
./_rel/emqttd/bin/emqttd stop
./_rel/emqttd/bin/emqttd_ctl cluster join emq@192.168.0.116
./_rel/emqttd/bin/emqttd_ctl cluster status
./_rel/emqttd/bin/emqttd_ctl cluster leave

EMQ学习 ---集群相关推荐

  1. 深度学习-深度学习集群管理方案

    相比之前如火如荼的大数据作业和负载以及集群硬件情况.深度学习平台的作业和硬件环境有了一些新的不同和趋势: 作业: 相比大数据作业,工作流workflow相比之前的大数据workflow来看相对简化,而 ...

  2. emq 重启_EMQ集群

    Erlang/OTP 语言平台的分布式程序,由分布互联的 Erlang 运行系统组成,每个 Erlang 运行系统被称为节点(Node),节点(Node) 间通过 TCP 互联,消息传递的方式通信: ...

  3. KubeFATE 部署多集群联邦学习平台 FATE

    题图摄于北京G6高速 (本文作者系VMware中国研发中心云原生实验室工程师) 相关文章: VMware招聘多名云原生开发工程师(北京) 用KubeFATE在Kubernetes上部署联邦学习集群 用 ...

  4. Redis创建高可用集群教程【Windows环境】

    模仿的过程中,加入自己的思考和理解,也会有进步和收获. 在这个互联网时代,在高并发和高流量可能随时爆发的情况下,单机版的系统或者单机版的应用已经无法生存,越来越多的应用开始支持集群,支持分布式部署了. ...

  5. Redis初学17:集群

    集群 简介 Redis 集群实现了对Redis的水平扩容,即启动 N 个 redis 节点,将整个数据库分布存储在这 N 个节点中,每个节点存储总数据的 1/N . Redis 集群通过分区(part ...

  6. mqtt 负载均衡_EMQ百万级MQTT消息服务(分布式集群)

    在强大的单机也比不上集群,EMQ的集群模式很粗暴,只需要把EMQ服务关联在一起然后负载均衡就可以达到集群的效果,这样就算面对1000CK问题也迎刃而解 附上: 喵了个咪的博客:w-blog.cnEMQ ...

  7. 如何部署一个Kubernetes集群

    来源 | 无敌码农 责编 | 寇雪芹 头图 | 下载于视觉中国 在上一篇文章<Kubernetes和Docker的关系是什么?>中,和大家分享了关于Kubernetes的基本系统架构以及关 ...

  8. redis mysql 集群_Redis(五)、Redis数据库集群相关

    Redis数据库集群 第1章 集群简介 Redis 集群是一个分布式(distributed).容错(fault-tolerant)的 Redis 实现, 集群可以使用的功能是普通单机 Redis 所 ...

  9. Redis基础与高可用集群架构进阶详解

    一.NoSQL简介 1.问题引入 每年到了过年期间,大家都会自觉自发的组织一场活动,叫做春运!以前我们买票都是到火车站排队,后来呢,有了 12306,有了它以后就更方便了,我们可以在网上买票,但是带来 ...

最新文章

  1. 1.一些 贪心算法 的简单思维题:
  2. *:教育产品 规范销售
  3. 设计模式C++实现 —— 外观模式、组合模式
  4. android 自动 键盘,关于Android中的软键盘
  5. Docker 深入理解概念
  6. python程序-调试Python程序代码的几种方法总结
  7. Desktop Video for Mac(桌面视频)最新版
  8. Java-多线程第一篇多线程相关认识(1)
  9. c++ 2D小球 碰撞模拟
  10. 福利大放送:空间统计插值大数据PPT
  11. 关于tomcat报错500的问题记录(classnotfoundexception)
  12. 在c语言中是闰年的条件为,C语言如何判断是闰年,闰年判断条件?
  13. DXF文件格式——DXF 格式
  14. [02-27][08奥斯卡四项大奖][老无所依][DVD中英双字][已测][17:21]
  15. java 批量文件编码转换 GBK to UTF-8(swing界面)
  16. 面试数据库知识点总结
  17. 不要浮躁,获得充实感
  18. 20230304 CF855 div3 vp
  19. sqlplus 中的spool命令
  20. OPENCV实现色带检测

热门文章

  1. [MOSS开发]:WSS v3授权
  2. 微服务限流Sentinel讲解(二)
  3. ntpdate[27350]: no server suitable for synchronization found
  4. Flink1.4.0连接Kafka0.10.2时遇到的问题
  5. 使用Axis2创建一个简单的WebService服务
  6. 邓西百度网盘多帐号文件一键搜索工具
  7. Hibernate的DetachedCriteria使用(含Criteria)
  8. 使用 Ant 自动生成项目构建版本
  9. C#调用浏览器的原理及实现浅析
  10. VC10中的C++0x特性简介