#include "io.hpp"
/**
 * Tear down whatever this wrapper currently holds and mark it as unset.
 *
 * Does nothing when is_valid() reports false. Otherwise, in this order:
 * removes the fd from the poll set (if it was polled), deletes the ssl
 * session (if any), clears the external back-pointer registered in
 * reset_ptr (if any), drains the fd via read_empty(), closes it and
 * sets fd to -1.
 */
void IoWrapper::reset() {
    if (!is_valid()) {
        return;
    }
    log(io, "reset io: %d", info());
    if (polled) {
        Poll::getInst()->del(fd);
        polled = false;
    }
    if (ssl) {
        ssl_del(ssl);
        ssl = NULL;
    }
    if (reset_ptr) {
        // tell whoever registered the back-pointer that this wrapper is gone
        *reset_ptr = NULL;
        reset_ptr = NULL;
    }
    read_empty(fd); // use shutdown to ACK pending data from client -> don't send RST upon close. TODO: maybe we want this sometimes? (but e.g. not when gracefully sending an error page)
    // Close exactly once. The previous code wrapped this in EINTR_RETRY, which
    // (judging by its name) repeats the call on EINTR. On Linux the descriptor
    // is already released when close() fails with EINTR, so a retry can close
    // an unrelated descriptor that another thread has opened in the meantime.
    (void)close(fd);
    fd = -1;
}
/**
 * Attach a plain file descriptor to this wrapper.
 *
 * Passing -1 just resets the wrapper. Otherwise anything currently held
 * (an ssl session or a different fd) is reset first; then _fd is adopted
 * and the close flag is cleared. Re-setting the fd that is already held
 * (without ssl) is asserted against and otherwise ignored.
 */
void IoWrapper::set(int _fd) {
    if (_fd != -1) {
        if (ssl) {
            assert(ssl->fd != _fd); // TODO: need downgrading from ssl to fd?
            reset();
        } else if (fd == _fd) {
            // _fd is not -1 here, so this is the "already holding it" case
            assert(fd != _fd);
            return;
        } else if (fd != -1) {
            reset();
        }
        fd = _fd;
        got_close = false;
        log(io, "set io: %d", info());
    } else {
        reset();
    }
}
/**
 * Attach an ssl session (and its fd) to this wrapper.
 *
 * A NULL session just resets the wrapper. If an ssl session is already
 * held, or a plain fd different from the session's fd, it is reset first.
 * If the plain fd held equals the session's fd, the connection is kept
 * and merely upgraded to ssl i/o. Finally the close flag is cleared.
 */
void IoWrapper::set(ssl_t* _ssl) {
    if (_ssl) {
        assert(_ssl->fd != -1);
        if (ssl) {
            // replacing ssl
            assert(ssl->fd != _ssl->fd);
            reset();
        } else if (fd != -1) {
            if (fd == _ssl->fd) {
                // upgrading from fd to ssl
                log(io, "fd %d -> ssl i/o", fd);
            } else {
                // replacing fd
                reset();
            }
        }
        ssl = _ssl;
        fd = _ssl->fd;
        got_close = false;
        log(io, "set io: %d", info());
    } else {
        reset();
    }
}
/**
 * Register, update or remove this fd in the poll set.
 *
 * - not polled, e != EVENT_NONE: add the fd; `polled` records the result.
 * - not polled, e == EVENT_NONE: nothing to do, reports success.
 * - polled,     e != EVENT_NONE: modify the existing registration.
 * - polled,     e == EVENT_NONE: delete the registration; `polled` stays
 *   set if the deletion failed.
 *
 * @return true on success of the respective operation.
 */
bool IoWrapper::poll(poll_handler_t h, event_t e, bool short_timeout, void* ctx) {
    assert(fd != -1);
    if (!polled) {
        if (e == EVENT_NONE) {
            return true; // XXX: as we cannot set timeout-only atm
        }
        polled = Poll::getInst()->add(fd, h, e, short_timeout, ctx);
        return polled;
    }
    if (e != EVENT_NONE) {
        return Poll::getInst()->mod(fd, h, e, short_timeout, ctx);
    }
    polled = !Poll::getInst()->del(fd);
    return !polled;
}
/**
 * Write a fixed 7-byte TLS alert record directly to the raw fd.
 *
 * @return true only if an fd is set and all 7 bytes were written by a
 *         single write() call.
 */
bool IoWrapper::send_alert() {
    // TODO: check if a handshake is in progress?
    /*if (ssl && ssl->init_finished == TRI_FALSE) {
    return false;
    }*/
    // https://en.wikipedia.org/wiki/Transport_Layer_Security#Alert_protocol
    // 60: Export restriction
    // TODO: check for ssl/tls version?
    // TODO: might send HTTP error after handshake is completed and no data has been sent?
    static const char* record = "\x15\x03\x01\x00\x02\x02\x3C";
    static const int record_len = 7; // explicit, as the record contains NUL bytes
    bool sent = false;
    if (fd != -1) {
        sent = (write(fd, record, record_len) == record_len);
    }
    log_errno(io, "write(%d, alert): %d", fd, sent);
    return sent;
}
// Optional hook shared by all IoSource instances (static member). IoSource::do_io()
// invokes it with the source's chain on a successful read while `total` is still zero,
// i.e. when the first data arrives. NULL (the default) means no hook is installed.
void (*IoSource::data_in_handler)(const chain_t*) = NULL;
/**
 * Compute the events this source wants to be polled for.
 *
 * @param peer the wrapper on the other side of the connection, or NULL
 *             if there is none (yet). Must not be our own handler.
 * @return EVENT_NONE if nothing useful can be done, EVENT_TOUT for the
 *         temporary "closed but still holding data" state, otherwise the
 *         base events (or EVENT_IN) combined with EVENT_CLOSE.
 */
