luat_lib_mqtt.c 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643
/*
@module mqtt
@summary MQTT client
@version 1.0
@date 2022.08.25
@demo socket
@tag LUAT_USE_MQTT
*/
  9. #include "luat_base.h"
  10. #include "luat_network_adapter.h"
  11. #include "libemqtt.h"
  12. #include "luat_rtos.h"
  13. #include "luat_zbuff.h"
  14. #include "luat_malloc.h"
  15. #include "luat_mqtt.h"
  16. #define LUAT_LOG_TAG "mqtt"
  17. #include "luat_log.h"
  18. #define LUAT_MQTT_CTRL_TYPE "MQTTCTRL*"
  19. static luat_mqtt_ctrl_t * get_mqtt_ctrl(lua_State *L){
  20. if (luaL_testudata(L, 1, LUAT_MQTT_CTRL_TYPE)){
  21. return ((luat_mqtt_ctrl_t *)luaL_checkudata(L, 1, LUAT_MQTT_CTRL_TYPE));
  22. }else{
  23. return ((luat_mqtt_ctrl_t *)lua_touserdata(L, 1));
  24. }
  25. }
  26. static int32_t l_mqtt_callback(lua_State *L, void* ptr){
  27. (void)ptr;
  28. rtos_msg_t* msg = (rtos_msg_t*)lua_topointer(L, -1);
  29. luat_mqtt_ctrl_t *mqtt_ctrl =(luat_mqtt_ctrl_t *)msg->ptr;
  30. switch (msg->arg1) {
  31. case MQTT_MSG_TIMER_PING : {
  32. luat_mqtt_ping(mqtt_ctrl);
  33. break;
  34. }
  35. case MQTT_MSG_RECONNECT : {
  36. luat_mqtt_reconnect(mqtt_ctrl);
  37. break;
  38. }
  39. case MQTT_MSG_PUBLISH : {
  40. luat_mqtt_msg_t *mqtt_msg =(luat_mqtt_msg_t *)msg->arg2;
  41. if (mqtt_ctrl->mqtt_cb) {
  42. luat_mqtt_msg_t *mqtt_msg =(luat_mqtt_msg_t *)msg->arg2;
  43. lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_cb);
  44. if (lua_isfunction(L, -1)) {
  45. lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_ref);
  46. lua_pushstring(L, "recv");
  47. lua_pushlstring(L, (const char*)(mqtt_msg->data),mqtt_msg->topic_len);
  48. lua_pushlstring(L, (const char*)(mqtt_msg->data+mqtt_msg->topic_len),mqtt_msg->payload_len);
  49. // 增加一个返回值meta,类型为table,包含qos、retain和dup
  50. // mqttc:on(function(mqtt_client, event, data, payload, meta)
  51. // if event == "recv" then
  52. // log.info("mqtt recv", "topic", data)
  53. // log.info("mqtt recv", 'payload', payload)
  54. // log.info("mqtt recv", 'meta.qos', meta.qos)
  55. // log.info("mqtt recv", 'meta.retain', meta.retain)
  56. // log.info("mqtt recv", 'meta.dup', meta.dup)
  57. lua_createtable(L, 0, 3);
  58. lua_pushliteral(L, "qos");
  59. lua_pushinteger(L, MQTTParseMessageQos(mqtt_ctrl->mqtt_packet_buffer));
  60. lua_settable(L, -3);
  61. lua_pushliteral(L, "retain");
  62. lua_pushinteger(L, MQTTParseMessageRetain(mqtt_ctrl->mqtt_packet_buffer));
  63. lua_settable(L, -3);
  64. lua_pushliteral(L, "dup");
  65. lua_pushinteger(L, MQTTParseMessageDuplicate(mqtt_ctrl->mqtt_packet_buffer));
  66. lua_settable(L, -3);
  67. // lua_call(L, 4, 0);
  68. lua_call(L, 5, 0);
  69. }
  70. }
  71. luat_heap_free(mqtt_msg);
  72. break;
  73. }
  74. case MQTT_MSG_CONNACK: {
  75. if (mqtt_ctrl->mqtt_cb) {
  76. lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_cb);
  77. if (lua_isfunction(L, -1)) {
  78. lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_ref);
  79. lua_pushstring(L, "conack");
  80. lua_call(L, 2, 0);
  81. }
  82. lua_getglobal(L, "sys_pub");
  83. if (lua_isfunction(L, -1)) {
  84. lua_pushstring(L, "MQTT_CONNACK");
  85. lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_ref);
  86. lua_call(L, 2, 0);
  87. }
  88. }
  89. break;
  90. }
  91. case MQTT_MSG_PUBACK:
  92. case MQTT_MSG_PUBCOMP: {
  93. if (mqtt_ctrl->mqtt_cb) {
  94. lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_cb);
  95. if (lua_isfunction(L, -1)) {
  96. lua_geti(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_ref);
  97. lua_pushstring(L, "sent");
  98. lua_pushinteger(L, msg->arg2);
  99. lua_call(L, 3, 0);
  100. }
  101. }
  102. break;
  103. }
  104. case MQTT_MSG_RELEASE: {
  105. if (mqtt_ctrl->mqtt_ref) {
  106. luaL_unref(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_ref);
  107. mqtt_ctrl->mqtt_ref = 0;
  108. }
  109. break;
  110. }
  111. default : {
  112. LLOGD("l_mqtt_callback error arg1:%d",msg->arg1);
  113. break;
  114. }
  115. }
  116. // lua_pushinteger(L, 0);
  117. return 0;
  118. }
  119. int l_luat_mqtt_msg_cb(luat_mqtt_ctrl_t * ptr, int arg1, int arg2) {
  120. rtos_msg_t msg = {
  121. .handler = l_mqtt_callback,
  122. .ptr = ptr,
  123. .arg1 = arg1,
  124. .arg2 = arg2
  125. };
  126. luat_msgbus_put(&msg, 0);
  127. return 0;
  128. }
  129. /*
  130. 订阅主题
  131. @api mqttc:subscribe(topic, qos)
  132. @string/table 主题
  133. @int topic为string时生效 0/1/2 默认0
  134. @return int 消息id,当qos为1时有效, 若底层返回失败,会返回nil
  135. @usage
  136. -- 订阅单个topic, 且qos=0
  137. mqttc:subscribe("/luatos/123456", 0)
  138. -- 订阅单个topic, 且qos=1
  139. mqttc:subscribe("/luatos/12345678", 1)
  140. -- 订阅多个topic, 且使用不同的qos
  141. mqttc:subscribe({["/luatos/1234567"]=1,["/luatos/12345678"]=2})
  142. */
  143. static int l_mqtt_subscribe(lua_State *L) {
  144. size_t len = 0;
  145. int ret = 0;
  146. uint16_t msgid = 0;
  147. luat_mqtt_ctrl_t * mqtt_ctrl = (luat_mqtt_ctrl_t *)lua_touserdata(L, 1);
  148. if (lua_isstring(L, 2)){
  149. const char * topic = luaL_checklstring(L, 2, &len);
  150. uint8_t qos = luaL_optinteger(L, 3, 0);
  151. ret = mqtt_subscribe(&(mqtt_ctrl->broker), topic, &msgid, qos);
  152. }else if(lua_istable(L, 2)){
  153. lua_pushnil(L);
  154. while (lua_next(L, 2) != 0) {
  155. ret &= mqtt_subscribe(&(mqtt_ctrl->broker), lua_tostring(L, -2), &msgid, luaL_optinteger(L, -1, 0)) == 1 ? 1 : 0;
  156. lua_pop(L, 1);
  157. }
  158. }
  159. if (ret == 1) {
  160. lua_pushinteger(L, msgid);
  161. return 1;
  162. }
  163. else {
  164. return 0;
  165. }
  166. }
  167. /*
  168. 取消订阅主题
  169. @api mqttc:unsubscribe(topic)
  170. @string/table 主题
  171. @usage
  172. mqttc:unsubscribe("/luatos/123456")
  173. mqttc:unsubscribe({"/luatos/1234567","/luatos/12345678"})
  174. */
  175. static int l_mqtt_unsubscribe(lua_State *L) {
  176. size_t len = 0;
  177. int ret = 0;
  178. uint16_t msgid = 0;
  179. luat_mqtt_ctrl_t * mqtt_ctrl = (luat_mqtt_ctrl_t *)lua_touserdata(L, 1);
  180. if (lua_isstring(L, 2)){
  181. const char * topic = luaL_checklstring(L, 2, &len);
  182. ret = mqtt_unsubscribe(&(mqtt_ctrl->broker), topic, &msgid);
  183. }else if(lua_istable(L, 2)){
  184. size_t count = lua_rawlen(L, 2);
  185. for (size_t i = 1; i <= count; i++){
  186. lua_geti(L, 2, i);
  187. const char * topic = luaL_checklstring(L, -1, &len);
  188. ret &= mqtt_unsubscribe(&(mqtt_ctrl->broker), topic, &msgid) == 1 ? 1 : 0;
  189. lua_pop(L, 1);
  190. }
  191. }
  192. if (ret == 1) {
  193. lua_pushinteger(L, msgid);
  194. return 1;
  195. }
  196. return 0;
  197. }
  198. /*
  199. 配置是否打开debug信息
  200. @api mqttc:debug(onoff)
  201. @boolean 是否打开debug开关
  202. @return nil 无返回值
  203. @usage mqttc:debug(true)
  204. */
  205. static int l_mqtt_set_debug(lua_State *L){
  206. luat_mqtt_ctrl_t * mqtt_ctrl = get_mqtt_ctrl(L);
  207. if (lua_isboolean(L, 2)){
  208. mqtt_ctrl->netc->is_debug = lua_toboolean(L, 2);
  209. }
  210. return 0;
  211. }
  212. /*
  213. mqtt客户端创建
  214. @api mqtt.create(adapter,host,port,isssl,ca_file)
  215. @int 适配器序号, 只能是socket.ETH0, socket.STA, socket.AP,如果不填,会选择平台自带的方式,然后是最后一个注册的适配器
  216. @string 服务器地址,可以是域名, 也可以是ip
  217. @int 端口号
  218. @bool/table 是否为ssl加密连接,默认不加密,true为无证书最简单的加密,table为有证书的加密 <br>server_cert 服务器ca证书数据 <br>client_cert 客户端ca证书数据 <br>client_key 客户端私钥加密数据 <br>client_password 客户端私钥口令数据
  219. @bool 是否为ipv6 默认不是
  220. @return userdata 若成功会返回mqtt客户端实例,否则返回nil
  221. @usage
  222. -- 普通TCP链接
  223. mqttc = mqtt.create(nil,"120.55.137.106", 1884)
  224. -- 加密TCP链接,不验证服务器证书
  225. mqttc = mqtt.create(nil,"120.55.137.106", 8883, true)
  226. -- 加密TCPTCP链接,单服务器证书验证
  227. mqttc = mqtt.create(nil,"120.55.137.106", 8883, {server_cert=io.readFile("/luadb/ca.crt"))
  228. -- 加密TCPTCP链接,双向证书验证
  229. mqttc = mqtt.create(nil,"120.55.137.106", 8883, {
  230. server_cert=io.readFile("/luadb/ca.crt"),
  231. client_cert=io.readFile("/luadb/client.pem"),
  232. client_key="123456",
  233. client_password="123456",
  234. )
  235. */
  236. static int l_mqtt_create(lua_State *L) {
  237. int ret = 0;
  238. int adapter_index = luaL_optinteger(L, 1, network_get_last_register_adapter());
  239. if (adapter_index < 0 || adapter_index >= NW_ADAPTER_QTY){
  240. return 0;
  241. }
  242. luat_mqtt_ctrl_t *mqtt_ctrl = (luat_mqtt_ctrl_t *)lua_newuserdata(L, sizeof(luat_mqtt_ctrl_t));
  243. if (!mqtt_ctrl){
  244. LLOGE("out of memory when malloc mqtt_ctrl");
  245. return 0;
  246. }
  247. ret = luat_mqtt_init(mqtt_ctrl, adapter_index);
  248. if (ret) {
  249. LLOGE("mqtt init FAID ret %d", ret);
  250. return 0;
  251. }
  252. luat_mqtt_connopts_t opts = {0};
  253. // 连接参数相关
  254. // const char *ip;
  255. size_t ip_len = 0;
  256. #ifdef LUAT_USE_LWIP
  257. mqtt_ctrl->ip_addr.type = 0xff;
  258. #else
  259. mqtt_ctrl->ip_addr.is_ipv6 = 0xff;
  260. #endif
  261. if (lua_isinteger(L, 2)){
  262. #ifdef LUAT_USE_LWIP
  263. mqtt_ctrl->ip_addr.type = IPADDR_TYPE_V4;
  264. mqtt_ctrl->ip_addr.u_addr.ip4.addr = lua_tointeger(L, 2);
  265. #else
  266. mqtt_ctrl->ip_addr.is_ipv6 = 0;
  267. mqtt_ctrl->ip_addr.ipv4 = lua_tointeger(L, 2);
  268. #endif
  269. // ip = NULL;
  270. ip_len = 0;
  271. }else{
  272. ip_len = 0;
  273. opts.host = luaL_checklstring(L, 2, &ip_len);
  274. // TODO 判断 host的长度,超过191就不行了
  275. }
  276. opts.port = luaL_checkinteger(L, 3);
  277. // 加密相关
  278. if (lua_isboolean(L, 4)){
  279. opts.is_tls = lua_toboolean(L, 4);
  280. }
  281. if (lua_istable(L, 4)){
  282. opts.is_tls = 1;
  283. lua_pushstring(L, "server_cert");
  284. if (LUA_TSTRING == lua_gettable(L, 4)) {
  285. opts.server_cert = luaL_checklstring(L, -1, &opts.server_cert_len);
  286. }
  287. lua_pop(L, 1);
  288. lua_pushstring(L, "client_cert");
  289. if (LUA_TSTRING == lua_gettable(L, 4)) {
  290. opts.client_cert = luaL_checklstring(L, -1, &opts.client_cert_len);
  291. }
  292. lua_pop(L, 1);
  293. lua_pushstring(L, "client_key");
  294. if (LUA_TSTRING == lua_gettable(L, 4)) {
  295. opts.client_key = luaL_checklstring(L, -1, &opts.client_key_len);
  296. }
  297. lua_pop(L, 1);
  298. lua_pushstring(L, "client_password");
  299. if (LUA_TSTRING == lua_gettable(L, 4)) {
  300. opts.client_password = luaL_checklstring(L, -1, &opts.client_password_len);
  301. }
  302. lua_pop(L, 1);
  303. }
  304. if (lua_isboolean(L, 5)){
  305. opts.is_ipv6 = lua_toboolean(L, 5);
  306. }
  307. ret = luat_mqtt_set_connopts(mqtt_ctrl, &opts);
  308. // TODO 判断ret, 如果初始化失败, 应该终止
  309. luaL_setmetatable(L, LUAT_MQTT_CTRL_TYPE);
  310. lua_pushvalue(L, -1);
  311. mqtt_ctrl->mqtt_ref = luaL_ref(L, LUA_REGISTRYINDEX);
  312. return 1;
  313. }
  314. /*
  315. mqtt三元组配置及cleanSession
  316. @api mqttc:auth(client_id,username,password,cleanSession)
  317. @string 设备识别id,对于同一个mqtt服务器来说, 通常要求唯一,相同client_id会互相踢下线
  318. @string 账号 可选
  319. @string 密码 可选
  320. @bool 清除session,默认true,可选
  321. @return nil 无返回值
  322. @usage
  323. -- 无账号密码登录,仅clientId
  324. mqttc:auth("123456789")
  325. -- 带账号密码登录
  326. mqttc:auth("123456789","username","password")
  327. -- 额外配置cleanSession,不清除
  328. mqttc:auth("123456789","username","password", false)
  329. -- 无clientId模式, 服务器随机生成id, cleanSession不可配置
  330. mqttc:auth()
  331. */
  332. static int l_mqtt_auth(lua_State *L) {
  333. luat_mqtt_ctrl_t * mqtt_ctrl = get_mqtt_ctrl(L);
  334. const char *client_id = luaL_optstring(L, 2, "");
  335. const char *username = luaL_optstring(L, 3, "");
  336. const char *password = luaL_optstring(L, 4, "");
  337. int cleanSession = 1;
  338. if (lua_isboolean(L, 5) && !lua_toboolean(L, 5)) {
  339. cleanSession = 0;
  340. }
  341. mqtt_init(&(mqtt_ctrl->broker), client_id);
  342. mqtt_init_auth(&(mqtt_ctrl->broker), username, password);
  343. mqtt_ctrl->broker.clean_session = cleanSession;
  344. return 0;
  345. }
  346. /*
  347. mqtt心跳设置
  348. @api mqttc:keepalive(time)
  349. @int 可选 单位s 默认240s. 最先15,最高600
  350. @return nil 无返回值
  351. @usage
  352. mqttc:keepalive(30)
  353. */
  354. static int l_mqtt_keepalive(lua_State *L) {
  355. luat_mqtt_ctrl_t * mqtt_ctrl = get_mqtt_ctrl(L);
  356. int timeout = luaL_optinteger(L, 2, 240);
  357. if (timeout < 15)
  358. timeout = 15;
  359. if (timeout > 600)
  360. timeout = 600;
  361. mqtt_ctrl->keepalive = timeout;
  362. return 0;
  363. }
  364. /*
  365. 注册mqtt回调
  366. @api mqttc:on(cb)
  367. @function cb mqtt回调,参数包括mqtt_client, event, data, payload
  368. @return nil 无返回值
  369. @usage
  370. mqttc:on(function(mqtt_client, event, data, payload)
  371. -- 用户自定义代码
  372. log.info("mqtt", "event", event, mqtt_client, data, payload)
  373. end)
  374. */
  375. static int l_mqtt_on(lua_State *L) {
  376. luat_mqtt_ctrl_t * mqtt_ctrl = get_mqtt_ctrl(L);
  377. if (mqtt_ctrl->mqtt_cb != 0) {
  378. luaL_unref(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_cb);
  379. mqtt_ctrl->mqtt_cb = 0;
  380. }
  381. if (lua_isfunction(L, 2)) {
  382. lua_pushvalue(L, 2);
  383. mqtt_ctrl->mqtt_cb = luaL_ref(L, LUA_REGISTRYINDEX);
  384. }
  385. return 0;
  386. }
  387. /*
  388. 连接服务器
  389. @api mqttc:connect()
  390. @return boolean 发起成功返回true, 否则返回false
  391. @usage
  392. -- 开始建立连接
  393. mqttc:connect()
  394. -- 本函数仅代表发起成功, 后续仍需根据ready函数判断mqtt是否连接正常
  395. */
  396. static int l_mqtt_connect(lua_State *L) {
  397. luat_mqtt_ctrl_t * mqtt_ctrl = get_mqtt_ctrl(L);
  398. int ret = luat_mqtt_connect(mqtt_ctrl);
  399. if (ret) {
  400. LLOGE("socket connect ret=%d\n", ret);
  401. luat_mqtt_close_socket(mqtt_ctrl);
  402. lua_pushboolean(L, 0);
  403. return 1;
  404. }
  405. lua_pushboolean(L, 1);
  406. return 1;
  407. }
  408. /*
  409. 断开服务器连接(不会释放资源)
  410. @api mqttc:disconnect()
  411. @return boolean 发起成功返回true, 否则返回false
  412. @usage
  413. -- 断开连接
  414. mqttc:disconnect()
  415. */
  416. static int l_mqtt_disconnect(lua_State *L) {
  417. luat_mqtt_ctrl_t * mqtt_ctrl = get_mqtt_ctrl(L);
  418. mqtt_disconnect(&(mqtt_ctrl->broker));
  419. if (mqtt_ctrl->netc){
  420. network_force_close_socket(mqtt_ctrl->netc);
  421. }
  422. mqtt_ctrl->mqtt_state = 0;
  423. luat_stop_rtos_timer(mqtt_ctrl->ping_timer);
  424. luat_start_rtos_timer(mqtt_ctrl->reconnect_timer, mqtt_ctrl->reconnect_time, 0);
  425. mqtt_ctrl->buffer_offset = 0;
  426. lua_pushboolean(L, 1);
  427. return 1;
  428. }
  429. /*
  430. 自动重连
  431. @api mqttc:autoreconn(reconnect, reconnect_time)
  432. @bool 是否自动重连
  433. @int 自动重连周期 单位ms 默认3000ms
  434. @usage
  435. mqttc:autoreconn(true)
  436. */
  437. static int l_mqtt_autoreconn(lua_State *L) {
  438. luat_mqtt_ctrl_t * mqtt_ctrl = get_mqtt_ctrl(L);
  439. if (lua_isboolean(L, 2)){
  440. mqtt_ctrl->reconnect = lua_toboolean(L, 2);
  441. }
  442. mqtt_ctrl->reconnect_time = luaL_optinteger(L, 3, 3000);
  443. if (mqtt_ctrl->reconnect && mqtt_ctrl->reconnect_time < 1000)
  444. mqtt_ctrl->reconnect_time = 1000;
  445. return 0;
  446. }
  447. /*
  448. 发布消息
  449. @api mqttc:publish(topic, data, qos, retain)
  450. @string 主题,必填
  451. @string 消息,必填,但长度可以是0
  452. @int 消息级别 0/1 默认0
  453. @int 是否存档, 0/1,默认0
  454. @return int 消息id, 当qos为1或2时会有效值. 若底层返回是否, 会返回nil
  455. @usage
  456. mqttc:publish("/luatos/123456", "123")
  457. */
  458. static int l_mqtt_publish(lua_State *L) {
  459. uint16_t message_id = 0;
  460. size_t payload_len = 0;
  461. luat_mqtt_ctrl_t * mqtt_ctrl = get_mqtt_ctrl(L);
  462. const char * topic = luaL_checkstring(L, 2);
  463. const char * payload = NULL;
  464. luat_zbuff_t *buff = NULL;
  465. if (lua_isstring(L, 3)){
  466. payload = luaL_checklstring(L, 3, &payload_len);
  467. }else if (luaL_testudata(L, 3, LUAT_ZBUFF_TYPE)){
  468. buff = ((luat_zbuff_t *)luaL_checkudata(L, 3, LUAT_ZBUFF_TYPE));
  469. payload = (const char*)buff->addr;
  470. payload_len = buff->used;
  471. }else{
  472. LLOGD("only support string or zbuff");
  473. }
  474. // LLOGD("payload_len:%d",payload_len);
  475. uint8_t qos = luaL_optinteger(L, 4, 0);
  476. uint8_t retain = luaL_optinteger(L, 5, 0);
  477. int ret = mqtt_publish_with_qos(&(mqtt_ctrl->broker), topic, payload, payload_len, retain, qos, &message_id);
  478. if (ret != 1){
  479. return 0;
  480. }
  481. if (qos == 0){
  482. rtos_msg_t msg = {0};
  483. msg.handler = l_mqtt_callback;
  484. msg.ptr = mqtt_ctrl;
  485. msg.arg1 = MQTT_MSG_PUBACK;
  486. msg.arg2 = message_id;
  487. luat_msgbus_put(&msg, 0);
  488. }
  489. lua_pushinteger(L, message_id);
  490. return 1;
  491. }
  492. /*
  493. mqtt客户端关闭(关闭后资源释放无法再使用)
  494. @api mqttc:close()
  495. @usage
  496. mqttc:close()
  497. */
  498. static int l_mqtt_close(lua_State *L) {
  499. luat_mqtt_ctrl_t * mqtt_ctrl = get_mqtt_ctrl(L);
  500. mqtt_disconnect(&(mqtt_ctrl->broker));
  501. luat_mqtt_close_socket(mqtt_ctrl);
  502. if (mqtt_ctrl->mqtt_cb != 0) {
  503. luaL_unref(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_cb);
  504. mqtt_ctrl->mqtt_cb = 0;
  505. }
  506. luat_mqtt_release_socket(mqtt_ctrl);
  507. return 0;
  508. }
  509. /*
  510. mqtt客户端是否就绪
  511. @api mqttc:ready()
  512. @return bool 客户端是否就绪
  513. @usage
  514. local error = mqttc:ready()
  515. */
  516. static int l_mqtt_ready(lua_State *L) {
  517. luat_mqtt_ctrl_t * mqtt_ctrl = get_mqtt_ctrl(L);
  518. lua_pushboolean(L, mqtt_ctrl->mqtt_state > 0 ? 1 : 0);
  519. return 1;
  520. }
  521. /*
  522. 设置遗嘱消息
  523. @api mqttc:will(topic, payload, qos, retain)
  524. @string 遗嘱消息的topic
  525. @string 遗嘱消息的payload
  526. @string 遗嘱消息的qos, 默认0, 可以不填
  527. @string 遗嘱消息的retain, 默认0, 可以不填
  528. @return bool 成功返回true,否则返回false
  529. @usage
  530. -- 要在connect之前调用
  531. mqttc:will("/xxx/xxx", "xxxxxx")
  532. */
  533. static int l_mqtt_will(lua_State *L) {
  534. luat_mqtt_ctrl_t * mqtt_ctrl = get_mqtt_ctrl(L);
  535. size_t payload_len = 0;
  536. const char* topic = luaL_checkstring(L, 2);
  537. const char* payload = luaL_checklstring(L, 3, &payload_len);
  538. int qos = luaL_optinteger(L, 4, 0);
  539. int retain = luaL_optinteger(L, 5, 0);
  540. lua_pushboolean(L, luat_mqtt_set_will(mqtt_ctrl, topic, payload, payload_len, qos, retain) == 0 ? 1 : 0);
  541. return 1;
  542. }
  543. static int _mqtt_struct_newindex(lua_State *L);
  544. void luat_mqtt_struct_init(lua_State *L) {
  545. luaL_newmetatable(L, LUAT_MQTT_CTRL_TYPE);
  546. lua_pushcfunction(L, _mqtt_struct_newindex);
  547. lua_setfield( L, -2, "__index" );
  548. lua_pop(L, 1);
  549. }
  550. #include "rotable2.h"
  551. static const rotable_Reg_t reg_mqtt[] =
  552. {
  553. {"create", ROREG_FUNC(l_mqtt_create)},
  554. {"auth", ROREG_FUNC(l_mqtt_auth)},
  555. {"keepalive", ROREG_FUNC(l_mqtt_keepalive)},
  556. {"on", ROREG_FUNC(l_mqtt_on)},
  557. {"connect", ROREG_FUNC(l_mqtt_connect)},
  558. {"autoreconn", ROREG_FUNC(l_mqtt_autoreconn)},
  559. {"publish", ROREG_FUNC(l_mqtt_publish)},
  560. {"subscribe", ROREG_FUNC(l_mqtt_subscribe)},
  561. {"unsubscribe", ROREG_FUNC(l_mqtt_unsubscribe)},
  562. {"disconnect", ROREG_FUNC(l_mqtt_disconnect)},
  563. {"close", ROREG_FUNC(l_mqtt_close)},
  564. {"ready", ROREG_FUNC(l_mqtt_ready)},
  565. {"will", ROREG_FUNC(l_mqtt_will)},
  566. { NULL, ROREG_INT(0)}
  567. };
  568. static int _mqtt_struct_newindex(lua_State *L) {
  569. const rotable_Reg_t* reg = reg_mqtt;
  570. const char* key = luaL_checkstring(L, 2);
  571. while (1) {
  572. if (reg->name == NULL)
  573. return 0;
  574. if (!strcmp(reg->name, key)) {
  575. lua_pushcfunction(L, reg->value.value.func);
  576. return 1;
  577. }
  578. reg ++;
  579. }
  580. //return 0;
  581. }
/* Empty function table, registered by luaopen_mqtt when LUAT_USE_NETWORK
   is not defined. */
static const rotable_Reg_t reg_mqtt_emtry[] =
{
{ NULL, ROREG_INT(0)}
};
  586. LUAMOD_API int luaopen_mqtt( lua_State *L ) {
  587. #ifdef LUAT_USE_NETWORK
  588. luat_newlib2(L, reg_mqtt);
  589. luat_mqtt_struct_init(L);
  590. return 1;
  591. #else
  592. luat_newlib2(L, reg_mqtt_emtry);
  593. return 1;
  594. LLOGE("mqtt require network enable!!");
  595. #endif
  596. }