This is stackless coroutine approach. It uses switch-case based Boost.Asio stackless corotuine. See https://www.boost.org/doc/html/boost_asio/overview/composition/coroutine.html

It is convenient but a little tricky. Due to switch-case based, there are some restrictions especially define local variables. If you can use C++20 coroutine, I recommend use it.

stackless coroutine can avoid deeply nested callbacks. The code becomes easy to read.

Prepare your application class

Here is a application class prototype:

=include <boost/asio/yield.hpp>

struct app {
    // forwarding callbacks
    void operator()() {
        proc({}, {}, {}, {});
    }
    void operator()(boost::system::error_code const& ec) {
        proc(ec, {}, {}, {});
    }
    void operator()(boost::system::error_code ec, as::ip::tcp::resolver::results_type eps) {
        proc(ec, {}, {}, std::move(eps));
    }
    void operator()(boost::system::error_code ec, as::ip::tcp::endpoint /*unused*/) {
        proc(ec, {}, {}, {});
    }
    void operator()(am::system_error const& se) {
        proc({}, se, {}, {});
    }
    void operator()(am::packet_variant pv) {
        proc({}, {}, am::force_move(pv), {});
    }

private:
    void proc(
        boost::system::error_code const& ec,
        am::system_error const& se,
        am::packet_variant pv,
        std::optional<as::ip::tcp::resolver::results_type> eps
    ) {
        reenter (coro) {
        }
    }

    as::coroutine coro;
};

#include <boost/asio/unyield.hpp>

Then, add copyable member variables to access from proc() and initialize them by the constructor.

struct app {
    app(as::ip::tcp::resolver& res,
        std::string host,
        std::string port,
        am::endpoint<am::role::client, am::protocol::mqtt>& amep
    ):res{res},
      host{std::move(host)},
      port{std::move(port)},
      amep{amep}
    {}

    // ...

private:
    void proc(
        boost::system::error_code const& ec,
        am::system_error const& se,
        am::packet_variant pv,
        std::optional<as::ip::tcp::resolver::results_type> eps
    ) {
        reenter (coro) {
        }
    }

    as::ip::tcp::resolver& res;
    std::string host;
    std::string port;
    am::endpoint<am::role::client, am::protocol::mqtt>& amep;
    std::size_t count = 0;    as::coroutine coro;
};

Then, create your application class instance and call the function call operator.

    as::io_context ioc;

    // To get IP address from hostname
    as::ip::tcp::socket resolve_sock{ioc};
    as::ip::tcp::resolver res{resolve_sock.get_executor()};

    auto amep = am::endpoint<am::role::client, am::protocol::mqtt>::create(
        am::protocol_version::v3_1_1, // choose MQTT version v3_1_1 or v5
        ioc.get_executor() // args for underlying layer (mqtt)
        // mqtt is as::basic_stream_socket<as::ip::tcp, as::io_context::executor_type>
    );

    // Added these code
    app a{res, argv[1], argv[2], *amep};
    a();
    ioc.run();

When a() is called then "start" is output.

    void proc(
        boost::system::error_code const& ec,
        am::system_error const& se,
        am::packet_variant pv,
        std::optional<as::ip::tcp::resolver::results_type> eps
    ) {
        reenter (coro) {
            std::cout << "start" << std::endl;
        }
    }

This is the basic of stackless coroutine approach. Now, adding more meaningful sequence.

Resolve hostname (mqtt, mqtts, ws, wss)

If you use IP Address directly, you can skip resolve phase.

    void proc(
        boost::system::error_code const& ec,
        am::system_error const& se,
        am::packet_variant pv,
        std::optional<as::ip::tcp::resolver::results_type> eps
    ) {
        reenter (coro) {
            std::cout << "start" << std::endl;

            // Resolve hostname
            yield res.async_resolve(host, port, *this);
            // The function finish here, and when async_resolve is finished
            // resume at the next line.
            std::cout << "async_resolve:" << ec.message() << std::endl;
            if (ec) return;

The important point is

            yield res.async_resolve(host, port, *this);

The third argument of async_resolve is CompletionToken. In this case, asio document said it ResolveToken. When we use stackless coroutine, we pass *this as the CompletionToken. The function proc() is implicitly returned and async_resolve starts processing. When async process (resolving hostname) is finished, then the following operator() is called:

    void operator()(boost::system::error_code ec, as::ip::tcp::resolver::results_type eps) {
        proc(ec, {}, {}, std::move(eps));
    }

Then, proc is called. You can distinguish which async process is finished by proc()'s parameter. You can check ec as follows:

            std::cout << "async_resolve:" << ec.message() << std::endl;
            if (ec) return;

Even if proc() is called again, the following part of the code is skipped:

            std::cout << "start" << std::endl;

            // Resolve hostname
            yield res.async_resolve(host, port, *this);

This is switch-case based Boost.Asio stackless coroutine mechanism. See https://www.boost.org/doc/html/boost_asio/overview/composition/coroutine.html

TCP connect (mqtt, mqtts, ws, wss)

Now, hostname is resolved. The next step is making TCP connection.

            // Underlying TCP connect
            yield as::async_connect(
                amep.next_layer(), // or amep.lowest_layer()
                *eps,
                *this
            );
            std::cout
                << "TCP connected ec:"
                << ec.message()
                << std::endl;

            if (ec) return;

TLS handshake and WS handshake

See #Examples[Examples]

Send MQTT CONNECT packet

Create MQTT CONNECT packet and send it as follows:

            // Send MQTT CONNECT
            yield amep.send(
                am::v3_1_1::connect_packet{
                    true,   // clean_session
                    0x1234, // keep_alive
                    am::allocate_buffer("cid1"),
                    am::nullopt, // will
                    am::nullopt, // username set like am::allocate_buffer("user1"),
                    am::nullopt  // password set like am::allocate_buffer("pass1")
                },
                *this
            );

When async process is finished the function resumes at the following line:

            if (se) {
                std::cout << "MQTT CONNECT send error:" << se.what() << std::endl;
                return;
            }

The parameter of the completion token is system_error const& se. See API reference.

Recv MQTT CONNACK packet

Receive MQTT packet as follows:

            // Recv MQTT CONNACK
            yield amep.recv(*this);

When a packet is received then the function resumes at the following line:

            if (pv) {
                pv.visit(
                    am::overload {
                        [&](am::v3_1_1::connack_packet const& p) {
                            std::cout
                                << "MQTT CONNACK recv"
                                << " sp:" << p.session_present()
                                << std::endl;
                        },
                        [](auto const&) {}
                    }
                );
            }
            else {
                std::cout
                    << "MQTT CONNACK recv error:"
                    << pv.get<am::system_error>().what()
                    << std::endl;
                return;
            }

The parameter of the completion token is packet_variant pv. You can access the pv using visit function and overloaded lamnda expressions. Each lambda expression is corresponding to the actual packet type. pv can be evalurated as bool. If any receive error happens then pv evaluated as false, otherwise true.

Send/Recv packets

See the simple example ep_slcoro_mqtt_client.cpp.

If you want to know more complex usecase, client_cli.cpp is helpful. This is commandline MQTT client application.

Examples