通过paho-mqtt软件包入门rt-thread的sal

一、paho-mqtt软件包程序流程
1.1 paho_mqtt_start
在rt_wlan_register_event_handler函数注册好rt_wlan_evt_ready的回调函数paho_mqtt_start,当wifi准备好后调用mq_start启动mqtt。在mq_start中,初始化mqttclient结构体,设置mqtt连接的参数:mqtt的uri、mqtt的用户名(username)和密码(password)、mqtt发布和订阅的主题topic、消息质量等级qos,最后调用paho_mqtt_start创建处理mqtt的线程paho_mqtt_thread。
static void mq_start(void)
{
/* init condata param by using mqttpacket_connectdata_initializer /
mqttpacket_connectdata condata = mqttpacket_connectdata_initializer;
static char cid[20] = { 0 };
static int is_started = 0;
if (is_started)
{
return;
}
/ config mqtt context param /
{
client.isconnected = 0;
client.uri = mqtt_uri;
/ generate the random client id /
rt_snprintf(cid, sizeof(cid), rtthread%d, rt_tick_get());
/ config connect param /
memcpy(&client.condata, &condata, sizeof(condata));
client.condata.clientid.cstring = cid;
client.condata.keepaliveinterval = 60;
client.condata.cleansession = 1;
client.condata.username.cstring = mqtt_username;
client.condata.password.cstring = mqtt_password;
/ config mqtt will param. /
client.condata.willflag = 1;
client.condata.will.qos = 1;
client.condata.will.retained = 0;
client.condata.will.topicname.cstring = mqtt_pubtopic;
client.condata.will.message.cstring = mqtt_willmsg;
/ malloc buffer. /
client.buf_size = client.readbuf_size = 1024;
client.buf = malloc(client.buf_size);
client.readbuf = malloc(client.readbuf_size);
if (!(client.buf && client.readbuf))
{
log_e(no memory for mqtt client buffer!);
goto _exit;
}
/ set event callback function /
client.connect_callback = mqtt_connect_callback;
client.online_callback = mqtt_online_callback;
client.offline_callback = mqtt_offline_callback;
/ set subscribe table and event callback /
client.messagehandlers[0].topicfilter = mqtt_subtopic;
client.messagehandlers[0].callback = mqtt_sub_callback;
client.messagehandlers[0].qos = qos1;
/ set default subscribe event callback /
client.defaultmessagehandler = mqtt_sub_default_callback;
}
/ run mqtt client /
paho_mqtt_start(&client);
is_started = 1;
_exit:
return;
}
rt_wlan_register_event_handler(rt_wlan_evt_ready, (void ( )(int, struct rt_wlan_buff *, void *))mq_start, rt_null);
1.2 paho_mqtt_thread
在paho_mqtt_thread中调用paho-mqtt提供的接口和rt-thread的sal的接口完成与mqtt服务器的交互,包括以下几个方面:与服务器的连接、订阅主题、向服务器发送心跳包、处理服务器发送下来的消息(connack、puback、suback、publish、pubrec、pubcomp、pingresp)、回环服务器通过topic发送下来的消息。
static void paho_mqtt_thread(void *param)
{
mqttclient *c = (mqttclient )param;
int i, rc, len;
int rc_t = 0;
c->pub_sock = socket(af_inet, sock_dgram, 0);
if (c->pub_sock == -1)
{
debug_printf(create pub_sock error!n);
goto _mqtt_exit;
}
/ bind publish socket. */
{
struct sockaddr_in pub_server_addr;
c->pub_port = pub_port;
pub_port ++;
pub_server_addr.sin_family = af_inet;
pub_server_addr.sin_port = htons((c->pub_port));
pub_server_addr.sin_addr.s_addr = inaddr_any;
memset(&(pub_server_addr.sin_zero), 0, sizeof(pub_server_addr.sin_zero));
rc = bind(c->pub_sock, (struct sockaddr *)&pub_server_addr, sizeof(struct sockaddr));
if (rc == -1)
{
debug_printf(pub_sock bind error!n);
goto _mqtt_exit;
}
}
_mqtt_start:
if (c->connect_callback)
{
c->connect_callback(c);
}
rc = net_connect(c);
if (rc != 0)
{
goto _mqtt_restart;
}
rc = mqttconnect(c);
if (rc != 0)
{
goto _mqtt_restart;
}
for (i = 0; i messagehandlers[i].topicfilter;
if(topic == rt_null)
continue;
rc = mqttsubscribe(c, topic, qos2);
debug_printf(subscribe #%d %s %s!n, i, topic, (rc online_callback)
{
c->online_callback(c);
}
c->tick_ping = rt_tick_get();
while (1)
{
int res;
rt_tick_t tick_now;
fd_set readset;
struct timeval timeout;
tick_now = rt_tick_get();
if (((tick_now - c->tick_ping) / rt_tick_per_second) > (c->keepaliveinterval - 5))
{
timeout.tv_sec = 1;
//debug_printf(tick close to ping.n);
}
else
{
timeout.tv_sec = c->keepaliveinterval - 10 - (tick_now - c->tick_ping) / rt_tick_per_second;
//debug_printf(timeount for ping: %dn, timeout.tv_sec);
}
timeout.tv_usec = 0;
fd_zero(&readset);
fd_set(c->sock, &readset);
fd_set(c->pub_sock, &readset);
/ int select(maxfdp1, readset, writeset, exceptset, timeout); /
res = select(((c->pub_sock > c->sock) ? c->pub_sock : c->sock) + 1,
&readset, rt_null, rt_null, &timeout);
if (res == 0)
{
len = mqttserialize_pingreq(c->buf, c->buf_size);
rc = sendpacket(c, len);
if (rc != 0)
{
debug_printf([%d] send ping rc: %d n, rt_tick_get(), rc);
goto _mqtt_disconnect;
}
/ wait ping response. /
timeout.tv_sec = 5;
timeout.tv_usec = 0;
fd_zero(&readset);
fd_set(c->sock, &readset);
res = select(c->sock + 1, &readset, rt_null, rt_null, &timeout);
if (res pub_sock, c->readbuf, c->readbuf_size, msg_dontwait,
(struct sockaddr *)&pub_client_addr, &addr_len);
if (pub_client_addr.sin_addr.s_addr != *((uint32_t )(&netif_default->ip_addr)))
{
#if 1
char client_ip_str[16]; / ###.###.###.### */
strcpy(client_ip_str,
inet_ntoa(*((struct in_addr *) & (pub_client_addr.sin_addr))));
debug_printf(pub_sock recvfrom len: %s, skip!n, client_ip_str);
#endif
continue;
}
if (len readbuf[len] = '�';
debug_printf(pub_sock recv %d byte: %sn, len, c->readbuf);
if (strcmp((const char *)c->readbuf, disconnect) == 0)
{
debug_printf(disconnectn);
goto _mqtt_disconnect_exit;
}
continue;
}
message = (mqttmessage *)c->readbuf;
message->payload = c->readbuf + sizeof(mqttmessage);
topic.cstring = (char *)c->readbuf + sizeof(mqttmessage) + message->payloadlen;
//debug_printf(pub_sock topic:%s, payloadlen:%dn, topic.cstring, message->payloadlen);
len = mqttserialize_publish(c->buf, c->buf_size, 0, message->qos, message->retained, message->id,
topic, (unsigned char *)message->payload, message->payloadlen);
if (len offline_callback)
{
c->offline_callback(c);
}
net_disconnect(c);
rt_thread_delay(rt_tick_per_second * 5);
debug_printf(restart!n);
goto _mqtt_start;
_mqtt_disconnect_exit:
mqttdisconnect(c);
net_disconnect(c);
_mqtt_exit:
debug_printf(thread exitn);
return;
}
二、与mqtt broker的交互
paho-mqtt软件包提供了两种发布消息到mqtt broker的方式:udp和管道。在mqttclient结构体中有三个成员与通信有关:sock、pub_sock、pub_pipe,其中sock是与mqtt broker通信的套接字,pub_sock和pub_pipe是两种不同的发布方式:pub_sock是通过udp的方式发布消息;pub_pipe是通过管道,最终由sock发布消息。如下面的代码所示,使用哪种方式可以通过宏来配置。下面展开描述这两种方式如何与mqtt broker交互的。
/* publish interface */#if defined(rt_using_posix) && (defined(rt_using_dfs_net) || defined(sal_using_posix))
int pub_pipe[2];
#else
int pub_sock;
int pub_port;
#endif
2.1 管道(pipe)方式
在paho_mqtt_pipe.c中的paho_mqtt_thread,下面的代码完成了发布消息、接收订阅消息、处理心跳包的工作。下面以三个点细说。
当需要发布消息时,应用层需要调用mqttpublish,这个函数会调用write向管道的写端pub_pipe[1]写入待发送的数据。而管道的读端pub_pipe[0]在select中被监听,当mqttpublish被调用时,select可以往下执行,首先调用read从管道中读取数据,接着调用mqttserialize_publish将数据封包,最后调用sendpacket将数据发送出去。
当接收到订阅的消息时,select会往下执行,接着调用mqtt_cycle读取并解析出数据。
select的超时时间是50s,如果50s没有消息处理,则向broker发送心跳包。
fd_zero(&readset);
fd_set(c->sock, &readset);
fd_set(c->pub_pipe[0], &readset);
/* int select(maxfdp1, readset, writeset, exceptset, timeout); /
res = select(((c->pub_pipe[0] > c->sock) ? c->pub_pipe[0] : c->sock) + 1,
&readset, rt_null, rt_null, &timeout);
if (res == 0)
{
len = mqttserialize_pingreq(c->buf, c->buf_size);
rc = sendpacket(c, len);
if (rc != 0)
{
log_e([%d] send ping rc: %d , rt_tick_get(), rc);
goto _mqtt_disconnect;
}
/ wait ping response. /
timeout.tv_sec = 5;
timeout.tv_usec = 0;
fd_zero(&readset);
fd_set(c->sock, &readset);
res = select(c->sock + 1, &readset, rt_null, rt_null, &timeout);
if (res pub_pipe[0], c->readbuf, c->readbuf_size);
if (len readbuf[len] = '�';
log_d(pub_sock recv %d byte: %s, len, c->readbuf);
if (strcmp((const char *)c->readbuf, disconnect) == 0)
{
log_d(disconnect);
goto _mqtt_disconnect_exit;
}
continue;
}
message = (mqttmessage *)c->readbuf;
message->payload = c->readbuf + sizeof(mqttmessage);
topic.cstring = (char *)c->readbuf + sizeof(mqttmessage) + message->payloadlen;
//log_d(pub_sock topic:%s, payloadlen:%d, topic.cstring, message->payloadlen);
len = mqttserialize_publish(c->buf, c->buf_size, 0, message->qos, message->retained, message->id,
topic, (unsigned char *)message->payload, message->payloadlen);
if (len <= 0)
{
log_d(mqttserialize_publish len: %d, len);
goto _mqtt_disconnect;
}
if ((rc = sendpacket(c, len)) != paho_success) // send the subscribe packet
{
log_d(mqttserialize_publish sendpacket rc: %d, rc);
goto _mqtt_disconnect;
}
}
2.2 udp方式
udp方式中,处理流程与管道方式基本相似。下面说明一下这种方式两个套接字的工作流程。
mqttclient结构体中有两个socket,一个是基于tcp的负责控制与服务器连接的sock,另一个是基于udp协议的负责消息发布的pub_sock。
2.2.1 sock
连接:在net_connect调用socket、connet函数建立与服务器的tcp连接。
处理:sock接收到服务器的数据后,在mqtt_cycle中处理来自服务器的connack、puback、suback、publish、pubrec、pubcomp、pingresp消息。
断开连接:在net_disconnect函数中调用closesocket关闭与服务器的tcp连接。
2.2.2 pub_sock
连接:分为pub_sock的绑定和mqtt连接的建立
1、调用socket创建pub_sock,之后调用bind绑定pub_sock到udp端口。
2、在mqttconnect函数中,通过sock发送connect消息给服务器,建立mqtt连接。
处理:先recvfrom将接受的数据拷贝到mqttclient的readbuf,再将数据回环发布到服务器。
断开连接:通过sock向服务器发送disconnect消息,断开mqtt连接。

realmeX拍照评测 对白平衡的把控超预期
PCB技术:Logic绘制实心符号的方法介绍
Littelfuse楼宇自动化产品在贸泽开售 开启楼宇智能互联时代
摩拜150城新用户免押金试骑获网友好评ofo8城略显诚意不足
今日魅族全量推送Flyme6全新系统,升级前你必须了解这些!
通过paho-mqtt软件包入门rt-thread的sal
中国无人机行业现状及未来展望解读
深圳先进电子材料国际创新研究院落户宝安 将构建协同联动的产业生态系统
彩电业务收入下滑 长虹正在向智能制造领域转型
Pasternack公司推出一系列射频负载新产品
智能家居WIFI温控器如何解决
恶性竞争及外销订单骤减 大陆LED厂掀倒闭潮
传统食堂的弊端和智慧食堂的优势,助力食堂降本增效
直流充电桩的工作原理/状态
苹果新专利:未来潮湿的手指也能使用触摸屏
2020年Q1全球智能音箱出货量达2820万台,中国厂商Q2份额将再次上升
动力及储能电池生产商孚能科技发布2021年报
第四届中国国际进口博览会(CIIE)如期顺利在上海举行
自制简易CMOS双边带收信机电路
光纤跳线的研磨方式有哪些区别