mdbx: more for robustness free/reuse of cursors.

Change-Id: I6c4056bc47aea28d39701713f97694a4ebe9b582
This commit is contained in:
Leo Yuriev 2017-02-08 19:43:22 +03:00
parent eb4eda6368
commit 6f600845f3
2 changed files with 46 additions and 24 deletions

68
mdb.c
View File

@ -937,8 +937,9 @@ struct MDB_xcursor;
* (A node with #F_DUPDATA but no #F_SUBDATA contains a subpage).
*/
struct MDB_cursor {
#define MDBX_MC_SIGNATURE_LIVE (0xFE05D5B1^MDBX_MODE_SALT)
#define MDBX_MC_SIGNATURE_CLOSED (0x90E297A7^MDBX_MODE_SALT)
#define MDBX_MC_SIGNATURE (0xFE05D5B1^MDBX_MODE_SALT)
#define MDBX_MC_READY4CLOSE (0x2817A047^MDBX_MODE_SALT)
#define MDBX_MC_WAIT4EOT (0x90E297A7^MDBX_MODE_SALT)
unsigned mc_signature;
/** Next cursor on this DB in this txn */
MDB_cursor *mc_next;
@ -2691,7 +2692,7 @@ mdb_cursor_shadow(MDB_txn *src, MDB_txn *dst)
* @return 0 on success, non-zero on failure.
*/
static void
mdb_cursors_close(MDB_txn *txn, unsigned merge)
mdb_cursors_eot(MDB_txn *txn, unsigned merge)
{
MDB_cursor **cursors = txn->mt_cursors, *mc, *next, *bk;
MDB_xcursor *mx;
@ -2699,8 +2700,8 @@ mdb_cursors_close(MDB_txn *txn, unsigned merge)
for (i = txn->mt_numdbs; --i >= 0; ) {
for (mc = cursors[i]; mc; mc = next) {
mdb_ensure(NULL, mc->mc_signature == MDBX_MC_SIGNATURE_LIVE
|| mc->mc_signature == MDBX_MC_SIGNATURE_CLOSED);
mdb_ensure(NULL, mc->mc_signature == MDBX_MC_SIGNATURE
|| mc->mc_signature == MDBX_MC_WAIT4EOT);
next = mc->mc_next;
if ((bk = mc->mc_backup) != NULL) {
if (merge) {
@ -2713,14 +2714,26 @@ mdb_cursors_close(MDB_txn *txn, unsigned merge)
if ((mx = mc->mc_xcursor) != NULL)
mx->mx_cursor.mc_txn = bk->mc_txn;
} else {
/* Abort nested txn */
/* Abort nested txn, but save current cursor's stage */
unsigned stage = mc->mc_signature;
*mc = *bk;
mc->mc_signature = stage;
if ((mx = mc->mc_xcursor) != NULL)
*mx = *(MDB_xcursor *)(bk+1);
}
#if MDBX_MODE_ENABLED
bk->mc_signature = 0;
free(bk);
}
if (mc->mc_signature == MDBX_MC_WAIT4EOT) {
mc->mc_signature = 0;
free(mc);
} else {
mc->mc_signature = MDBX_MC_READY4CLOSE;
}
#else
mc = bk;
}
#if ! MDBX_MODE_ENABLED
/* Only malloced cursors are permanently tracked. */
mc->mc_signature = 0;
free(mc);
@ -3171,7 +3184,7 @@ mdb_txn_end(MDB_txn *txn, unsigned mode)
pgno_t *pghead = env->me_pghead;
if (!(mode & MDB_END_UPDATE)) /* !(already closed cursors) */
mdb_cursors_close(txn, 0);
mdb_cursors_eot(txn, 0);
if (!(env->me_flags & MDB_WRITEMAP)) {
mdb_dlist_free(txn);
}
@ -3769,7 +3782,7 @@ mdb_txn_commit(MDB_txn *txn)
parent->mt_flags = txn->mt_flags;
/* Merge our cursors into parent's and close them */
mdb_cursors_close(txn, 1);
mdb_cursors_eot(txn, 1);
/* Update parent's DB table. */
memcpy(parent->mt_dbs, txn->mt_dbs, txn->mt_numdbs * sizeof(MDB_db));
@ -3888,7 +3901,7 @@ mdb_txn_commit(MDB_txn *txn)
goto fail;
}
mdb_cursors_close(txn, 0);
mdb_cursors_eot(txn, 0);
if (!txn->mt_u.dirty_list[0].mid &&
!(txn->mt_flags & (MDB_TXN_DIRTY|MDB_TXN_SPILLS)))
@ -6473,7 +6486,7 @@ mdb_cursor_get(MDB_cursor *mc, MDB_val *key, MDB_val *data,
if (unlikely(mc == NULL))
return EINVAL;
if (unlikely(mc->mc_signature != MDBX_MC_SIGNATURE_LIVE))
if (unlikely(mc->mc_signature != MDBX_MC_SIGNATURE))
return MDB_VERSION_MISMATCH;
if (unlikely(mc->mc_txn->mt_flags & MDB_TXN_BLOCKED))
@ -6707,7 +6720,7 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
if (unlikely(mc == NULL || key == NULL))
return EINVAL;
if (unlikely(mc->mc_signature != MDBX_MC_SIGNATURE_LIVE))
if (unlikely(mc->mc_signature != MDBX_MC_SIGNATURE))
return MDB_VERSION_MISMATCH;
env = mc->mc_txn->mt_env;
@ -7259,7 +7272,7 @@ mdb_cursor_del(MDB_cursor *mc, unsigned flags)
if (unlikely(!mc))
return EINVAL;
if (unlikely(mc->mc_signature != MDBX_MC_SIGNATURE_LIVE))
if (unlikely(mc->mc_signature != MDBX_MC_SIGNATURE))
return MDB_VERSION_MISMATCH;
if (unlikely(mc->mc_txn->mt_flags & (MDB_TXN_RDONLY|MDB_TXN_BLOCKED)))
@ -7800,7 +7813,7 @@ mdb_xcursor_init2(MDB_cursor *mc, MDB_xcursor *src_mx, int new_dupdata)
static void
mdb_cursor_init(MDB_cursor *mc, MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx)
{
mc->mc_signature = MDBX_MC_SIGNATURE_LIVE;
mc->mc_signature = MDBX_MC_SIGNATURE;
mc->mc_next = NULL;
mc->mc_backup = NULL;
mc->mc_dbi = dbi;
@ -7815,7 +7828,7 @@ mdb_cursor_init(MDB_cursor *mc, MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx)
mc->mc_ki[0] = 0;
if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT) {
mdb_tassert(txn, mx != NULL);
mx->mx_cursor.mc_signature = MDBX_MC_SIGNATURE_LIVE;
mx->mx_cursor.mc_signature = MDBX_MC_SIGNATURE;
mc->mc_xcursor = mx;
mdb_xcursor_init0(mc);
} else {
@ -7872,20 +7885,26 @@ mdb_cursor_renew(MDB_txn *txn, MDB_cursor *mc)
if (unlikely(!mc || !txn))
return EINVAL;
if (unlikely(txn->mt_signature != MDBX_MT_SIGNATURE
|| mc->mc_signature != MDBX_MC_SIGNATURE_LIVE))
if (unlikely(txn->mt_signature != MDBX_MT_SIGNATURE))
return MDB_VERSION_MISMATCH;
if (unlikely(mc->mc_signature != MDBX_MC_SIGNATURE
&& mc->mc_signature != MDBX_MC_READY4CLOSE))
return EINVAL;
if (unlikely(!TXN_DBI_EXIST(txn, mc->mc_dbi, DB_VALID)))
return EINVAL;
if (unlikely(mc->mc_backup))
return EINVAL;
if (unlikely((mc->mc_flags & C_UNTRACK) || txn->mt_cursors)) {
#if MDBX_MODE_ENABLED
MDB_cursor **prev = &mc->mc_txn->mt_cursors[mc->mc_dbi];
while (*prev && *prev != mc) prev = &(*prev)->mc_next;
if (*prev == mc)
*prev = mc->mc_next;
mc->mc_signature = 0;
mc->mc_signature = MDBX_MC_READY4CLOSE;
#else
return EINVAL;
#endif
@ -7905,7 +7924,7 @@ mdb_cursor_count(MDB_cursor *mc, size_t *countp)
if (unlikely(mc == NULL || countp == NULL))
return EINVAL;
if (unlikely(mc->mc_signature != MDBX_MC_SIGNATURE_LIVE))
if (unlikely(mc->mc_signature != MDBX_MC_SIGNATURE))
return MDB_VERSION_MISMATCH;
if (unlikely(mc->mc_txn->mt_flags & MDB_TXN_BLOCKED))
@ -7961,7 +7980,8 @@ void
mdb_cursor_close(MDB_cursor *mc)
{
if (mc) {
mdb_ensure(NULL, mc->mc_signature == MDBX_MC_SIGNATURE_LIVE);
mdb_ensure(NULL, mc->mc_signature == MDBX_MC_SIGNATURE
|| mc->mc_signature == MDBX_MC_READY4CLOSE);
if (!mc->mc_backup) {
/* Remove from txn, if tracked.
* A read-only txn (!C_UNTRACK) may have been freed already,
@ -7975,7 +7995,9 @@ mdb_cursor_close(MDB_cursor *mc)
mc->mc_signature = 0;
free(mc);
} else {
mc->mc_signature = MDBX_MC_SIGNATURE_CLOSED;
/* cursor closed before nested txn ends */
mdb_cassert(mc, mc->mc_signature == MDBX_MC_SIGNATURE);
mc->mc_signature = MDBX_MC_WAIT4EOT;
}
}
}
@ -7983,7 +8005,7 @@ mdb_cursor_close(MDB_cursor *mc)
MDB_txn *
mdb_cursor_txn(MDB_cursor *mc)
{
if (unlikely(!mc || mc->mc_signature != MDBX_MC_SIGNATURE_LIVE))
if (unlikely(!mc || mc->mc_signature != MDBX_MC_SIGNATURE))
return NULL;
return mc->mc_txn;
}
@ -7991,7 +8013,7 @@ mdb_cursor_txn(MDB_cursor *mc)
MDB_dbi
mdb_cursor_dbi(MDB_cursor *mc)
{
if (unlikely(!mc || mc->mc_signature != MDBX_MC_SIGNATURE_LIVE))
if (unlikely(!mc || mc->mc_signature != MDBX_MC_SIGNATURE))
return INT_MIN;
return mc->mc_dbi;
}

2
mdbx.c
View File

@ -359,7 +359,7 @@ int mdbx_cursor_eof(MDB_cursor *mc)
if (unlikely(mc == NULL))
return EINVAL;
if (unlikely(mc->mc_signature != MDBX_MC_SIGNATURE_LIVE))
if (unlikely(mc->mc_signature != MDBX_MC_SIGNATURE))
return MDB_VERSION_MISMATCH;
if ((mc->mc_flags & C_INITIALIZED) == 0)