简介

最近读到一本好书,书名是《通过 Elixir 和 Nerves 搭建气象站》, 书中介绍了如何引入 Elixir 作为构建嵌入式应用程序的工具。

通过使用 Nerves,我们可以在支持网络的设备上运行 Elixir 代码,并且与一些控制软件交互。

上面提到的书主要关注点在 Nerves,使用 HTTP 协议进行网络交互。 尽管在许多情况下这是一个合理的选择,但我想介绍另一个广泛用于生产物联网 (IoT) 设置的选择:MQTT。

MQTT 协议

MQTT 是一种专为物联网 (IoT) 设备通信而设计的消息传输协议。 它广泛应用于许多领域,例如银行、石油和天然气、制造业等。

MQTT 协议有很多优点,部分如下所示:

  • 它是一种轻量级的二进制协议,通常在 TCP/IP 协议之上运行。
  • 它专为网络不可靠的场景设计,是户外安装的理想选择。
  • 它遵循发布/订阅模式,简化客户端逻辑。

我们将在设置中演示 MQTT 的一些优势。

MQTT Broker

MQTT 的一个重要特征是它简化了客户端逻辑,这对于嵌入式设备至关重要。 这是通过发布/订阅模式实现的:在 MQTT 中,没有“服务器”的概念。 相反,所有参与实体都是连接到所谓 broker 的客户端。 客户端订阅主题向它们发布消息,broker 进行路由(以及许多其他事情)。

一个好的用于生产的 broker,如 EMQ X,通常不仅提供 MQTT 路由功能,还提供许多其他有趣的功能,例如

  • 其他类型的连接方法,如 WebSockets;
  • 不同的认证和授权模式;
  • 将数据流传输到数据库;
  • 基于消息特征的自定义路由规则。

传感器设置

为简单起见,我们的设备将由一个普通的 Mix 应用程序表示:它可以轻松转换为 Nerves 应用程序。

首先,我们创建一个 Mix 项目:

mix new --sup weather_sensor
cd weather_sensor

为了与 MQTT broker 交互,我们需要一个 MQTT 客户端。 我们采用 emqtt。 将其添加到 mix.exs 作为依赖项:

defp deps do[{:emqtt, github: "emqx/emqtt", tag: "1.4.4", system_env: [{"BUILD_WITHOUT_QUIC", "1"}]}]
end

我们将把所有的“传感器”代码放到主模块 WeatherSensor 中,所以我们需要将它添加到应用程序管理器 lib/weather_sensor/application.ex 中:

defmodule WeatherSensor.Application do@moduledoc falseuse Application@impl truedef start(_type, _args) dochildren = [WeatherSensor]opts = [strategy: :one_for_one, name: WeatherSensor.Supervisor]Supervisor.start_link(children, opts)end
end

现在,让我们在 lib/weather_sensor.ex 中实现主模块:

defmodule WeatherSensor do@moduledoc falseuse GenServerdef start_link([]) doGenServer.start_link(__MODULE__, [])enddef init([]) dointerval = Application.get_env(:weather_sensor, :interval)emqtt_opts = Application.get_env(:weather_sensor, :emqtt)report_topic = "reports/#{emqtt_opts[:clientid]}/temperature"{:ok, pid} = :emqtt.start_link(emqtt_opts)st = %{interval: interval,timer: nil,report_topic: report_topic,pid: pid}{:ok, set_timer(st), {:continue, :start_emqtt}}enddef handle_continue(:start_emqtt, %{pid: pid} = st) do{:ok, _} = :emqtt.connect(pid)emqtt_opts = Application.get_env(:weather_sensor, :emqtt)clientid = emqtt_opts[:clientid]{:ok, _, _} = :emqtt.subscribe(pid, {"commands/#{clientid}/set_interval", 1}){:noreply, st}enddef handle_info(:tick, %{report_topic: topic, pid: pid} = st) doreport_temperature(pid, topic){:noreply, set_timer(st)}enddef handle_info({:publish, publish}, st) dohandle_publish(parse_topic(publish), publish, st)enddefp handle_publish(["commands", _, "set_interval"], %{payload: payload}, st) donew_st = %{st | interval: String.to_integer(payload)}{:noreply, set_timer(new_st)}enddefp handle_publish(_, _, st) do{:noreply, st}enddefp parse_topic(%{topic: topic}) doString.split(topic, "/", trim: true)enddefp set_timer(st) doif st.timer doProcess.cancel_timer(st.timer)endtimer = Process.send_after(self(), :tick, st.interval)%{st | timer: timer}enddefp report_temperature(pid, topic) dotemperature = 10.0 + 2.0 * :rand.normal()message = {System.system_time(:millisecond), temperature}payload = :erlang.term_to_binary(message):emqtt.publish(pid, topic, payload)end
end

