树莓派|云服务器-MQTT环境搭建

树莓派之 MQTT 搭建

每天出门总怀疑自己灯没关、空调没关,晚上睡觉的时候躺下才意识到灯没关、然而开关却理我特别远,这我可接受不了 2333 ,所以产生了这种刚需🤣,这里记录一下搭建MQTT环境的过程。

搭建的两种方法

  1. 树莓派作为 MQTT 服务器,用花生壳映射端口( 6 块钱)可以供给外网访问,这个方法的话最便宜的方案吧,不过外网的端口是不固定的,如果被改变了就得打开花生壳管理器查看分配的公网端口,嫌麻烦的话可以买固定的端口,得差不多 100 块这样子。不过这两个都是永久的加起来 100 多一点。当然如果不嫌麻烦,那就只要 6 元了。结构大概就是这样子,23333 画的不太好将就看。
    树莓派同时运行一个 MQTT 服务器和一个 MQTT 客户端。手机运行一个 MQTT 客户端,通过访问花生壳映射之后的域名和端口来访问内网的树莓派 MQTT 服务器进行通信。树莓派运行的 MQTT 客户端直接通过内网和服务器进行通信。
  2. 云主机作为 MQTT 服务器,这个方案的话因为我之前买了腾讯云的服务器 120 一年,续了三年。闲置着也是没有用,所以利用了起来。这个方法的话比较容易实现,不折腾那些个什么内网穿透,但是钱也会多很多,不过云服务器有公网 IP 还有以后可以有很多利用的地方。大概的关系就是如下图,比较容易理解的吧还是,树莓派运行的客户端通过网络和云主机上的服务器进行通信,手机上的MQTT客户端同样如此。

MQTT 的服务器我选择使用的是开源的 EMQTT ,官方提供了很多系统的安装包,很容易搭建的,管理也很方便。官网见 http://emqtt.com/.树莓派作为服务器的话安装使用需要采用源码的方式进行编译安装。

云主机作为MQTT服务器

