The client provides high-level MQTT client APIs.
Include header files
To use async_mqtt, include the following header file:
#include <async_mqtt/all.hpp>
In addition, if you want to use TLS and/or WebSocket, include the following header files. They are not included in all.hpp.
For TLS:
#include <async_mqtt/predefined_layer/mqtts.hpp>
For WebSocket:
#include <async_mqtt/predefined_layer/ws.hpp>
For WebSocket on TLS:
#include <async_mqtt/predefined_layer/wss.hpp>
Create client
First, choose the MQTT protocol version (v3.1.1 or v5), and then choose the underlying layer.
The following namespace aliases are defined in all code examples:
namespace am = async_mqtt;
namespace as = boost::asio;
namespace tls = as::ssl;
mqtt
using client_t = am::client<am::protocol_version::v3_1_1, am::protocol::mqtt>;
as::io_context ioc;
auto amcl = client_t{
ioc.get_executor() // args for underlying layer (mqtt)
// mqtt is as::basic_stream_socket<as::ip::tcp, as::io_context::executor_type>
};
// handshake underlying layers
co_await am::async_underlying_handshake(amcl.next_layer(), host, port, as::use_awaitable);
mqtts
using client_t = am::client<am::protocol_version::v5, am::protocol::mqtts>;
as::io_context ioc;
am::tls::context ctx{am::tls::context::tlsv12};
ctx.set_verify_mode(am::tls::verify_none);
// If you want to check server certificate, set cacert as follows.
// ctx.load_verify_file(cacert);
// If the broker requires additional TLS layer information,
// you can use OpenSSL directly.
//
// static const unsigned char protos[5] = {4, 'm','q','t','t'};
// auto res = SSL_CTX_set_alpn_protos(ctx.native_handle(), protos, 5);
// // (check res)
auto amcl = client_t{
ioc.get_executor(), // args for underlying layer (as::ssl::stream<mqtt>)
ctx
};
// handshake underlying layers
co_await am::async_underlying_handshake(amcl.next_layer(), host, port, as::use_awaitable);
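The commented-out ALPN lines above hard-code a length-prefixed byte array. As a minimal sketch, assuming you would rather build that array from protocol names, the hypothetical helper `make_alpn_protos()` below (not part of async_mqtt or OpenSSL) produces the same wire format: one length byte followed by the bytes of each name. It uses only the standard library.

```cpp
#include <initializer_list>
#include <stdexcept>
#include <string_view>
#include <vector>

// Hypothetical helper (not part of async_mqtt): builds the ALPN wire format,
// in which every protocol name is preceded by a one-byte length.
inline std::vector<unsigned char>
make_alpn_protos(std::initializer_list<std::string_view> names) {
    std::vector<unsigned char> out;
    for (std::string_view name : names) {
        // A single length byte limits each name to 1..255 bytes.
        if (name.empty() || name.size() > 255) {
            throw std::invalid_argument("ALPN protocol name must be 1..255 bytes");
        }
        out.push_back(static_cast<unsigned char>(name.size()));
        out.insert(out.end(), name.begin(), name.end());
    }
    return out;
}
```

The result of `make_alpn_protos({"mqtt"})` holds the same five bytes as the `protos` array in the comment. It could be passed to `SSL_CTX_set_alpn_protos()` as `data()` and `size()`; note that this OpenSSL function returns 0 on success.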
ws
using client_t = am::client<am::protocol_version::v3_1_1, am::protocol::ws>;
as::io_context ioc;
auto amcl = client_t{
ioc.get_executor() // args for underlying layer (bs::websocket::stream<mqtt>)
};
// handshake underlying layers
co_await am::async_underlying_handshake(amcl.next_layer(), host, port, as::use_awaitable);
wss
using client_t = am::client<am::protocol_version::v3_1_1, am::protocol::wss>;
as::io_context ioc;
am::tls::context ctx{am::tls::context::tlsv12};
ctx.set_verify_mode(am::tls::verify_none);
// If you want to check server certificate, set cacert as follows.
// ctx.load_verify_file(cacert);
// If the broker requires additional TLS layer information,
// you can use OpenSSL directly.
//
// static const unsigned char protos[5] = {4, 'm','q','t','t'};
// auto res = SSL_CTX_set_alpn_protos(ctx.native_handle(), protos, 5);
// // (check res)
auto amcl = client_t{
ioc.get_executor(), // args for underlying layer (bs::websocket::stream<mqtts>)
ctx // mqtts is as::ssl::stream<mqtt>
};
// handshake underlying layers
co_await am::async_underlying_handshake(amcl.next_layer(), host, port, as::use_awaitable);
Layer access
Typically, you only need next_layer() to obtain the argument for async_underlying_handshake(). However, if you need to configure each layer individually, you can access all the underlying layers.
Layer access | mqtt | mqtts | ws | wss |
---|---|---|---|---|
next_layer() | TCP stream | TLS stream | WS stream | WS stream |
next_layer().next_layer() | - | TCP stream | TCP stream | TLS stream |
next_layer().next_layer().next_layer() | - | - | - | TCP stream |
lowest_layer() | TCP stream | TCP stream | TCP stream | TCP stream |
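To make the layer table concrete, here is a small standard-library-only model of the nesting. All types (`tcp_stream`, `tls_stream`, `ws_stream`, `client_model`) are hypothetical stand-ins, not the real Asio, Beast, or async_mqtt classes; the sketch only illustrates how chained `next_layer()` calls peel off one layer at a time while `lowest_layer()` always reaches the TCP stream.

```cpp
#include <string>

// Stand-in layer types (hypothetical; not the real Asio/Beast classes).
struct tcp_stream {
    std::string name() const { return "TCP stream"; }
    tcp_stream& lowest_layer() { return *this; }
};

template <typename Next>
struct tls_stream {
    Next next;
    std::string name() const { return "TLS stream"; }
    Next& next_layer() { return next; }
    auto& lowest_layer() { return next.lowest_layer(); }
};

template <typename Next>
struct ws_stream {
    Next next;
    std::string name() const { return "WS stream"; }
    Next& next_layer() { return next; }
    auto& lowest_layer() { return next.lowest_layer(); }
};

// Stand-in for the client: it owns the top-most layer.
template <typename Top>
struct client_model {
    Top top;
    Top& next_layer() { return top; }
    auto& lowest_layer() { return top.lowest_layer(); }
};

// The four layer stacks from the table columns.
using mqtt_layer  = tcp_stream;
using mqtts_layer = tls_stream<tcp_stream>;
using ws_layer    = ws_stream<tcp_stream>;
using wss_layer   = ws_stream<tls_stream<tcp_stream>>;
```

With `client_model<wss_layer>`, three chained `next_layer()` calls and a single `lowest_layer()` call refer to the same TCP object, which matches the wss column of the table.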
Send MQTT CONNECT packet and start receive loop
After the handshaking of the underlying layers is complete, initiate the MQTT layer connection.
async_start() function
// prepare will message if you need.
am::will will{
"WillTopic1",
"WillMessage1",
am::qos::at_most_once,
{ // properties
am::property::user_property{"key1", "val1"},
am::property::content_type{"text"},
}
};
// MQTT connect and receive loop start
auto connack_opt = co_await amcl.async_start(
am::v5::connect_packet{
true, // clean_start
0x1234, // keep_alive
"ClientIdentifier1",
will, // you can pass std::nullopt if you don't want to set the will message
"UserName1",
"Password1"
},
as::use_awaitable
);
if (connack_opt) {
std::cout << *connack_opt << std::endl;
}
The parameters of CompletionToken are error_code and an optional connack_packet. The connack_packet has a value only if error_code indicates success. When using CompletionToken with mechanisms like as::use_awaitable, as::use_future, or as::deferred, the leading error_code is converted into an exception. For detailed information about errors, refer to Errors for APIs.
If you prefer to receive the error_code as a return value, you can proceed as follows:
// MQTT connect and receive loop start
auto [ec, connack_opt] = co_await amcl.async_start(
am::v5::connect_packet{
true, // clean_start
0x1234, // keep_alive
"ClientIdentifier1",
will, // you can pass std::nullopt if you don't want to set the will message
"UserName1",
"Password1"
},
as::as_tuple(as::use_awaitable)
);
std::cout << ec.message() << std::endl;
if (connack_opt) {
std::cout << *connack_opt << std::endl;
}
You can omit explicit packet creation as follows:
auto connack_opt = co_await amcl.async_start(
true, // clean_start
std::uint16_t(0x1234), // keep_alive
"ClientIdentifier1",
will, // you can pass std::nullopt if you don't want to set the will message
"UserName1",
"Password1",
as::use_awaitable
);
The connect_packet corresponding to the client version is created automatically.
Send SUBSCRIBE/UNSUBSCRIBE and wait for SUBACK/UNSUBACK
async_subscribe() function
// subscribe
// MQTT send subscribe and wait suback
std::vector<am::topic_subopts> sub_entry{
{"topic1", am::qos::at_most_once},
{"topic2", am::qos::at_least_once},
{"topic3", am::qos::exactly_once},
};
auto suback_opt = co_await amcl.async_subscribe(
am::v5::subscribe_packet{
*amcl.acquire_unique_packet_id(), // sync version only in thread safe environment
am::force_move(sub_entry) // sub_entry variable is required to avoid g++ bug
},
as::use_awaitable
);
if (suback_opt) {
std::cout << *suback_opt << std::endl;
}
The parameters of CompletionToken are error_code and an optional suback_packet. The suback_packet has a value only if the error_code indicates success.
async_unsubscribe() function
// MQTT send unsubscribe and wait unsuback
std::vector<am::topic_sharename> unsub_entry{
"topic1",
"topic2",
"topic3",
};
auto unsuback_opt = co_await amcl.async_unsubscribe(
am::v5::unsubscribe_packet{
*amcl.acquire_unique_packet_id(), // sync version only in thread safe environment
am::force_move(unsub_entry) // unsub_entry variable is required to avoid g++ bug
},
as::use_awaitable
);
if (unsuback_opt) {
std::cout << *unsuback_opt << std::endl;
}
The parameters of CompletionToken are error_code and an optional unsuback_packet. The unsuback_packet has a value only if the error_code indicates success.
Send PUBLISH packet and wait for response
async_publish() function
Here is a code example that sends a QoS0 PUBLISH packet.
// MQTT publish QoS0 and wait response (socket write complete)
auto pubres0 = co_await amcl.async_publish(
am::v5::publish_packet{
"topic1",
"payload1",
am::qos::at_most_once
},
as::use_awaitable
);
The parameters of CompletionToken are error_code and pubres_t. When you send a QoS0 PUBLISH packet, no response packet is expected, so the CompletionToken is invoked when the underlying layer’s async_write operation is finished. All members of pubres_t are nullopt.
Here is a code example that sends a QoS1 PUBLISH packet.
// MQTT publish QoS1 and wait response (puback receive)
auto pid_pub1 = co_await amcl.async_acquire_unique_packet_id(as::use_awaitable); // async version
auto pubres1 = co_await amcl.async_publish(
am::v5::publish_packet{
pid_pub1,
"topic2",
"payload2",
am::qos::at_least_once
},
as::use_awaitable
);
To create a QoS1 PUBLISH packet, you need to acquire a packet identifier. The example code above uses async_acquire_unique_packet_id(), which is the asynchronous version. You can call it from anywhere. The parameters of CompletionToken are error_code and packet_id_type. If all packet IDs have already been acquired, the error_code parameter is mqtt_error::packet_identifier_fully_used. For convenience, a synchronous version, acquire_unique_packet_id(), also exists. The synchronous version must be called in a thread-safe context. For example, if you pass a strand-wrapped executor to the client upon creation, the callback handlers of the asynchronous functions run in a thread-safe context.
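The acquire-or-fail behavior described above can be sketched with a tiny identifier pool. This is an illustration only: `packet_id_pool` is a hypothetical class, not the async_mqtt implementation, and it is not thread-safe, which mirrors why the synchronous call must run in a thread-safe context. MQTT packet identifiers are non-zero 16-bit values, so the valid range is 1 to 65535.

```cpp
#include <cstdint>
#include <optional>
#include <set>

// Hypothetical, single-threaded packet identifier pool (not async_mqtt code).
class packet_id_pool {
public:
    // max_id allows a tiny pool for demonstration; MQTT permits 1..65535.
    explicit packet_id_pool(std::uint16_t max_id = 65535) : max_id_{max_id} {}

    // Returns the lowest unused identifier, or std::nullopt when every
    // identifier is in use (the "fully used" case in the text).
    std::optional<std::uint16_t> acquire() {
        for (std::uint32_t id = 1; id <= max_id_; ++id) {
            auto candidate = static_cast<std::uint16_t>(id);
            if (used_.insert(candidate).second) {
                return candidate;
            }
        }
        return std::nullopt;
    }

    // Makes the identifier available again, for example after PUBACK.
    void release(std::uint16_t id) { used_.erase(id); }

private:
    std::uint16_t max_id_;
    std::set<std::uint16_t> used_;
};
```

The linear scan keeps the sketch short; a production allocator would track free ranges instead.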
After async_publish() completes, the puback_opt of pubres_t is set. You can then retrieve the PUBACK packet.
Here is a code example that sends a QoS2 PUBLISH packet.
// MQTT publish QoS2 and wait response (pubrec, pubcomp receive)
auto pid_pub2 = co_await amcl.async_acquire_unique_packet_id_wait_until(as::use_awaitable); // async version
auto pubres2 = co_await amcl.async_publish(
am::v5::publish_packet{
pid_pub2,
"topic3",
"payload3",
am::qos::exactly_once
},
as::use_awaitable
);
To create a QoS2 PUBLISH packet, you need to acquire a packet identifier. In this example, async_acquire_unique_packet_id_wait_until() is used. The parameters of CompletionToken are error_code and a packet identifier. If all packet identifiers are in use, the function waits until at least one packet identifier becomes available again and then invokes the CompletionToken. This helps keep the user code simple.
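The wait-until behavior can be pictured as a pool that queues callers instead of failing. The sketch below is an assumption-laden, single-threaded model: `waiting_id_pool` and `acquire_wait()` are hypothetical names, and plain callbacks stand in for the CompletionToken. It is not how async_mqtt is implemented.

```cpp
#include <cstdint>
#include <deque>
#include <functional>
#include <set>
#include <utility>

// Hypothetical, single-threaded model of "wait until an id is free".
class waiting_id_pool {
public:
    using handler = std::function<void(std::uint16_t)>;

    explicit waiting_id_pool(std::uint16_t max_id = 65535) : max_id_{max_id} {}

    // Invokes h immediately if an identifier is free; otherwise queues h.
    void acquire_wait(handler h) {
        for (std::uint32_t id = 1; id <= max_id_; ++id) {
            auto candidate = static_cast<std::uint16_t>(id);
            if (used_.insert(candidate).second) {
                h(candidate);
                return;
            }
        }
        waiters_.push_back(std::move(h));
    }

    // Hands a released identifier directly to the oldest waiter, if any.
    void release(std::uint16_t id) {
        if (waiters_.empty()) {
            used_.erase(id);
            return;
        }
        handler h = std::move(waiters_.front());
        waiters_.pop_front();
        h(id);  // id stays marked as used; the waiter now owns it
    }

private:
    std::uint16_t max_id_;
    std::set<std::uint16_t> used_;
    std::deque<handler> waiters_;
};
```

A caller never has to handle the "fully used" error in this model; it is simply resumed later, which is the simplification the text refers to.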
After async_publish() completes, the pubrec_opt and pubcomp_opt of pubres_t are set. You can then retrieve the PUBREC and PUBCOMP packets.
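To summarize which members of the publish result are set for each QoS level, here is a minimal sketch. `pubres_model` and the empty packet structs are hypothetical stand-ins that only reuse the member names mentioned in the text (puback_opt, pubrec_opt, pubcomp_opt); the real pubres_t holds async_mqtt packet types.

```cpp
#include <optional>
#include <string>

// Stand-in packet types (hypothetical; the real ones are async_mqtt packets).
struct puback_packet {};
struct pubrec_packet {};
struct pubcomp_packet {};

// Mirrors the member names mentioned in the text.
struct pubres_model {
    std::optional<puback_packet> puback_opt;
    std::optional<pubrec_packet> pubrec_opt;
    std::optional<pubcomp_packet> pubcomp_opt;
};

// Reports which response packets a completed publish produced.
inline std::string describe(pubres_model const& r) {
    if (r.puback_opt) {
        return "QoS1: PUBACK received";
    }
    if (r.pubrec_opt && r.pubcomp_opt) {
        return "QoS2: PUBREC and PUBCOMP received";
    }
    if (r.pubrec_opt) {
        return "QoS2: PUBREC only";
    }
    return "QoS0: no response packet";
}
```

The "PUBREC only" branch is an assumption about a QoS2 exchange that ends early; the text above only describes the case where both packets arrive.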
Receive PUBLISH packet from the broker
async_recv() function
auto pv = co_await amcl.async_recv(as::use_awaitable);
pv.visit(
am::overload{
[&](client_t::publish_packet& p) {
std::cout << p << std::endl;
std::cout << "topic : " << p.topic() << std::endl;
std::cout << "payload : " << p.payload() << std::endl;
},
[&](client_t::disconnect_packet& p) {
std::cout << p << std::endl;
},
[](auto&) {
}
}
);
After you call the async_start() function, the received PUBLISH packets are stored in the client. You can retrieve them using the async_recv() function. If no packets are stored, async_recv() waits until a PUBLISH, DISCONNECT, or AUTH packet is received.
The parameters of CompletionToken are error_code and packet_variant. If error_code indicates success, packet_variant contains a packet; otherwise, packet_variant is set to std::monostate.
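The visit-with-overload pattern used in the receive example can be tried without the library. The sketch below assumes a `std::variant` as a stand-in for packet_variant and defines its own `overload` helper; the packet structs and `handle()` are hypothetical and only mimic the dispatch logic, including the catch-all branch that also covers std::monostate.

```cpp
#include <string>
#include <variant>

// Stand-in packet types (hypothetical; not the async_mqtt classes).
struct publish_packet {
    std::string topic;
    std::string payload;
};
struct disconnect_packet {};
struct auth_packet {};

// Stand-in for packet_variant; std::monostate models "no packet".
using packet_variant_model =
    std::variant<std::monostate, publish_packet, disconnect_packet, auth_packet>;

// Classic C++17 helper that merges several lambdas into one visitor.
template <typename... Fs>
struct overload : Fs... {
    using Fs::operator()...;
};
template <typename... Fs>
overload(Fs...) -> overload<Fs...>;

// Dispatches on the stored packet type, like pv.visit(...) in the example.
inline std::string handle(packet_variant_model const& pv) {
    return std::visit(
        overload{
            [](publish_packet const& p) {
                return "PUBLISH " + p.topic + " " + p.payload;
            },
            [](disconnect_packet const&) { return std::string{"DISCONNECT"}; },
            [](auto const&) { return std::string{"ignored"}; }  // AUTH, monostate
        },
        pv
    );
}
```

The generic lambda is chosen only when no exact-match lambda exists, so adding a dedicated AUTH handler later does not require touching the other branches.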
Send DISCONNECT packet
async_disconnect() function
co_await amcl.async_disconnect(
am::v5::disconnect_packet{},
as::use_awaitable
);
The CompletionToken parameter is error_code.
Sending a DISCONNECT packet to the broker initiates a graceful disconnect sequence. The broker sends the MQTT will message if needed, then disconnects the network connection from its side. The client detects the disconnection and finally closes the client-side socket.
Close
async_close() function
co_await amcl.async_close(as::use_awaitable);
The CompletionToken takes no parameters.
If you want to close the socket forcibly, you can call the async_close() function. This is useful, for example, when the broker unexpectedly stops sending packets and the client side doesn’t detect the disconnection.
Whole code
Supported Functionality
client supports the following functionalities: