mdbx: исправление ложной ошибки MDBX_CORRUPTED (-30796) в сценарии "odd dupfixed".

Повреждение БД и/или потери данных не происходило, проблема лишь в
возврате ложной ошибки.

Благодарю пользователя/разработчика @Dvirsw (https://t.me/Dvirsw) за
сообщения о проблеме и предоставление минимального/оптимального сценария
воспроизведения.

--

Проблема была из-за излишнего условия при контроле внутренего поля
mp_upper в ходе проверки структуры страниц БД.

Поле mp_upper указывает на нижнуюю границу заполнения страницы от конца
к началу. Вследствие того, что значения ключей выравниваетня на четную
границу, это поле четно во всех случаях за исключением LEAF2-страницы
(листовая страница вложенного дерева для множественных значений
финсированной/одинаковой длины одного ключа), на которой размещено
нечетное количество значений нечетной длины.

Ошибка не проявлялась в большинстве случаев (в том числе в
стохастических тестах), так как штатно лишняя проверка производилась
только при чтении страницы и перебалансировке ключей, но не при каждом
добавлении значения. Тем не менее, сценарии тестов требуют
доработки/расширения для явного добавления нечетных dupfixed-сценариев.
This commit is contained in:
Леонид Юрьев (Leonid Yuriev) 2024-02-19 01:20:27 +03:00
parent f16c4303bf
commit fbc83dd069

View File

@ -8416,7 +8416,7 @@ __hot static int page_touch(MDBX_cursor *mc) {
np->mp_txnid = txn->mt_front;
return MDBX_SUCCESS;
}
tASSERT(txn, !IS_OVERFLOW(mp));
tASSERT(txn, !IS_OVERFLOW(mp) && !IS_SUBP(mp));
if (IS_FROZEN(txn, mp)) {
/* CoW the page */
@ -16102,8 +16102,12 @@ __hot static __always_inline int page_get_checker_lite(const uint16_t ILL,
if (((ILL & P_OVERFLOW) || !IS_OVERFLOW(page)) &&
(ILL & (P_BRANCH | P_LEAF | P_LEAF2)) == 0) {
if (unlikely(page->mp_upper < page->mp_lower ||
((page->mp_lower | page->mp_upper) & 1) ||
/* Контроль четности page->mp_upper тут либо приводит к ложным ошибкам,
* либо слишком дорог по количеству операций. Заковырка в том, что mp_upper
* может быть нечетным на LEAF2-страницах, при нечетном количестве элементов
* нечетной длины. Поэтому четность page->mp_upper здесь не проверяется, но
* соответствующие полные проверки есть в page_check(). */
if (unlikely(page->mp_upper < page->mp_lower || (page->mp_lower & 1) ||
PAGEHDRSZ + page->mp_upper > txn->mt_env->me_psize))
return bad_page(page,
"invalid page' lower(%u)/upper(%u) with limit %zu\n",
@ -18082,9 +18086,9 @@ static __hot int cursor_put_nochecklen(MDBX_cursor *mc, const MDBX_val *key,
mc->mc_xcursor->mx_dbx.md_klen_min =
mc->mc_xcursor->mx_dbx.md_klen_max =
data->iov_len);
if (mc->mc_flags & C_SUB)
npr.page->mp_flags |= P_LEAF2;
}
if ((mc->mc_db->md_flags & (MDBX_DUPSORT | MDBX_DUPFIXED)) == MDBX_DUPFIXED)
npr.page->mp_flags |= P_LEAF2;
mc->mc_flags |= C_INITIALIZED;
}
@ -18361,7 +18365,11 @@ static __hot int cursor_put_nochecklen(MDBX_cursor *mc, const MDBX_val *key,
if (unlikely(fp_flags & P_LEAF2)) {
memcpy(page_data(mp), page_data(fp),
page_numkeys(fp) * fp->mp_leaf2_ksize);
cASSERT(mc,
(((mp->mp_leaf2_ksize & page_numkeys(mp)) ^ mp->mp_upper) &
1) == 0);
} else {
cASSERT(mc, (mp->mp_upper & 1) == 0);
memcpy(ptr_disp(mp, mp->mp_upper + PAGEHDRSZ),
ptr_disp(fp, fp->mp_upper + PAGEHDRSZ),
olddata.iov_len - fp->mp_upper - PAGEHDRSZ);
@ -18979,6 +18987,7 @@ __hot static int __must_check_result node_add_leaf2(MDBX_cursor *mc,
const size_t ksize = mc->mc_db->md_xsize;
cASSERT(mc, ksize == key->iov_len);
const size_t nkeys = page_numkeys(mp);
cASSERT(mc, (((ksize & page_numkeys(mp)) ^ mp->mp_upper) & 1) == 0);
/* Just using these for counting */
const intptr_t lower = mp->mp_lower + sizeof(indx_t);
@ -18998,6 +19007,8 @@ __hot static int __must_check_result node_add_leaf2(MDBX_cursor *mc,
memmove(ptr_disp(ptr, ksize), ptr, diff * ksize);
/* insert new key */
memcpy(ptr, key->iov_base, ksize);
cASSERT(mc, (((ksize & page_numkeys(mp)) ^ mp->mp_upper) & 1) == 0);
return MDBX_SUCCESS;
}
@ -19164,6 +19175,7 @@ __hot static void node_del(MDBX_cursor *mc, size_t ksize) {
mp->mp_lower -= sizeof(indx_t);
cASSERT(mc, (size_t)UINT16_MAX - mp->mp_upper >= ksize - sizeof(indx_t));
mp->mp_upper += (indx_t)(ksize - sizeof(indx_t));
cASSERT(mc, (((ksize & page_numkeys(mp)) ^ mp->mp_upper) & 1) == 0);
return;
}
@ -20830,8 +20842,7 @@ __cold static int page_check(const MDBX_cursor *const mc,
break;
}
if (unlikely(mp->mp_upper < mp->mp_lower ||
((mp->mp_lower | mp->mp_upper) & 1) ||
if (unlikely(mp->mp_upper < mp->mp_lower || (mp->mp_lower & 1) ||
PAGEHDRSZ + mp->mp_upper > env->me_psize))
rc = bad_page(mp, "invalid page lower(%u)/upper(%u) with limit %zu\n",
mp->mp_lower, mp->mp_upper, page_space(env));
@ -20847,11 +20858,6 @@ __cold static int page_check(const MDBX_cursor *const mc,
bad_page(mp, "%s-page nkeys (%zu) < %u\n",
IS_BRANCH(mp) ? "branch" : "leaf", nkeys, 1 + IS_BRANCH(mp));
}
if (!IS_LEAF2(mp) && unlikely(PAGEHDRSZ + mp->mp_upper +
nkeys * sizeof(MDBX_node) + nkeys - 1 >
env->me_psize))
rc = bad_page(mp, "invalid page upper (%u) for nkeys %zu with limit %zu\n",
mp->mp_upper, nkeys, page_space(env));
const size_t ksize_max = keysize_max(env->me_psize, 0);
const size_t leaf2_ksize = mp->mp_leaf2_ksize;
@ -20860,8 +20866,20 @@ __cold static int page_check(const MDBX_cursor *const mc,
(mc->mc_db->md_flags & MDBX_DUPFIXED) == 0))
rc = bad_page(mp, "unexpected leaf2-page (db-flags 0x%x)\n",
mc->mc_db->md_flags);
if (unlikely(leaf2_ksize < 1 || leaf2_ksize > ksize_max))
rc = bad_page(mp, "invalid leaf2-key length (%zu)\n", leaf2_ksize);
else if (unlikely(leaf2_ksize != mc->mc_db->md_xsize))
rc = bad_page(mp, "invalid leaf2_ksize %zu\n", leaf2_ksize);
else if (unlikely(((leaf2_ksize & nkeys) ^ mp->mp_upper) & 1))
rc = bad_page(
mp, "invalid page upper (%u) for nkeys %zu with leaf2-length %zu\n",
mp->mp_upper, nkeys, leaf2_ksize);
} else {
if (unlikely((mp->mp_upper & 1) || PAGEHDRSZ + mp->mp_upper +
nkeys * sizeof(MDBX_node) +
nkeys - 1 >
env->me_psize))
rc =
bad_page(mp, "invalid page upper (%u) for nkeys %zu with limit %zu\n",
mp->mp_upper, nkeys, page_space(env));
}
MDBX_val here, prev = {0, 0};
@ -20869,7 +20887,7 @@ __cold static int page_check(const MDBX_cursor *const mc,
if (IS_LEAF2(mp)) {
const char *const key = page_leaf2key(mp, i, leaf2_ksize);
if (unlikely(end_of_page < key + leaf2_ksize)) {
rc = bad_page(mp, "leaf2-key beyond (%zu) page-end\n",
rc = bad_page(mp, "leaf2-item beyond (%zu) page-end\n",
key + leaf2_ksize - end_of_page);
continue;
}
@ -20878,7 +20896,7 @@ __cold static int page_check(const MDBX_cursor *const mc,
if (unlikely(leaf2_ksize < mc->mc_dbx->md_klen_min ||
leaf2_ksize > mc->mc_dbx->md_klen_max))
rc = bad_page(
mp, "leaf2-key size (%zu) <> min/max key-length (%zu/%zu)\n",
mp, "leaf2-item size (%zu) <> min/max length (%zu/%zu)\n",
leaf2_ksize, mc->mc_dbx->md_klen_min, mc->mc_dbx->md_klen_max);
else
mc->mc_dbx->md_klen_min = mc->mc_dbx->md_klen_max = leaf2_ksize;
@ -20887,7 +20905,7 @@ __cold static int page_check(const MDBX_cursor *const mc,
here.iov_base = (void *)key;
here.iov_len = leaf2_ksize;
if (prev.iov_base && unlikely(mc->mc_dbx->md_cmp(&prev, &here) >= 0))
rc = bad_page(mp, "leaf2-key #%zu wrong order (%s >= %s)\n", i,
rc = bad_page(mp, "leaf2-item #%zu wrong order (%s >= %s)\n", i,
DKEY(&prev), DVAL(&here));
prev = here;
}
@ -21299,6 +21317,8 @@ static int page_split(MDBX_cursor *mc, const MDBX_val *const newkey,
DKBUF;
MDBX_page *const mp = mc->mc_pg[mc->mc_top];
cASSERT(mc, (mp->mp_flags & P_ILL_BITS) == 0);
const size_t newindx = mc->mc_ki[mc->mc_top];
size_t nkeys = page_numkeys(mp);
if (AUDIT_ENABLED()) {
@ -21414,6 +21434,15 @@ static int page_split(MDBX_cursor *mc, const MDBX_val *const newkey,
if (page_room(mn.mc_pg[ptop]) < branch_size(env, &sepkey))
split_indx = minkeys;
}
if (foliage) {
TRACE("pure-left: foliage %u, top %i, ptop %zu, split_indx %zi, "
"minkeys %zi, sepkey %s, parent-room %zu, need4split %zu",
foliage, mc->mc_top, ptop, split_indx, minkeys,
DKEY_DEBUG(&sepkey), page_room(mc->mc_pg[ptop]),
branch_size(env, &sepkey));
TRACE("pure-left: newkey %s, newdata %s, newindx %zu",
DKEY_DEBUG(newkey), DVAL_DEBUG(newdata), newindx);
}
}
}
@ -21459,6 +21488,7 @@ static int page_split(MDBX_cursor *mc, const MDBX_val *const newkey,
mp->mp_lower += sizeof(indx_t);
cASSERT(mc, mp->mp_upper >= ksize - sizeof(indx_t));
mp->mp_upper -= (indx_t)(ksize - sizeof(indx_t));
cASSERT(mc, (((ksize & page_numkeys(mp)) ^ mp->mp_upper) & 1) == 0);
} else {
memcpy(sister->mp_ptrs, split, distance * ksize);
void *const ins = page_leaf2key(sister, distance, ksize);
@ -21471,6 +21501,8 @@ static int page_split(MDBX_cursor *mc, const MDBX_val *const newkey,
sister->mp_upper -= (indx_t)(ksize - sizeof(indx_t));
cASSERT(mc, distance <= (int)UINT16_MAX);
mc->mc_ki[mc->mc_top] = (indx_t)distance;
cASSERT(mc,
(((ksize & page_numkeys(sister)) ^ sister->mp_upper) & 1) == 0);
}
if (AUDIT_ENABLED()) {