/* @module mqtt @summary mqtt客户端 @version 1.0 @date 2022.08.25 @demo mqtt @tag LUAT_USE_NETWORK @usage -- 具体用法请查看demo -- 本库只支持 mqtt 3.1.1, 其他版本例如3.1 或 5 均不支持!!! -- 只支持纯MQTT/MQTTS通信, 不支持 mqtt over websocket -- 几个大前提: -- 本库是基于TCP链接的, 支持加密TCP和非加密TCP -- 任何通信失败都将断开连接, 如果开启了自动重连, 那么间隔N秒后开始自动重连 -- 上行数据均为一次性的, 没有缓存机制, 更没有上行的重试/重发机制 -- 如何获知发送成功: 触发 mqttc:on 中 event == "sent" 的事件 -- 关于publish时QOS值的说明, 特制模块上行到云端/服务器端的行为: -- QOS0, 压入底层TCP发送堆栈,视为成功 -- QOS1, 收到服务器回应PUBACK,视为成功 -- QOS2, 收到服务器响应PUBREC,立即上行PUBCOMP压入TCP发送队列,视为成功 -- 重要的事情说3次: 没有重发机制, 没有重发机制, 没有重发机制 -- 1. MQTT协议中规定了重发机制, 但那是云端/服务器端才会实现的机制, 模块端是没有的 -- 2. 上行失败, 唯一的可能性就是TCP链接出问题了, 而TCP链接出问题的解决办法就是重连 -- 3. 模块端不会保存任何上行数据, 重连后也无法实现重发 -- 那业务需要确定上行是否成功, 如何解决: -- 首先推荐使用 QOS1, 然后监听/判断sent事件,并选取一个超时时间, 就能满足99.9%的需求 -- 使用QOS2,反而存在PUBCOMP上行失败导致服务器端不广播数据的理论可能 -- demo里有演示等待sent事件的代码, 类似于 sys.waitUntil("mqtt_sent", 3000) 搜mqtt_sent关键字 */ #include "luat_base.h" #include "luat_network_adapter.h" #include "libemqtt.h" #include "luat_rtos.h" #include "luat_zbuff.h" #include "luat_mem.h" #include "luat_mqtt.h" #define LUAT_LOG_TAG "mqtt" #include "luat_log.h" #define LUAT_MQTT_CTRL_TYPE "MQTTCTRL*" #ifdef LUAT_USE_NETDRV #include "luat_netdrv.h" #include "luat_netdrv_event.h" #endif static const char *error_string[MQTT_MSG_NET_ERROR - MQTT_MSG_CON_ERROR + 1] = { "connect", "tx", "conack", "other" }; static luat_mqtt_ctrl_t * get_mqtt_ctrl(lua_State *L){ if (luaL_testudata(L, 1, LUAT_MQTT_CTRL_TYPE)){ return ((luat_mqtt_ctrl_t *)luaL_checkudata(L, 1, LUAT_MQTT_CTRL_TYPE)); }else{ return ((luat_mqtt_ctrl_t *)lua_touserdata(L, 1)); } } int32_t luatos_mqtt_callback(lua_State *L, void* ptr){ (void)ptr; rtos_msg_t* msg = (rtos_msg_t*)lua_topointer(L, -1); luat_mqtt_ctrl_t *mqtt_ctrl =(luat_mqtt_ctrl_t *)msg->ptr; switch (msg->arg1) { // case MQTT_MSG_TCP_TX_DONE: // if (mqtt_ctrl->mqtt_cb) { // lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_cb); // if (lua_isfunction(L, -1)) { // lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_ref); // lua_pushstring(L, "tcp_ack"); // lua_call(L, 2, 0); // } // } // break; case MQTT_MSG_TIMER_PING : { luat_mqtt_ping(mqtt_ctrl); break; } case MQTT_MSG_CONN_TIMEOUT: { LLOGW("connect timeout %s %d!! expect conack in %ds", mqtt_ctrl->host, mqtt_ctrl->remote_port, mqtt_ctrl->conn_timeout); #ifdef LUAT_USE_NETDRV luat_netdrv_fire_socket_event_netctrl(EV_NW_TIMEOUT, mqtt_ctrl->netc, 4); #endif if (mqtt_ctrl->mqtt_ref) { lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_cb); if (lua_isfunction(L, -1)) { lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_ref); lua_pushstring(L, "error"); lua_pushstring(L, "timeout"); lua_pushinteger(L, msg->arg2); lua_call(L, 4, 0); } } luat_mqtt_close_socket(mqtt_ctrl); break; } case MQTT_MSG_PINGRESP : { if (mqtt_ctrl->mqtt_cb) { lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_cb); if (lua_isfunction(L, -1)) { lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_ref); lua_pushstring(L, "pong"); lua_call(L, 2, 0); } lua_getglobal(L, "sys_pub"); if (lua_isfunction(L, -1)) { lua_pushstring(L, "MQTT_PONG"); lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_ref); lua_call(L, 2, 0); } } break; } case MQTT_MSG_RECONNECT : { luat_mqtt_reconnect(mqtt_ctrl); break; } case MQTT_MSG_PUBLISH : { luat_mqtt_msg_t *mqtt_msg =(luat_mqtt_msg_t *)msg->arg2; if (mqtt_ctrl->mqtt_cb) { // luat_mqtt_msg_t *mqtt_msg =(luat_mqtt_msg_t *)msg->arg2; lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_cb); if (lua_isfunction(L, -1)) { lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_ref); lua_pushstring(L, "recv"); lua_pushlstring(L, (const char*)(mqtt_msg->data),mqtt_msg->topic_len); lua_pushlstring(L, (const char*)(mqtt_msg->data+mqtt_msg->topic_len),mqtt_msg->payload_len); // 增加一个返回值meta,类型为table,包含qos、retain和dup // mqttc:on(function(mqtt_client, event, data, payload, meta) // if event == "recv" then // log.info("mqtt recv", "topic", data) // log.info("mqtt recv", 'payload', payload) // log.info("mqtt recv", 'meta.message_id', meta.message_id) // log.info("mqtt recv", 'meta.qos', meta.qos) // log.info("mqtt recv", 'meta.retain', meta.retain) // log.info("mqtt recv", 'meta.dup', meta.dup) lua_createtable(L, 0, 4); lua_pushliteral(L, "message_id"); lua_pushinteger(L, mqtt_msg->message_id); lua_settable(L, -3); lua_pushliteral(L, "qos"); lua_pushinteger(L, (mqtt_msg->flags & 0x06) >> 1); // lua_pushinteger(L, MQTTParseMessageQos(mqtt_ctrl->mqtt_packet_buffer)); lua_settable(L, -3); lua_pushliteral(L, "retain"); lua_pushinteger(L, mqtt_msg->flags & 0x01); // lua_pushinteger(L, MQTTParseMessageRetain(mqtt_ctrl->mqtt_packet_buffer)); lua_settable(L, -3); lua_pushliteral(L, "dup"); lua_pushinteger(L, (mqtt_msg->flags & 0x08)?1:0); // lua_pushinteger(L, MQTTParseMessageDuplicate(mqtt_ctrl->mqtt_packet_buffer)); lua_settable(L, -3); lua_call(L, 5, 0); } } luat_heap_free(mqtt_msg); break; } case MQTT_MSG_CONNACK: { if (mqtt_ctrl->mqtt_cb) { lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_cb); if (lua_isfunction(L, -1)) { lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_ref); lua_pushstring(L, "conack"); lua_call(L, 2, 0); } // lua_getglobal(L, "sys_pub"); // if (lua_isfunction(L, -1)) { // lua_pushstring(L, "MQTT_CONNACK"); // lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_ref); // lua_call(L, 2, 0); // } } break; } case MQTT_MSG_PUBACK: case MQTT_MSG_PUBCOMP: { if (mqtt_ctrl->mqtt_cb) { lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_cb); if (lua_isfunction(L, -1)) { lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_ref); lua_pushstring(L, "sent"); lua_pushinteger(L, msg->arg2); lua_call(L, 3, 0); } } break; } case MQTT_MSG_RELEASE: { if (mqtt_ctrl->mqtt_ref) { luaL_unref(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_ref); mqtt_ctrl->mqtt_ref = 0; } break; } case MQTT_MSG_CLOSE: { if (mqtt_ctrl->mqtt_ref) { lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_cb); if (lua_isfunction(L, -1)) { lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_ref); lua_pushstring(L, "close"); lua_call(L, 2, 0); } } break; } case MQTT_MSG_DISCONNECT: { if (mqtt_ctrl->mqtt_cb) { lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_cb); if (lua_isfunction(L, -1)) { lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_ref); lua_pushstring(L, "disconnect"); lua_pushinteger(L, mqtt_ctrl->error_state); lua_call(L, 3, 0); } // lua_getglobal(L, "sys_pub"); // if (lua_isfunction(L, -1)) { // lua_pushstring(L, "MQTT_DISCONNECT"); // lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_ref); // lua_pushinteger(L, mqtt_ctrl->error_state); // lua_call(L, 3, 0); // } } break; } case MQTT_MSG_SUBACK: if (mqtt_ctrl->mqtt_cb) { lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_cb); if (lua_isfunction(L, -1)) { lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_ref); lua_pushstring(L, "suback"); lua_pushboolean(L, (msg->arg2 <= 2)); lua_pushinteger(L, msg->arg2); lua_call(L, 4, 0); } } break; case MQTT_MSG_UNSUBACK: if (mqtt_ctrl->mqtt_cb) { lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_cb); if (lua_isfunction(L, -1)) { lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_ref); lua_pushstring(L, "unsuback"); lua_call(L, 2, 0); } } break; case MQTT_MSG_CON_ERROR: case MQTT_MSG_TX_ERROR: case MQTT_MSG_CONACK_ERROR: case MQTT_MSG_NET_ERROR: if (mqtt_ctrl->mqtt_ref) { lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_cb); if (lua_isfunction(L, -1)) { lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_ref); lua_pushstring(L, "error"); lua_pushstring(L, error_string[msg->arg1 - MQTT_MSG_CON_ERROR]); lua_pushinteger(L, msg->arg2); lua_call(L, 4, 0); } } #ifdef LUAT_USE_NETDRV luat_netdrv_fire_socket_event_netctrl(EV_NW_SOCKET_ERROR, mqtt_ctrl->netc, 4); #endif break; default : { LLOGD("l_mqtt_callback error arg1:%d",msg->arg1); break; } } // lua_pushinteger(L, 0); return 0; } /* 订阅主题 @api mqttc:subscribe(topic, qos) @string/table 主题 @int topic为string时生效 0/1/2 默认0 @return int 消息id,当qos为1/2时有效, 若底层返回失败,会返回nil @usage -- 订阅单个topic, 且qos=0 mqttc:subscribe("/luatos/123456", 0) -- 订阅单个topic, 且qos=1 mqttc:subscribe("/luatos/12345678", 1) -- 订阅多个topic, 且使用不同的qos mqttc:subscribe({["/luatos/1234567"]=1,["/luatos/12345678"]=2}) */ static int l_mqtt_subscribe(lua_State *L) { size_t len = 0; int ret = 1; uint16_t msgid = 0; luat_mqtt_ctrl_t * mqtt_ctrl = (luat_mqtt_ctrl_t *)lua_touserdata(L, 1); if (lua_isstring(L, 2)){ const char * topic = luaL_checklstring(L, 2, &len); uint8_t qos = luaL_optinteger(L, 3, 0); ret = mqtt_subscribe(&(mqtt_ctrl->broker), topic, &msgid, qos); }else if(lua_istable(L, 2)){ lua_pushnil(L); while (lua_next(L, 2) != 0) { ret &= mqtt_subscribe(&(mqtt_ctrl->broker), lua_tostring(L, -2), &msgid, luaL_optinteger(L, -1, 0)) == 1 ? 1 : 0; lua_pop(L, 1); } } if (ret == 1) { lua_pushinteger(L, msgid); return 1; } else { return 0; } } /* 取消订阅主题 @api mqttc:unsubscribe(topic) @string/table 主题 @usage mqttc:unsubscribe("/luatos/123456") mqttc:unsubscribe({"/luatos/1234567","/luatos/12345678"}) */ static int l_mqtt_unsubscribe(lua_State *L) { size_t len = 0; int ret = 0; uint16_t msgid = 0; luat_mqtt_ctrl_t * mqtt_ctrl = (luat_mqtt_ctrl_t *)lua_touserdata(L, 1); if (lua_isstring(L, 2)){ const char * topic = luaL_checklstring(L, 2, &len); ret = mqtt_unsubscribe(&(mqtt_ctrl->broker), topic, &msgid); }else if(lua_istable(L, 2)){ size_t count = lua_rawlen(L, 2); for (size_t i = 1; i <= count; i++){ lua_geti(L, 2, i); const char * topic = luaL_checklstring(L, -1, &len); ret &= mqtt_unsubscribe(&(mqtt_ctrl->broker), topic, &msgid) == 1 ? 1 : 0; lua_pop(L, 1); } } if (ret == 1) { lua_pushinteger(L, msgid); return 1; } return 0; } /* 配置是否打开debug信息 @api mqttc:debug(onoff) @boolean 是否打开debug开关 @return nil 无返回值 @usage mqttc:debug(true) */ static int l_mqtt_set_debug(lua_State *L){ luat_mqtt_ctrl_t * mqtt_ctrl = get_mqtt_ctrl(L); if (lua_isboolean(L, 2)){ mqtt_ctrl->netc->is_debug = lua_toboolean(L, 2); } return 0; } /* mqtt客户端创建 @api mqtt.create(adapter,host,port,ssl,opts) @int 适配器序号, 如果不填,会选择平台自带的方式,然后是最后一个注册的适配器,可选值请查阅socket库的常量表 @string 服务器地址,可以是域名, 也可以是ip @int 端口号 @bool/table 是否为ssl加密连接,默认不加密,true为无证书最简单的加密,table为有证书的加密
server_cert 服务器ca证书数据
client_cert 客户端证书数据
client_key 客户端私钥加密数据
client_password 客户端私钥口令数据
verify 是否强制校验 0不校验/1可选校验/2强制校验 默认2 @table mqtt扩展参数 @return userdata 若成功会返回mqtt客户端实例,否则返回nil @usage -- 普通TCP链接 mqttc = mqtt.create(nil,"120.55.137.106", 1884) -- 普通TCP链接,mqtt接收缓冲区4096 mqttc = mqtt.create(nil,"120.55.137.106", 1884, nil, {rxSize = 4096}) -- 加密TCP链接,不验证服务器证书 mqttc = mqtt.create(nil,"120.55.137.106", 8883, true) -- 加密TCPTCP链接,单服务器证书验证 mqttc = mqtt.create(nil,"120.55.137.106", 8883, {server_cert=io.readFile("/luadb/ca.crt")}) -- 加密TCPTCP链接,单服务器证书验证, 但可选认证 mqttc = mqtt.create(nil,"120.55.137.106", 8883, {server_cert=io.readFile("/luadb/ca.crt"), verify=1}) -- 加密TCPTCP链接,双向证书验证 mqttc = mqtt.create(nil,"120.55.137.106", 8883, { server_cert=io.readFile("/luadb/ca.crt"), client_cert=io.readFile("/luadb/client.pem"), client_key=io.readFile("/luadb/client.key"), client_password="123456", }) -- opts参数说明 -- ipv6 = true, -- 是否为ipv6连接,默认false -- rxSize = 4096, -- mqtt接收缓冲区大小,单位字节,默认32k -- conn_timeout = 15, -- 连接超时时间,按收到conack为止,单位秒, 默认30秒 */ static int l_mqtt_create(lua_State *L) { int ret = 0; int adapter_index = luaL_optinteger(L, 1, network_register_get_default()); if (adapter_index < 0 || adapter_index >= NW_ADAPTER_QTY){ LLOGE("尚无已注册的网络适配器"); return 0; } luat_mqtt_ctrl_t *mqtt_ctrl = (luat_mqtt_ctrl_t *)lua_newuserdata(L, sizeof(luat_mqtt_ctrl_t)); if (!mqtt_ctrl){ LLOGE("out of memory when malloc mqtt_ctrl"); return 0; } mqtt_ctrl->app_cb = NULL; ret = luat_mqtt_init(mqtt_ctrl, adapter_index); if (ret) { LLOGE("mqtt init FAID ret %d", ret); return 0; } luat_mqtt_connopts_t opts = {0}; // 连接参数相关 // const char *ip; size_t ip_len = 0; network_set_ip_invaild(&mqtt_ctrl->ip_addr); if (lua_isinteger(L, 2)){ network_set_ip_ipv4(&mqtt_ctrl->ip_addr, lua_tointeger(L, 2)); // ip = NULL; ip_len = 0; }else{ ip_len = 0; opts.host = luaL_checklstring(L, 2, &ip_len); // TODO 判断 host的长度,超过191就不行了 } opts.port = luaL_checkinteger(L, 3); // 加密相关 if (lua_isboolean(L, 4)){ opts.is_tls = lua_toboolean(L, 4); } if (lua_istable(L, 4)){ opts.is_tls = 1; opts.verify = 2; lua_pushstring(L, "verify"); if (LUA_TNUMBER == lua_gettable(L, 4)) { opts.verify = luaL_checknumber(L, -1); } lua_pop(L, 1); lua_pushstring(L, "server_cert"); if (LUA_TSTRING == lua_gettable(L, 4)) { opts.server_cert = luaL_checklstring(L, -1, &opts.server_cert_len); } lua_pop(L, 1); lua_pushstring(L, "client_cert"); if (LUA_TSTRING == lua_gettable(L, 4)) { opts.client_cert = luaL_checklstring(L, -1, &opts.client_cert_len); } lua_pop(L, 1); lua_pushstring(L, "client_key"); if (LUA_TSTRING == lua_gettable(L, 4)) { opts.client_key = luaL_checklstring(L, -1, &opts.client_key_len); } lua_pop(L, 1); lua_pushstring(L, "client_password"); if (LUA_TSTRING == lua_gettable(L, 4)) { opts.client_password = luaL_checklstring(L, -1, &opts.client_password_len); } lua_pop(L, 1); } if (lua_isboolean(L, 5)){ opts.is_ipv6 = lua_toboolean(L, 5); }else if(lua_istable(L, 5)){ lua_pushstring(L, "ipv6"); if (LUA_TBOOLEAN == lua_gettable(L, 5)) { opts.is_ipv6 = lua_toboolean(L, -1); } lua_pop(L, 1); lua_pushstring(L, "rxSize"); if (LUA_TNUMBER == lua_gettable(L, 5)) { uint32_t len = luaL_checknumber(L, -1); if(len > 0) luat_mqtt_set_rxbuff_size(mqtt_ctrl, len); } lua_pop(L, 1); lua_pushstring(L, "conn_timeout"); if (LUA_TNUMBER == lua_gettable(L, 5)) { opts.conn_timeout = luaL_checknumber(L, -1); if (opts.conn_timeout < 5) opts.conn_timeout = 5; else if (opts.conn_timeout > 120) opts.conn_timeout = 120; } lua_pop(L, 1); } ret = luat_mqtt_set_connopts(mqtt_ctrl, &opts); if (ret){ LLOGE("设置mqtt参数失败"); luat_mqtt_release_socket(mqtt_ctrl); return 0; } luaL_setmetatable(L, LUAT_MQTT_CTRL_TYPE); lua_pushvalue(L, -1); mqtt_ctrl->mqtt_ref = luaL_ref(L, LUA_REGISTRYINDEX); return 1; } /* mqtt三元组配置及cleanSession @api mqttc:auth(client_id,username,password,cleanSession) @string 设备识别id,对于同一个mqtt服务器来说, 通常要求唯一,相同client_id会互相踢下线 @string 账号 可选 @string 密码 可选 @bool 清除session,默认true,可选 @return bool 成功返回true,否则返回nil. 注意, 返回值是2025.3.19新增的 @usage -- 无账号密码登录,仅clientId mqttc:auth("123456789") -- 带账号密码登录 mqttc:auth("123456789","username","password") -- 额外配置cleanSession,不清除 mqttc:auth("123456789","username","password", false) -- 无clientId模式, 服务器随机生成id, cleanSession不可配置 mqttc:auth() */ static int l_mqtt_auth(lua_State *L) { luat_mqtt_ctrl_t * mqtt_ctrl = get_mqtt_ctrl(L); const char *client_id = luaL_optstring(L, 2, ""); const char *username = luaL_optstring(L, 3, ""); const char *password = luaL_optstring(L, 4, ""); int cleanSession = 1; if (lua_isboolean(L, 5) && !lua_toboolean(L, 5)) { cleanSession = 0; } if (client_id != NULL && strlen(client_id) > MQTT_CONF_CLIENT_ID_LENGTH) { LLOGE("mqtt client_id 太长或者无效!!!!"); return 0; } if (username != NULL && strlen(username) > MQTT_CONF_USERNAME_LENGTH) { LLOGE("mqtt username 太长或者无效!!!!"); return 0; } if (password != NULL && strlen(password) > MQTT_CONF_PASSWORD_LENGTH) { LLOGE("mqtt password 太长或者无效!!!!"); return 0; } mqtt_init(&(mqtt_ctrl->broker), client_id); mqtt_init_auth(&(mqtt_ctrl->broker), username, password); mqtt_ctrl->broker.clean_session = cleanSession; lua_pushboolean(L, 1); return 1; } /* mqtt心跳设置 @api mqttc:keepalive(time) @int 可选 单位s 默认240s. 最先15,最高600 @return nil 无返回值 @usage mqttc:keepalive(30) */ static int l_mqtt_keepalive(lua_State *L) { luat_mqtt_ctrl_t * mqtt_ctrl = get_mqtt_ctrl(L); int timeout = luaL_optinteger(L, 2, 240); if (timeout < 15) timeout = 15; if (timeout > 600) timeout = 600; mqtt_ctrl->keepalive = timeout; return 0; } /* 注册mqtt回调 @api mqttc:on(cb) @function cb mqtt回调,参数包括mqtt_client, event, data, payload @return nil 无返回值 @usage mqttc:on(function(mqtt_client, event, data, payload, metas) -- 用户自定义代码 log.info("mqtt", "event", event, mqtt_client, data, payload) end) --[[ event可能出现的值有 conack -- 服务器鉴权完成, 表示mqtt连接已经建立, 可以订阅和发布数据了 suback -- 订阅完成,data为应答结果, true成功,payload为0~2数字表示qos,data为false则失败,payload为失败码,一般是0x80 unsuback -- 取消订阅完成 recv -- 接收到数据,由服务器下发, data为topic值(string), payload为业务数据(string), metas是元数据(table), 一般不处理. -- metas包含以下内容 -- message_id -- qos 取值范围0,1,2 -- retain 取值范围 0,1 -- dup 取值范围 0,1 sent -- 发送完成, qos0会马上通知, qos1/qos2会在服务器应答会回调, data为消息id disconnect -- 服务器断开连接,网络问题或服务器踢了客户端,例如clientId重复,超时未上报业务数据 pong -- 收到服务器心跳应答,没有附加数据 error -- 严重的异常,会导致断开连接, data(string)为具体异常,有以下几种 -- connect 服务器连接不上 -- tx 发送数据失败 -- conack 服务器鉴权失败,失败码在payload(int) -- other 其他异常 ]] */ static int l_mqtt_on(lua_State *L) { luat_mqtt_ctrl_t * mqtt_ctrl = get_mqtt_ctrl(L); if (mqtt_ctrl->mqtt_cb != 0) { luaL_unref(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_cb); mqtt_ctrl->mqtt_cb = 0; } if (lua_isfunction(L, 2)) { lua_pushvalue(L, 2); mqtt_ctrl->mqtt_cb = luaL_ref(L, LUA_REGISTRYINDEX); } return 0; } /* 连接服务器 @api mqttc:connect() @return boolean 发起成功返回true, 否则返回false @usage -- 开始建立连接 mqttc:connect() -- 本函数仅代表发起成功, 后续仍需根据ready函数判断mqtt是否连接正常 */ static int l_mqtt_connect(lua_State *L) { luat_mqtt_ctrl_t * mqtt_ctrl = get_mqtt_ctrl(L); int ret = luat_mqtt_connect(mqtt_ctrl); if (ret) { LLOGE("socket connect ret=%d\n", ret); luat_mqtt_close_socket(mqtt_ctrl); lua_pushboolean(L, 0); return 1; } lua_pushboolean(L, 1); return 1; } /* 断开服务器连接(不会释放资源) @api mqttc:disconnect() @return boolean 发起成功返回true, 否则返回false @usage -- 断开连接 mqttc:disconnect() */ static int l_mqtt_disconnect(lua_State *L) { luat_mqtt_ctrl_t * mqtt_ctrl = get_mqtt_ctrl(L); mqtt_disconnect(&(mqtt_ctrl->broker)); luat_mqtt_close_socket(mqtt_ctrl); lua_pushboolean(L, 1); return 1; } /* 自动重连 @api mqttc:autoreconn(reconnect, reconnect_time) @bool 是否自动重连 @int 自动重连周期 单位ms 默认3000ms @usage mqttc:autoreconn(true) */ static int l_mqtt_autoreconn(lua_State *L) { luat_mqtt_ctrl_t * mqtt_ctrl = get_mqtt_ctrl(L); if (lua_isboolean(L, 2)){ mqtt_ctrl->reconnect = lua_toboolean(L, 2); } mqtt_ctrl->reconnect_time = luaL_optinteger(L, 3, 3000); if (mqtt_ctrl->reconnect && mqtt_ctrl->reconnect_time < 1000) mqtt_ctrl->reconnect_time = 1000; return 0; } /* 发布消息 @api mqttc:publish(topic, data, qos, retain) @string 主题,必填 @string 消息,必填,但长度可以是0 @int 消息级别 0/1 默认0 @int 是否存档, 0/1,默认0 @return int 消息id, 当qos为1或2时会有效值. 若底层返回有错误发生, 则会返回nil @usage mqttc:publish("/luatos/123456", "123") */ static int l_mqtt_publish(lua_State *L) { uint16_t message_id = 0; size_t payload_len = 0; luat_mqtt_ctrl_t * mqtt_ctrl = get_mqtt_ctrl(L); const char * topic = luaL_checkstring(L, 2); const char * payload = NULL; luat_zbuff_t *buff = NULL; if (lua_isstring(L, 3)){ payload = luaL_checklstring(L, 3, &payload_len); }else if (luaL_testudata(L, 3, LUAT_ZBUFF_TYPE)){ buff = ((luat_zbuff_t *)luaL_checkudata(L, 3, LUAT_ZBUFF_TYPE)); payload = (const char*)buff->addr; payload_len = buff->used; }else{ LLOGD("only support string or zbuff"); } // LLOGD("payload_len:%d",payload_len); uint8_t qos = luaL_optinteger(L, 4, 0); uint8_t retain = luaL_optinteger(L, 5, 0); int ret = mqtt_publish_with_qos(&(mqtt_ctrl->broker), topic, payload, payload_len, retain, qos, &message_id); if (ret != 1){ return 0; } if (qos == 0){ rtos_msg_t msg = {0}; msg.handler = luatos_mqtt_callback; msg.ptr = mqtt_ctrl; msg.arg1 = MQTT_MSG_PUBACK; msg.arg2 = message_id; luat_msgbus_put(&msg, 0); } lua_pushinteger(L, message_id); return 1; } /* mqtt客户端关闭(关闭后资源释放无法再使用) @api mqttc:close() @usage mqttc:close() */ static int l_mqtt_close(lua_State *L) { luat_mqtt_ctrl_t * mqtt_ctrl = get_mqtt_ctrl(L); mqtt_disconnect(&(mqtt_ctrl->broker)); luat_mqtt_close_socket(mqtt_ctrl); if (mqtt_ctrl->mqtt_cb != 0) { luaL_unref(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_cb); mqtt_ctrl->mqtt_cb = 0; } luat_mqtt_release_socket(mqtt_ctrl); return 0; } /* mqtt客户端是否就绪 @api mqttc:ready() @return bool 客户端是否就绪 @usage local error = mqttc:ready() */ static int l_mqtt_ready(lua_State *L) { luat_mqtt_ctrl_t * mqtt_ctrl = get_mqtt_ctrl(L); lua_pushboolean(L, mqtt_ctrl->mqtt_state == MQTT_STATE_READY ? 1 : 0); return 1; } /* mqtt客户端状态 @api mqttc:state() @return number 客户端状态 @usage local state = mqttc:state() -- 已知状态: -- 0: MQTT_STATE_DISCONNECT -- 1: MQTT_STATE_CONNECTING -- 2: MQTT_STATE_CONNECTED -- 3: MQTT_STATE_READY -- 4: MQTT_STATE_ERROR */ static int l_mqtt_state(lua_State *L) { luat_mqtt_ctrl_t * mqtt_ctrl = get_mqtt_ctrl(L); lua_pushinteger(L, mqtt_ctrl->mqtt_state); return 1; } /* 设置遗嘱消息 @api mqttc:will(topic, payload, qos, retain) @string 遗嘱消息的topic @string 遗嘱消息的payload @string 遗嘱消息的qos, 默认0, 可以不填 @string 遗嘱消息的retain, 默认0, 可以不填 @return bool 成功返回true,否则返回false @usage -- 要在connect之前调用 mqttc:will("/xxx/xxx", "xxxxxx") */ static int l_mqtt_will(lua_State *L) { luat_mqtt_ctrl_t * mqtt_ctrl = get_mqtt_ctrl(L); size_t payload_len = 0; const char* topic = luaL_checkstring(L, 2); const char* payload = luaL_checklstring(L, 3, &payload_len); int qos = luaL_optinteger(L, 4, 0); int retain = luaL_optinteger(L, 5, 0); lua_pushboolean(L, luat_mqtt_set_will(mqtt_ctrl, topic, payload, payload_len, qos, retain) == 0 ? 1 : 0); return 1; } static int _mqtt_struct_newindex(lua_State *L); void luat_mqtt_struct_init(lua_State *L) { luaL_newmetatable(L, LUAT_MQTT_CTRL_TYPE); lua_pushcfunction(L, _mqtt_struct_newindex); lua_setfield( L, -2, "__index" ); lua_pop(L, 1); } #include "rotable2.h" static const rotable_Reg_t reg_mqtt[] = { {"create", ROREG_FUNC(l_mqtt_create)}, {"auth", ROREG_FUNC(l_mqtt_auth)}, {"keepalive", ROREG_FUNC(l_mqtt_keepalive)}, {"on", ROREG_FUNC(l_mqtt_on)}, {"connect", ROREG_FUNC(l_mqtt_connect)}, {"autoreconn", ROREG_FUNC(l_mqtt_autoreconn)}, {"publish", ROREG_FUNC(l_mqtt_publish)}, {"subscribe", ROREG_FUNC(l_mqtt_subscribe)}, {"unsubscribe", ROREG_FUNC(l_mqtt_unsubscribe)}, {"disconnect", ROREG_FUNC(l_mqtt_disconnect)}, {"close", ROREG_FUNC(l_mqtt_close)}, {"ready", ROREG_FUNC(l_mqtt_ready)}, {"will", ROREG_FUNC(l_mqtt_will)}, {"debug", ROREG_FUNC(l_mqtt_set_debug)}, {"state", ROREG_FUNC(l_mqtt_state)}, //@const STATE_DISCONNECT number mqtt 断开 {"STATE_DISCONNECT",ROREG_INT(MQTT_STATE_DISCONNECT)}, //@const STATE_SCONNECT number mqtt socket连接中 {"STATE_SCONNECT", ROREG_INT(MQTT_STATE_SCONNECT)}, //@const STATE_MQTT number mqtt socket已连接 mqtt连接中 {"STATE_MQTT", ROREG_INT(MQTT_STATE_MQTT)}, //@const STATE_READY number mqtt mqtt已连接 {"STATE_READY", ROREG_INT(MQTT_STATE_READY)}, { NULL, ROREG_INT(0)} }; static int _mqtt_struct_newindex(lua_State *L) { const rotable_Reg_t* reg = reg_mqtt; const char* key = luaL_checkstring(L, 2); while (1) { if (reg->name == NULL) return 0; if (!strcmp(reg->name, key)) { lua_pushcfunction(L, reg->value.value.func); return 1; } reg ++; } //return 0; } const rotable_Reg_t reg_mqtt_emtry[] = { { NULL, ROREG_INT(0)} }; LUAMOD_API int luaopen_mqtt( lua_State *L ) { #ifdef LUAT_USE_NETWORK luat_newlib2(L, reg_mqtt); luat_mqtt_struct_init(L); return 1; #else luat_newlib2(L, reg_mqtt_emtry); return 1; LLOGE("mqtt require network enable!!"); #endif }