mdbx: add mdbx_cursor_get_batch().

Resolve https://github.com/erthink/libmdbx/issues/236
This commit is contained in:
Leonid Yuriev
2021-12-11 02:56:19 +03:00
parent 32e495021f
commit 6f2c1e52ad
5 changed files with 230 additions and 3 deletions

View File

@@ -3722,7 +3722,8 @@ static void mdbx_node_del(MDBX_cursor *mc, size_t ksize);
static void mdbx_node_shrink(MDBX_page *mp, unsigned indx);
static int __must_check_result mdbx_node_move(MDBX_cursor *csrc,
MDBX_cursor *cdst, bool fromleft);
static int __must_check_result mdbx_node_read(MDBX_cursor *mc, MDBX_node *leaf,
static int __must_check_result mdbx_node_read(MDBX_cursor *mc,
const MDBX_node *leaf,
MDBX_val *data,
const txnid_t front);
static int __must_check_result mdbx_rebalance(MDBX_cursor *mc);
@@ -13572,8 +13573,9 @@ __hot static int mdbx_page_search(MDBX_cursor *mc, const MDBX_val *key,
* [out] data Updated to point to the node's data.
*
* Returns 0 on success, non-zero on failure. */
static __always_inline int mdbx_node_read(MDBX_cursor *mc, MDBX_node *node,
MDBX_val *data, const txnid_t front) {
static __always_inline int mdbx_node_read(MDBX_cursor *mc,
const MDBX_node *node, MDBX_val *data,
const txnid_t front) {
data->iov_len = node_ds(node);
data->iov_base = node_data(node);
if (unlikely(F_ISSET(node_flags(node), F_BIGDATA))) {
@@ -14551,6 +14553,122 @@ int mdbx_cursor_get(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data,
return rc;
}
static int cursor_first_batch(MDBX_cursor *mc) {
if (!(mc->mc_flags & C_INITIALIZED) || mc->mc_top) {
int err = mdbx_page_search(mc, NULL, MDBX_PS_FIRST);
if (unlikely(err != MDBX_SUCCESS))
return err;
}
mdbx_cassert(mc, IS_LEAF(mc->mc_pg[mc->mc_top]));
mc->mc_flags |= C_INITIALIZED;
mc->mc_flags &= ~C_EOF;
mc->mc_ki[mc->mc_top] = 0;
return MDBX_SUCCESS;
}
static int cursor_next_batch(MDBX_cursor *mc) {
if (unlikely(!(mc->mc_flags & C_INITIALIZED)))
return cursor_first_batch(mc);
MDBX_page *mp = mc->mc_pg[mc->mc_top];
if (unlikely(mc->mc_flags & C_EOF)) {
if ((unsigned)mc->mc_ki[mc->mc_top] + 1 >= page_numkeys(mp))
return MDBX_NOTFOUND;
mc->mc_flags ^= C_EOF;
}
int ki = mc->mc_ki[mc->mc_top];
mc->mc_ki[mc->mc_top] = (indx_t)++ki;
const int numkeys = page_numkeys(mp);
if (likely(ki >= numkeys)) {
mdbx_debug("%s", "=====> move to next sibling page");
mc->mc_ki[mc->mc_top] = (indx_t)(numkeys - 1);
int err = mdbx_cursor_sibling(mc, SIBLING_RIGHT);
if (unlikely(err != MDBX_SUCCESS)) {
mc->mc_flags |= C_EOF;
return err;
}
mp = mc->mc_pg[mc->mc_top];
mdbx_debug("next page is %" PRIaPGNO ", key index %u", mp->mp_pgno,
mc->mc_ki[mc->mc_top]);
}
return MDBX_SUCCESS;
}
int mdbx_cursor_get_batch(MDBX_cursor *mc, size_t *count, MDBX_val *pairs,
size_t limit, MDBX_cursor_op op) {
if (unlikely(mc == NULL || count == NULL || limit < 4))
return MDBX_EINVAL;
if (unlikely(mc->mc_signature != MDBX_MC_LIVE))
return (mc->mc_signature == MDBX_MC_READY4CLOSE) ? MDBX_EINVAL
: MDBX_EBADSIGN;
int rc = check_txn(mc->mc_txn, MDBX_TXN_BLOCKED);
if (unlikely(rc != MDBX_SUCCESS))
return rc;
if (unlikely(mc->mc_db->md_flags & MDBX_DUPSORT))
return MDBX_INCOMPATIBLE /* must be a non-dupsort subDB */;
switch (op) {
case MDBX_FIRST:
rc = cursor_first_batch(mc);
break;
case MDBX_NEXT:
rc = cursor_next_batch(mc);
break;
case MDBX_GET_CURRENT:
rc = likely(mc->mc_flags & C_INITIALIZED) ? MDBX_SUCCESS : MDBX_ENODATA;
break;
default:
mdbx_debug("unhandled/unimplemented cursor operation %u", op);
rc = EINVAL;
break;
}
if (unlikely(rc != MDBX_SUCCESS)) {
*count = 0;
return rc;
}
const MDBX_page *const page = mc->mc_pg[mc->mc_top];
const unsigned nkeys = page_numkeys(page);
unsigned i = mc->mc_ki[mc->mc_top], n = 0;
if (unlikely(i >= nkeys)) {
mdbx_cassert(mc, op == MDBX_GET_CURRENT);
mdbx_cassert(mc, mdbx_cursor_on_last(mc) == MDBX_RESULT_TRUE);
*count = 0;
if (mc->mc_flags & C_EOF) {
mdbx_cassert(mc, mdbx_cursor_on_last(mc) == MDBX_RESULT_TRUE);
return MDBX_ENODATA;
}
if (mdbx_cursor_on_last(mc) != MDBX_RESULT_TRUE)
return MDBX_EINVAL /* again MDBX_GET_CURRENT after MDBX_GET_CURRENT */;
mc->mc_flags |= C_EOF;
return MDBX_NOTFOUND;
}
const txnid_t pp_txnid = pp_txnid4chk(page, mc->mc_txn);
do {
if (unlikely(n + 2 > limit)) {
rc = MDBX_RESULT_TRUE;
break;
}
const MDBX_node *leaf = page_node(page, i);
get_key(leaf, &pairs[n]);
rc = mdbx_node_read(mc, leaf, &pairs[n + 1], pp_txnid);
if (unlikely(rc != MDBX_SUCCESS))
break;
n += 2;
} while (++i < nkeys);
mc->mc_ki[mc->mc_top] = (indx_t)i;
*count = n;
return rc;
}
static int mdbx_touch_dbi(MDBX_cursor *mc) {
mdbx_cassert(mc, (*mc->mc_dbistate & DBI_DIRTY) == 0);
*mc->mc_dbistate |= DBI_DIRTY;