diff --git a/src/core.c b/src/core.c index e13c5cbf..215c840c 100644 --- a/src/core.c +++ b/src/core.c @@ -2717,11 +2717,9 @@ static int __must_check_result txl_append(MDBX_TXL *ptl, txnid_t id) { /*----------------------------------------------------------------------------*/ -#define MDBX_DPL_UNSORTED_BACKLOG 16 -#define MDBX_DPL_GAP_FOR_MERGESORT MDBX_DPL_UNSORTED_BACKLOG -#define MDBX_DPL_GAP_FOR_EDGING 2 -#define MDBX_DPL_RESERVE_GAP \ - (MDBX_DPL_GAP_FOR_MERGESORT + MDBX_DPL_GAP_FOR_EDGING) +#define MDBX_DPL_GAP_MERGESORT 16 +#define MDBX_DPL_GAP_EDGING 2 +#define MDBX_DPL_RESERVE_GAP (MDBX_DPL_GAP_MERGESORT + MDBX_DPL_GAP_EDGING) static __always_inline size_t dpl_size2bytes(ptrdiff_t size) { assert(size > CURSOR_STACK && (size_t)size <= MDBX_PGL_LIMIT); @@ -2847,7 +2845,7 @@ __hot __noinline static MDBX_dpl *dpl_sort_slowpath(const MDBX_txn *txn) { unlikely(!dpl_radixsort(dl->items + 1, dl->length))) { if (dl->sorted > unsorted / 4 + 4 && (MDBX_DPL_PREALLOC_FOR_RADIXSORT || - dl->length + unsorted < dl->detent + MDBX_DPL_GAP_FOR_MERGESORT)) { + dl->length + unsorted < dl->detent + MDBX_DPL_GAP_MERGESORT)) { MDBX_dp *const sorted_begin = dl->items + 1; MDBX_dp *const sorted_end = sorted_begin + dl->sorted; MDBX_dp *const end = @@ -3094,12 +3092,6 @@ static __always_inline int __must_check_result dpl_append(MDBX_txn *txn, } } - const size_t length = dl->length + 1; - const size_t sorted = - (dl->sorted == dl->length && dl->items[dl->length].pgno < pgno) - ? length - : dl->sorted; - if (unlikely(dl->length == dl->detent)) { if (unlikely(dl->detent >= MDBX_PGL_LIMIT)) { ERROR("DPL is full (MDBX_PGL_LIMIT %zu)", MDBX_PGL_LIMIT); @@ -3114,14 +3106,78 @@ static __always_inline int __must_check_result dpl_append(MDBX_txn *txn, tASSERT(txn, dl->length < dl->detent); } - /* copy the stub beyond the end */ - dl->items[length + 1] = dl->items[length]; - /* append page */ - dl->items[length] = dp; - dl->length = length; - dl->sorted = sorted; + /* Сортировка нужна для быстрого поиска, используем несколько тактик: + * 1) Сохраняем упорядоченность при естественной вставке в нужном порядке. + * 2) Добавляем в не-сортированный хвост, который сортируем и сливаем + * с отсортированной головой по необходимости, а пока хвост короткий + * ищем в нём сканированием, избегая большой пересортировки. + * 3) Если не-сортированный хвост короткий, а добавляемый элемент близок + * к концу отсортированной головы, то выгоднее сразу вставить элемент + * в нужное место. + * + * Алгоритмически: + * - добавлять в не-сортированный хвост следует только если вставка сильно + * дорогая, т.е. если целевая позиция элемента сильно далека от конца; + * - для быстрой проверки достаточно сравнить добавляемый элемент с отстоящим + * от конца на максимально-приемлемое расстояние; + * - если список короче, либо элемент в этой позиции меньше вставляемого, + * то следует перемещать элементы и вставлять в отсортированную голову; + * - если не-сортированный хвост длиннее, либо элемент в этой позиции больше, + * то следует добавлять в не-сортированный хвост. */ + dl->pages_including_loose += npages; + MDBX_dp *i = dl->items + dl->length; + +#define MDBX_DPL_INSERTION_THRESHOLD 42 + const ptrdiff_t pivot = (ptrdiff_t)dl->length - MDBX_DPL_INSERTION_THRESHOLD; +#if MDBX_HAVE_CMOV + const pgno_t pivot_pgno = + dl->items[(dl->length < MDBX_DPL_INSERTION_THRESHOLD) + ? 0 + : dl->length - MDBX_DPL_INSERTION_THRESHOLD] + .pgno; +#endif /* MDBX_HAVE_CMOV */ + + /* copy the stub beyond the end */ + i[2] = i[1]; + dl->length += 1; + + if (likely(pivot <= (ptrdiff_t)dl->sorted) && +#if MDBX_HAVE_CMOV + pivot_pgno < dp.pgno) { +#else + (pivot <= 0 || dl->items[pivot].pgno < dp.pgno)) { +#endif /* MDBX_HAVE_CMOV */ + dl->sorted += 1; + + /* сдвигаем несортированный хвост */ + while (i >= dl->items + dl->sorted) { +#if !defined(__GNUC__) /* пытаемся избежать вызова memmove() */ + i[1] = *i; +#elif MDBX_WORDBITS == 64 && \ + (defined(__SIZEOF_INT128__) || \ + (defined(_INTEGRAL_MAX_BITS) && _INTEGRAL_MAX_BITS >= 128)) + STATIC_ASSERT(sizeof(MDBX_dp) == sizeof(__uint128_t)); + ((__uint128_t *)i)[1] = *(volatile __uint128_t *)i; +#else + i[1].ptr = i->ptr; + i[1].pgno = i->pgno; + i[1].mlru = i->mlru; +#endif + --i; + } + /* ищем нужную позицию сдвигая отсортированные элементы */ + while (i->pgno > pgno) { + tASSERT(txn, i > dl->items); + i[1] = *i; + --i; + } + tASSERT(txn, i->pgno < dp.pgno); + } + + i[1] = dp; assert(dl->items[0].pgno == 0 && dl->items[dl->length + 1].pgno == P_INVALID); + assert(dl->sorted <= dl->length); return MDBX_SUCCESS; } @@ -10082,7 +10138,8 @@ retry: MDBX_dpl *const dl = txn->tw.dirtylist; if (dl) { tASSERT(txn, (txn->mt_flags & MDBX_WRITEMAP) == 0 || MDBX_AVOID_MSYNC); - size_t w = 0; + tASSERT(txn, dl->sorted <= dl->length); + size_t w = 0, sorted_out = 0; for (size_t r = w; ++r <= dl->length;) { MDBX_page *dp = dl->items[r].ptr; tASSERT(txn, dp->mp_flags == P_LOOSE || IS_MODIFIABLE(txn, dp)); @@ -10092,6 +10149,7 @@ retry: dl->items[w] = dl->items[r]; } else { tASSERT(txn, dp->mp_flags == P_LOOSE); + sorted_out += dl->sorted >= r; if (!MDBX_AVOID_MSYNC || !(env->me_flags & MDBX_WRITEMAP)) { tASSERT(txn, (txn->mt_flags & MDBX_WRITEMAP) == 0); dpage_free(env, dp, 1); @@ -10101,8 +10159,9 @@ retry: TRACE("%s: filtered-out loose-pages from %zu -> %zu dirty-pages", dbg_prefix_mode, dl->length, w); tASSERT(txn, txn->tw.loose_count == dl->length - w); + dl->sorted -= sorted_out; + tASSERT(txn, dl->sorted <= w); dpl_setlen(dl, w); - dl->sorted = 0; dl->pages_including_loose -= txn->tw.loose_count; txn->tw.dirtyroom += txn->tw.loose_count; tASSERT(txn, txn->tw.dirtyroom + txn->tw.dirtylist->length == diff --git a/src/internals.h b/src/internals.h index 8cdb75e6..d67c912c 100644 --- a/src/internals.h +++ b/src/internals.h @@ -892,7 +892,7 @@ typedef struct MDBX_lockinfo { #endif /* MDBX_WORDBITS */ #define MDBX_READERS_LIMIT 32767 -#define MDBX_RADIXSORT_THRESHOLD 333 +#define MDBX_RADIXSORT_THRESHOLD 142 /*----------------------------------------------------------------------------*/