event_t IoSource::events(const IoWrapper* peer) const {
    assert(!peer || peer != handler);
    unless (handler->is_valid()) {
        return EVENT_NONE;
    }
    const event_t wanted = events();

    if (!peer) {
        // we're full and cannot read further read events if there is no upstream yet -> fatal
        if (wanted == EVENT_NONE) return EVENT_NONE;
        // if we already received an EVENT_CLOSE, we must not poll for anything, as this will be always implicitly listened for, causing spinning
        if (handler->closed()) return EVENT_NONE;
        return wanted|EVENT_CLOSE; // got no close yet so we can/must listen (for it)
    }

    // cannot do anything with the data (so both sides cannot be in temporary EVENT_TOUT unpoll state below)
    if (!peer->is_valid() || peer->closed()) return EVENT_NONE;

    if (!handler->closed()) {
        return wanted|EVENT_CLOSE; // got no close yet so we can/must listen (for it)
    }

    // we got a close but could not read until eof as we are/were full
    if (wanted == EVENT_OUT) {
        return EVENT_NONE; // no writing to closed socket
    }
    if (chain->is_empty()) {
        return EVENT_IN|EVENT_CLOSE; // spin for eof, other side possibly cannot make progress
    }
    // let other side wake us up, as it can write. only non-permanent "closing" state
    // that causes all other events to be ignored and this fd removed from the pollset.
    return EVENT_TOUT;
}
// Read from the handler into the chain.
//
// Repeats reading after every successful read until MAX_IOS successful reads were done
// (the counter is ignored while the handler reports closed), the chain is full, or a
// read returns 0 or -1.
// Returns true if at least one read delivered data during this call, false otherwise
// (including when the handler is not valid).
// Side effects visible here: updates err, read_wants_write, stime, total and the stats
// counters; resets the handler on EOF and on hard read errors.
bool IoSource::do_io() {
unless (handler->is_valid()) return false;
bool read_something = false;
unsigned loops = MAX_IOS;
again:
stats_inc(read_attempts);
// buf stays NULL when the chain itself still has room; otherwise it holds a fresh pool buffer
buf_t* buf = NULL;
char* b;
size_t l;
if (!chain->get_space(b, l)) {
if (chain->is_full()) {
log(io, "read(%d): chain full", handler->info());
return read_something; // so we won't detect an EOF yet
}
// no room left in the chain's current space, but the chain is not full: read into a new buffer
buf = buf_pool.pop();
buf->get_space(b, l);
}
log(io, "will read(%d): %p+%zu%s", handler->info(), b, l, buf? " (new)": "");
ssize_t rv = handler->do_read(b, l);
stats_inc(reads);
read_wants_write = false;
if (rv == 0) {
// EOF is recorded as ENOMSG in err
err = ENOMSG;
log(io, "read(%d): EOF", handler->info());
handler->reset(); // NOTE: assume we can close the write side as well for EOF (problem: e.g. write-side shutdown from client?)
} else if (rv == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { // no real error
log(io, "read(%d): 0", handler->info());
} else if (errno == ERESTART) { // XXX: what if the other buf writes before we retry reading?
// the read reported ERESTART: remember that it needs the fd to become writable first
log_errno(debug, "read(%d): write", handler->info());
read_wants_write = true; // as we have not read anything, we still can_read() at next call
} else {
// hard error: remember errno and drop the connection
err = errno;
log_errno(debug, "read(%d)", handler->info());
handler->reset();
}
} else {
log(io, "read(%d): %zd", handler->info(), rv);
if (rx_stats) stat_add(rx_stats, rv);
stats_inc(read_data);
// stime is set once, at the first successful read
unless (stime) stime = NOW_MSEC;
if (buf) {
// data landed in the fresh buffer: account for it and hand the buffer over to the chain
buf->add_data((size_t)rv);
chain->add_data(buf);
buf = NULL;
} else {
// data landed in space the chain already owned
chain->add_data((size_t)rv);
}
unless (total) {
if (data_in_handler) data_in_handler(chain); // try this once upon first incoming data
}
total += (size_t)rv;
read_something = true;
if (handler->closed() || --loops) { // try to hit eof early in case of close event, as we cannot listen again
goto again;
}
}
// a buffer that was popped but not handed to the chain goes back to the pool
if (buf) {
buf_pool.push(buf);
}
return read_something;
}
/**
 * Compute the events this sink wants to be polled for.
 *
 * @param peer the wrapper on the other side of the connection, or NULL
 *             if there is none (yet). Must not be our own handler.
 * @return EVENT_NONE if the handler is unusable or nothing can ever be
 *         written, otherwise the base events combined with EVENT_CLOSE.
 */
event_t IoSink::events(const IoWrapper* peer) const {
    assert(!peer || peer != handler);
    unless (handler->is_valid() && !handler->closed()) {
        // don't try to write (or read handshake) anymore. also we don't add our EVENT_CLOSE here,
        // what could cause spinning if we cannot read for some reason.
        return EVENT_NONE;
    }
    const event_t pending = events();
    if (pending != EVENT_NONE) {
        return pending|EVENT_CLOSE; // got no close yet so we can/must listen (for it)
    }
    if (!peer || !peer->is_valid()) {
        // no data and there won't be any read in the future (not necessarily the case for peer->closed)
        // (or we are the initial client, there is no upstream yet and we're full)
        return EVENT_NONE;
    }
    return pending|EVENT_CLOSE; // got no close yet so we can/must listen (for it)
}
#if (SPLICE)
/**
 * Set or clear the chain that fully written buffers are recycled into.
 *
 * Passing NULL always detaches the current out chain. Attaching a chain is
 * refused when one is already attached or when this sink has already
 * written data (total != 0).
 *
 * @return true if out_chain was updated, false if the request was refused.
 */