之前搞优惠活动的时候买的腾讯云的服务器,服务器的配置是 2G 内存 1Mbps 带宽 40GB 硬盘,跑这个绰绰有余。具体的购买流程就不说了,傻瓜式的,我选择的是 centos 7 作为系统,因为据说这个系统做服务器多?这个后续后悔了还可以免费改的。
买了服务器之后要设置安全组的规则,我是把所有端口都打开了的(安全?不存在考虑的),最起码要把 22 端口开放,因为这个是 SSH 登录要用的端口。完成初步的配置就可以用了。搞到服务器的密码就可以用账户 root + 密码进行 SSH 访问了。之后的都是用 SSH 进行配置的。SSH 软件的使用可以看之前写的树莓派安装系统和配置下的->使用SSH进行树莓派的连接
接下来的操作都是在 centos 7系统的终端里运行的。

  1. 通过 rpm 安装包进行安装。(使用简单快捷)教程可以看这里.

    1
    2
    3
    4
    yum install wget //安装下载工具这个应该都是预置了的
    yum install lksctp-tools //安装Erlang/OTP R19依赖库
    wget http://emqtt.com/downloads/latest/centos7-rpm -O emqttd-centos7-x86_64.rpm //下载安装包
    rpm -ivh emqttd-centos7-x86_64.rpm //安装

    安装完成之后呢就可以运行了,是不是超级简单。通过下面的命令可以启动、停止、重启、查看状态整个 EMQTT 服务。这个方式安装,配置文件存放在/etc/emqttd/下的。

    1
    systemctl start|stop|restart|status emqttd.service
  2. 通用包方式安装。(绿色解压版)。教程可以看这里.绿色版嘛当然是要手动执行文件启动的啦。这个方式下的配置文件是存放在解压目录下的etc下,不在/etc/emqttd/下。

    1
    2
    3
    4
    5
    6
    wget http://emqtt.com/downloads/latest/centos7 -O emqttd-centos7.zip //下载压缩包
    unzip emqttd-centos7.zip //解压安装

    cd 解压目录/emqttd
    ./bin/emqttd console //运行控制台 ctrl+c退出
    ./bin/emqttd start|stop|restart //后台守护进程方式
  3. 访问服务器控制界面
    Dashboard 插件可查询 EMQ 消息服务器基本信息、统计数据、度量数据,查询系统客户端(Client)、会话(Session)、主题(Topic)、订阅(Subscription)。EMQ 消息服务器默认加载 Dashboard 插件。
    安装完成之后访问在浏览器地址栏输入 服务器IP:18083/ 即可访问网页端的控制界面,如下图所示。会提示你输入登录账户和密码,默认的账户是admin 密码是 public。登录之后点击侧边 USER 栏,可以看到 admin 账户点击 Edit 即可更改密码。页面右上角可以切换主题配色,语言。

  4. MQTT 服务器的配置。
    无论通过何种方式安装都需要修改服务器的一些插件和配置文件才能为你所用。这里呢主要是配置客户端的登录服务器验证方式和客户端的ID等信息。
    其认证方式如下,客户端接入服务器呢,需要进行用户名密码的认证,客户端ID的认证,如果你两个都不开(ignore)的话就是匿名认证。用户名的认证需要打开 emq_auth_username 插件。客户端ID的认证需要打开 emq_auth_clientid 插件。服务器默认打开匿名认证,我选择打开用户名密码认证,客户端ID认证。

    1
    2
    3
    4
    5
    6
              ----------------           ----------------           ------------
    Client --> | Username认证 | -ignore-> | ClientID认证 | -ignore-> | 匿名认证 |
    ---------------- ---------------- ------------
    | | |
    \|/ \|/ \|/
    allow | deny allow | deny allow | deny
  • 添加配置列表-用户名/密码.
    既然使能了用户名密码验证,当然是需要添加用户名和密码的啦,配置文件为 emq_auth_username.conf,在各自存放配置文件的目录下的plugins目录下。软件安装版为/etc/emqttd/plugins/emq_auth_username.conf。解压绿色版为解压目录/emqttd/etc/plugins/emq_auth_username.conf。 使用下面命令打开文件(自行学习vim)。

    1
    2
    vim /etc/emqttd/plugins/emq_auth_username.conf   //软件安装版
    vim 解压目录/emqttd/etc/plugins/emq_auth_username.conf //解压绿色版

    打开文件可以看到给了示例的模板,按照模板增加即可。 贴一个示例如下。修改之后保存就好了。

    1
    2
    3
    4
    5
    6
    7
    ##--------------------------------------------------------------------
    ## Username Authentication Plugin
    ##--------------------------------------------------------------------
    auth.user.1.username = BeGild_test
    auth.user.1.password = 123456
    auth.user.2.username = BeGild_test_1
    auth.user.2.password = 123456
  • 添加配置列表-客户端ID
    既然使能了用客户端ID 验证,当然是需要添加客户端ID列表和密码的啦,配置文件为 emq_auth_clientid.conf ,在各自存放配置文件的目录下的plugins目录下。软件安装版为/etc/emqttd/plugins/emq_auth_clientid.conf 。解压绿色版为解压目录/emqttd/etc/plugins/emq_auth_clientid.conf 。 使用下面命令打开文件(自行学习vim)。

    1
    2
    vim /etc/emqttd/plugins/emq_auth_clientid.conf    //软件安装版
    vim 解压目录/emqttd/etc/plugins/emq_auth_clientid.conf //解压绿色版

    打开文件可以看到给了示例的模板,按照模板增加即可。 贴一个示例如下。修改之后保存就好了。看起来和用户名密码是差不多的,他俩之间的对应关系还不太清楚,之后搞懂了会再更新。

    1
    2
    3
    4
    5
    6
    7
    ##--------------------------------------------------------------------
    ## ClientId Authentication Plugin
    ##--------------------------------------------------------------------
    auth.client.1.clientid = test1
    auth.client.1.password = test1_pwd
    auth.client.2.clientid = test2
    auth.client.2.password = test2_pwd
  • 打开认证插件
    插件打开可以点击网页的控制台侧边插件栏页面,可以看到很多插件第1,9个就是,点击右侧边启动按钮进行打开,打开成功就会变成绿色的运行中
    启用插件,配置完成之后即可投入使用啦。EMQ2.0支持的认证插件有可以看这里这些.

树莓派作为 MQTT 服务器

