LiteOS云端对接教程03-LiteOS基于MQTT对接EMQ-X服

1. LiteOS MQTT组件

概述

MQTT AL用来解耦基于MQTT的业务和MQTT的具体实现,具体来说以后的MQTT业务层应该有且只能使用MQTT AL提供的相关功能(API 数据结构 流程等)。MQTT AL定义MQTT的标准,用来屏蔽各个MQTT协议实现的差异(如软件库 或者硬件),让上层业务无需关心MQTT的实现部分。

MQTT AL的api接口声明在中,使用相关的接口需要包含该头文件,关于函数的详细参数请参考该头文件的声明。

配置并连接

对接服务器的所有信息保存在结构体mqtt_al_conpara_t中,其定义在mqtt_al.h中,如下:

/** @brief defines the paramter for the mqtt connect */
typedef struct
{
	mqtt_al_string_t               serveraddr;   ///< mqtt server:support domain name and dot format
	int                            serverport;   ///< mqtt server port
	mqtt_al_security_para_t       *security;     ///< if NULL,will use en_mqtt_security_none
	en_mqtt_al_verison             version;      ///< mqtt version will be used
	mqtt_al_string_t               clientid;     ///< mqtt connect client identifier
	mqtt_al_string_t               user;         ///< mqtt connect user
	mqtt_al_string_t               passwd;       ///< mqtt connect passwd
	int                            cleansession; ///< 1 clean the session while 0 not
	mqtt_al_willmsg_t             *willmsg;      ///< mqtt connect will message
	unsigned short                 keepalivetime;///< keep alive time
	char                           conret;       ///< mqtt connect code, return by server
	int                            timeout;      ///< how much time will be blocked
}mqtt_al_conpara_t;

其中的一些参数值已经使用枚举给出:

  • security:安全连接参数(使用此需要确保mbedtls组件开启)

枚举值如下:

/** @brief  this enum all the transport encode we support now*/
typedef enum
{
	en_mqtt_al_security_none = 0,    ///< no encode
	en_mqtt_al_security_psk,         ///< use the psk mode in transport layer
	en_mqtt_al_security_cas,	     ///< use the ca mode in transport layer,only check the server
	en_mqtt_al_security_cacs,	     ///< use the ca mode in transport layer,both check the server and client
	en_mqtt_al_security_end,         ///< the end for the mqtt
}en_mqtt_al_security_t;
  • version:使用的MQTT协议版本

枚举值如下:

/** @brief enum the mqtt version*/
typedef enum
{
	en_mqtt_al_version_3_1_0 = 0,
	en_mqtt_al_version_3_1_1,
}en_mqtt_al_verison;

另外,在复制的时候还需要注意,很多字符串参数都是使用mqtt_al_string_t类型,其定义如下:

/** brief defines for all the ascii or data used in the mqtt engine */
typedef struct
{
	char *data;      ///< buffer to storage the data
	int   len;       ///< buffer data length
}mqtt_al_string_t;   //used to represent any type string (maybe not ascii)

在配置结构体完成之后,调用配置函数进行配置并连接,API如下:

/**
 *@brief: you could use this function to connect to the mqtt server
 *
 *@param[in] conparam  the parameter we will use in connect, refer to the data mqtt_al_conpara_t
 *@
 *@return: first you should check the return value then the return code in conparam
 *
 *@retval NULL which means you could not get the connect to the server,maybe network reason
 *@retval handle, which means you get the context, please check the conparam for more
 */
void * mqtt_al_connect( mqtt_al_conpara_t *conparam);

连接之后,首先应该检查返回的handle指针是否为空,其次应该检查mqtt_al_conpara_t结构体中conret的值,有以下枚举值:

/** @brief defines for the mqtt connect code returned by the server */
#define cn_mqtt_al_con_code_ok                0   ///< has been accepted by the server
#define cn_mqtt_al_con_code_err_version       1   ///< server not support the version
#define cn_mqtt_al_con_code_err_clientID      2   ///< client identifier is error
#define cn_mqtt_al_con_code_err_netrefuse     3   ///< server service not ready yet
#define cn_mqtt_al_con_code_err_u_p           4   ///< bad user name or password
#define cn_mqtt_al_con_code_err_auth          5   ///< the client is not authorized
#define cn_mqtt_al_con_code_err_unkown        -1  ///< unknown reason
#define cn_mqtt_al_con_code_err_network      0x80 ///< network reason,you could try once more

订阅消息

EMQ-X服务器有心跳机制,实际应用中订阅之前应该先检查连接状态,本实验中暂不检查。

连接成功后,首先订阅消息,设置回调函数,方便接收下发的命令。

订阅消息的API如下:

/**
 * @brief you could use this function subscribe a topic from the server
 *
 * @param[in] handle the handle we get from mqtt_al_connect
 *
 * @param[in] subpara  refer to the data mqtt_al_subpara_t
 *
 * @return 0 success  -1  failed
 *
 */
int mqtt_al_subscribe(void *handle, mqtt_al_subpara_t *subpara);

两个参数中,handle参数是之前使用mqtt_al_connect时返回的指针,直接传入即可,subpara参数需要重点讲述。

mqtt_al_subpara_t的定义如下:

