#include "io.hpp"


void IoWrapper::reset() {
    if (!is_valid()) {
        return;
    }
    log(io, "reset io: %d", info());
    if (polled) {
        Poll::getInst()->del(fd);
        polled = false;
    }
    if (ssl) {
        ssl_del(ssl);
        ssl = NULL;
    }
    if (reset_ptr) {
        *reset_ptr = NULL;
        reset_ptr = NULL;
    }
    read_empty(fd); // use shutdown to ACK pending data from client -> don't send RST upon close. TODO: maybe we want this sometimes? (but e.g. not when gracefully sending an error page)
    EINTR_RETRY(close(fd));
    fd = -1;
}


void IoWrapper::set(int _fd) {
    if (_fd == -1) {
        reset();
        return;
    }
    if (ssl) {
        assert(ssl->fd != _fd); // TODO: need downgrading from ssl to fd?
        reset();
    } else if (fd != -1) {
        assert(fd != _fd);
        if (fd == _fd) return;
        reset();
    }
    fd = _fd;
    got_close = false;
    log(io, "set io: %d", info());
}


void IoWrapper::set(ssl_t* _ssl) {
    if (!_ssl) {
        reset();
        return;
    }
    assert(_ssl->fd != -1);
    if (ssl) {
        // replacing ssl
        assert(ssl->fd != _ssl->fd);
        reset();
    } else if (fd != -1) {
        if (fd != _ssl->fd) {
            // replacing fd
            reset();
        } else {
            // upgrading from fd to ssl
            log(io, "fd %d -> ssl i/o", fd);
        }
    }
    ssl = _ssl;
    fd = _ssl->fd;
    got_close = false;
    log(io, "set io: %d", info());
}


bool IoWrapper::poll(poll_handler_t h, event_t e, bool short_timeout, void* ctx) {
    assert(fd != -1);
    if (polled) {
        if (e == EVENT_NONE) {
            return !(polled = !Poll::getInst()->del(fd));
        } else {
            return Poll::getInst()->mod(fd, h, e, short_timeout, ctx);
        }
    } else if (e != EVENT_NONE) {
        return (polled = Poll::getInst()->add(fd, h, e, short_timeout, ctx));
    } else {
        return true; // XXX: as we cannot set timeout-only atm
    }
}


bool IoWrapper::send_alert() {
    // TODO: check if a handshake is in progress?
    /*if (ssl && ssl->init_finished == TRI_FALSE) {
        return false;
    }*/

    // https://en.wikipedia.org/wiki/Transport_Layer_Security#Alert_protocol
    // 60: Export restriction
    // TODO: check for ssl/tls version?
    // TODO: might send HTTP error after handshake is completed and no data has been sent?
    static const char* alert = "\x15\x03\x01\x00\x02\x02\x3C";
    static const int alert_len = 7;
    const bool rv = (fd == -1)? false: (write(fd, alert, alert_len) == alert_len);
    log_errno(io, "write(%d, alert): %d", fd, rv);
    return rv;
}


void (*IoSource::data_in_handler)(const chain_t*) = NULL;


event_t IoSource::events(const IoWrapper* peer) const {
    assert(!peer || peer != handler);
    unless (handler->is_valid()) {
        return EVENT_NONE;
    }
    event_t ev = events();
    if (peer) {
        if (!peer->is_valid() || peer->closed()) return EVENT_NONE; // cannot do anything with the data (so both sides cannot be in temporary EVENT_TOUT unpoll state below)
        if (handler->closed()) { // we got a close but could not read until eof as we are/were full
            if (ev == EVENT_OUT) {
                return EVENT_NONE; // no writing to closed socket
            } else if (chain->is_empty()) {
                return EVENT_IN|EVENT_CLOSE; // spin for eof, other side possibly cannot make progress
            } else {
                return EVENT_TOUT; // let other side wake us up, as it can write. only non-permanent "closing" state that causes all other events to be ignored and this fd removed from the pollset.
            }
        }
    } else {
        if (ev == EVENT_NONE) return EVENT_NONE; // we're full and cannot read further read events if there is no upstream yet -> fatal
        if (handler->closed()) return EVENT_NONE; // if we already received an EVENT_CLOSE, we must not poll for anything, as this will be always implicitly listened for, causing spinning
    }
    return ev|EVENT_CLOSE; // got no close yet so we can/must listen (for it)
}