bool IoSink::splice_to(chain_t* _out_chain) {
    const bool attaching = (_out_chain != NULL);
    if (attaching && (out_chain || total)) {
        return false;
    }
    out_chain = _out_chain; // NULL when detaching
    return true;
}
#endif
// Write pending data to the handler.
//
// Data normally comes from in_chain. inject_bufs holds extra buffers keyed by a stream
// position: once `total` (bytes of in_chain data written so far) equals the key of the
// first entry, that buffer is written instead of chain data; before that, writes of
// chain data are capped so they end at the key.
// Repeats after every complete write, for at most MAX_IOS successful writes per call.
// Returns true if at least one write transferred data during this call, false otherwise
// (including when the handler is invalid or already flagged closed).
// Side effects visible here: updates err, write_wants_read, stime, total and the stats
// counters; flags the handler closed on EPIPE and resets it on other hard errors or on a
// missed injection point.
bool IoSink::do_io() {
unless (handler->is_valid()) return false;
unless (!handler->closed()) return false; // don't attempt to write anymore, as we already got EPIPE or EVENT_CLOSE
bool wrote_something = false;
unsigned loops = MAX_IOS;
again:
stats_inc(write_attempts);
buf_t* buf = NULL; // injection buf
// limit != 0 caps this write so that it stops at the next injection position
size_t limit = 0;
char* b;
size_t l;
unless (inject_bufs.empty()) {
size_t pos = inject_bufs.begin()->first;
buf_t* ibuf = inject_bufs.begin()->second;
if (total < pos) {
log(io, "injecting at %zu (now %zu)", pos, total);
limit = pos-total;
} else if (total > pos) {
// the stream already moved past the injection position: give up on this connection
log(error, "missed injecting buffer, resetting (%zu/%zu)", pos, total);
handler->reset();
return wrote_something;
} else { // total == pos
log(io, "injecting at %zu", total);
ibuf->get_data(b, l);
buf = ibuf;
}
}
if (!buf && !in_chain->get_data(b, l)) {
log(io, "write(%d): chain empty", handler->info());
return wrote_something;
}
if (limit) l = MIN(l, limit);
log(io, "will write(%d): %p+%zu", handler->info(), b, l);
ssize_t rv = handler->do_write(b, l); // TODO: some alternative writev implementation? but this wont work for ssl_write
stats_inc(writes);
write_wants_read = false;
if (rv == 0) {
log(debug, "write(%d): 0?", handler->info());
} else if (rv == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { // no real error
log(io, "write(%d): 0", handler->info());
} else if (errno == ERESTART) {
// the write reported ERESTART: remember that it needs the fd to become readable first
log_errno(debug, "write(%d): read", handler->info());
write_wants_read = true;
} else if (errno == EPIPE) { // not resetting handler yet, there could still be something left for reading
err = errno;
log_errno(debug, "write(%d): setting closed flag", handler->info());
handler->closed(true);
} else {
// hard error: remember errno and drop the connection
err = errno;
log_errno(debug, "write(%d)", handler->info());
handler->reset();
}
} else {
// only a complete write is followed by another attempt (see the end of this branch)
bool full_write = ((size_t)rv == l);
log(io, "write(%d): %zd", handler->info(), rv);
// stime is set once, at the first successful write
unless (stime) stime = NOW_MSEC;
stats_inc(wrote_data);
if (buf) {
// injected data was written: consume it, and retire the buffer once it is empty
assert(buf == inject_bufs.begin()->second);
buf->del_data((size_t)rv);
if (buf->has_data()) {
log(io, "%p injection partially done", buf);
} else {
log(io, "%p injection done", buf);
buf_pool.push(buf);
inject_bufs.erase(inject_bufs.begin());
}
} else {
total += (size_t)rv; // TODO: account for injected data (connot do this atm because of map-key)
// del_data() hands back a buffer or NULL; presumably a buffer the chain no longer needs - confirm in chain_t
buf = in_chain->del_data((size_t)rv);
if (buf) {
if (out_chain) {
if (!out_chain->is_full()) {
// pass the written buffer on to the out chain instead of returning it to the pool
log(io, "write(%d): recycling buf to %p", handler->info(), out_chain);
out_chain->add_data(buf);
} else {
// out chain cannot take more: stop recycling into it from now on
log(io, "write(%d): resetting out chain", handler->info());
out_chain = NULL;
buf_pool.push(buf);
}
} else {
log(io, "write(%d): buf done", handler->info());
buf_pool.push(buf);
}
}
}
wrote_something = true;
if (full_write && --loops) {
goto again;
}
}
return wrote_something;
}
bool IoSink::set_http_req(const sockaddr_t& cip, const sockaddr_t& sip, const char* sni, size_t sni_len) {
// there should be only one inject operation s.t. we can easy revert upon error
unless (inject_bufs.empty()) return false;
// find http header start
assert(in_chain->is_http_req() == TRI_TRUE); // so we have very strict contraints wrt. data position
char* b;
size_t l;
unless (in_chain->get_data(b, l)) assert(false);
char* nl = strnchr(b, '\n', l);
unless (nl) return false;
const bool rn = (nl[-1] == '\r');
++nl; // so actually after the nl
// allocate buf
buf_t* hdrbuf_buf = buf_pool.pop();
size_t tmp;
char* buf;
hdrbuf_buf->get_space(buf, tmp);
char* p = buf;
// add upstream ip and port
p = strccpy(p, "X-Forwarded-Dest: ");
p = addr2str(sip, p, true);
if (rn) { *p = '\r'; ++p; }
*p = '\n'; ++p;
// add client ip
p = strccpy(p, "X-Forwarded-For: ");
p = addr2str(cip, p, false);
if (rn) { *p = '\r'; ++p; }
*p = '\n'; ++p;
// add sni (if any)
p = strccpy(p, "X-Forwarded-Host: ");
if (sni_len) {
p = strncpy(p, sni, sni_len) + sni_len;
} else {
p = addr2str(sip, p, false);
}
if (rn) { *p = '\r'; ++p; }
*p = '\n'; ++p;
// inject & done
hdrbuf_buf->add_data(p-buf);
inject_bufs.insert(std::pair<size_t, buf_t*>(nl-b, hdrbuf_buf)); // should succeed
log(debug, "created proxy request");
return true;
}
/**
 * Flags unsent data and releases pending injection buffers.
 *
 * If no error was recorded but in_chain still holds data, err is set to
 * ECANCELED. All buffers still registered for injection go back to the pool.
 */
