From e00dce3543c5d7128ca8e8232d25e0157bc6e371 Mon Sep 17 00:00:00 2001 From: Leo Yuriev Date: Sat, 1 Sep 2018 19:36:45 +0300 Subject: [PATCH] mdbx: rework PNL, DPL and TXN lists. Change-Id: I79c7399912516b17cc255fc8e24b5941338e5eb1 --- src/bits.h | 59 ++-- src/mdbx.c | 758 +++++++++++++++++++++++-------------------- src/tools/mdbx_chk.c | 2 +- 3 files changed, 438 insertions(+), 381 deletions(-) diff --git a/src/bits.h b/src/bits.h index 7c9604e4..11e8c1d8 100644 --- a/src/bits.h +++ b/src/bits.h @@ -479,6 +479,10 @@ typedef struct MDBX_lockinfo { #define MDBX_LOCK_MAGIC ((MDBX_MAGIC << 8) + MDBX_LOCK_VERSION) +#ifndef MDBX_ASSUME_MALLOC_OVERHEAD +#define MDBX_ASSUME_MALLOC_OVERHEAD (sizeof(void *) * 2u) +#endif /* MDBX_ASSUME_MALLOC_OVERHEAD */ + /*----------------------------------------------------------------------------*/ /* Two kind lists of pages (aka PNL) */ @@ -500,33 +504,46 @@ typedef pgno_t *MDBX_PNL; /* List of txnid, only for MDBX_env.mt_lifo_reclaimed */ typedef txnid_t *MDBX_TXL; -/* An ID2 is an ID/pointer pair. */ -typedef union MDBX_ID2 { +/* An Dirty-Page list item is an pgno/pointer pair. */ +typedef union MDBX_DP { struct { pgno_t pgno; void *ptr; }; - unsigned limit, length; -} MDBX_ID2; + struct { + pgno_t unused; + unsigned length; + }; +} MDBX_DP; -/* An ID2L is an ID2 List, a sorted array of ID2s. - * The first element's mid member is a count of how many actual - * elements are in the array. The mptr member of the first element is - * unused. The array is sorted in ascending order by mid. */ -typedef MDBX_ID2 *MDBX_ID2L; +/* An DPL (dirty-page list) is a sorted array of MDBX_DPs. + * The first element's length member is a count of how many actual + * elements are in the array. */ +typedef MDBX_DP *MDBX_DPL; -/* PNL sizes - likely should be even bigger - * limiting factors: sizeof(pgno_t), thread stack size */ -#define MDBX_LIST_MAX ((1 << 24) - 1) +/* PNL sizes - likely should be even bigger */ +#define MDBX_PNL_GRANULATE 1024 +#define MDBX_PNL_INITIAL \ + (MDBX_PNL_GRANULATE - 2 - MDBX_ASSUME_MALLOC_OVERHEAD / sizeof(pgno_t)) +#define MDBX_PNL_MAX \ + ((1u << 24) - 2 - MDBX_ASSUME_MALLOC_OVERHEAD / sizeof(pgno_t)) +#define MDBX_DPL_TXNFULL (MDBX_PNL_MAX / 4) -#define MDBX_PNL_SIZEOF(pl) (((pl)[0] + 1) * sizeof(pgno_t)) -#define MDBX_PNL_IS_ZERO(pl) ((pl)[0] == 0) -#define MDBX_PNL_CPY(dst, src) (memcpy(dst, src, MDBX_PNL_SIZEOF(src))) -#define MDBX_PNL_FIRST(pl) ((pl)[1]) -#define MDBX_PNL_LAST(pl) ((pl)[(pl)[0]]) +#define MDBX_TXL_GRANULATE 32 +#define MDBX_TXL_INITIAL \ + (MDBX_TXL_GRANULATE - 2 - MDBX_ASSUME_MALLOC_OVERHEAD / sizeof(txnid_t)) +#define MDBX_TXL_MAX \ + ((1u << 17) - 2 - MDBX_ASSUME_MALLOC_OVERHEAD / sizeof(txnid_t)) -/* Current max length of an mdbx_pnl_alloc()ed PNL */ #define MDBX_PNL_ALLOCLEN(pl) ((pl)[-1]) +#define MDBX_PNL_SIZE(pl) ((pl)[0]) +#define MDBX_PNL_FIRST(pl) ((pl)[1]) +#define MDBX_PNL_LAST(pl) ((pl)[MDBX_PNL_SIZE(pl)]) +#define MDBX_PNL_BEGIN(pl) (&(pl)[1]) +#define MDBX_PNL_END(pl) (&(pl)[MDBX_PNL_SIZE(pl) + 1]) + +#define MDBX_PNL_SIZEOF(pl) ((MDBX_PNL_SIZE(pl) + 1) * sizeof(pgno_t)) +#define MDBX_PNL_IS_EMPTY(pl) (MDBX_PNL_SIZE(pl) == 0) /*----------------------------------------------------------------------------*/ /* Internal structures */ @@ -570,7 +587,7 @@ struct MDBX_txn { MDBX_PNL mt_spill_pages; union { /* For write txns: Modified pages. Sorted when not MDBX_WRITEMAP. */ - MDBX_ID2L mt_rw_dirtylist; + MDBX_DPL mt_rw_dirtylist; /* For read txns: This thread/txn's reader table slot, or NULL. */ MDBX_reader *mt_ro_reader; }; @@ -767,8 +784,8 @@ struct MDBX_env { MDBX_page *me_dpages; /* list of malloc'd blocks for re-use */ /* PNL of pages that became unused in a write txn */ MDBX_PNL me_free_pgs; - /* ID2L of pages written during a write txn. Length MDBX_LIST_MAX. */ - MDBX_ID2L me_dirtylist; + /* MDBX_DP of pages written during a write txn. Length MDBX_DPL_TXNFULL. */ + MDBX_DPL me_dirtylist; /* Number of freelist items that can fit in a single overflow page */ unsigned me_maxgc_ov1page; /* Max size of a node on a page */ diff --git a/src/mdbx.c b/src/mdbx.c index ea6843bb..4412a634 100644 --- a/src/mdbx.c +++ b/src/mdbx.c @@ -497,67 +497,183 @@ __cold void mdbx_rthc_remove(const mdbx_thread_key_t key) { /*----------------------------------------------------------------------------*/ -/* Allocate an PNL. - * Allocates memory for an PNL of the given size. - * Returns PNL on success, NULL on failure. */ +static __inline size_t pnl2bytes(const size_t size) { + assert(size > 0 && size <= MDBX_PNL_MAX * 2); + size_t bytes = + mdbx_roundup2(MDBX_ASSUME_MALLOC_OVERHEAD + sizeof(pgno_t) * (size + 2), + MDBX_PNL_GRANULATE * sizeof(pgno_t)) - + MDBX_ASSUME_MALLOC_OVERHEAD; + return bytes; +} + +static __inline pgno_t bytes2pnl(const size_t bytes) { + size_t size = bytes / sizeof(pgno_t); + assert(size > 2 && size <= MDBX_PNL_MAX * 2); + return (pgno_t)size - 2; +} + static MDBX_PNL mdbx_pnl_alloc(size_t size) { - assert(size <= MDBX_LIST_MAX); - MDBX_PNL pl = malloc((size + 2) * sizeof(pgno_t)); + const size_t bytes = pnl2bytes(size); + MDBX_PNL pl = malloc(bytes); if (likely(pl)) { - *pl++ = (pgno_t)size; - *pl = 0; +#if __GLIBC_PREREQ(2, 12) + const size_t bytes = malloc_usable_size(pl); +#endif + pl[0] = bytes2pnl(bytes); + assert(pl[0] >= size); + pl[1] = 0; + pl += 1; } return pl; } -static MDBX_TXL mdbx_txl_alloc(void) { - const size_t malloc_overhead = sizeof(void *) * 2; - const size_t bytes = mdbx_roundup2(malloc_overhead + sizeof(txnid_t) * 61, - MDBX_CACHELINE_SIZE) - - malloc_overhead; - MDBX_TXL ptr = malloc(bytes); - if (likely(ptr)) { - *ptr++ = bytes / sizeof(txnid_t) - 2; - *ptr = 0; - } - return ptr; -} - -/* Free an PNL. - * [in] pl The PNL to free. */ static void mdbx_pnl_free(MDBX_PNL pl) { if (likely(pl)) free(pl - 1); } -static void mdbx_txl_free(MDBX_TXL list) { - if (likely(list)) - free(list - 1); +/* Shrink the PNL to the default size if it has grown larger */ +static void mdbx_pnl_shrink(MDBX_PNL *ppl) { + assert(bytes2pnl(pnl2bytes(MDBX_PNL_INITIAL)) == MDBX_PNL_INITIAL); + assert(MDBX_PNL_SIZE(*ppl) <= MDBX_PNL_MAX && + MDBX_PNL_ALLOCLEN(*ppl) >= MDBX_PNL_SIZE(*ppl)); + MDBX_PNL_SIZE(*ppl) = 0; + if (unlikely(MDBX_PNL_ALLOCLEN(*ppl) > + MDBX_PNL_INITIAL + MDBX_CACHELINE_SIZE / sizeof(pgno_t))) { + const size_t bytes = pnl2bytes(MDBX_PNL_INITIAL); + MDBX_PNL pl = realloc(*ppl - 1, bytes); + if (likely(pl)) { +#if __GLIBC_PREREQ(2, 12) + const size_t bytes = malloc_usable_size(pl); +#endif + *pl = bytes2pnl(bytes); + *ppl = pl + 1; + } + } +} + +/* Grow the PNL to the size growed to at least given size */ +static int mdbx_pnl_reserve(MDBX_PNL *ppl, const size_t wanna) { + const size_t allocated = MDBX_PNL_ALLOCLEN(*ppl); + assert(MDBX_PNL_SIZE(*ppl) <= MDBX_PNL_MAX && + MDBX_PNL_ALLOCLEN(*ppl) >= MDBX_PNL_SIZE(*ppl)); + if (likely(allocated >= wanna)) + return MDBX_SUCCESS; + + if (unlikely(wanna > /* paranoia */ MDBX_PNL_MAX)) + return MDBX_TXN_FULL; + + const size_t size = (wanna + wanna - allocated < MDBX_PNL_MAX) + ? wanna + wanna - allocated + : MDBX_PNL_MAX; + const size_t bytes = pnl2bytes(size); + MDBX_PNL pl = realloc(*ppl - 1, bytes); + if (likely(pl)) { +#if __GLIBC_PREREQ(2, 12) + const size_t bytes = malloc_usable_size(pl); +#endif + *pl = bytes2pnl(bytes); + assert(*pl >= wanna); + *ppl = pl + 1; + return MDBX_SUCCESS; + } + return MDBX_ENOMEM; +} + +/* Make room for num additional elements in an PNL */ +static __inline int __must_check_result mdbx_pnl_need(MDBX_PNL *ppl, + size_t num) { + assert(MDBX_PNL_SIZE(*ppl) <= MDBX_PNL_MAX && + MDBX_PNL_ALLOCLEN(*ppl) >= MDBX_PNL_SIZE(*ppl)); + assert(num <= MDBX_PNL_MAX); + const size_t wanna = MDBX_PNL_SIZE(*ppl) + num; + return likely(MDBX_PNL_ALLOCLEN(*ppl) >= wanna) + ? MDBX_SUCCESS + : mdbx_pnl_reserve(ppl, wanna); } -/* Append ID to PNL. The PNL must be big enough. */ static __inline void mdbx_pnl_xappend(MDBX_PNL pl, pgno_t id) { - assert(pl[0] + (size_t)1 <= MDBX_PNL_ALLOCLEN(pl)); - pl[pl[0] += 1] = id; + assert(MDBX_PNL_SIZE(pl) < MDBX_PNL_ALLOCLEN(pl)); + MDBX_PNL_SIZE(pl) += 1; + MDBX_PNL_LAST(pl) = id; +} + +/* Append an ID onto an PNL */ +static int __must_check_result mdbx_pnl_append(MDBX_PNL *ppl, pgno_t id) { + /* Too big? */ + if (unlikely(MDBX_PNL_SIZE(*ppl) == MDBX_PNL_ALLOCLEN(*ppl))) { + int rc = mdbx_pnl_need(ppl, MDBX_PNL_GRANULATE); + if (unlikely(rc != MDBX_SUCCESS)) + return rc; + } + mdbx_pnl_xappend(*ppl, id); + return MDBX_SUCCESS; +} + +/* Append an PNL onto an PNL */ +static int __must_check_result mdbx_pnl_append_list(MDBX_PNL *ppl, + MDBX_PNL append) { + int rc = mdbx_pnl_need(ppl, MDBX_PNL_SIZE(append)); + if (unlikely(rc != MDBX_SUCCESS)) + return rc; + + memcpy(MDBX_PNL_END(*ppl), MDBX_PNL_BEGIN(append), + MDBX_PNL_SIZE(append) * sizeof(pgno_t)); + MDBX_PNL_SIZE(*ppl) += MDBX_PNL_SIZE(append); + return MDBX_SUCCESS; +} + +/* Append an ID range onto an PNL */ +static int __must_check_result mdbx_pnl_append_range(MDBX_PNL *ppl, pgno_t id, + size_t n) { + int rc = mdbx_pnl_need(ppl, n); + if (unlikely(rc != MDBX_SUCCESS)) + return rc; + + pgno_t *ap = MDBX_PNL_END(*ppl); + MDBX_PNL_SIZE(*ppl) += (unsigned)n; + for (pgno_t *const end = MDBX_PNL_END(*ppl); ap < end;) + *ap++ = id++; + return MDBX_SUCCESS; } static bool mdbx_pnl_check(MDBX_PNL pl, bool allocated) { if (pl) { + assert(MDBX_PNL_SIZE(pl) <= MDBX_PNL_MAX); if (allocated) { - assert(pl[0] <= MDBX_LIST_MAX && pl[0] <= pl[-1]); + assert(MDBX_PNL_ALLOCLEN(pl) >= MDBX_PNL_SIZE(pl)); } - for (const pgno_t *ptr = pl + pl[0]; --ptr > pl;) { - assert(MDBX_PNL_ORDERED(ptr[0], ptr[1])); - assert(ptr[0] >= NUM_METAS); - if (unlikely(MDBX_PNL_DISORDERED(ptr[0], ptr[1]) || ptr[0] < NUM_METAS)) + for (const pgno_t *scan = &MDBX_PNL_LAST(pl); --scan > pl;) { + assert(MDBX_PNL_ORDERED(scan[0], scan[1])); + assert(scan[0] >= NUM_METAS); + if (unlikely(MDBX_PNL_DISORDERED(scan[0], scan[1]) || + scan[0] < NUM_METAS)) return false; } } return true; } -/* Sort an PNL. - * [in,out] pnl The PNL to sort. */ +/* Merge an PNL onto an PNL. The destination PNL must be big enough */ +static void __hot mdbx_pnl_xmerge(MDBX_PNL pnl, MDBX_PNL merge) { + assert(mdbx_pnl_check(pnl, true)); + assert(mdbx_pnl_check(merge, false)); + pgno_t old_id, merge_id, i = MDBX_PNL_SIZE(merge), j = MDBX_PNL_SIZE(pnl), + k = i + j, total = k; + pnl[0] = + MDBX_PNL_ASCENDING ? 0 : ~(pgno_t)0; /* delimiter for pl scan below */ + old_id = pnl[j]; + while (i) { + merge_id = merge[i--]; + for (; MDBX_PNL_ORDERED(merge_id, old_id); old_id = pnl[--j]) + pnl[k--] = old_id; + pnl[k--] = merge_id; + } + MDBX_PNL_SIZE(pnl) = total; + assert(mdbx_pnl_check(pnl, true)); +} + +/* Sort an PNL */ static void __hot mdbx_pnl_sort(MDBX_PNL pnl) { /* Max possible depth of int-indexed tree * 2 items/level */ int istack[sizeof(int) * CHAR_BIT * 2]; @@ -573,7 +689,7 @@ static void __hot mdbx_pnl_sort(MDBX_PNL pnl) { (b) = tmp_pgno; \ } while (0) - ir = (int)pnl[0]; + ir = (int)MDBX_PNL_SIZE(pnl); l = 1; jstack = 0; while (1) { @@ -649,13 +765,13 @@ static unsigned __hot mdbx_pnl_search(MDBX_PNL pnl, pgno_t id) { unsigned base = 0; unsigned cursor = 1; int val = 0; - unsigned n = pnl[0]; + unsigned n = MDBX_PNL_SIZE(pnl); while (n > 0) { unsigned pivot = n >> 1; cursor = base + pivot + 1; - val = MDBX_PNL_ASCENDING ? mdbx_cmp2int(pnl[cursor], id) - : mdbx_cmp2int(id, pnl[cursor]); + val = MDBX_PNL_ASCENDING ? mdbx_cmp2int(id, pnl[cursor]) + : mdbx_cmp2int(pnl[cursor], id); if (val < 0) { n = pivot; @@ -673,152 +789,85 @@ static unsigned __hot mdbx_pnl_search(MDBX_PNL pnl, pgno_t id) { return cursor; } -/* Shrink an PNL. - * Return the PNL to the default size if it has grown larger. - * [in,out] ppl Address of the PNL to shrink. */ -static void mdbx_pnl_shrink(MDBX_PNL *ppl) { - MDBX_PNL pl = *ppl - 1; - pl[1] = 0; - if (unlikely(*pl > MDBX_LIST_MAX)) { - /* shrink to MDBX_LIST_MAX */ - pl = realloc(pl, (MDBX_LIST_MAX + 2) * sizeof(pgno_t)); - if (likely(pl)) { - *pl = MDBX_LIST_MAX; - *ppl = pl + 1; - } - } +/*----------------------------------------------------------------------------*/ + +static __inline size_t txl2bytes(const size_t size) { + assert(size > 0 && size <= MDBX_TXL_MAX * 2); + size_t bytes = + mdbx_roundup2(MDBX_ASSUME_MALLOC_OVERHEAD + sizeof(txnid_t) * (size + 2), + MDBX_TXL_GRANULATE * sizeof(txnid_t)) - + MDBX_ASSUME_MALLOC_OVERHEAD; + return bytes; } -/* Grow an PNL. - * Return the PNL to the size growed by given number. - * [in,out] ppl Address of the PNL to grow. */ -static int __must_check_result mdbx_pnl_grow(MDBX_PNL *ppl, size_t num) { - MDBX_PNL pl = *ppl - 1; - assert(pl[1] <= MDBX_LIST_MAX && pl[1] <= pl[0]); - assert(num <= MDBX_LIST_MAX); - num += pl[0]; - if (unlikely(num > MDBX_LIST_MAX)) +static __inline size_t bytes2txl(const size_t bytes) { + size_t size = bytes / sizeof(txnid_t); + assert(size > 2 && size <= MDBX_TXL_MAX * 2); + return size - 2; +} + +static MDBX_TXL mdbx_txl_alloc(void) { + const size_t bytes = txl2bytes(MDBX_TXL_INITIAL); + MDBX_TXL tl = malloc(bytes); + if (likely(tl)) { +#if __GLIBC_PREREQ(2, 12) + const size_t bytes = malloc_usable_size(tl); +#endif + tl[0] = bytes2txl(bytes); + assert(tl[0] >= MDBX_TXL_INITIAL); + tl[1] = 0; + tl += 1; + } + return tl; +} + +static void mdbx_txl_free(MDBX_TXL tl) { + if (likely(tl)) + free(tl - 1); +} + +static int mdbx_txl_reserve(MDBX_TXL *ptl, const size_t wanna) { + const size_t allocated = (size_t)MDBX_PNL_ALLOCLEN(*ptl); + assert(MDBX_PNL_SIZE(*ptl) <= MDBX_TXL_MAX && + MDBX_PNL_ALLOCLEN(*ptl) >= MDBX_PNL_SIZE(*ptl)); + if (likely(allocated >= wanna)) + return MDBX_SUCCESS; + + if (unlikely(wanna > /* paranoia */ MDBX_TXL_MAX)) return MDBX_TXN_FULL; - /* grow it */ - pl = realloc(pl, (num + 2) * sizeof(pgno_t)); - if (unlikely(!pl)) - return MDBX_ENOMEM; - *pl = (pgno_t)num; - *ppl = pl + 1; - return MDBX_SUCCESS; -} -/* Make room for num additional elements in an PNL. - * [in,out] ppl Address of the PNL. - * [in] num Number of elements to make room for. - * Returns 0 on success, MDBX_ENOMEM on failure. */ -static int __must_check_result mdbx_pnl_need(MDBX_PNL *ppl, size_t num) { - MDBX_PNL pl = *ppl - 1; - assert(pl[1] <= MDBX_LIST_MAX && pl[1] <= pl[0]); - assert(num <= MDBX_LIST_MAX); - num += pl[1]; - if (unlikely(num > pl[0])) { - if (unlikely(num > MDBX_LIST_MAX)) - return MDBX_TXN_FULL; - num = (num + num / 4 + (256 + 2)) & ~255u; - num = (num < MDBX_LIST_MAX + 2) ? num : MDBX_LIST_MAX + 2; - pl = realloc(pl, num * sizeof(pgno_t)); - if (unlikely(!pl)) - return MDBX_ENOMEM; - *pl = (pgno_t)num - 2; - *ppl = pl + 1; + const size_t size = (wanna + wanna - allocated < MDBX_TXL_MAX) + ? wanna + wanna - allocated + : MDBX_TXL_MAX; + const size_t bytes = txl2bytes(size); + MDBX_TXL tl = realloc(*ptl - 1, bytes); + if (likely(tl)) { +#if __GLIBC_PREREQ(2, 12) + const size_t bytes = malloc_usable_size(tl); +#endif + *tl = bytes2txl(bytes); + assert(*tl >= wanna); + *ptl = tl + 1; + return MDBX_SUCCESS; } - return MDBX_SUCCESS; + return MDBX_ENOMEM; } -/* Append an ID onto an PNL. - * [in,out] ppl Address of the PNL to append to. - * [in] id The ID to append. - * Returns 0 on success, MDBX_ENOMEM if the PNL is too large. */ -static int __must_check_result mdbx_pnl_append(MDBX_PNL *ppl, pgno_t id) { - MDBX_PNL pl = *ppl; - /* Too big? */ - if (unlikely(pl[0] >= pl[-1])) { - int rc = mdbx_pnl_grow(ppl, MDBX_LIST_MAX); - if (unlikely(rc != MDBX_SUCCESS)) - return rc; - pl = *ppl; - } - pl[0]++; - pl[pl[0]] = id; - return MDBX_SUCCESS; +static __inline int __must_check_result mdbx_txl_need(MDBX_TXL *ptl, + size_t num) { + assert(MDBX_PNL_SIZE(*ptl) <= MDBX_TXL_MAX && + MDBX_PNL_ALLOCLEN(*ptl) >= MDBX_PNL_SIZE(*ptl)); + assert(num <= MDBX_PNL_MAX); + const size_t wanna = (size_t)MDBX_PNL_SIZE(*ptl) + num; + return likely(MDBX_PNL_ALLOCLEN(*ptl) >= wanna) + ? MDBX_SUCCESS + : mdbx_txl_reserve(ptl, wanna); } -/* Append an PNL onto an PNL. - * [in,out] ppl Address of the PNL to append to. - * [in] app The PNL to append. - * Returns 0 on success, MDBX_ENOMEM if the PNL is too large. */ -static int __must_check_result mdbx_pnl_append_list(MDBX_PNL *ppl, - MDBX_PNL app) { - MDBX_PNL pnl = *ppl; - /* Too big? */ - if (unlikely(pnl[0] + app[0] >= pnl[-1])) { - int rc = mdbx_pnl_grow(ppl, app[0]); - if (unlikely(rc != MDBX_SUCCESS)) - return rc; - pnl = *ppl; - } - memcpy(&pnl[pnl[0] + 1], &app[1], app[0] * sizeof(pgno_t)); - pnl[0] += app[0]; - return MDBX_SUCCESS; -} - -/* Append an ID range onto an PNL. - * [in,out] ppl Address of the PNL to append to. - * [in] id The lowest ID to append. - * [in] n Number of IDs to append. - * Returns 0 on success, MDBX_ENOMEM if the PNL is too large. */ -static int __must_check_result mdbx_pnl_append_range(MDBX_PNL *ppl, pgno_t id, - size_t n) { - pgno_t *pnl = *ppl, len = pnl[0]; - /* Too big? */ - if (unlikely(len + n > pnl[-1])) { - int rc = mdbx_pnl_grow(ppl, n | MDBX_LIST_MAX); - if (unlikely(rc != MDBX_SUCCESS)) - return rc; - pnl = *ppl; - } - pnl[0] = len + (pgno_t)n; - pnl += len; - while (n) - pnl[n--] = id++; - return MDBX_SUCCESS; -} - -/* Merge an PNL onto an PNL. The destination PNL must be big enough. - * [in] pl The PNL to merge into. - * [in] merge The PNL to merge. */ -static void __hot mdbx_pnl_xmerge(MDBX_PNL pnl, MDBX_PNL merge) { - assert(mdbx_pnl_check(pnl, true)); - assert(mdbx_pnl_check(merge, false)); - pgno_t old_id, merge_id, i = merge[0], j = pnl[0], k = i + j, total = k; - pnl[0] = - MDBX_PNL_ASCENDING ? 0 : ~(pgno_t)0; /* delimiter for pl scan below */ - old_id = pnl[j]; - while (i) { - merge_id = merge[i--]; - for (; MDBX_PNL_ORDERED(merge_id, old_id); old_id = pnl[--j]) - pnl[k--] = old_id; - pnl[k--] = merge_id; - } - pnl[0] = total; - assert(mdbx_pnl_check(pnl, true)); -} - -static int __must_check_result mdbx_txl_grow(MDBX_TXL *ptr, size_t num) { - MDBX_TXL list = *ptr - 1; - /* grow it */ - list = realloc(list, ((size_t)*list + num + 2) * sizeof(txnid_t)); - if (unlikely(!list)) - return MDBX_ENOMEM; - *list += num; - *ptr = list + 1; - return MDBX_SUCCESS; +static __inline void mdbx_txl_xappend(MDBX_TXL tl, txnid_t id) { + assert(MDBX_PNL_SIZE(tl) < MDBX_PNL_ALLOCLEN(tl)); + MDBX_PNL_SIZE(tl) += 1; + MDBX_PNL_LAST(tl) = id; } static int mdbx_txl_cmp(const void *pa, const void *pb) { @@ -832,51 +881,43 @@ static void mdbx_txl_sort(MDBX_TXL ptr) { qsort(ptr + 1, (size_t)ptr[0], sizeof(*ptr), mdbx_txl_cmp); } -static int __must_check_result mdbx_txl_append(MDBX_TXL *ptr, txnid_t id) { - MDBX_TXL list = *ptr; - /* Too big? */ - if (unlikely(list[0] >= list[-1])) { - int rc = mdbx_txl_grow(ptr, (size_t)list[0]); +static int __must_check_result mdbx_txl_append(MDBX_TXL *ptl, txnid_t id) { + if (unlikely(MDBX_PNL_SIZE(*ptl) == MDBX_PNL_ALLOCLEN(*ptl))) { + int rc = mdbx_txl_need(ptl, MDBX_TXL_GRANULATE); if (unlikely(rc != MDBX_SUCCESS)) return rc; - list = *ptr; } - list[0]++; - list[list[0]] = id; + mdbx_txl_xappend(*ptl, id); return MDBX_SUCCESS; } -static int __must_check_result mdbx_txl_append_list(MDBX_TXL *ptr, +static int __must_check_result mdbx_txl_append_list(MDBX_TXL *ptl, MDBX_TXL append) { - MDBX_TXL list = *ptr; - /* Too big? */ - if (unlikely(list[0] + append[0] >= list[-1])) { - int rc = mdbx_txl_grow(ptr, (size_t)append[0]); - if (unlikely(rc != MDBX_SUCCESS)) - return rc; - list = *ptr; - } - memcpy(&list[list[0] + 1], &append[1], (size_t)append[0] * sizeof(txnid_t)); - list[0] += append[0]; + int rc = mdbx_txl_need(ptl, (size_t)MDBX_PNL_SIZE(append)); + if (unlikely(rc != MDBX_SUCCESS)) + return rc; + + memcpy(MDBX_PNL_END(*ptl), MDBX_PNL_BEGIN(append), + (size_t)MDBX_PNL_SIZE(append) * sizeof(txnid_t)); + MDBX_PNL_SIZE(*ptl) += MDBX_PNL_SIZE(append); return MDBX_SUCCESS; } -/* Search for an ID in an ID2L. - * [in] pnl The ID2L to search. - * [in] id The ID to search for. - * Returns The index of the first ID2 whose mid member is greater than - * or equal to id. */ -static unsigned __hot mdbx_mid2l_search(MDBX_ID2L pnl, pgno_t id) { - /* binary search of id in pnl +/*----------------------------------------------------------------------------*/ + +/* Returns the index of the first dirty-page whose pgno + * member is greater than or equal to id. */ +static unsigned __hot mdbx_dpl_search(MDBX_DPL dl, pgno_t id) { + /* binary search of id in array * if found, returns position of id * if not found, returns first position greater than id */ unsigned base = 0; unsigned cursor = 1; int val = 0; - unsigned n = pnl[0].length; + unsigned n = dl->length; #if MDBX_DEBUG - for (const MDBX_ID2 *ptr = pnl + pnl[0].length; --ptr > pnl;) { + for (const MDBX_DP *ptr = dl + dl->length; --ptr > dl;) { assert(ptr[0].pgno < ptr[1].pgno); assert(ptr[0].pgno >= NUM_METAS); } @@ -885,7 +926,7 @@ static unsigned __hot mdbx_mid2l_search(MDBX_ID2L pnl, pgno_t id) { while (n > 0) { unsigned pivot = n >> 1; cursor = base + pivot + 1; - val = mdbx_cmp2int(id, pnl[cursor].pgno); + val = mdbx_cmp2int(id, dl[cursor].pgno); if (val < 0) { n = pivot; @@ -903,60 +944,56 @@ static unsigned __hot mdbx_mid2l_search(MDBX_ID2L pnl, pgno_t id) { return cursor; } -static int mdbx_mid2l_cmp(const void *pa, const void *pb) { - const MDBX_ID2 a = *(MDBX_ID2L)pa; - const MDBX_ID2 b = *(MDBX_ID2L)pb; +static int mdbx_dpl_cmp(const void *pa, const void *pb) { + const MDBX_DP a = *(MDBX_DPL)pa; + const MDBX_DP b = *(MDBX_DPL)pb; return mdbx_cmp2int(a.pgno, b.pgno); } -static void mdbx_mid2l_sort(MDBX_ID2L pnl) { - assert(pnl[0].limit >= pnl[0].length); +static void mdbx_dpl_sort(MDBX_DPL dl) { + assert(dl->length <= MDBX_DPL_TXNFULL); /* LY: temporary */ - qsort(pnl + 1, pnl->length, sizeof(*pnl), mdbx_mid2l_cmp); + qsort(dl + 1, dl->length, sizeof(*dl), mdbx_dpl_cmp); } -static int __must_check_result mdbx_mid2l_insert(MDBX_ID2L pnl, pgno_t pgno, - MDBX_page *page) { - assert(pnl[0].limit >= pnl[0].length); - unsigned x = mdbx_mid2l_search(pnl, pgno); +static int __must_check_result mdbx_dpl_insert(MDBX_DPL dl, pgno_t pgno, + MDBX_page *page) { + assert(dl->length <= MDBX_DPL_TXNFULL); + unsigned x = mdbx_dpl_search(dl, pgno); assert((int)x > 0); - if (unlikely(pnl[x].pgno == pgno && x <= pnl[0].length)) + if (unlikely(dl[x].pgno == pgno && x <= dl->length)) return /* duplicate */ MDBX_PROBLEM; - if (unlikely(pnl[0].length >= MDBX_LIST_MAX)) - return /* too big */ MDBX_TXN_FULL; + if (unlikely(dl->length == MDBX_DPL_TXNFULL)) + return MDBX_TXN_FULL; - /* insert id */ - assert(pnl[0].limit >= pnl[0].length); - pnl[0].length++; - for (unsigned i = pnl[0].length; i > x; i--) - pnl[i] = pnl[i - 1]; + /* insert page */ + for (unsigned i = dl->length += 1; i > x; --i) + dl[i] = dl[i - 1]; - MDBX_ID2 *entry = &pnl[x]; - entry->pgno = pgno; - entry->ptr = page; + dl[x].pgno = pgno; + dl[x].ptr = page; return MDBX_SUCCESS; } -static int __must_check_result mdbx_mid2l_append(MDBX_ID2L pnl, pgno_t pgno, - MDBX_page *page) { - assert(pnl[0].limit >= pnl[0].length); +static int __must_check_result mdbx_dpl_append(MDBX_DPL dl, pgno_t pgno, + MDBX_page *page) { + assert(dl->length <= MDBX_DPL_TXNFULL); #if MDBX_DEBUG - for (unsigned i = pnl[0].length; i > 0; --i) { - assert(pnl[i].pgno != pgno); - if (unlikely(pnl[i].pgno == pgno)) + for (unsigned i = dl->length; i > 0; --i) { + assert(dl[i].pgno != pgno); + if (unlikely(dl[i].pgno == pgno)) return MDBX_PROBLEM; } #endif - /* Too big? */ - if (unlikely(pnl[0].length >= MDBX_LIST_MAX)) - return /* too big */ MDBX_TXN_FULL; + if (unlikely(dl->length == MDBX_DPL_TXNFULL)) + return MDBX_TXN_FULL; - pnl[0].length += 1; - MDBX_ID2 *entry = &pnl[pnl[0].length]; - entry->pgno = pgno; - entry->ptr = page; + /* append page */ + const unsigned i = dl->length += 1; + dl[i].pgno = pgno; + dl[i].ptr = page; return MDBX_SUCCESS; } @@ -1451,13 +1488,13 @@ static void mdbx_dpage_free(MDBX_env *env, MDBX_page *dp) { /* Return all dirty pages to dpage list */ static void mdbx_dlist_free(MDBX_txn *txn) { MDBX_env *env = txn->mt_env; - MDBX_ID2L dl = txn->mt_rw_dirtylist; - size_t i, n = dl[0].length; + MDBX_DPL dl = txn->mt_rw_dirtylist; + size_t i, n = dl->length; for (i = 1; i <= n; i++) mdbx_dpage_free(env, dl[i].ptr); - dl[0].length = 0; + dl->length = 0; } static size_t bytes_align2os_bytes(const MDBX_env *env, size_t bytes) { @@ -1543,12 +1580,12 @@ static int mdbx_page_loose(MDBX_cursor *mc, MDBX_page *mp) { if ((mp->mp_flags & P_DIRTY) && mc->mc_dbi != FREE_DBI) { if (txn->mt_parent) { mdbx_cassert(mc, (txn->mt_env->me_flags & MDBX_WRITEMAP) == 0); - MDBX_ID2 *dl = txn->mt_rw_dirtylist; + MDBX_DP *dl = txn->mt_rw_dirtylist; /* If txn has a parent, * make sure the page is in our dirty list. */ - if (dl[0].length) { - unsigned x = mdbx_mid2l_search(dl, pgno); - if (x <= dl[0].length && dl[x].pgno == pgno) { + if (dl->length) { + unsigned x = mdbx_dpl_search(dl, pgno); + if (x <= dl->length && dl[x].pgno == pgno) { if (unlikely(mp != dl[x].ptr)) { /* bad cursor? */ mdbx_error("wrong page 0x%p #%" PRIaPGNO " in the dirtylist[%d], expecting %p", @@ -1693,7 +1730,7 @@ static int mdbx_page_flush(MDBX_txn *txn, pgno_t keep); * Returns 0 on success, non-zero on failure. */ static int mdbx_page_spill(MDBX_cursor *m0, MDBX_val *key, MDBX_val *data) { MDBX_txn *txn = m0->mc_txn; - MDBX_ID2L dl = txn->mt_rw_dirtylist; + MDBX_DPL dl = txn->mt_rw_dirtylist; if (m0->mc_flags & C_SUB) return MDBX_SUCCESS; @@ -1713,18 +1750,18 @@ static int mdbx_page_spill(MDBX_cursor *m0, MDBX_val *key, MDBX_val *data) { return MDBX_SUCCESS; if (!txn->mt_spill_pages) { - txn->mt_spill_pages = mdbx_pnl_alloc(MDBX_LIST_MAX); + txn->mt_spill_pages = mdbx_pnl_alloc(MDBX_DPL_TXNFULL); if (unlikely(!txn->mt_spill_pages)) return MDBX_ENOMEM; } else { /* purge deleted slots */ MDBX_PNL sl = txn->mt_spill_pages; - pgno_t num = sl[0], j = 0; + pgno_t num = MDBX_PNL_SIZE(sl), j = 0; for (i = 1; i <= num; i++) { if ((sl[i] & 1) == 0) sl[++j] = sl[i]; } - sl[0] = j; + MDBX_PNL_SIZE(sl) = j; } /* Preserve pages which may soon be dirtied again */ @@ -1738,12 +1775,12 @@ static int mdbx_page_spill(MDBX_cursor *m0, MDBX_val *key, MDBX_val *data) { * of those pages will need to be used again. So now we spill only 1/8th * of the dirty pages. Testing revealed this to be a good tradeoff, * better than 1/2, 1/4, or 1/10. */ - if (need < MDBX_LIST_MAX / 8) - need = MDBX_LIST_MAX / 8; + if (need < MDBX_DPL_TXNFULL / 8) + need = MDBX_DPL_TXNFULL / 8; /* Save the page IDs of all the pages we're flushing */ /* flush from the tail forward, this saves a lot of shifting later on. */ - for (i = dl[0].length; i && need; i--) { + for (i = dl->length; i && need; i--) { pgno_t pn = dl[i].pgno << 1; MDBX_page *dp = dl[i].ptr; if (dp->mp_flags & (P_LOOSE | P_KEEP)) @@ -1755,7 +1792,8 @@ static int mdbx_page_spill(MDBX_cursor *m0, MDBX_val *key, MDBX_val *data) { for (tx2 = txn->mt_parent; tx2; tx2 = tx2->mt_parent) { if (tx2->mt_spill_pages) { unsigned j = mdbx_pnl_search(tx2->mt_spill_pages, pn); - if (j <= tx2->mt_spill_pages[0] && tx2->mt_spill_pages[j] == pn) { + if (j <= MDBX_PNL_SIZE(tx2->mt_spill_pages) && + tx2->mt_spill_pages[j] == pn) { dp->mp_flags |= P_KEEP; break; } @@ -2012,9 +2050,8 @@ static txnid_t mdbx_find_oldest(MDBX_txn *txn) { /* Add a page to the txn's dirty list */ static int __must_check_result mdbx_page_dirty(MDBX_txn *txn, MDBX_page *mp) { - int (*const adder)(MDBX_ID2L, pgno_t pgno, MDBX_page * page) = - (txn->mt_flags & MDBX_TXN_WRITEMAP) ? mdbx_mid2l_append - : mdbx_mid2l_insert; + int (*const adder)(MDBX_DPL, pgno_t pgno, MDBX_page * page) = + (txn->mt_flags & MDBX_TXN_WRITEMAP) ? mdbx_dpl_append : mdbx_dpl_insert; const int rc = adder(txn->mt_rw_dirtylist, mp->mp_pgno, mp); if (unlikely(rc != MDBX_SUCCESS)) { txn->mt_flags |= MDBX_TXN_ERROR; @@ -2207,7 +2244,7 @@ static int mdbx_page_alloc(MDBX_cursor *mc, unsigned num, MDBX_page **mp, mdbx_tassert(txn, mdbx_pnl_check(env->me_reclaimed_pglist, true)); pgno_t pgno, *repg_list = env->me_reclaimed_pglist; - unsigned repg_pos = 0, repg_len = repg_list ? repg_list[0] : 0; + unsigned repg_pos = 0, repg_len = repg_list ? MDBX_PNL_SIZE(repg_list) : 0; txnid_t oldest = 0, last = 0; const unsigned wanna_range = num - 1; @@ -2251,7 +2288,7 @@ static int mdbx_page_alloc(MDBX_cursor *mc, unsigned num, MDBX_page **mp, /* Prepare to fetch more and coalesce */ oldest = (flags & MDBX_LIFORECLAIM) ? mdbx_find_oldest(txn) - : env->me_oldest[0]; + : *env->me_oldest; rc = mdbx_cursor_init(&recur, txn, FREE_DBI); if (unlikely(rc != MDBX_SUCCESS)) goto fail; @@ -2313,7 +2350,7 @@ static int mdbx_page_alloc(MDBX_cursor *mc, unsigned num, MDBX_page **mp, /* skip IDs of records that already reclaimed */ if (txn->mt_lifo_reclaimed) { unsigned i; - for (i = (unsigned)txn->mt_lifo_reclaimed[0]; i > 0; --i) + for (i = (unsigned)MDBX_PNL_SIZE(txn->mt_lifo_reclaimed); i > 0; --i) if (txn->mt_lifo_reclaimed[i] == last) break; if (i) @@ -2338,10 +2375,9 @@ static int mdbx_page_alloc(MDBX_cursor *mc, unsigned num, MDBX_page **mp, /* Append PNL from FreeDB record to me_reclaimed_pglist */ mdbx_cassert(mc, (mc->mc_flags & C_GCFREEZE) == 0); pgno_t *re_pnl = (pgno_t *)data.iov_base; - mdbx_tassert(txn, re_pnl[0] == 0 || - data.iov_len == (re_pnl[0] + 1) * sizeof(pgno_t)); + mdbx_tassert(txn, data.iov_len == MDBX_PNL_SIZEOF(re_pnl)); mdbx_tassert(txn, mdbx_pnl_check(re_pnl, false)); - repg_pos = re_pnl[0]; + repg_pos = MDBX_PNL_SIZE(re_pnl); if (!repg_list) { if (unlikely(!(env->me_reclaimed_pglist = repg_list = mdbx_pnl_alloc(repg_pos)))) { @@ -2374,7 +2410,7 @@ static int mdbx_page_alloc(MDBX_cursor *mc, unsigned num, MDBX_page **mp, /* Merge in descending sorted order */ mdbx_pnl_xmerge(repg_list, re_pnl); - repg_len = repg_list[0]; + repg_len = MDBX_PNL_SIZE(repg_list); if (unlikely((flags & MDBX_ALLOC_CACHE) == 0)) { /* Done for a kick-reclaim mode, actually no page needed */ return MDBX_SUCCESS; @@ -2406,7 +2442,7 @@ static int mdbx_page_alloc(MDBX_cursor *mc, unsigned num, MDBX_page **mp, for (pgno_t *move = begin; higest < end; ++move, ++higest) *move = *higest; #endif /* MDBX_PNL sort-order */ - repg_list[0] = repg_len; + MDBX_PNL_SIZE(repg_list) = repg_len; mdbx_info("refunded %" PRIaPGNO " pages: %" PRIaPGNO " -> %" PRIaPGNO, tail - txn->mt_next_pgno, tail, txn->mt_next_pgno); txn->mt_next_pgno = tail; @@ -2415,7 +2451,7 @@ static int mdbx_page_alloc(MDBX_cursor *mc, unsigned num, MDBX_page **mp, } /* Don't try to coalesce too much. */ - if (unlikely(repg_len > MDBX_LIST_MAX / 2)) + if (unlikely(repg_len > MDBX_DPL_TXNFULL / 2)) break; if (flags & MDBX_COALESCE) { if (repg_len /* current size */ >= env->me_maxgc_ov1page || @@ -2549,7 +2585,7 @@ done: mdbx_tassert(txn, pgno < txn->mt_next_pgno); mdbx_tassert(txn, pgno == repg_list[repg_pos]); /* Cutoff allocated pages from me_reclaimed_pglist */ - repg_list[0] = repg_len -= num; + MDBX_PNL_SIZE(repg_list) = repg_len -= num; for (unsigned i = repg_pos - num; i < repg_len;) repg_list[++i] = repg_list[++repg_pos]; mdbx_tassert(txn, mdbx_pnl_check(env->me_reclaimed_pglist, true)); @@ -2617,7 +2653,8 @@ static int __must_check_result mdbx_page_unspill(MDBX_txn *txn, MDBX_page *mp, if (!tx2->mt_spill_pages) continue; x = mdbx_pnl_search(tx2->mt_spill_pages, pn); - if (x <= tx2->mt_spill_pages[0] && tx2->mt_spill_pages[x] == pn) { + if (x <= MDBX_PNL_SIZE(tx2->mt_spill_pages) && + tx2->mt_spill_pages[x] == pn) { MDBX_page *np; int num; if (txn->mt_dirtyroom == 0) @@ -2639,8 +2676,8 @@ static int __must_check_result mdbx_page_unspill(MDBX_txn *txn, MDBX_page *mp, /* If in current txn, this page is no longer spilled. * If it happens to be the last page, truncate the spill list. * Otherwise mark it as deleted by setting the LSB. */ - if (x == txn->mt_spill_pages[0]) - txn->mt_spill_pages[0]--; + if (x == MDBX_PNL_SIZE(txn->mt_spill_pages)) + MDBX_PNL_SIZE(txn->mt_spill_pages)--; else txn->mt_spill_pages[x] |= 1; } /* otherwise, if belonging to a parent txn, the @@ -2700,12 +2737,12 @@ static int mdbx_page_touch(MDBX_cursor *mc) { } } else if (txn->mt_parent && !IS_SUBP(mp)) { mdbx_tassert(txn, (txn->mt_env->me_flags & MDBX_WRITEMAP) == 0); - MDBX_ID2 *dl = txn->mt_rw_dirtylist; + MDBX_DP *dl = txn->mt_rw_dirtylist; pgno = mp->mp_pgno; /* If txn has a parent, make sure the page is in our dirty list. */ - if (dl[0].length) { - unsigned x = mdbx_mid2l_search(dl, pgno); - if (x <= dl[0].length && dl[x].pgno == pgno) { + if (dl->length) { + unsigned x = mdbx_dpl_search(dl, pgno); + if (x <= dl->length && dl[x].pgno == pgno) { if (unlikely(mp != dl[x].ptr)) { /* bad cursor? */ mdbx_error("wrong page 0x%p #%" PRIaPGNO " in the dirtylist[%d], expecting %p", @@ -2719,14 +2756,14 @@ static int mdbx_page_touch(MDBX_cursor *mc) { } mdbx_debug("clone db %d page %" PRIaPGNO, DDBI(mc), mp->mp_pgno); - mdbx_cassert(mc, dl[0].length < MDBX_LIST_MAX); + mdbx_cassert(mc, dl->length <= MDBX_DPL_TXNFULL); /* No - copy it */ np = mdbx_page_malloc(txn, 1); if (unlikely(!np)) { rc = MDBX_ENOMEM; goto fail; } - rc = mdbx_mid2l_insert(dl, pgno, np); + rc = mdbx_dpl_insert(dl, pgno, np); if (unlikely(rc)) { mdbx_dpage_free(txn->mt_env, np); goto fail; @@ -3088,14 +3125,14 @@ static int mdbx_txn_renew0(MDBX_txn *txn, unsigned flags) { txn->mt_child = NULL; txn->mt_loose_pages = NULL; txn->mt_loose_count = 0; - txn->mt_dirtyroom = MDBX_LIST_MAX; + txn->mt_dirtyroom = MDBX_DPL_TXNFULL; txn->mt_rw_dirtylist = env->me_dirtylist; - txn->mt_rw_dirtylist[0].length = 0; + txn->mt_rw_dirtylist->length = 0; txn->mt_befree_pages = env->me_free_pgs; - txn->mt_befree_pages[0] = 0; + MDBX_PNL_SIZE(txn->mt_befree_pages) = 0; txn->mt_spill_pages = NULL; if (txn->mt_lifo_reclaimed) - txn->mt_lifo_reclaimed[0] = 0; + MDBX_PNL_SIZE(txn->mt_lifo_reclaimed) = 0; env->me_txn = txn; memcpy(txn->mt_dbiseqs, env->me_dbiseqs, env->me_maxdbs * sizeof(unsigned)); /* Copy the DB info and flags */ @@ -3241,17 +3278,16 @@ int mdbx_txn_begin(MDBX_env *env, MDBX_txn *parent, unsigned flags, unsigned i; txn->mt_cursors = (MDBX_cursor **)(txn->mt_dbs + env->me_maxdbs); txn->mt_dbiseqs = parent->mt_dbiseqs; - txn->mt_rw_dirtylist = malloc(sizeof(MDBX_ID2) * (MDBX_LIST_MAX + 1)); + txn->mt_rw_dirtylist = malloc(sizeof(MDBX_DP) * (MDBX_DPL_TXNFULL + 1)); if (!txn->mt_rw_dirtylist || - !(txn->mt_befree_pages = mdbx_pnl_alloc(MDBX_LIST_MAX))) { + !(txn->mt_befree_pages = mdbx_pnl_alloc(MDBX_PNL_INITIAL))) { free(txn->mt_rw_dirtylist); free(txn); return MDBX_ENOMEM; } txn->mt_txnid = parent->mt_txnid; txn->mt_dirtyroom = parent->mt_dirtyroom; - txn->mt_rw_dirtylist[0].limit = MDBX_LIST_MAX; - txn->mt_rw_dirtylist[0].length = 0; + txn->mt_rw_dirtylist->length = 0; txn->mt_spill_pages = NULL; txn->mt_next_pgno = parent->mt_next_pgno; txn->mt_end_pgno = parent->mt_end_pgno; @@ -3269,7 +3305,8 @@ int mdbx_txn_begin(MDBX_env *env, MDBX_txn *parent, unsigned flags, env->me_pgstate; /* save parent me_reclaimed_pglist & co */ if (env->me_reclaimed_pglist) { size = MDBX_PNL_SIZEOF(env->me_reclaimed_pglist); - env->me_reclaimed_pglist = mdbx_pnl_alloc(env->me_reclaimed_pglist[0]); + env->me_reclaimed_pglist = + mdbx_pnl_alloc(MDBX_PNL_SIZE(env->me_reclaimed_pglist)); if (likely(env->me_reclaimed_pglist)) memcpy(env->me_reclaimed_pglist, ntxn->mnt_pgstate.mf_reclaimed_pglist, size); @@ -3408,7 +3445,7 @@ static int mdbx_txn_end(MDBX_txn *txn, unsigned mode) { } if (txn->mt_lifo_reclaimed) { - txn->mt_lifo_reclaimed[0] = 0; + MDBX_PNL_SIZE(txn->mt_lifo_reclaimed) = 0; if (txn != env->me_txn0) { mdbx_txl_free(txn->mt_lifo_reclaimed); txn->mt_lifo_reclaimed = NULL; @@ -3496,7 +3533,7 @@ int mdbx_txn_abort(MDBX_txn *txn) { static __inline int mdbx_backlog_size(MDBX_txn *txn) { int reclaimed = txn->mt_env->me_reclaimed_pglist - ? txn->mt_env->me_reclaimed_pglist[0] + ? MDBX_PNL_SIZE(txn->mt_env->me_reclaimed_pglist) : 0; return reclaimed + txn->mt_loose_count + txn->mt_end_pgno - txn->mt_next_pgno; } @@ -3543,10 +3580,11 @@ static int mdbx_audit(MDBX_txn *txn, unsigned befree_stored) { ? 0 : txn->mt_loose_count + (txn->mt_env->me_reclaimed_pglist - ? txn->mt_env->me_reclaimed_pglist[0] + ? MDBX_PNL_SIZE(txn->mt_env->me_reclaimed_pglist) : 0) + - (txn->mt_befree_pages ? txn->mt_befree_pages[0] - befree_stored - : 0); + (txn->mt_befree_pages + ? MDBX_PNL_SIZE(txn->mt_befree_pages) - befree_stored + : 0); MDBX_cursor_couple cx; int rc = mdbx_cursor_init(&cx.outer, txn, FREE_DBI); @@ -3603,13 +3641,14 @@ static int mdbx_audit(MDBX_txn *txn, unsigned befree_stored) { return MDBX_SUCCESS; if ((txn->mt_flags & MDBX_RDONLY) == 0) - mdbx_error( - "audit @%" PRIaTXN ": %u(pending) = %u(loose-count) + " - "%u(reclaimed-list) + %u(befree-pending) - %u(befree-stored)", - txn->mt_txnid, pending, txn->mt_loose_count, - txn->mt_env->me_reclaimed_pglist ? txn->mt_env->me_reclaimed_pglist[0] - : 0, - txn->mt_befree_pages ? txn->mt_befree_pages[0] : 0, befree_stored); + mdbx_error("audit @%" PRIaTXN ": %u(pending) = %u(loose-count) + " + "%u(reclaimed-list) + %u(befree-pending) - %u(befree-stored)", + txn->mt_txnid, pending, txn->mt_loose_count, + txn->mt_env->me_reclaimed_pglist + ? MDBX_PNL_SIZE(txn->mt_env->me_reclaimed_pglist) + : 0, + txn->mt_befree_pages ? MDBX_PNL_SIZE(txn->mt_befree_pages) : 0, + befree_stored); mdbx_error("audit @%" PRIaTXN ": %" PRIaPGNO "(pending) + %" PRIaPGNO "(free) + %" PRIaPGNO "(count) = %" PRIaPGNO "(total) <> %" PRIaPGNO "(next-pgno)", @@ -3745,15 +3784,15 @@ retry: } // filter-out list of dirty-pages from loose-pages - MDBX_ID2L dl = txn->mt_rw_dirtylist; - mdbx_mid2l_sort(dl); + MDBX_DPL dl = txn->mt_rw_dirtylist; + mdbx_dpl_sort(dl); unsigned left = dl->length; for (MDBX_page *mp = txn->mt_loose_pages; mp;) { mdbx_tassert(txn, mp->mp_pgno < txn->mt_next_pgno); mdbx_ensure(env, mp->mp_pgno >= NUM_METAS); if (left > 0) { - const unsigned i = mdbx_mid2l_search(dl, mp->mp_pgno); + const unsigned i = mdbx_dpl_search(dl, mp->mp_pgno); if (i <= dl->length && dl[i].pgno == mp->mp_pgno) { mdbx_tassert(txn, i > 0 && dl[i].ptr != dl); dl[i].ptr = dl /* mark for deletion */; @@ -3768,7 +3807,7 @@ retry: } if (left > 0) { - MDBX_ID2L r, w, end = dl + dl->length; + MDBX_DPL r, w, end = dl + dl->length; for (r = w = dl + 1; r <= end; r++) { if (r->ptr != dl) { if (r != w) @@ -4143,8 +4182,8 @@ bailout: * Returns 0 on success, non-zero on failure. */ static int mdbx_page_flush(MDBX_txn *txn, pgno_t keep) { MDBX_env *env = txn->mt_env; - MDBX_ID2L dl = txn->mt_rw_dirtylist; - unsigned i, j, pagecount = dl[0].length; + MDBX_DPL dl = txn->mt_rw_dirtylist; + unsigned i, j, pagecount = dl->length; int rc; size_t size = 0, pos = 0; pgno_t pgno = 0; @@ -4233,7 +4272,7 @@ static int mdbx_page_flush(MDBX_txn *txn, pgno_t keep) { done: i--; txn->mt_dirtyroom += i - j; - dl[0].length = j; + dl->length = j; return MDBX_SUCCESS; } @@ -4317,7 +4356,7 @@ int mdbx_txn_commit(MDBX_txn *txn) { if (txn->mt_parent) { MDBX_txn *parent = txn->mt_parent; MDBX_page **lp; - MDBX_ID2L dst, src; + MDBX_DPL dst, src; MDBX_PNL pspill; unsigned i, x, y, len, ps_len; @@ -4363,11 +4402,11 @@ int mdbx_txn_commit(MDBX_txn *txn) { dst = parent->mt_rw_dirtylist; src = txn->mt_rw_dirtylist; /* Remove anything in our dirty list from parent's spill list */ - if ((pspill = parent->mt_spill_pages) && (ps_len = pspill[0])) { + if ((pspill = parent->mt_spill_pages) && (ps_len = MDBX_PNL_SIZE(pspill))) { x = y = ps_len; - pspill[0] = (pgno_t)-1; + MDBX_PNL_SIZE(pspill) = ~(pgno_t)0; /* Mark our dirty pages as deleted in parent spill list */ - for (i = 0, len = src[0].length; ++i <= len;) { + for (i = 0, len = src->length; ++i <= len;) { pgno_t pn = src[i].pgno << 1; while (pn > pspill[x]) x--; @@ -4380,34 +4419,34 @@ int mdbx_txn_commit(MDBX_txn *txn) { for (x = y; ++x <= ps_len;) if ((pspill[x] & 1) == 0) pspill[++y] = pspill[x]; - pspill[0] = y; + MDBX_PNL_SIZE(pspill) = y; } /* Remove anything in our spill list from parent's dirty list */ - if (txn->mt_spill_pages && txn->mt_spill_pages[0]) { - for (i = 1; i <= txn->mt_spill_pages[0]; i++) { + if (txn->mt_spill_pages && MDBX_PNL_SIZE(txn->mt_spill_pages)) { + for (i = 1; i <= MDBX_PNL_SIZE(txn->mt_spill_pages); i++) { pgno_t pn = txn->mt_spill_pages[i]; if (pn & 1) continue; /* deleted spillpg */ pn >>= 1; - y = mdbx_mid2l_search(dst, pn); - if (y <= dst[0].length && dst[y].pgno == pn) { + y = mdbx_dpl_search(dst, pn); + if (y <= dst->length && dst[y].pgno == pn) { free(dst[y].ptr); - while (y < dst[0].length) { + while (y < dst->length) { dst[y] = dst[y + 1]; y++; } - dst[0].length--; + dst->length--; } } } /* Find len = length of merging our dirty list with parent's */ - x = dst[0].length; - dst[0].length = 0; /* simplify loops */ + x = dst->length; + dst->length = 0; /* simplify loops */ if (parent->mt_parent) { - len = x + src[0].length; - y = mdbx_mid2l_search(src, dst[x].pgno + 1) - 1; + len = x + src->length; + y = mdbx_dpl_search(src, dst[x].pgno + 1) - 1; for (i = x; y && i; y--) { pgno_t yp = src[y].pgno; while (yp < dst[i].pgno) @@ -4418,10 +4457,10 @@ int mdbx_txn_commit(MDBX_txn *txn) { } } } else { /* Simplify the above for single-ancestor case */ - len = MDBX_LIST_MAX - txn->mt_dirtyroom; + len = MDBX_DPL_TXNFULL - txn->mt_dirtyroom; } /* Merge our dirty list with parent's */ - y = src[0].length; + y = src->length; for (i = len; y; dst[i--] = src[y--]) { pgno_t yp = src[y].pgno; while (yp < dst[x].pgno) @@ -4430,7 +4469,7 @@ int mdbx_txn_commit(MDBX_txn *txn) { free(dst[x--].ptr); } mdbx_tassert(txn, i == x); - dst[0].length = len; + dst->length = len; free(txn->mt_rw_dirtylist); parent->mt_dirtyroom = txn->mt_dirtyroom; if (txn->mt_spill_pages) { @@ -4468,7 +4507,7 @@ int mdbx_txn_commit(MDBX_txn *txn) { mdbx_cursors_eot(txn, 0); end_mode |= MDBX_END_EOTDONE; - if (txn->mt_rw_dirtylist[0].length == 0 && + if (txn->mt_rw_dirtylist->length == 0 && !(txn->mt_flags & (MDBX_TXN_DIRTY | MDBX_TXN_SPILLS))) goto done; @@ -5133,9 +5172,10 @@ static void __cold mdbx_setup_pagesize(MDBX_env *env, const size_t pagesize) { env->me_psize = (unsigned)pagesize; STATIC_ASSERT(mdbx_maxgc_ov1page(MIN_PAGESIZE) > 42); - STATIC_ASSERT(mdbx_maxgc_ov1page(MAX_PAGESIZE) < MDBX_LIST_MAX); + STATIC_ASSERT(mdbx_maxgc_ov1page(MAX_PAGESIZE) < MDBX_DPL_TXNFULL); const intptr_t maxgc_ov1page = (pagesize - PAGEHDRSZ) / sizeof(pgno_t) - 1; - mdbx_ensure(env, maxgc_ov1page > 42 && maxgc_ov1page < MDBX_LIST_MAX); + mdbx_ensure(env, + maxgc_ov1page > 42 && maxgc_ov1page < (intptr_t)MDBX_DPL_TXNFULL); env->me_maxgc_ov1page = (unsigned)maxgc_ov1page; STATIC_ASSERT(mdbx_nodemax(MIN_PAGESIZE) > 42); @@ -6048,11 +6088,9 @@ int __cold mdbx_env_open(MDBX_env *env, const char *path, unsigned flags, flags &= ~(MDBX_WRITEMAP | MDBX_MAPASYNC | MDBX_NOSYNC | MDBX_NOMETASYNC | MDBX_COALESCE | MDBX_LIFORECLAIM | MDBX_NOMEMINIT); } else { - if (!((env->me_free_pgs = mdbx_pnl_alloc(MDBX_LIST_MAX)) && - (env->me_dirtylist = calloc(MDBX_LIST_MAX + 1, sizeof(MDBX_ID2))))) + if (!((env->me_free_pgs = mdbx_pnl_alloc(MDBX_PNL_INITIAL)) && + (env->me_dirtylist = calloc(MDBX_DPL_TXNFULL + 1, sizeof(MDBX_DP))))) rc = MDBX_ENOMEM; - else - env->me_dirtylist[0].limit = MDBX_LIST_MAX; } const uint32_t saved_me_flags = env->me_flags; @@ -6587,7 +6625,7 @@ static int mdbx_page_get(MDBX_cursor *mc, pgno_t pgno, MDBX_page **ret, MDBX_txn *tx2 = txn; level = 1; do { - MDBX_ID2L dl = tx2->mt_rw_dirtylist; + MDBX_DPL dl = tx2->mt_rw_dirtylist; /* Spilled pages were dirtied in this txn and flushed * because the dirty list got full. Bring this page * back in from the map (but don't unspill it here, @@ -6595,12 +6633,13 @@ static int mdbx_page_get(MDBX_cursor *mc, pgno_t pgno, MDBX_page **ret, if (tx2->mt_spill_pages) { pgno_t pn = pgno << 1; unsigned x = mdbx_pnl_search(tx2->mt_spill_pages, pn); - if (x <= tx2->mt_spill_pages[0] && tx2->mt_spill_pages[x] == pn) + if (x <= MDBX_PNL_SIZE(tx2->mt_spill_pages) && + tx2->mt_spill_pages[x] == pn) goto mapped; } - if (dl[0].length) { - unsigned y = mdbx_mid2l_search(dl, pgno); - if (y <= dl[0].length && dl[y].pgno == pgno) { + if (dl->length) { + unsigned y = mdbx_dpl_search(dl, pgno); + if (y <= dl->length && dl[y].pgno == pgno) { p = dl[y].ptr; goto done; } @@ -6756,7 +6795,7 @@ static int mdbx_page_search(MDBX_cursor *mc, MDBX_val *key, int flags) { return MDBX_BAD_TXN; } - mdbx_cassert(mc, mc->mc_txn->mt_txnid >= mc->mc_txn->mt_env->me_oldest[0]); + mdbx_cassert(mc, mc->mc_txn->mt_txnid >= *mc->mc_txn->mt_env->me_oldest); /* Make sure we're using an up-to-date root */ if (unlikely(*mc->mc_dbflag & DB_STALE)) { MDBX_cursor mc2; @@ -6798,7 +6837,7 @@ static int mdbx_page_search(MDBX_cursor *mc, MDBX_val *key, int flags) { return MDBX_NOTFOUND; } - mdbx_cassert(mc, mc->mc_txn->mt_txnid >= mc->mc_txn->mt_env->me_oldest[0]); + mdbx_cassert(mc, mc->mc_txn->mt_txnid >= *mc->mc_txn->mt_env->me_oldest); mdbx_cassert(mc, root >= NUM_METAS); if (!mc->mc_pg[0] || mc->mc_pg[0]->mp_pgno != root) if (unlikely((rc = mdbx_page_get(mc, root, &mc->mc_pg[0], NULL)) != 0)) @@ -6843,24 +6882,25 @@ static int mdbx_ovpage_free(MDBX_cursor *mc, MDBX_page *mp) { * range in ancestor txns' dirty and spilled lists. */ if (env->me_reclaimed_pglist && !txn->mt_parent && ((mp->mp_flags & P_DIRTY) || - (sl && (x = mdbx_pnl_search(sl, pn)) <= sl[0] && sl[x] == pn))) { + (sl && (x = mdbx_pnl_search(sl, pn)) <= MDBX_PNL_SIZE(sl) && + sl[x] == pn))) { unsigned i, j; pgno_t *mop; - MDBX_ID2 *dl, ix, iy; + MDBX_DP *dl, ix, iy; rc = mdbx_pnl_need(&env->me_reclaimed_pglist, ovpages); if (unlikely(rc)) return rc; if (!(mp->mp_flags & P_DIRTY)) { /* This page is no longer spilled */ - if (x == sl[0]) - sl[0]--; + if (x == MDBX_PNL_SIZE(sl)) + MDBX_PNL_SIZE(sl)--; else sl[x] |= 1; goto release; } /* Remove from dirty list */ dl = txn->mt_rw_dirtylist; - x = dl[0].length--; + x = dl->length--; for (ix = dl[x]; ix.ptr != mp; ix = iy) { if (likely(x > 1)) { x--; @@ -6870,7 +6910,7 @@ static int mdbx_ovpage_free(MDBX_cursor *mc, MDBX_page *mp) { mdbx_cassert(mc, x > 1); mdbx_error("not found page 0x%p #%" PRIaPGNO " in the dirtylist", mp, mp->mp_pgno); - j = dl[0].length += 1; + j = dl->length += 1; dl[j] = ix; /* Unsorted. OK when MDBX_TXN_ERROR. */ txn->mt_flags |= MDBX_TXN_ERROR; return MDBX_PROBLEM; @@ -6882,12 +6922,12 @@ static int mdbx_ovpage_free(MDBX_cursor *mc, MDBX_page *mp) { release: /* Insert in me_reclaimed_pglist */ mop = env->me_reclaimed_pglist; - j = mop[0] + ovpages; - for (i = mop[0]; i && mop[i] < pg; i--) + j = MDBX_PNL_SIZE(mop) + ovpages; + for (i = MDBX_PNL_SIZE(mop); i && mop[i] < pg; i--) mop[j--] = mop[i]; while (j > i) mop[j--] = pg++; - mop[0] += ovpages; + MDBX_PNL_SIZE(mop) += ovpages; } else { rc = mdbx_pnl_append_range(&txn->mt_befree_pages, pg, ovpages); if (unlikely(rc)) @@ -6915,7 +6955,7 @@ static __inline int mdbx_node_read(MDBX_cursor *mc, MDBX_node *leaf, pgno_t pgno; int rc; - mdbx_cassert(mc, mc->mc_txn->mt_txnid >= mc->mc_txn->mt_env->me_oldest[0]); + mdbx_cassert(mc, mc->mc_txn->mt_txnid >= *mc->mc_txn->mt_env->me_oldest); if (!F_ISSET(leaf->mn_flags, F_BIGDATA)) { data->iov_len = NODEDSZ(leaf); data->iov_base = NODEDATA(leaf); @@ -6976,7 +7016,7 @@ static int mdbx_cursor_sibling(MDBX_cursor *mc, int move_right) { MDBX_node *indx; MDBX_page *mp; - mdbx_cassert(mc, mc->mc_txn->mt_txnid >= mc->mc_txn->mt_env->me_oldest[0]); + mdbx_cassert(mc, mc->mc_txn->mt_txnid >= *mc->mc_txn->mt_env->me_oldest); if (unlikely(mc->mc_snum < 2)) { return MDBX_NOTFOUND; /* root has no siblings */ } @@ -7212,7 +7252,7 @@ static int mdbx_cursor_set(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data, MDBX_node *leaf = NULL; DKBUF; - mdbx_cassert(mc, mc->mc_txn->mt_txnid >= mc->mc_txn->mt_env->me_oldest[0]); + mdbx_cassert(mc, mc->mc_txn->mt_txnid >= *mc->mc_txn->mt_env->me_oldest); if ((mc->mc_db->md_flags & MDBX_INTEGERKEY) && unlikely(key->iov_len != sizeof(uint32_t) && key->iov_len != sizeof(uint64_t))) { @@ -7508,7 +7548,7 @@ int mdbx_cursor_get(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data, if (unlikely(mc->mc_txn->mt_flags & MDBX_TXN_BLOCKED)) return MDBX_BAD_TXN; - mdbx_cassert(mc, mc->mc_txn->mt_txnid >= mc->mc_txn->mt_env->me_oldest[0]); + mdbx_cassert(mc, mc->mc_txn->mt_txnid >= *mc->mc_txn->mt_env->me_oldest); switch (op) { case MDBX_GET_CURRENT: { if (unlikely(!(mc->mc_flags & C_INITIALIZED))) @@ -8099,7 +8139,7 @@ int mdbx_cursor_put(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data, if (unlikely(!np)) return MDBX_ENOMEM; /* Note - this page is already counted in parent's dirtyroom */ - rc2 = mdbx_mid2l_insert(mc->mc_txn->mt_rw_dirtylist, pg, np); + rc2 = mdbx_dpl_insert(mc->mc_txn->mt_rw_dirtylist, pg, np); if (unlikely(rc2 != MDBX_SUCCESS)) { rc = rc2; mdbx_dpage_free(env, np); @@ -8813,7 +8853,7 @@ static int mdbx_xcursor_init1(MDBX_cursor *mc, MDBX_node *node) { if (unlikely(mx == nullptr)) return MDBX_CORRUPTED; - mdbx_cassert(mc, mc->mc_txn->mt_txnid >= mc->mc_txn->mt_env->me_oldest[0]); + mdbx_cassert(mc, mc->mc_txn->mt_txnid >= *mc->mc_txn->mt_env->me_oldest); if (node->mn_flags & F_SUBDATA) { memcpy(&mx->mx_db, NODEDATA(node), sizeof(MDBX_db)); mx->mx_cursor.mc_pg[0] = 0; @@ -8867,7 +8907,7 @@ static int mdbx_xcursor_init2(MDBX_cursor *mc, MDBX_xcursor *src_mx, if (unlikely(mx == nullptr)) return MDBX_CORRUPTED; - mdbx_cassert(mc, mc->mc_txn->mt_txnid >= mc->mc_txn->mt_env->me_oldest[0]); + mdbx_cassert(mc, mc->mc_txn->mt_txnid >= *mc->mc_txn->mt_env->me_oldest); if (new_dupdata) { mx->mx_cursor.mc_snum = 1; mx->mx_cursor.mc_top = 0; @@ -8913,7 +8953,7 @@ static int mdbx_cursor_init(MDBX_cursor *mc, MDBX_txn *txn, MDBX_dbi dbi) { return rc; } - mdbx_cassert(mc, mc->mc_txn->mt_txnid >= mc->mc_txn->mt_env->me_oldest[0]); + mdbx_cassert(mc, mc->mc_txn->mt_txnid >= *mc->mc_txn->mt_env->me_oldest); int rc = MDBX_SUCCESS; if (unlikely(*mc->mc_dbflag & DB_STALE)) { rc = mdbx_page_search(mc, NULL, MDBX_PS_ROOTONLY); @@ -9555,7 +9595,7 @@ static void mdbx_cursor_copy(const MDBX_cursor *csrc, MDBX_cursor *cdst) { unsigned i; mdbx_cassert(csrc, - csrc->mc_txn->mt_txnid >= csrc->mc_txn->mt_env->me_oldest[0]); + csrc->mc_txn->mt_txnid >= *csrc->mc_txn->mt_env->me_oldest); cdst->mc_txn = csrc->mc_txn; cdst->mc_dbi = csrc->mc_dbi; cdst->mc_db = csrc->mc_db; @@ -11782,7 +11822,7 @@ static txnid_t __cold mdbx_oomkick(MDBX_env *env, const txnid_t laggard) { txnid_t oldest = mdbx_reclaiming_detent(env); mdbx_assert(env, oldest < env->me_txn0->mt_txnid); mdbx_assert(env, oldest >= laggard); - mdbx_assert(env, oldest >= env->me_oldest[0]); + mdbx_assert(env, oldest >= *env->me_oldest); if (oldest == laggard || unlikely(env->me_lck == NULL /* exclusive mode */)) return oldest; @@ -11810,9 +11850,9 @@ static txnid_t __cold mdbx_oomkick(MDBX_env *env, const txnid_t laggard) { (gap < UINT_MAX) ? (unsigned)gap : UINT_MAX, -retry); } mdbx_notice("oom-kick: update oldest %" PRIaTXN " -> %" PRIaTXN, - env->me_oldest[0], oldest); - mdbx_assert(env, env->me_oldest[0] <= oldest); - return env->me_oldest[0] = oldest; + *env->me_oldest, oldest); + mdbx_assert(env, *env->me_oldest <= oldest); + return *env->me_oldest = oldest; } mdbx_tid_t tid; @@ -12633,7 +12673,7 @@ __cold intptr_t mdbx_limits_txnsize_max(intptr_t pagesize) { !mdbx_is_power2((size_t)pagesize))) return -MDBX_EINVAL; - return pagesize * (MDBX_LIST_MAX - 1); + return pagesize * (MDBX_DPL_TXNFULL - 1); } /*----------------------------------------------------------------------------*/ diff --git a/src/tools/mdbx_chk.c b/src/tools/mdbx_chk.c index 1442f626..e11305f8 100644 --- a/src/tools/mdbx_chk.c +++ b/src/tools/mdbx_chk.c @@ -412,7 +412,7 @@ static int handle_freedb(const uint64_t record_number, const MDBX_val *key, problem_add("entry", record_number, "wrong idl size", "%" PRIuPTR, data->iov_len); size_t number = (data->iov_len >= sizeof(pgno_t)) ? *iptr++ : 0; - if (number < 1 || number > MDBX_LIST_MAX) + if (number < 1 || number > MDBX_PNL_MAX) problem_add("entry", record_number, "wrong idl length", "%" PRIuPTR, number); else if ((number + 1) * sizeof(pgno_t) > data->iov_len) {