initial commit

This commit is contained in:
2025-08-05 15:53:44 +08:00
commit 09dc02ae52
553 changed files with 137665 additions and 0 deletions

629
third_party/libhv/mqtt/mqtt_client.c vendored Executable file
View File

@@ -0,0 +1,629 @@
#include "mqtt_client.h"
#include "hbase.h"
#include "hlog.h"
#include "herr.h"
#include "hendian.h"
#include "hsocket.h"
static unsigned short mqtt_next_mid() {
static unsigned short s_mid = 0;
return ++s_mid;
}
static int mqtt_client_send(mqtt_client_t* cli, const void* buf, int len) {
// thread-safe
hmutex_lock(&cli->mutex_);
int nwrite = hio_write(cli->io, buf, len);
hmutex_unlock(&cli->mutex_);
return nwrite;
}
static int mqtt_send_head(hio_t* io, int type, int length) {
mqtt_client_t* cli = (mqtt_client_t*)hevent_userdata(io);
mqtt_head_t head;
memset(&head, 0, sizeof(head));
head.type = type;
head.length = length;
unsigned char headbuf[8] = { 0 };
int headlen = mqtt_head_pack(&head, headbuf);
return mqtt_client_send(cli, headbuf, headlen);
}
static int mqtt_send_head_with_mid(hio_t* io, int type, unsigned short mid) {
mqtt_client_t* cli = (mqtt_client_t*)hevent_userdata(io);
mqtt_head_t head;
memset(&head, 0, sizeof(head));
head.type = type;
if (head.type == MQTT_TYPE_PUBREL) {
head.qos = 1;
}
head.length = 2;
unsigned char headbuf[8] = { 0 };
unsigned char* p = headbuf;
int headlen = mqtt_head_pack(&head, p);
p += headlen;
PUSH16(p, mid);
return mqtt_client_send(cli, headbuf, headlen + 2);
}
static void mqtt_send_ping(hio_t* io) {
mqtt_client_t* cli = (mqtt_client_t*)hevent_userdata(io);
if (cli->ping_cnt++ == 3) {
hloge("mqtt no pong!");
hio_close(io);
return;
}
mqtt_send_head(io, MQTT_TYPE_PINGREQ, 0);
}
static void mqtt_send_pong(hio_t* io) {
mqtt_send_head(io, MQTT_TYPE_PINGRESP, 0);
}
static void mqtt_send_disconnect(hio_t* io) {
mqtt_send_head(io, MQTT_TYPE_DISCONNECT, 0);
}
/*
* MQTT_TYPE_CONNECT
* 2 + protocol_name + 1 protocol_version + 1 conn_flags + 2 keepalive + 2 + [client_id] +
* [2 + will_topic + 2 + will_payload] +
* [2 + username] + [2 + password]
*/
static int mqtt_client_login(mqtt_client_t* cli) {
int len = 2 + 1 + 1 + 2 + 2;
unsigned short cid_len = 0,
will_topic_len = 0,
will_payload_len = 0,
username_len = 0,
password_len = 0;
unsigned char conn_flags = 0;
// protocol_name_len
len += cli->protocol_version == MQTT_PROTOCOL_V31 ? 6 : 4;
if (*cli->client_id) {
cid_len = strlen(cli->client_id);
} else {
cid_len = 20;
hv_random_string(cli->client_id, cid_len);
hlogi("MQTT client_id: %.*s", (int)cid_len, cli->client_id);
}
len += cid_len;
if (cid_len == 0) cli->clean_session = 1;
if (cli->clean_session) {
conn_flags |= MQTT_CONN_CLEAN_SESSION;
}
if (cli->will && cli->will->topic && cli->will->payload) {
will_topic_len = cli->will->topic_len ? cli->will->topic_len : strlen(cli->will->topic);
will_payload_len = cli->will->payload_len ? cli->will->payload_len : strlen(cli->will->payload);
if (will_topic_len && will_payload_len) {
conn_flags |= MQTT_CONN_HAS_WILL;
conn_flags |= ((cli->will->qos & 3) << 3);
if (cli->will->retain) {
conn_flags |= MQTT_CONN_WILL_RETAIN;
}
len += 2 + will_topic_len;
len += 2 + will_payload_len;
}
}
if (*cli->username) {
username_len = strlen(cli->username);
if (username_len) {
conn_flags |= MQTT_CONN_HAS_USERNAME;
len += 2 + username_len;
}
}
if (*cli->password) {
password_len = strlen(cli->password);
if (password_len) {
conn_flags |= MQTT_CONN_HAS_PASSWORD;
len += 2 + password_len;
}
}
mqtt_head_t head;
memset(&head, 0, sizeof(head));
head.type = MQTT_TYPE_CONNECT;
head.length = len;
int buflen = mqtt_estimate_length(&head);
unsigned char* buf = NULL;
HV_STACK_ALLOC(buf, buflen);
unsigned char* p = buf;
int headlen = mqtt_head_pack(&head, p);
p += headlen;
// TODO: Not implement MQTT_PROTOCOL_V5
if (cli->protocol_version == MQTT_PROTOCOL_V31) {
PUSH16(p, 6);
PUSH_N(p, MQTT_PROTOCOL_NAME_v31, 6);
} else {
PUSH16(p, 4);
PUSH_N(p, MQTT_PROTOCOL_NAME, 4);
}
PUSH8(p, cli->protocol_version);
PUSH8(p, conn_flags);
PUSH16(p, cli->keepalive);
PUSH16(p, cid_len);
if (cid_len > 0) {
PUSH_N(p, cli->client_id, cid_len);
}
if (conn_flags & MQTT_CONN_HAS_WILL) {
PUSH16(p, will_topic_len);
PUSH_N(p, cli->will->topic, will_topic_len);
PUSH16(p, will_payload_len);
PUSH_N(p, cli->will->payload, will_payload_len);
}
if (conn_flags & MQTT_CONN_HAS_USERNAME) {
PUSH16(p, username_len);
PUSH_N(p, cli->username, username_len);
}
if (conn_flags & MQTT_CONN_HAS_PASSWORD) {
PUSH16(p, password_len);
PUSH_N(p, cli->password, password_len);
}
int nwrite = mqtt_client_send(cli, buf, p - buf);
HV_STACK_FREE(buf);
return nwrite < 0 ? nwrite : 0;
}
static void connect_timeout_cb(htimer_t* timer) {
mqtt_client_t* cli = (mqtt_client_t*)hevent_userdata(timer);
if (cli == NULL) return;
cli->timer = NULL;
cli->error = ETIMEDOUT;
hio_t* io = cli->io;
if (io == NULL) return;
char localaddrstr[SOCKADDR_STRLEN] = {0};
char peeraddrstr[SOCKADDR_STRLEN] = {0};
hlogw("connect timeout [%s] <=> [%s]",
SOCKADDR_STR(hio_localaddr(io), localaddrstr),
SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));
hio_close(io);
}
static void reconnect_timer_cb(htimer_t* timer) {
mqtt_client_t* cli = (mqtt_client_t*)hevent_userdata(timer);
if (cli == NULL) return;
cli->timer = NULL;
mqtt_client_reconnect(cli);
}
static void on_close(hio_t* io) {
mqtt_client_t* cli = (mqtt_client_t*)hevent_userdata(io);
cli->connected = 0;
if (cli->cb) {
cli->head.type = MQTT_TYPE_DISCONNECT;
cli->cb(cli, cli->head.type);
}
// reconnect
if (cli->reconn_setting && reconn_setting_can_retry(cli->reconn_setting)) {
uint32_t delay = reconn_setting_calc_delay(cli->reconn_setting);
cli->timer = htimer_add(cli->loop, reconnect_timer_cb, delay, 1);
hevent_set_userdata(cli->timer, cli);
}
}
static void on_packet(hio_t* io, void* buf, int len) {
mqtt_client_t* cli = (mqtt_client_t*)hevent_userdata(io);
unsigned char* p = (unsigned char*)buf;
unsigned char* end = p + len;
memset(&cli->head, 0, sizeof(mqtt_head_t));
int headlen = mqtt_head_unpack(&cli->head, p, len);
if (headlen <= 0) return;
p += headlen;
switch (cli->head.type) {
// case MQTT_TYPE_CONNECT:
case MQTT_TYPE_CONNACK:
{
if (cli->head.length < 2) {
hloge("MQTT CONNACK malformed!");
hio_close(io);
return;
}
unsigned char conn_flags = 0, rc = 0;
POP8(p, conn_flags);
POP8(p, rc);
if (rc != MQTT_CONNACK_ACCEPTED) {
cli->error = rc;
hloge("MQTT CONNACK error=%d", cli->error);
hio_close(io);
return;
}
cli->connected = 1;
if (cli->timer) {
htimer_del(cli->timer);
cli->timer = NULL;
}
if (cli->keepalive) {
cli->ping_cnt = 0;
hio_set_heartbeat(io, cli->keepalive * 1000, mqtt_send_ping);
}
}
break;
case MQTT_TYPE_PUBLISH:
{
if (cli->head.length < 2) {
hloge("MQTT PUBLISH malformed!");
hio_close(io);
return;
}
memset(&cli->message, 0, sizeof(mqtt_message_t));
POP16(p, cli->message.topic_len);
if (end - p < cli->message.topic_len) {
hloge("MQTT PUBLISH malformed!");
hio_close(io);
return;
}
// NOTE: Not deep copy
cli->message.topic = (char*)p;
p += cli->message.topic_len;
if (cli->head.qos > 0) {
if (end - p < 2) {
hloge("MQTT PUBLISH malformed!");
hio_close(io);
return;
}
POP16(p, cli->mid);
}
cli->message.payload_len = end - p;
if (cli->message.payload_len > 0) {
// NOTE: Not deep copy
cli->message.payload = (char*)p;
}
cli->message.qos = cli->head.qos;
if (cli->message.qos == 0) {
// Do nothing
} else if (cli->message.qos == 1) {
mqtt_send_head_with_mid(io, MQTT_TYPE_PUBACK, cli->mid);
} else if (cli->message.qos == 2) {
mqtt_send_head_with_mid(io, MQTT_TYPE_PUBREC, cli->mid);
}
}
break;
case MQTT_TYPE_PUBACK:
case MQTT_TYPE_PUBREC:
case MQTT_TYPE_PUBREL:
case MQTT_TYPE_PUBCOMP:
{
if (cli->head.length < 2) {
hloge("MQTT PUBACK malformed!");
hio_close(io);
return;
}
POP16(p, cli->mid);
if (cli->head.type == MQTT_TYPE_PUBREC) {
mqtt_send_head_with_mid(io, MQTT_TYPE_PUBREL, cli->mid);
} else if (cli->head.type == MQTT_TYPE_PUBREL) {
mqtt_send_head_with_mid(io, MQTT_TYPE_PUBCOMP, cli->mid);
}
}
break;
// case MQTT_TYPE_SUBSCRIBE:
// break;
case MQTT_TYPE_SUBACK:
{
if (cli->head.length < 2) {
hloge("MQTT SUBACK malformed!");
hio_close(io);
return;
}
POP16(p, cli->mid);
}
break;
// case MQTT_TYPE_UNSUBSCRIBE:
// break;
case MQTT_TYPE_UNSUBACK:
{
if (cli->head.length < 2) {
hloge("MQTT UNSUBACK malformed!");
hio_close(io);
return;
}
POP16(p, cli->mid);
}
break;
case MQTT_TYPE_PINGREQ:
// printf("recv ping\n");
// printf("send pong\n");
mqtt_send_pong(io);
return;
case MQTT_TYPE_PINGRESP:
// printf("recv pong\n");
cli->ping_cnt = 0;
return;
case MQTT_TYPE_DISCONNECT:
hio_close(io);
return;
default:
hloge("MQTT client received wrong type=%d", (int)cli->head.type);
hio_close(io);
return;
}
if (cli->cb) {
cli->cb(cli, cli->head.type);
}
}
static void on_connect(hio_t* io) {
mqtt_client_t* cli = (mqtt_client_t*)hevent_userdata(io);
if (cli->cb) {
cli->head.type = MQTT_TYPE_CONNECT;
cli->cb(cli, cli->head.type);
}
if (cli->reconn_setting) {
reconn_setting_reset(cli->reconn_setting);
}
static unpack_setting_t mqtt_unpack_setting;
mqtt_unpack_setting.mode = UNPACK_BY_LENGTH_FIELD;
mqtt_unpack_setting.package_max_length = DEFAULT_MQTT_PACKAGE_MAX_LENGTH;
mqtt_unpack_setting.body_offset = 2;
mqtt_unpack_setting.length_field_offset = 1;
mqtt_unpack_setting.length_field_bytes = 1;
mqtt_unpack_setting.length_field_coding = ENCODE_BY_VARINT;
hio_set_unpack(io, &mqtt_unpack_setting);
// start recv packet
hio_setcb_read(io, on_packet);
hio_read(io);
mqtt_client_login(cli);
}
mqtt_client_t* mqtt_client_new(hloop_t* loop) {
if (loop == NULL) {
loop = hloop_new(HLOOP_FLAG_AUTO_FREE);
if (loop == NULL) return NULL;
}
mqtt_client_t* cli = NULL;
HV_ALLOC_SIZEOF(cli);
if (cli == NULL) return NULL;
cli->loop = loop;
cli->protocol_version = MQTT_PROTOCOL_V311;
cli->keepalive = DEFAULT_MQTT_KEEPALIVE;
hmutex_init(&cli->mutex_);
return cli;
}
void mqtt_client_free(mqtt_client_t* cli) {
if (!cli) return;
hmutex_destroy(&cli->mutex_);
if (cli->ssl_ctx && cli->alloced_ssl_ctx) {
hssl_ctx_free(cli->ssl_ctx);
cli->ssl_ctx = NULL;
}
HV_FREE(cli->reconn_setting);
HV_FREE(cli->will);
HV_FREE(cli);
}
void mqtt_client_run (mqtt_client_t* cli) {
if (!cli || !cli->loop) return;
hloop_run(cli->loop);
}
void mqtt_client_stop(mqtt_client_t* cli) {
if (!cli || !cli->loop) return;
hloop_stop(cli->loop);
}
void mqtt_client_set_id(mqtt_client_t* cli, const char* id) {
if (!cli || !id) return;
hv_strncpy(cli->client_id, id, sizeof(cli->client_id));
}
void mqtt_client_set_will(mqtt_client_t* cli, mqtt_message_t* will) {
if (!cli || !will) return;
if (cli->will == NULL) {
HV_ALLOC_SIZEOF(cli->will);
}
memcpy(cli->will, will, sizeof(mqtt_message_t));
}
void mqtt_client_set_auth(mqtt_client_t* cli, const char* username, const char* password) {
if (!cli) return;
if (username) {
hv_strncpy(cli->username, username, sizeof(cli->username));
}
if (password) {
hv_strncpy(cli->password, password, sizeof(cli->password));
}
}
void mqtt_client_set_callback(mqtt_client_t* cli, mqtt_client_cb cb) {
if (!cli) return;
cli->cb = cb;
}
void mqtt_client_set_userdata(mqtt_client_t* cli, void* userdata) {
if (!cli) return;
cli->userdata = userdata;
}
void* mqtt_client_get_userdata(mqtt_client_t* cli) {
if (!cli) return NULL;
return cli->userdata;
}
int mqtt_client_get_last_error(mqtt_client_t* cli) {
if (!cli) return -1;
return cli->error;
}
int mqtt_client_set_ssl_ctx(mqtt_client_t* cli, hssl_ctx_t ssl_ctx) {
cli->ssl_ctx = ssl_ctx;
return 0;
}
int mqtt_client_new_ssl_ctx(mqtt_client_t* cli, hssl_ctx_opt_t* opt) {
opt->endpoint = HSSL_CLIENT;
hssl_ctx_t ssl_ctx = hssl_ctx_new(opt);
if (ssl_ctx == NULL) return ERR_NEW_SSL_CTX;
cli->alloced_ssl_ctx = true;
return mqtt_client_set_ssl_ctx(cli, ssl_ctx);
}
int mqtt_client_set_reconnect(mqtt_client_t* cli, reconn_setting_t* reconn) {
if (reconn == NULL) {
HV_FREE(cli->reconn_setting);
return 0;
}
if (cli->reconn_setting == NULL) {
HV_ALLOC_SIZEOF(cli->reconn_setting);
}
*cli->reconn_setting = *reconn;
return 0;
}
int mqtt_client_reconnect(mqtt_client_t* cli) {
mqtt_client_connect(cli, cli->host, cli->port, cli->ssl);
return 0;
}
void mqtt_client_set_connect_timeout(mqtt_client_t* cli, int ms) {
cli->connect_timeout = ms;
}
void mqtt_client_set_host(mqtt_client_t* cli, const char* host, int port, int ssl) {
hv_strncpy(cli->host, host, sizeof(cli->host));
cli->port = port;
cli->ssl = ssl;
}
int mqtt_client_connect(mqtt_client_t* cli, const char* host, int port, int ssl) {
if (!cli) return -1;
hv_strncpy(cli->host, host, sizeof(cli->host));
cli->port = port;
cli->ssl = ssl;
hio_t* io = hio_create_socket(cli->loop, host, port, HIO_TYPE_TCP, HIO_CLIENT_SIDE);
if (io == NULL) return -1;
if (ssl) {
if (cli->ssl_ctx) {
hio_set_ssl_ctx(io, cli->ssl_ctx);
}
hio_enable_ssl(io);
}
cli->io = io;
hevent_set_userdata(io, cli);
hio_setcb_connect(io, on_connect);
hio_setcb_close(io, on_close);
if (cli->connect_timeout > 0) {
cli->timer = htimer_add(cli->loop, connect_timeout_cb, cli->connect_timeout, 1);
hevent_set_userdata(cli->timer, cli);
}
return hio_connect(io);
}
bool mqtt_client_is_connected(mqtt_client_t* cli) {
return cli && cli->connected;
}
int mqtt_client_disconnect(mqtt_client_t* cli) {
if (!cli || !cli->io) return -1;
// cancel reconnect first
mqtt_client_set_reconnect(cli, NULL);
mqtt_send_disconnect(cli->io);
return hio_close(cli->io);
}
int mqtt_client_publish(mqtt_client_t* cli, mqtt_message_t* msg) {
if (!cli || !cli->io || !msg) return -1;
if (!cli->connected) return -2;
int topic_len = msg->topic_len ? msg->topic_len : strlen(msg->topic);
int payload_len = msg->payload_len ? msg->payload_len : strlen(msg->payload);
int len = 2 + topic_len + payload_len;
if (msg->qos > 0) len += 2; // mid
unsigned short mid = 0;
mqtt_head_t head;
memset(&head, 0, sizeof(head));
head.type = MQTT_TYPE_PUBLISH;
head.qos = msg->qos & 3;
head.retain = msg->retain;
head.length = len;
int buflen = mqtt_estimate_length(&head);
// NOTE: send payload alone
buflen -= payload_len;
unsigned char* buf = NULL;
HV_STACK_ALLOC(buf, buflen);
unsigned char* p = buf;
int headlen = mqtt_head_pack(&head, p);
p += headlen;
PUSH16(p, topic_len);
PUSH_N(p, msg->topic, topic_len);
if (msg->qos) {
mid = mqtt_next_mid();
PUSH16(p, mid);
}
hmutex_lock(&cli->mutex_);
// send head + topic + mid
int nwrite = hio_write(cli->io, buf, p - buf);
HV_STACK_FREE(buf);
if (nwrite < 0) {
goto unlock;
}
// send payload
nwrite = hio_write(cli->io, msg->payload, payload_len);
unlock:
hmutex_unlock(&cli->mutex_);
return nwrite < 0 ? nwrite : mid;
}
int mqtt_client_subscribe(mqtt_client_t* cli, const char* topic, int qos) {
if (!cli || !cli->io || !topic) return -1;
if (!cli->connected) return -2;
int topic_len = strlen(topic);
int len = 2 + 2 + topic_len + 1;
mqtt_head_t head;
memset(&head, 0, sizeof(head));
head.type = MQTT_TYPE_SUBSCRIBE;
head.qos = 1;
head.length = len;
int buflen = mqtt_estimate_length(&head);
unsigned char* buf = NULL;
HV_STACK_ALLOC(buf, buflen);
unsigned char* p = buf;
int headlen = mqtt_head_pack(&head, p);
p += headlen;
unsigned short mid = mqtt_next_mid();
PUSH16(p, mid);
PUSH16(p, topic_len);
PUSH_N(p, topic, topic_len);
PUSH8(p, qos & 3);
// send head + mid + topic + qos
int nwrite = mqtt_client_send(cli, buf, p - buf);
HV_STACK_FREE(buf);
return nwrite < 0 ? nwrite : mid;
}
int mqtt_client_unsubscribe(mqtt_client_t* cli, const char* topic) {
if (!cli || !cli->io || !topic) return -1;
if (!cli->connected) return -2;
int topic_len = strlen(topic);
int len = 2 + 2 + topic_len;
mqtt_head_t head;
memset(&head, 0, sizeof(head));
head.type = MQTT_TYPE_UNSUBSCRIBE;
head.qos = 1;
head.length = len;
int buflen = mqtt_estimate_length(&head);
unsigned char* buf = NULL;
HV_STACK_ALLOC(buf, buflen);
unsigned char* p = buf;
int headlen = mqtt_head_pack(&head, p);
p += headlen;
unsigned short mid = mqtt_next_mid();
PUSH16(p, mid);
PUSH16(p, topic_len);
PUSH_N(p, topic, topic_len);
// send head + mid + topic
int nwrite = mqtt_client_send(cli, buf, p - buf);
HV_STACK_FREE(buf);
return nwrite < 0 ? nwrite : mid;
}

