The client provides high-level MQTT client APIs.

Include header files

To use async_mqtt, include the following header file:

#include <async_mqtt/all.hpp>

In addition, if you want to use TLS and/or WebSocket, include the following header files. They are not included in all.hpp.

For TLS:

#include <async_mqtt/predefined_layer/mqtts.hpp>

For WebSocket:

#include <async_mqtt/predefined_layer/ws.hpp>

For WebSocket on TLS:

#include <async_mqtt/predefined_layer/wss.hpp>

Create client

First, choose the MQTT protocol version (v3.1.1 or v5), and then choose the underlying layer.

[Figure: layer structure]

The following namespace aliases are defined in all the code examples:

namespace am = async_mqtt;
namespace as = boost::asio;
namespace tls = as::ssl;

mqtt

    using client_t = am::client<am::protocol_version::v3_1_1, am::protocol::mqtt>;
    as::io_context ioc;
    auto amcl = client_t{
        ioc.get_executor() // args for underlying layer (mqtt)
        // mqtt is as::basic_stream_socket<as::ip::tcp, as::io_context::executor_type>
    };
    // handshake underlying layers
    co_await am::async_underlying_handshake(amcl.next_layer(), host, port, as::use_awaitable);

mqtts

    using client_t = am::client<am::protocol_version::v5, am::protocol::mqtts>;
    as::io_context ioc;
    am::tls::context ctx{am::tls::context::tlsv12};
    ctx.set_verify_mode(am::tls::verify_none);

    // If you want to check server certificate, set cacert as follows.
    // ctx.load_verify_file(cacert);

    // If the broker requires additional TLS layer information,
    // you can use OpenSSL directly.
    //
    // static const unsigned char protos[5] = {4, 'm','q','t','t'};
    // auto res = SSL_CTX_set_alpn_protos(ctx.native_handle(), protos, 5);
    // // (check res)

    auto amcl = client_t{
        ioc.get_executor(),  // args for underlying layer (as::ssl::stream<mqtt>)
        ctx
    };
    // handshake underlying layers
    co_await am::async_underlying_handshake(amcl.next_layer(), host, port, as::use_awaitable);
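
The commented-out `protos` array above is written by hand in the ALPN wire format, where each protocol name is preceded by a one-byte length. As a minimal sketch, assuming you would rather build that buffer from protocol names, a helper such as the following could be used. `encode_alpn` is a hypothetical name introduced here for illustration; it is not part of async_mqtt or OpenSSL.

```cpp
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical helper (not part of async_mqtt or OpenSSL).
// Encodes protocol names into the ALPN wire format:
// each name is preceded by a single byte holding its length.
std::vector<unsigned char> encode_alpn(std::vector<std::string> const& names) {
    std::vector<unsigned char> out;
    for (auto const& name : names) {
        // A one-byte length prefix limits each name to 1..255 bytes.
        if (name.empty() || name.size() > 255) {
            throw std::invalid_argument("ALPN protocol name must be 1..255 bytes");
        }
        out.push_back(static_cast<unsigned char>(name.size()));
        out.insert(out.end(), name.begin(), name.end());
    }
    return out;
}
```

The result of `encode_alpn({"mqtt"})` holds the same five bytes as the `protos` array, and its `data()` and `size()` can be passed to `SSL_CTX_set_alpn_protos()`. When checking `res`, note that `SSL_CTX_set_alpn_protos()` returns 0 on success, unlike most OpenSSL functions.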

ws

    using client_t = am::client<am::protocol_version::v3_1_1, am::protocol::ws>;
    as::io_context ioc;
    auto amcl = client_t{
        ioc.get_executor()  // args for underlying layer (bs::websocket::stream<mqtt>)
    };
    // handshake underlying layers
    co_await am::async_underlying_handshake(amcl.next_layer(), host, port, as::use_awaitable);

wss

    using client_t = am::client<am::protocol_version::v3_1_1, am::protocol::wss>;
    as::io_context ioc;
    am::tls::context ctx{am::tls::context::tlsv12};
    ctx.set_verify_mode(am::tls::verify_none);
    // If you want to check server certificate, set cacert as follows.
    // ctx.load_verify_file(cacert);

    // If the broker requires additional TLS layer information,
    // you can use OpenSSL directly.
    //
    // static const unsigned char protos[5] = {4, 'm','q','t','t'};
    // auto res = SSL_CTX_set_alpn_protos(ctx.native_handle(), protos, 5);
    // // (check res)

    auto amcl = client_t{
        ioc.get_executor(),  // args for underlying layer (bs::websocket::stream<mqtts>)
        ctx                  // mqtts is as::ssl::stream<mqtt>
    };
    // handshake underlying layers
    co_await am::async_underlying_handshake(amcl.next_layer(), host, port, as::use_awaitable);

Layer access

Typically, you only need next_layer() in order to pass the underlying layer to async_underlying_handshake(). However, if you need to configure each layer individually, you can access every underlying layer.

Layer access                              mqtt        mqtts       ws          wss
next_layer()                              TCP stream  TLS stream  WS stream   WS stream
next_layer().next_layer()                 -           TCP stream  TCP stream  TLS stream
next_layer().next_layer().next_layer()    -           -           -           TCP stream
lowest_layer()                            TCP stream  TCP stream  TCP stream  TCP stream
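
To make the nesting in the table concrete, here is a toy model of the wss column. It is only a sketch: `tcp_stream`, `tls_stream`, `ws_stream`, and `toy_client` are hypothetical stand-ins defined here, not the real Boost.Asio, Boost.Beast, or async_mqtt types. Each wrapper owns the layer below it, `next_layer()` peels off one wrapper, and `lowest_layer()` forwards all the way down.

```cpp
#include <string>

// Toy stand-ins for the real stream types; only the nesting is modeled.
struct tcp_stream {
    std::string name() const { return "TCP stream"; }
    tcp_stream& lowest_layer() { return *this; }
};

template <typename Next>
struct tls_stream {
    Next next;
    std::string name() const { return "TLS stream"; }
    Next& next_layer() { return next; }
    auto& lowest_layer() { return next.lowest_layer(); }
};

template <typename Next>
struct ws_stream {
    Next next;
    std::string name() const { return "WS stream"; }
    Next& next_layer() { return next; }
    auto& lowest_layer() { return next.lowest_layer(); }
};

// Plays the role of the client: it owns the outermost stream.
template <typename Next>
struct toy_client {
    Next next;
    Next& next_layer() { return next; }
    auto& lowest_layer() { return next.lowest_layer(); }
};

// Corresponds to the wss column: WS stream over TLS stream over TCP stream.
using toy_wss_client = toy_client<ws_stream<tls_stream<tcp_stream>>>;
```

With a `toy_wss_client c`, the calls `c.next_layer()`, `c.next_layer().next_layer()`, and `c.next_layer().next_layer().next_layer()` reach the WS, TLS, and TCP layers in turn, while `c.lowest_layer()` reaches the TCP layer directly.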

Send MQTT CONNECT packet and start receive loop

After the handshaking of the underlying layers is complete, initiate the MQTT layer connection.

async_start() function

// Prepare a will message if you need one.
am::will will{
    "WillTopic1",
    "WillMessage1",
    am::qos::at_most_once,
    { // properties
        am::property::user_property{"key1", "val1"},
        am::property::content_type{"text"},
    }
};

// MQTT connect and receive loop start
auto connack_opt = co_await amcl.async_start(
    am::v5::connect_packet{
        true,   // clean_start
        0x1234, // keep_alive
        "ClientIdentifier1",
        will,   // you can pass std::nullopt if you don't want to set the will message
        "UserName1",
        "Password1"
    },
    as::use_awaitable
);
if (connack_opt) {
    std::cout << *connack_opt << std::endl;
}

The parameters of CompletionToken are error_code and an optional connack_packet. The connack_packet will have a value only if the error_code indicates success. When the CompletionToken is a mechanism such as as::use_awaitable, as::use_future, or as::deferred, the leading error_code is converted into an exception. For detailed information about errors, refer to Errors for APIs.

If you prefer to receive the error_code as a return value, wrap the CompletionToken in as::as_tuple() as follows:

// MQTT connect and receive loop start
auto [ec, connack_opt] = co_await amcl.async_start(
    am::v5::connect_packet{
        true,   // clean_start
        0x1234, // keep_alive
        "ClientIdentifier1",
        will,   // you can pass std::nullopt if you don't want to set the will message
        "UserName1",
        "Password1"
    },
    as::as_tuple(as::use_awaitable)
);
std::cout << ec.message() << std::endl;
if (connack_opt) {
    std::cout << *connack_opt << std::endl;
}
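
The difference between the two call styles can be modeled without any networking. The following is a minimal sketch of the behavior described above, not Boost.Asio's implementation: `unwrap_or_throw` is a hypothetical helper, `std::string` stands in for connack_packet, and `std::error_code` / `std::system_error` stand in for their Boost.System counterparts.

```cpp
#include <optional>
#include <string>
#include <system_error>

// Hypothetical helper: models what a throwing completion token does with a
// result of the form (error_code, optional<packet>).
// std::string stands in for connack_packet.
std::optional<std::string> unwrap_or_throw(
    std::error_code ec,
    std::optional<std::string> packet
) {
    if (ec) {
        // The leading error_code becomes an exception.
        throw std::system_error{ec};
    }
    // On success only the remaining value is returned.
    return packet;
}
```

In this model, plain as::use_awaitable corresponds to calling `unwrap_or_throw`, whereas as::as_tuple(as::use_awaitable) corresponds to handing both values to the caller untouched, so no exception is thrown and `ec` must be checked explicitly.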

You can omit explicit packet creation as follows:

auto connack_opt = co_await amcl.async_start(
    true,   // clean_start
    std::uint16_t(0x1234), // keep_alive
    "ClientIdentifier1",
    will,   // you can pass std::nullopt if you don't want to set the will message
    "UserName1",
    "Password1",
    as::use_awaitable
);

The connect_packet corresponding to the client version is created automatically.

Send SUBSCRIBE/UNSUBSCRIBE and wait SUBACK/UNSUBACK

async_subscribe() function

// subscribe
// MQTT send subscribe and wait suback
std::vector<am::topic_subopts> sub_entry{
    {"topic1", am::qos::at_most_once},
    {"topic2", am::qos::at_least_once},
    {"topic3", am::qos::exactly_once},
};
auto suback_opt = co_await amcl.async_subscribe(
    am::v5::subscribe_packet{
        *amcl.acquire_unique_packet_id(), // sync version only in thread safe environment
        am::force_move(sub_entry) // sub_entry variable is required to avoid g++ bug
    },
    as::use_awaitable
);
if (suback_opt) {
    std::cout << *suback_opt << std::endl;
}

The parameters of CompletionToken are error_code and an optional suback_packet. The suback_packet will have a value only if the error_code indicates success.

async_unsubscribe() function

// MQTT send unsubscribe and wait unsuback
std::vector<am::topic_sharename> unsub_entry{
    "topic1",
    "topic2",
    "topic3",
};

auto unsuback_opt = co_await amcl.async_unsubscribe(
    am::v5::unsubscribe_packet{
        *amcl.acquire_unique_packet_id(), // sync version only in thread safe environment
        am::force_move(unsub_entry) // unsub_entry variable is required to avoid g++ bug
    },
    as::use_awaitable
);
if (unsuback_opt) {
    std::cout << *unsuback_opt << std::endl;
}

The parameters of CompletionToken are error_code and an optional unsuback_packet. The unsuback_packet will have a value only if the error_code indicates success.

Send PUBLISH packet and wait response

async_publish() function

Here is a code example that sends a QoS0 PUBLISH packet.

// MQTT publish QoS0 and wait response (socket write complete)
auto pubres0 = co_await amcl.async_publish(
    am::v5::publish_packet{
        "topic1",
        "payload1",
        am::qos::at_most_once
    },
    as::use_awaitable
);

The parameters of CompletionToken are error_code and pubres_t. When you send a QoS0 PUBLISH packet, no response packet is expected, so the CompletionToken is invoked when the underlying layer’s async_write operation is finished. All members of pubres_t are nullopt.

Here is a code example that sends a QoS1 PUBLISH packet.

// MQTT publish QoS1 and wait response (puback receive)
auto pid_pub1 = co_await amcl.async_acquire_unique_packet_id(as::use_awaitable); // async version
auto pubres1 = co_await amcl.async_publish(
    am::v5::publish_packet{
        pid_pub1,
        "topic2",
        "payload2",
        am::qos::at_least_once
    },
    as::use_awaitable
);

To create a QoS1 PUBLISH packet, you need to acquire a packet identifier. The example code above uses async_acquire_unique_packet_id(), which is the asynchronous version. You can call it from anywhere you want. The parameters of CompletionToken are error_code and packet_id_type. If all packet IDs have already been acquired, the error_code parameter will be mqtt_error::packet_identifier_fully_used. For convenience, a synchronous version, acquire_unique_packet_id(), also exists. The synchronous version must be called in a thread-safe context. For example, if you pass a strand-wrapped executor to the client upon creation, the callback handler of the asynchronous function will be in a thread-safe context.

After async_publish() completes, the puback_opt of pubres_t is set. You can then retrieve the PUBACK packet.
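
The acquire and release behavior described above can be illustrated with a toy identifier pool. This is a sketch under stated assumptions, not async_mqtt's implementation: `packet_id_pool` and its member names are hypothetical. It relies only on the MQTT rule that packet identifiers are non-zero 16-bit values, so at most 65535 can be in use at once.

```cpp
#include <cstdint>
#include <optional>
#include <set>

// Toy packet identifier pool (hypothetical; not async_mqtt's implementation).
class packet_id_pool {
public:
    packet_id_pool() {
        // Valid MQTT packet identifiers are 1..65535; 0 is not allowed.
        for (std::uint32_t id = 1; id <= 0xffff; ++id) {
            free_.insert(free_.end(), static_cast<std::uint16_t>(id));
        }
    }

    // Returns an unused identifier, or std::nullopt when all are in use.
    std::optional<std::uint16_t> acquire_unique() {
        if (free_.empty()) return std::nullopt;
        auto id = *free_.begin();
        free_.erase(free_.begin());
        return id;
    }

    // Makes an identifier available again, e.g. once the exchange has finished.
    void release(std::uint16_t id) {
        if (id != 0) free_.insert(id);
    }

private:
    std::set<std::uint16_t> free_;
};
```

The empty result of `acquire_unique()` plays the role that mqtt_error::packet_identifier_fully_used plays for async_acquire_unique_packet_id(). The pool itself is not synchronized, which mirrors why the synchronous acquire_unique_packet_id() must be called in a thread-safe context.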

Here is a code example that sends a QoS2 PUBLISH packet.

// MQTT publish QoS2 and wait response (pubrec, pubcomp receive)
auto pid_pub2 = co_await amcl.async_acquire_unique_packet_id_wait_until(as::use_awaitable); // async version
auto pubres2 = co_await amcl.async_publish(
    am::v5::publish_packet{
        pid_pub2,
        "topic3",
        "payload3",
        am::qos::exactly_once
    },
    as::use_awaitable
);

To create a QoS2 PUBLISH packet, you need to acquire a packet identifier. In this example, async_acquire_unique_packet_id_wait_until() is used. The parameters of CompletionToken are error_code and a packet identifier. If all packet identifiers are in use, the function waits until at least one packet identifier becomes available again and then invokes the CompletionToken. This helps keep the user code simple.

After async_publish() completes, the pubrec_opt and pubcomp_opt of pubres_t are set. You can then retrieve the PUBREC and PUBCOMP packets.
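
The three cases can be summarized by looking at which members of the result are set. The sketch below uses a toy struct whose member names follow the text above; `toy_pubres` and `describe` are hypothetical, and `std::string` stands in for the real packet types.

```cpp
#include <optional>
#include <string>

// Toy model of the publish result; std::string stands in for the packet types.
struct toy_pubres {
    std::optional<std::string> puback_opt;
    std::optional<std::string> pubrec_opt;
    std::optional<std::string> pubcomp_opt;
};

// Reports which response packets a completed publish carries.
std::string describe(toy_pubres const& r) {
    if (r.puback_opt) return "QoS1: PUBACK received";
    if (r.pubrec_opt && r.pubcomp_opt) return "QoS2: PUBREC and PUBCOMP received";
    // In MQTT v5, a PUBREC carrying an error reason code ends the exchange,
    // so a result with PUBREC but no PUBCOMP is modeled as a separate case.
    if (r.pubrec_opt) return "QoS2: PUBREC only";
    return "QoS0: no response packet";
}
```

A default-constructed `toy_pubres` corresponds to the QoS0 case, where all members are nullopt.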

Receive PUBLISH packet from the broker

async_recv() function

auto pv = co_await amcl.async_recv(as::use_awaitable);
pv.visit(
    am::overload{
        [&](client_t::publish_packet& p) {
            std::cout << p << std::endl;
            std::cout << "topic   : " << p.topic() << std::endl;
            std::cout << "payload : " << p.payload() << std::endl;
        },
        [&](client_t::disconnect_packet& p) {
            std::cout << p << std::endl;
        },
        [](auto&) {
        }
    }
);

After you call the async_start() function, the received PUBLISH packets are stored in the client. You can retrieve them using the async_recv() function. If no packets are stored, async_recv() waits until a PUBLISH, DISCONNECT, or AUTH packet is received.

The parameters of CompletionToken are error_code and packet_variant. If error_code indicates success, packet_variant contains a packet; otherwise, packet_variant is set to std::monostate.
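
The visit-with-overload pattern used above can be reproduced with the standard library alone. The following is a minimal sketch, assuming toy packet types: `publish_packet`, `disconnect_packet`, `auth_packet`, `packet_variant`, `overload`, and `handle` are all defined here for illustration and are not the async_mqtt types of the same names.

```cpp
#include <string>
#include <variant>

// Toy packet types (hypothetical stand-ins for the client's packet types).
struct publish_packet {
    std::string topic;
    std::string payload;
};
struct disconnect_packet {};
struct auth_packet {};

// std::monostate models the "no packet" state used when an error occurred.
using packet_variant =
    std::variant<std::monostate, publish_packet, disconnect_packet, auth_packet>;

// The usual overload helper: inherits operator() from every lambda.
template <typename... Fs>
struct overload : Fs... {
    using Fs::operator()...;
};
template <typename... Fs>
overload(Fs...) -> overload<Fs...>;

// Dispatches on the packet type actually held by the variant.
std::string handle(packet_variant const& pv) {
    return std::visit(
        overload{
            [](publish_packet const& p) {
                return "publish " + p.topic + " " + p.payload;
            },
            [](disconnect_packet const&) { return std::string{"disconnect"}; },
            // Catch-all for every alternative not handled above.
            [](auto const&) { return std::string{"ignored"}; }
        },
        pv
    );
}
```

The generic `auto` lambda serves the same purpose as the empty `[](auto&) {}` handler in the example above: it makes the visitor exhaustive, so alternatives you do not care about are accepted and ignored.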

Send DISCONNECT packet

async_disconnect() function

co_await amcl.async_disconnect(
    am::v5::disconnect_packet{},
    as::use_awaitable
);

The CompletionToken parameter is error_code.

Sending a DISCONNECT packet to the broker initiates a graceful disconnect sequence. The broker sends the MQTT will message if needed, then disconnects the network connection from its side. The client detects the disconnection and finally closes the client-side socket.

Close

async_close() function

co_await amcl.async_close(as::use_awaitable);

The CompletionToken takes no parameters.

If you want to close the socket forcibly, you can call the async_close() function. This is useful, for example, when packets from the broker unexpectedly stop arriving and the client side doesn’t detect the disconnection.

Whole code

Supported Functionality