#include "ipc.hpp"


class certd_fd_t; // fwd
// Lookup table from a file descriptor to the certd_fd_t it is associated with:
//  - a connected certd socket fd maps to the instance that owns it (set in fd_t::get()),
//  - a waiting client fd maps to the instance whose result it awaits (set in certd_fd_t::set_busy()).
// Entries are set back to NULL when the fd is closed, dequeued, or its handler has been called.
static FdTable<certd_fd_t*> fds; ///< certd or client fds


//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//


class MultiHandler {
    private:
        typedef struct {
            int fd;
            ipc_handler_t handler;
            void* handler_ctx;
        } handler_t;
        std::vector<handler_t> handlers;

    public:
        ~MultiHandler() { reset(); } ///< as reset()
        void enqueue(int, ipc_handler_t, void*); ///< adds client fd and handler to be called
        void dequeue(int); ///< removes previously added client fd, and deletes it from #fds - O(n)
        void handle(char*, size_t); ///< calls and removes all handlers, and deletes them from #fds
        void reset(); ///< dummy-calls and removes all handlers, and deletes them from #fds
};


void MultiHandler::enqueue(int fd, ipc_handler_t handler, void* handler_ctx) {
    handlers.push_back((handler_t){fd, handler, handler_ctx});
}


void MultiHandler::dequeue(int fd) {
    for (std::vector<handler_t>::iterator it=handlers.begin(); it!=handlers.end(); ++it) {
        if (it->fd == fd) {
            assert(fds[it->fd] != NULL);
            fds[it->fd] = NULL;
            handlers.erase(it);
            return;
        }
    }
    assert(false);
}


void MultiHandler::handle(char* buf, size_t len) {
    for (std::vector<handler_t>::iterator it=handlers.begin(); it!=handlers.end(); ++it) {
        assert(fds[it->fd] != NULL);
        fds[it->fd] = NULL;
        it->handler(it->fd, it->handler_ctx, buf, len);
    }
    handlers.clear();
}


/// Wakes every queued client with a dummy result (NULL buffer, length 0) and empties the queue.
/// Delegates to handle(), so every client fd is also cleared from #fds.
void MultiHandler::reset() {
    handle(NULL, 0);
}


//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//


/// Lazily (re-)connected unix-domain socket to one certd instance.
/// Owns both the socket fd and a private copy of the socket path; both are released in the
/// destructor, therefore instances must not be copied.
class fd_t {
    private:
        int fd; ///< connected socket, or -1 while disconnected
        char* const path; ///< copy for reconnecting from @see slab_pool
        const poll_handler_t handler; ///< registered with Poll for every new connection
        certd_fd_t* const parent; ///< for maintaining @see fds
        time_t last_connect; ///< NOW at the last connection attempt, 0 if never tried

        // Non-copyable (declared, intentionally not defined): an implicit copy would close the
        // same fd twice and hand the same path back to slab_pool twice.
        fd_t(const fd_t&);
        fd_t& operator=(const fd_t&);

    public:
        fd_t(const char*, poll_handler_t, certd_fd_t*);
        ~fd_t();

        int get(); ///< get fd ((re-)connect first if reasonable)
        void failed(); ///< close fd obtained via get() due to i/o error
};


/// Remembers where and how to connect; does not connect yet (that happens lazily in get()).
/// @param _path     socket path, duplicated via slab_pool so the caller's buffer need not outlive us
/// @param _handler  poll callback registered for each new connection
/// @param _parent   instance recorded in #fds for the connected fd
fd_t::fd_t(const char* _path, poll_handler_t _handler, certd_fd_t* _parent)
    : fd(-1),
      path(slab_pool.dup(_path)),
      handler(_handler),
      parent(_parent),
      last_connect(0)
{
}


/// Closes the connection (if any) and hands the path copy obtained from slab_pool.dup() back via slab_pool.push().
fd_t::~fd_t() {
    failed(); // returns immediately when fd is already -1
    slab_pool.push(path);
}


/// Returns the connected socket, trying to (re-)connect first when it is not connected.
/// A new attempt is only made if the previous one is more than 30 NOW-units (presumably seconds)
/// in the past. On a successful connect the fd is recorded in #fds and added to the pollset.
/// @return the socket fd, or -1 if not connected
int fd_t::get() {
    // common case: still connected
    if (likely(fd != -1)) {
        return fd;
    }

    // back off: the last attempt was too recent
    if (last_connect >= NOW - 30) {
        return -1;
    }

    log(io, "(re-)connecting to '%s'", path);
    fd = uds_connect(path);
    last_connect = NOW;
    if (fd == -1) {
        return -1;
    }

    assert(fds[fd] == NULL);
    fds[fd] = parent;
    Poll::getInst()->add(fd, handler, EVENT_CLOSE|EVENT_IN, false);
    return fd;
}


