Reading an HTTP Stream Using C++ Boost Beast

Today, we will read a real HTTP stream from Oanda’s REST API using C++ code.  We will not parse the JSON snippets but simply print them to the console.

Design Objectives

We will write a streaming app with the following properties:

  • In native C++, as a Console app.
  • Connects to Oanda v20 API to stream live market data
  • Prints out individual streamed JSON messages
  • Prints a separator (“--------------------------------”) after each message to confirm that the messages are properly separated.
  • Fully asynchronous (via Asio).   By making it asynchronous, we can easily add on to this app.  For example, this app can also be a GUI server, a trading server, and a client to yet another broker.  (Translation: Asio is awesome!)
  • Single-threaded.  This simplifies the code.  You can have asynchronous code on a single thread: this is called “concurrency without parallelism”.
  • Does not use Asio’s coroutines, but rather Asio’s standard completion-handler style.  Fun fact: at the time of writing, Asio’s stackful coroutines (boost::asio::spawn) are built on the Boost.Coroutine library, which is itself deprecated.
  • Uses Boost Beast for convenient HTTP abstractions.
  • Generally portable.  This will be made with Visual Studio 2017, but the code should be portable to Linux with minimal modification, if any.  Basically, our code will not make any direct operating-system API calls.
  • This code will time out if it does not receive data from Oanda for more than 20 seconds.  It does not attempt to re-connect. 
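
The timeout rule in the last bullet can be sketched without any networking at all. This is a minimal sketch, not the code from the repository: heartbeat_deadline, touch() and expired() are hypothetical names I made up for illustration. In the real app an Asio timer does this job; the sketch only shows the "push the deadline forward whenever data arrives" logic.

```cpp
#include <cassert>
#include <chrono>

// Hypothetical helper (not part of the original project): tracks a
// "no data received" deadline, similar in spirit to a heartbeat timer.
class heartbeat_deadline {
public:
    using clock = std::chrono::steady_clock;

    explicit heartbeat_deadline(clock::duration timeout,
                                clock::time_point now = clock::now())
        : timeout_(timeout), expires_at_(now + timeout) {
        assert(timeout > clock::duration::zero());
    }

    // Call whenever data arrives: moves the deadline forward.
    void touch(clock::time_point now = clock::now()) {
        expires_at_ = now + timeout_;
    }

    // True once more than `timeout` has passed since the last touch().
    bool expired(clock::time_point now = clock::now()) const {
        return now > expires_at_;
    }

private:
    clock::duration timeout_;
    clock::time_point expires_at_;
};
```

The time point is passed in explicitly so the logic can be checked without actually waiting 20 seconds.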

Libraries Used

Our core library of choice is Boost Asio, a very stable general-purpose I/O library, especially suited for asynchronous networking.   A working knowledge of std::bind and lambdas in C++ would greatly help.

We will also use Boost Beast, which has some excellent abstractions suited for HTTP and WebSockets.

Finally, we will use OpenSSL for our TLS 1.2 client connection, as it is supported by Asio out of the box.

The Code Templates

Much of my code is based on Vinnie Falco’s and Chris Kohlhoff’s standard templates:

The Code, The Build

All code can be found here:
https://github.com/AndrewAMD/blog/tree/master/oanda_demo_stream

I also made a Win32 build so that you can try it yourself: Download

Post Mortem

Since the code is too large to stuff into this blog post, I will cover some highlights.

First, let’s talk about getting chunks of data from an HTTP stream:

  • To get chunks, you need a parser.
  • The parser needs to be configured to have an on_chunk_body callback.
  • BUT!!! … If you are keeping your streaming session object alive using a shared_ptr, you will need some extra steps to get access to your session member variables. 

This last point is a little confusing, so here’s a snippet of the callback function:

std::size_t session::on_chunk_body(
	std::uint64_t remain,
	beast::string_view body,
	error_code& ec) {
	if (is_stopped_)
		return body.length();

	if (ec) {
		do_shutdown();
		fail(ec, "chunk");
	}

	// move timer
	heartbeat_timer_.expires_at(cro::steady_clock::now() + cro::seconds(TIMEOUT_SECONDS));
	
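	// Oanda puts new lines "}\n" at the END of messages.
	// Therefore, a complete message must end with "}\n".
	// Check the length first: body.length() - 2 would wrap around
	// for a body shorter than two characters.
	const beast::string_view end = "}\n";
	if (body.length() < end.length() ||
		body.substr(body.length() - end.length()) != end)
	{
		// not complete yet: stash it and wait for the rest
		incomplete_chunk_.append(body.data(), body.size());
		return body.length();
	}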

	// complete
	if (incomplete_chunk_.length()) {
		std::cout << incomplete_chunk_;
		incomplete_chunk_ = ""; // this must be reset when this is spent
	}
	std::cout << body;
	std::cout << "--------------------------------" << std::endl;
	return body.length();
}

Notice that incomplete_chunk_ is a std::string from the session object.  I need this because Oanda’s server sometimes sends incomplete chunks, which I then need to piece together myself.
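
For comparison, here is a minimal sketch of a slightly more general way to piece the messages together: buffer everything and split on the newline, instead of only checking the last two characters. It assumes that every message ends with "\n" and that no raw newline appears inside a message. line_assembler is a hypothetical name, not something from my repository or from Beast, and it uses std::string_view so that it compiles without Boost.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <string_view>
#include <vector>

// Hypothetical helper (not in the original project): buffers partial
// chunk bodies and hands back every complete, newline-terminated message.
class line_assembler {
public:
    std::vector<std::string> feed(std::string_view body) {
        std::vector<std::string> complete;
        pending_.append(body);

        std::size_t start = 0;
        for (;;) {
            const std::size_t nl = pending_.find('\n', start);
            if (nl == std::string::npos)
                break;                       // the rest is still incomplete
            if (nl > start)                  // skip empty lines
                complete.emplace_back(pending_, start, nl - start);
            start = nl + 1;
        }
        pending_.erase(0, start);            // keep only the unfinished tail
        return complete;
    }

    // Bytes received so far that do not yet form a complete message.
    const std::string& pending() const { return pending_; }

private:
    std::string pending_;
};
```

With this approach it would not matter whether a chunk carries half a message, exactly one message, or several messages at once; the caller just prints whatever feed() returns.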

As for the shared_ptr problem mentioned earlier: I stumbled across a solution elsewhere on GitHub.  I implemented it as follows:

// Performs an HTTP GET and prints the response
class session : public std::enable_shared_from_this<session>
{
	// ...
	// ...
	// https://github.com/aerilon/bpistats/blob/2c53509cea3a84d87c0ae97c3622c6175fe5fd8c/src/network_session.hpp#L73
	// boost::beast::http::response_parser::on_chunk_cb takes a *reference*
	// to a callback, so we have to manage its lifetime manually. This is a
	// *major* pain :-/
	std::optional<std::function<size_t(uint64_t, beast::string_view, boost::system::error_code&)>> on_chunk_body_trampoline;

};

And the set-up:


	// set up chunk callback, with a "trampoline" per github.com/aerilon/bpistats
	{
		// https://github.com/aerilon/bpistats/blob/2c53509cea3a84d87c0ae97c3622c6175fe5fd8c/src/network_session.cpp#L121
		this->on_chunk_body_trampoline.emplace(
			// ec is taken by reference so that an error set in on_chunk_body reaches the parser
			[self = this->shared_from_this()](auto remain, auto body, auto& ec)
		{
			return self->on_chunk_body(remain, body, ec);
		});
		this->par_.on_chunk_body(*this->on_chunk_body_trampoline);
	}

I can only imagine how much blood and tears it took the other guy to figure this out!  Anyways, it works great.
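
To show the lifetime issue in isolation, here is a minimal sketch with no Boost in it. fake_parser is a hypothetical stand-in that, like the parser described above, keeps only a reference to the callback; demo_session is likewise made up for illustration. One deliberate difference from my code: the lambda captures a weak_ptr rather than a shared_ptr, so the session does not hold a strong reference to itself through its own member. That is a design choice of this sketch, and I have not verified it against the real project.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <optional>
#include <string>
#include <string_view>

// Hypothetical stand-in for a parser that stores only the ADDRESS of
// the callback it is given. Not a Beast type.
class fake_parser {
public:
    using callback = std::function<std::size_t(std::string_view)>;

    void on_chunk_body(callback& cb) { cb_ = &cb; }

    std::size_t deliver(std::string_view body) {
        return cb_ ? (*cb_)(body) : 0;
    }

private:
    callback* cb_ = nullptr;
};

class demo_session : public std::enable_shared_from_this<demo_session> {
public:
    void start() {
        assert(!trampoline_ && "start() should only be called once");
        // The std::function lives inside the session, so the reference
        // held by the parser stays valid for as long as the session does.
        trampoline_.emplace(
            [weak = weak_from_this()](std::string_view body) -> std::size_t {
                if (auto self = weak.lock())
                    return self->handle(body);
                return 0;
            });
        parser_.on_chunk_body(*trampoline_);
    }

    std::size_t feed(std::string_view body) { return parser_.deliver(body); }
    const std::string& received() const { return received_; }

private:
    std::size_t handle(std::string_view body) {
        received_.append(body);
        return body.size();
    }

    fake_parser parser_;
    std::optional<fake_parser::callback> trampoline_;
    std::string received_;
};
```

Had the callback been a local variable inside start(), the parser would be left pointing at a destroyed object as soon as start() returned. Storing it as a member is what the "trampoline" is really about.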

I suppose that these were all the major concerns that I had.

Feel free to comment or ask questions.

Andrew

Note: Comments have been disabled due to spam bots.  Questions are welcomed via email.