并在 config/config.exs 中添加一些选项:

import Configconfig :weather_sensor, :emqtt,host: '127.0.0.1',port: 1883,clientid: "weather_sensor",clean_start: false,name: :emqttconfig :weather_sensor, :interval, 1000

让我们总结一下 WeatherSensor 中发生的事情:

  • 它实现了GenServer 行为。
  • 启动时,它有如下动作:
    • 打开一个 MQTT 连接;
    • 订阅 commands/weather_sensor/set_interval 主题以接收命令,将接收到的数据将通过 :emqtt 发送到进程,作为 {:publish, publish }消息。
    • 以预定义的时间间隔设置计时器。
  • 在定时器超时时,它发布 {Timestamp, Temperature} 元组到 reports/weather_sensor/temperature 主题。
  • 在收到来自 commands/weather_sensor/set_interval 主题的消息时,它会更新计时器间隔。

由于我们的应用程序不是真正的 Nerves 应用程序,它连接了 BMP280 之类的传感器,因此我们生成温度数据。

在这里我们已经可以看到相对于 HTTP 交互的一个优势:我们不仅可以发送数据,还可以实时接收一些命令。

我们需要一个 broker 来运行节点; 我们稍后会开始。

控制台设置

由于 MQTT 中没有“服务器”,因此我们的控制台也将是一个 MQTT 客户端。 但它会订阅reports/weather_sensor/temperature主题和发布命令到 commands/weather_sensor/set_interval。

对于控制台,我们将设置 Phoenix LiveView 应用程序。

创建过程如下:

mix phx.new --version
Phoenix installer v1.6.2
mix phx.new weather_dashboard --no-ecto --no-gettext --no-dashboard --live
cd weather_dashboard