bool IoSource::do_io() {
    unless (handler->is_valid()) return false;
    bool read_something = false;

    unsigned loops = MAX_IOS;
    again:
    stats_inc(read_attempts);
    buf_t* buf = NULL;
    char* b;
    size_t l;
    if (!chain->get_space(b, l)) {
        if (chain->is_full()) {
            log(io, "read(%d): chain full", handler->info());
            return read_something; // so we won't detect an EOF yet
        }
        buf = buf_pool.pop();
        buf->get_space(b, l);
    }
    log(io, "will read(%d): %p+%zu%s", handler->info(), b, l, buf? " (new)": "");
    ssize_t rv = handler->do_read(b, l);
    stats_inc(reads);

    read_wants_write = false;
    if (rv == 0) {
        err = ENOMSG;
        log(io, "read(%d): EOF", handler->info());
        handler->reset(); // NOTE: assume we can close the write side as well for EOF (problem: e.g. write-side shutdown from client?)
    } else if (rv == -1) {
        if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { // no real error
            log(io, "read(%d): 0", handler->info());
        } else if (errno == ERESTART) { // XXX: what if the other buf writes before we retry reading?
            log_errno(debug, "read(%d): write", handler->info());
            read_wants_write = true; // as we have not read anything, we still can_read() at next call
        } else {
            err = errno;
            log_errno(debug, "read(%d)", handler->info());
            handler->reset();
        }
    } else {
        log(io, "read(%d): %zd", handler->info(), rv);
        if (rx_stats) stat_add(rx_stats, rv);
        stats_inc(read_data);
        unless (stime) stime = NOW_MSEC;
        if (buf) {
            buf->add_data((size_t)rv);
            chain->add_data(buf);
            buf = NULL;
        } else {
            chain->add_data((size_t)rv);
        }
        unless (total) {
            if (data_in_handler) data_in_handler(chain); // try this once upon first incoming data
        }
        total += (size_t)rv;
        read_something = true;
        if (handler->closed() || --loops) { // try to hit eof early in case of close event, as we cannot listen again
            goto again;
        }
    }

    if (buf) {
        buf_pool.push(buf);
    }
    return read_something;
}


event_t IoSink::events(const IoWrapper* peer) const {
    assert(!peer || peer != handler);
    unless (handler->is_valid() && !handler->closed()) {
        return EVENT_NONE; // don't try to write (or read handshake) anymore. also we don't add our EVENT_CLOSE here, what could cause spinning if we cannot read for some reason.
    }
    event_t ev = events();
    if (ev == EVENT_NONE && (!peer || !peer->is_valid())) {
        return EVENT_NONE; // no data and there won't be any read in the future (not necessarily the case for peer->closed) (or we are the initial client, there is no upstream yet and we're full)
    }
    return ev|EVENT_CLOSE; // got no close yet so we can/must listen (for it)
}


#if (SPLICE)
bool IoSink::splice_to(chain_t* _out_chain) {
    if (!_out_chain) {
        out_chain = NULL;
        return true;
    }
    if (out_chain || total) {
        return false;
    }
    out_chain = _out_chain;
    return true;
}
#endif


bool IoSink::do_io() {
    unless (handler->is_valid()) return false;
    unless (!handler->closed()) return false; // don't attempt to write anymore, as we already got EPIPE or EVENT_CLOSE
    bool wrote_something = false;

    unsigned loops = MAX_IOS;
    again:
    stats_inc(write_attempts);
    buf_t* buf = NULL; // injection buf
    size_t limit = 0;
    char* b;
    size_t l;

    unless (inject_bufs.empty()) {
        size_t pos = inject_bufs.begin()->first;
        buf_t* ibuf = inject_bufs.begin()->second;
        if (total < pos) {
            log(io, "injecting at %zu (now %zu)", pos, total);
            limit = pos-total;
        } else if (total > pos) {
            log(error, "missed injecting buffer, resetting (%zu/%zu)", pos, total);
            handler->reset();
            return wrote_something;
        } else { // total == pos
            log(io, "injecting at %zu", total);
            ibuf->get_data(b, l);
            buf = ibuf;
        }
    }

    if (!buf && !in_chain->get_data(b, l)) {
        log(io, "write(%d): chain empty", handler->info());
        return wrote_something;
    }
    if (limit) l = MIN(l, limit);
    log(io, "will write(%d): %p+%zu", handler->info(), b, l);
    ssize_t rv = handler->do_write(b, l); // TODO: some alternative writev implementation? but this wont work for ssl_write
    stats_inc(writes);

    write_wants_read = false;
    if (rv == 0) {
        log(debug, "write(%d): 0?", handler->info());
    } else if (rv == -1) {
        if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { // no real error
            log(io, "write(%d): 0", handler->info());
        } else if (errno == ERESTART) {
            log_errno(debug, "write(%d): read", handler->info());
            write_wants_read = true;
        } else if (errno == EPIPE) { // not resetting handler yet, there could still be something left for reading
            err = errno;
            log_errno(debug, "write(%d): setting closed flag", handler->info());
            handler->closed(true);
        } else {
            err = errno;
            log_errno(debug, "write(%d)", handler->info());
            handler->reset();
        }
    } else {
        bool full_write = ((size_t)rv == l);
        log(io, "write(%d): %zd", handler->info(), rv);
        unless (stime) stime = NOW_MSEC;
        stats_inc(wrote_data);
        if (buf) {
            assert(buf == inject_bufs.begin()->second);
            buf->del_data((size_t)rv);
            if (buf->has_data()) {
                log(io, "%p injection partially done", buf);
            } else {
                log(io, "%p injection done", buf);
                buf_pool.push(buf);
                inject_bufs.erase(inject_bufs.begin());
            }
        } else {
            total += (size_t)rv; // TODO: account for injected data (connot do this atm because of map-key)
            buf = in_chain->del_data((size_t)rv);
            if (buf) {
                if (out_chain) {
                    if (!out_chain->is_full()) {
                        log(io, "write(%d): recycling buf to %p", handler->info(), out_chain);
                        out_chain->add_data(buf);
                    } else {
                        log(io, "write(%d): resetting out chain", handler->info());
                        out_chain = NULL;
                        buf_pool.push(buf);
                    }
                } else {
                    log(io, "write(%d): buf done", handler->info());
                    buf_pool.push(buf);
                }
            }
        }
        wrote_something = true;
        if (full_write && --loops) {
            goto again;
        }
    }

    return wrote_something;
}