IoSink::~IoSink() {
    // requires in_chain to be destructed after us
    const bool unsent_data = !in_chain->is_empty();
    if (unsent_data && !err) {
        err = ECANCELED;
    }
    for (; !inject_bufs.empty(); inject_bufs.erase(inject_bufs.begin())) {
        buf_pool.push(inject_bufs.begin()->second);
    }
}
// Table-driven test for IoSource::events() / IoSink::events().
//
// Every TEST_ASSERT line below is one combination of connection and chain states
// together with the event masks expected for the client side (with peer), the server
// side (with peer) and the client side without any peer.
TEST(io_events) {
struct local {
// Builds one scenario and checks the resulting event masks.
//   valid_c / valid_s: state of the client (src) / server (dst) wrapper:
//     TRI_TRUE  = holds a dup of null_fd(),
//     TRI_NONE  = holds a dup of null_fd() and is flagged closed,
//     TRI_FALSE = left unset.
//   upstream_data / downstream_data: fill level of the respective chain:
//     TRI_NONE  = a single NOW_STR chunk,
//     TRI_TRUE  = filled byte by byte until is_full(),
//     TRI_FALSE = left empty.
//   ev_c / ev_s: expected combined source|sink events of src / dst, evaluated after
//     the reset rounds below.
//   ev_null_c: expected combined source|sink events of src with a NULL peer, evaluated
//     before any reset.
// Returns false if an expectation or one of the generic invariants is violated.
static bool test(tristate_t valid_c, tristate_t valid_s, tristate_t upstream_data, tristate_t downstream_data, event_t ev_c, event_t ev_s, event_t ev_null_c) {
IoWrapper src, dst;
if (valid_c == TRI_TRUE) {
src.set(dup(null_fd()));
} else if (valid_c == TRI_NONE) {
src.set(dup(null_fd()));
src.closed(true);
}
if (valid_s == TRI_TRUE) {
dst.set(dup(null_fd()));
} else if (valid_s == TRI_NONE) {
dst.set(dup(null_fd()));
dst.closed(true);
}
chain_t upstream, downstream;
if (upstream_data == TRI_NONE) {
upstream.add_data(NOW_STR, sizeof(NOW_STR));
} else if (upstream_data == TRI_TRUE) {
while (!upstream.is_full()) {
upstream.add_data("", 1);
}
}
if (downstream_data == TRI_NONE) {
downstream.add_data(NOW_STR, sizeof(NOW_STR));
} else if (downstream_data == TRI_TRUE) {
while (!downstream.is_full()) {
downstream.add_data("", 1);
}
}
// src reads into upstream and writes from downstream; dst does the opposite
IoSource src_in(&src, &upstream), dst_in(&dst, &downstream);
IoSink src_out(&src, &downstream), dst_out(&dst, &upstream);
log(io, ">%" EVENT_LOG_FMT " %" EVENT_LOG_FMT " %" EVENT_LOG_FMT, EVENT_LOG(src_in.events(&dst)|src_out.events(&dst)), EVENT_LOG(dst_in.events(&src)|dst_out.events(&src)), EVENT_LOG(src_in.events(NULL)|src_out.events(NULL)));
if ((src_in.events(NULL)|src_out.events(NULL)) != ev_null_c) return false; // check this first, before closing below
// a side that asks for no events at all gets reset; src is evaluated again after dst may have been reset
if ((src_in.events(&dst)|src_out.events(&dst)) == EVENT_NONE) src.reset(); // as in the main event handling, we have to re-evaluate the events if the other side gave EVENT_NONE
if ((dst_in.events(&src)|dst_out.events(&src)) == EVENT_NONE) dst.reset();
if ((src_in.events(&dst)|src_out.events(&dst)) == EVENT_NONE) src.reset();
log(io, "<%" EVENT_LOG_FMT " %" EVENT_LOG_FMT " %" EVENT_LOG_FMT, EVENT_LOG(src_in.events(&dst)|src_out.events(&dst)), EVENT_LOG(dst_in.events(&src)|dst_out.events(&src)), EVENT_LOG(ev_null_c));
if ((src_in.events(&dst)|src_out.events(&dst)) != ev_c) return false;
if ((dst_in.events(&src)|dst_out.events(&src)) != ev_s) return false;
// check for some basic invariants TODO: better stalls and loops detection
// (these checks validate the expectation table itself, independent of the implementation)
if (src.closed() && event_isset(ev_c, EVENT_IN|EVENT_OUT|EVENT_CLOSE) && event_isset(ev_s, EVENT_IN|EVENT_OUT)) return false; // no listening for i/o when already closed (-> spinning) and the other side could make progress for us
if (dst.closed() && event_isset(ev_s, EVENT_IN|EVENT_OUT|EVENT_CLOSE) && event_isset(ev_c, EVENT_IN|EVENT_OUT)) return false;
if (src.closed() && event_isset(ev_c, EVENT_OUT)) return false; // no writing to known closed socket
if (dst.closed() && event_isset(ev_s, EVENT_OUT)) return false;
if (!src.closed() && event_isset(ev_c, EVENT_TOUT)) return false; // TOUT only for src if it is closed
if (!dst.closed() && event_isset(ev_s, EVENT_TOUT)) return false;
if (event_isset(ev_null_c, EVENT_TOUT)) return false;
if (!src.is_valid() && ev_c != EVENT_NONE) return false; // close if other side is closed (except for writing available data)
if (!dst.is_valid() && ev_s != EVENT_NONE) return false;
if (ev_null_c == EVENT_CLOSE) return false; // no noop for client-only mode if no progress can be made
if (event_isset(ev_c, EVENT_IN|EVENT_OUT) && !event_isset(ev_c, EVENT_CLOSE)) return false; // ordinary events only with close
if (event_isset(ev_s, EVENT_IN|EVENT_OUT) && !event_isset(ev_s, EVENT_CLOSE)) return false;
if (event_isset(ev_null_c, EVENT_IN|EVENT_OUT) && !event_isset(ev_null_c, EVENT_CLOSE)) return false;
if (event_isset(ev_c, EVENT_CLOSE) && !event_isonly(ev_c, EVENT_IN|EVENT_OUT|EVENT_CLOSE)) return false; // close only w/ ordinary events
if (event_isset(ev_s, EVENT_CLOSE) && !event_isonly(ev_s, EVENT_IN|EVENT_OUT|EVENT_CLOSE)) return false;
if (event_isset(ev_null_c, EVENT_CLOSE) && !event_isonly(ev_null_c, EVENT_IN|EVENT_OUT|EVENT_CLOSE)) return false;
if (event_isset(ev_c|ev_s, EVENT_CLOSE) && !event_isset(ev_c|ev_s, EVENT_IN|EVENT_OUT)) return false; // if we're listening some progress must be possible on one side at least
//if (!event_isset(ev_c|ev_s, EVENT_CLOSE) && ) // TODO: if we're not listening, progress must be impossible
return true;
}
};
// Argument order: valid_c, valid_s, upstream_data, downstream_data, ev_c, ev_s, ev_null_c.
// Legend of the trailing comments: c = client, s = server, cc = client without peer, r = read, w = write.
TEST_ASSERT(local::test(TRI_TRUE, TRI_TRUE, TRI_TRUE, TRI_TRUE, EVENT_OUT|EVENT_CLOSE, EVENT_OUT|EVENT_CLOSE, EVENT_OUT|EVENT_CLOSE)); // c: !r(full)/w(!empty); s: !r(full)/w(!empty); cc: !r(full)/w(!empty)
TEST_ASSERT(local::test(TRI_TRUE, TRI_TRUE, TRI_TRUE, TRI_NONE, EVENT_OUT|EVENT_CLOSE, EVENT_ALL, EVENT_OUT|EVENT_CLOSE)); // c: !r(full)/w(!empty); s: r(!full)/w(!empty); cc: !r(full)/w(!empty)
TEST_ASSERT(local::test(TRI_TRUE, TRI_TRUE, TRI_TRUE, TRI_FALSE, EVENT_CLOSE, EVENT_ALL, EVENT_NONE)); // c: !r(full)/!w(empty); s: r(!full)/w(!empty); cc: !r(full)/!w(empty)
TEST_ASSERT(local::test(TRI_TRUE, TRI_TRUE, TRI_NONE, TRI_TRUE, EVENT_ALL, EVENT_OUT|EVENT_CLOSE, EVENT_ALL)); // c: r(!full)/w(!empty); s: !r(full)/w(!empty); cc: r(!full)/w(!empty)
TEST_ASSERT(local::test(TRI_TRUE, TRI_TRUE, TRI_NONE, TRI_NONE, EVENT_ALL, EVENT_ALL, EVENT_ALL)); // c: r(!full)/w(!empty); s: r(!full)/w(!empty); cc: r(!full)/w(!empty)
TEST_ASSERT(local::test(TRI_TRUE, TRI_TRUE, TRI_NONE, TRI_FALSE, EVENT_IN|EVENT_CLOSE, EVENT_ALL, EVENT_IN|EVENT_CLOSE)); // c: r(!full)/!w(empty); s: r(!full)/w(!empty); cc: r(!full)/!w(empty)
TEST_ASSERT(local::test(TRI_TRUE, TRI_TRUE, TRI_FALSE, TRI_TRUE, EVENT_ALL, EVENT_CLOSE, EVENT_ALL)); // c: r(!full)/w(!empty); s: !r(full)/!w(empty); cc: r(!full)/w(!empty)
TEST_ASSERT(local::test(TRI_TRUE, TRI_TRUE, TRI_FALSE, TRI_NONE, EVENT_ALL, EVENT_IN|EVENT_CLOSE, EVENT_ALL)); // c: r(!full)/w(!empty); s: r(!full)/!w(empty); cc: r(!full)/w(!empty)
TEST_ASSERT(local::test(TRI_TRUE, TRI_TRUE, TRI_FALSE, TRI_FALSE, EVENT_IN|EVENT_CLOSE, EVENT_IN|EVENT_CLOSE, EVENT_IN|EVENT_CLOSE)); // c: r(!full)/!w(empty); s: r(!full)/!w(empty); cc: r(!full)/!w(empty)
TEST_ASSERT(local::test(TRI_TRUE, TRI_NONE, TRI_TRUE, TRI_TRUE, EVENT_OUT|EVENT_CLOSE, EVENT_TOUT, EVENT_OUT|EVENT_CLOSE)); // c/s: see NONE/TRUE below; cc: !r(full)/w(!empty)
TEST_ASSERT(local::test(TRI_TRUE, TRI_NONE, TRI_TRUE, TRI_NONE, EVENT_OUT|EVENT_CLOSE, EVENT_TOUT, EVENT_OUT|EVENT_CLOSE)); // cc: !r(full)/w(!empty)
TEST_ASSERT(local::test(TRI_TRUE, TRI_NONE, TRI_TRUE, TRI_FALSE, EVENT_CLOSE, EVENT_IN|EVENT_CLOSE, EVENT_NONE)); // cc: !r(full)/!w(empty)
TEST_ASSERT(local::test(TRI_TRUE, TRI_NONE, TRI_NONE, TRI_TRUE, EVENT_OUT|EVENT_CLOSE, EVENT_TOUT, EVENT_ALL)); // cc: r(!full)/w(!empty)
TEST_ASSERT(local::test(TRI_TRUE, TRI_NONE, TRI_NONE, TRI_NONE, EVENT_OUT|EVENT_CLOSE, EVENT_TOUT, EVENT_ALL));
TEST_ASSERT(local::test(TRI_TRUE, TRI_NONE, TRI_NONE, TRI_FALSE, EVENT_CLOSE, EVENT_IN|EVENT_CLOSE, EVENT_IN|EVENT_CLOSE)); // cc: r(!full)/!w(empty)
TEST_ASSERT(local::test(TRI_TRUE, TRI_NONE, TRI_FALSE, TRI_TRUE, EVENT_OUT|EVENT_CLOSE, EVENT_TOUT, EVENT_ALL)); // cc: r(!full)/w(!empty)
TEST_ASSERT(local::test(TRI_TRUE, TRI_NONE, TRI_FALSE, TRI_NONE, EVENT_OUT|EVENT_CLOSE, EVENT_TOUT, EVENT_ALL));
TEST_ASSERT(local::test(TRI_TRUE, TRI_NONE, TRI_FALSE, TRI_FALSE, EVENT_CLOSE, EVENT_IN|EVENT_CLOSE, EVENT_IN|EVENT_CLOSE)); // cc: r(!full)/!w(empty)
TEST_ASSERT(local::test(TRI_TRUE, TRI_FALSE, TRI_TRUE, TRI_TRUE, EVENT_OUT|EVENT_CLOSE, EVENT_NONE, EVENT_OUT|EVENT_CLOSE)); // c/s: see FALSE/TRUE below: cc: !r(full)/w(!empty)
TEST_ASSERT(local::test(TRI_TRUE, TRI_FALSE, TRI_TRUE, TRI_NONE, EVENT_OUT|EVENT_CLOSE, EVENT_NONE, EVENT_OUT|EVENT_CLOSE));
TEST_ASSERT(local::test(TRI_TRUE, TRI_FALSE, TRI_TRUE, TRI_FALSE, EVENT_NONE, EVENT_NONE, EVENT_NONE)); // cc: !r(full)/!w(empty)
TEST_ASSERT(local::test(TRI_TRUE, TRI_FALSE, TRI_NONE, TRI_TRUE, EVENT_OUT|EVENT_CLOSE, EVENT_NONE, EVENT_ALL)); // cc: r(!full)/w(!empty)
TEST_ASSERT(local::test(TRI_TRUE, TRI_FALSE, TRI_NONE, TRI_NONE, EVENT_OUT|EVENT_CLOSE, EVENT_NONE, EVENT_ALL));
TEST_ASSERT(local::test(TRI_TRUE, TRI_FALSE, TRI_NONE, TRI_FALSE, EVENT_NONE, EVENT_NONE, EVENT_IN|EVENT_CLOSE)); // cc: r(!full)/!w(empty)
TEST_ASSERT(local::test(TRI_TRUE, TRI_FALSE, TRI_FALSE, TRI_TRUE, EVENT_OUT|EVENT_CLOSE, EVENT_NONE, EVENT_ALL)); // cc: r(!full)/w(!empty)
TEST_ASSERT(local::test(TRI_TRUE, TRI_FALSE, TRI_FALSE, TRI_NONE, EVENT_OUT|EVENT_CLOSE, EVENT_NONE, EVENT_ALL));
TEST_ASSERT(local::test(TRI_TRUE, TRI_FALSE, TRI_FALSE, TRI_FALSE, EVENT_NONE, EVENT_NONE, EVENT_IN|EVENT_CLOSE)); // cc: r(!full)/!w(empty)
TEST_ASSERT(local::test(TRI_NONE, TRI_TRUE, TRI_TRUE, TRI_TRUE, EVENT_TOUT, EVENT_OUT|EVENT_CLOSE, EVENT_NONE)); // c: !r(close ev, no eof, data/full, s.w)/!w; s: !r(!c.w)/w(!empty)
TEST_ASSERT(local::test(TRI_NONE, TRI_TRUE, TRI_TRUE, TRI_NONE, EVENT_TOUT, EVENT_OUT|EVENT_CLOSE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_NONE, TRI_TRUE, TRI_TRUE, TRI_FALSE, EVENT_TOUT, EVENT_OUT|EVENT_CLOSE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_NONE, TRI_TRUE, TRI_NONE, TRI_TRUE, EVENT_TOUT, EVENT_OUT|EVENT_CLOSE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_NONE, TRI_TRUE, TRI_NONE, TRI_NONE, EVENT_TOUT, EVENT_OUT|EVENT_CLOSE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_NONE, TRI_TRUE, TRI_NONE, TRI_FALSE, EVENT_TOUT, EVENT_OUT|EVENT_CLOSE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_NONE, TRI_TRUE, TRI_FALSE, TRI_TRUE, EVENT_IN|EVENT_CLOSE, EVENT_CLOSE, EVENT_NONE)); // c: r(close ev, no eof, will spin for read)/!w; s: !r(!c.w)/!w(empty)
TEST_ASSERT(local::test(TRI_NONE, TRI_TRUE, TRI_FALSE, TRI_NONE, EVENT_IN|EVENT_CLOSE, EVENT_CLOSE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_NONE, TRI_TRUE, TRI_FALSE, TRI_FALSE, EVENT_IN|EVENT_CLOSE, EVENT_CLOSE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_NONE, TRI_NONE, TRI_TRUE, TRI_TRUE, EVENT_NONE, EVENT_NONE, EVENT_NONE)); // c: !r(!s.w)/!w; s: !r(!c.w)/!w
TEST_ASSERT(local::test(TRI_NONE, TRI_NONE, TRI_TRUE, TRI_NONE, EVENT_NONE, EVENT_NONE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_NONE, TRI_NONE, TRI_TRUE, TRI_FALSE, EVENT_NONE, EVENT_NONE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_NONE, TRI_NONE, TRI_NONE, TRI_TRUE, EVENT_NONE, EVENT_NONE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_NONE, TRI_NONE, TRI_NONE, TRI_NONE, EVENT_NONE, EVENT_NONE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_NONE, TRI_NONE, TRI_NONE, TRI_FALSE, EVENT_NONE, EVENT_NONE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_NONE, TRI_NONE, TRI_FALSE, TRI_TRUE, EVENT_NONE, EVENT_NONE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_NONE, TRI_NONE, TRI_FALSE, TRI_NONE, EVENT_NONE, EVENT_NONE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_NONE, TRI_NONE, TRI_FALSE, TRI_FALSE, EVENT_NONE, EVENT_NONE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_NONE, TRI_FALSE, TRI_TRUE, TRI_TRUE, EVENT_NONE, EVENT_NONE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_NONE, TRI_FALSE, TRI_TRUE, TRI_NONE, EVENT_NONE, EVENT_NONE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_NONE, TRI_FALSE, TRI_TRUE, TRI_FALSE, EVENT_NONE, EVENT_NONE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_NONE, TRI_FALSE, TRI_NONE, TRI_TRUE, EVENT_NONE, EVENT_NONE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_NONE, TRI_FALSE, TRI_NONE, TRI_NONE, EVENT_NONE, EVENT_NONE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_NONE, TRI_FALSE, TRI_NONE, TRI_FALSE, EVENT_NONE, EVENT_NONE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_NONE, TRI_FALSE, TRI_FALSE, TRI_TRUE, EVENT_NONE, EVENT_NONE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_NONE, TRI_FALSE, TRI_FALSE, TRI_NONE, EVENT_NONE, EVENT_NONE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_NONE, TRI_FALSE, TRI_FALSE, TRI_FALSE, EVENT_NONE, EVENT_NONE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_FALSE, TRI_TRUE, TRI_TRUE, TRI_TRUE, EVENT_NONE, EVENT_OUT|EVENT_CLOSE, EVENT_NONE)); // c: !r/!w; s: !r(!c.w)/w(!empty)
TEST_ASSERT(local::test(TRI_FALSE, TRI_TRUE, TRI_TRUE, TRI_NONE, EVENT_NONE, EVENT_OUT|EVENT_CLOSE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_FALSE, TRI_TRUE, TRI_TRUE, TRI_FALSE, EVENT_NONE, EVENT_OUT|EVENT_CLOSE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_FALSE, TRI_TRUE, TRI_NONE, TRI_TRUE, EVENT_NONE, EVENT_OUT|EVENT_CLOSE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_FALSE, TRI_TRUE, TRI_NONE, TRI_NONE, EVENT_NONE, EVENT_OUT|EVENT_CLOSE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_FALSE, TRI_TRUE, TRI_NONE, TRI_FALSE, EVENT_NONE, EVENT_OUT|EVENT_CLOSE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_FALSE, TRI_TRUE, TRI_FALSE, TRI_TRUE, EVENT_NONE, EVENT_NONE, EVENT_NONE)); // c: !r/!w; s: !r(!c.w)/!w(empty)
TEST_ASSERT(local::test(TRI_FALSE, TRI_TRUE, TRI_FALSE, TRI_NONE, EVENT_NONE, EVENT_NONE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_FALSE, TRI_TRUE, TRI_FALSE, TRI_FALSE, EVENT_NONE, EVENT_NONE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_FALSE, TRI_NONE, TRI_TRUE, TRI_TRUE, EVENT_NONE, EVENT_NONE, EVENT_NONE)); // c: !r/!w; s: !r(!c.w)/!w;
TEST_ASSERT(local::test(TRI_FALSE, TRI_NONE, TRI_TRUE, TRI_NONE, EVENT_NONE, EVENT_NONE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_FALSE, TRI_NONE, TRI_TRUE, TRI_FALSE, EVENT_NONE, EVENT_NONE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_FALSE, TRI_NONE, TRI_NONE, TRI_TRUE, EVENT_NONE, EVENT_NONE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_FALSE, TRI_NONE, TRI_NONE, TRI_NONE, EVENT_NONE, EVENT_NONE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_FALSE, TRI_NONE, TRI_NONE, TRI_FALSE, EVENT_NONE, EVENT_NONE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_FALSE, TRI_NONE, TRI_FALSE, TRI_TRUE, EVENT_NONE, EVENT_NONE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_FALSE, TRI_NONE, TRI_FALSE, TRI_NONE, EVENT_NONE, EVENT_NONE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_FALSE, TRI_NONE, TRI_FALSE, TRI_FALSE, EVENT_NONE, EVENT_NONE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_FALSE, TRI_FALSE, TRI_TRUE, TRI_TRUE, EVENT_NONE, EVENT_NONE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_FALSE, TRI_FALSE, TRI_TRUE, TRI_NONE, EVENT_NONE, EVENT_NONE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_FALSE, TRI_FALSE, TRI_TRUE, TRI_FALSE, EVENT_NONE, EVENT_NONE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_FALSE, TRI_FALSE, TRI_NONE, TRI_TRUE, EVENT_NONE, EVENT_NONE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_FALSE, TRI_FALSE, TRI_NONE, TRI_NONE, EVENT_NONE, EVENT_NONE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_FALSE, TRI_FALSE, TRI_NONE, TRI_FALSE, EVENT_NONE, EVENT_NONE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_FALSE, TRI_FALSE, TRI_FALSE, TRI_TRUE, EVENT_NONE, EVENT_NONE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_FALSE, TRI_FALSE, TRI_FALSE, TRI_NONE, EVENT_NONE, EVENT_NONE, EVENT_NONE));
TEST_ASSERT(local::test(TRI_FALSE, TRI_FALSE, TRI_FALSE, TRI_FALSE, EVENT_NONE, EVENT_NONE, EVENT_NONE));
return true;
}
#if (SPLICE)
/**
 * Start or stop writing a chain to a file through AIO.
 *
 * With a file name and a chain, an AioFile instance is obtained for fn
 * (with cb as its callback) and the chain becomes the data source. With
 * both arguments NULL, a running aio is finished and dropped. In both
 * cases a previously set chain is reset before being replaced.
 *
 * @return false if no AioFile instance could be obtained (the chain is
 *         left untouched then), true otherwise.
 */
