This is the stackless coroutine approach. It uses the switch-case-based Boost.Asio stackless coroutine. See https://www.boost.org/doc/html/boost_asio/overview/composition/coroutine.html

It is convenient but a little tricky. Because it is based on a switch statement, there are some restrictions, especially regarding defining local variables. If you can use C++20 coroutines, I recommend using them instead.

Stackless coroutines can avoid deeply nested callbacks, making the code easier to read.
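
As an illustration of the local variable restriction, here is a minimal, self-contained sketch that uses a plain Boost.Asio timer rather than async_mqtt: reenter/yield expand to a switch statement, so a local variable declared before a yield cannot be jumped over on resumption (and would not keep its value anyway). Persistent state is therefore kept in a member instead.

#include <chrono>
#include <iostream>
#include <boost/asio.hpp>
#include <boost/asio/yield.hpp>

namespace as = boost::asio;

struct ticker {
    as::steady_timer& timer; // owned outside, so the handler stays copyable
    as::coroutine coro;
    int count = 0;           // OK: member state survives across yields

    void operator()(boost::system::error_code const& = {}) { // error ignored for brevity
        reenter (coro) {
            while (count < 3) {
                // int local = 0; // NG: resuming would jump past this initialization
                timer.expires_after(std::chrono::milliseconds(100));
                yield timer.async_wait(*this);
                ++count;
                std::cout << "tick " << count << std::endl;
            }
        }
    }
};

#include <boost/asio/unyield.hpp>

int main() {
    as::io_context ioc;
    as::steady_timer timer{ioc};
    ticker t{timer};
    t(); // start the coroutine
    ioc.run();
}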

Prepare your application class

Here is an application class prototype:

#include <boost/asio/yield.hpp>

template <typename Executor>
struct app {

private:
    struct impl {
        impl(app& a):app_{a}
        {
        }
        // forwarding callbacks
        void operator()() const {
            proc({}, am::packet_variant{});
        }
        void operator()(am::error_code const& ec) const {
            proc(ec, am::packet_variant{});
        }
        void operator()(am::error_code const& ec, am::packet_variant pv) const {
            proc(ec, am::force_move(pv));
        }
    private:
        void proc(
            am::error_code const& ec,
            am::packet_variant pv
        ) const {

            reenter (coro_) {
                std::cout << "start" << std::endl;
            }
        }

    private:
        app& app_;
        mutable as::coroutine coro_;
    };

    impl impl_{*this};
};
#include <boost/asio/unyield.hpp>

Next, add the app’s constructor as follows. The endpoint amep_ is created in the member initializer list. In the constructor body, impl_() is called to start the coroutine. Then, declare the app’s member variables.

    app(Executor exe,
        std::string_view host,
        std::string_view port
    ):host_{std::move(host)},
      port_{std::move(port)},
      amep_{am::protocol_version::v3_1_1, exe}
    {
        impl_();
    }
    // ...

private:

    std::string_view host_;
    std::string_view port_;
    am::endpoint<am::role::client, am::protocol::mqtt> amep_;
    std::size_t count_ = 0;
    impl impl_{*this};
    // prepare a will message if you need one.
    am::will will{
        "WillTopic1",
        "WillMessage1",
        am::qos::at_most_once,
        { // properties
            am::property::user_property{"key1", "val1"},
            am::property::content_type{"text"},
        }
    };
};

Next, implement int main() as follows:

int main(int argc, char* argv[]) {
    if (argc != 3) {
        std::cout << "Usage: " << argv[0] << " host port" << std::endl;
        return -1;
    }
    am::setup_log(am::severity_level::trace);
    as::io_context ioc;
    app a{ioc.get_executor(), argv[1], argv[2]};
    ioc.run();
}

When you execute the program, you will see the start output message. This indicates that the coroutine has started successfully.

        void proc(
            am::error_code const& ec,
            am::packet_variant pv
        ) const {

            reenter (coro_) {
                std::cout << "start" << std::endl;

That covers the basics of the stackless coroutine approach. Now, let’s add a more meaningful sequence.

Handshake underlying layers (mqtt, mqtts, ws, wss)

        void proc(
            am::error_code const& ec,
            am::packet_variant pv
        ) const {

            reenter (coro_) {
                std::cout << "start" << std::endl;

                // Handshake underlying layer (name resolution and TCP handshaking)
                yield am::async_underlying_handshake(
                    app_.amep_.next_layer(),
                    app_.host_,
                    app_.port_,
                    *this
                );

                std::cout
                    << "Underlying layer connected ec:"
                    << ec.message()
                    << std::endl;

                if (ec) return;

The important point is:

                yield am::async_underlying_handshake(
                    app_.amep_.next_layer(),
                    app_.host_,
                    app_.port_,
                    *this
                );

The fourth argument of async_underlying_handshake is the CompletionToken. When using a stackless coroutine, we pass *this as the CompletionToken. proc() then returns at the yield point, and async_underlying_handshake starts its asynchronous processing. When the asynchronous process has finished, the following operator() is called:

        void operator()(am::error_code const& ec) const {
            proc(ec, am::packet_variant{});
        }

Then, proc() is called. The coroutine resumes just after the corresponding yield point, so you can tell which asynchronous process has finished. You can check ec as follows:

                std::cout
                    << "Underlying layer connected ec:"
                    << ec.message()
                    << std::endl;

                if (ec) return;

Even if proc() is called again, the already executed part of the code shown below is skipped:

                std::cout << "start" << std::endl;

                // Handshake underlying layer (name resolution and TCP handshaking)
                yield am::async_underlying_handshake(
                    app_.amep_.next_layer(),
                    app_.host_,
                    app_.port_,
                    *this
                );

This is the switch-case-based Boost.Asio stackless coroutine mechanism. See https://www.boost.org/doc/html/boost_asio/overview/composition/coroutine.html
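
The following hand-written illustration (not the actual macro expansion, and independent of async_mqtt) sketches the idea: the as::coroutine object stores the resume point, and re-entering jumps past the statements that already ran.

#include <iostream>

struct illustration {
    int resume_point = 0; // roughly what as::coroutine stores

    void proc() {
        switch (resume_point) {
        case 0:
            std::cout << "start" << std::endl;
            resume_point = 1; // remember where to resume
            // the real code would start an async operation here
            // (yield am::async_underlying_handshake(...)) and then return
            return;
        case 1:
            // called again when the async operation completes; "start" and
            // everything before the yield point are skipped
            std::cout << "resumed after completion" << std::endl;
            break;
        }
    }
};

int main() {
    illustration i;
    i.proc(); // first entry: prints "start", then stops at the yield point
    i.proc(); // simulates the completion callback: resumes after the yield point
}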

Send MQTT CONNECT packet

Create an MQTT CONNECT packet and send it as follows:

                // Send MQTT CONNECT
                yield app_.amep_.async_send(
                    am::v3_1_1::connect_packet{
                        true,   // clean_session
                        0x1234, // keep_alive
                        "ClientIdentifier1",
                        app_.will,   // you can pass std::nullopt if you don't want to set the will message
                        "UserName1",
                        "Password1"
                    },
                    *this
                );

When the async process is finished, the function resumes at the following line:

                if (ec) {
                    std::cout << "MQTT CONNECT send error:" << ec.message() << std::endl;
                    return;
                }

The parameter of the completion handler is error_code const& ec. See the API reference.

Recv MQTT CONNACK packet

Receive an MQTT packet as follows:

                // Recv MQTT CONNACK
                yield app_.amep_.async_recv(*this);

When a packet is received, the function resumes at the following line:

                if (ec) {
                    std::cout
                        << "MQTT CONNACK recv error:"
                        << ec.message()
                        << std::endl;
                    return;
                }
                else {
                    BOOST_ASSERT(pv); // if ec is not an error, then pv is always valid.
                    pv.visit(
                        am::overload {
                            [&](am::v3_1_1::connack_packet const& p) {
                                std::cout
                                    << "MQTT CONNACK recv"
                                    << " sp:" << p.session_present()
                                    << std::endl;
                            },
                            [](auto const&) {}
                        }
                    );
                }

The parameters of the completion handler are error_code const& ec and packet_variant pv. ec can be converted to a boolean: if it evaluates to true, an error has occurred. For detailed information about errors, refer to Errors for APIs.

If there is no error, you can access the pv using the visit function and overloaded lambda expressions. Each lambda expression corresponds to the actual packet type.
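
For example, you can extend the overload set with additional packet types. The puback_packet lambda below is a hypothetical addition; it would only be reached if a QoS 1 PUBLISH had been sent earlier and async_recv were called again:

                    pv.visit(
                        am::overload {
                            [&](am::v3_1_1::connack_packet const& p) {
                                std::cout
                                    << "MQTT CONNACK recv"
                                    << " sp:" << p.session_present()
                                    << std::endl;
                            },
                            [&](am::v3_1_1::puback_packet const& p) {
                                // hypothetical: acknowledgement of a previously
                                // sent QoS 1 PUBLISH
                                std::cout
                                    << "MQTT PUBACK recv"
                                    << " pid:" << p.packet_id()
                                    << std::endl;
                            },
                            [](auto const&) {
                                std::cout << "unexpected packet received" << std::endl;
                            }
                        }
                    );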

Send/Recv packets

See the simple example ep_slcoro_mqtt_client.cpp.
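
To give a rough idea, the sequence could continue inside proc()'s reenter block along the following lines. This is only a sketch: it assumes the packet-id-less publish_packet constructor for QoS 0 publishing, and the linked example shows the complete flow including SUBSCRIBE and QoS 1/2 handling.

                // Sketch only: continues after the CONNACK handling above

                // Send MQTT PUBLISH (QoS 0, so no packet id is required)
                yield app_.amep_.async_send(
                    am::v3_1_1::publish_packet{
                        "topic1",
                        "payload1",
                        am::qos::at_most_once
                    },
                    *this
                );
                if (ec) {
                    std::cout << "MQTT PUBLISH send error:" << ec.message() << std::endl;
                    return;
                }

                // Send MQTT DISCONNECT
                yield app_.amep_.async_send(
                    am::v3_1_1::disconnect_packet{},
                    *this
                );
                if (ec) return;

                // Close the underlying connection
                yield app_.amep_.async_close(*this);
                std::cout << "closed" << std::endl;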

If you want to explore a more complex use case, client_cli.cpp is helpful. It is a command-line MQTT client application.

Examples