diff --git a/src/core.c b/src/core.c index d588bb5c..cba3f642 100644 --- a/src/core.c +++ b/src/core.c @@ -8416,7 +8416,7 @@ __hot static int page_touch(MDBX_cursor *mc) { np->mp_txnid = txn->mt_front; return MDBX_SUCCESS; } - tASSERT(txn, !IS_OVERFLOW(mp)); + tASSERT(txn, !IS_OVERFLOW(mp) && !IS_SUBP(mp)); if (IS_FROZEN(txn, mp)) { /* CoW the page */ @@ -16102,8 +16102,12 @@ __hot static __always_inline int page_get_checker_lite(const uint16_t ILL, if (((ILL & P_OVERFLOW) || !IS_OVERFLOW(page)) && (ILL & (P_BRANCH | P_LEAF | P_LEAF2)) == 0) { - if (unlikely(page->mp_upper < page->mp_lower || - ((page->mp_lower | page->mp_upper) & 1) || + /* Контроль четности page->mp_upper тут либо приводит к ложным ошибкам, + * либо слишком дорог по количеству операций. Заковырка в том, что mp_upper + * может быть нечетным на LEAF2-страницах, при нечетном количестве элементов + * нечетной длины. Поэтому четность page->mp_upper здесь не проверяется, но + * соответствующие полные проверки есть в page_check(). */ + if (unlikely(page->mp_upper < page->mp_lower || (page->mp_lower & 1) || PAGEHDRSZ + page->mp_upper > txn->mt_env->me_psize)) return bad_page(page, "invalid page' lower(%u)/upper(%u) with limit %zu\n", @@ -18082,9 +18086,9 @@ static __hot int cursor_put_nochecklen(MDBX_cursor *mc, const MDBX_val *key, mc->mc_xcursor->mx_dbx.md_klen_min = mc->mc_xcursor->mx_dbx.md_klen_max = data->iov_len); + if (mc->mc_flags & C_SUB) + npr.page->mp_flags |= P_LEAF2; } - if ((mc->mc_db->md_flags & (MDBX_DUPSORT | MDBX_DUPFIXED)) == MDBX_DUPFIXED) - npr.page->mp_flags |= P_LEAF2; mc->mc_flags |= C_INITIALIZED; } @@ -18361,7 +18365,11 @@ static __hot int cursor_put_nochecklen(MDBX_cursor *mc, const MDBX_val *key, if (unlikely(fp_flags & P_LEAF2)) { memcpy(page_data(mp), page_data(fp), page_numkeys(fp) * fp->mp_leaf2_ksize); + cASSERT(mc, + (((mp->mp_leaf2_ksize & page_numkeys(mp)) ^ mp->mp_upper) & + 1) == 0); } else { + cASSERT(mc, (mp->mp_upper & 1) == 0); memcpy(ptr_disp(mp, mp->mp_upper + PAGEHDRSZ), ptr_disp(fp, fp->mp_upper + PAGEHDRSZ), olddata.iov_len - fp->mp_upper - PAGEHDRSZ); @@ -18979,6 +18987,7 @@ __hot static int __must_check_result node_add_leaf2(MDBX_cursor *mc, const size_t ksize = mc->mc_db->md_xsize; cASSERT(mc, ksize == key->iov_len); const size_t nkeys = page_numkeys(mp); + cASSERT(mc, (((ksize & page_numkeys(mp)) ^ mp->mp_upper) & 1) == 0); /* Just using these for counting */ const intptr_t lower = mp->mp_lower + sizeof(indx_t); @@ -18998,6 +19007,8 @@ __hot static int __must_check_result node_add_leaf2(MDBX_cursor *mc, memmove(ptr_disp(ptr, ksize), ptr, diff * ksize); /* insert new key */ memcpy(ptr, key->iov_base, ksize); + + cASSERT(mc, (((ksize & page_numkeys(mp)) ^ mp->mp_upper) & 1) == 0); return MDBX_SUCCESS; } @@ -19164,6 +19175,7 @@ __hot static void node_del(MDBX_cursor *mc, size_t ksize) { mp->mp_lower -= sizeof(indx_t); cASSERT(mc, (size_t)UINT16_MAX - mp->mp_upper >= ksize - sizeof(indx_t)); mp->mp_upper += (indx_t)(ksize - sizeof(indx_t)); + cASSERT(mc, (((ksize & page_numkeys(mp)) ^ mp->mp_upper) & 1) == 0); return; } @@ -20830,8 +20842,7 @@ __cold static int page_check(const MDBX_cursor *const mc, break; } - if (unlikely(mp->mp_upper < mp->mp_lower || - ((mp->mp_lower | mp->mp_upper) & 1) || + if (unlikely(mp->mp_upper < mp->mp_lower || (mp->mp_lower & 1) || PAGEHDRSZ + mp->mp_upper > env->me_psize)) rc = bad_page(mp, "invalid page lower(%u)/upper(%u) with limit %zu\n", mp->mp_lower, mp->mp_upper, page_space(env)); @@ -20847,11 +20858,6 @@ __cold static int page_check(const MDBX_cursor *const mc, bad_page(mp, "%s-page nkeys (%zu) < %u\n", IS_BRANCH(mp) ? "branch" : "leaf", nkeys, 1 + IS_BRANCH(mp)); } - if (!IS_LEAF2(mp) && unlikely(PAGEHDRSZ + mp->mp_upper + - nkeys * sizeof(MDBX_node) + nkeys - 1 > - env->me_psize)) - rc = bad_page(mp, "invalid page upper (%u) for nkeys %zu with limit %zu\n", - mp->mp_upper, nkeys, page_space(env)); const size_t ksize_max = keysize_max(env->me_psize, 0); const size_t leaf2_ksize = mp->mp_leaf2_ksize; @@ -20860,8 +20866,20 @@ __cold static int page_check(const MDBX_cursor *const mc, (mc->mc_db->md_flags & MDBX_DUPFIXED) == 0)) rc = bad_page(mp, "unexpected leaf2-page (db-flags 0x%x)\n", mc->mc_db->md_flags); - if (unlikely(leaf2_ksize < 1 || leaf2_ksize > ksize_max)) - rc = bad_page(mp, "invalid leaf2-key length (%zu)\n", leaf2_ksize); + else if (unlikely(leaf2_ksize != mc->mc_db->md_xsize)) + rc = bad_page(mp, "invalid leaf2_ksize %zu\n", leaf2_ksize); + else if (unlikely(((leaf2_ksize & nkeys) ^ mp->mp_upper) & 1)) + rc = bad_page( + mp, "invalid page upper (%u) for nkeys %zu with leaf2-length %zu\n", + mp->mp_upper, nkeys, leaf2_ksize); + } else { + if (unlikely((mp->mp_upper & 1) || PAGEHDRSZ + mp->mp_upper + + nkeys * sizeof(MDBX_node) + + nkeys - 1 > + env->me_psize)) + rc = + bad_page(mp, "invalid page upper (%u) for nkeys %zu with limit %zu\n", + mp->mp_upper, nkeys, page_space(env)); } MDBX_val here, prev = {0, 0}; @@ -20869,7 +20887,7 @@ __cold static int page_check(const MDBX_cursor *const mc, if (IS_LEAF2(mp)) { const char *const key = page_leaf2key(mp, i, leaf2_ksize); if (unlikely(end_of_page < key + leaf2_ksize)) { - rc = bad_page(mp, "leaf2-key beyond (%zu) page-end\n", + rc = bad_page(mp, "leaf2-item beyond (%zu) page-end\n", key + leaf2_ksize - end_of_page); continue; } @@ -20878,7 +20896,7 @@ __cold static int page_check(const MDBX_cursor *const mc, if (unlikely(leaf2_ksize < mc->mc_dbx->md_klen_min || leaf2_ksize > mc->mc_dbx->md_klen_max)) rc = bad_page( - mp, "leaf2-key size (%zu) <> min/max key-length (%zu/%zu)\n", + mp, "leaf2-item size (%zu) <> min/max length (%zu/%zu)\n", leaf2_ksize, mc->mc_dbx->md_klen_min, mc->mc_dbx->md_klen_max); else mc->mc_dbx->md_klen_min = mc->mc_dbx->md_klen_max = leaf2_ksize; @@ -20887,7 +20905,7 @@ __cold static int page_check(const MDBX_cursor *const mc, here.iov_base = (void *)key; here.iov_len = leaf2_ksize; if (prev.iov_base && unlikely(mc->mc_dbx->md_cmp(&prev, &here) >= 0)) - rc = bad_page(mp, "leaf2-key #%zu wrong order (%s >= %s)\n", i, + rc = bad_page(mp, "leaf2-item #%zu wrong order (%s >= %s)\n", i, DKEY(&prev), DVAL(&here)); prev = here; } @@ -21299,6 +21317,8 @@ static int page_split(MDBX_cursor *mc, const MDBX_val *const newkey, DKBUF; MDBX_page *const mp = mc->mc_pg[mc->mc_top]; + cASSERT(mc, (mp->mp_flags & P_ILL_BITS) == 0); + const size_t newindx = mc->mc_ki[mc->mc_top]; size_t nkeys = page_numkeys(mp); if (AUDIT_ENABLED()) { @@ -21414,6 +21434,15 @@ static int page_split(MDBX_cursor *mc, const MDBX_val *const newkey, if (page_room(mn.mc_pg[ptop]) < branch_size(env, &sepkey)) split_indx = minkeys; } + if (foliage) { + TRACE("pure-left: foliage %u, top %i, ptop %zu, split_indx %zi, " + "minkeys %zi, sepkey %s, parent-room %zu, need4split %zu", + foliage, mc->mc_top, ptop, split_indx, minkeys, + DKEY_DEBUG(&sepkey), page_room(mc->mc_pg[ptop]), + branch_size(env, &sepkey)); + TRACE("pure-left: newkey %s, newdata %s, newindx %zu", + DKEY_DEBUG(newkey), DVAL_DEBUG(newdata), newindx); + } } } @@ -21459,6 +21488,7 @@ static int page_split(MDBX_cursor *mc, const MDBX_val *const newkey, mp->mp_lower += sizeof(indx_t); cASSERT(mc, mp->mp_upper >= ksize - sizeof(indx_t)); mp->mp_upper -= (indx_t)(ksize - sizeof(indx_t)); + cASSERT(mc, (((ksize & page_numkeys(mp)) ^ mp->mp_upper) & 1) == 0); } else { memcpy(sister->mp_ptrs, split, distance * ksize); void *const ins = page_leaf2key(sister, distance, ksize); @@ -21471,6 +21501,8 @@ static int page_split(MDBX_cursor *mc, const MDBX_val *const newkey, sister->mp_upper -= (indx_t)(ksize - sizeof(indx_t)); cASSERT(mc, distance <= (int)UINT16_MAX); mc->mc_ki[mc->mc_top] = (indx_t)distance; + cASSERT(mc, + (((ksize & page_numkeys(sister)) ^ sister->mp_upper) & 1) == 0); } if (AUDIT_ENABLED()) {