向 mix.exs 添加依赖项

  defp deps do[...{:jason, "~> 1.2"},{:plug_cowboy, "~> 2.5"},{:emqtt, github: "emqx/emqtt", tag: "1.4.4", system_env: [{"BUILD_WITHOUT_QUIC", "1"}]},{:contex, github: "mindok/contex"} # We will need this for SVG charts]end

向 config/dev.exs 添加一些设置:

config :weather_dashboard, :emqtt,host: '127.0.0.1',port: 1883config :weather_dashboard, :sensor_id, "weather_sensor"# Period for chart
config :weather_dashboard, :timespan, 60

现在我们生成一个 LiveView 控制器:

mix phx.gen.live Measurements Temperature temperatures  --no-schema --no-context

这会生成很多文件,但并非都是必需的,我们需要的是一个带有图表的单页应用程序。

rm lib/weather_dashboard_web/live/temperature_live/form_component.*
rm lib/weather_dashboard_web/live/temperature_live/show.*
rm lib/weather_dashboard_web/live/live_helpers.ex

同时从lib/weather_dashboard_web.ex 中删除import WeatherDashboardWeb.LiveHelpers。

更新我们页面的模板(lib/weather_dashboard_web/live/temperature_live/index.html.heex):

<div><%= if @plot do %><%= @plot %><% end %>
</div><div><form phx-submit="set-interval"><label for="interval">Interval</label><input type="text" name="interval" value={@interval}/><input type="submit" value="Set interval"/></form>
</div>

我们有一个图表和输入控件,用于向此页面上的“设备”发送命令。

现在更新主要部分 LiveView 控制器(lib/weather_dashboard_web/live/temperature_live/index.ex):

defmodule WeatherDashboardWeb.TemperatureLive.Index douse WeatherDashboardWeb, :live_viewrequire Logger@impl truedef mount(_params, _session, socket) doreports = []emqtt_opts = Application.get_env(:weather_dashboard, :emqtt){:ok, pid} = :emqtt.start_link(emqtt_opts){:ok, _} = :emqtt.connect(pid)# Listen reports{:ok, _, _} = :emqtt.subscribe(pid, "reports/#"){:ok, assign(socket,reports: reports,pid: pid,plot: nil,interval: nil)}end@impl truedef handle_params(_params, _url, socket) do{:noreply, socket}end@impl truedef handle_event("set-interval", %{"interval" => interval_s}, socket) docase Integer.parse(interval_s) do{interval, ""} ->id = Application.get_env(:weather_dashboard, :sensor_id)# Send command to devicetopic = "commands/#{id}/set_interval":ok = :emqtt.publish(socket.assigns[:pid],topic,interval_s,retain: true){:noreply, assign(socket, interval: interval)}_ ->{:noreply, socket}endenddef handle_event(name, data, socket) doLogger.info("handle_event: #{inspect([name, data])}"){:noreply, socket}end@impl truedef handle_info({:publish, packet}, socket) dohandle_publish(parse_topic(packet), packet, socket)enddefp handle_publish(["reports", id, "temperature"], %{payload: payload}, socket) doif id == Application.get_env(:weather_dashboard, :sensor_id) doreport = :erlang.binary_to_term(payload){reports, plot} = update_reports(report, socket){:noreply, assign(socket, reports: reports, plot: plot)}else{:noreply, socket}endenddefp update_reports({ts, val}, socket) donew_report = {DateTime.from_unix!(ts, :millisecond), val}now = DateTime.utc_now()deadline = DateTime.add(DateTime.utc_now(), - 2 * Application.get_env(:weather_dashboard, :timespan), :second)reports =[new_report | socket.assigns[:reports]]|> Enum.filter(fn {dt, _} -> DateTime.compare(dt, deadline) == :gt end)|> Enum.sort(){reports, plot(reports, deadline, now)}enddefp parse_topic(%{topic: topic}) doString.split(topic, "/", trim: true)enddefp plot(reports, deadline, now) dox_scale =Contex.TimeScale.new()|> Contex.TimeScale.domain(deadline, now)|> Contex.TimeScale.interval_count(10)y_scale =Contex.ContinuousLinearScale.new()|> Contex.ContinuousLinearScale.domain(0, 30)options = [smoothed: false,custom_x_scale: x_scale,custom_y_scale: y_scale,custom_x_formatter: &x_formatter/1,axis_label_rotation: 45]reports|> Enum.map(fn {dt, val} -> [dt, val] end)|> Contex.Dataset.new()|> Contex.Plot.new(Contex.LinePlot, 600, 250, options)|> Contex.Plot.to_svg()enddefp x_formatter(datetime) dodatetime|> Calendar.strftime("%H:%M:%S")endend

特别说明如下:

  • 我们创建了一个 LiveView 处理程序来为我们的应用程序的主页提供服务。
  • 通常,Phoenix.PubSub 用于更新 LiveView 进程状态。 但是,我们做了一个特殊设置:由于 MQTT broker 已经提供了一个发布订阅模式,我们直接从 LiveView 进程连接到它。
  • 收到新的温度数据后,服务器更新温度图表。
  • 收到用户的表单更新后,我们会向命令主题发送更新的时间间隔。

最后,在lib/weather_dashboard_web/router.ex 中设置路由,以便我们的控制器能够处理根页面:

  scope "/", WeatherDashboardWeb dopipe_through :browserlive "/", TemperatureLive.Indexend

模块集成

现在,我们已准备设置并运行所有内容。

我们运行一个 MQTT broker。 由于我们不需要任何特定的设置,最简单的方法是使用 docker 运行代 broker。

docker run -d --name emqx -p 1883:1883 emqx/emqx:4.3.10

现在运行我们的“设备”:

cd weather_sensor
export BUILD_WITHOUT_QUIC=1
iex -S mix
Erlang/OTP 24 [erts-12.1.2] [source] [64-bit] [smp:16:16] [ds:16:16:10] [async-threads:1] [jit] [dtrace]....13:17:24.461 [debug] emqtt(weather_sensor): SEND Data: {:mqtt_packet, {:mqtt_packet_header, 8, false, 1, false}, {:mqtt_packet_subscribe, 2, %{}, [{"/commands/weather_sensor/set_interval", %{nl: 0, qos: 1, rap: 0, rh: 0}}]}, :undefined}13:17:24.463 [debug] emqtt(weather_sensor): RECV Data: <<144, 3, 0, 2, 1>>13:17:25.427 [debug] emqtt(weather_sensor): SEND Data: {:mqtt_packet, {:mqtt_packet_header, 3, false, 0, false}, {:mqtt_packet_publish, "/reports/weather_sensor/temperature", :undefined, :undefined}, <<131, 104, 2, 110, 6, 0, 179, 156, 178, 158, 125, 1, 70, 64, 38, 106, 91, 64, 234, 212, 185>>}13:17:26.428 [debug] emqtt(weather_sensor): SEND Data: {:mqtt_packet, {:mqtt_packet_header, 3, false, 0, false}, {:mqtt_packet_publish, "/reports/weather_sensor/temperature", :undefined, :undefined}, <<131, 104, 2, 110, 6, 0, 156, 160, 178, 158, 125, 1, 70, 64, 39, 115, 221, 187, 144, 192, 31>>}
...

我们看到我们的传感器立即开始发送报告。

现在运行我们的控制台:

cd weather_dashboard
export BUILD_WITHOUT_QUIC=1
iex -S mix phx.server
Erlang/OTP 24 [erts-12.1.2] [source] [64-bit] [smp:16:16] [ds:16:16:10] [async-threads:1] [jit] [dtrace][info] Running WeatherDashboardWeb.Endpoint with cowboy 2.9.0 at 127.0.0.1:4000 (http)
[info] Access WeatherDashboardWeb.Endpoint at http://localhost:4000
Interactive Elixir (1.12.3) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> [watch] build finished, watching for changes...

让我们导航到 http://localhost:4000。

我们看到相应的 LiveView 进程挂载,连接到代理,并开始接收温度数据:

[info] GET /
[info] Sent 200 in 145ms
[info] CONNECTED TO Phoenix.LiveView.Socket in 129µsTransport: :websocketSerializer: Phoenix.Socket.V2.JSONSerializerParameters: %{"_csrf_token" => "cwoROxAwKFo7NEcSdgMwFlgaZ1AlBxUa6FIRhAbjHA6XORIF-EUiIRqU", "_mounts" => "0", "_track_static" => %{"0" => "http://localhost:4000/assets/app.css", "1" => "http://localhost:4000/assets/app.js"}, "vsn" => "2.0.0"}
[debug] emqtt(emqtt-MacBook-Pro-iaveryanov-86405372ddbf17052130): SEND Data: {:mqtt_packet, {:mqtt_packet_header, 1, false, 0, false}, {:mqtt_packet_connect, "MQTT", 4, false, true, false, 0, false, 60, %{}, "emqtt-MacBook-Pro-iaveryanov-86405372ddbf17052130", :undefined, :undefined, :undefined, :undefined, :undefined}, :undefined}
[debug] emqtt(emqtt-MacBook-Pro-iaveryanov-86405372ddbf17052130): RECV Data: <<32, 2, 0, 0>>
[debug] emqtt(emqtt-MacBook-Pro-iaveryanov-86405372ddbf17052130): SEND Data: {:mqtt_packet, {:mqtt_packet_header, 8, false, 1, false}, {:mqtt_packet_subscribe, 2, %{}, [{"/reports/#", %{nl: 0, qos: 0, rap: 0, rh: 0}}]}, :undefined}
[debug] emqtt(emqtt-MacBook-Pro-iaveryanov-86405372ddbf17052130): RECV Data: <<144, 3, 0, 2, 0>>
[debug] emqtt(emqtt-MacBook-Pro-iaveryanov-86405372ddbf17052130): RECV Data: <<48, 58, 0, 35, 47, 114, 101, 112, 111, 114, 116, 115, 47, 119, 101, 97, 116,104, 101, 114, 95, 115, 101, 110, 115, 111, 114, 47, 116, 101, 109, 112, 101,114, 97, 116, 117, 114, 101, 131, 104, 2, 110, 6, 0, 180, 251, 188, 158, 125,
...

此外,该页面立即开始更新:

如果我们更新间隔,我们看到设备节点立即收到命令并开始更频繁地更新:

现在我们演示一件重要的事情:让我们停止我们的“设备”节点,稍等片刻,然后重新启动它。 我们看到节点继续以更新的频率发送数据。

怎么会这样? 其实很简单,秘诀就在于我们发送到命令主题的命令消息的 retain 标志。

:ok = :emqtt.publish(socket.assigns[:pid],topic,interval_s,retain: true
)

当我们向主题发送带有 retain 标志的消息时,该消息也成为“默认”消息,并保留在 broker 上。该主题的每个订阅者都会在订阅时收到此消息。

对于可能经常离线且没有任何易于使用的本地存储来保持其状态的嵌入式设备,此功能非常重要。 这是在连接时正确配置它们的方法。

结论

这篇文章介绍了如下内容:

  • 展示了一种与嵌入式设备交互的流行方式——MQTT 协议;
  • 我们介绍了它在 Elixir 中的用法;
  • 我们还展示了 MQTT 的一些优势,例如发布订阅模式和消息保留。

即使在简单的设置中,我们也可能想要使用的强大功能是:

  • 将主题数据流传输到数据库中,这样我们可以显示连接历史,无需“手动”保存;
  • 使用 MQTT.js 通过 WebSockets 从前端直接连接到 broker。

所有代码都可以在 https://github.com/savonarola/mqtt-article 上查阅。

版权声明: 本文为 EMQ 原创,转载请注明出处。

原文链接:https://www.emqx.com/zh/blog/mqtt-for-elixir

MQTT 在 Elixir 中的应用相关推荐

  1. 如何处理Elixir中的异常

    对于任何软件开发方法而言,异常处理都是一种不错的做法. 无论是用于基于测试的开发,敏捷的sprint还是仅包含一个很好的旧待办事项列表的黑客会议,我们都可以从确保我们的基础覆盖有健全的缺陷处理方法中受 ...

  2. MQTT在matlab中的应用

    MQTT在matlab中的应用 前言 一.准备MQTT工具箱 二.导入matlab中 1.导入目录 2.添加目录 三.代码测试 四.实验结果 前言 本文只介绍在matlab中如何使用mqtt协议进行通 ...

  3. [elixir! #0007] [译] 理解Elixir中的宏——part.5 重塑AST by Saša Jurić

    上一章我们提出了一个基本版的deftraceable宏,能让我们编写可跟踪的函数.宏的最终版本有一些剩余的问题,今天我们将解决其中的一个--参数模式匹配. 今天的练习表明我们必须仔细考虑宏可能接收到的 ...

  4. MQTT 固定报头 中 剩余长度字段的计算

    剩余长度 - 简介 位置:固定报头中,从第2个字节开始. 剩余长度等于可变报头的长度(10字节)加上有效载荷的长度. 剩余长度(Remaining Length)表示当前报文剩余部分的字节数,包括可变 ...

  5. apollo mqtt linux qt,MQTT第5版更新,以及如何应用到Qt MQTT模块中

    之前我曾写过在MQTT消息中的topic可能会对发布的数据量产生较大影响.从那之后,MQTT已经发布了第5版标准并且诞生了第一个实现.当然,Qt MQTT也随之跟进,这篇文章将介绍Qt for Aut ...

  6. html消息发送接收,在html页面中 如何应用mqtt协议发送/接收消息

    经过前面几篇文章的介绍,在很多场景下利用NodeMCU加持mqtt协议来控制几乎所有需要传感器监控的行业都能极大地简化物联的成本.在这样一个基础上,还能拓展出很多好玩的.实际运用的甚至能够作为商业化运 ...

  7. MQTT(3)---MQTT协议及其在物联网中的应用

    MQTT (Message Queuing Telemetry Transport,消息队列遥测传输) 是一种标准化的发布/订阅消息传输协议,设计于1999年,最初是为了在卫星之类的物体上使用.它是一 ...

  8. ESP32运行MicroPython通过MQTT上报温湿度到中移OneNET物联网平台(附源码)

    前言:MQTT是当下物联网用的比较多的协议,本篇聊一聊用esp32通过MQTT连接到中移OneNET物联网平台. OneNET平台创建产品和设备 1.​创建产品:开发者中心->全部产品-> ...

  9. 如何将 Elixir 模块风格应用在 JS 中

    原文:A Proposal: Elixir-Style Modules in JavaScript 作者:Will Ockelmann-Wagner 发表时间:13th August 2018 译者: ...

最新文章

  1. python打印万年历_python青苔计划(六)打印万年历
  2. 以色列研究人员实现利用计算机风扇噪音窃听
  3. Redhate5.4下Oracle 11g安装
  4. 在WPF中实现平滑滚动
  5. 034_jdbc-mysql-C3P0
  6. js生成批次号_js生成验证码
  7. sqlite字段是否存在_学习廖雪峰的JAVA教程---反射(访问字段)
  8. Visual Studio内存泄漏检测
  9. python中有那几种赋值_详解Python列表赋值复制深拷贝及5种浅拷贝
  10. LVS三种请求转发方式和八种调度算法简介
  11. 字节跳动将推出汽车云业务,计划2025年追赶腾讯
  12. 图解TCPIP-TCP IP
  13. python异步回调函数的实现
  14. 工程从进场到竣工 资料报验的一般程序
  15. 2-visio使用与卸载
  16. spnc币吧_CCIEC币-目前是CPUsolo独自开采。CPU服务器独自开采钱包上挖新币教程!...
  17. 在火狐浏览器里怎么看请求头
  18. Vue中v-model和checkbook的使用
  19. 「玩转Python」突破封锁继续爬取百万妹子图
  20. 诗歌《船》 ---白桦 勉励自己

热门文章

  1. python在屏幕上画画_想学画画?python满足你!
  2. xff头使用sqlmap注入(心血来潮的分享)
  3. ORACLE 数据、表误删恢复(转)
  4. 列表推导式详解:[i for i in L]
  5. 【网络流24题-2】太空飞行计划 网络流
  6. 朴素贝叶斯代码实现python
  7. 城市轨道交通高峰时段客流协调控制的强化学习方法
  8. (项目)在线教育平台(十二)
  9. Excel 特殊分组汇总示例
  10. 传奇如何读取服务器信息,传奇服务器修改之命令服务脚本详细使用方法介绍