树莓派作为 MQTT 服务器同样需要安装服务器的软件,不过只能通过源码编译安装的方式进行,编译源码依赖于 Erlang 工具,所以得走两步,安装编译工具,下载源码编译安装。

  1. 安装 erlang 编译环境.emmmm 这个工具编译完毕之后就可以卸磨杀驴!
    1
    2
    sudo apt-get install erlang //安装 erlang
    sudo apt-get autoremove erlang //卸载 erlang
  2. 挑一个自己喜欢的目录执行下面指令进行源码克隆和编译.
    1
    2
    3
    git clone https://github.com/emqtt/emq-relx.git
    cd emq-relx
    make
  3. 待编译成功后会生成_rel目录存放生成的一系列可执行文件和配置文件等,执行下面语句执行控制台方式启动.
    1
    2
    cd _rel/emqttd
    ./bin/emqttd console
    可支持的指令和组合有如下.
    1
    Usage: emqttd {start|start_boot <file>|foreground|stop|restart|reboot|pid|ping|console|console_clean|console_boot <file>|attach|remote_console|upgrade|escript|rpc|rpcterms|eval}
  4. 列一下常用指令正常的表现吧。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
//这是控制台方式启动 EMQTT 会输出一些启动,运行的信息。
pi@BeGild_Raspi:~/Documents/MQtt/server/emq-relx/_rel/emqttd $ ./bin/emqttd console
Exec: /home/pi/Documents/MQtt/server/emq-relx/_rel/emqttd/erts-8.2.1/bin/erlexec -boot /home/pi/Documents/MQtt/server/emq-relx/_rel/emqttd/releases/2.3.11/emqttd -mode embedded -boot_var ERTS_LIB_DIR /home/pi/Documents/MQtt/server/emq-relx/_rel/emqttd/erts-8.2.1/../lib -mnesia dir "/home/pi/Documents/MQtt/server/emq-relx/_rel/emqttd/data/mnesia/emq@127.0.0.1" -config /home/pi/Documents/MQtt/server/emq-relx/_rel/emqttd/data/configs/app.2018.08.30.15.18.31.config -args_file /home/pi/Documents/MQtt/server/emq-relx/_rel/emqttd/data/configs/vm.2018.08.30.15.18.31.args -vm_args /home/pi/Documents/MQtt/server/emq-relx/_rel/emqttd/data/configs/vm.2018.08.30.15.18.31.args -- console
Root: /home/pi/Documents/MQtt/server/emq-relx/_rel/emqttd
/home/pi/Documents/MQtt/server/emq-relx/_rel/emqttd
Erlang/OTP 19 [erts-8.2.1] [source] [smp:4:4] [async-threads:32] [kernel-poll:true]


=INFO REPORT==== 30-Aug-2018::15:18:52 ===
alarm_handler: {set,{system_memory_high_watermark,[]}}
starting emqttd on node 'emq@127.0.0.1'
emqttd ctl is starting...[ok]
emqttd hook is starting...[ok]
emqttd router is starting...[ok]
emqttd pubsub is starting...[ok]
emqttd stats is starting...[ok]
emqttd metrics is starting...[ok]
emqttd pooler is starting...[ok]
emqttd trace is starting...[ok]
emqttd client manager is starting...[ok]
emqttd session manager is starting...[ok]
emqttd session supervisor is starting...[ok]
emqttd wsclient supervisor is starting...[ok]
emqttd broker is starting...[ok]
emqttd alarm is starting...[ok]
emqttd mod supervisor is starting...[ok]
emqttd bridge supervisor is starting...[ok]
emqttd access control is starting...[ok]
emqttd system monitor is starting...[ok]
emqttd 2.3.11 is running now
Eshell V8.2.1 (abort with ^G)
(emq@127.0.0.1)1> Load emq_mod_presence module successfully.
dashboard:http listen on 0.0.0.0:18083 with 4 acceptors.
mqtt:tcp listen on 127.0.0.1:11883 with 4 acceptors.
mqtt:tcp listen on 0.0.0.0:1883 with 16 acceptors.
mqtt:ws listen on 0.0.0.0:8083 with 4 acceptors.
mqtt:ssl listen on 0.0.0.0:8883 with 16 acceptors.
mqtt:wss listen on 0.0.0.0:8084 with 4 acceptors.
mqtt:api listen on 0.0.0.0:8080 with 4 acceptors.

