diff --git a/src/elements/core.c b/src/elements/core.c index 8c2cd3c8..2a175d13 100644 --- a/src/elements/core.c +++ b/src/elements/core.c @@ -829,6 +829,121 @@ static int lcklist_detach_locked(MDBX_env *env) { return rc; } +/*------------------------------------------------------------------------------ + * LY: State of the art quicksort-based sorting, with internal stack and + * shell-insertion-sort for small chunks (less than half of SORT_THRESHOLD). + */ + +/* LY: Large threshold give some boost due less overhead in the inner qsort + * loops, but also a penalty in cases reverse-sorted data. + * So, 42 is magically but reasonable: + * - 0-3% faster than std::sort (from GNU C++ STL 2018) in most cases. + * - slower by a few ticks in a few cases for sequences shorter than 21. */ +#define SORT_THRESHOLD 42 + +#define SORT_SWAP(TYPE, a, b) \ + do { \ + const TYPE swap_tmp = (a); \ + (a) = (b); \ + (b) = swap_tmp; \ + } while (0) + +#define SORT_SHELLPASS(TYPE, CMP, begin, end, gap) \ + for (TYPE *i = begin + gap; i < end; ++i) { \ + for (TYPE *j = i - (gap); j >= begin && CMP(*i, *j); j -= gap) { \ + const TYPE tmp = *i; \ + do { \ + j[gap] = *j; \ + j -= gap; \ + } while (j >= begin && CMP(tmp, *j)); \ + j[gap] = tmp; \ + break; \ + } \ + } + +#define SORT_PUSH(low, high) \ + do { \ + top->lo = (low); \ + top->hi = (high); \ + ++top; \ + } while (0) + +#define SORT_POP(low, high) \ + do { \ + --top; \ + low = top->lo; \ + high = top->hi; \ + } while (0) + +#define SORT_IMPL(NAME, TYPE, CMP) \ + \ + typedef struct { \ + TYPE *lo, *hi; \ + } NAME##_stack; \ + \ + static __hot void NAME(TYPE *const begin, TYPE *const end) { \ + const ptrdiff_t length = end - begin; \ + if (length < 2) \ + return; \ + \ + if (length > SORT_THRESHOLD / 2) { \ + NAME##_stack stack[sizeof(unsigned) * CHAR_BIT], *top = stack; \ + \ + TYPE *hi = end - 1; \ + TYPE *lo = begin; \ + while (top >= stack) { \ + TYPE *mid = lo + ((hi - lo) >> 1); \ + if (CMP(*mid, *lo)) \ + SORT_SWAP(TYPE, *mid, *lo); \ + if (CMP(*hi, *mid)) { \ + SORT_SWAP(TYPE, *hi, *mid); \ + if (CMP(*mid, *lo)) \ + SORT_SWAP(TYPE, *mid, *lo); \ + } \ + \ + TYPE *right = hi - 1; \ + TYPE *left = lo + 1; \ + do { \ + while (CMP(*mid, *right)) \ + --right; \ + while (CMP(*left, *mid)) \ + ++left; \ + if (left < right) { \ + SORT_SWAP(TYPE, *left, *right); \ + if (mid == left) \ + mid = right; \ + else if (mid == right) \ + mid = left; \ + ++left; \ + --right; \ + } else if (left == right) { \ + ++left; \ + --right; \ + break; \ + } \ + } while (left <= right); \ + \ + if (lo + SORT_THRESHOLD > right) { \ + if (left + SORT_THRESHOLD > hi) \ + SORT_POP(lo, hi); \ + else \ + lo = left; \ + } else if (left + SORT_THRESHOLD > hi) \ + hi = right; \ + else if (right - lo > hi - left) { \ + SORT_PUSH(lo, right); \ + lo = left; \ + } else { \ + SORT_PUSH(left, hi); \ + hi = right; \ + } \ + } \ + } \ + \ + SORT_SHELLPASS(TYPE, CMP, begin, end, 8); \ + SORT_SHELLPASS(TYPE, CMP, begin, end, 1); \ + } + /*----------------------------------------------------------------------------*/ static __inline size_t pnl2bytes(const size_t size) { @@ -1007,82 +1122,9 @@ static void __hot mdbx_pnl_xmerge(MDBX_PNL pnl, MDBX_PNL merge) { assert(mdbx_pnl_check(pnl, true)); } -/* Sort an PNL */ -static void __hot mdbx_pnl_sort(MDBX_PNL pnl) { - /* Max possible depth of int-indexed tree * 2 items/level */ - int istack[sizeof(int) * CHAR_BIT * 2]; - int i, j, k, l, ir, jstack; - pgno_t a; - -/* Quicksort + Insertion sort for small arrays */ -#define PNL_SMALL 8 -#define PNL_SWAP(a, b) \ - do { \ - pgno_t tmp_pgno = (a); \ - (a) = (b); \ - (b) = tmp_pgno; \ - } while (0) - - ir = (int)MDBX_PNL_SIZE(pnl); - l = 1; - jstack = 0; - while (1) { - if (ir - l < PNL_SMALL) { /* Insertion sort */ - for (j = l + 1; j <= ir; j++) { - a = pnl[j]; - for (i = j - 1; i >= 1; i--) { - if (MDBX_PNL_DISORDERED(a, pnl[i])) - break; - pnl[i + 1] = pnl[i]; - } - pnl[i + 1] = a; - } - if (jstack == 0) - break; - ir = istack[jstack--]; - l = istack[jstack--]; - } else { - k = (l + ir) >> 1; /* Choose median of left, center, right */ - PNL_SWAP(pnl[k], pnl[l + 1]); - if (MDBX_PNL_ORDERED(pnl[ir], pnl[l])) - PNL_SWAP(pnl[l], pnl[ir]); - - if (MDBX_PNL_ORDERED(pnl[ir], pnl[l + 1])) - PNL_SWAP(pnl[l + 1], pnl[ir]); - - if (MDBX_PNL_ORDERED(pnl[l + 1], pnl[l])) - PNL_SWAP(pnl[l], pnl[l + 1]); - - i = l + 1; - j = ir; - a = pnl[l + 1]; - while (1) { - do - i++; - while (MDBX_PNL_ORDERED(pnl[i], a)); - do - j--; - while (MDBX_PNL_ORDERED(a, pnl[j])); - if (j < i) - break; - PNL_SWAP(pnl[i], pnl[j]); - } - pnl[l + 1] = pnl[j]; - pnl[j] = a; - jstack += 2; - if (ir - i + 1 >= j - l) { - istack[jstack] = ir; - istack[jstack - 1] = i; - ir = j - 1; - } else { - istack[jstack] = j - 1; - istack[jstack - 1] = l; - l = i; - } - } - } -#undef PNL_SMALL -#undef PNL_SWAP +SORT_IMPL(pgno_sort, pgno_t, MDBX_PNL_ORDERED) +static __hot void mdbx_pnl_sort(MDBX_PNL pnl) { + pgno_sort(MDBX_PNL_BEGIN(pnl), MDBX_PNL_END(pnl)); assert(mdbx_pnl_check(pnl, false)); } @@ -1204,15 +1246,10 @@ static __inline void mdbx_txl_xappend(MDBX_TXL tl, txnid_t id) { MDBX_PNL_LAST(tl) = id; } -static int mdbx_txl_cmp(const void *pa, const void *pb) { - const txnid_t a = *(MDBX_TXL)pa; - const txnid_t b = *(MDBX_TXL)pb; - return mdbx_cmp2int(b, a); -} - -static void mdbx_txl_sort(MDBX_TXL ptr) { - /* LY: temporary */ - qsort(ptr + 1, (size_t)ptr[0], sizeof(*ptr), mdbx_txl_cmp); +#define TXNID_SORT_CMP(first, last) ((first) > (last)) +SORT_IMPL(txnid_sort, txnid_t, TXNID_SORT_CMP) +static void mdbx_txl_sort(MDBX_TXL tl) { + txnid_sort(MDBX_PNL_BEGIN(tl), MDBX_PNL_END(tl)); } static int __must_check_result mdbx_txl_append(MDBX_TXL *ptl, txnid_t id) { @@ -1239,18 +1276,14 @@ static int __must_check_result mdbx_txl_append_list(MDBX_TXL *ptl, /*----------------------------------------------------------------------------*/ -static __hot int mdbx_dpl_cmp(const void *pa, const void *pb) { - const MDBX_DP a = *(MDBX_DPL)pa; - const MDBX_DP b = *(MDBX_DPL)pb; - return mdbx_cmp2int(a.pgno, b.pgno); -} - +#define DP_SORT_CMP(first, last) ((first).pgno < (last).pgno) +SORT_IMPL(dp_sort, MDBX_DP, DP_SORT_CMP) static __inline MDBX_DPL mdbx_dpl_sort(MDBX_DPL dl) { assert(dl->length <= MDBX_DPL_TXNFULL); + assert(dl->sorted <= dl->length); if (dl->sorted != dl->length) { - /* LY: temporary */ dl->sorted = dl->length; - qsort(dl + 1, dl->length, sizeof(*dl), mdbx_dpl_cmp); + dp_sort(dl + 1, dl + dl->length + 1); } return dl; }