diff --git a/src/elements/core.c b/src/elements/core.c index a278660e..02e50ff9 100644 --- a/src/elements/core.c +++ b/src/elements/core.c @@ -1465,9 +1465,8 @@ static int lcklist_detach_locked(MDBX_env *env) { } /*------------------------------------------------------------------------------ - * LY: State of the art quicksort-based sorting, with internal stack and - * shell-insertion-sort for small chunks (less than half of SORT_THRESHOLD). - */ + * LY: State of the art quicksort-based sorting, with internal stack + * and bitonic-sort for small chunks. */ #define SORT_CMP_SWAP(TYPE, CMP, a, b) \ do { \ @@ -1942,24 +1941,10 @@ static int lcklist_detach_locked(MDBX_env *env) { SORT_CMP_SWAP(TYPE, CMP, begin[8], begin[9]); \ } while (0) -#define SORT_SHELLPASS(TYPE, CMP, begin, end, gap) \ - for (TYPE *i = begin + gap; i < end; ++i) { \ - for (TYPE *j = i - (gap); j >= begin && CMP(*i, *j); j -= gap) { \ - const TYPE tmp = *i; \ - do { \ - j[gap] = *j; \ - j -= gap; \ - } while (j >= begin && CMP(tmp, *j)); \ - j[gap] = tmp; \ - break; \ - } \ - } - -#define SORT_INTRA(TYPE, CMP, begin, end, len) \ +#define SORT_INNER(TYPE, CMP, begin, end, len) \ switch (len) { \ default: \ - SORT_SHELLPASS(TYPE, CMP, begin, end, 8); \ - SORT_SHELLPASS(TYPE, CMP, begin, end, 1); \ + __unreachable(); \ case 0: \ case 1: \ break; \ @@ -2010,8 +1995,6 @@ static int lcklist_detach_locked(MDBX_env *env) { break; \ } -#define SORT_THRESHOLD 16 - #define SORT_SWAP(TYPE, a, b) \ do { \ const TYPE swap_tmp = (a); \ @@ -2033,7 +2016,14 @@ static int lcklist_detach_locked(MDBX_env *env) { high = top->hi; \ } while (0) -#define SORT_IMPL(NAME, TYPE, CMP) \ +#define SORT_IMPL(NAME, EXPECT_LOW_CARDINALITY_OR_PRESORTED, TYPE, CMP) \ + \ + static __inline bool NAME##_is_sorted(const TYPE *first, const TYPE *last) { \ + while (++first <= last) \ + if (CMP(first[0], first[-1])) \ + return false; \ + return true; \ + } \ \ typedef struct { \ TYPE *lo, *hi; \ @@ -2046,9 +2036,9 @@ static int lcklist_detach_locked(MDBX_env *env) { TYPE *lo = begin; \ while (true) { \ const ptrdiff_t len = hi - lo; \ - if (len < SORT_THRESHOLD) { \ - SORT_INTRA(TYPE, CMP, lo, hi + 1, len + 1); \ - if (top == stack) \ + if (len < 16) { \ + SORT_INNER(TYPE, CMP, lo, hi + 1, len + 1); \ + if (unlikely(top == stack)) \ break; \ SORT_POP(lo, hi); \ continue; \ @@ -2061,22 +2051,25 @@ static int lcklist_detach_locked(MDBX_env *env) { \ TYPE *right = hi - 1; \ TYPE *left = lo + 1; \ - do { \ - while (CMP(*mid, *right)) \ - --right; \ + while (1) { \ while (CMP(*left, *mid)) \ ++left; \ - if (left < right) { \ - SORT_SWAP(TYPE, *left, *right); \ - mid = (mid == left) ? right : (mid == right) ? left : mid; \ - ++left; \ - --right; \ - } else if (left == right) { \ - ++left; \ + while (CMP(*mid, *right)) \ --right; \ + if (unlikely(left > right)) { \ + if (EXPECT_LOW_CARDINALITY_OR_PRESORTED) { \ + if (NAME##_is_sorted(lo, right)) \ + lo = right + 1; \ + if (NAME##_is_sorted(left, hi)) \ + hi = left; \ + } \ break; \ } \ - } while (left <= right); \ + SORT_SWAP(TYPE, *left, *right); \ + mid = (mid == left) ? right : (mid == right) ? left : mid; \ + ++left; \ + --right; \ + } \ \ if (right - lo > hi - left) { \ SORT_PUSH(lo, right); \ @@ -2375,7 +2368,7 @@ static void __hot mdbx_pnl_xmerge(MDBX_PNL dst, const MDBX_PNL src) { assert(mdbx_pnl_check4assert(dst, MAX_PAGENO + 1)); } -SORT_IMPL(pgno_sort, pgno_t, MDBX_PNL_ORDERED) +SORT_IMPL(pgno_sort, false, pgno_t, MDBX_PNL_ORDERED) static __hot void mdbx_pnl_sort(MDBX_PNL pnl) { pgno_sort(MDBX_PNL_BEGIN(pnl), MDBX_PNL_END(pnl)); assert(mdbx_pnl_check(pnl, MAX_PAGENO + 1)); @@ -2485,7 +2478,7 @@ static __always_inline void mdbx_txl_xappend(MDBX_TXL tl, txnid_t id) { } #define TXNID_SORT_CMP(first, last) ((first) > (last)) -SORT_IMPL(txnid_sort, txnid_t, TXNID_SORT_CMP) +SORT_IMPL(txnid_sort, false, txnid_t, TXNID_SORT_CMP) static void mdbx_txl_sort(MDBX_TXL tl) { txnid_sort(MDBX_PNL_BEGIN(tl), MDBX_PNL_END(tl)); } @@ -2503,7 +2496,7 @@ static int __must_check_result mdbx_txl_append(MDBX_TXL *ptl, txnid_t id) { /*----------------------------------------------------------------------------*/ #define DP_SORT_CMP(first, last) ((first).pgno < (last).pgno) -SORT_IMPL(dp_sort, MDBX_DP, DP_SORT_CMP) +SORT_IMPL(dp_sort, false, MDBX_DP, DP_SORT_CMP) static __always_inline MDBX_DPL mdbx_dpl_sort(MDBX_DPL dl) { assert(dl->length <= MDBX_DPL_TXNFULL); assert(dl->sorted <= dl->length); @@ -3255,8 +3248,7 @@ static __cold __maybe_unused bool mdbx_dirtylist_check(MDBX_txn *txn) { if (unlikely(loose != txn->tw.loose_count)) return false; - if (txn->tw.dirtylist->length - txn->tw.dirtylist->sorted < - SORT_THRESHOLD / 2) { + if (txn->tw.dirtylist->length - txn->tw.dirtylist->sorted < 16) { for (unsigned i = 1; i <= MDBX_PNL_SIZE(txn->tw.retired_pages); ++i) { const MDBX_page *const dp = mdbx_dpl_find(txn->tw.dirtylist, txn->tw.retired_pages[i]);