(emq@127.0.0.1)1>
//这是后台守护进程方式启动 EMQTT 服务器 启动有点慢,稍等稍等。
pi@BeGild_Raspi:~/Documents/MQtt/server/emq-relx/_rel/emqttd $ ./bin/emqttd start
emqttd 2.3.11 is started successfully!
//这是停止当前启动了的 EMQTT 服务器
pi@BeGild_Raspi:~/Documents/MQtt/server/emq-relx/_rel/emqttd $ ./bin/emqttd stop
ok
//这是重启已经启动了的 EMQTT 服务器
pi@BeGild_Raspi:~/Documents/MQtt/server/emq-relx/_rel/emqttd $ ./bin/emqttd restart
ok

如果运行的时候出现下面错误,那你一定是运行的目录不对,一定要是 _rel/emqttd/目录下才是编译生成的目录,才可以运行 EMQTT 服务器!!!。

1
2
pi@BeGild_Raspi:~/Documents/MQtt/server/emq-relx/bin $ ./emqttd restart
./emqttd: 20: ./emqttd: runner_log_dir: not found
  1. 访问服务器控制界面。
    在浏览器输入树莓派IP:18083,输入账号 admin 和默认密码 public 进入管理控制界面,其余操作和以云主机作为 MQTT 服务器上一样,打开两个认证插件即可。
  2. MQTT 服务器的配置。
    配置文件的修改参照上面以云主机为 MQTT 服务器的配置例子,只是配置文件在_rel/emqttd/etc/下面。
  3. 花生壳外网访问的配置。参照树莓派-花生壳-内网穿透中的添加端口映射1883到外网。

安装 MQTT 客户端依赖库

安装好服务器可以使用了,就进入客户端代码的编写了呀,客户端在树莓派上运行一个,在手机上运行一个。

  1. 在树莓派上使用 MQTT 客户端当然是要安装 MQTT 客户端的库呀,首先运行下面几条命令做一些编译前的准备。

    1
    2
    3
    4
    5
    6
    cd /home/pi/Documents && mkdir emqtt //创建一个mqtt的目录
    cd emqtt && git clone https://github.com/eclipse/paho.mqtt.c.git //克隆mqtt客户端源码
    sudo apt-get install build-essential gcc make cmake cmake-gui cmake-curses-gui //安装编译工具
    sudo apt-get install fakeroot fakeroot devscripts dh-make lsb-release //安装软件包构建工具
    sudo apt-get install libssl-dev //安装openssl
    sudo apt-get install doxygen graphviz //安装文档生成工具
  2. 编译 MQTT 客户端源码。官方的教程看这里paho.mqtt.c

    1
    2
    3
    4
    cd paho.mqtt.c && make //转到源码目录进行编译
    /***********等待漫长的时间没有报错就OK了*********************/
    sudo make html //编译生成文档
    sudo make install //安装到系统

    备注:需要编译生成文档不然安装时会提示如下错误的。

    1
    2
    3
    4
    5
    6
    7
    8
    install -m 644 build/output/doc/MQTTClient/man/man3/MQTTClient.h.3 /usr/local/share/man/man3
    install: 无法获取'build/output/doc/MQTTClient/man/man3/MQTTClient.h.3' 的文件状态(stat): 没有那个文件或目录
    Makefile:273: recipe for target 'install' failed
    make: [install] Error 1 (ignored)
    install -m 644 build/output/doc/MQTTAsync/man/man3/MQTTAsync.h.3 /usr/local/share/man/man3
    install: 无法获取'build/output/doc/MQTTAsync/man/man3/MQTTAsync.h.3' 的文件状态(stat): 没有那个文件或目录
    Makefile:273: recipe for target 'install' failed
    make: [install] Error 1 (ignored)

    安装成功显示如下信息。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    pi@BeGild_Raspi:~/Documents/MQtt/paho.mqtt.c $ sudo make install
    mkdir -p build/output/samples
    mkdir -p build/output/test
    echo OSTYPE is Linux
    OSTYPE is Linux
    mkdir -p /usr/local/include
    install -m 644 build/output/libpaho-mqtt3c.so.1.0 /usr/local/lib
    install -m 644 build/output/libpaho-mqtt3cs.so.1.0 /usr/local/lib
    install -m 644 build/output/libpaho-mqtt3a.so.1.0 /usr/local/lib
    install -m 644 build/output/libpaho-mqtt3as.so.1.0 /usr/local/lib
    install build/output/paho_c_version /usr/local/bin
    install build/output/samples/paho_c_pub /usr/local/bin
    install build/output/samples/paho_c_sub /usr/local/bin
    install build/output/samples/paho_cs_pub /usr/local/bin
    install build/output/samples/paho_cs_sub /usr/local/bin
    /sbin/ldconfig /usr/local/lib
    ln -s libpaho-mqtt3c.so.1 /usr/local/lib/libpaho-mqtt3c.so
    ln -s libpaho-mqtt3cs.so.1 /usr/local/lib/libpaho-mqtt3cs.so
    ln -s libpaho-mqtt3a.so.1 /usr/local/lib/libpaho-mqtt3a.so
    ln -s libpaho-mqtt3as.so.1 /usr/local/lib/libpaho-mqtt3as.so
    install -m 644 src/MQTTAsync.h /usr/local/include
    install -m 644 src/MQTTClient.h /usr/local/include
    install -m 644 src/MQTTClientPersistence.h /usr/local/include
    install -m 644 src/MQTTProperties.h /usr/local/include
    install -m 644 src/MQTTReasonCodes.h /usr/local/include
    install -m 644 src/MQTTSubscribeOpts.h /usr/local/include
    install -m 644 doc/man/man1/paho_c_pub.1 /usr/local/share/man/man1
    install -m 644 doc/man/man1/paho_c_sub.1 /usr/local/share/man/man1
    install -m 644 doc/man/man1/paho_cs_pub.1 /usr/local/share/man/man1
    install -m 644 doc/man/man1/paho_cs_sub.1 /usr/local/share/man/man1
    install -m 644 build/output/doc/MQTTClient/man/man3/MQTTClient.h.3 /usr/local/share/man/man3
    install -m 644 build/output/doc/MQTTAsync/man/man3/MQTTAsync.h.3 /usr/local/share/man/man3