bool AioSink::set(const char* fn, chain_t* _chain) {
    assert((fn && _chain) || (!fn && !_chain));
    if (!fn) {
        if (aio) {
            aio->finish();
        }
        aio = NULL;
    } else {
        aio = AioFile::getInst(fn, true, NULL, &cb);
        if (!aio) {
            return false;
        }
    }
    if (chain) {
        chain->reset();
    }
    chain = _chain;
    return true;
}
/**
 * Callback handed to AioFile::getInst() by set().
 *
 * @param ctx the value passed to enqueue() by do_io(): a buf_t* cast to
 *            intptr_t, or 0 if there is no buffer to release. A non-zero
 *            value is returned to the pool via push_ts().
 */
void AioSink::cb(intptr_t ctx) {
    if (!ctx) {
        return;
    }
    log(io, "aio: free chain buf %p", (void*)ctx);
    buf_pool.push_ts((buf_t*)ctx);
}
/**
 * Hand all data currently in the chain over to the aio file.
 *
 * Without an active aio the chain (if any) is simply reset. Otherwise
 * chunks are enqueued one by one until the chain is empty or enqueue()
 * fails, in which case the aio is finished and dropped.
 */
void AioSink::do_io() {
    if (!aio) { // garbage collect and bail out
        if (chain) {
            chain->reset();
        }
        return;
    }
    for (;;) {
        char* b;
        size_t l;
        if (!chain->get_data(b, l)) {
            log(io, "aio(%d): chain empty", aio->get_fd());
            return;
        }
        // buffer that del_data(l) is going to hand back; passed along as callback context
        buf_t* freebuf = chain->will_del_data(l);
        log(io, "will aio(%d): %p+%zu (%p)", aio->get_fd(), b, l, freebuf);
        ssize_t rv = aio->enqueue((const char*)b, l, (intptr_t)freebuf);
        if (rv == -1) {
            aio->finish();
            aio = NULL;
            return;
        }
        assert(rv == 0 || rv == (ssize_t)l);
        buf_t* buf = chain->del_data(l);
        assert(buf == freebuf);
        if (!buf) {
            continue;
        }
        if (rv != 0) { // directly written
            log(io, "aio: directly free chain buf %p", buf);
            buf_pool.push(freebuf);
        } else { // enqueued
            log(io, "aio: not freeing chain buf %p", buf);
        }
    }
}
#endif