| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877 |
- /*
- @module mqtt
- @summary mqtt客户端
- @version 1.0
- @date 2022.08.25
- @demo mqtt
- @tag LUAT_USE_NETWORK
- @usage
- -- 具体用法请查看demo
- -- 本库只支持 mqtt 3.1.1, 其他版本例如3.1 或 5 均不支持!!!
- -- 只支持纯MQTT/MQTTS通信, 不支持 mqtt over websocket
- -- 几个大前提:
- -- 本库是基于TCP链接的, 支持加密TCP和非加密TCP
- -- 任何通信失败都将断开连接, 如果开启了自动重连, 那么间隔N秒后开始自动重连
- -- 上行数据均为一次性的, 没有缓存机制, 更没有上行的重试/重发机制
- -- 如何获知发送成功: 触发 mqttc:on 中 event == "sent" 的事件
- -- 关于publish时QOS值的说明, 特制模块上行到云端/服务器端的行为:
- -- QOS0, 压入底层TCP发送堆栈,视为成功
- -- QOS1, 收到服务器回应PUBACK,视为成功
- -- QOS2, 收到服务器响应PUBREC,立即上行PUBCOMP压入TCP发送队列,视为成功
- -- 重要的事情说3次: 没有重发机制, 没有重发机制, 没有重发机制
- -- 1. MQTT协议中规定了重发机制, 但那是云端/服务器端才会实现的机制, 模块端是没有的
- -- 2. 上行失败, 唯一的可能性就是TCP链接出问题了, 而TCP链接出问题的解决办法就是重连
- -- 3. 模块端不会保存任何上行数据, 重连后也无法实现重发
- -- 那业务需要确定上行是否成功, 如何解决:
- -- 首先推荐使用 QOS1, 然后监听/判断sent事件,并选取一个超时时间, 就能满足99.9%的需求
- -- 使用QOS2,反而存在PUBCOMP上行失败导致服务器端不广播数据的理论可能
- -- demo里有演示等待sent事件的代码, 类似于 sys.waitUntil("mqtt_sent", 3000) 搜mqtt_sent关键字
- */
- #include "luat_base.h"
- #include "luat_network_adapter.h"
- #include "libemqtt.h"
- #include "luat_rtos.h"
- #include "luat_zbuff.h"
- #include "luat_mem.h"
- #include "luat_mqtt.h"
- #define LUAT_LOG_TAG "mqtt"
- #include "luat_log.h"
- #define LUAT_MQTT_CTRL_TYPE "MQTTCTRL*"
- #ifdef LUAT_USE_NETDRV
- #include "luat_netdrv.h"
- #include "luat_netdrv_event.h"
- #endif
- static const char *error_string[MQTT_MSG_NET_ERROR - MQTT_MSG_CON_ERROR + 1] =
- {
- "connect",
- "tx",
- "conack",
- "other"
- };
- static luat_mqtt_ctrl_t * get_mqtt_ctrl(lua_State *L){
- if (luaL_testudata(L, 1, LUAT_MQTT_CTRL_TYPE)){
- return ((luat_mqtt_ctrl_t *)luaL_checkudata(L, 1, LUAT_MQTT_CTRL_TYPE));
- }else{
- return ((luat_mqtt_ctrl_t *)lua_touserdata(L, 1));
- }
- }
- int32_t luatos_mqtt_callback(lua_State *L, void* ptr){
- (void)ptr;
- rtos_msg_t* msg = (rtos_msg_t*)lua_topointer(L, -1);
- luat_mqtt_ctrl_t *mqtt_ctrl =(luat_mqtt_ctrl_t *)msg->ptr;
- switch (msg->arg1) {
- // case MQTT_MSG_TCP_TX_DONE:
- // if (mqtt_ctrl->mqtt_cb) {
- // lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_cb);
- // if (lua_isfunction(L, -1)) {
- // lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_ref);
- // lua_pushstring(L, "tcp_ack");
- // lua_call(L, 2, 0);
- // }
- // }
- // break;
- case MQTT_MSG_TIMER_PING : {
- luat_mqtt_ping(mqtt_ctrl);
- break;
- }
- case MQTT_MSG_CONN_TIMEOUT: {
- LLOGW("connect timeout %s %d!! expect conack in %ds", mqtt_ctrl->host, mqtt_ctrl->remote_port, mqtt_ctrl->conn_timeout);
- #ifdef LUAT_USE_NETDRV
- luat_netdrv_fire_socket_event_netctrl(EV_NW_TIMEOUT, mqtt_ctrl->netc, 4);
- #endif
- if (mqtt_ctrl->mqtt_ref) {
- lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_cb);
- if (lua_isfunction(L, -1)) {
- lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_ref);
- lua_pushstring(L, "error");
- lua_pushstring(L, "timeout");
- lua_pushinteger(L, msg->arg2);
- lua_call(L, 4, 0);
- }
- }
- luat_mqtt_close_socket(mqtt_ctrl);
- break;
- }
- case MQTT_MSG_PINGRESP : {
- if (mqtt_ctrl->mqtt_cb) {
- lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_cb);
- if (lua_isfunction(L, -1)) {
- lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_ref);
- lua_pushstring(L, "pong");
- lua_call(L, 2, 0);
- }
- lua_getglobal(L, "sys_pub");
- if (lua_isfunction(L, -1)) {
- lua_pushstring(L, "MQTT_PONG");
- lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_ref);
- lua_call(L, 2, 0);
- }
- }
- break;
- }
- case MQTT_MSG_RECONNECT : {
- luat_mqtt_reconnect(mqtt_ctrl);
- break;
- }
- case MQTT_MSG_PUBLISH : {
- luat_mqtt_msg_t *mqtt_msg =(luat_mqtt_msg_t *)msg->arg2;
- if (mqtt_ctrl->mqtt_cb) {
- // luat_mqtt_msg_t *mqtt_msg =(luat_mqtt_msg_t *)msg->arg2;
- lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_cb);
- if (lua_isfunction(L, -1)) {
- lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_ref);
- lua_pushstring(L, "recv");
- lua_pushlstring(L, (const char*)(mqtt_msg->data),mqtt_msg->topic_len);
- lua_pushlstring(L, (const char*)(mqtt_msg->data+mqtt_msg->topic_len),mqtt_msg->payload_len);
- // 增加一个返回值meta,类型为table,包含qos、retain和dup
- // mqttc:on(function(mqtt_client, event, data, payload, meta)
- // if event == "recv" then
- // log.info("mqtt recv", "topic", data)
- // log.info("mqtt recv", 'payload', payload)
- // log.info("mqtt recv", 'meta.message_id', meta.message_id)
- // log.info("mqtt recv", 'meta.qos', meta.qos)
- // log.info("mqtt recv", 'meta.retain', meta.retain)
- // log.info("mqtt recv", 'meta.dup', meta.dup)
- lua_createtable(L, 0, 4);
- lua_pushliteral(L, "message_id");
- lua_pushinteger(L, mqtt_msg->message_id);
- lua_settable(L, -3);
- lua_pushliteral(L, "qos");
- lua_pushinteger(L, (mqtt_msg->flags & 0x06) >> 1);
- // lua_pushinteger(L, MQTTParseMessageQos(mqtt_ctrl->mqtt_packet_buffer));
- lua_settable(L, -3);
- lua_pushliteral(L, "retain");
- lua_pushinteger(L, mqtt_msg->flags & 0x01);
- // lua_pushinteger(L, MQTTParseMessageRetain(mqtt_ctrl->mqtt_packet_buffer));
- lua_settable(L, -3);
- lua_pushliteral(L, "dup");
- lua_pushinteger(L, (mqtt_msg->flags & 0x08)?1:0);
- // lua_pushinteger(L, MQTTParseMessageDuplicate(mqtt_ctrl->mqtt_packet_buffer));
- lua_settable(L, -3);
- lua_call(L, 5, 0);
- }
- }
- luat_heap_free(mqtt_msg);
- break;
- }
- case MQTT_MSG_CONNACK: {
- if (mqtt_ctrl->mqtt_cb) {
- lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_cb);
- if (lua_isfunction(L, -1)) {
- lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_ref);
- lua_pushstring(L, "conack");
- lua_call(L, 2, 0);
- }
- // lua_getglobal(L, "sys_pub");
- // if (lua_isfunction(L, -1)) {
- // lua_pushstring(L, "MQTT_CONNACK");
- // lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_ref);
- // lua_call(L, 2, 0);
- // }
- }
- break;
- }
- case MQTT_MSG_PUBACK:
- case MQTT_MSG_PUBCOMP: {
- if (mqtt_ctrl->mqtt_cb) {
- lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_cb);
- if (lua_isfunction(L, -1)) {
- lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_ref);
- lua_pushstring(L, "sent");
- lua_pushinteger(L, msg->arg2);
- lua_call(L, 3, 0);
- }
- }
- break;
- }
- case MQTT_MSG_RELEASE: {
- if (mqtt_ctrl->mqtt_ref) {
- luaL_unref(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_ref);
- mqtt_ctrl->mqtt_ref = 0;
- }
- break;
- }
- case MQTT_MSG_CLOSE: {
- if (mqtt_ctrl->mqtt_ref) {
- lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_cb);
- if (lua_isfunction(L, -1)) {
- lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_ref);
- lua_pushstring(L, "close");
- lua_call(L, 2, 0);
- }
- }
- break;
- }
- case MQTT_MSG_DISCONNECT: {
- if (mqtt_ctrl->mqtt_cb) {
- lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_cb);
- if (lua_isfunction(L, -1)) {
- lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_ref);
- lua_pushstring(L, "disconnect");
- lua_pushinteger(L, mqtt_ctrl->error_state);
- lua_call(L, 3, 0);
- }
- // lua_getglobal(L, "sys_pub");
- // if (lua_isfunction(L, -1)) {
- // lua_pushstring(L, "MQTT_DISCONNECT");
- // lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_ref);
- // lua_pushinteger(L, mqtt_ctrl->error_state);
- // lua_call(L, 3, 0);
- // }
- }
- break;
- }
- case MQTT_MSG_SUBACK:
- if (mqtt_ctrl->mqtt_cb) {
- lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_cb);
- if (lua_isfunction(L, -1)) {
- lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_ref);
- lua_pushstring(L, "suback");
- lua_pushboolean(L, (msg->arg2 <= 2));
- lua_pushinteger(L, msg->arg2);
- lua_call(L, 4, 0);
- }
- }
- break;
- case MQTT_MSG_UNSUBACK:
- if (mqtt_ctrl->mqtt_cb) {
- lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_cb);
- if (lua_isfunction(L, -1)) {
- lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_ref);
- lua_pushstring(L, "unsuback");
- lua_call(L, 2, 0);
- }
- }
- break;
- case MQTT_MSG_CON_ERROR:
- case MQTT_MSG_TX_ERROR:
- case MQTT_MSG_CONACK_ERROR:
- case MQTT_MSG_NET_ERROR:
- if (mqtt_ctrl->mqtt_ref) {
- lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_cb);
- if (lua_isfunction(L, -1)) {
- lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_ref);
- lua_pushstring(L, "error");
- lua_pushstring(L, error_string[msg->arg1 - MQTT_MSG_CON_ERROR]);
- lua_pushinteger(L, msg->arg2);
- lua_call(L, 4, 0);
- }
- }
- #ifdef LUAT_USE_NETDRV
- luat_netdrv_fire_socket_event_netctrl(EV_NW_SOCKET_ERROR, mqtt_ctrl->netc, 4);
- #endif
- break;
- default : {
- LLOGD("l_mqtt_callback error arg1:%d",msg->arg1);
- break;
- }
- }
- // lua_pushinteger(L, 0);
- return 0;
- }
- /*
- 订阅主题
- @api mqttc:subscribe(topic, qos)
- @string/table 主题
- @int topic为string时生效 0/1/2 默认0
- @return int 消息id,当qos为1/2时有效, 若底层返回失败,会返回nil
- @usage
- -- 订阅单个topic, 且qos=0
- mqttc:subscribe("/luatos/123456", 0)
- -- 订阅单个topic, 且qos=1
- mqttc:subscribe("/luatos/12345678", 1)
- -- 订阅多个topic, 且使用不同的qos
- mqttc:subscribe({["/luatos/1234567"]=1,["/luatos/12345678"]=2})
- */
- static int l_mqtt_subscribe(lua_State *L) {
- size_t len = 0;
- int ret = 1;
- uint16_t msgid = 0;
- luat_mqtt_ctrl_t * mqtt_ctrl = (luat_mqtt_ctrl_t *)lua_touserdata(L, 1);
- if (lua_isstring(L, 2)){
- const char * topic = luaL_checklstring(L, 2, &len);
- uint8_t qos = luaL_optinteger(L, 3, 0);
- ret = mqtt_subscribe(&(mqtt_ctrl->broker), topic, &msgid, qos);
- }else if(lua_istable(L, 2)){
- lua_pushnil(L);
- while (lua_next(L, 2) != 0) {
- ret &= mqtt_subscribe(&(mqtt_ctrl->broker), lua_tostring(L, -2), &msgid, luaL_optinteger(L, -1, 0)) == 1 ? 1 : 0;
- lua_pop(L, 1);
- }
- }
- if (ret == 1) {
- lua_pushinteger(L, msgid);
- return 1;
- }
- else {
- return 0;
- }
- }
- /*
- 取消订阅主题
- @api mqttc:unsubscribe(topic)
- @string/table 主题
- @usage
- mqttc:unsubscribe("/luatos/123456")
- mqttc:unsubscribe({"/luatos/1234567","/luatos/12345678"})
- */
- static int l_mqtt_unsubscribe(lua_State *L) {
- size_t len = 0;
- int ret = 0;
- uint16_t msgid = 0;
- luat_mqtt_ctrl_t * mqtt_ctrl = (luat_mqtt_ctrl_t *)lua_touserdata(L, 1);
- if (lua_isstring(L, 2)){
- const char * topic = luaL_checklstring(L, 2, &len);
- ret = mqtt_unsubscribe(&(mqtt_ctrl->broker), topic, &msgid);
- }else if(lua_istable(L, 2)){
- size_t count = lua_rawlen(L, 2);
- for (size_t i = 1; i <= count; i++){
- lua_geti(L, 2, i);
- const char * topic = luaL_checklstring(L, -1, &len);
- ret &= mqtt_unsubscribe(&(mqtt_ctrl->broker), topic, &msgid) == 1 ? 1 : 0;
- lua_pop(L, 1);
- }
- }
- if (ret == 1) {
- lua_pushinteger(L, msgid);
- return 1;
- }
- return 0;
- }
- /*
- 配置是否打开debug信息
- @api mqttc:debug(onoff)
- @boolean 是否打开debug开关
- @return nil 无返回值
- @usage mqttc:debug(true)
- */
- static int l_mqtt_set_debug(lua_State *L){
- luat_mqtt_ctrl_t * mqtt_ctrl = get_mqtt_ctrl(L);
- if (lua_isboolean(L, 2)){
- mqtt_ctrl->netc->is_debug = lua_toboolean(L, 2);
- }
- return 0;
- }
- /*
- mqtt客户端创建
- @api mqtt.create(adapter,host,port,ssl,opts)
- @int 适配器序号, 如果不填,会选择平台自带的方式,然后是最后一个注册的适配器,可选值请查阅socket库的常量表
- @string 服务器地址,可以是域名, 也可以是ip
- @int 端口号
- @bool/table 是否为ssl加密连接,默认不加密,true为无证书最简单的加密,table为有证书的加密 <br>server_cert 服务器ca证书数据 <br>client_cert 客户端证书数据 <br>client_key 客户端私钥加密数据 <br>client_password 客户端私钥口令数据 <br>verify 是否强制校验 0不校验/1可选校验/2强制校验 默认2
- @table mqtt扩展参数
- @return userdata 若成功会返回mqtt客户端实例,否则返回nil
- @usage
- -- 普通TCP链接
- mqttc = mqtt.create(nil,"120.55.137.106", 1884)
- -- 普通TCP链接,mqtt接收缓冲区4096
- mqttc = mqtt.create(nil,"120.55.137.106", 1884, nil, {rxSize = 4096})
- -- 加密TCP链接,不验证服务器证书
- mqttc = mqtt.create(nil,"120.55.137.106", 8883, true)
- -- 加密TCPTCP链接,单服务器证书验证
- mqttc = mqtt.create(nil,"120.55.137.106", 8883, {server_cert=io.readFile("/luadb/ca.crt")})
- -- 加密TCPTCP链接,单服务器证书验证, 但可选认证
- mqttc = mqtt.create(nil,"120.55.137.106", 8883, {server_cert=io.readFile("/luadb/ca.crt"), verify=1})
- -- 加密TCPTCP链接,双向证书验证
- mqttc = mqtt.create(nil,"120.55.137.106", 8883, {
- server_cert=io.readFile("/luadb/ca.crt"),
- client_cert=io.readFile("/luadb/client.pem"),
- client_key=io.readFile("/luadb/client.key"),
- client_password="123456",
- })
- -- opts参数说明
- -- ipv6 = true, -- 是否为ipv6连接,默认false
- -- rxSize = 4096, -- mqtt接收缓冲区大小,单位字节,默认32k
- -- conn_timeout = 15, -- 连接超时时间,按收到conack为止,单位秒, 默认30秒
- */
- static int l_mqtt_create(lua_State *L) {
- int ret = 0;
- int adapter_index = luaL_optinteger(L, 1, network_register_get_default());
- if (adapter_index < 0 || adapter_index >= NW_ADAPTER_QTY){
- LLOGE("尚无已注册的网络适配器");
- return 0;
- }
- luat_mqtt_ctrl_t *mqtt_ctrl = (luat_mqtt_ctrl_t *)lua_newuserdata(L, sizeof(luat_mqtt_ctrl_t));
- if (!mqtt_ctrl){
- LLOGE("out of memory when malloc mqtt_ctrl");
- return 0;
- }
- mqtt_ctrl->app_cb = NULL;
- ret = luat_mqtt_init(mqtt_ctrl, adapter_index);
- if (ret) {
- LLOGE("mqtt init FAID ret %d", ret);
- return 0;
- }
- luat_mqtt_connopts_t opts = {0};
- // 连接参数相关
- // const char *ip;
- size_t ip_len = 0;
- network_set_ip_invaild(&mqtt_ctrl->ip_addr);
- if (lua_isinteger(L, 2)){
- network_set_ip_ipv4(&mqtt_ctrl->ip_addr, lua_tointeger(L, 2));
- // ip = NULL;
- ip_len = 0;
- }else{
- ip_len = 0;
- opts.host = luaL_checklstring(L, 2, &ip_len);
- // TODO 判断 host的长度,超过191就不行了
- }
- opts.port = luaL_checkinteger(L, 3);
- // 加密相关
- if (lua_isboolean(L, 4)){
- opts.is_tls = lua_toboolean(L, 4);
- }
- if (lua_istable(L, 4)){
- opts.is_tls = 1;
- opts.verify = 2;
- lua_pushstring(L, "verify");
- if (LUA_TNUMBER == lua_gettable(L, 4)) {
- opts.verify = luaL_checknumber(L, -1);
- }
- lua_pop(L, 1);
- lua_pushstring(L, "server_cert");
- if (LUA_TSTRING == lua_gettable(L, 4)) {
- opts.server_cert = luaL_checklstring(L, -1, &opts.server_cert_len);
- }
- lua_pop(L, 1);
- lua_pushstring(L, "client_cert");
- if (LUA_TSTRING == lua_gettable(L, 4)) {
- opts.client_cert = luaL_checklstring(L, -1, &opts.client_cert_len);
- }
- lua_pop(L, 1);
- lua_pushstring(L, "client_key");
- if (LUA_TSTRING == lua_gettable(L, 4)) {
- opts.client_key = luaL_checklstring(L, -1, &opts.client_key_len);
- }
- lua_pop(L, 1);
- lua_pushstring(L, "client_password");
- if (LUA_TSTRING == lua_gettable(L, 4)) {
- opts.client_password = luaL_checklstring(L, -1, &opts.client_password_len);
- }
- lua_pop(L, 1);
- }
-
- if (lua_isboolean(L, 5)){
- opts.is_ipv6 = lua_toboolean(L, 5);
- }else if(lua_istable(L, 5)){
- lua_pushstring(L, "ipv6");
- if (LUA_TBOOLEAN == lua_gettable(L, 5)) {
- opts.is_ipv6 = lua_toboolean(L, -1);
- }
- lua_pop(L, 1);
- lua_pushstring(L, "rxSize");
- if (LUA_TNUMBER == lua_gettable(L, 5)) {
- uint32_t len = luaL_checknumber(L, -1);
- if(len > 0)
- luat_mqtt_set_rxbuff_size(mqtt_ctrl, len);
- }
- lua_pop(L, 1);
- lua_pushstring(L, "conn_timeout");
- if (LUA_TNUMBER == lua_gettable(L, 5)) {
- opts.conn_timeout = luaL_checknumber(L, -1);
- if (opts.conn_timeout < 5)
- opts.conn_timeout = 5;
- else if (opts.conn_timeout > 120)
- opts.conn_timeout = 120;
- }
- lua_pop(L, 1);
- }
- ret = luat_mqtt_set_connopts(mqtt_ctrl, &opts);
- if (ret){
- LLOGE("设置mqtt参数失败");
- luat_mqtt_release_socket(mqtt_ctrl);
- return 0;
- }
- luaL_setmetatable(L, LUAT_MQTT_CTRL_TYPE);
- lua_pushvalue(L, -1);
- mqtt_ctrl->mqtt_ref = luaL_ref(L, LUA_REGISTRYINDEX);
- return 1;
- }
- /*
- mqtt三元组配置及cleanSession
- @api mqttc:auth(client_id,username,password,cleanSession)
- @string 设备识别id,对于同一个mqtt服务器来说, 通常要求唯一,相同client_id会互相踢下线
- @string 账号 可选
- @string 密码 可选
- @bool 清除session,默认true,可选
- @return bool 成功返回true,否则返回nil. 注意, 返回值是2025.3.19新增的
- @usage
- -- 无账号密码登录,仅clientId
- mqttc:auth("123456789")
- -- 带账号密码登录
- mqttc:auth("123456789","username","password")
- -- 额外配置cleanSession,不清除
- mqttc:auth("123456789","username","password", false)
- -- 无clientId模式, 服务器随机生成id, cleanSession不可配置
- mqttc:auth()
- */
- static int l_mqtt_auth(lua_State *L) {
- luat_mqtt_ctrl_t * mqtt_ctrl = get_mqtt_ctrl(L);
- const char *client_id = luaL_optstring(L, 2, "");
- const char *username = luaL_optstring(L, 3, "");
- const char *password = luaL_optstring(L, 4, "");
- int cleanSession = 1;
- if (lua_isboolean(L, 5) && !lua_toboolean(L, 5)) {
- cleanSession = 0;
- }
- if (client_id != NULL && strlen(client_id) > MQTT_CONF_CLIENT_ID_LENGTH) {
- LLOGE("mqtt client_id 太长或者无效!!!!");
- return 0;
- }
- if (username != NULL && strlen(username) > MQTT_CONF_USERNAME_LENGTH) {
- LLOGE("mqtt username 太长或者无效!!!!");
- return 0;
- }
- if (password != NULL && strlen(password) > MQTT_CONF_PASSWORD_LENGTH) {
- LLOGE("mqtt password 太长或者无效!!!!");
- return 0;
- }
- mqtt_init(&(mqtt_ctrl->broker), client_id);
- mqtt_init_auth(&(mqtt_ctrl->broker), username, password);
- mqtt_ctrl->broker.clean_session = cleanSession;
- lua_pushboolean(L, 1);
- return 1;
- }
- /*
- mqtt心跳设置
- @api mqttc:keepalive(time)
- @int 可选 单位s 默认240s. 最先15,最高600
- @return nil 无返回值
- @usage
- mqttc:keepalive(30)
- */
- static int l_mqtt_keepalive(lua_State *L) {
- luat_mqtt_ctrl_t * mqtt_ctrl = get_mqtt_ctrl(L);
- int timeout = luaL_optinteger(L, 2, 240);
- if (timeout < 15)
- timeout = 15;
- if (timeout > 600)
- timeout = 600;
- mqtt_ctrl->keepalive = timeout;
- return 0;
- }
- /*
- 注册mqtt回调
- @api mqttc:on(cb)
- @function cb mqtt回调,参数包括mqtt_client, event, data, payload
- @return nil 无返回值
- @usage
- mqttc:on(function(mqtt_client, event, data, payload, metas)
- -- 用户自定义代码
- log.info("mqtt", "event", event, mqtt_client, data, payload)
- end)
- --[[
- event可能出现的值有
- conack -- 服务器鉴权完成, 表示mqtt连接已经建立, 可以订阅和发布数据了
- suback -- 订阅完成,data为应答结果, true成功,payload为0~2数字表示qos,data为false则失败,payload为失败码,一般是0x80
- unsuback -- 取消订阅完成
- recv -- 接收到数据,由服务器下发, data为topic值(string), payload为业务数据(string), metas是元数据(table), 一般不处理.
- -- metas包含以下内容
- -- message_id
- -- qos 取值范围0,1,2
- -- retain 取值范围 0,1
- -- dup 取值范围 0,1
- sent -- 发送完成, qos0会马上通知, qos1/qos2会在服务器应答会回调, data为消息id
- disconnect -- 服务器断开连接,网络问题或服务器踢了客户端,例如clientId重复,超时未上报业务数据
- pong -- 收到服务器心跳应答,没有附加数据
- error -- 严重的异常,会导致断开连接, data(string)为具体异常,有以下几种
- -- connect 服务器连接不上
- -- tx 发送数据失败
- -- conack 服务器鉴权失败,失败码在payload(int)
- -- other 其他异常
- ]]
- */
- static int l_mqtt_on(lua_State *L) {
- luat_mqtt_ctrl_t * mqtt_ctrl = get_mqtt_ctrl(L);
- if (mqtt_ctrl->mqtt_cb != 0) {
- luaL_unref(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_cb);
- mqtt_ctrl->mqtt_cb = 0;
- }
- if (lua_isfunction(L, 2)) {
- lua_pushvalue(L, 2);
- mqtt_ctrl->mqtt_cb = luaL_ref(L, LUA_REGISTRYINDEX);
- }
- return 0;
- }
- /*
- 连接服务器
- @api mqttc:connect()
- @return boolean 发起成功返回true, 否则返回false
- @usage
- -- 开始建立连接
- mqttc:connect()
- -- 本函数仅代表发起成功, 后续仍需根据ready函数判断mqtt是否连接正常
- */
- static int l_mqtt_connect(lua_State *L) {
- luat_mqtt_ctrl_t * mqtt_ctrl = get_mqtt_ctrl(L);
- int ret = luat_mqtt_connect(mqtt_ctrl);
- if (ret) {
- LLOGE("socket connect ret=%d\n", ret);
- luat_mqtt_close_socket(mqtt_ctrl);
- lua_pushboolean(L, 0);
- return 1;
- }
- lua_pushboolean(L, 1);
- return 1;
- }
- /*
- 断开服务器连接(不会释放资源)
- @api mqttc:disconnect()
- @return boolean 发起成功返回true, 否则返回false
- @usage
- -- 断开连接
- mqttc:disconnect()
- */
- static int l_mqtt_disconnect(lua_State *L) {
- luat_mqtt_ctrl_t * mqtt_ctrl = get_mqtt_ctrl(L);
- mqtt_disconnect(&(mqtt_ctrl->broker));
- luat_mqtt_close_socket(mqtt_ctrl);
- lua_pushboolean(L, 1);
- return 1;
- }
- /*
- 自动重连
- @api mqttc:autoreconn(reconnect, reconnect_time)
- @bool 是否自动重连
- @int 自动重连周期 单位ms 默认3000ms
- @usage
- mqttc:autoreconn(true)
- */
- static int l_mqtt_autoreconn(lua_State *L) {
- luat_mqtt_ctrl_t * mqtt_ctrl = get_mqtt_ctrl(L);
- if (lua_isboolean(L, 2)){
- mqtt_ctrl->reconnect = lua_toboolean(L, 2);
- }
- mqtt_ctrl->reconnect_time = luaL_optinteger(L, 3, 3000);
- if (mqtt_ctrl->reconnect && mqtt_ctrl->reconnect_time < 1000)
- mqtt_ctrl->reconnect_time = 1000;
- return 0;
- }
- /*
- 发布消息
- @api mqttc:publish(topic, data, qos, retain)
- @string 主题,必填
- @string 消息,必填,但长度可以是0
- @int 消息级别 0/1 默认0
- @int 是否存档, 0/1,默认0
- @return int 消息id, 当qos为1或2时会有效值. 若底层返回有错误发生, 则会返回nil
- @usage
- mqttc:publish("/luatos/123456", "123")
- */
- static int l_mqtt_publish(lua_State *L) {
- uint16_t message_id = 0;
- size_t payload_len = 0;
- luat_mqtt_ctrl_t * mqtt_ctrl = get_mqtt_ctrl(L);
- const char * topic = luaL_checkstring(L, 2);
- const char * payload = NULL;
- luat_zbuff_t *buff = NULL;
- if (lua_isstring(L, 3)){
- payload = luaL_checklstring(L, 3, &payload_len);
- }else if (luaL_testudata(L, 3, LUAT_ZBUFF_TYPE)){
- buff = ((luat_zbuff_t *)luaL_checkudata(L, 3, LUAT_ZBUFF_TYPE));
- payload = (const char*)buff->addr;
- payload_len = buff->used;
- }else{
- LLOGD("only support string or zbuff");
- }
- // LLOGD("payload_len:%d",payload_len);
- uint8_t qos = luaL_optinteger(L, 4, 0);
- uint8_t retain = luaL_optinteger(L, 5, 0);
- int ret = mqtt_publish_with_qos(&(mqtt_ctrl->broker), topic, payload, payload_len, retain, qos, &message_id);
- if (ret != 1){
- return 0;
- }
- if (qos == 0){
- rtos_msg_t msg = {0};
- msg.handler = luatos_mqtt_callback;
- msg.ptr = mqtt_ctrl;
- msg.arg1 = MQTT_MSG_PUBACK;
- msg.arg2 = message_id;
- luat_msgbus_put(&msg, 0);
- }
- lua_pushinteger(L, message_id);
- return 1;
- }
- /*
- mqtt客户端关闭(关闭后资源释放无法再使用)
- @api mqttc:close()
- @usage
- mqttc:close()
- */
- static int l_mqtt_close(lua_State *L) {
- luat_mqtt_ctrl_t * mqtt_ctrl = get_mqtt_ctrl(L);
- mqtt_disconnect(&(mqtt_ctrl->broker));
- luat_mqtt_close_socket(mqtt_ctrl);
- if (mqtt_ctrl->mqtt_cb != 0) {
- luaL_unref(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_cb);
- mqtt_ctrl->mqtt_cb = 0;
- }
- luat_mqtt_release_socket(mqtt_ctrl);
- return 0;
- }
- /*
- mqtt客户端是否就绪
- @api mqttc:ready()
- @return bool 客户端是否就绪
- @usage
- local error = mqttc:ready()
- */
- static int l_mqtt_ready(lua_State *L) {
- luat_mqtt_ctrl_t * mqtt_ctrl = get_mqtt_ctrl(L);
- lua_pushboolean(L, mqtt_ctrl->mqtt_state == MQTT_STATE_READY ? 1 : 0);
- return 1;
- }
- /*
- mqtt客户端状态
- @api mqttc:state()
- @return number 客户端状态
- @usage
- local state = mqttc:state()
- -- 已知状态:
- -- 0: MQTT_STATE_DISCONNECT
- -- 1: MQTT_STATE_CONNECTING
- -- 2: MQTT_STATE_CONNECTED
- -- 3: MQTT_STATE_READY
- -- 4: MQTT_STATE_ERROR
- */
- static int l_mqtt_state(lua_State *L) {
- luat_mqtt_ctrl_t * mqtt_ctrl = get_mqtt_ctrl(L);
- lua_pushinteger(L, mqtt_ctrl->mqtt_state);
- return 1;
- }
- /*
- 设置遗嘱消息
- @api mqttc:will(topic, payload, qos, retain)
- @string 遗嘱消息的topic
- @string 遗嘱消息的payload
- @string 遗嘱消息的qos, 默认0, 可以不填
- @string 遗嘱消息的retain, 默认0, 可以不填
- @return bool 成功返回true,否则返回false
- @usage
- -- 要在connect之前调用
- mqttc:will("/xxx/xxx", "xxxxxx")
- */
- static int l_mqtt_will(lua_State *L) {
- luat_mqtt_ctrl_t * mqtt_ctrl = get_mqtt_ctrl(L);
- size_t payload_len = 0;
- const char* topic = luaL_checkstring(L, 2);
- const char* payload = luaL_checklstring(L, 3, &payload_len);
- int qos = luaL_optinteger(L, 4, 0);
- int retain = luaL_optinteger(L, 5, 0);
- lua_pushboolean(L, luat_mqtt_set_will(mqtt_ctrl, topic, payload, payload_len, qos, retain) == 0 ? 1 : 0);
- return 1;
- }
- static int _mqtt_struct_newindex(lua_State *L);
- void luat_mqtt_struct_init(lua_State *L) {
- luaL_newmetatable(L, LUAT_MQTT_CTRL_TYPE);
- lua_pushcfunction(L, _mqtt_struct_newindex);
- lua_setfield( L, -2, "__index" );
- lua_pop(L, 1);
- }
- #include "rotable2.h"
- static const rotable_Reg_t reg_mqtt[] =
- {
- {"create", ROREG_FUNC(l_mqtt_create)},
- {"auth", ROREG_FUNC(l_mqtt_auth)},
- {"keepalive", ROREG_FUNC(l_mqtt_keepalive)},
- {"on", ROREG_FUNC(l_mqtt_on)},
- {"connect", ROREG_FUNC(l_mqtt_connect)},
- {"autoreconn", ROREG_FUNC(l_mqtt_autoreconn)},
- {"publish", ROREG_FUNC(l_mqtt_publish)},
- {"subscribe", ROREG_FUNC(l_mqtt_subscribe)},
- {"unsubscribe", ROREG_FUNC(l_mqtt_unsubscribe)},
- {"disconnect", ROREG_FUNC(l_mqtt_disconnect)},
- {"close", ROREG_FUNC(l_mqtt_close)},
- {"ready", ROREG_FUNC(l_mqtt_ready)},
- {"will", ROREG_FUNC(l_mqtt_will)},
- {"debug", ROREG_FUNC(l_mqtt_set_debug)},
- {"state", ROREG_FUNC(l_mqtt_state)},
- //@const STATE_DISCONNECT number mqtt 断开
- {"STATE_DISCONNECT",ROREG_INT(MQTT_STATE_DISCONNECT)},
- //@const STATE_SCONNECT number mqtt socket连接中
- {"STATE_SCONNECT", ROREG_INT(MQTT_STATE_SCONNECT)},
- //@const STATE_MQTT number mqtt socket已连接 mqtt连接中
- {"STATE_MQTT", ROREG_INT(MQTT_STATE_MQTT)},
- //@const STATE_READY number mqtt mqtt已连接
- {"STATE_READY", ROREG_INT(MQTT_STATE_READY)},
- { NULL, ROREG_INT(0)}
- };
- static int _mqtt_struct_newindex(lua_State *L) {
- const rotable_Reg_t* reg = reg_mqtt;
- const char* key = luaL_checkstring(L, 2);
- while (1) {
- if (reg->name == NULL)
- return 0;
- if (!strcmp(reg->name, key)) {
- lua_pushcfunction(L, reg->value.value.func);
- return 1;
- }
- reg ++;
- }
- //return 0;
- }
- const rotable_Reg_t reg_mqtt_emtry[] =
- {
- { NULL, ROREG_INT(0)}
- };
- LUAMOD_API int luaopen_mqtt( lua_State *L ) {
- #ifdef LUAT_USE_NETWORK
- luat_newlib2(L, reg_mqtt);
- luat_mqtt_struct_init(L);
- return 1;
- #else
- luat_newlib2(L, reg_mqtt_emtry);
- return 1;
- LLOGE("mqtt require network enable!!");
- #endif
- }
|