编写 MQTT 客户端代码

  1. 客户端代码。流程就是,根据服务器地址,用户名、密码、客户端ID登录到服务器,进行主题的订阅或者向某个主题进行消息的发布这样子。从网上抄了代码(只有ctrl+c ctrl+v才能够勉强维持生活这样子),魔改了一下贴上。这个程序在运行之后会创建一个发送线程TestSend,每隔 3s 发送一次信息,发送成功会通过回调函数MQTTClient_deliveryToken进行打印报告。收到信息会通过回调函数MsgArrived_CallBack进行信息的打印。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "MQTTClient.h"
#include <unistd.h>
/*类型兼容***********************************************/
typedef unsigned long uint64_t;
typedef unsigned int uint32_t;
typedef unsigned short uint16_t;
typedef unsigned char uint8_t;
typedef unsigned long u64;
typedef unsigned int u32;
typedef unsigned short u16;
typedef unsigned char u8;
/***********************************************类型兼容*/

#define NUM_THREADS 2 //线程个数
#define ADDRESS "tcp://*.*.*.*:1883" //云主机作为mqtt服务器地址
//#define ADDRESS "tcp://localhost:1883" //树莓派作为mqtt服务器地址,直接通过内网访问。
#define CLIENTID "test_1" //客户端ID
#define TOPIC_PUBLISH "Hello_1" //发送的主题
#define TOPIC_SUBSCRIBE "Hello_2" //订阅的主题
#define PAYLOAD "Hello! My Name is "CLIENTID //发送的信息
#define QOS 1 //服务质量 <<0-最多发送一次 1-至少保证送达一次 2-保证只送达1次
#define TIMEOUT 10000L //超时时间
#define USERNAME "username" //登录用户名
#define PASSWORD "password" //登录用户名对应密码