bool IoSink::set_http_req(const sockaddr_t& cip, const sockaddr_t& sip, const char* sni, size_t sni_len) {
    // there should be only one inject operation s.t. we can easy revert upon error
    unless (inject_bufs.empty()) return false;

    // find http header start
    assert(in_chain->is_http_req() == TRI_TRUE); // so we have very strict contraints wrt. data position
    char* b;
    size_t l;
    unless (in_chain->get_data(b, l)) assert(false);
    char* nl = strnchr(b, '\n', l);
    unless (nl) return false;
    const bool rn = (nl[-1] == '\r');
    ++nl; // so actually after the nl

    // allocate buf
    buf_t* hdrbuf_buf = buf_pool.pop();
    size_t tmp;
    char* buf;
    hdrbuf_buf->get_space(buf, tmp);
    char* p = buf;

    // add upstream ip and port
    p = strccpy(p, "X-Forwarded-Dest: ");
    p = addr2str(sip, p, true);
    if (rn) { *p = '\r'; ++p; }
    *p = '\n'; ++p;

    // add client ip
    p = strccpy(p, "X-Forwarded-For: ");
    p = addr2str(cip, p, false);
    if (rn) { *p = '\r'; ++p; }
    *p = '\n'; ++p;

    // add sni (if any)
    p = strccpy(p, "X-Forwarded-Host: ");
    if (sni_len) {
        p = strncpy(p, sni, sni_len) + sni_len;
    } else {
        p = addr2str(sip, p, false);
    }
    if (rn) { *p = '\r'; ++p; }
    *p = '\n'; ++p;

    // inject & done
    hdrbuf_buf->add_data(p-buf);
    inject_bufs.insert(std::pair<size_t, buf_t*>(nl-b, hdrbuf_buf)); // should succeed
    log(debug, "created proxy request");
    return true;
}


IoSink::~IoSink() {
    if (!err && !in_chain->is_empty()) { // requires in_chain to be destructed after us
        err = ECANCELED;
    }
    while (!inject_bufs.empty()) {
        buf_pool.push(inject_bufs.begin()->second);
        inject_bufs.erase(inject_bufs.begin());
    }
}