/// Tears down the current connection: clears the #fds entry, removes the fd from the pollset,
/// closes it and marks this object as disconnected. Does nothing when already disconnected.
void fd_t::failed() {
    if (fd != -1) {
        log(io, "closing %d to '%s'", fd, path);

        assert(fds[fd] == parent);
        fds[fd] = NULL;

        Poll::getInst()->del(fd);
        EINTR_RETRY(close(fd));
        fd = -1;
    }
}


//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//


/// One certd connection, the request currently pending on it (if any), and the clients waiting
/// for that request's result.
class certd_fd_t {
    public:
        certd_fd_t(const char*);

        /// certd socket fd as returned by fd_t::get(), -1 if not connected
        int fd_get();
        /// some i/o failed on fd obtained from fd_get(), might wakeup client
        void fd_failed();

        /// whether there is a request pending on fd_get() and which one if so
        const char* is_busy() const;
        /// just wrote this request to fd_get() for the given client fd, or add fd to #clients if the request matches
        void set_busy(const char*, int, ipc_handler_t, void*);
        /// set_busy() fd is not interested anymore, discard results
        void unset_busy(int);

    private:
        fd_t fd; ///< certd fd
        MultiHandler clients; ///< client fds to wake up
        char request[1024]; ///< pending request, empty string while idle

        /// incoming data on the given certd #fd
        static bool line_handler(int, char*, size_t);
        /// incoming event on given certd #fd, reads, and calls line_handler()
        static void poll_handler(int, event_t, unsigned, void*);
};


/// Creates an idle instance for the certd socket at the given path; the connection itself is
/// established lazily by fd_get().
certd_fd_t::certd_fd_t(const char* path)
    : fd(path, &poll_handler, this)
{
    request[0] = '\0'; // no request pending
}


/// Returns the certd socket fd, forwarding to fd_t::get() (which may attempt a (re-)connect).
/// @return the fd, or -1 if not connected
int certd_fd_t::fd_get() {
    return fd.get();
}


/// Handles an i/o failure on the certd socket: closes it, wakes all waiting clients with a dummy
/// (NULL, 0) result, and marks the instance idle.
void certd_fd_t::fd_failed() {
    fd.failed(); // close the socket and drop it from #fds and the pollset
    clients.reset(); // handlers are called while #request is still set
    *request = '\0'; // not busy anymore
}


/// Tells whether a request is pending on this certd connection.
/// @return the pending request string, or NULL when idle
const char* certd_fd_t::is_busy() const {
    if (request[0] == '\0') {
        return NULL;
    }
    return request;
}


/// Registers a client as waiting for the result of a request on this certd connection.
/// If the instance is idle, req becomes the pending request; otherwise req must equal the request
/// that is already pending and the client simply joins the waiters.
/// @param req          non-empty, NUL-terminated request string
/// @param cfd          client fd; must not be registered in #fds yet
/// @param handler      callback to invoke with the result
/// @param handler_ctx  opaque pointer passed back to handler
void certd_fd_t::set_busy(const char* req, int cfd, ipc_handler_t handler, void* handler_ctx) {
    assert(*req != '\0');
    if (!is_busy()) {
        size_t req_len = strlen(req);
        assert(req_len < sizeof(request));
        // The assert above vanishes under NDEBUG, so never rely on it for memory safety:
        // clamp the copy to the buffer instead of overflowing it.
        if (req_len >= sizeof(request)) {
            req_len = sizeof(request) - 1;
        }
        memcpy(request, req, req_len);
        request[req_len] = '\0';
    } else {
        assert(strcmp(request, req) == 0);
    }
    clients.enqueue(cfd, handler, handler_ctx);
    assert(fds[cfd] == NULL);
    fds[cfd] = this; // lets ipc_dequeue() find us via the client fd
}


/// Removes a client previously registered via set_busy() from the waiters without calling its
/// handler. #request is left untouched, so the instance stays busy.
void certd_fd_t::unset_busy(int cfd) {
    clients.dequeue(cfd);
}


/// Callback handed to separator_read() by poll_handler(): one result has arrived on certd fd _fd.
/// Delivers it to every waiting client and marks the instance idle.
/// @param _fd  certd socket the data was read from; must be registered in #fds
/// @param b    result data
/// @param l    length of b
/// @return always true ("go on")
bool certd_fd_t::line_handler(int _fd, char* b, size_t l) {
    certd_fd_t* inst = fds[_fd];
    assert(inst); // same precondition as in poll_handler()
    // log the fd we were called for; fd_get() could attempt a reconnect as a side effect
    log(debug, "ipc result from %d", _fd);
    inst->clients.handle(b, l);
    *inst->request = '\0'; // not busy anymore
    return true; // go on
}