volatile MQTTClient_deliveryToken gDeliveredToken;
/** void deliveryComplete_CallBack(void *context, MQTTClient_deliveryToken deliveryToken)
* @brief MQTT发送信息成功回调函数
* @note 客户端发送信息成功时将会调用该函数
* @param[in] context 上下文指针
* @param[in] deliveryToken 发送信息的Token值
* @date 2018/08/25 17:20
* @auth BeGild
* @email yucang_bao@126.com
*/
void deliveryComplete_CallBack(void *context, MQTTClient_deliveryToken deliveryToken)
{
printf("\rMessage with token %5d Send Success\n", deliveryToken);
gDeliveredToken = deliveryToken;
}
/** int MsgArrived_CallBack(void *context, char *topicName, int topicLen, MQTTClient_message *message)
* @brief 接收到MQTT数据的回调函数
* @param[in] context 上下文指针
* @param[in] topicName 本次接收到信息的主题
* @param[in] topicLen 本次接收到信息的主题长度
* @param[in] message 本次接收到数据的存储结构体
* 数据实体存放于 message->payload
* 数据长度为 message->payloadlen
* @date 2018/08/25 17:20
* @auth BeGild
* @email yucang_bao@126.com
* return 返回值
*/
int MsgArrived_CallBack(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{
int i;
uint8_t *payloadptr;
printf("Recive Message\n");
printf(" topic: %s\n", topicName);
printf(" message: \n>>\n");
payloadptr = message->payload;
//打印接受到的数据
for(i=0; i<message->payloadlen; i++)
{
putchar(*payloadptr++);
}
printf("\n<<\n");
MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
return 1;
}
/** void ConnectLost_CallBack(void *context, char *cause)
* @brief 连接丢失回调函数
* @note 可能原因:
* 1.被挤下线
* 2.网络异常
* @date 2018/08/25 16:18
* @auth BeGild
* @email yucang_bao@126.com
*/
void ConnectLost_CallBack(void *context, char *cause)
{
printf("\nConnection lost ");
printf("\t cause: %s\n", cause);
}
/** void PublishMessage(MQTTClient client,uint8_t* TopicName, uint8_t *DataBuf,uint32_t BufLen)
* @brief 推送信息
* @param[in] client 发送的客户端ID
* @param[in] TopicName 发送的主题
* @param[in] DataBuf 发送的数据缓冲区
* @param[in] BufLen 发送的数据长度
* @date 2018/08/25 17:49
* @auth BeGild
* @email yucang_bao@126.com
*/
void PublishMessage(MQTTClient client,uint8_t* TopicName, uint8_t *DataBuf,uint32_t BufLen)
{
MQTTClient_message Publishmsg = MQTTClient_message_initializer;
//声明消息token
MQTTClient_deliveryToken token;
Publishmsg.payload = DataBuf;
Publishmsg.payloadlen = BufLen;
Publishmsg.qos = QOS;
Publishmsg.retained = 0;
MQTTClient_publishMessage(client, TopicName, &Publishmsg, &token);//发送数据
// rc = MQTTClient_waitForCompletion(client, token, TIMEOUT);
// printf("Message with delivery token %d delivered\n", token);
}
/** void *TestSend(void *arg)
* @brief 测试发送
* @param[in] arg 传入参数指针
* @return 返回值
* @date 2018/08/25 18:58
* @auth BeGild
* @email yucang_bao@126.com
*/
void *TestSend(void *arg)
{
char MsgBuff [] = PAYLOAD;
MQTTClient *client = (MQTTClient *)arg;//客户端句柄
while(1)
{
PublishMessage(*client,TOPIC_PUBLISH,MsgBuff,strlen(MsgBuff));//发送信息
usleep(3000000L);//睡眠3s
}
pthread_exit(NULL);//退出线程
return NULL;
}
/**int main(int argc, char* argv[])
* @brief 主函数
* @param[in] argc参数个数
* @param[in] argv 参数列表
*/
int main(int argc, char* argv[])
{
pthread_t threads[NUM_THREADS];
char SubTopic[] = TOPIC_SUBSCRIBE;
char ClientID [] = CLIENTID;
long t;
MQTTClient client;//客户端句柄
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;//连接参数
int rc;
int ch;

MQTTClient_create(&client, ADDRESS, ClientID,
MQTTCLIENT_PERSISTENCE_NONE, NULL);//根据服务器地址客户端ID创建一个可用连接
/*填写连接参数*/
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts.username = USERNAME;
conn_opts.password = PASSWORD;
/*设置回调函数*/
MQTTClient_setCallbacks(client, NULL,
ConnectLost_CallBack,/*连接丢失*/
MsgArrived_CallBack,/*收到信息*/
deliveryComplete_CallBack);/*信息成功发出*/
/*尝试连接服务器*/
if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to connect, return code %d\n", rc);
exit(EXIT_FAILURE);//连接失败退出
}
printf("[-------------------------------");
printf(" Connect to EMQTT server complete");
printf("--------------------------------]\n\n");
//订阅主题
printf("Client [%s] ---- using QoS[%d] ---- Subscribing to topic [%s]\n\n", ClientID,QOS,SubTopic);
MQTTClient_subscribe(client, SubTopic, QOS);

//创建测试发送线程
pthread_create(&threads[0], NULL, TestSend,&client);

/*主循环loop*/
printf( "Press Q<Enter> to quit\n\n");
do
{
ch = getchar();
} while(ch!='Q' && ch != 'q');
pthread_exit(&threads[0]);
MQTTClient_unsubscribe(client, SubTopic);//取消订阅
MQTTClient_disconnect(client, 10000);//断开连接
MQTTClient_destroy(&client);//销毁当前clien
pthread_exit(NULL);//退出主线程main
}