340
third_party/libhv/mqtt/mqtt_client.h vendored Executable file
View File

@@ -0,0 +1,340 @@
#ifndef HV_MQTT_CLIENT_H_
#define HV_MQTT_CLIENT_H_
#include "mqtt_protocol.h"
#include "hloop.h"
#include "hssl.h"
#include "hmutex.h"
#define DEFAULT_MQTT_KEEPALIVE 60 // s
typedef struct mqtt_client_s mqtt_client_t;
// @type mqtt_type_e
// @example examples/mqtt
typedef void (*mqtt_client_cb)(mqtt_client_t* cli, int type);
struct mqtt_client_s {
// connect: host:port
char host[256];
int port;
int connect_timeout; // ms
// reconnect
reconn_setting_t* reconn_setting;
// login: flags + keepalive + client_id + will + username + password
// flags
unsigned char protocol_version; // Default MQTT_PROTOCOL_V311
unsigned char clean_session: 1;
unsigned char ssl: 1; // Read Only
unsigned char alloced_ssl_ctx: 1; // intern
unsigned char connected : 1;
unsigned short keepalive;
int ping_cnt;
char client_id[64];
// will
mqtt_message_t* will;
// auth
char username[64];
char password[64];
// message
mqtt_head_t head;
int error; // for MQTT_TYPE_CONNACK
int mid; // for MQTT_TYPE_SUBACK, MQTT_TYPE_PUBACK
mqtt_message_t message; // for MQTT_TYPE_PUBLISH
// callback
mqtt_client_cb cb;
// userdata
void* userdata;
// privdata
hloop_t* loop;
hio_t* io;
htimer_t* timer;
// SSL/TLS
hssl_ctx_t ssl_ctx;
// thread-safe
hmutex_t mutex_;
};
BEGIN_EXTERN_C
// hloop_new -> malloc(mqtt_client_t)
HV_EXPORT mqtt_client_t* mqtt_client_new(hloop_t* loop DEFAULT(NULL));
// @see hloop_run
HV_EXPORT void mqtt_client_run (mqtt_client_t* cli);
// @see hloop_stop
HV_EXPORT void mqtt_client_stop(mqtt_client_t* cli);
// hloop_free -> free(mqtt_client_t)
HV_EXPORT void mqtt_client_free(mqtt_client_t* cli);
// id
HV_EXPORT void mqtt_client_set_id(mqtt_client_t* cli, const char* id);
// will
HV_EXPORT void mqtt_client_set_will(mqtt_client_t* cli,
mqtt_message_t* will);
// auth
HV_EXPORT void mqtt_client_set_auth(mqtt_client_t* cli,
const char* username, const char* password);
// callback
HV_EXPORT void mqtt_client_set_callback(mqtt_client_t* cli, mqtt_client_cb cb);
// userdata
HV_EXPORT void mqtt_client_set_userdata(mqtt_client_t* cli, void* userdata);
HV_EXPORT void* mqtt_client_get_userdata(mqtt_client_t* cli);
// error
HV_EXPORT int mqtt_client_get_last_error(mqtt_client_t* cli);
// SSL/TLS
HV_EXPORT int mqtt_client_set_ssl_ctx(mqtt_client_t* cli, hssl_ctx_t ssl_ctx);
// hssl_ctx_new(opt) -> mqtt_client_set_ssl_ctx
HV_EXPORT int mqtt_client_new_ssl_ctx(mqtt_client_t* cli, hssl_ctx_opt_t* opt);
// reconnect
HV_EXPORT int mqtt_client_set_reconnect(mqtt_client_t* cli,
reconn_setting_t* reconn);
HV_EXPORT int mqtt_client_reconnect(mqtt_client_t* cli);
// connect
// hio_create_socket -> hio_connect ->
// on_connect -> mqtt_client_login ->
// on_connack
HV_EXPORT void mqtt_client_set_connect_timeout(mqtt_client_t* cli, int ms);
HV_EXPORT void mqtt_client_set_host(mqtt_client_t* cli, const char* host, int port, int ssl);
HV_EXPORT int mqtt_client_connect(mqtt_client_t* cli,
const char* host,
int port DEFAULT(DEFAULT_MQTT_PORT),
int ssl DEFAULT(0));
HV_EXPORT bool mqtt_client_is_connected(mqtt_client_t* cli);
// disconnect
// @see hio_close
HV_EXPORT int mqtt_client_disconnect(mqtt_client_t* cli);
// publish
HV_EXPORT int mqtt_client_publish(mqtt_client_t* cli,
mqtt_message_t* msg);
// subscribe
HV_EXPORT int mqtt_client_subscribe(mqtt_client_t* cli,
const char* topic, int qos DEFAULT(0));
// unsubscribe
HV_EXPORT int mqtt_client_unsubscribe(mqtt_client_t* cli,
const char* topic);
END_EXTERN_C
#ifdef __cplusplus
#include <functional>
#include <map>
#include <mutex>
#include <string>
namespace hv {
// @usage examples/mqtt/mqtt_client_test.cpp
class MqttClient {
public:
mqtt_client_t* client;
// callbacks
typedef std::function<void(MqttClient*)> MqttCallback;
typedef std::function<void(MqttClient*, mqtt_message_t*)> MqttMessageCallback;
MqttCallback onConnect;
MqttCallback onClose;
MqttMessageCallback onMessage;
MqttClient(hloop_t* loop = NULL) {
client = mqtt_client_new(loop);
}
~MqttClient() {
if (client) {
mqtt_client_free(client);
client = NULL;
}
}
void run() {
mqtt_client_set_callback(client, on_mqtt);
mqtt_client_set_userdata(client, this);
mqtt_client_run(client);
}
void stop() {
mqtt_client_stop(client);
}
void setID(const char* id) {
mqtt_client_set_id(client, id);
}
void setWill(mqtt_message_t* will) {
mqtt_client_set_will(client, will);
}
void setAuth(const char* username, const char* password) {
mqtt_client_set_auth(client, username, password);
}
void setPingInterval(int sec) {
client->keepalive = sec;
}
int lastError() {
return mqtt_client_get_last_error(client);
}
// SSL/TLS
int setSslCtx(hssl_ctx_t ssl_ctx) {
return mqtt_client_set_ssl_ctx(client, ssl_ctx);
}
int newSslCtx(hssl_ctx_opt_t* opt) {
return mqtt_client_new_ssl_ctx(client, opt);
}
void setReconnect(reconn_setting_t* reconn) {
mqtt_client_set_reconnect(client, reconn);
}
void setConnectTimeout(int ms) {
mqtt_client_set_connect_timeout(client, ms);
}
void setHost(const char* host, int port = DEFAULT_MQTT_PORT, int ssl = 0) {
mqtt_client_set_host(client, host, port, ssl);
}
int connect(const char* host, int port = DEFAULT_MQTT_PORT, int ssl = 0) {
return mqtt_client_connect(client, host, port, ssl);
}
int reconnect() {
return mqtt_client_reconnect(client);
}
int disconnect() {
return mqtt_client_disconnect(client);
}
bool isConnected() {
return mqtt_client_is_connected(client);
}
int publish(mqtt_message_t* msg, MqttCallback ack_cb = NULL) {
int mid = mqtt_client_publish(client, msg);
if (msg->qos > 0 && mid >= 0 && ack_cb) {
setAckCallback(mid, ack_cb);
}
return mid;
}
int publish(const std::string& topic, const std::string& payload, int qos = 0, int retain = 0, MqttCallback ack_cb = NULL) {
mqtt_message_t msg;
memset(&msg, 0, sizeof(msg));
msg.topic_len = topic.size();
msg.topic = topic.c_str();
msg.payload_len = payload.size();
msg.payload = payload.c_str();
msg.qos = qos;
msg.retain = retain;
return publish(&msg, ack_cb);
}
int subscribe(const char* topic, int qos = 0, MqttCallback ack_cb = NULL) {
int mid = mqtt_client_subscribe(client, topic, qos);
if (qos > 0 && mid >= 0 && ack_cb) {
setAckCallback(mid, ack_cb);
}
return mid;
}
int unsubscribe(const char* topic, MqttCallback ack_cb = NULL) {
int mid = mqtt_client_unsubscribe(client, topic);
if (mid >= 0 && ack_cb) {
setAckCallback(mid, ack_cb);
}
return mid;
}
protected:
void setAckCallback(int mid, MqttCallback cb) {
ack_cbs_mutex.lock();
ack_cbs[mid] = std::move(cb);
ack_cbs_mutex.unlock();
}
void invokeAckCallback(int mid) {
MqttCallback ack_cb = NULL;
ack_cbs_mutex.lock();
auto iter = ack_cbs.find(mid);
if (iter != ack_cbs.end()) {
ack_cb = std::move(iter->second);
ack_cbs.erase(iter);
}
ack_cbs_mutex.unlock();
if (ack_cb) ack_cb(this);
}
static void on_mqtt(mqtt_client_t* cli, int type) {
MqttClient* client = (MqttClient*)mqtt_client_get_userdata(cli);
// printf("on_mqtt type=%d\n", type);
switch(type) {
case MQTT_TYPE_CONNECT:
// printf("mqtt connected!\n");
break;
case MQTT_TYPE_DISCONNECT:
// printf("mqtt disconnected!\n");
if (client->onClose) {
client->onClose(client);
}
break;
case MQTT_TYPE_CONNACK:
// printf("mqtt connack!\n");
if (client->onConnect) {
client->onConnect(client);
}
break;
case MQTT_TYPE_PUBLISH:
if (client->onMessage) {
client->onMessage(client, &cli->message);
}
break;
case MQTT_TYPE_PUBACK: /* qos = 1 */
// printf("mqtt puback mid=%d\n", cli->mid);
client->invokeAckCallback(cli->mid);
break;
case MQTT_TYPE_PUBREC: /* qos = 2 */
// printf("mqtt pubrec mid=%d\n", cli->mid);
// wait MQTT_TYPE_PUBCOMP
break;
case MQTT_TYPE_PUBCOMP: /* qos = 2 */
// printf("mqtt pubcomp mid=%d\n", cli->mid);
client->invokeAckCallback(cli->mid);
break;
case MQTT_TYPE_SUBACK:
// printf("mqtt suback mid=%d\n", cli->mid);
client->invokeAckCallback(cli->mid);
break;
case MQTT_TYPE_UNSUBACK:
// printf("mqtt unsuback mid=%d\n", cli->mid);
client->invokeAckCallback(cli->mid);
break;
default:
break;
}
}
private:
// mid => ack callback
std::map<int, MqttCallback> ack_cbs;
std::mutex ack_cbs_mutex;
};
}
#endif
#endif // HV_MQTT_CLIENT_H_

