#pragma once
#include "common.hpp"
#include "pool.hpp"
#include "files.hpp"
#include "test.hpp"
#include "hooks.hpp"
#include <aio.h> // link with -lrt
/// Output file handle whose writes are queued through enqueue() and, per the member
/// comments below, either completed immediately or handed over to POSIX AIO (#cb_t embeds
/// an `aiocb`). Instances are obtained through getInst(); both the constructor and the
/// destructor are private, so callers release an instance with finish() instead of `delete`.
/// NOTE(review): only declarations are visible in this header; statements about threading
/// and lifetime restate the original member comments and should be confirmed against the
/// implementation file.
class AioFile {
public:
// Completion handler type: a plain function pointer taking one intptr_t context value.
typedef void (*handler_t)(intptr_t); ///< If neither a default context nor a per-enqueue context is given, the AioFile itself is passed. XXX: invoked without locking, so it is unclear whether handlers may run in parallel
// Per-request control block: the AIO request plus the context value that belongs to it.
typedef struct {
aiocb aio; // must stay the first member (presumably so a pointer to the aiocb can be converted back to the enclosing cb_t - confirm in the implementation)
intptr_t ctx;
} cb_t;
private:
// Class-wide counter that join() waits on (presumably the number of live instances - confirm).
static atomic_t refcount;
// File descriptor managed by this instance; exposed read-only through get_fd().
const int fd;
bool nonblock; ///< whether fd is still in nonblocking mode (for speculative writes) or is blocking for upcoming real aio
// Handler fixed at construction, and presumably the default context handed to it
// when enqueue() is called without a per-request context (TODO confirm).
const handler_t handler;
intptr_t handler_ctx;
// NOTE(review): purpose of this lock is not visible in the header; verify what it guards.
atomic_t lock;
// Presumably the "error state" referred to by enqueue() and the flag set by finish().
// NOTE(review): volatile alone gives neither atomicity nor ordering between threads;
// confirm that accesses to these fields are synchronized by other means.
volatile bool err;
volatile bool done;
volatile size_t dummy; ///< internal counter used if no external #pending is given
volatile size_t* pending; ///< refcount of pending requests; can be given externally (where it must not be changed!)
// Worker behind the public enqueue() overloads: they pass true for a mutable buffer and
// false for a const buffer (what the flag enables is defined in the implementation).
ssize_t enqueue(char*, size_t, bool, intptr_t); ///< returns how much could be instantly written, 0 for AIO (handler will be called then), -1 upon entering the error state
static void cb(sigval); ///< completion callback, runs on the callback thread
AioFile(int, volatile size_t*, handler_t, intptr_t);
~AioFile(); ///< private, as destructor must not run until everything is done
public:
// Factory taking a C string (presumably the path to open) and a flag that defaults to true
// (meaning not visible here - TODO confirm), plus optional external pending counter,
// handler and default handler context.
static AioFile* getInst(const char*, bool=true, volatile size_t* =NULL, handler_t=NULL, intptr_t=0);
static AioFile* getInst(int, volatile size_t* =NULL, handler_t=NULL, intptr_t=0); ///< will manage this already-open non-blocking fd instead
// Queue len bytes from a mutable buffer; ctx is the optional per-request handler context.
// Return value as documented on the private enqueue().
INLINE ssize_t enqueue(char* buf, size_t len, intptr_t ctx=0) { return enqueue(buf, len, true, ctx); }
// Same for a const buffer: constness is cast away and the flag is false, so the
// implementation can tell the two cases apart.
INLINE ssize_t enqueue(const char* buf, size_t len, intptr_t ctx=0) { return enqueue((char*)buf, len, false, ctx); }
// Returns the managed file descriptor.
INLINE int get_fd() const { return fd; }
void finish(); ///< marks the instance as finished; it will be deleted as soon as #pending reaches 0
static void join(); ///< returns as soon as #refcount reaches 0
};
/// Input counterpart of the AIO file helpers: enqueue() starts a request for a named file
/// and returns a reference-counted Result that is filled in later. The class has only
/// static members in this header.
/// NOTE(review): definitions are not visible here; that a request reads the whole file
/// into Result::buf is an assumption based on the member names - confirm in the implementation.
class AioFileIn {
public:
/// Reference-counted buffer describing the state and payload of one request.
/// Instances come from getInst() and are released with delInst().
class Result {
private:
// Number of outstanding references; see getInst() / delInst().
atomic_t refcount;
// Shared pool object for Result instances (TPoolPTS is declared elsewhere in the project).
static TPoolPTS<Result> pool;
public:
// Data buffer, allocated by getInst(size_t) and freed by delInst().
char* buf;
ssize_t len; ///< 0 if pending, -1 upon error, length if done
static Result* getInst(size_t); ///< allocates #buf, inits #refcount to 1, returns new instance
static Result* getInst(Result*); ///< increments #refcount by 1 (presumably returning the same instance - confirm)
void delInst(); ///< frees #buf and this instance as soon as #refcount reaches 0
};
private:
static void cb(sigval); ///< completion callback, runs on the callback thread
// Class-wide counter that join() waits on (presumably the number of requests in flight - confirm).
static atomic_t refcount;
public:
static Result* enqueue(const char*, int=-1); ///< NULL upon error, check #AioFileIn::Result::len otherwise; optional dir fd for openat
static void join(); ///< returns as soon as #refcount reaches 0
};