#include "ipc.hpp"
class certd_fd_t; // fwd
static FdTable<certd_fd_t*> fds; ///< certd or client fds
//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//
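/// Fan-out helper: collects the client fds (and their ipc_handler_t callbacks)
/// waiting for one certd response; handle() then delivers that response to all
/// of them at once, while reset() delivers a NULL buffer (e.g. after an i/o
/// failure) so that waiting clients are not left hanging. Both also clear the
/// corresponding entries in #fds.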
class MultiHandler {
private:
typedef struct {
int fd;
ipc_handler_t handler;
void* handler_ctx;
} handler_t;
std::vector<handler_t> handlers;
public:
~MultiHandler() { reset(); } ///< same as reset()
void enqueue(int, ipc_handler_t, void*); ///< adds client fd and handler to be called
void dequeue(int); ///< removes previously added client fd, and deletes it from #fds - O(n)
void handle(char*, size_t); ///< calls and removes all handlers, and deletes them from #fds
void reset(); ///< dummy-calls and removes all handlers, and deletes them from #fds
};
void MultiHandler::enqueue(int fd, ipc_handler_t handler, void* handler_ctx) {
handler_t h = {fd, handler, handler_ctx}; // aggregate init; the C99-style compound-literal cast is not standard C++
handlers.push_back(h);
}
void MultiHandler::dequeue(int fd) {
for (std::vector<handler_t>::iterator it=handlers.begin(); it!=handlers.end(); ++it) {
if (it->fd == fd) {
assert(fds[it->fd] != NULL);
fds[it->fd] = NULL;
handlers.erase(it);
return;
}
}
assert(false);
}
void MultiHandler::handle(char* buf, size_t len) {
for (std::vector<handler_t>::iterator it=handlers.begin(); it!=handlers.end(); ++it) {
assert(fds[it->fd] != NULL);
fds[it->fd] = NULL;
it->handler(it->fd, it->handler_ctx, buf, len);
}
handlers.clear();
}
void MultiHandler::reset() {
handle(NULL, 0);
}
//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//
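/// Thin wrapper around one unix-domain client socket: get() connects lazily
/// (throttled to one attempt per 30 seconds), registers the fd in #fds and with
/// the Poll singleton, and failed() undoes all of that after an i/o error.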
class fd_t {
private:
int fd;
char* const path; ///< private copy of the socket path (allocated via @see slab_pool), kept for reconnecting
const poll_handler_t handler;
certd_fd_t* const parent; ///< for maintaining @see fds
time_t last_connect;
public:
fd_t(const char*, poll_handler_t, certd_fd_t*);
~fd_t();
int get(); ///< get fd ((re-)connect first if reasonable)
void failed(); ///< close fd obtained via get() due to i/o error
};
fd_t::fd_t(const char* _path, poll_handler_t _handler, certd_fd_t* _parent): fd(-1), path(slab_pool.dup(_path)), handler(_handler), parent(_parent), last_connect(0) {
}
fd_t::~fd_t() {
failed();
slab_pool.push(path);
}
int fd_t::get() {
if (likely(fd != -1)) {
return fd;
} else if (last_connect >= NOW - 30) { // throttle: at most one (re-)connect attempt every 30 seconds
return -1;
} else {
log(io, "(re-)connecting to '%s'", path);
fd = uds_connect(path);
last_connect = NOW;
if (fd != -1) {
assert(fds[fd] == NULL);
fds[fd] = parent;
Poll::getInst()->add(fd, handler, EVENT_CLOSE|EVENT_IN, false);
}
return fd;
}
}
void fd_t::failed() {
if (fd == -1) {
return;
}
log(io, "closing %d to '%s'", fd, path);
assert(fds[fd] == parent);
fds[fd] = NULL;
Poll::getInst()->del(fd);
EINTR_RETRY(close(fd));
fd = -1;
}
//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//
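/// One connection to a certd instance: at most one request (#request) is in
/// flight per connection, and every client fd interested in that request waits
/// in #clients until line_handler() delivers the answer or fd_failed() aborts it.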
class certd_fd_t {
private:
fd_t fd; ///< certd fd
MultiHandler clients; ///< client fds to wake up
char request[1024];
static bool line_handler(int, char*, size_t); ///< incoming data on the given certd #fd
static void poll_handler(int, event_t, unsigned, void*); ///< incoming event on the given certd #fd; reads from it and calls line_handler()
public:
certd_fd_t(const char*);
int fd_get();
void fd_failed(); ///< some i/o on the fd obtained from fd_get() failed; may wake up waiting clients
const char* is_busy() const; ///< whether a request is pending on fd_get(), and which one if so
void set_busy(const char*, int, ipc_handler_t, void*); ///< this request was just written to fd_get() for the given client fd; if the same request is already pending, the fd is only added to #clients
void unset_busy(int); ///< the fd passed to set_busy() is no longer interested; discard its results
};
certd_fd_t::certd_fd_t(const char* path): fd(path, &poll_handler, this) {
*request = '\0';
}
int certd_fd_t::fd_get() {
return fd.get();
}
void certd_fd_t::fd_failed() {
fd.failed();
clients.reset();
*request = '\0';
}
const char* certd_fd_t::is_busy() const {
return *request? request: NULL;
}
void certd_fd_t::set_busy(const char* req, int cfd, ipc_handler_t handler, void* handler_ctx) {
assert(*req != '\0');
if (!is_busy()) {
assert(strlen(req) < sizeof(request));
strcpy(request, req);
} else {
assert(strcmp(request, req) == 0);
}
clients.enqueue(cfd, handler, handler_ctx);
assert(fds[cfd] == NULL);
fds[cfd] = this;
}
void certd_fd_t::unset_busy(int cfd) {
clients.dequeue(cfd);
}
bool certd_fd_t::line_handler(int _fd, char* b, size_t l) {
certd_fd_t* inst = fds[_fd];
log(debug, "ipc result from %d", inst->fd_get());
inst->clients.handle(b, l);
*inst->request = '\0'; // not busy anymore
return true; // go on
}
void certd_fd_t::poll_handler(int _fd, event_t ev, unsigned, void*) {
certd_fd_t* inst = fds[_fd];
assert(inst);
// upon shutdown (i.e. wakeup broadcast), don't collect results anymore. instead, remove fd from the pollset s.t. it can get empty.
if (unlikely(SHUTDOWN)) {
if (!inst->is_busy() || SHUTDOWN > SHUTDOWN_GRACEFUL) {
// close and might call handler with null
inst->fd_failed();
return;
}
}
// don't care about the specific events; the fd is nonblocking anyway and errors will be detected on read. timeouts are not interesting here either.
if (separator_read(_fd, &line_handler) == TRI_FALSE) {
inst->fd_failed();
return;
}
// try again to more gracefully close fd in case of shutdown
if (unlikely(SHUTDOWN)) {
if (!inst->is_busy()) {
inst->fd_failed();
return;
}
}
}
//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//
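/// Config plumbing: every configured certd socket path creates one certd_fd_t;
/// all of them are collected in a certd_fds_t vector stored as the config key's
/// context and torn down again (aborting pending requests) on unconfig.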
typedef std::vector<certd_fd_t*> certd_fds_t;
static bool certd_sock_config(void*& ctx, char* value) {
certd_fds_t* socks = (certd_fds_t*)ctx;
if (!socks) {
socks = new certd_fds_t();
ctx = (void*)socks;
}
socks->push_back(new certd_fd_t(value));
return true;
}
static void certd_sock_unconfig(void*& ctx) {
if (ctx) {
certd_fds_t* socks = (certd_fds_t*)ctx;
for (certd_fds_t::iterator it = socks->begin(); it != socks->end(); ++it) {
delete *it; // aborts pending requests
}
delete socks;
ctx = NULL;
}
}
CONF_DEF(config) {
ConfigKey certd_sock;
};
CONF_INIT(config) {
CONF_KEY_INIT(certd_sock, true, true, &certd_sock_config, &certd_sock_unconfig);
}
//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//
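/// Entry point for client requests: a certd connection is picked by hashing the
/// request, a request identical to one already in flight on that connection is
/// collapsed onto it, and otherwise at most two connections (fewer if fewer are
/// configured) are probed before giving up.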
bool ipc_enqueue(int cfd, ipc_handler_t handler, void* handler_ctx, const char* buf, size_t len) {
// start early not accepting new requests
if (unlikely(SHUTDOWN)) {
return false;
}
// check config
certd_fds_t* socks = config? config->certd_sock.ctx_as<certd_fds_t>(): NULL;
unless (socks && socks->size()) {
return false;
}
// pick a socket
unsigned tries = MIN(2, socks->size());
size_t num = hash(buf, len);
next_try:
certd_fd_t* sock = (*socks)[num % socks->size()];
unless (tries--) {
// TODO: collapse calls for the same input/domain
log(info, "ipc: cannot find free socket for '%s'", buf);
return false;
}
if (sock->is_busy()) {
// due to the hashing and the increment below, the busy socket may already be handling the very same request (hostname); if so, just enqueue this client on it as well
const char* busy_req = sock->is_busy();
if (strcmp(busy_req, buf) == 0) {
sock->set_busy(buf, cfd, handler, handler_ctx);
log(debug, "enqueued '%s' for %d on existing instance", buf, cfd);
return true;
}
num++;
goto next_try;
}
int sockfd = sock->fd_get();
if (sockfd == -1) {
num++;
goto next_try;
}
switch (separator_write(sockfd, buf, len)) {
case TRI_FALSE:
sock->fd_failed();
num++;
goto next_try;
case TRI_NONE:
num++; // blocked? try another socket
goto next_try;
case TRI_TRUE:
break;
}
sock->set_busy(buf, cfd, handler, handler_ctx);
log(debug, "enqueued '%s' for %d", buf, cfd);
return true;
}
void ipc_dequeue(int fd) {
certd_fd_t* sock = fds[fd];
if (!sock) {
log(notice, "ipc: cannot dequeue");
return;
}
sock->unset_busy(fd);
}
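//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//
// A minimal caller-side sketch (illustrative only, not part of this translation
// unit). It assumes a connected client fd `cfd`, a request that is a
// NUL-terminated hostname, and that ipc_handler_t is a void-returning callback
// with the argument order used in MultiHandler::handle() (fd, ctx, buf, len);
// a NULL buffer in the callback means the request was aborted (shutdown or
// certd i/o failure).
//
//   static void on_cert_reply(int cfd, void* /*ctx*/, char* buf, size_t len) {
//       if (buf == NULL) {
//           // aborted: certd went away or we are shutting down
//           return;
//       }
//       // forward the `len` bytes of certd's answer back to the client on cfd
//   }
//
//   // somewhere in the client request path:
//   const char* domain = "example.org"; // hypothetical request
//   if (!ipc_enqueue(cfd, &on_cert_reply, NULL, domain, strlen(domain))) {
//       // no certd socket available right now: fail the client request
//   }
//   // if the client disconnects before the reply arrives:
//   // ipc_dequeue(cfd);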