#include "aio.hpp"
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>


// File-global pool of reusable aio control blocks; popped on the submitting
// thread and pushed back from aio completion threads (TPoolPTS presumably
// thread-safe -- "PTS" -- TODO confirm against its declaration).
static TPoolPTS<AioFile::cb_t> pool;


// Map a finished aiocb to an errno-style code: 0 on a full transfer,
// ERANGE on a short transfer, otherwise the status from aio_error().
static int aio_errno(aiocb* aio) {
    const int status = aio_error(aio);
    if (status != 0) {
        return status;
    }
    // operation completed without error -- consume the result exactly once
    const ssize_t transferred = aio_return(aio);
    return (transferred == (ssize_t)aio->aio_nbytes) ? 0 : ERANGE;
}


//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//


#if (SPLICE)
// Counter join() spins on at shutdown. NOTE(review): never incremented or
// decremented anywhere in this chunk -- presumably maintained elsewhere
// (or dead); verify against the rest of the project.
atomic_t AioFile::refcount = 0;


// Factory: open fn for non-blocking append and wrap it in a heap-allocated
// AioFile (heap, as destructor must not run until everything is done).
// Returns NULL if the open fails. excl selects O_EXCL over O_TRUNC.
AioFile* AioFile::getInst(const char* fn, bool excl, volatile size_t* _pending, handler_t _handler, intptr_t _handler_ctx) {
    const int flags = O_WRONLY|O_APPEND|O_NONBLOCK|O_CREAT|(excl? O_EXCL: O_TRUNC);
    const int newfd = open(fn, flags, 0600);
    if (newfd < 0) {
        log_errno(error, "open(%s)", fn);
        return NULL;
    }
    log(io, "aio(%d) created for %s", newfd, fn);
    return new AioFile(newfd, _pending, _handler, _handler_ctx);
}


// Factory: wrap an already-open descriptor. The caller must have set
// O_NONBLOCK on it -- we only verify, we do not set it.
AioFile* AioFile::getInst(int _fd, volatile size_t* _pending, handler_t _handler, intptr_t _handler_ctx) {
    log(io, "aio(%d) created", _fd);
    assert((fcntl(_fd, F_GETFL, 0) & O_NONBLOCK) != 0);
    return new AioFile(_fd, _pending, _handler, _handler_ctx);
}


// _fd:       open, non-blocking descriptor; owned by this object from now on.
// _pending:  external in-flight byte counter, or NULL to fall back to the
//            internal dummy (GNU `?:` elvis extension in the init list).
// _ctx:      handler context; 0 defaults to `this`.
AioFile::AioFile(int _fd, volatile size_t* _pending, handler_t _handler, intptr_t _ctx): fd(_fd), nonblock(true), handler(_handler), handler_ctx(_ctx ?: (intptr_t)this), lock(0), err(false), done(false), pending(_pending ?: &dummy) {
    *pending = 0; // reset whichever counter we ended up with
    assert(fd != -1);
}


