Program Listing for File net.hpp
↰ Return to documentation for file (actorpp/net.hpp)
#pragma once
#include "actor.hpp"
#include <netdb.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>
namespace actorpp {
enum class CloseReason {
Normal,
Error,
};
#if !defined(ACTORPP_RECV_THREAD_SHUTDOWN) && !defined(ACTORPP_RECV_THREAD_PIPE)
#define ACTORPP_RECV_THREAD_PIPE
#endif
#if defined(ACTORPP_RECV_THREAD_SHUTDOWN)
class RecvThread : Actor {
public:
RecvThread(int fd, Channel<std::vector<uint8_t>> on_message,
Channel<CloseReason> on_close)
: fd(fd), on_message(std::move(on_message)),
on_close(std::move(on_close)) {}
void run() {
while (true) {
std::vector<uint8_t> buf(128);
int bytes_read = recv(fd, buf.data(), buf.size(), 0);
if (bytes_read > 0) {
buf.resize(bytes_read);
on_message.push(std::move(buf));
} else {
on_close.push(CloseReason::Normal);
break;
}
}
}
void exit() { shutdown(fd, SHUT_RD); }
private:
int fd;
Channel<std::vector<uint8_t>> on_message;
Channel<CloseReason> on_close;
};
#elif defined(ACTORPP_RECV_THREAD_PIPE)
#include <sys/poll.h>
class RecvThread : Actor {
public:
RecvThread(int fd, Channel<std::vector<uint8_t>> on_message,
Channel<CloseReason> on_close)
: fd(fd), on_message(std::move(on_message)),
on_close(std::move(on_close)) {
if (pipe(pipe_fds) != 0)
throw std::runtime_error("pipe() failed");
}
void run() {
struct pollfd fds[2];
fds[0].fd = fd;
fds[0].events = POLLIN;
fds[1].fd = pipe_fds[0];
fds[1].events = POLLIN;
while (true) {
if (poll(fds, 2, -1) <= 0)
throw std::runtime_error("poll() failed");
if (fds[1].revents != 0)
break;
if (fds[0].revents & POLLIN) {
std::vector<uint8_t> buf(128);
int bytes_read = recv(fd, buf.data(), buf.size(), 0);
if (bytes_read > 0) {
buf.resize(bytes_read);
on_message.push(std::move(buf));
} else {
on_close.push(CloseReason::Normal);
break;
}
}
}
// wait for data and close the read side of the pipe; this ensures that
// exit() never tries to write into a closed pipe
char buf[1];
if (fds[1].revents == 0)
if (read(pipe_fds[0], buf, 1) != 1)
throw std::runtime_error("read(pipe fd 0) failed");
if (close(pipe_fds[0]) != 0)
throw std::runtime_error("close(pipe fd 0) failed");
}
void exit() {
if (pipe_fds[1]) {
if (write(pipe_fds[1], "q", 1) != 1)
throw std::runtime_error("write(pipe fd 1) failed");
if (close(pipe_fds[1]) != 0)
throw std::runtime_error("close(pipe fd 1) failed");
pipe_fds[1] = 0;
}
}
~RecvThread() { exit(); }
private:
int fd;
Channel<std::vector<uint8_t>> on_message;
Channel<CloseReason> on_close;
int pipe_fds[2];
};
#else
#error "unknown RecvThread implementation"
#endif
int connect(const std::string &hostname, int port) {
struct addrinfo hints;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
struct addrinfo *res;
std::string port_str = std::to_string(port);
int err = getaddrinfo(hostname.c_str(), port_str.c_str(), &hints, &res);
if (err != 0 || res == NULL) {
throw std::runtime_error("dns lookup failed");
}
int sockfd = socket(res->ai_family, res->ai_socktype, 0);
if (sockfd < 0) {
freeaddrinfo(res);
throw std::runtime_error("failed to allocate socket");
}
if (connect(sockfd, res->ai_addr, res->ai_addrlen) != 0) {
close(sockfd);
freeaddrinfo(res);
throw std::runtime_error("failed to connect");
}
freeaddrinfo(res);
return sockfd;
}
} // namespace actorpp