void certd_fd_t::poll_handler(int _fd, event_t ev, unsigned, void*) {
    certd_fd_t* inst = fds[_fd];
    assert(inst);

    // upon shutdown (i.e. wakeup broadcast), don't collect results anymore. instead, remove fd from the pollset s.t. it can get empty.
    if (unlikely(SHUTDOWN)) {
        if (!inst->is_busy() || SHUTDOWN > SHUTDOWN_GRACEFUL) {
            // close and might call handler with null
            inst->fd_failed();
            return;
        }
    }

    // don't care about events, fd is nonblocking anyhow and we will detect errors then. timeouts are not interesting here, too.
    if (separator_read(_fd, &line_handler) == TRI_FALSE) {
        inst->fd_failed();
        return;
    }

    // try again to more gracefully close fd in case of shutdown
    if (unlikely(SHUTDOWN)) {
        if (!inst->is_busy()) {
            inst->fd_failed();
            return;
        }
    }
}


//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//


/// All configured certd sockets; kept as the opaque ctx of the certd_sock config key
/// (allocated in certd_sock_config(), freed together with its elements in certd_sock_unconfig()).
typedef std::vector<certd_fd_t*> certd_fds_t;


static bool certd_sock_config(void*& ctx, char* value) {
    certd_fds_t* socks = (certd_fds_t*)ctx;
    if (!socks) {
        socks = new certd_fds_t();
        ctx = (void*)socks;
    }
    socks->push_back(new certd_fd_t(value));
    return true;
}


static void certd_sock_unconfig(void*& ctx) {
    if (ctx) {
        certd_fds_t* socks = (certd_fds_t*)ctx;
        for (certd_fds_t::iterator it = socks->begin(); it != socks->end(); ++it) {
            delete *it; // aborts pending requests
        }
        delete socks;
        ctx = NULL;
    }
}


// Configuration of this module: a single key, certd_sock, whose ctx holds the certd_fds_t list.
CONF_DEF(config) {
    ConfigKey certd_sock;
};
// Wires certd_sock to certd_sock_config() / certd_sock_unconfig().
// NOTE(review): the meaning of the two `true` flags is defined by CONF_KEY_INIT, which is not visible here.
CONF_INIT(config) {
    CONF_KEY_INIT(certd_sock, true, true, &certd_sock_config, &certd_sock_unconfig);
}


//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//\\//


bool ipc_enqueue(int cfd, ipc_handler_t handler, void* handler_ctx, const char* buf, size_t len) {
    // start early not accepting new requests
    if (unlikely(SHUTDOWN)) {
        return false;
    }

    // check config
    certd_fds_t* socks = config? config->certd_sock.ctx_as<certd_fds_t>(): NULL;
    unless (socks && socks->size()) {
        return false;
    }

    // pick a socket
    unsigned tries = MIN(2, socks->size());
    size_t num = hash(buf, len);
    next_try:
    certd_fd_t* sock = (*socks)[num % socks->size()];
    unless (tries--) {
        // TODO: collapse calls for the same input/domain
        log(info, "ipc: cannot find free socket for '%s'", buf);
        return false;
    }
    if (sock->is_busy()) {
        // due to hashing and increment, there is a chance we can enqueue it if the hostname matches
        const char* busy_req = sock->is_busy();
        if (strcmp(busy_req, buf) == 0) {
            sock->set_busy(buf, cfd, handler, handler_ctx);
            log(debug, "enqueued '%s' for %d on existing instance", buf, cfd);
            return true;
        }
        num++;
        goto next_try;
    }
    int sockfd = sock->fd_get();
    if (sockfd == -1) {
        num++;
        goto next_try;
    }

    switch (separator_write(sockfd, buf, len)) {
        case TRI_FALSE:
            sock->fd_failed();
            num++;
            goto next_try;
            break;
        case TRI_NONE:
            num++; // blocked?
            goto next_try;
            break;
        case TRI_TRUE:
            break;
    }

    sock->set_busy(buf, cfd, handler, handler_ctx);
    log(debug, "enqueued '%s' for %d", buf, cfd);

    return true;
}


void ipc_dequeue(int fd) {
    certd_fd_t* sock = fds[fd];
    if (!sock) {
        log(notice, "ipc: cannot dequeue");
        return;
    }
    sock->unset_busy(fd);
}