mdbx: provide configurable ascending/descending sort-order for pgno-lists.

Change-Id: I3134c100880ff28bb0aaf46ed91affc9f6347110
This commit is contained in:
Leo Yuriev 2017-07-24 00:54:10 +03:00
parent 552b759878
commit b8b3ba8e91
4 changed files with 99 additions and 51 deletions

View File

@ -432,14 +432,23 @@ typedef struct MDBX_lockinfo {
#define MDBX_LOCK_MAGIC ((MDBX_MAGIC << 8) + MDBX_LOCK_VERSION)
/*----------------------------------------------------------------------------*/
/* Two kind lists of pages (aka IDL) */
/* Two kinds of page lists (aka PNL) */
/* An PNL is an Page Number List, a sorted array of IDs. The first
* element of the array is a counter for how many actual
* IDs are in the list. In the libmdbx PNLs are sorted in
* descending order. */
/* A PNL is a Page Number List, a sorted array of IDs. The first element of
 * the array is a counter for how many actual page-numbers are in the list.
 * The sort order is selected at compile time by MDBX_PNL_ASCENDING. By default
 * (MDBX_PNL_ASCENDING == 0) PNLs are sorted in descending order, which allows
 * cutting off the page with the lowest pgno (at the tail) by just truncating
 * the list. */
/* Compile-time selector of the PNL sort order: a non-zero value makes the
 * MDBX_PNL_ORDERED()/MDBX_PNL_DISORDERED() comparisons ascending, while zero
 * (the value set here) makes them descending. */
#define MDBX_PNL_ASCENDING 0
/* A PNL is handled through a pointer to its pgno_t elements. */
typedef pgno_t *MDBX_PNL;
/* MDBX_PNL_ORDERED(prev, next) is true when two neighbouring list items are
 * strictly in the configured sort order: prev < next for ascending lists,
 * prev > next for descending ones. */
#if MDBX_PNL_ASCENDING
#define MDBX_PNL_ORDERED(prev, next) ((prev) < (next))
#else
#define MDBX_PNL_ORDERED(prev, next) ((prev) > (next))
#endif
/* MDBX_PNL_DISORDERED(prev, next) is the exact negation of the above: true
 * when the items are equal or follow in the opposite order. */
#define MDBX_PNL_DISORDERED(prev, next) (!MDBX_PNL_ORDERED(prev, next))
/* List of txnid, only for MDBX_env.mt_lifo_reclaimed */
typedef txnid_t *MDBX_TXL;
@ -1201,6 +1210,11 @@ static __inline pgno_t pgno_add(pgno_t base, pgno_t augend) {
return (augend < MAX_PAGENO - base) ? base + augend : MAX_PAGENO;
}
/* Subtracts `subtrahend` from the page number `base`, clamping the result at
 * MIN_PAGENO: whenever the difference would not stay above MIN_PAGENO,
 * MIN_PAGENO itself is returned.
 * `base` is asserted to be at least MIN_PAGENO. */
static __inline pgno_t pgno_sub(pgno_t base, pgno_t subtrahend) {
  assert(base >= MIN_PAGENO);
  /* Compare against the distance to the lower bound first, so that
   * `base - subtrahend` is only computed when it stays above MIN_PAGENO. */
  if (subtrahend < base - MIN_PAGENO)
    return base - subtrahend;
  return MIN_PAGENO;
}
/* Converts the page number `pgno` to a byte count with pgno2bytes() and passes
 * it through mdbx_roundup2() together with env->me_os_psize.
 * NOTE(review): presumably this rounds the byte count up to a multiple of the
 * OS page size - confirm against the definition of mdbx_roundup2(). */
static __inline size_t pgno_align2os_bytes(const MDBX_env *env, pgno_t pgno) {
return mdbx_roundup2(pgno2bytes(env, pgno), env->me_os_psize);
}

View File

@ -202,8 +202,8 @@ static __inline void mdbx_pnl_xappend(MDBX_PNL pl, pgno_t id) {
static bool mdbx_pnl_check(MDBX_PNL pl) {
if (pl) {
for (const pgno_t *ptr = pl + pl[0]; --ptr > pl;) {
assert(ptr[0] > ptr[1]);
if (unlikely(ptr[0] <= ptr[1]))
assert(MDBX_PNL_ORDERED(ptr[0], ptr[1]));
if (unlikely(MDBX_PNL_DISORDERED(ptr[0], ptr[1])))
return false;
}
}
@ -235,7 +235,7 @@ static void __hot mdbx_pnl_sort(MDBX_PNL pnl) {
for (j = l + 1; j <= ir; j++) {
a = pnl[j];
for (i = j - 1; i >= 1; i--) {
if (pnl[i] >= a)
if (MDBX_PNL_ORDERED(pnl[i], a))
break;
pnl[i + 1] = pnl[i];
}
@ -248,13 +248,13 @@ static void __hot mdbx_pnl_sort(MDBX_PNL pnl) {
} else {
k = (l + ir) >> 1; /* Choose median of left, center, right */
PNL_SWAP(pnl[k], pnl[l + 1]);
if (pnl[l] < pnl[ir])
if (MDBX_PNL_DISORDERED(pnl[l], pnl[ir]))
PNL_SWAP(pnl[l], pnl[ir]);
if (pnl[l + 1] < pnl[ir])
if (MDBX_PNL_DISORDERED(pnl[l + 1], pnl[ir]))
PNL_SWAP(pnl[l + 1], pnl[ir]);
if (pnl[l] < pnl[l + 1])
if (MDBX_PNL_DISORDERED(pnl[l], pnl[l + 1]))
PNL_SWAP(pnl[l], pnl[l + 1]);
i = l + 1;
@ -263,10 +263,10 @@ static void __hot mdbx_pnl_sort(MDBX_PNL pnl) {
while (1) {
do
i++;
while (pnl[i] > a);
while (MDBX_PNL_ORDERED(pnl[i], a));
do
j--;
while (pnl[j] < a);
while (MDBX_PNL_DISORDERED(pnl[j], a));
if (j < i)
break;
PNL_SWAP(pnl[i], pnl[j]);
@ -308,7 +308,8 @@ static unsigned __hot mdbx_pnl_search(MDBX_PNL pnl, pgno_t id) {
while (n > 0) {
unsigned pivot = n >> 1;
cursor = base + pivot + 1;
val = mdbx_cmp2int(pnl[cursor], id);
val = MDBX_PNL_ASCENDING ? mdbx_cmp2int(pnl[cursor], id)
: mdbx_cmp2int(id, pnl[cursor]);
if (val < 0) {
n = pivot;
@ -471,11 +472,12 @@ static void __hot mdbx_pnl_xmerge(MDBX_PNL pnl, MDBX_PNL merge) {
assert(mdbx_pnl_check(pnl));
assert(mdbx_pnl_check(merge));
pgno_t old_id, merge_id, i = merge[0], j = pnl[0], k = i + j, total = k;
pnl[0] = ~(pgno_t)0; /* delimiter for pl scan below */
pnl[0] =
MDBX_PNL_ASCENDING ? 0 : ~(pgno_t)0; /* delimiter for pl scan below */
old_id = pnl[j];
while (i) {
merge_id = merge[i--];
for (; old_id < merge_id; old_id = pnl[--j])
for (; MDBX_PNL_ORDERED(merge_id, old_id); old_id = pnl[--j])
pnl[k--] = old_id;
pnl[k--] = merge_id;
}
@ -1698,16 +1700,26 @@ static int mdbx_page_alloc(MDBX_cursor *mc, unsigned num, MDBX_page **mp,
op = (flags & MDBX_LIFORECLAIM) ? MDBX_PREV : MDBX_NEXT) {
MDBX_val key, data;
/* Seek a big enough contiguous page range. Prefer
* pages at the tail, just truncating the list. */
/* Seek a big enough contiguous page range.
* Prefer pages with lower pgno. */
mdbx_tassert(txn, mdbx_pnl_check(env->me_reclaimed_pglist));
if (likely(flags & MDBX_ALLOC_CACHE) && repg_len > wanna_range &&
(!(flags & MDBX_COALESCE) || op == MDBX_FIRST)) {
#if MDBX_PNL_ASCENDING
for (repg_pos = 1; repg_pos <= repg_len - wanna_range; ++repg_pos) {
pgno = repg_list[repg_pos];
if (likely(repg_list[repg_pos + wanna_range - 1] ==
pgno + wanna_range - 1))
goto done;
}
#else
repg_pos = repg_len;
do {
pgno = repg_list[repg_pos];
if (likely(repg_list[repg_pos - wanna_range] == pgno + wanna_range))
goto done;
} while (--repg_pos > wanna_range);
#endif /* MDBX_PNL sort-order */
}
if (op == MDBX_FIRST) { /* 1st iteration, setup cursor, etc */
@ -1864,12 +1876,21 @@ static int mdbx_page_alloc(MDBX_cursor *mc, unsigned num, MDBX_page **mp,
if ((flags & (MDBX_COALESCE | MDBX_ALLOC_CACHE)) ==
(MDBX_COALESCE | MDBX_ALLOC_CACHE) &&
repg_len > wanna_range) {
#if MDBX_PNL_ASCENDING
for (repg_pos = 1; repg_pos <= repg_len - wanna_range; ++repg_pos) {
pgno = repg_list[repg_pos];
if (likely(repg_list[repg_pos + wanna_range - 1] ==
pgno + wanna_range - 1))
goto done;
}
#else
repg_pos = repg_len;
do {
pgno = repg_list[repg_pos];
if (repg_list[repg_pos - wanna_range] == pgno + wanna_range)
if (likely(repg_list[repg_pos - wanna_range] == pgno + wanna_range))
goto done;
} while (--repg_pos > wanna_range);
#endif /* MDBX_PNL sort-order */
}
/* Use new pages from the map when nothing suitable in the freeDB */

View File

@ -327,8 +327,6 @@ static int handle_userdb(const uint64_t record_number, const MDBX_val *key,
static int handle_freedb(const uint64_t record_number, const MDBX_val *key,
const MDBX_val *data) {
char *bad = "";
pgno_t pg, prev;
int i, number, span = 0;
pgno_t *iptr = data->iov_base;
txnid_t txnid = *(txnid_t *)key->iov_base;
@ -342,7 +340,7 @@ static int handle_freedb(const uint64_t record_number, const MDBX_val *key,
problem_add("entry", record_number, "wrong idl size", "%" PRIuPTR "",
data->iov_len);
else {
number = *iptr++;
const intptr_t number = *iptr++;
if (number >= MDBX_PNL_UM_MAX)
problem_add("entry", record_number, "wrong idl length", "%" PRIiPTR "",
number);
@ -354,34 +352,42 @@ static int handle_freedb(const uint64_t record_number, const MDBX_val *key,
freedb_pages += number;
if (envinfo.mi_latter_reader_txnid > txnid)
reclaimable_pages += number;
for (i = number, prev = NUM_METAS - 1; --i >= 0;) {
pg = iptr[i];
pgno_t prev =
MDBX_PNL_ASCENDING ? NUM_METAS - 1 : envinfo.mi_last_pgno + 1;
intptr_t span = 1;
for (intptr_t i = 0; i < number; ++i) {
const pgno_t pg = iptr[i];
if (pg < NUM_METAS || pg > envinfo.mi_last_pgno)
problem_add("entry", record_number, "wrong idl entry",
"%u < %" PRIiPTR " < %" PRIiPTR "", NUM_METAS, pg,
envinfo.mi_last_pgno);
else if (pg <= prev) {
else if (MDBX_PNL_DISORDERED(prev, pg)) {
bad = " [bad sequence]";
problem_add("entry", record_number, "bad sequence",
"%" PRIiPTR " <= %" PRIiPTR "", pg, prev);
"%" PRIiPTR " <> %" PRIiPTR "", prev, pg);
}
prev = pg;
pg += span;
for (; i >= span && iptr[i - span] == pg; span++, pg++)
;
while (i + span < number &&
iptr[i + span] == (MDBX_PNL_ASCENDING ? pgno_add(pg, span)
: pgno_sub(pg, span)))
++span;
}
if (verbose > 2 && !only_subdb) {
print(" transaction %" PRIaTXN ", %u pages, maxspan %i%s\n", txnid,
number, span, bad);
if (verbose > 3) {
int j = number - 1;
while (j >= 0) {
pg = iptr[j];
for (span = 1; --j >= 0 && iptr[j] == pg + span; span++)
for (intptr_t i = 0; i < number; i += span) {
const pgno_t pg = iptr[i];
for (span = 1;
i + span < number &&
iptr[i + span] == (MDBX_PNL_ASCENDING ? pgno_add(pg, span)
: pgno_sub(pg, span));
++span)
;
if (span > 1)
print(" %9" PRIaPGNO "[%i]\n", pg, span);
else
if (span > 1) {
print(" %9" PRIaPGNO "[%" PRIiPTR "]\n", pg, span);
} else
print(" %9" PRIaPGNO "\n", pg);
}
}

View File

@ -234,30 +234,37 @@ int main(int argc, char *argv[]) {
break;
}
iptr = data.iov_base;
pages += *iptr;
const intptr_t number = *iptr++;
pages += number;
if (envinfo && mei.mi_latter_reader_txnid > *(size_t *)key.iov_base)
reclaimable += *iptr;
reclaimable += number;
if (freinfo > 1) {
char *bad = "";
pgno_t pg, prev;
intptr_t i, j, span = 0;
j = *iptr++;
for (i = j, prev = NUM_METAS - 1; --i >= 0;) {
pg = iptr[i];
if (pg <= prev)
pgno_t prev = MDBX_PNL_ASCENDING ? NUM_METAS - 1 : mei.mi_last_pgno + 1;
intptr_t i, span = 1;
for (i = 0; i < number; ++i) {
pgno_t pg = iptr[i];
if (MDBX_PNL_DISORDERED(prev, pg))
bad = " [bad sequence]";
prev = pg;
pg += (unsigned)span;
for (; i >= span && iptr[i - span] == pg; span++, pg++)
;
while (i + span < number &&
iptr[i + span] == (MDBX_PNL_ASCENDING ? pgno_add(pg, span)
: pgno_sub(pg, span)))
++span;
}
printf(" Transaction %" PRIaTXN ", %" PRIiPTR
" pages, maxspan %" PRIiPTR "%s\n",
*(txnid_t *)key.iov_base, j, span, bad);
*(txnid_t *)key.iov_base, number, span, bad);
if (freinfo > 2) {
for (--j; j >= 0;) {
pg = iptr[j];
for (span = 1; --j >= 0 && iptr[j] == pg + span; span++)
for (intptr_t i = 0; i < number; i += span) {
const pgno_t pg = iptr[i];
for (span = 1;
i + span < number &&
iptr[i + span] == (MDBX_PNL_ASCENDING ? pgno_add(pg, span)
: pgno_sub(pg, span));
++span)
;
if (span > 1)
printf(" %9" PRIaPGNO "[%" PRIiPTR "]\n", pg, span);