TEST(io_events) {
    struct local {
        static bool test(tristate_t valid_c, tristate_t valid_s, tristate_t upstream_data, tristate_t downstream_data, event_t ev_c, event_t ev_s, event_t ev_null_c) {
            IoWrapper src, dst;
            if (valid_c == TRI_TRUE) {
                src.set(dup(null_fd()));
            } else if (valid_c == TRI_NONE) {
                src.set(dup(null_fd()));
                src.closed(true);
            }
            if (valid_s == TRI_TRUE) {
                dst.set(dup(null_fd()));
            } else if (valid_s == TRI_NONE) {
                dst.set(dup(null_fd()));
                dst.closed(true);
            }

            chain_t upstream, downstream;
            if (upstream_data == TRI_NONE) {
                upstream.add_data(NOW_STR, sizeof(NOW_STR));
            } else if (upstream_data == TRI_TRUE) {
                while (!upstream.is_full()) {
                    upstream.add_data("", 1);
                }
            }
            if (downstream_data == TRI_NONE) {
                downstream.add_data(NOW_STR, sizeof(NOW_STR));
            } else if (downstream_data == TRI_TRUE) {
                while (!downstream.is_full()) {
                    downstream.add_data("", 1);
                }
            }

            IoSource src_in(&src, &upstream), dst_in(&dst, &downstream);
            IoSink src_out(&src, &downstream), dst_out(&dst, &upstream);

            log(io, ">%" EVENT_LOG_FMT " %" EVENT_LOG_FMT " %" EVENT_LOG_FMT, EVENT_LOG(src_in.events(&dst)|src_out.events(&dst)), EVENT_LOG(dst_in.events(&src)|dst_out.events(&src)), EVENT_LOG(src_in.events(NULL)|src_out.events(NULL)));
            if ((src_in.events(NULL)|src_out.events(NULL)) != ev_null_c) return false; // check this first, before closing below
            if ((src_in.events(&dst)|src_out.events(&dst)) == EVENT_NONE) src.reset(); // as in the main event handling, we have to re-evaluate the events if the other side gave EVENT_NONE
            if ((dst_in.events(&src)|dst_out.events(&src)) == EVENT_NONE) dst.reset();
            if ((src_in.events(&dst)|src_out.events(&dst)) == EVENT_NONE) src.reset();

            log(io, "<%" EVENT_LOG_FMT " %" EVENT_LOG_FMT " %" EVENT_LOG_FMT, EVENT_LOG(src_in.events(&dst)|src_out.events(&dst)), EVENT_LOG(dst_in.events(&src)|dst_out.events(&src)), EVENT_LOG(ev_null_c));
            if ((src_in.events(&dst)|src_out.events(&dst)) != ev_c) return false;
            if ((dst_in.events(&src)|dst_out.events(&src)) != ev_s) return false;

            // check for some basic invariants TODO: better stalls and loops detection
            if (src.closed() && event_isset(ev_c, EVENT_IN|EVENT_OUT|EVENT_CLOSE) && event_isset(ev_s, EVENT_IN|EVENT_OUT)) return false; // no listening for i/o when already closed (-> spinning) and the other side could make progress for us
            if (dst.closed() && event_isset(ev_s, EVENT_IN|EVENT_OUT|EVENT_CLOSE) && event_isset(ev_c, EVENT_IN|EVENT_OUT)) return false;
            if (src.closed() && event_isset(ev_c, EVENT_OUT)) return false; // no writing to known closed socket
            if (dst.closed() && event_isset(ev_s, EVENT_OUT)) return false;
            if (!src.closed() && event_isset(ev_c, EVENT_TOUT)) return false; // TOUT only for src if it is closed
            if (!dst.closed() && event_isset(ev_s, EVENT_TOUT)) return false;
            if (event_isset(ev_null_c, EVENT_TOUT)) return false;
            if (!src.is_valid() && ev_c != EVENT_NONE) return false; // close if other side is closed (except for writing available data)
            if (!dst.is_valid() && ev_s != EVENT_NONE) return false;
            if (ev_null_c == EVENT_CLOSE) return false; // no noop for client-only mode if no progress can be made
            if (event_isset(ev_c, EVENT_IN|EVENT_OUT) && !event_isset(ev_c, EVENT_CLOSE)) return false; // ordinary events only with close
            if (event_isset(ev_s, EVENT_IN|EVENT_OUT) && !event_isset(ev_s, EVENT_CLOSE)) return false;
            if (event_isset(ev_null_c, EVENT_IN|EVENT_OUT) && !event_isset(ev_null_c, EVENT_CLOSE)) return false;
            if (event_isset(ev_c, EVENT_CLOSE) && !event_isonly(ev_c, EVENT_IN|EVENT_OUT|EVENT_CLOSE)) return false; // close only w/ ordinary events
            if (event_isset(ev_s, EVENT_CLOSE) && !event_isonly(ev_s, EVENT_IN|EVENT_OUT|EVENT_CLOSE)) return false;
            if (event_isset(ev_null_c, EVENT_CLOSE) && !event_isonly(ev_null_c, EVENT_IN|EVENT_OUT|EVENT_CLOSE)) return false;
            if (event_isset(ev_c|ev_s, EVENT_CLOSE) && !event_isset(ev_c|ev_s, EVENT_IN|EVENT_OUT)) return false; // if we're listening some progress must be possible on one side at least
            //if (!event_isset(ev_c|ev_s, EVENT_CLOSE) && ) // TODO: if we're not listening, progress must be impossible

            return true;
        }
    };

    TEST_ASSERT(local::test(TRI_TRUE,  TRI_TRUE,  TRI_TRUE,  TRI_TRUE,  EVENT_OUT|EVENT_CLOSE, EVENT_OUT|EVENT_CLOSE, EVENT_OUT|EVENT_CLOSE)); // c: !r(full)/w(!empty); s: !r(full)/w(!empty); cc: !r(full)/w(!empty)
    TEST_ASSERT(local::test(TRI_TRUE,  TRI_TRUE,  TRI_TRUE,  TRI_NONE,  EVENT_OUT|EVENT_CLOSE, EVENT_ALL,             EVENT_OUT|EVENT_CLOSE)); // c: !r(full)/w(!empty); s: r(!full)/w(!empty); cc: !r(full)/w(!empty)
    TEST_ASSERT(local::test(TRI_TRUE,  TRI_TRUE,  TRI_TRUE,  TRI_FALSE, EVENT_CLOSE,           EVENT_ALL,             EVENT_NONE)); // c: !r(full)/!w(empty); s: r(!full)/w(!empty); cc: !r(full)/!w(empty)
    TEST_ASSERT(local::test(TRI_TRUE,  TRI_TRUE,  TRI_NONE,  TRI_TRUE,  EVENT_ALL,             EVENT_OUT|EVENT_CLOSE, EVENT_ALL)); // c: r(!full)/w(!empty); s: !r(full)/w(!empty); cc: r(!full)/w(!empty)
    TEST_ASSERT(local::test(TRI_TRUE,  TRI_TRUE,  TRI_NONE,  TRI_NONE,  EVENT_ALL,             EVENT_ALL,             EVENT_ALL)); // c: r(!full)/w(!empty); s: r(!full)/w(!empty); cc: r(!full)/w(!empty)
    TEST_ASSERT(local::test(TRI_TRUE,  TRI_TRUE,  TRI_NONE,  TRI_FALSE, EVENT_IN|EVENT_CLOSE,  EVENT_ALL,             EVENT_IN|EVENT_CLOSE)); // c: r(!full)/!w(empty); s: r(!full)/w(!empty); cc: r(!full)/!w(empty)
    TEST_ASSERT(local::test(TRI_TRUE,  TRI_TRUE,  TRI_FALSE, TRI_TRUE,  EVENT_ALL,             EVENT_CLOSE,           EVENT_ALL)); // c: r(!full)/w(!empty); s: !r(full)/!w(empty); cc: r(!full)/w(!empty)
    TEST_ASSERT(local::test(TRI_TRUE,  TRI_TRUE,  TRI_FALSE, TRI_NONE,  EVENT_ALL,             EVENT_IN|EVENT_CLOSE,  EVENT_ALL)); // c: r(!full)/w(!empty); s: r(!full)/!w(empty); cc: r(!full)/w(!empty)
    TEST_ASSERT(local::test(TRI_TRUE,  TRI_TRUE,  TRI_FALSE, TRI_FALSE, EVENT_IN|EVENT_CLOSE,  EVENT_IN|EVENT_CLOSE,  EVENT_IN|EVENT_CLOSE)); // c: r(!full)/!w(empty); s: r(!full)/!w(empty); cc: r(!full)/!w(empty)

    TEST_ASSERT(local::test(TRI_TRUE,  TRI_NONE,  TRI_TRUE,  TRI_TRUE,  EVENT_OUT|EVENT_CLOSE, EVENT_TOUT,            EVENT_OUT|EVENT_CLOSE)); // c/s: see NONE/TRUE below; cc: !r(full)/w(!empty)
    TEST_ASSERT(local::test(TRI_TRUE,  TRI_NONE,  TRI_TRUE,  TRI_NONE,  EVENT_OUT|EVENT_CLOSE, EVENT_TOUT,            EVENT_OUT|EVENT_CLOSE)); // cc: !r(full)/w(!empty)
    TEST_ASSERT(local::test(TRI_TRUE,  TRI_NONE,  TRI_TRUE,  TRI_FALSE, EVENT_CLOSE,           EVENT_IN|EVENT_CLOSE,  EVENT_NONE)); // cc: !r(full)/!w(empty)
    TEST_ASSERT(local::test(TRI_TRUE,  TRI_NONE,  TRI_NONE,  TRI_TRUE,  EVENT_OUT|EVENT_CLOSE, EVENT_TOUT,            EVENT_ALL)); // cc: r(!full)/w(!empty)
    TEST_ASSERT(local::test(TRI_TRUE,  TRI_NONE,  TRI_NONE,  TRI_NONE,  EVENT_OUT|EVENT_CLOSE, EVENT_TOUT,            EVENT_ALL));
    TEST_ASSERT(local::test(TRI_TRUE,  TRI_NONE,  TRI_NONE,  TRI_FALSE, EVENT_CLOSE,           EVENT_IN|EVENT_CLOSE,  EVENT_IN|EVENT_CLOSE)); // cc: r(!full)/!w(empty)
    TEST_ASSERT(local::test(TRI_TRUE,  TRI_NONE,  TRI_FALSE, TRI_TRUE,  EVENT_OUT|EVENT_CLOSE, EVENT_TOUT,            EVENT_ALL)); // cc: r(!full)/w(!empty)
    TEST_ASSERT(local::test(TRI_TRUE,  TRI_NONE,  TRI_FALSE, TRI_NONE,  EVENT_OUT|EVENT_CLOSE, EVENT_TOUT,            EVENT_ALL));
    TEST_ASSERT(local::test(TRI_TRUE,  TRI_NONE,  TRI_FALSE, TRI_FALSE, EVENT_CLOSE,           EVENT_IN|EVENT_CLOSE,  EVENT_IN|EVENT_CLOSE)); // cc: r(!full)/!w(empty)

    TEST_ASSERT(local::test(TRI_TRUE,  TRI_FALSE, TRI_TRUE,  TRI_TRUE,  EVENT_OUT|EVENT_CLOSE, EVENT_NONE,            EVENT_OUT|EVENT_CLOSE)); // c/s: see FALSE/TRUE below: cc: !r(full)/w(!empty)
    TEST_ASSERT(local::test(TRI_TRUE,  TRI_FALSE, TRI_TRUE,  TRI_NONE,  EVENT_OUT|EVENT_CLOSE, EVENT_NONE,            EVENT_OUT|EVENT_CLOSE));
    TEST_ASSERT(local::test(TRI_TRUE,  TRI_FALSE, TRI_TRUE,  TRI_FALSE, EVENT_NONE,            EVENT_NONE,            EVENT_NONE)); // cc: !r(full)/!w(empty)
    TEST_ASSERT(local::test(TRI_TRUE,  TRI_FALSE, TRI_NONE,  TRI_TRUE,  EVENT_OUT|EVENT_CLOSE, EVENT_NONE,            EVENT_ALL)); // cc: r(!full)/w(!empty)
    TEST_ASSERT(local::test(TRI_TRUE,  TRI_FALSE, TRI_NONE,  TRI_NONE,  EVENT_OUT|EVENT_CLOSE, EVENT_NONE,            EVENT_ALL));
    TEST_ASSERT(local::test(TRI_TRUE,  TRI_FALSE, TRI_NONE,  TRI_FALSE, EVENT_NONE,            EVENT_NONE,            EVENT_IN|EVENT_CLOSE)); // cc: r(!full)/!w(empty)
    TEST_ASSERT(local::test(TRI_TRUE,  TRI_FALSE, TRI_FALSE, TRI_TRUE,  EVENT_OUT|EVENT_CLOSE, EVENT_NONE,            EVENT_ALL)); // cc: r(!full)/w(!empty)
    TEST_ASSERT(local::test(TRI_TRUE,  TRI_FALSE, TRI_FALSE, TRI_NONE,  EVENT_OUT|EVENT_CLOSE, EVENT_NONE,            EVENT_ALL));
    TEST_ASSERT(local::test(TRI_TRUE,  TRI_FALSE, TRI_FALSE, TRI_FALSE, EVENT_NONE,            EVENT_NONE,            EVENT_IN|EVENT_CLOSE)); // cc: r(!full)/!w(empty)

    TEST_ASSERT(local::test(TRI_NONE,  TRI_TRUE,  TRI_TRUE,  TRI_TRUE,  EVENT_TOUT,            EVENT_OUT|EVENT_CLOSE, EVENT_NONE)); // c: !r(close ev, no eof, data/full, s.w)/!w; s: !r(!c.w)/w(!empty)
    TEST_ASSERT(local::test(TRI_NONE,  TRI_TRUE,  TRI_TRUE,  TRI_NONE,  EVENT_TOUT,            EVENT_OUT|EVENT_CLOSE, EVENT_NONE));
    TEST_ASSERT(local::test(TRI_NONE,  TRI_TRUE,  TRI_TRUE,  TRI_FALSE, EVENT_TOUT,            EVENT_OUT|EVENT_CLOSE, EVENT_NONE));
    TEST_ASSERT(local::test(TRI_NONE,  TRI_TRUE,  TRI_NONE,  TRI_TRUE,  EVENT_TOUT,            EVENT_OUT|EVENT_CLOSE, EVENT_NONE));
    TEST_ASSERT(local::test(TRI_NONE,  TRI_TRUE,  TRI_NONE,  TRI_NONE,  EVENT_TOUT,            EVENT_OUT|EVENT_CLOSE, EVENT_NONE));
    TEST_ASSERT(local::test(TRI_NONE,  TRI_TRUE,  TRI_NONE,  TRI_FALSE, EVENT_TOUT,            EVENT_OUT|EVENT_CLOSE, EVENT_NONE));
    TEST_ASSERT(local::test(TRI_NONE,  TRI_TRUE,  TRI_FALSE, TRI_TRUE,  EVENT_IN|EVENT_CLOSE,  EVENT_CLOSE,           EVENT_NONE)); // c: r(close ev, no eof, will spin for read)/!w; s: !r(!c.w)/!w(empty)
    TEST_ASSERT(local::test(TRI_NONE,  TRI_TRUE,  TRI_FALSE, TRI_NONE,  EVENT_IN|EVENT_CLOSE,  EVENT_CLOSE,           EVENT_NONE));
    TEST_ASSERT(local::test(TRI_NONE,  TRI_TRUE,  TRI_FALSE, TRI_FALSE, EVENT_IN|EVENT_CLOSE,  EVENT_CLOSE,           EVENT_NONE));

    TEST_ASSERT(local::test(TRI_NONE,  TRI_NONE,  TRI_TRUE,  TRI_TRUE,  EVENT_NONE,            EVENT_NONE,            EVENT_NONE)); // c: !r(!s.w)/!w; s: !r(!c.w)/!w
    TEST_ASSERT(local::test(TRI_NONE,  TRI_NONE,  TRI_TRUE,  TRI_NONE,  EVENT_NONE,            EVENT_NONE,            EVENT_NONE));
    TEST_ASSERT(local::test(TRI_NONE,  TRI_NONE,  TRI_TRUE,  TRI_FALSE, EVENT_NONE,            EVENT_NONE,            EVENT_NONE));
    TEST_ASSERT(local::test(TRI_NONE,  TRI_NONE,  TRI_NONE,  TRI_TRUE,  EVENT_NONE,            EVENT_NONE,            EVENT_NONE));
    TEST_ASSERT(local::test(TRI_NONE,  TRI_NONE,  TRI_NONE,  TRI_NONE,  EVENT_NONE,            EVENT_NONE,            EVENT_NONE));
    TEST_ASSERT(local::test(TRI_NONE,  TRI_NONE,  TRI_NONE,  TRI_FALSE, EVENT_NONE,            EVENT_NONE,            EVENT_NONE));
    TEST_ASSERT(local::test(TRI_NONE,  TRI_NONE,  TRI_FALSE, TRI_TRUE,  EVENT_NONE,            EVENT_NONE,            EVENT_NONE));
    TEST_ASSERT(local::test(TRI_NONE,  TRI_NONE,  TRI_FALSE, TRI_NONE,  EVENT_NONE,            EVENT_NONE,            EVENT_NONE));
    TEST_ASSERT(local::test(TRI_NONE,  TRI_NONE,  TRI_FALSE, TRI_FALSE, EVENT_NONE,            EVENT_NONE,            EVENT_NONE));
    TEST_ASSERT(local::test(TRI_NONE,  TRI_FALSE, TRI_TRUE,  TRI_TRUE,  EVENT_NONE,            EVENT_NONE,            EVENT_NONE));
    TEST_ASSERT(local::test(TRI_NONE,  TRI_FALSE, TRI_TRUE,  TRI_NONE,  EVENT_NONE,            EVENT_NONE,            EVENT_NONE));
    TEST_ASSERT(local::test(TRI_NONE,  TRI_FALSE, TRI_TRUE,  TRI_FALSE, EVENT_NONE,            EVENT_NONE,            EVENT_NONE));
    TEST_ASSERT(local::test(TRI_NONE,  TRI_FALSE, TRI_NONE,  TRI_TRUE,  EVENT_NONE,            EVENT_NONE,            EVENT_NONE));
    TEST_ASSERT(local::test(TRI_NONE,  TRI_FALSE, TRI_NONE,  TRI_NONE,  EVENT_NONE,            EVENT_NONE,            EVENT_NONE));
    TEST_ASSERT(local::test(TRI_NONE,  TRI_FALSE, TRI_NONE,  TRI_FALSE, EVENT_NONE,            EVENT_NONE,            EVENT_NONE));
    TEST_ASSERT(local::test(TRI_NONE,  TRI_FALSE, TRI_FALSE, TRI_TRUE,  EVENT_NONE,            EVENT_NONE,            EVENT_NONE));
    TEST_ASSERT(local::test(TRI_NONE,  TRI_FALSE, TRI_FALSE, TRI_NONE,  EVENT_NONE,            EVENT_NONE,            EVENT_NONE));
    TEST_ASSERT(local::test(TRI_NONE,  TRI_FALSE, TRI_FALSE, TRI_FALSE, EVENT_NONE,            EVENT_NONE,            EVENT_NONE));

    TEST_ASSERT(local::test(TRI_FALSE, TRI_TRUE,  TRI_TRUE,  TRI_TRUE,  EVENT_NONE,            EVENT_OUT|EVENT_CLOSE, EVENT_NONE)); // c: !r/!w; s: !r(!c.w)/w(!empty)
    TEST_ASSERT(local::test(TRI_FALSE, TRI_TRUE,  TRI_TRUE,  TRI_NONE,  EVENT_NONE,            EVENT_OUT|EVENT_CLOSE, EVENT_NONE));
    TEST_ASSERT(local::test(TRI_FALSE, TRI_TRUE,  TRI_TRUE,  TRI_FALSE, EVENT_NONE,            EVENT_OUT|EVENT_CLOSE, EVENT_NONE));
    TEST_ASSERT(local::test(TRI_FALSE, TRI_TRUE,  TRI_NONE,  TRI_TRUE,  EVENT_NONE,            EVENT_OUT|EVENT_CLOSE, EVENT_NONE));
    TEST_ASSERT(local::test(TRI_FALSE, TRI_TRUE,  TRI_NONE,  TRI_NONE,  EVENT_NONE,            EVENT_OUT|EVENT_CLOSE, EVENT_NONE));
    TEST_ASSERT(local::test(TRI_FALSE, TRI_TRUE,  TRI_NONE,  TRI_FALSE, EVENT_NONE,            EVENT_OUT|EVENT_CLOSE, EVENT_NONE));
    TEST_ASSERT(local::test(TRI_FALSE, TRI_TRUE,  TRI_FALSE, TRI_TRUE,  EVENT_NONE,            EVENT_NONE,            EVENT_NONE)); // c: !r/!w; s: !r(!c.w)/!w(empty)
    TEST_ASSERT(local::test(TRI_FALSE, TRI_TRUE,  TRI_FALSE, TRI_NONE,  EVENT_NONE,            EVENT_NONE,            EVENT_NONE));
    TEST_ASSERT(local::test(TRI_FALSE, TRI_TRUE,  TRI_FALSE, TRI_FALSE, EVENT_NONE,            EVENT_NONE,            EVENT_NONE));

    TEST_ASSERT(local::test(TRI_FALSE, TRI_NONE,  TRI_TRUE,  TRI_TRUE,  EVENT_NONE,            EVENT_NONE,            EVENT_NONE)); // c: !r/!w; s: !r(!c.w)/!w;
    TEST_ASSERT(local::test(TRI_FALSE, TRI_NONE,  TRI_TRUE,  TRI_NONE,  EVENT_NONE,            EVENT_NONE,            EVENT_NONE));
    TEST_ASSERT(local::test(TRI_FALSE, TRI_NONE,  TRI_TRUE,  TRI_FALSE, EVENT_NONE,            EVENT_NONE,            EVENT_NONE));
    TEST_ASSERT(local::test(TRI_FALSE, TRI_NONE,  TRI_NONE,  TRI_TRUE,  EVENT_NONE,            EVENT_NONE,            EVENT_NONE));
    TEST_ASSERT(local::test(TRI_FALSE, TRI_NONE,  TRI_NONE,  TRI_NONE,  EVENT_NONE,            EVENT_NONE,            EVENT_NONE));
    TEST_ASSERT(local::test(TRI_FALSE, TRI_NONE,  TRI_NONE,  TRI_FALSE, EVENT_NONE,            EVENT_NONE,            EVENT_NONE));
    TEST_ASSERT(local::test(TRI_FALSE, TRI_NONE,  TRI_FALSE, TRI_TRUE,  EVENT_NONE,            EVENT_NONE,            EVENT_NONE));
    TEST_ASSERT(local::test(TRI_FALSE, TRI_NONE,  TRI_FALSE, TRI_NONE,  EVENT_NONE,            EVENT_NONE,            EVENT_NONE));
    TEST_ASSERT(local::test(TRI_FALSE, TRI_NONE,  TRI_FALSE, TRI_FALSE, EVENT_NONE,            EVENT_NONE,            EVENT_NONE));
    TEST_ASSERT(local::test(TRI_FALSE, TRI_FALSE, TRI_TRUE,  TRI_TRUE,  EVENT_NONE,            EVENT_NONE,            EVENT_NONE));
    TEST_ASSERT(local::test(TRI_FALSE, TRI_FALSE, TRI_TRUE,  TRI_NONE,  EVENT_NONE,            EVENT_NONE,            EVENT_NONE));
    TEST_ASSERT(local::test(TRI_FALSE, TRI_FALSE, TRI_TRUE,  TRI_FALSE, EVENT_NONE,            EVENT_NONE,            EVENT_NONE));
    TEST_ASSERT(local::test(TRI_FALSE, TRI_FALSE, TRI_NONE,  TRI_TRUE,  EVENT_NONE,            EVENT_NONE,            EVENT_NONE));
    TEST_ASSERT(local::test(TRI_FALSE, TRI_FALSE, TRI_NONE,  TRI_NONE,  EVENT_NONE,            EVENT_NONE,            EVENT_NONE));
    TEST_ASSERT(local::test(TRI_FALSE, TRI_FALSE, TRI_NONE,  TRI_FALSE, EVENT_NONE,            EVENT_NONE,            EVENT_NONE));
    TEST_ASSERT(local::test(TRI_FALSE, TRI_FALSE, TRI_FALSE, TRI_TRUE,  EVENT_NONE,            EVENT_NONE,            EVENT_NONE));
    TEST_ASSERT(local::test(TRI_FALSE, TRI_FALSE, TRI_FALSE, TRI_NONE,  EVENT_NONE,            EVENT_NONE,            EVENT_NONE));
    TEST_ASSERT(local::test(TRI_FALSE, TRI_FALSE, TRI_FALSE, TRI_FALSE, EVENT_NONE,            EVENT_NONE,            EVENT_NONE));
    return true;
}


