#include "aio.hpp"
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
// Pool of recycled AIO control blocks, shared by AioFile and AioFileIn.
// Presumably thread-safe (TPoolPTS): it is pushed back from SIGEV_THREAD
// completion callbacks while popped from the submitting thread — confirm.
static TPoolPTS<AioFile::cb_t> pool;
// Collapse the two-step POSIX AIO status query into a single errno-style
// code: 0 on a complete transfer, ERANGE on a short one, otherwise the
// error reported by aio_error() (e.g. EINPROGRESS while still running).
static int aio_errno(aiocb* aio) {
    const int status = aio_error(aio);
    if (status != 0) {
        return status;
    }
    // completed without error — verify the full byte count was transferred
    const ssize_t transferred = aio_return(aio);
    return (transferred == (ssize_t)aio->aio_nbytes)? 0: ERANGE;
}
//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//
#if (SPLICE)
// NOTE(review): refcount is only ever read (in join()) within this file;
// nothing here increments or decrements it — verify it is maintained
// elsewhere, otherwise join() returns immediately.
atomic_t AioFile::refcount = 0;
// Create an append-mode output file for asynchronous writing.
// fn: path to open; excl: fail if the file already exists, otherwise
// truncate. Returns NULL (error logged) when open fails. The instance
// must be ended via finish() — the destructor must not run until
// everything is done.
AioFile* AioFile::getInst(const char* fn, bool excl, volatile size_t* _pending, handler_t _handler, intptr_t _handler_ctx) {
    int flags = O_WRONLY|O_APPEND|O_NONBLOCK|O_CREAT;
    flags |= excl? O_EXCL: O_TRUNC;
    const int _fd = open(fn, flags, 0600);
    if (_fd == -1) {
        log_errno(error, "open(%s)", fn);
        return NULL;
    }
    log(io, "aio(%d) created for %s", _fd, fn);
    return new AioFile(_fd, _pending, _handler, _handler_ctx);
}
// Wrap an already-open fd (e.g. a pipe end). The fd must be non-blocking
// (asserted via F_GETFL) and is owned by the returned instance from now on.
AioFile* AioFile::getInst(int _fd, volatile size_t* _pending, handler_t _handler, intptr_t _handler_ctx) {
log(io, "aio(%d) created", _fd);
assert(fcntl(_fd, F_GETFL, 0) & O_NONBLOCK);
return new AioFile(_fd, _pending, _handler, _handler_ctx);
}
// _pending: optional external byte counter for in-flight writes (an
// internal dummy is substituted when NULL); _ctx: handler context,
// defaulting to `this` when 0 (GNU ?: extension). fd is owned from here.
AioFile::AioFile(int _fd, volatile size_t* _pending, handler_t _handler, intptr_t _ctx): fd(_fd), nonblock(true), handler(_handler), handler_ctx(_ctx ?: (intptr_t)this), lock(0), err(false), done(false), pending(_pending ?: &dummy) {
*pending = 0; // reset the (possibly caller-supplied) counter
assert(fd != -1);
}
// Queue len bytes for writing to fd.
// Fast path: when nothing is pending and the fd is still non-blocking,
// try a direct write(2). If that would block, switch the fd to blocking
// mode and hand the buffer to POSIX AIO instead.
// buf      data to write (must stay valid until completion unless freebuf)
// len      byte count; len == 0 (or a previously latched error) fails fast
// freebuf  free(buf) once the data was consumed (sync or async)
// ctx      per-request handler context; 0 falls back to handler_ctx
// Returns len on a full synchronous write, 0 when detached to AIO,
// -1 on error (err is latched so later calls fail fast).
ssize_t AioFile::enqueue(char* buf, size_t len, bool freebuf, intptr_t ctx) {
    ssize_t rv = -1;
    cb_t* aio = NULL;
    // preliminary checks
    assert(!done); // cannot be changed in the main thread atm
    if (err || !len) {
        log(io, "aio(%d) already erroneous", fd);
        goto done;
    }
    // if there is nothing pending, we can try it w/ non-blocking write
    if (nonblock && !*pending) { // ok to read w/o mutex, can only be incremented by us, also everything below
        rv = write(fd, buf, len);
        if (is_noop_errno(rv)) {
            log_errno(debug, "aio write(%d)", fd);
            // switching now to aio on blocking fd
            if (!set_blocking(fd, true)) {
                err = true;
                goto done;
            }
            nonblock = false; // TODO: can switch back somewhen?
        } else {
            if (rv == -1) {
                log_errno(error, "aio write(%d)", fd);
                err = true;
            } else if (rv != (ssize_t)len) { // XXX: should we handle partial writes?
                log(error, "aio write(%d): %zd/%zu", fd, rv, len);
                rv = -1;
                err = true;
            } else {
                log(io, "write(%d): %zd", fd, rv);
            }
            goto done;
        }
    }
    // only a shortcut of the below
#if MAX_AIO == 0
    log(debug, "aio(%d): needed but disabled", fd);
    err = true;
    goto done;
#endif
    // speculatively increase pending, has to be reverted upon error
    if (atomic_add(pending, len) > MAX_AIO) {
        atomic_add(pending, -len);
        log(error, "aio(%d): queue overflow", fd);
        err = true;
        goto done;
    }
    aio = pool.pop();
    bzero(aio, sizeof(cb_t));
    aio->aio.aio_fildes = fd;
    aio->aio.aio_offset = (off_t)((intptr_t(this))); // as O_APPEND is obeyed, the offset is unused and abused to carry `this` to the callback
    aio->aio.aio_buf = buf;
    aio->aio.aio_nbytes = len;
    aio->aio.aio_sigevent.sigev_notify = SIGEV_THREAD;
    aio->aio.aio_sigevent.sigev_notify_function = &cb;
    aio->aio.aio_sigevent.sigev_value.sival_ptr = aio;
    aio->aio.aio_sigevent.sigev_signo = (int)freebuf; // not needed as well, so we abuse it to carry the freebuf flag
    aio->ctx = ctx;
    if (aio_write((aiocb*)aio) == 0) {
        // log BEFORE clearing: the original logged after buf was nulled,
        // always printing a NULL pointer ("detatched" typo fixed, too)
        log(io, "aio(%d,%p,%zu) detached", fd, buf, len);
        aio = NULL; // now owned by the completion callback
        buf = NULL; // suppress the free(buf) below — the callback frees it
        rv = 0;
    } else {
        log_errno(error, "aio_write(%d)", fd);
        err = true;
        atomic_add(pending, -len);
        rv = -1;
        goto done;
    }
done:;
    if (aio) pool.push(aio); // never submitted — recycle the control block
    if (freebuf) free(buf); // no-op when detached (buf == NULL)
    return rv;
}
// AIO write completion callback (runs on a SIGEV_THREAD notify thread).
// The AioFile instance travels in aio_offset, the freebuf flag in
// sigev_signo (both set in enqueue()). Pending accounting and possible
// self-destruction of the instance happen here.
void AioFile::cb(sigval _rv) {
    cb_t* aio = (cb_t*)_rv.sival_ptr;
    AioFile* inst = (AioFile*)((intptr_t)aio->aio.aio_offset);
    // copy everything we still need BEFORE recycling the control block:
    // once pushed, another thread may pop and overwrite it (the original
    // read aio->ctx after pool.push — a use-after-release)
    size_t len = aio->aio.aio_nbytes;
    intptr_t ctx = aio->ctx;
    int rv = aio_errno(&aio->aio);
#ifdef DEBUG
    errno = rv;
    log_errno(io, "aio(%d,%p,%zu): %d", inst->fd, aio->aio.aio_buf, aio->aio.aio_nbytes, rv);
#endif
    if (aio->aio.aio_sigevent.sigev_signo) free((void*)aio->aio.aio_buf); // casting volatile away
    pool.push(aio);
    aio = NULL; // recycled — must not be touched below
    // update rv
    if (rv != 0) {
        inst->err = true;
    }
    // call handler. XXX: not everything is updated at this point
    if (inst->handler) {
        inst->handler(ctx ?: inst->handler_ctx);
    }
    // decrease pending and atomically check if we're done
    spin_lock(&inst->lock);
    atomic_add(inst->pending, -len);
    if (inst->done && !*inst->pending) {
        // lock intentionally not released: it is destroyed with the instance
        delete inst;
        return;
    }
    spin_unlock(&inst->lock);
}
// Declare end of input: no further enqueue() calls are allowed.
// Deletes the instance immediately when nothing is in flight; otherwise
// the final completion callback performs the delete (see cb()).
void AioFile::finish() {
log(io, "aio(%d) input done", fd);
assert(!done);
spin_lock(&lock);
done = true;
if (!*pending) {
// lock intentionally not released: it is destroyed with the instance
delete this; // TODO: pooling
return;
}
spin_unlock(&lock);
}
// Only reachable via finish() or the last completion callback, hence the
// asserts (done set, nothing pending); closes the owned fd, retrying on
// EINTR.
AioFile::~AioFile() {
assert(done);
assert(!*pending);
assert(fd != -1);
EINTR_RETRY(close(fd));
log(io, "aio(%d) deleted", fd);
}
// Spin-wait until all AioFile instances are gone.
// NOTE(review): refcount is never modified anywhere in this file, so this
// loop exits immediately unless the counter is maintained elsewhere — verify.
void AioFile::join() {
while (refcount) {
msleep(10);
}
}
// Make sure all output AIO is drained after the I/O phase.
HOOK_ADD(&AioFile::join, POST_IO, HOOK_PRIO_MID);
// Self-test: drive enqueue() through both the synchronous fast path and
// the real-AIO fallback by filling a pipe until writes would block.
TEST(aio_write) { // TODO: TEST(aio_read)
    ssize_t rv;
    int fd[2];
    TEST_ASSERT(pipe2(fd, O_NONBLOCK) == 0);
    volatile size_t pending = 0;
    AioFile* aio = AioFile::getInst(fd[1], &pending);
    TEST_ASSERT(aio);
    char buf[512] = {};
    const size_t len = sizeof(buf);
    // fill up the pipe until it cannot be written
    do {
        // fixed: enqueue() takes char*, the old (const char*) cast was ill-formed
        rv = aio->enqueue(buf, len);
        if (rv == 0) { // detached to AIO — pipe is full
            break;
        }
        TEST_ASSERT(rv == (ssize_t)len);
    } while (true);
    // read pipe empty to make space for the queued aio write
    do {
        rv = read(fd[0], buf, len);
        if (rv == -1) {
            if (errno == EAGAIN) {
                break;
            } else {
                TEST_ASSERT_MESSAGE(strerror(errno), false);
            }
        }
    } while (true);
    // now wait for completion
    if (pending) {
        sleep(1);
        TEST_ASSERT(!pending);
    }
    aio->finish(); // deletes the instance (pending == 0); fd[1] closed there
    close(fd[0]);
    return true;
}
#endif
//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//
// Count of in-flight asynchronous reads, drained by AioFileIn::join().
atomic_t AioFileIn::refcount = 0;
// Recycling pool for Result objects.
TPoolPTS<AioFileIn::Result> AioFileIn::Result::pool;
// Allocate a Result for a read of up to _len bytes.
// refcount starts at 1 (the caller's reference); len stays 0 until the
// read completes.
// NOTE(review): the malloc() result is unchecked — on failure buf is NULL
// and the subsequent read() into it would fault; confirm allocation
// failure is acceptable to callers or handled elsewhere.
AioFileIn::Result* AioFileIn::Result::getInst(size_t _len) {
Result* r = pool.pop();
r->refcount = 1;
r->len = 0;
r->buf = (char*)malloc(_len); // TODO: (push) threadsafe slab pool?
return r;
}
// Take an additional reference on an existing Result (e.g. for the
// completion callback) and hand the same object back.
AioFileIn::Result* AioFileIn::Result::getInst(Result* r) {
atomic_add(&r->refcount, 1);
return r;
}
// Release one reference; the last release frees the buffer and recycles
// the object into the pool.
// NOTE(review): this assumes atomic_add() returns the post-decrement
// value — with fetch-and-add (old value) semantics the last release would
// leak; confirm against the atomic_add implementation.
void AioFileIn::Result::delInst() {
if (atomic_add(&refcount, -1) > 0) {
return;
}
free(buf);
pool.push(this);
}
// Asynchronously read the whole file fn (relative to dirfd unless -1).
// Tries a direct non-blocking read first; only when the data is not
// immediately available does it fall back to POSIX AIO for the remainder.
// Returns a Result holding one reference for the caller (len becomes
// non-zero once done, -1 on a failed async read), or NULL on immediate
// failure.
AioFileIn::Result* AioFileIn::enqueue(const char* fn, int dirfd) {
    // open nonblocking
    int fd;
    if (dirfd == -1) {
        fd = open(fn, O_RDONLY|O_NONBLOCK);
    } else {
        fd = openat(dirfd, fn, O_RDONLY|O_NONBLOCK);
    }
    if (fd == -1) {
        log_errno(error, "aio open(%d, %s)", dirfd, fn);
        return NULL;
    }
    // get file size; empty or unknown-size files are rejected
    off_t size = file_size(fd);
    if (size <= 0) {
        EINTR_RETRY(close(fd));
        return NULL;
    }
    // create result object with refcount 1 (+1 byte for the terminator)
    Result* result = Result::getInst(size+1);
    // try to read directly
    size_t preread = 0;
    ssize_t rv = read(fd, result->buf, size);
    if (rv == -1) {
        if (is_noop_errno(rv)) { // need real aio then
            log_errno(debug, "aio read(%s)", fn);
        } else { // real error
            log_errno(error, "aio read(%s)", fn);
            EINTR_RETRY(close(fd));
            result->delInst();
            return NULL;
        }
    } else if (rv == 0) { // fatal as we checked for file size > 0
        log(debug, "aio read(%s): eof", fn);
        EINTR_RETRY(close(fd));
        result->delInst();
        return NULL;
    } else if (rv == size) { // already done
        EINTR_RETRY(close(fd));
        result->buf[size] = '\0';
        result->len = rv; // flag as done
        log(io, "aio sync read(%s): %zd/%ld", fn, rv, size);
        return result;
    } else { // retry with real aio.
        log(io, "aio read(%s): %zd/%ld", fn, rv, size);
        assert(rv > 0 && rv < size);
        preread = (size_t)rv;
    }
    // set blocking for aio
    if (!set_blocking(fd, true)) {
        EINTR_RETRY(close(fd));
        result->delInst();
        return NULL;
    }
    // create aio request for the remainder behind the preread part
    AioFile::cb_t* aio = pool.pop();
    bzero(aio, sizeof(AioFile::cb_t));
    aio->aio.aio_fildes = fd;
    aio->aio.aio_offset = preread;
    aio->aio.aio_buf = result->buf + preread;
    aio->aio.aio_nbytes = size - preread;
    aio->aio.aio_sigevent.sigev_notify = SIGEV_THREAD;
    aio->aio.aio_sigevent.sigev_notify_function = &cb;
    aio->aio.aio_sigevent.sigev_value.sival_ptr = aio; // itself
    aio->ctx = (intptr_t)result; // the rv
    Result::getInst(result); // and increase its refcount for the callback
    atomic_add(&refcount, 1);
    if (aio_read((aiocb*)aio) == 0) {
        log(io, "aio(%d,%p,%ld) detached", fd, result->buf, size);
        return result;
    } else {
        // fixed: was mislabeled "aio_write" although aio_read() failed
        log_errno(error, "aio_read(%d)", fd);
        atomic_add(&refcount, -1);
        EINTR_RETRY(close(fd));
        result->delInst();
        result->delInst(); // twice, should be freed then
        pool.push(aio);
        return NULL;
    }
}
// AIO read completion callback (SIGEV_THREAD). Closes the fd, publishes
// the final length on the Result and drops the callback's reference.
void AioFileIn::cb(sigval _rv) {
    AioFile::cb_t* aio = (AioFile::cb_t*)_rv.sival_ptr;
    int rv = aio_errno(&aio->aio);
#ifdef DEBUG
    errno = rv;
    log_errno(io, "aio(%d,%p,%zu)", aio->aio.aio_fildes, aio->aio.aio_buf, aio->aio.aio_nbytes);
#endif
    EINTR_RETRY(close(aio->aio.aio_fildes));
    Result* result = (Result*)aio->ctx;
    // copy what we need BEFORE recycling the control block: once pushed,
    // another thread may pop and overwrite it (the original read the
    // aiocb fields after pool.push — a use-after-release)
    size_t total = aio->aio.aio_nbytes + aio->aio.aio_offset; // async part + possibly preread data
    if (rv == 0) {
        // terminate like the synchronous path does (the buffer was
        // allocated with one spare byte in enqueue())
        ((char*)aio->aio.aio_buf)[aio->aio.aio_nbytes] = '\0';
    }
    pool.push(aio);
    result->len = (rv != 0)? -1: total; // flag as done or erroneous
    result->delInst(); // decrease refcount - free if client already aborted
    atomic_add(&refcount, -1);
}
// Spin-wait until every outstanding asynchronous read has completed
// (refcount is incremented per submitted read and decremented in cb()).
void AioFileIn::join() {
while (refcount) {
msleep(10);
}
}
// Make sure all input AIO is drained after the I/O phase.
HOOK_ADD(&AioFileIn::join, POST_IO, HOOK_PRIO_MID);