mdbx: запрещение unbind/close курсоров для вложенных транзакций.

This commit is contained in:
Леонид Юрьев (Leonid Yuriev)
2025-03-06 02:48:55 +03:00
parent 4fcfb07b97
commit b0665f7016
2 changed files with 53 additions and 46 deletions

View File

@@ -44,8 +44,10 @@ int mdbx_cursor_bind(MDBX_txn *txn, MDBX_cursor *mc, MDBX_dbi dbi) {
if (unlikely(!mc))
return LOG_IFERR(MDBX_EINVAL);
if (unlikely(mc->signature != cur_signature_ready4dispose && mc->signature != cur_signature_live))
return LOG_IFERR(MDBX_EBADSIGN);
if (unlikely(mc->signature != cur_signature_ready4dispose && mc->signature != cur_signature_live)) {
int rc = (mc->signature == cur_signature_wait4eot) ? MDBX_EINVAL : MDBX_EBADSIGN;
return LOG_IFERR(rc);
}
int rc = check_txn(txn, MDBX_TXN_BLOCKED);
if (unlikely(rc != MDBX_SUCCESS))
@@ -58,24 +60,12 @@ int mdbx_cursor_bind(MDBX_txn *txn, MDBX_cursor *mc, MDBX_dbi dbi) {
if (unlikely(dbi == FREE_DBI && !(txn->flags & MDBX_TXN_RDONLY)))
return LOG_IFERR(MDBX_EACCESS);
if (unlikely(mc->backup)) /* Cursor from parent transaction */ {
cASSERT(mc, mc->signature == cur_signature_live);
if (unlikely(cursor_dbi(mc) != dbi ||
/* paranoia */ mc->signature != cur_signature_live || mc->txn != txn))
return LOG_IFERR(MDBX_EINVAL);
cASSERT(mc, mc->tree == &txn->dbs[dbi]);
cASSERT(mc, mc->clc == &txn->env->kvs[dbi].clc);
cASSERT(mc, cursor_dbi(mc) == dbi);
return likely(cursor_dbi(mc) == dbi &&
/* paranoia */ mc->signature == cur_signature_live && mc->txn == txn)
? MDBX_SUCCESS
: LOG_IFERR(MDBX_EINVAL) /* Disallow change DBI in nested
transactions */
;
}
if (unlikely(mc->backup)) /* Cursor from parent transaction */
LOG_IFERR(MDBX_EINVAL);
if (mc->signature == cur_signature_live) {
if (mc->txn == txn && cursor_dbi(mc) == dbi)
return MDBX_SUCCESS;
rc = mdbx_cursor_unbind(mc);
if (unlikely(rc != MDBX_SUCCESS))
return rc;
@@ -100,25 +90,22 @@ int mdbx_cursor_unbind(MDBX_cursor *mc) {
return (mc->signature == cur_signature_ready4dispose) ? MDBX_SUCCESS : LOG_IFERR(MDBX_EBADSIGN);
if (unlikely(mc->backup)) /* Cursor from parent transaction */
/* TODO: реализовать при переходе на двусвязный список курсоров */
return LOG_IFERR(MDBX_EINVAL);
eASSERT(nullptr, mc->txn && mc->txn->signature == txn_signature);
cASSERT(mc, mc->signature == cur_signature_live);
cASSERT(mc, !mc->backup);
if (unlikely(!mc->txn || mc->txn->signature != txn_signature)) {
ERROR("Wrong cursor's transaction %p 0x%x", __Wpedantic_format_voidptr(mc->txn), mc->txn ? mc->txn->signature : 0);
return LOG_IFERR(MDBX_PROBLEM);
}
if (mc->next != mc) {
const size_t dbi = (kvx_t *)mc->clc - mc->txn->env->kvs;
cASSERT(mc, cursor_dbi(mc) == dbi);
const size_t dbi = cursor_dbi(mc);
cASSERT(mc, dbi < mc->txn->n_dbi);
cASSERT(mc, &mc->txn->env->kvs[dbi].clc == mc->clc);
if (dbi < mc->txn->n_dbi) {
MDBX_cursor **prev = &mc->txn->cursors[dbi];
while (*prev) {
while (/* *prev && */ *prev != mc) {
ENSURE(mc->txn->env, (*prev)->signature == cur_signature_live || (*prev)->signature == cur_signature_wait4eot);
if (*prev == mc)
break;
prev = &(*prev)->next;
}
cASSERT(mc, *prev == mc);
@@ -219,30 +206,34 @@ again:
int mdbx_txn_release_all_cursors_ex(const MDBX_txn *txn, bool unbind, size_t *count) {
int rc = check_txn(txn, MDBX_TXN_FINISHED | MDBX_TXN_HAS_CHILD);
if (unlikely(rc != MDBX_SUCCESS))
return LOG_IFERR(rc);
if (unlikely(txn->parent)) {
rc = MDBX_BAD_TXN;
ERROR("%s, err %d", "must not unbind or close cursors for a nested txn", rc);
return rc;
}
size_t n = 0;
if (likely(rc == MDBX_SUCCESS)) {
TXN_FOREACH_DBI_FROM(txn, i, MAIN_DBI) {
while (txn->cursors[i]) {
++n;
MDBX_cursor *mc = txn->cursors[i];
ENSURE(nullptr, mc->signature == cur_signature_live && (mc->next != mc) && !mc->backup);
txn->cursors[i] = mc->next;
mc->next = mc;
if (unbind) {
be_poor(mc);
mc->signature = cur_signature_ready4dispose;
} else {
mc->signature = 0;
osal_free(mc);
}
TXN_FOREACH_DBI_FROM(txn, i, MAIN_DBI) {
while (txn->cursors[i]) {
++n;
MDBX_cursor *mc = txn->cursors[i];
ENSURE(nullptr, mc->signature == cur_signature_live && (mc->next != mc) && !mc->backup);
txn->cursors[i] = mc->next;
mc->next = mc;
if (unbind) {
be_poor(mc);
mc->signature = cur_signature_ready4dispose;
} else {
mc->signature = 0;
osal_free(mc);
}
}
} else {
LOG_IFERR(rc);
}
if (count)
*count = n;
return rc;
return MDBX_SUCCESS;
}
int mdbx_cursor_compare(const MDBX_cursor *l, const MDBX_cursor *r, bool ignore_multival) {