// Queue `len` bytes from `buf` for appending to fd.
// Returns: byte count on an immediate synchronous write,
//          0 when handed off to POSIX aio (cb() finishes it later),
//          -1 on error (err is latched: subsequent calls short-circuit).
// If freebuf is true, ownership of buf transfers here: it is freed below on
// the synchronous/error paths, or by cb() after aio completion.
ssize_t AioFile::enqueue(char* buf, size_t len, bool freebuf, intptr_t ctx) {
    ssize_t rv = -1;
    cb_t* aio = NULL;

    // preliminary checks
    assert(!done); // cannot be changed in the main thread atm
    if (err || !len) {
        log(io, "aio(%d) already erroneous", fd);
        goto done;
    }

    // if there is nothing pending, we can try it w/ non-blocking write
    if (nonblock && !*pending) { // ok to read w/o mutex, can only be incremented by us, also everything below
        rv = write(fd, buf, len);
        if (is_noop_errno(rv)) { // presumably an EAGAIN-style would-block -- see is_noop_errno
            log_errno(debug, "aio write(%d)", fd);
            // switching now to aio on blocking fd
            if (!set_blocking(fd, true)) {
                err = true;
                goto done;
            }
            nonblock = false; // TODO: can switch back somewhen?
        } else {
            if (rv == -1) {
                log_errno(error, "aio write(%d)", fd);
                err = true;
            } else if (rv != (ssize_t)len) { // XXX: should we handle partial writes?
                log(error, "aio write(%d): %zd/%zu", fd, rv, len);
                rv = -1;
                err = true;
            } else {
                log(io, "write(%d): %zd", fd, rv);
            }
            goto done;
        }
    }

    // only a shortcut of the below
    #if MAX_AIO == 0
        log(debug, "aio(%d): needed but disabled", fd);
        err = true;
        goto done;
    #endif

    // speculatively increase refcount/pending, has to be reverted upon error
    if (atomic_add(pending, len) > MAX_AIO) {
        atomic_add(pending, -len);
        log(error, "aio(%d): queue overflow", fd);
        err = true;
        goto done;
    }

    aio = pool.pop();
    bzero(aio, sizeof(cb_t));
    aio->aio.aio_fildes = fd;
    aio->aio.aio_offset = (off_t)((intptr_t(this))); // as O_APPEND is obeyed the offset is unused, so it is abused to smuggle `this` to cb()
    aio->aio.aio_buf = buf;
    aio->aio.aio_nbytes = len;
    aio->aio.aio_sigevent.sigev_notify = SIGEV_THREAD;
    aio->aio.aio_sigevent.sigev_notify_function = &cb;
    aio->aio.aio_sigevent.sigev_value.sival_ptr = aio;
    aio->aio.aio_sigevent.sigev_signo = (int)freebuf; // signo is unused with SIGEV_THREAD, abused as the freebuf flag for cb()
    aio->ctx = ctx;

    if (aio_write((aiocb*)aio) == 0) {
        aio = NULL; // cb and buf are now owned by the aio subsystem / cb()
        buf = NULL;
        rv = 0;
    } else {
        log_errno(error, "aio_write(%d)", fd);
        err = true;
        atomic_add(pending, -len); // revert the speculative increase above
        rv = -1;
        goto done;
    }

    log(io, "aio(%d,%p,%zu) detatched", fd, buf, len); // NOTE(review): buf was NULLed just above, so %p logs (nil) here
    done:;
    if (aio) pool.push(aio); // only non-NULL if we never reached (or failed) aio_write
    if (freebuf) free(buf); // buf is NULL (free is a no-op) when handed to aio
    return rv;
}


// Completion callback, invoked on an aio helper thread (SIGEV_THREAD).
// Recovers the AioFile from the abused aio_offset field, frees the data
// buffer if requested, recycles the control block, reports the result to the
// handler, and deletes the AioFile when it was finish()ed and drained.
void AioFile::cb(sigval _rv) {
    cb_t* aio = (cb_t*)_rv.sival_ptr;
    AioFile* inst = (AioFile*)((intptr_t)aio->aio.aio_offset); // `this` smuggled in by enqueue()

    // get info and cleanup
    size_t len = aio->aio.aio_nbytes;
    intptr_t ctx = aio->ctx; // BUG FIX: must be copied out BEFORE pool.push() below -- another thread may pop and clobber the cb
    int rv = aio_errno(&aio->aio);
    #ifdef DEBUG
        errno = rv;
        log_errno(io, "aio(%d,%p,%zu): %d", inst->fd, aio->aio.aio_buf, aio->aio.aio_nbytes, rv);
    #endif
    if (aio->aio.aio_sigevent.sigev_signo) free((void*)aio->aio.aio_buf); // signo abused as freebuf flag; casting volatile away
    pool.push(aio); // from here on `aio` must not be touched

    // update rv
    if (rv != 0) {
        inst->err = true;
    }

    // call handler. XXX: not everything is updated at this point
    if (inst->handler) {
        inst->handler(ctx ?: inst->handler_ctx);
    }

    // decrease refcount and atomically check if we're done
    spin_lock(&inst->lock);
    atomic_add(inst->pending, -len);
    if (inst->done && !*inst->pending) {
        delete inst;
        return; // intentionally no unlock: the lock's memory died with inst
    }
    spin_unlock(&inst->lock);
}


// Declare the input side done. If nothing is in flight the object is deleted
// immediately; otherwise the last cb() invocation deletes it once *pending
// drains to zero. `this` must not be used after calling finish().
void AioFile::finish() {
    log(io, "aio(%d) input done", fd);
    assert(!done);
    spin_lock(&lock);
    done = true;
    if (!*pending) {
        // deliberately no unlock: the lock's memory is freed with the object
        delete this; // TODO: pooling
        return;
    }
    spin_unlock(&lock); // cb() takes over deletion from here
}


