.. _program_listing_file_eshet.hpp: Program Listing for File eshet.hpp ================================== |exhale_lsh| :ref:`Return to documentation for file ` (``eshet.hpp``) .. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS .. code-block:: cpp #pragma once #include "actorpp/actor.hpp" #include "actorpp/net.hpp" #include "eshet/commands.hpp" #include "eshet/data.hpp" #include "eshet/log.hpp" #include "eshet/msgpack_to_string.hpp" #include "eshet/unpack.hpp" #include "eshet/util.hpp" #include namespace eshet { using namespace actorpp; struct TimeoutConfig { std::chrono::seconds idle_ping{15}; std::chrono::seconds server_timeout{30}; std::chrono::seconds ping_timeout{5}; }; namespace detail { class ESHETClientActor : public Actor { using clock = std::chrono::steady_clock; using time_point = std::chrono::time_point; public: explicit ESHETClientActor(const std::string &hostname, int port, std::optional id = {}, TimeoutConfig timeout_config = {}) : hostname(hostname), port(port), id(std::move(id)), timeout_config(std::move(timeout_config)), ping_result(*this), should_exit(*this), on_message(*this), on_close(*this), on_command(*this), on_reply(*this), send_buf(128) {} explicit ESHETClientActor(const std::pair &hostport, std::optional id = {}, TimeoutConfig timeout_config = {}) : ESHETClientActor(hostport.first, hostport.second, std::move(id), std::move(timeout_config)) {} template void action_call_pack(std::string path, Channel result_chan, const T &args) { std::unique_ptr z = std::make_unique(); msgpack::object_handle oh(msgpack::object(args, *z), std::move(z)); on_command.emplace( ActionCall{std::move(path), std::move(result_chan), std::move(oh)}); } void action_register(std::string path, Channel result_chan, Channel call_chan) { on_command.emplace(ActionRegister{std::move(path), std::move(result_chan), std::move(call_chan)}); } void state_register(std::string path, Channel result_chan) { on_command.emplace(StateRegister{std::move(path), std::move(result_chan)}); } template void state_changed(std::string path, const T &value, Channel result_chan) { std::unique_ptr z = std::make_unique(); msgpack::object_handle oh(msgpack::object(value, *z), std::move(z)); on_command.push(StateChanged{std::move(path), std::move(result_chan), Known{std::move(oh)}}); } void state_unknown(std::string path, Channel result_chan) { on_command.push( StateChanged{std::move(path), std::move(result_chan), Unknown{}}); } void state_observe(std::string path, Channel result_chan, Channel changed_chan) { on_command.push(StateObserve{std::move(path), std::move(result_chan), std::move(changed_chan)}); } void event_register(std::string path, Channel result_chan) { on_command.push(EventRegister{std::move(path), std::move(result_chan)}); } template void event_emit(std::string path, const T &value, Channel result_chan) { std::unique_ptr z = std::make_unique(); msgpack::object_handle oh(msgpack::object(value, *z), std::move(z)); on_command.push( EventEmit{std::move(path), std::move(result_chan), std::move(oh)}); } void event_listen(std::string path, Channel event_chan, Channel result_chan) { on_command.push(EventListen{std::move(path), std::move(result_chan), std::move(event_chan)}); } void get(std::string path, Channel result_chan) { on_command.push(Get{std::move(path), std::move(result_chan)}); } template void set(std::string path, const T &value, Channel result_chan) { std::unique_ptr z = std::make_unique(); msgpack::object_handle oh(msgpack::object(value, *z), std::move(z)); on_command.push( Set{std::move(path), std::move(result_chan), std::move(oh)}); } void test_disconnect() { on_command.push(Disconnect{}); } // disconnect and stop the threads. It's not necessary to call this, but it // may help the destructor run faster void exit() { should_exit.push(true); } protected: void run() { // repeatedly call loop, with exponential backoff from min_delay to // max_delay, resetting back to min_delay if loop runs for at least // reset_thresh const std::chrono::seconds min_delay(1); const std::chrono::seconds max_delay(30); const std::chrono::seconds reset_thresh(10); std::chrono::seconds delay(min_delay); while (true) { auto start = clock::now(); loop(); auto end = clock::now(); if ((end - start) >= reset_thresh) delay = min_delay; if (wait_for(delay, should_exit) == 0) { cleanup_connection(); return; } delay *= 2; if (delay > max_delay) delay = max_delay; } } private: // connect, say hello, then loop receiving messages; returns if there was // an error, or if we should exit void loop() { if (!connect()) return; connection_id++; if (!do_hello()) return; if (!reregister()) return; while (true) { time_point timeout = ping_timeout ? std::min(*ping_timeout, idle_timeout) : idle_timeout; switch (wait_until(timeout, ping_result, on_close, on_message, on_reply, on_command, should_exit)) { case -1: { // timeout if (timeout == idle_timeout) { CommandVisitor{*this}(Ping{ping_result}); ping_timeout = clock::now() + timeout_config.ping_timeout; } else { // ping_timeout return; } } break; case 0: { // ping_result Result r = ping_result.read(); if (!std::holds_alternative(r)) throw ProtocolError(); // bad response to ping ping_timeout.reset(); } break; case 1: { // on_close on_close.read(); // XXX: do something with this return; } break; case 2: { // on_message unpacker.push(on_message.read()); std::optional> message; while ((message = unpacker.read())) { handle_message(*message); } } break; case 3: { // on_reply uint16_t call_connection_id; uint16_t id; Result result; std::tie(call_connection_id, id, result) = on_reply.read(); if (call_connection_id == connection_id) { send_buf.write_reply(id, result); send_send_buf(); } } break; case 4: { // on_command Command c = on_command.read(); std::visit(CommandVisitor{*this}, std::move(c)); } break; case 5: { // should_exit return; } break; } } } // methods relating to connection setup and teardown bool connect() { cleanup_connection(); try { sockfd = actorpp::connect(hostname, port); } catch (std::runtime_error &e) { log.error(e.what()); return false; } recv_thread = std::make_unique>( sockfd, on_message, on_close); return true; } void cleanup_connection() { if (recv_thread) { recv_thread.reset(); } if (sockfd != -1) { if (close(sockfd) != 0) throw std::runtime_error("close(sockfd) failed"); sockfd = -1; } for (auto &pair : reply_channels) std::visit([](auto &c) { c.push(Error("disconnected")); }, pair.second); reply_channels.clear(); for (auto &state : observed_states) state.second.push(Unknown{}); ping_timeout.reset(); // make sure to clear this after sending the disconnected messages, // otherwise there may still be a disconnect message left over ping_result.clear(); on_close.clear(); on_message.clear(); } // send and receive hello messages, returns success bool do_hello() { send_buf.write_hello(id, timeout_config.server_timeout.count()); send_send_buf(); while (true) { switch (wait(on_close, on_message, should_exit)) { case 0: on_close.read(); return false; case 1: { unpacker.push(on_message.read()); std::optional> message; if ((message = unpacker.read())) { handle_hello_message(*message); // no reason for the server to have sent us any more messages here if (unpacker.read()) throw ProtocolError(); return true; } } break; case 2: return false; } } } void handle_hello_message(const std::vector &msg) { if (msg.size() < 1) throw ProtocolError(); switch (msg[0]) { case 0x03: { // {hello} Parser p(&msg[1], msg.size() - 1); p.check_empty(); } break; case 0x04: { // {hello_id, ClientID} Parser p(&msg[1], msg.size() - 1); id = p.read_msgpack(); p.check_empty(); } break; default: throw ProtocolError(); } } // send registration commands after reconnecting bool reregister() { for (auto &action : action_channels) { const std::string &path = action.first; uint16_t id = get_id(); send_buf.write_action_register(id, path); send_send_buf(); if (!check_success(id, path)) return false; } for (auto &state : registered_states) { uint16_t id = get_id(); const std::string &path = state.first; send_buf.write_state_register(id, path); send_send_buf(); if (!check_success(id, path)) return false; id = get_id(); send_buf.write_state_changed(id, path, state.second); send_send_buf(); if (!check_success(id, path)) return false; } for (auto &state : observed_states) { uint16_t id = get_id(); const std::string &path = state.first; send_buf.write_state_observe(id, path); send_send_buf(); auto reply = wait_for_reply(id); if (!reply) return false; if (!std::visit(HandleStateReplyVisitor{*this, path, state.second}, std::move(*reply))) return false; } for (auto &path : registered_events) { uint16_t id = get_id(); send_buf.write_event_register(id, path); send_send_buf(); if (!check_success(id, path)) return false; } for (auto &event : listened_events) { uint16_t id = get_id(); const std::string &path = event.first; send_buf.write_event_listen(id, path); send_send_buf(); if (!check_success(id, path)) return false; } return true; } // wait for a reply message, and check that it is not an error, while // processing other messages normally bool check_success(uint16_t id, const std::string &path) { auto reply = wait_for_reply(id); if (reply) return std::visit(CheckResultSuccessVisitor{*this, path}, std::move(*reply)); else return false; } template std::optional wait_for_reply(uint16_t id) { Channel result_chan(*this); reply_channels.emplace(id, result_chan); while (true) { switch (wait(on_close, on_message, result_chan, should_exit)) { case 0: // on_close on_close.read(); return {}; case 1: { // on_message unpacker.push(on_message.read()); std::optional> message; while ((message = unpacker.read())) { handle_message(*message); } } break; case 2: { // result_chan return result_chan.read(); } break; case 3: return {}; } } } struct CheckResultBase { ESHETClientActor &c; const std::string &path; bool operator()(const Error &e) { std::string error_msg = "error while adding " + path + ": "; append_msgpack(error_msg, e.value.get()); c.log.error(error_msg); return false; } }; struct CheckResultSuccessVisitor : public CheckResultBase { using CheckResultBase::operator(); bool operator()(const Success &s) { return true; } }; struct HandleStateReplyVisitor : public CheckResultBase { using CheckResultBase::operator(); Channel &channel; bool operator()(Known s) { channel.push(std::move(s)); return true; } bool operator()(Unknown s) { channel.push(std::move(s)); return true; } }; // methods for handling commands from the user and sending outgoing messages struct CommandVisitor { ESHETClientActor &c; void operator()(ActionCall cmd) { uint16_t id = c.get_id(); c.reply_channels.emplace(id, std::move(cmd.result_chan)); c.send_buf.write_action_call(id, cmd.path, *cmd.args); c.send_send_buf(); } void operator()(ActionRegister cmd) { uint16_t id = c.get_id(); c.reply_channels.emplace(id, std::move(cmd.result_chan)); auto it = c.action_channels .emplace(std::move(cmd.path), std::move(cmd.call_chan)) .first; c.send_buf.write_action_register(id, it->first); c.send_send_buf(); } void operator()(StateRegister cmd) { uint16_t id = c.get_id(); c.reply_channels.emplace(id, std::move(cmd.result_chan)); auto it = c.registered_states.emplace(std::move(cmd.path), Unknown{}).first; c.send_buf.write_state_register(id, it->first); c.send_send_buf(); } void operator()(StateChanged cmd) { uint16_t id = c.get_id(); c.reply_channels.emplace(id, std::move(cmd.result_chan)); c.registered_states[cmd.path] = std::move(cmd.value); c.send_buf.write_state_changed(id, cmd.path, cmd.value); c.send_send_buf(); } void operator()(StateObserve cmd) { uint16_t id = c.get_id(); c.reply_channels.emplace(id, std::move(cmd.result_chan)); auto it = c.observed_states .emplace(std::move(cmd.path), std::move(cmd.changed_chan)) .first; c.send_buf.write_state_observe(id, it->first); c.send_send_buf(); } void operator()(EventRegister cmd) { uint16_t id = c.get_id(); c.reply_channels.emplace(id, std::move(cmd.result_chan)); auto it = c.registered_events.emplace(std::move(cmd.path)).first; c.send_buf.write_event_register(id, *it); c.send_send_buf(); } void operator()(EventEmit cmd) { uint16_t id = c.get_id(); c.reply_channels.emplace(id, std::move(cmd.result_chan)); c.send_buf.write_event_emit(id, cmd.path, *cmd.value); c.send_send_buf(); } void operator()(EventListen cmd) { uint16_t id = c.get_id(); c.reply_channels.emplace(id, std::move(cmd.result_chan)); auto it = c.listened_events .emplace(std::move(cmd.path), std::move(cmd.event_chan)) .first; c.send_buf.write_event_listen(id, it->first); c.send_send_buf(); } void operator()(Get cmd) { uint16_t id = c.get_id(); c.reply_channels.emplace(id, std::move(cmd.result_chan)); c.send_buf.write_get(id, cmd.path); c.send_send_buf(); } void operator()(Set cmd) { uint16_t id = c.get_id(); c.reply_channels.emplace(id, std::move(cmd.result_chan)); c.send_buf.write_set(id, cmd.path, *cmd.value); c.send_send_buf(); } void operator()(Ping cmd) { uint16_t id = c.get_id(); c.reply_channels.emplace(id, std::move(cmd.result_chan)); c.send_buf.write_ping(id); c.send_send_buf(); } void operator()(Disconnect d) { c.on_close.push(CloseReason::Error); } }; uint16_t get_id() { return next_id++; } void send_send_buf() { idle_timeout = clock::now() + timeout_config.idle_ping; if (send(sockfd, send_buf.sbuf.data(), send_buf.sbuf.size(), MSG_NOSIGNAL) < 0) throw Disconnected{}; } // methods for handling incoming messages void handle_message(const std::vector &msg) { if (msg.size() < 1) throw ProtocolError(); switch (msg[0]) { case 0x03: case 0x04: throw ProtocolError(); // shouldn't get a hello message case 0x05: { // {reply, Id, {ok, Msg}} Parser p(&msg[1], msg.size() - 1); uint16_t id = p.read16(); msgpack::object_handle oh = p.read_msgpack(); p.check_empty(); handle_reply(id, Success(std::move(oh))); } break; case 0x06: { // {reply, Id, {error, Msg}} Parser p(&msg[1], msg.size() - 1); uint16_t id = p.read16(); msgpack::object_handle oh = p.read_msgpack(); p.check_empty(); handle_reply(id, Error(std::move(oh))); } break; case 0x07: { // {reply_state, Id, {known, Msg}} Parser p(&msg[1], msg.size() - 1); uint16_t id = p.read16(); msgpack::object_handle oh = p.read_msgpack(); p.check_empty(); handle_reply(id, Known(std::move(oh))); } break; case 0x08: { // {reply_state, Id, unknown} Parser p(&msg[1], msg.size() - 1); uint16_t id = p.read16(); p.check_empty(); handle_reply(id, Unknown()); } break; case 0x0a: { // {reply_state, Id, {known, Msg}, T} Parser p(&msg[1], msg.size() - 1); uint16_t id = p.read16(); uint32_t t = p.read32(); msgpack::object_handle oh = p.read_msgpack(); p.check_empty(); handle_reply(id, Known(std::move(oh), Time{t})); } break; case 0x0b: { // {reply_state, Id, unknown, T} Parser p(&msg[1], msg.size() - 1); uint16_t id = p.read16(); uint32_t t = p.read32(); p.check_empty(); handle_reply(id, Unknown(Time(t))); } break; case 0x11: { // {action_call, Id, Path, Msg} Parser p(&msg[1], msg.size() - 1); uint16_t id = p.read16(); std::string path = p.read_string(); msgpack::object_handle oh = p.read_msgpack(); p.check_empty(); auto it = action_channels.find(path); if (it == action_channels.end()) // missing callback throw ProtocolError(); it->second.emplace(connection_id, id, std::move(oh), on_reply); } break; case 0x33: { // {event_notify, Path, Msg} Parser p(&msg[1], msg.size() - 1); std::string path = p.read_string(); msgpack::object_handle oh = p.read_msgpack(); p.check_empty(); auto it = listened_events.find(path); if (it == listened_events.end()) // unknown event throw ProtocolError(); it->second.emplace(std::move(oh)); } break; case 0x44: { // {state_changed, Path, {known, State}} Parser p(&msg[1], msg.size() - 1); std::string path = p.read_string(); msgpack::object_handle oh = p.read_msgpack(); p.check_empty(); auto it = observed_states.find(path); if (it == observed_states.end()) // unknown state throw ProtocolError(); it->second.emplace(Known(std::move(oh))); } break; case 0x45: { // {state_changed, Path, unknown} Parser p(&msg[1], msg.size() - 1); std::string path = p.read_string(); p.check_empty(); auto it = observed_states.find(path); if (it == observed_states.end()) // unknown state throw ProtocolError(); it->second.emplace(Unknown()); } break; } } void handle_reply(uint16_t id, AnyResult result) { auto it = reply_channels.find(id); if (it == reply_channels.end()) // missing callback throw ProtocolError(); auto nh = reply_channels.extract(it); if (!std::visit( [&](auto &cb) { using T = typename std::remove_reference::type::type; return detail::convert_variant( std::move(result), [&](T result_t) { return cb.push(std::move(result_t)); }); }, nh.mapped())) // wrong type of return throw ProtocolError(); } std::string hostname; int port; std::optional id; TimeoutConfig timeout_config; std::optional ping_timeout; time_point idle_timeout; Channel ping_result; Logger log; Channel should_exit; int sockfd = -1; Channel> on_message; Channel on_close; std::unique_ptr> recv_thread; uint16_t connection_id = 0; Unpacker unpacker; using ReplyChannel = std::variant, Channel>; std::map reply_channels; std::map> action_channels; std::map registered_states; std::map> observed_states; std::set registered_events; std::map> listened_events; Channel on_command; Channel> on_reply; SendBuf send_buf; uint16_t next_id = 0; }; } // namespace detail using ESHETClient = ActorThread; } // namespace eshet