/** @brief defines the mqtt subscribe parameter*/
typedef struct
{
	mqtt_al_string_t       topic;     ///< topic will be subscribe
	en_mqtt_al_qos_t       qos;       ///< qos requested
	fn_mqtt_al_msg_dealer  dealer;    ///< message dealer:used to deal the received message
	void                  *arg;       ///< used for the message dealer
	char                   subret;    ///< subscribe result code
	int                    timeout;   ///< how much time will be blocked
}mqtt_al_subpara_t;

其中订阅消息质量qos的枚举值如下:

/** @brief enum all the qos supported for the application */
typedef enum
{
	en_mqtt_al_qos_0 = 0,     ///< mqtt QOS 0
	en_mqtt_al_qos_1,         ///< mqtt QOS 1
	en_mqtt_al_qos_2,         ///< mqtt QOS 2
	en_mqtt_al_qos_err
}en_mqtt_al_qos_t;

dealer是一个函数指针,接收到下发命令之后会被回调,arg是回调函数参数,其定义如下:

/** @brief  defines the mqtt received message dealer, called by mqtt engine*/
typedef void (*fn_mqtt_al_msg_dealer)(void *arg,mqtt_al_msgrcv_t *msg);

订阅之后,可以通过mqtt_al_subpara_t结构体中的subret值查看是否订阅成功。

发布消息

发布消息的API如下:

/**
 * @brief you could use this function to publish a message to the server
 *
 * @param[in] handle the handle we get from mqtt_al_connect
 *
 * @param[in] msg  the message we will publish, see the data mqtt_al_pubpara_t
 *
 * @return 0 success  -1  failed
 *
 */
int	mqtt_al_publish(void *handle, mqtt_al_pubpara_t *pubpara);

两个参数中,handle参数是之前使用mqtt_al_connect时返回的指针,直接传入即可,pubpara参数需要重点讲述。

mqtt_al_pubpara_t的定义如下:

/** @brief defines for the mqtt publish */
typedef struct
{
	mqtt_al_string_t    topic;    ///< selected publish topic
	mqtt_al_string_t    msg;      ///< message to be published
	en_mqtt_al_qos_t    qos;      ///< message qos
	int                 retain;   ///< message retain :1 retain while 0 not
	int                 timeout;  ///< how much time will blocked
}mqtt_al_pubpara_t;

MQTT组件自动初始化

MQTT在配置之后,会自动初始化。

在SDK目录中的IoT_LINK_1.0.0iot_linklink_main.c文件中可以看到:

图片描述

2. 配置准备

Makefile配置

因为本次实验用到的组件较多:

  • AT框架
  • ESP8266设备驱动
  • 串口驱动框架
  • cJSON组件
  • SAL组件
  • MQTT组件

这些实验代码全部编译下来,有350KB,而小熊派开发板所使用的主控芯片STM32L431RCT6的 Flash 仅有256KB,会导致编译器无法链接出可执行文件,所以要在makefile中修改优化选项,修改为-Os参数,即最大限度的优化代码尺寸,并去掉-g参数,即代码只能下载运行,无法调试,如图:

图片描述

ESP8266设备配置

在工程目录中的OS_CONFIG/iot_link_config.h文件中,配置ESP8266设备的波特率和设备名称:

图片描述

WIFI对接信息配置

SDK:C:UsersAdministrator.icodesdkIoT_LINK_1.0.0(其中Administrator是实验电脑的用户名)。

在SDK目录中的iot_linknetworktcpipesp8266_socketesp8266_socket_imp.c文件中,配置连接信息:

图片描述

之后修改同路径下的esp8266_socket_imp.mk文件,如图,将 TOP_DIR 改为 SDK_DIR :

图片描述

修改paho_mqtt文件路径

在SDK目录中的iot_linknetworkmqttpaho_mqttpaho_mqtt.mk文件中,如图,将 TOP_DIR 改为 SDK_DIR :

图片描述

3. 使用mqtt.fx对接EMQ-X

配置

对接信息配置如下:

图片描述

其中ClientID随机生成一个即可。

订阅主题

使用mqtt.fx连接客户端,订阅本次实验中的两个主题:

  • 主题led_cmd:用于发布控制命令
  • 主题lightness:用于上报亮度

图片描述

4. 上云实验

编写实验文件

在 Demo 文件夹下创建cloud_test_demo文件夹,在其中创建emqx_mqtt_demo.c文件。

编写代码:

#include 
#include 
#include 


#define DEFAULT_LIFETIME            60
#define DEFAULT_SERVER_IPV4         "122.51.89.94"
#define DEFAULT_SERVER_PORT         1883
#define CN_MQTT_EP_CLIENTID         "emqx-test-001"
#define CN_MQTT_EP_USERNAME         "mculover666"
#define CN_MQTT_EP_PASSWD           "123456789"
#define CN_MQTT_EP_SUB_TOPIC1       "led_cmd"
#define CN_MQTT_EP_PUB_TOPIC1       "lightness"

#define recv_buf_len 100
static char recv_buffer[recv_buf_len];   //下发数据接收缓冲区
static int  recv_datalen;                //表示接收数据长度

osal_semp_t recv_sync;  //命令接收回调函数和处理函数之间的信号量

char lightness_buf[10];

static void mqtt_al_msg_dealer(void *arg,mqtt_al_msgrcv_t *msg)
{
    if((msg->msg.len) < recv_buf_len)
    {
        //保存数据
        memcpy(recv_buffer,msg->msg.data,msg->msg.len );
        recv_buffer[msg->msg.len] = '
请使用浏览器的分享功能分享到微信等