// Only reachable via finish() or the last cb(): input must be done and no
// aio may be in flight. Closes the owned descriptor (retrying on EINTR).
AioFile::~AioFile() {
    assert(done);
    assert(!*pending);
    assert(fd != -1);
    EINTR_RETRY(close(fd));
    log(io, "aio(%d) deleted", fd);
}


// Busy-wait (with 10ms naps) until refcount drops to zero; hooked to run
// after the I/O phase so shutdown does not race outstanding completions.
void AioFile::join() {
    for (; refcount != 0; msleep(10)) {
        // spin
    }
}
HOOK_ADD(&AioFile::join, POST_IO, HOOK_PRIO_MID);


// Exercise AioFile over a pipe: fill it until writes switch to real aio,
// drain the read side, and verify all pending bytes complete.
TEST(aio_write) { // TODO: TEST(aio_read)
    ssize_t rv;

    int fd[2];
    TEST_ASSERT(pipe2(fd, O_NONBLOCK) == 0);

    volatile size_t pending = 0;
    AioFile* aio = AioFile::getInst(fd[1], &pending);
    TEST_ASSERT(aio);

    char buf[512] = {};
    const size_t len = sizeof(buf);

    // fill up the pipe until it cannot be written
    do {
        // BUG FIX: dropped the former (const char*) cast -- the visible
        // enqueue() signature takes char*, so the cast could not compile.
        // (If a const overload exists in the header, revisit this.)
        rv = aio->enqueue(buf, len);
        if (rv == 0) {
            break; // handed off to aio -- the pipe is full
        }
        TEST_ASSERT(rv == (ssize_t)len);
    } while (true);

    // read pipe empty to make space
    do {
        rv = read(fd[0], buf, len);
        if (rv == -1) {
            if (errno == EAGAIN) {
                break;
            } else {
                TEST_ASSERT_MESSAGE(strerror(errno), false);
            }
        }
    } while (true);

    // now wait for completion
    if (pending) {
        sleep(1);
        TEST_ASSERT(!pending);
    }

    aio->finish();
    close(fd[0]);
    return true;
}
#endif


//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//


// In-flight read counter: enqueue() bumps it per aio_read, cb() drops it,
// join() spins on it at shutdown.
atomic_t AioFileIn::refcount = 0;


// Thread-safe pool of Result objects (popped in getInst, pushed in delInst).
TPoolPTS<AioFileIn::Result> AioFileIn::Result::pool;


// Hand out a pooled Result with refcount 1, len 0 (not-done marker) and a
// freshly malloc'd buffer of _len bytes.
AioFileIn::Result* AioFileIn::Result::getInst(size_t _len) {
    Result* out = pool.pop();
    out->refcount = 1;
    out->len = 0;
    out->buf = (char*)malloc(_len); // TODO: (push) threadsafe slab pool?
    return out;
}


// Share an existing Result: bump its refcount and return it unchanged.
AioFileIn::Result* AioFileIn::Result::getInst(Result* r) {
    atomic_add(&r->refcount, 1);
    return r;
}


// Drop one reference; the last holder frees the buffer and recycles the
// object into the pool. (atomic_add apparently returns the updated value.)
void AioFileIn::Result::delInst() {
    if (atomic_add(&refcount, -1) <= 0) {
        free(buf);
        pool.push(this);
    }
}


