mdbx: оптимизация поддержки сортировки в dpl_append().

This commit is contained in:
Леонид Юрьев (Leonid Yuriev) 2022-12-06 22:20:00 +03:00
parent a9163f6307
commit 3a77af7d8a
2 changed files with 80 additions and 21 deletions

View File

@ -2717,11 +2717,9 @@ static int __must_check_result txl_append(MDBX_TXL *ptl, txnid_t id) {
/*----------------------------------------------------------------------------*/
#define MDBX_DPL_UNSORTED_BACKLOG 16
#define MDBX_DPL_GAP_FOR_MERGESORT MDBX_DPL_UNSORTED_BACKLOG
#define MDBX_DPL_GAP_FOR_EDGING 2
#define MDBX_DPL_RESERVE_GAP \
(MDBX_DPL_GAP_FOR_MERGESORT + MDBX_DPL_GAP_FOR_EDGING)
#define MDBX_DPL_GAP_MERGESORT 16
#define MDBX_DPL_GAP_EDGING 2
#define MDBX_DPL_RESERVE_GAP (MDBX_DPL_GAP_MERGESORT + MDBX_DPL_GAP_EDGING)
static __always_inline size_t dpl_size2bytes(ptrdiff_t size) {
assert(size > CURSOR_STACK && (size_t)size <= MDBX_PGL_LIMIT);
@ -2847,7 +2845,7 @@ __hot __noinline static MDBX_dpl *dpl_sort_slowpath(const MDBX_txn *txn) {
unlikely(!dpl_radixsort(dl->items + 1, dl->length))) {
if (dl->sorted > unsorted / 4 + 4 &&
(MDBX_DPL_PREALLOC_FOR_RADIXSORT ||
dl->length + unsorted < dl->detent + MDBX_DPL_GAP_FOR_MERGESORT)) {
dl->length + unsorted < dl->detent + MDBX_DPL_GAP_MERGESORT)) {
MDBX_dp *const sorted_begin = dl->items + 1;
MDBX_dp *const sorted_end = sorted_begin + dl->sorted;
MDBX_dp *const end =
@ -3094,12 +3092,6 @@ static __always_inline int __must_check_result dpl_append(MDBX_txn *txn,
}
}
const size_t length = dl->length + 1;
const size_t sorted =
(dl->sorted == dl->length && dl->items[dl->length].pgno < pgno)
? length
: dl->sorted;
if (unlikely(dl->length == dl->detent)) {
if (unlikely(dl->detent >= MDBX_PGL_LIMIT)) {
ERROR("DPL is full (MDBX_PGL_LIMIT %zu)", MDBX_PGL_LIMIT);
@ -3114,14 +3106,78 @@ static __always_inline int __must_check_result dpl_append(MDBX_txn *txn,
tASSERT(txn, dl->length < dl->detent);
}
/* copy the stub beyond the end */
dl->items[length + 1] = dl->items[length];
/* append page */
dl->items[length] = dp;
dl->length = length;
dl->sorted = sorted;
/* Сортировка нужна для быстрого поиска, используем несколько тактик:
* 1) Сохраняем упорядоченность при естественной вставке в нужном порядке.
* 2) Добавляем в не-сортированный хвост, который сортируем и сливаем
* с отсортированной головой по необходимости, а пока хвост короткий
* ищем в нём сканированием, избегая большой пересортировки.
* 3) Если не-сортированный хвост короткий, а добавляемый элемент близок
* к концу отсортированной головы, то выгоднее сразу вставить элемент
* в нужное место.
*
* Алгоритмически:
* - добавлять в не-сортированный хвост следует только если вставка сильно
* дорогая, т.е. если целевая позиция элемента сильно далека от конца;
* - для быстрой проверки достаточно сравнить добавляемый элемент с отстоящим
* от конца на максимально-приемлемое расстояние;
* - если список короче, либо элемент в этой позиции меньше вставляемого,
* то следует перемещать элементы и вставлять в отсортированную голову;
* - если не-сортированный хвост длиннее, либо элемент в этой позиции больше,
* то следует добавлять в не-сортированный хвост. */
dl->pages_including_loose += npages;
MDBX_dp *i = dl->items + dl->length;
#define MDBX_DPL_INSERTION_THRESHOLD 42
const ptrdiff_t pivot = (ptrdiff_t)dl->length - MDBX_DPL_INSERTION_THRESHOLD;
#if MDBX_HAVE_CMOV
const pgno_t pivot_pgno =
dl->items[(dl->length < MDBX_DPL_INSERTION_THRESHOLD)
? 0
: dl->length - MDBX_DPL_INSERTION_THRESHOLD]
.pgno;
#endif /* MDBX_HAVE_CMOV */
/* copy the stub beyond the end */
i[2] = i[1];
dl->length += 1;
if (likely(pivot <= (ptrdiff_t)dl->sorted) &&
#if MDBX_HAVE_CMOV
pivot_pgno < dp.pgno) {
#else
(pivot <= 0 || dl->items[pivot].pgno < dp.pgno)) {
#endif /* MDBX_HAVE_CMOV */
dl->sorted += 1;
/* сдвигаем несортированный хвост */
while (i >= dl->items + dl->sorted) {
#if !defined(__GNUC__) /* пытаемся избежать вызова memmove() */
i[1] = *i;
#elif MDBX_WORDBITS == 64 && \
(defined(__SIZEOF_INT128__) || \
(defined(_INTEGRAL_MAX_BITS) && _INTEGRAL_MAX_BITS >= 128))
STATIC_ASSERT(sizeof(MDBX_dp) == sizeof(__uint128_t));
((__uint128_t *)i)[1] = *(volatile __uint128_t *)i;
#else
i[1].ptr = i->ptr;
i[1].pgno = i->pgno;
i[1].mlru = i->mlru;
#endif
--i;
}
/* ищем нужную позицию сдвигая отсортированные элементы */
while (i->pgno > pgno) {
tASSERT(txn, i > dl->items);
i[1] = *i;
--i;
}
tASSERT(txn, i->pgno < dp.pgno);
}
i[1] = dp;
assert(dl->items[0].pgno == 0 && dl->items[dl->length + 1].pgno == P_INVALID);
assert(dl->sorted <= dl->length);
return MDBX_SUCCESS;
}
@ -10082,7 +10138,8 @@ retry:
MDBX_dpl *const dl = txn->tw.dirtylist;
if (dl) {
tASSERT(txn, (txn->mt_flags & MDBX_WRITEMAP) == 0 || MDBX_AVOID_MSYNC);
size_t w = 0;
tASSERT(txn, dl->sorted <= dl->length);
size_t w = 0, sorted_out = 0;
for (size_t r = w; ++r <= dl->length;) {
MDBX_page *dp = dl->items[r].ptr;
tASSERT(txn, dp->mp_flags == P_LOOSE || IS_MODIFIABLE(txn, dp));
@ -10092,6 +10149,7 @@ retry:
dl->items[w] = dl->items[r];
} else {
tASSERT(txn, dp->mp_flags == P_LOOSE);
sorted_out += dl->sorted >= r;
if (!MDBX_AVOID_MSYNC || !(env->me_flags & MDBX_WRITEMAP)) {
tASSERT(txn, (txn->mt_flags & MDBX_WRITEMAP) == 0);
dpage_free(env, dp, 1);
@ -10101,8 +10159,9 @@ retry:
TRACE("%s: filtered-out loose-pages from %zu -> %zu dirty-pages",
dbg_prefix_mode, dl->length, w);
tASSERT(txn, txn->tw.loose_count == dl->length - w);
dl->sorted -= sorted_out;
tASSERT(txn, dl->sorted <= w);
dpl_setlen(dl, w);
dl->sorted = 0;
dl->pages_including_loose -= txn->tw.loose_count;
txn->tw.dirtyroom += txn->tw.loose_count;
tASSERT(txn, txn->tw.dirtyroom + txn->tw.dirtylist->length ==

View File

@ -892,7 +892,7 @@ typedef struct MDBX_lockinfo {
#endif /* MDBX_WORDBITS */
#define MDBX_READERS_LIMIT 32767
#define MDBX_RADIXSORT_THRESHOLD 333
#define MDBX_RADIXSORT_THRESHOLD 142
/*----------------------------------------------------------------------------*/