#include "poll.hpp"
#include <sys/epoll.h>
#include <unistd.h>
// Shutdown hook: destroy the Poll singleton when the POST_IO phase runs.
// NOTE(review): assumes the HOOK macro registers this body as a teardown
// callback -- confirm against hook framework. Deleting the instance runs
// ~Poll(), which wakes any fds still registered so their owners can clean up.
HOOK(POST_IO, HOOK_PRIO_MID) {
delete Poll::getInst();
}
// EPOLLWAKEUP is missing from pre-3.5 kernel headers; define the bit value
// ourselves so the code builds against older toolchains.
#ifndef EPOLLWAKEUP
#define EPOLLWAKEUP (1 << 29)
#endif
// Map the module's abstract event flags onto epoll bits.
const event_t EVENT_IN = EPOLLIN;
const event_t EVENT_OUT = EPOLLOUT;
// any peer shutdown or error condition counts as "close"
const event_t EVENT_CLOSE = EPOLLRDHUP|EPOLLERR|EPOLLHUP;
// EVENT_TOUT and EVENT_WAKEUP piggyback on EPOLLPRI/EPOLLWAKEUP bits; this
// module delivers them synthetically (from wait()'s timeout loops and from
// wakeup()) rather than receiving them from the kernel.
const event_t EVENT_TOUT = EPOLLPRI;
const event_t EVENT_WAKEUP = EPOLLWAKEUP;
// singleton instance pointer, set in the ctor and cleared in the dtor
Poll* Poll::inst = NULL;
// Construct the process-wide poller singleton and create its epoll instance.
Poll::Poll(): fd_no(0), loop_counter(0) {
    // only one Poll may exist at a time
    assert(inst == NULL);
    inst = this;
    // the size argument is only a hint and is ignored by modern kernels,
    // but it must be positive
    efd = epoll_create(1);
    assert(efd != -1);
}
// Tear down the poller: close the epoll fd, clear the singleton pointer and,
// if any fds are still registered, deliver a final EVENT_WAKEUP to each live
// handler so their owners get a chance to clean up.
Poll::~Poll() {
// NOTE(review): efd is closed before the wakeup handlers below run; if a
// handler reacts by calling del()/ctl(), epoll_ctl() would operate on a
// closed fd and fail -- confirm handlers don't do that during shutdown.
close(efd);
assert(inst);
// NOTE(review): inst is cleared before handlers run, so getInst() calls
// from inside a wakeup handler see no instance -- verify this is intended.
inst = NULL;
if (fd_no) {
log(notice, "poll: %u fds active!", fd_no);
// walk the whole fd table; wakeup(i) itself re-checks the handler,
// so the NULL test here just skips the call overhead for empty slots
for (int i = 0; i<fds.size; ++i) {
if (fds[i].handler != NULL) {
wakeup(i);
}
}
}
}
// Thin wrapper around epoll_ctl() with a per-fd event-mask cache: a MOD that
// would not change the registered mask is skipped without a syscall.
// Returns true on success (including a skipped no-op), false if the kernel
// rejected the operation.
bool Poll::ctl(int op, int cfd, event_t events) {
    // DEL is exactly the "no events" case, and only a DEL may omit EVENT_CLOSE
    assert((op == EPOLL_CTL_DEL) == (events == EVENT_NONE));
    assert((op == EPOLL_CTL_DEL) != (event_isset(events, EVENT_CLOSE))); // explicit default
    // epoll state caching
    const bool cached_op = (op == EPOLL_CTL_MOD) || (op == EPOLL_CTL_DEL);
    if (cached_op && fds[cfd].events == events) {
        assert(op != EPOLL_CTL_DEL); // not used atm, so we won't expect multiple deletes here
        log(io, "ctl(%d, %" EVENT_LOG_FMT "): skipped", cfd, EVENT_LOG(events));
        return true;
    }
    if (!cached_op) {
        assert(op == EPOLL_CTL_ADD);
    }
    fds[cfd].events = events;
    // do the ctl
    struct epoll_event ev = {}; // zero-init: make valgrind happy
    ev.data.fd = cfd;
    ev.events = events;
    log(debug, "ctl(%d, %d, %" EVENT_LOG_FMT ")", op, cfd, EVENT_LOG(events));
    if (epoll_ctl(efd, op, cfd, &ev) != 0) {
        log_errno(notice, "EPOLL_CTL_(%d,%d)", op, cfd);
        return false;
    }
    return true;
}
// Register cfd with the poller: arm it in epoll, record its handler/context
// and enqueue it in the idle-timeout tracker matching its timeout class.
// Returns false (leaving the fd table untouched) if the epoll add fails.
bool Poll::add(int cfd, poll_handler_t handler, event_t events, bool short_timeout, void* ctx) {
    if (!ctl(EPOLL_CTL_ADD, cfd, events)) {
        return false;
    }
    ++fd_no;
    fds[cfd].handler = handler;
    fds[cfd].handler_ctx = ctx;
    fds[cfd].short_timeout = short_timeout;
    fds[cfd].invalid_loop = loop_counter; // prevent wrapping
    // a freshly added fd counts as "just active"
    if (short_timeout) {
        short_timeouts.push(cfd, NOW);
    } else {
        timeouts.push(cfd, NOW);
    }
    return true;
}
bool Poll::mod(int cfd, poll_handler_t handler, event_t events, bool short_timeout, void* ctx) {
assert(handler || fds[cfd].handler);
if (handler) {
fds[cfd].handler = handler;
fds[cfd].handler_ctx = ctx;
}
if (fds[cfd].short_timeout != short_timeout) {
// so we make sure the timeout is only resetted upon fd event in wait() or timeout change, not for every (possibly noop) mod
fds[cfd].short_timeout = short_timeout;
if (short_timeout) {
timeouts.pop(cfd);
short_timeouts.push(cfd, NOW);
} else {
short_timeouts.pop(cfd);
timeouts.push(cfd, NOW);
}
}
if (events == EVENT_NONE) {
log(debug, "entering timeout-only mode for %d", cfd);
assert(false); // not used atm
return ctl(EPOLL_CTL_DEL, cfd, events); // so we support a timeout-only poll (multiple dels will be noop due to event caching)
} else {
return ctl(EPOLL_CTL_MOD, cfd, events);
}
}
// Unregister cfd: remove it from epoll, clear its table slot, invalidate any
// events already gathered for the current loop, and drop it from both idle
// trackers. Returns false if the epoll delete fails.
bool Poll::del(int cfd) {
    if (!ctl(EPOLL_CTL_DEL, cfd, EVENT_NONE)) {
        return false;
    }
    --fd_no;
    fds[cfd].handler = NULL;
    fds[cfd].handler_ctx = NULL;
    fds[cfd].invalid_loop = loop_counter; // don't handle the events that might already be gathered for the current loop
    timeouts.pop(cfd);
    short_timeouts.pop(cfd);
    return true;
}
// Deliver a synthetic EVENT_WAKEUP to fd's handler. Skips fds with no
// handler, and fds invalidated during the current loop. The fd's idle timer
// is re-armed before the handler runs. Returns true iff the handler was called.
bool Poll::wakeup(int fd) {
    if (!fds[fd].handler) {
        return false;
    }
    log(io, "poll: wakeup %d: %p(%p)", fd, fds[fd].handler, fds[fd].handler_ctx);
    if (unlikely(fds[fd].invalid_loop == loop_counter)) {
        return false;
    }
    // mark as recently active in whichever idle tracker holds the fd
    if (timeouts.pop(fd)) {
        timeouts.push(fd, NOW);
    } else {
        short_timeouts.pop(fd);
        short_timeouts.push(fd, NOW);
    }
    fds[fd].handler(fd, EVENT_WAKEUP, loop_counter, fds[fd].handler_ctx);
    return true;
}
// Wake every registered fd.
// Iterating the whole fd table would be too much overhead, and iterating the
// timeout lists directly could corrupt the linked list when woken fds delete
// themselves -- so we take the slow-but-safe per-index approach for now.
void Poll::wakeup() {
    const int maxfd = get_curr_maxfd();
    log(debug, "poll: wakeup %d fds", maxfd);
    for (int fd = 0; fd < maxfd; ++fd) {
        wakeup(fd);
    }
}
// Return the opaque context pointer registered for fd (NULL if none is set).
void* Poll::ctx_get(int fd) {
    return fds[fd].handler_ctx;
}
// Run one poll iteration: wait up to tout_ms for events, dispatch handlers
// for ready fds, fire idle timeouts (EVENT_TOUT), then run the WakeUp /
// Periodic / Poller services. Dies on any epoll_wait() error except EINTR.
void Poll::wait(int tout_ms) {
struct epoll_event events[POLL_EVENTS_MAX];
const int n = epoll_wait(efd, events, POLL_EVENTS_MAX, tout_ms);
if (n == -1) {
if (errno == EINTR) {
// interrupted by a signal: skip this iteration entirely; timeouts and
// the periodic services will run on the next call
return;
} else {
die_errno("epoll_wait()");
}
}
update_now();
// new loop generation: invalid_loop markers set in earlier loops expire here
++loop_counter;
for (int i=0; i<n; ++i) {
struct epoll_event* event = &events[i];
int fd = event->data.fd;
// We could reach here an fd that has already been removed before during the same iteration. Additionally, the same fd could be added right afterwards.
if (fds[fd].invalid_loop == loop_counter) {
log(debug, "ignoring events for %d, has already been deleted", fd);
continue;
}
// reset fd (including handler mask) as recently active
if (timeouts.pop(fd)) {
timeouts.push(fd, NOW);
} else {
short_timeouts.pop(fd);
short_timeouts.push(fd, NOW);
}
// call handler - this might delete the fd
log(debug, "events %" EVENT_LOG_FMT "/%" PRIu32 " for fd %d", EVENT_LOG(event->events), event->events, fd);
fds[fd].handler(fd, event->events, loop_counter, fds[fd].handler_ctx);
}
// Fire EVENT_TOUT for fds idle longer than their class allows. Each fd is
// re-armed with NOW *before* its handler runs, so a handler that deletes
// the fd pops it again via del(), and one that does nothing keeps it alive.
int fd;
while ((fd = timeouts.pop(NOW-MAX_IDLE)) != -1) {
timeouts.push(fd, NOW);
fds[fd].handler(fd, EVENT_TOUT, loop_counter, fds[fd].handler_ctx);
}
while ((fd = short_timeouts.pop(NOW-MAX_SHORT_IDLE)) != -1) {
short_timeouts.push(fd, NOW);
fds[fd].handler(fd, EVENT_TOUT, loop_counter, fds[fd].handler_ctx);
}
WakeUp::getInst()->run();
Periodic::getInst()->run();
Poller::getInst()->run();
// sleep a bit in case there is only little work to reduce cpu load
if (POLL_INTERVAL && fd_no < POLL_INTERVAL_MAX && n < POLL_EVENTS_MAX) {
msleep(POLL_INTERVAL);
}
}