mdbx: replace internal qsort implementation (up to 25% faster).

Change-Id: I20cd1c7cf9d5d7dbf70f5fb1865efeecb4bc42fc
This commit is contained in:
Leonid Yuriev 2019-09-22 04:02:02 +03:00
parent 5a94d734cc
commit aceab9be44

View File

@ -829,6 +829,121 @@ static int lcklist_detach_locked(MDBX_env *env) {
return rc;
* LY: State of the art quicksort-based sorting, with internal stack and
* shell-insertion-sort for small chunks (less than half of SORT_THRESHOLD).
/* LY: Large threshold give some boost due less overhead in the inner qsort
* loops, but also a penalty in cases reverse-sorted data.
* So, 42 is magically but reasonable:
* - 0-3% faster than std::sort (from GNU C++ STL 2018) in most cases.
* - slower by a few ticks in a few cases for sequences shorter than 21. */
#define SORT_SWAP(TYPE, a, b) \
do { \
const TYPE swap_tmp = (a); \
(a) = (b); \
(b) = swap_tmp; \
} while (0)
#define SORT_SHELLPASS(TYPE, CMP, begin, end, gap) \
for (TYPE *i = begin + gap; i < end; ++i) { \
for (TYPE *j = i - (gap); j >= begin && CMP(*i, *j); j -= gap) { \
const TYPE tmp = *i; \
do { \
j[gap] = *j; \
j -= gap; \
} while (j >= begin && CMP(tmp, *j)); \
j[gap] = tmp; \
break; \
} \
#define SORT_PUSH(low, high) \
do { \
top->lo = (low); \
top->hi = (high); \
++top; \
} while (0)
#define SORT_POP(low, high) \
do { \
--top; \
low = top->lo; \
high = top->hi; \
} while (0)
typedef struct { \
TYPE *lo, *hi; \
} NAME##_stack; \
static __hot void NAME(TYPE *const begin, TYPE *const end) { \
const ptrdiff_t length = end - begin; \
if (length < 2) \
return; \
if (length > SORT_THRESHOLD / 2) { \
NAME##_stack stack[sizeof(unsigned) * CHAR_BIT], *top = stack; \
TYPE *hi = end - 1; \
TYPE *lo = begin; \
while (top >= stack) { \
TYPE *mid = lo + ((hi - lo) >> 1); \
if (CMP(*mid, *lo)) \
SORT_SWAP(TYPE, *mid, *lo); \
if (CMP(*hi, *mid)) { \
SORT_SWAP(TYPE, *hi, *mid); \
if (CMP(*mid, *lo)) \
SORT_SWAP(TYPE, *mid, *lo); \
} \
TYPE *right = hi - 1; \
TYPE *left = lo + 1; \
do { \
while (CMP(*mid, *right)) \
--right; \
while (CMP(*left, *mid)) \
++left; \
if (left < right) { \
SORT_SWAP(TYPE, *left, *right); \
if (mid == left) \
mid = right; \
else if (mid == right) \
mid = left; \
++left; \
--right; \
} else if (left == right) { \
++left; \
--right; \
break; \
} \
} while (left <= right); \
if (lo + SORT_THRESHOLD > right) { \
if (left + SORT_THRESHOLD > hi) \
SORT_POP(lo, hi); \
else \
lo = left; \
} else if (left + SORT_THRESHOLD > hi) \
hi = right; \
else if (right - lo > hi - left) { \
SORT_PUSH(lo, right); \
lo = left; \
} else { \
SORT_PUSH(left, hi); \
hi = right; \
} \
} \
} \
SORT_SHELLPASS(TYPE, CMP, begin, end, 8); \
SORT_SHELLPASS(TYPE, CMP, begin, end, 1); \
static __inline size_t pnl2bytes(const size_t size) {
@ -1007,82 +1122,9 @@ static void __hot mdbx_pnl_xmerge(MDBX_PNL pnl, MDBX_PNL merge) {
assert(mdbx_pnl_check(pnl, true));
/* Sort an PNL */
static void __hot mdbx_pnl_sort(MDBX_PNL pnl) {
/* Max possible depth of int-indexed tree * 2 items/level */
int istack[sizeof(int) * CHAR_BIT * 2];
int i, j, k, l, ir, jstack;
pgno_t a;
/* Quicksort + Insertion sort for small arrays */
#define PNL_SMALL 8
#define PNL_SWAP(a, b) \
do { \
pgno_t tmp_pgno = (a); \
(a) = (b); \
(b) = tmp_pgno; \
} while (0)
ir = (int)MDBX_PNL_SIZE(pnl);
l = 1;
jstack = 0;
while (1) {
if (ir - l < PNL_SMALL) { /* Insertion sort */
for (j = l + 1; j <= ir; j++) {
a = pnl[j];
for (i = j - 1; i >= 1; i--) {
if (MDBX_PNL_DISORDERED(a, pnl[i]))
pnl[i + 1] = pnl[i];
pnl[i + 1] = a;
if (jstack == 0)
ir = istack[jstack--];
l = istack[jstack--];
} else {
k = (l + ir) >> 1; /* Choose median of left, center, right */
PNL_SWAP(pnl[k], pnl[l + 1]);
if (MDBX_PNL_ORDERED(pnl[ir], pnl[l]))
PNL_SWAP(pnl[l], pnl[ir]);
if (MDBX_PNL_ORDERED(pnl[ir], pnl[l + 1]))
PNL_SWAP(pnl[l + 1], pnl[ir]);
if (MDBX_PNL_ORDERED(pnl[l + 1], pnl[l]))
PNL_SWAP(pnl[l], pnl[l + 1]);
i = l + 1;
j = ir;
a = pnl[l + 1];
while (1) {
while (MDBX_PNL_ORDERED(pnl[i], a));
while (MDBX_PNL_ORDERED(a, pnl[j]));
if (j < i)
PNL_SWAP(pnl[i], pnl[j]);
pnl[l + 1] = pnl[j];
pnl[j] = a;
jstack += 2;
if (ir - i + 1 >= j - l) {
istack[jstack] = ir;
istack[jstack - 1] = i;
ir = j - 1;
} else {
istack[jstack] = j - 1;
istack[jstack - 1] = l;
l = i;
#undef PNL_SMALL
#undef PNL_SWAP
SORT_IMPL(pgno_sort, pgno_t, MDBX_PNL_ORDERED)
static __hot void mdbx_pnl_sort(MDBX_PNL pnl) {
pgno_sort(MDBX_PNL_BEGIN(pnl), MDBX_PNL_END(pnl));
assert(mdbx_pnl_check(pnl, false));
@ -1204,15 +1246,10 @@ static __inline void mdbx_txl_xappend(MDBX_TXL tl, txnid_t id) {
MDBX_PNL_LAST(tl) = id;
static int mdbx_txl_cmp(const void *pa, const void *pb) {
const txnid_t a = *(MDBX_TXL)pa;
const txnid_t b = *(MDBX_TXL)pb;
return mdbx_cmp2int(b, a);
static void mdbx_txl_sort(MDBX_TXL ptr) {
/* LY: temporary */
qsort(ptr + 1, (size_t)ptr[0], sizeof(*ptr), mdbx_txl_cmp);
#define TXNID_SORT_CMP(first, last) ((first) > (last))
SORT_IMPL(txnid_sort, txnid_t, TXNID_SORT_CMP)
static void mdbx_txl_sort(MDBX_TXL tl) {
txnid_sort(MDBX_PNL_BEGIN(tl), MDBX_PNL_END(tl));
static int __must_check_result mdbx_txl_append(MDBX_TXL *ptl, txnid_t id) {
@ -1239,18 +1276,14 @@ static int __must_check_result mdbx_txl_append_list(MDBX_TXL *ptl,
static __hot int mdbx_dpl_cmp(const void *pa, const void *pb) {
const MDBX_DP a = *(MDBX_DPL)pa;
const MDBX_DP b = *(MDBX_DPL)pb;
return mdbx_cmp2int(a.pgno, b.pgno);
#define DP_SORT_CMP(first, last) ((first).pgno < (last).pgno)
static __inline MDBX_DPL mdbx_dpl_sort(MDBX_DPL dl) {
assert(dl->length <= MDBX_DPL_TXNFULL);
assert(dl->sorted <= dl->length);
if (dl->sorted != dl->length) {
/* LY: temporary */
dl->sorted = dl->length;
qsort(dl + 1, dl->length, sizeof(*dl), mdbx_dpl_cmp);
dp_sort(dl + 1, dl + dl->length + 1);
return dl;