mdbx: reduce gap/backlog of linear scan inside dpl_search().

This commit is contained in:
Леонид Юрьев (Leonid Yuriev) 2022-08-04 17:08:00 +03:00
parent eac3d0499f
commit 0dd4532473

View File

@ -2985,7 +2985,7 @@ static __always_inline MDBX_dpl *dpl_sort(const MDBX_txn *txn) {
#define DP_SEARCH_CMP(dp, id) ((dp).pgno < (id)) #define DP_SEARCH_CMP(dp, id) ((dp).pgno < (id))
SEARCH_IMPL(dp_bsearch, MDBX_dp, pgno_t, DP_SEARCH_CMP) SEARCH_IMPL(dp_bsearch, MDBX_dp, pgno_t, DP_SEARCH_CMP)
static unsigned __hot mdbx_dpl_search(const MDBX_txn *txn, pgno_t pgno) { __hot static unsigned dpl_search(const MDBX_txn *txn, pgno_t pgno) {
MDBX_dpl *dl = txn->tw.dirtylist; MDBX_dpl *dl = txn->tw.dirtylist;
assert(dl->items[0].pgno == 0 && dl->items[dl->length + 1].pgno == P_INVALID); assert(dl->items[0].pgno == 0 && dl->items[dl->length + 1].pgno == P_INVALID);
if (mdbx_audit_enabled()) { if (mdbx_audit_enabled()) {
@ -3010,22 +3010,13 @@ static unsigned __hot mdbx_dpl_search(const MDBX_txn *txn, pgno_t pgno) {
return dl->length - N + 1; \ return dl->length - N + 1; \
__fallthrough __fallthrough
/* try linear search until the threshold */ /* use linear scan until the threshold */
LINEAR_SEARCH_CASE(16); /* fall through */ LINEAR_SEARCH_CASE(7); /* fall through */
LINEAR_SEARCH_CASE(15); /* fall through */ LINEAR_SEARCH_CASE(6); /* fall through */
LINEAR_SEARCH_CASE(14); /* fall through */ LINEAR_SEARCH_CASE(5); /* fall through */
LINEAR_SEARCH_CASE(13); /* fall through */ LINEAR_SEARCH_CASE(4); /* fall through */
LINEAR_SEARCH_CASE(12); /* fall through */ LINEAR_SEARCH_CASE(3); /* fall through */
LINEAR_SEARCH_CASE(11); /* fall through */ LINEAR_SEARCH_CASE(2); /* fall through */
LINEAR_SEARCH_CASE(10); /* fall through */
LINEAR_SEARCH_CASE(9); /* fall through */
LINEAR_SEARCH_CASE(8); /* fall through */
LINEAR_SEARCH_CASE(7); /* fall through */
LINEAR_SEARCH_CASE(6); /* fall through */
LINEAR_SEARCH_CASE(5); /* fall through */
LINEAR_SEARCH_CASE(4); /* fall through */
LINEAR_SEARCH_CASE(3); /* fall through */
LINEAR_SEARCH_CASE(2); /* fall through */
case 1: case 1:
if (dl->items[dl->length].pgno == pgno) if (dl->items[dl->length].pgno == pgno)
return dl->length; return dl->length;
@ -3053,7 +3044,7 @@ static __inline bool mdbx_dpl_intersect(const MDBX_txn *txn, pgno_t pgno,
MDBX_dpl *dl = txn->tw.dirtylist; MDBX_dpl *dl = txn->tw.dirtylist;
assert(dl->sorted == dl->length); assert(dl->sorted == dl->length);
assert(dl->items[0].pgno == 0 && dl->items[dl->length + 1].pgno == P_INVALID); assert(dl->items[0].pgno == 0 && dl->items[dl->length + 1].pgno == P_INVALID);
unsigned const n = mdbx_dpl_search(txn, pgno); unsigned const n = dpl_search(txn, pgno);
assert(n >= 1 && n <= dl->length + 1); assert(n >= 1 && n <= dl->length + 1);
assert(pgno <= dl->items[n].pgno); assert(pgno <= dl->items[n].pgno);
assert(pgno > dl->items[n - 1].pgno); assert(pgno > dl->items[n - 1].pgno);
@ -3075,7 +3066,7 @@ static __inline bool mdbx_dpl_intersect(const MDBX_txn *txn, pgno_t pgno,
static __always_inline unsigned mdbx_dpl_exist(MDBX_txn *txn, pgno_t pgno) { static __always_inline unsigned mdbx_dpl_exist(MDBX_txn *txn, pgno_t pgno) {
MDBX_dpl *dl = txn->tw.dirtylist; MDBX_dpl *dl = txn->tw.dirtylist;
unsigned i = mdbx_dpl_search(txn, pgno); unsigned i = dpl_search(txn, pgno);
assert((int)i > 0); assert((int)i > 0);
return (dl->items[i].pgno == pgno) ? i : 0; return (dl->items[i].pgno == pgno) ? i : 0;
} }
@ -4646,7 +4637,7 @@ static unsigned mdbx_cursor_keep(MDBX_txn *txn, MDBX_cursor *mc) {
for (unsigned i = 0; i < mc->mc_snum; ++i) { for (unsigned i = 0; i < mc->mc_snum; ++i) {
const MDBX_page *mp = mc->mc_pg[i]; const MDBX_page *mp = mc->mc_pg[i];
if (IS_MODIFIABLE(txn, mp) && !IS_SUBP(mp)) { if (IS_MODIFIABLE(txn, mp) && !IS_SUBP(mp)) {
unsigned const n = mdbx_dpl_search(txn, mp->mp_pgno); unsigned const n = dpl_search(txn, mp->mp_pgno);
if (txn->tw.dirtylist->items[n].pgno == mp->mp_pgno && if (txn->tw.dirtylist->items[n].pgno == mp->mp_pgno &&
mdbx_dpl_age(txn, n)) { mdbx_dpl_age(txn, n)) {
txn->tw.dirtylist->items[n].lru = txn->tw.dirtylru; txn->tw.dirtylist->items[n].lru = txn->tw.dirtylru;
@ -5398,7 +5389,7 @@ static int __must_check_result mdbx_page_dirty(MDBX_txn *txn, MDBX_page *mp,
rc = mdbx_pnl_insert_range(&txn->tw.reclaimed_pglist, loose->mp_pgno, 1); rc = mdbx_pnl_insert_range(&txn->tw.reclaimed_pglist, loose->mp_pgno, 1);
if (unlikely(rc != MDBX_SUCCESS)) if (unlikely(rc != MDBX_SUCCESS))
goto bailout; goto bailout;
unsigned di = mdbx_dpl_search(txn, loose->mp_pgno); unsigned di = dpl_search(txn, loose->mp_pgno);
mdbx_tassert(txn, txn->tw.dirtylist->items[di].ptr == loose); mdbx_tassert(txn, txn->tw.dirtylist->items[di].ptr == loose);
mdbx_dpl_remove(txn, di); mdbx_dpl_remove(txn, di);
txn->tw.loose_pages = loose->mp_next; txn->tw.loose_pages = loose->mp_next;
@ -8395,7 +8386,7 @@ static void mdbx_dpl_sift(MDBX_txn *const txn, MDBX_PNL pl,
const int end = MDBX_PNL_ASCENDING ? MDBX_PNL_SIZE(pl) + 1 : 0; const int end = MDBX_PNL_ASCENDING ? MDBX_PNL_SIZE(pl) + 1 : 0;
mdbx_tassert(txn, pl[begin] <= pl[end - step]); mdbx_tassert(txn, pl[begin] <= pl[end - step]);
unsigned r = mdbx_dpl_search(txn, pl[begin] >> spilled); unsigned r = dpl_search(txn, pl[begin] >> spilled);
mdbx_tassert(txn, dl->sorted == dl->length); mdbx_tassert(txn, dl->sorted == dl->length);
for (int i = begin; r <= dl->length;) { /* scan loop */ for (int i = begin; r <= dl->length;) { /* scan loop */
assert(i != end); assert(i != end);
@ -13621,7 +13612,7 @@ __hot static __noinline MDBX_page *page_lookup_spilled(MDBX_txn *const txn,
mdbx_search_spilled(spiller, pgno)) mdbx_search_spilled(spiller, pgno))
break; break;
const unsigned i = mdbx_dpl_search(spiller, pgno); const unsigned i = dpl_search(spiller, pgno);
mdbx_tassert(txn, (int)i > 0); mdbx_tassert(txn, (int)i > 0);
if (spiller->tw.dirtylist->items[i].pgno == pgno) { if (spiller->tw.dirtylist->items[i].pgno == pgno) {
spiller->tw.dirtylist->items[i].lru = txn->tw.dirtylru++; spiller->tw.dirtylist->items[i].lru = txn->tw.dirtylru++;