#if (SPLICE)
bool AioSink::set(const char* fn, chain_t* _chain) {
    assert((fn && _chain) || (!fn && !_chain));
    if (fn) {
        aio = AioFile::getInst(fn, true, NULL, &cb);
        if (!aio) return false;
    } else {
        if (aio) aio->finish();
        aio = NULL;
    }
    if (chain) {
        chain->reset();
    }
    chain = _chain;
    return true;
}


void AioSink::cb(intptr_t ctx) {
    if (ctx) {
        log(io, "aio: free chain buf %p", (void*)ctx);
        buf_pool.push_ts((buf_t*)ctx);
    }
}


void AioSink::do_io() {
    if (!aio) { // garbage collect and bail out
        if (chain) {
            chain->reset();
        }
        return;
    }

    again:
    char* b;
    size_t l;
    if (!chain->get_data(b, l)) {
        log(io, "aio(%d): chain empty", aio->get_fd());
        return;
    }
    buf_t* freebuf = chain->will_del_data(l);
    log(io, "will aio(%d): %p+%zu (%p)", aio->get_fd(), b, l, freebuf);

    ssize_t rv = aio->enqueue((const char*)b, l, (intptr_t)freebuf);
    if (rv == -1) {
        aio->finish();
        aio = NULL;
    } else {
        assert(rv == 0 || rv == (ssize_t)l);
        buf_t* buf = chain->del_data(l);
        assert(buf == freebuf);
        if (buf) {
            if (rv == 0) { // enqueued
                log(io, "aio: not freeing chain buf %p", buf);
            } else { // directly written
                log(io, "aio: directly free chain buf %p", buf);
                buf_pool.push(freebuf);
            }
        }
        goto again;
    }
}
#endif