22
third_party/libhv/mqtt/mqtt_protocol.c vendored Executable file
View File

@@ -0,0 +1,22 @@
#include "mqtt_protocol.h"
#include "hmath.h"
int mqtt_head_pack(mqtt_head_t* head, unsigned char buf[]) {
buf[0] = (head->type << 4) |
(head->dup << 3) |
(head->qos << 1) |
(head->retain);
int bytes = varint_encode(head->length, buf + 1);
return 1 + bytes;
}
int mqtt_head_unpack(mqtt_head_t* head, const unsigned char* buf, int len) {
head->type = (buf[0] >> 4) & 0x0F;
head->dup = (buf[0] >> 3) & 0x01;
head->qos = (buf[0] >> 1) & 0x03;
head->retain = buf[0] & 0x01;
int bytes = len - 1;
head->length = varint_decode(buf + 1, &bytes);
if (bytes <= 0) return bytes;
return 1 + bytes;
}

82
third_party/libhv/mqtt/mqtt_protocol.h vendored Executable file
View File

@@ -0,0 +1,82 @@
#ifndef HV_MQTT_PROTOCOL_H_
#define HV_MQTT_PROTOCOL_H_
#include "hexport.h"
#define DEFAULT_MQTT_PORT 1883
#define MQTT_PROTOCOL_V31 3
#define MQTT_PROTOCOL_V311 4
#define MQTT_PROTOCOL_V5 5 // Not yet supproted
#define MQTT_PROTOCOL_NAME "MQTT"
#define MQTT_PROTOCOL_NAME_v31 "MQIsdp"
/*
* connect flags
* 0 1 2 3-4 5 6 7
* reserved clean_session has_will will_qos will_retain has_password has_username
*/
#define MQTT_CONN_CLEAN_SESSION 0x02
#define MQTT_CONN_HAS_WILL 0x04
#define MQTT_CONN_WILL_RETAIN 0x20
#define MQTT_CONN_HAS_PASSWORD 0x40
#define MQTT_CONN_HAS_USERNAME 0x80
typedef enum {
MQTT_TYPE_CONNECT = 1,
MQTT_TYPE_CONNACK = 2,
MQTT_TYPE_PUBLISH = 3,
MQTT_TYPE_PUBACK = 4,
MQTT_TYPE_PUBREC = 5,
MQTT_TYPE_PUBREL = 6,
MQTT_TYPE_PUBCOMP = 7,
MQTT_TYPE_SUBSCRIBE = 8,
MQTT_TYPE_SUBACK = 9,
MQTT_TYPE_UNSUBSCRIBE = 10,
MQTT_TYPE_UNSUBACK = 11,
MQTT_TYPE_PINGREQ = 12,
MQTT_TYPE_PINGRESP = 13,
MQTT_TYPE_DISCONNECT = 14,
} mqtt_type_e;
typedef enum {
MQTT_CONNACK_ACCEPTED = 0,
MQTT_CONNACK_REFUSED_PROTOCOL_VERSION = 1,
MQTT_CONNACK_REFUSED_IDENTIFIER_REJECTED = 2,
MQTT_CONNACK_REFUSED_SERVER_UNAVAILABLE = 3,
MQTT_CONNACK_REFUSED_BAD_USERNAME_PASSWORD = 4,
MQTT_CONNACK_REFUSED_NOT_AUTHORIZED = 5,
} mqtt_connack_e;
typedef struct mqtt_head_s {
unsigned char type: 4;
unsigned char dup: 1;
unsigned char qos: 2;
unsigned char retain: 1;
unsigned int length;
} mqtt_head_t;
typedef struct mqtt_message_s {
unsigned int topic_len;
const char* topic;
unsigned int payload_len;
const char* payload;
unsigned char qos;
unsigned char retain;
} mqtt_message_t;
BEGIN_EXTERN_C
#define DEFAULT_MQTT_PACKAGE_MAX_LENGTH (1 << 28) // 256M
HV_INLINE int mqtt_estimate_length(mqtt_head_t* head) {
// 28 bits => 4*7 bits varint
return 1 + 4 + head->length;
}
HV_EXPORT int mqtt_head_pack(mqtt_head_t* head, unsigned char buf[]);
HV_EXPORT int mqtt_head_unpack(mqtt_head_t* head, const unsigned char* buf, int len);
END_EXTERN_C
#endif // HV_MQTT_PROTOCOL_H_