Client
Client
provides high level MQTT client APIs.
Include header files
To use async_mqtt, include the following header file:
#include <async_mqtt/all.hpp>
In addition, if you want to use TLS and/or WebSocket, include the following header files. They are not included in all.hpp
.
For TLS:
#include <async_mqtt/asio_bind/predefined_layer/mqtts.hpp>
For Websocket
#include <async_mqtt/asio_bind/predefined_layer/ws.hpp>
For Websocket on TLS
#include <async_mqtt/asio_bind/predefined_layer/wss.hpp>
Create client
First, choose MQTT protocol version (v3.1.1 or v5), and then choose underlying layer.
The following namespace alias is defined in the all code example:
namespace as = boost::asio;
namespace tls = as::ssl;
mqtt
using client_t = am::client<am::protocol_version::v3_1_1, am::protocol::mqtt;
as::io_context ioc;
auto amcl = client_t{
ioc.get_executor() // args for underlying layer (mqtt)
// mqtt is as::basic_stream_socket<as::ip::tcp, as::io_context::executor_type>
};
// handshake underlying layers
co_await amcl.async_underlying_handshake(host, port, as::use_awaitable);
mqtts
using client_t = am::client<am::protocol_version::v5, am::protocol::mqtts>;
as::io_context ioc;
am::tls::context ctx{am::tls::context::tlsv12};
ctx.set_verify_mode(am::tls::verify_none);
// If you want to check server certificate, set cacert as follows.
// ctx.load_verify_file(cacert);
// You the broker requires additional TLS layer information,
// you can use OpenSSL directly.
//
// static const unsigned char protos[5] = {4, 'm','q','t','t'};
// auto res = SSL_CTX_set_alpn_protos(ctx.native_handle(), protos, 5);
// // (check res)
auto amcl = client_t{
ioc.get_executor(), // args for underlying layer (as::ssl::stream<mqtt>)
ctx
};
// handshake underlying layers
co_await amcl.async_underlying_handshake(host, port, as::use_awaitable);
ws
using client_t = am::client<am::protocol_version::v3_1_1, am::protocol::ws>;
as::io_context ioc;
auto amcl = client_t{
ioc.get_executor() // args for underlying layer (bs::websocket::stream<mqtt>)
};
// handshake underlying layers
co_await amcl.async_underlying_handshake(host, port, as::use_awaitable);
wss
using client_t = am::client<am::protocol_version::v3_1_1, am::protocol::wss>;
as::io_context ioc;
am::tls::context ctx{am::tls::context::tlsv12};
ctx.set_verify_mode(am::tls::verify_none);
// If you want to check server certificate, set cacert as follows.
// ctx.load_verify_file(cacert);
// You the broker requires additional TLS layer information,
// you can use OpenSSL directly.
//
// static const unsigned char protos[5] = {4, 'm','q','t','t'};
// auto res = SSL_CTX_set_alpn_protos(ctx.native_handle(), protos, 5);
// // (check res)
auto amcl = client_t{
ioc.get_executor(), // args for underlying layer ( bs::websocket::stream<mqtts>)
ctx // mqtts is as::ssl::stream<mqtt>
};
// handshake underlying layers
co_await amcl.async_underlying_handshake(host, port, as::use_awaitable);
Layer access
Typically, calling the async_underlying_handshake()
member function is sufficient to establish the underlying layer. However, if you need to configure each layer individually, you can access all the underlying layers.
Layer access | mqtt | mqtts | ws | wss |
---|---|---|---|---|
next_layer() |
TCP stream |
TLS stream |
WS stream |
WS stream |
next_layer().next_layer() |
- |
TCP stream |
TCP stream |
TLS stream |
next_layer().next_layer().next_layer() |
- |
- |
- |
TCP stream |
lowest_layer() |
TCP stream |
TCP stream |
TCP stream |
TCP stream |
Send MQTT CONNECT packet and start receive loop
After the handshaking of the underlying layers is complete, initiate the MQTT layer connection.
async_start()
funtion
// prepare will message if you need.
am::will will{
"WillTopic1",
"WillMessage1",
am::qos::at_most_once,
{ // properties
am::property::user_property{"key1", "val1"},
am::property::content_type{"text"},
}
};
// MQTT connect and receive loop start
auto connack_opt = co_await amcl.async_start(
am::v5::connect_packet{
true, // clean_start
0x1234, // keep_alive
"ClientIdentifier1",
will, // you can pass std::nullopt if you don't want to set the will message
"UserName1",
"Password1"
},
as::use_awaitable
);
if (connack_opt) {
std::cout << *connack_opt << std::endl;
}
The parameters of CompletionToken
are error_code
and (optionally) connack_packet
. The connack_packet
will have a value only if error_code
indicates success. When using CompletionToken
with mechanisms like as::use_awaitable
, as::use_future
, or as::deferred
, the initial error_code
is converted into an exception. If you prefer to receive the error_code
as a return value, you can proceed as follows:
For detailed information about errors, refer to Error reporting.
// MQTT connect and receive loop start
auto [ec, connack_opt] = co_await amcl.async_start(
am::v5::connect_packet{
true, // clean_start
0x1234, // keep_alive
"ClientIdentifier1",
will, // you can pass std::nullopt if you don't want to set the will message
"UserName1",
"Password1"
},
as::as_tuple(as::use_awaitable)
);
std::cout << ec.message() << std::endl;
if (connack_opt) {
std::cout << *connack_opt << std::endl;
}
You can omit explicit packet creation as follows:
auto connack_opt = co_await amcl.async_start(
true, // clean_start
std::uint16_t(0x1234), // keep_alive
"ClientIdentifier1",
will, // you can pass std::nullopt if you don't want to set the will message
"UserName1",
"Password1",
as::use_awaitable
);
The connect_packet
correspondint to the client version is created automatically.
Send SUBSCRIBE/UNSUBSCRIBE and wait SUBACK/UNSUBACK
async_subscribe()
funtion
// subscribe
// MQTT send subscribe and wait suback
std::vector<am::topic_subopts> sub_entry{
{"topic1", am::qos::at_most_once},
{"topic2", am::qos::at_least_once},
{"topic3", am::qos::exactly_once},
};
auto suback_opt = co_await amcl.async_subscribe(
am::v5::subscribe_packet{
*amcl.acquire_unique_packet_id(), // sync version only in thread safe environment
am::force_move(sub_entry) // sub_entry variable is required to avoid g++ bug
},
as::use_awaitable
);
if (suback_opt) {
std::cout << *suback_opt << std::endl;
}
The parameters of CompletionToken
are error_code
and an optional suback_packet
. The suback_packet
will have a value only if the error_code
indicates success.
async_unsubscribe()
funtion
// MQTT send unsubscribe and wait unsuback
std::vector<am::topic_sharename> unsub_entry{
"topic1",
"topic2",
"topic3",
};
auto unsuback_opt = co_await amcl.async_unsubscribe(
am::v5::unsubscribe_packet{
*amcl.acquire_unique_packet_id(), // sync version only in thread safe environment
am::force_move(unsub_entry) // unsub_entry variable is required to avoid g++ bug
},
as::use_awaitable
);
if (unsuback_opt) {
std::cout << *unsuback_opt << std::endl;
}
The parameters of CompletionToken
are error_code
and an optional unsuback_packet
. The unsuback_packet
will have a value only if the error_code
indicates success.
Send PUBLISH packet and wait response
async_publish()
funtion
Here is a code example that sending QoS0 PUBLISH packet.
// MQTT publish QoS0 and wait response (socket write complete)
auto pubres0 = co_await amcl.async_publish(
am::v5::publish_packet{
"topic1",
"payload1",
am::qos::at_most_once
},
as::use_awaitable
);
The parameters of CompletionToken
are error_code
and pubres_type
. When you send a QoS0 PUBLISH packet, no response packet is expected, so the CompletionToken
is invoked when the underlying layer’s async_write
operation is finished. All members of pubres_t
are nullopt
.
Here is a code example that sending QoS1 PUBLISH packet.
// MQTT publish QoS1 and wait response (puback receive)
auto pid_pub1 = co_await amcl.async_acquire_unique_packet_id(as::use_awaitable); // async version
auto pubres1 = co_await amcl.async_publish(
am::v5::publish_packet{
*pid_pub1_opt,
"topic2",
"payload2",
am::qos::at_least_once
},
as::use_awaitable
);
To create a QoS1 PUBLISH packet, you need to acquire a packet identifier. The example code above uses async_acquire_unique_packet_id()
, which is the asynchronous version. You can call it from anywhere you want. The parameters of CompletionToken
are error_code
and packet_id_type
. If all packet IDs have already been acquired, the error_code
parameter will be mqtt_error::packet_identifier_fully_used
. For convenience, a synchronous version, acquire_unique_packet_id()
, also exists. The synchronous version must be called in a thread-safe context. For example, if you pass a strand-wrapped executor to the client upon creation, the callback handler of the asynchronous function will be in a thread-safe context.
After async_publish()
completes, the puback_opt
of pubres_type
is set. You can then retrieve the PUBACK packet.
Here is a code example that sending QoS1 PUBLISH packet.
// MQTT publish QoS2 and wait response (pubrec, pubcomp receive)
auto pid_pub2 = co_await amcl.acquire_unique_packet_id_wait_until(as::use_awaitable); // async version
auto pubres2 = co_await amcl.async_publish(
am::v5::publish_packet{
pid_pub2,
"topic3",
"payload3",
am::qos::exactly_once
},
as::use_awaitable
);
To create a QoS2 PUBLISH packet, you need to acquire a packet identifier. In this example, async_acquire_unique_packet_id_wait_until()
is used. The CompletionToken
parameter is an error_code and a packet identifier. If all packet identifiers are in use, the function waits until at least one packet identifier becomes available again and then invokes the CompletionToken
. This helps keep the user code simple.
After async_publish()
completes, the pubrec_opt
and pubcomp_opt
of pubres_type
are set. You can then retrieve the PUBREC and PUBCOMP packets.
Receive PUBLISH packet from the broker
async_recv()
funtion
auto pv = co_await amcl.async_recv(as::use_awaitable);
pv.visit(
am::overload{
[&](awaitable_client::publish_packet& p) {
std::cout << p << std::endl;
std::cout << "topic : " << p.topic() << std::endl;
std::cout << "payload : " << p.payload() << std::endl;
},
[&](awaitable_client::disconnect_packet& p) {
std::cout << p << std::endl;
},
[](auto&) {
}
}
);
After you call the async_start()
function, the received PUBLISH packets are stored in the client. You can retrieve them using the async_recv()
function. If no packets are stored, async_recv()
waits until a PUBLISH, DISCONNECT, or AUTH packet is received.
The parameters of CompletionToken
are error_code
and packet_variant
. If error_code
indicates success, packet_variant
contains a packet; otherwise, packet_variant
is set to std::monostate
.
Send DISCONNECT packet
async_disconnect()
funtion
co_await amcl.async_disconnect(
am::v5::disconnect_packet{},
as::use_awaitable
);
The CompletionToken
parameter is error_code
.
Sending a DISCONNECT packet to the broker initiates a graceful disconnect sequence. The broker sends the MQTT will message if needed, then disconnects the network connection from its side. The client detects the disconnection and finally closes the client-side socket.
Close
async_close()
funtion
co_await amcl.async_close(as::use_awaitable);
The CompletionToken
parameter is nothing.
If you want to close the socket forcibly, you can call the async_close()
function. This is useful, for example, when no packets are received from the broker unexpectedly and the client side doesn’t detect the disconnection.