点击上述代码段右上角拷贝保存为mqtttest_1.c,执行一面这条语句编译生成可执行文件mqtttest_1

1
gcc -o mqtttest_1 mqtttest_1.c -lpaho-mqtt3c -lpthread

MQTT 客户端通信测试

MQTT 的测试的话就是看两个客户端之间是否可以互相通信嘛。这里采用的方式是树莓派运行上面的客户端程序作为客户端1,用一个测试工具作为客户端2。客户端1订阅 Hello_2 主题 向 Hello_1 主题发送消息,客户端2订阅主题 Hello_1 向主题 Hello_2 发送消息。

  1. 执行./mqtttest_1运行树莓上的客户端1,成功运行会打印大致如下的信息。
    1
    2
    3
    4
    5
    6
    7
    8
    pi@BeGild_Raspi:~/Documents/MQtt/mqtttest $ ./mqtttest_1
    [------------------------------- Connect to EMQTT server complete--------------------------------]

    Client [test_1] ---- using QoS[1] ---- Subscribing to topic [Hello_2]

    Press Q<Enter> to quit

    Message with token 2 Send Success
    如果连接失败返回错误码-1、4 检查
  • 是否运行了 EMQTT 服务器、服务器地址是否填写正确。
  • 服务器是否可达(ping 一下),如果是云主机查看端口(1883)是否打开允许访问。
  • 检查 EMQTT 服务器配置文件中是否添加了对应的usernamepassword 以及clientidpassword
  1. 运行 Mqtt 客户端测试工具 MQTT.fx 作为客户端2。
  • 点击http://www.jensd.de/apps/mqttfx/下载你想要的版本。

  • 点击安装程序。

  • 一个紫色的清秀小图标,打开 MQTT.fx 稍等片刻。

  • 点击主界面小齿轮按钮进行相关服务器登录配置。如果是树莓派作为MQTT服务器的话这里的服务器地址就替换为在花生壳里添加映射的壳域名和分配的端口号(不是1883)。

  • 点击小齿轮左边的连接列表,选择刚创建的连接。

  • 点击小齿轮右边的连接按钮进行连接服务器。连接成功后,下面的订阅和发布功能就都可以用了。

  1. 功能测试
  • MQTT.fx 点击切换到订阅(Subscribe)界面,在输入框输入需要订阅的主题,因为这里是作为客户端2当然是订阅Hello_1。订阅完成之后应该就可以在右下角的界面看到收到了客户端1每隔3s发过来的 Hello! My Name is test_1
  • MQTT.fx 切换到发布(Pubilsh)界面,在输入框输入需要推送消息的主题。因为树莓派运行客户端1订阅的是主题Hello_2所以这里输入Hello_2。在右侧选择消息通信的质量 Qos1 (保证至少一次送达),在下方输入框任意敲入字符。点击 Pubilsh 按钮进行发布。可以在树莓派端看到其接收到了来自Hello_2的消息。
  • 都能收发就测试成功了。

这里需要注意一下,如果订阅的主题就是发送的主题的话,自己也会收到自己发送的消息的。每个订阅了同一主题的客户端都能收到发布者的消息。

后记

我发现记录过程可比实现还难,这我都断断续续写了一个星期了,记录了MQTT服务器的搭建、插件启动、配置文件修改、MQTT 客户端在树莓派上的安装和使用、两个客户端之间的通信。这一系列工作完成基本的远程通信功能就通了。
后续再增加手机端 MQTT 客户端的开发(本来是打算用安卓 APP 的,后来想我做的 APP 界面惨不忍睹,已申请微信小程序明日开发)、通信协议制定、MCU 端的开发和硬件的制作,然后就可以远程控制了。我可真是个小机灵鬼。


树莓派|云服务器-MQTT环境搭建
http://example.com/2018/08/26/树莓派—云服务器-EMQTT-环境搭建/
作者
Ekko bao
发布于
2018年8月26日
许可协议