// Read the whole file `fn` (relative to dirfd unless dirfd == -1)
// asynchronously. Returns a Result* owned by the caller (release with
// delInst()) or NULL on error. result->len != 0 signals completion:
// -1 on failure, otherwise the byte count; 0 means still in flight.
AioFileIn::Result* AioFileIn::enqueue(const char* fn, int dirfd) {
    // open nonblocking
    int fd;
    if (dirfd == -1) {
        fd = open(fn, O_RDONLY|O_NONBLOCK);
    } else {
        fd = openat(dirfd, fn, O_RDONLY|O_NONBLOCK);
    }
    if (fd == -1) {
        log_errno(error, "aio open(%d, %s)", dirfd, fn);
        return NULL;
    }

    // get file size
    off_t size = file_size(fd);
    if (size <= 0) {
        EINTR_RETRY(close(fd));
        return NULL;
    }

    // create result object with refcount 1 (+1 byte for a NUL terminator)
    Result* result = Result::getInst(size+1);

    // try to read directly
    size_t preread = 0;
    ssize_t rv = read(fd, result->buf, size);
    if (rv == -1) {
        if (is_noop_errno(rv)) { // need real aio then
            log_errno(debug, "aio read(%s)", fn);
        } else { // real error
            log_errno(error, "aio read(%s)", fn);
            EINTR_RETRY(close(fd));
            result->delInst();
            return NULL;
        }
    } else if (rv == 0) { // fatal as we checked for file size > 0
        log(debug, "aio read(%s): eof", fn);
        EINTR_RETRY(close(fd));
        result->delInst();
        return NULL;
    } else if (rv == size) { // already done
        EINTR_RETRY(close(fd));
        result->buf[size] = '\0';
        result->len = rv; // flag as done
        log(io, "aio sync read(%s): %zd/%ld", fn, rv, size);
        return result;
    } else { // retry with real aio.
        log(io, "aio read(%s): %zd/%ld", fn, rv, size);
        assert(rv > 0 && rv < size);
        preread = (size_t)rv;
    }

    // set blocking for aio
    if (!set_blocking(fd, true)) {
        EINTR_RETRY(close(fd));
        result->delInst();
        return NULL;
    }

    // create aio request for the remainder after the partial direct read
    AioFile::cb_t* aio = pool.pop();
    bzero(aio, sizeof(AioFile::cb_t));
    aio->aio.aio_fildes = fd;
    aio->aio.aio_offset = preread;
    aio->aio.aio_buf = result->buf + preread;
    aio->aio.aio_nbytes = size - preread;
    aio->aio.aio_sigevent.sigev_notify = SIGEV_THREAD;
    aio->aio.aio_sigevent.sigev_notify_function = &cb;
    aio->aio.aio_sigevent.sigev_value.sival_ptr = aio; // itself
    aio->ctx = (intptr_t)result; // the rv
    Result::getInst(result); // and increase its refcount

    atomic_add(&refcount, 1);
    if (aio_read((aiocb*)aio) == 0) {
        log(io, "aio(%d,%p,%ld) detatched", fd, result->buf, size);
        return result;
    } else {
        log_errno(error, "aio_read(%d)", fd); // BUG FIX: message said "aio_write" although this is the read path
        atomic_add(&refcount, -1);
        EINTR_RETRY(close(fd));
        result->delInst();
        result->delInst(); // twice, should be freed then
        pool.push(aio);
        return NULL;
    }
}


// Completion callback for AioFileIn::enqueue() (SIGEV_THREAD helper thread).
// Closes the fd, recycles the control block, publishes the result length
// (flagging the Result as done/erroneous) and drops its reference.
void AioFileIn::cb(sigval _rv) {
    AioFile::cb_t* aio = (AioFile::cb_t*)_rv.sival_ptr;

    int rv = aio_errno(&aio->aio);
    #ifdef DEBUG
        errno = rv;
        log_errno(io, "aio(%d,%p,%zu)", aio->aio.aio_fildes, aio->aio.aio_buf, aio->aio.aio_nbytes);
    #endif

    EINTR_RETRY(close(aio->aio.aio_fildes));
    Result* result = (Result*)aio->ctx;
    // BUG FIX: copy out everything we still need BEFORE pushing the cb back --
    // another thread may pop and clobber it the moment it is in the pool.
    ssize_t total = (ssize_t)(aio->aio.aio_nbytes + aio->aio.aio_offset); // aio size + possibly preread data
    pool.push(aio); // from here on `aio` must not be touched

    if (rv != 0) {
        result->len = -1; // flag as erroneous
    } else {
        // buffer was malloc'd size+1 in enqueue(); terminate it the same way
        // the synchronous-read path does before flagging completion
        result->buf[total] = '\0';
        result->len = total; // flag as done
    }
    result->delInst(); // decrease refcount - free if client already aborted

    atomic_add(&refcount, -1);
}


// Block shutdown until every in-flight aio_read has completed (10ms naps);
// registered to run after the I/O phase.
void AioFileIn::join() {
    do {
        if (!refcount) {
            return;
        }
        msleep(10);
    } while (true);
}
HOOK_ADD(&AioFileIn::join, POST_IO, HOOK_PRIO_MID);