mdbx: рефакторинг и микро-оптимизация cursor_next|_prev() для dupsort-узлов.

- меньше сравнений и переходов.
 - вложенный курсор всегда сбрасывается/очищается при переходе с dupsort-узла.
This commit is contained in:
Леонид Юрьев (Leonid Yuriev) 2023-11-26 01:11:12 +03:00
parent 225f548339
commit 10abf73191

View File

@ -3391,7 +3391,8 @@ static int __must_check_result cursor_last(MDBX_cursor *mc, MDBX_val *key,
static int __must_check_result cursor_init(MDBX_cursor *mc, const MDBX_txn *txn,
size_t dbi);
static int __must_check_result cursor_xinit0(MDBX_cursor *mc);
static int __must_check_result cursor_xinit1(MDBX_cursor *mc, MDBX_node *node,
static int __must_check_result cursor_xinit1(MDBX_cursor *mc,
const MDBX_node *node,
const MDBX_page *mp);
static int __must_check_result cursor_xinit2(MDBX_cursor *mc,
MDBX_xcursor *src_mx,
@ -16616,39 +16617,41 @@ static int cursor_sibling(MDBX_cursor *mc, int dir) {
/* Move the cursor to the next data item. */
static int cursor_next(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data,
MDBX_cursor_op op) {
MDBX_page *mp;
MDBX_node *node;
assert(op == MDBX_NEXT || op == MDBX_NEXT_DUP || op == MDBX_NEXT_NODUP);
int rc;
if (unlikely(mc->mc_flags & C_DEL) && op == MDBX_NEXT_DUP)
return MDBX_NOTFOUND;
if (unlikely(!(mc->mc_flags & C_INITIALIZED)))
if (unlikely(!(mc->mc_flags & C_INITIALIZED))) {
if (unlikely(mc->mc_flags & C_SUB))
return MDBX_NOTFOUND;
return cursor_first(mc, key, data);
}
mp = mc->mc_pg[mc->mc_top];
const MDBX_page *mp = mc->mc_pg[mc->mc_top];
if (unlikely(mc->mc_flags & C_EOF)) {
if (mc->mc_ki[mc->mc_top] + (size_t)1 >= page_numkeys(mp))
return MDBX_NOTFOUND;
mc->mc_flags ^= C_EOF;
}
if (mc->mc_db->md_flags & MDBX_DUPSORT) {
node = page_node(mp, mc->mc_ki[mc->mc_top]);
if (node_flags(node) & F_DUPDATA) {
if (op == MDBX_NEXT || op == MDBX_NEXT_DUP) {
if (mc->mc_xcursor) {
if (op != MDBX_NEXT_NODUP) {
const MDBX_node *node = page_node(mp, mc->mc_ki[mc->mc_top]);
if (node_flags(node) & F_DUPDATA) {
rc = cursor_next(&mc->mc_xcursor->mx_cursor, data, NULL, MDBX_NEXT);
if (op != MDBX_NEXT || rc != MDBX_NOTFOUND) {
if (likely(rc == MDBX_SUCCESS))
get_key_optional(node, key);
return rc;
if (likely(rc == MDBX_SUCCESS)) {
get_key_optional(node, key);
return MDBX_SUCCESS;
}
if (unlikely(rc != MDBX_NOTFOUND))
return rc;
}
} else {
mc->mc_xcursor->mx_cursor.mc_flags &= ~(C_INITIALIZED | C_EOF);
if (op == MDBX_NEXT_DUP)
if (op != MDBX_NEXT)
return MDBX_NOTFOUND;
}
mc->mc_xcursor->mx_cursor.mc_flags &= ~(C_INITIALIZED | C_EOF);
}
DEBUG("cursor_next: top page is %" PRIaPGNO " in cursor %p", mp->mp_pgno,
@ -16692,7 +16695,7 @@ skip:
return MDBX_SUCCESS;
}
node = page_node(mp, mc->mc_ki[mc->mc_top]);
const MDBX_node *node = page_node(mp, mc->mc_ki[mc->mc_top]);
if (node_flags(node) & F_DUPDATA) {
rc = cursor_xinit1(mc, node, mp);
if (unlikely(rc != MDBX_SUCCESS))
@ -16713,40 +16716,41 @@ skip:
/* Move the cursor to the previous data item. */
static int cursor_prev(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data,
MDBX_cursor_op op) {
MDBX_page *mp;
MDBX_node *node;
assert(op == MDBX_PREV || op == MDBX_PREV_DUP || op == MDBX_PREV_NODUP);
int rc;
if (unlikely(mc->mc_flags & C_DEL) && op == MDBX_PREV_DUP)
return MDBX_NOTFOUND;
if (unlikely(!(mc->mc_flags & C_INITIALIZED))) {
if (unlikely(mc->mc_flags & C_SUB))
return MDBX_NOTFOUND;
rc = cursor_last(mc, key, data);
if (unlikely(rc))
if (unlikely(rc != MDBX_SUCCESS))
return rc;
mc->mc_ki[mc->mc_top]++;
}
mp = mc->mc_pg[mc->mc_top];
if ((mc->mc_db->md_flags & MDBX_DUPSORT) &&
mc->mc_ki[mc->mc_top] < page_numkeys(mp)) {
node = page_node(mp, mc->mc_ki[mc->mc_top]);
if (node_flags(node) & F_DUPDATA) {
if (op == MDBX_PREV || op == MDBX_PREV_DUP) {
rc = cursor_prev(&mc->mc_xcursor->mx_cursor, data, NULL, MDBX_PREV);
if (op != MDBX_PREV || rc != MDBX_NOTFOUND) {
const MDBX_page *mp = mc->mc_pg[mc->mc_top];
if (mc->mc_xcursor) {
if (op != MDBX_PREV_NODUP) {
if (likely(mc->mc_ki[mc->mc_top] < page_numkeys(mp))) {
const MDBX_node *node = page_node(mp, mc->mc_ki[mc->mc_top]);
if (node_flags(node) & F_DUPDATA) {
rc = cursor_prev(&mc->mc_xcursor->mx_cursor, data, NULL, MDBX_PREV);
if (likely(rc == MDBX_SUCCESS)) {
get_key_optional(node, key);
mc->mc_flags &= ~C_EOF;
return MDBX_SUCCESS;
}
return rc;
if (unlikely(rc != MDBX_NOTFOUND))
return rc;
}
}
} else {
mc->mc_xcursor->mx_cursor.mc_flags &= ~(C_INITIALIZED | C_EOF);
if (op == MDBX_PREV_DUP)
if (op != MDBX_PREV)
return MDBX_NOTFOUND;
}
mc->mc_xcursor->mx_cursor.mc_flags &= ~(C_INITIALIZED | C_EOF);
}
DEBUG("cursor_prev: top page is %" PRIaPGNO " in cursor %p", mp->mp_pgno,
@ -16782,8 +16786,7 @@ static int cursor_prev(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data,
return MDBX_SUCCESS;
}
node = page_node(mp, mc->mc_ki[mc->mc_top]);
const MDBX_node *node = page_node(mp, mc->mc_ki[mc->mc_top]);
if (node_flags(node) & F_DUPDATA) {
rc = cursor_xinit1(mc, node, mp);
if (unlikely(rc != MDBX_SUCCESS))
@ -17234,6 +17237,8 @@ static __hot int cursor_get(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data,
return rc;
}
} else {
cASSERT(mc, !mc->mc_xcursor || !(mc->mc_xcursor->mx_cursor.mc_flags &
C_INITIALIZED));
rc = node_read(mc, node, data, mp);
if (unlikely(rc))
return rc;
@ -19025,7 +19030,7 @@ static int cursor_xinit0(MDBX_cursor *mc) {
* [in] mc The main cursor whose sorted-dups cursor is to be initialized.
* [in] node The data containing the MDBX_db record for the sorted-dup database.
*/
static int cursor_xinit1(MDBX_cursor *mc, MDBX_node *node,
static int cursor_xinit1(MDBX_cursor *mc, const MDBX_node *node,
const MDBX_page *mp) {
MDBX_xcursor *mx = mc->mc_xcursor;
if (!MDBX_DISABLE_VALIDATION && unlikely(mx == nullptr)) {