mdbx: allows cursors to be free/reuse explicitly, regardless of transaction wr/ro type.

This commit is contained in:
Leo Yuriev 2017-01-24 20:17:20 +03:00
parent 8b045ab626
commit 6aa60c61c5
3 changed files with 38 additions and 15 deletions

8
lmdb.h
View File

@ -1414,6 +1414,13 @@ int mdb_del(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, MDB_val *data);
* A cursor cannot be used when its database handle is closed. Nor * A cursor cannot be used when its database handle is closed. Nor
* when its transaction has ended, except with #mdb_cursor_renew(). * when its transaction has ended, except with #mdb_cursor_renew().
* It can be discarded with #mdb_cursor_close(). * It can be discarded with #mdb_cursor_close().
*
* MDBX-mode:
* A cursor must be closed explicitly always, before
* or after its transaction ends. It can be reused with
* #mdb_cursor_renew() before finally closing it.
*
* LMDB-compatible mode:
* A cursor in a write-transaction can be closed before its transaction * A cursor in a write-transaction can be closed before its transaction
* ends, and will otherwise be closed when its transaction ends. * ends, and will otherwise be closed when its transaction ends.
* A cursor in a read-only transaction must be closed explicitly, before * A cursor in a read-only transaction must be closed explicitly, before
@ -1421,6 +1428,7 @@ int mdb_del(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, MDB_val *data);
* #mdb_cursor_renew() before finally closing it. * #mdb_cursor_renew() before finally closing it.
* @note Earlier documentation said that cursors in every transaction * @note Earlier documentation said that cursors in every transaction
* were closed when the transaction committed or aborted. * were closed when the transaction committed or aborted.
*
* @param[in] txn A transaction handle returned by #mdb_txn_begin() * @param[in] txn A transaction handle returned by #mdb_txn_begin()
* @param[in] dbi A database handle returned by #mdb_dbi_open() * @param[in] dbi A database handle returned by #mdb_dbi_open()
* @param[out] cursor Address where the new #MDB_cursor handle will be stored * @param[out] cursor Address where the new #MDB_cursor handle will be stored

43
mdb.c
View File

@ -933,7 +933,8 @@ struct MDB_xcursor;
* (A node with #F_DUPDATA but no #F_SUBDATA contains a subpage). * (A node with #F_DUPDATA but no #F_SUBDATA contains a subpage).
*/ */
struct MDB_cursor { struct MDB_cursor {
#define MDBX_MC_SIGNATURE (0xFE05D5B1^MDBX_MODE_SALT) #define MDBX_MC_SIGNATURE_LIVE (0xFE05D5B1^MDBX_MODE_SALT)
#define MDBX_MC_SIGNATURE_CLOSED (0x90E297A7^MDBX_MODE_SALT)
unsigned mc_signature; unsigned mc_signature;
/** Next cursor on this DB in this txn */ /** Next cursor on this DB in this txn */
MDB_cursor *mc_next; MDB_cursor *mc_next;
@ -2695,6 +2696,8 @@ mdb_cursors_close(MDB_txn *txn, unsigned merge)
for (i = txn->mt_numdbs; --i >= 0; ) { for (i = txn->mt_numdbs; --i >= 0; ) {
for (mc = cursors[i]; mc; mc = next) { for (mc = cursors[i]; mc; mc = next) {
mdb_ensure(NULL, mc->mc_signature == MDBX_MC_SIGNATURE_LIVE
|| mc->mc_signature == MDBX_MC_SIGNATURE_CLOSED);
next = mc->mc_next; next = mc->mc_next;
if ((bk = mc->mc_backup) != NULL) { if ((bk = mc->mc_backup) != NULL) {
if (merge) { if (merge) {
@ -2714,9 +2717,11 @@ mdb_cursors_close(MDB_txn *txn, unsigned merge)
} }
mc = bk; mc = bk;
} }
#if ! MDBX_MODE_ENABLED
/* Only malloced cursors are permanently tracked. */ /* Only malloced cursors are permanently tracked. */
mc->mc_signature = 0; mc->mc_signature = 0;
free(mc); free(mc);
#endif
} }
cursors[i] = NULL; cursors[i] = NULL;
} }
@ -6465,7 +6470,7 @@ mdb_cursor_get(MDB_cursor *mc, MDB_val *key, MDB_val *data,
if (unlikely(mc == NULL)) if (unlikely(mc == NULL))
return EINVAL; return EINVAL;
if (unlikely(mc->mc_signature != MDBX_MC_SIGNATURE)) if (unlikely(mc->mc_signature != MDBX_MC_SIGNATURE_LIVE))
return MDB_VERSION_MISMATCH; return MDB_VERSION_MISMATCH;
if (unlikely(mc->mc_txn->mt_flags & MDB_TXN_BLOCKED)) if (unlikely(mc->mc_txn->mt_flags & MDB_TXN_BLOCKED))
@ -6699,7 +6704,7 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
if (unlikely(mc == NULL || key == NULL)) if (unlikely(mc == NULL || key == NULL))
return EINVAL; return EINVAL;
if (unlikely(mc->mc_signature != MDBX_MC_SIGNATURE)) if (unlikely(mc->mc_signature != MDBX_MC_SIGNATURE_LIVE))
return MDB_VERSION_MISMATCH; return MDB_VERSION_MISMATCH;
env = mc->mc_txn->mt_env; env = mc->mc_txn->mt_env;
@ -7234,7 +7239,7 @@ mdb_cursor_del(MDB_cursor *mc, unsigned flags)
if (unlikely(!mc)) if (unlikely(!mc))
return EINVAL; return EINVAL;
if (unlikely(mc->mc_signature != MDBX_MC_SIGNATURE)) if (unlikely(mc->mc_signature != MDBX_MC_SIGNATURE_LIVE))
return MDB_VERSION_MISMATCH; return MDB_VERSION_MISMATCH;
if (unlikely(mc->mc_txn->mt_flags & (MDB_TXN_RDONLY|MDB_TXN_BLOCKED))) if (unlikely(mc->mc_txn->mt_flags & (MDB_TXN_RDONLY|MDB_TXN_BLOCKED)))
@ -7777,7 +7782,7 @@ mdb_xcursor_init2(MDB_cursor *mc, MDB_xcursor *src_mx, int new_dupdata)
static void static void
mdb_cursor_init(MDB_cursor *mc, MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx) mdb_cursor_init(MDB_cursor *mc, MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx)
{ {
mc->mc_signature = MDBX_MC_SIGNATURE; mc->mc_signature = MDBX_MC_SIGNATURE_LIVE;
mc->mc_next = NULL; mc->mc_next = NULL;
mc->mc_backup = NULL; mc->mc_backup = NULL;
mc->mc_dbi = dbi; mc->mc_dbi = dbi;
@ -7792,7 +7797,7 @@ mdb_cursor_init(MDB_cursor *mc, MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx)
mc->mc_ki[0] = 0; mc->mc_ki[0] = 0;
if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT) { if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT) {
mdb_tassert(txn, mx != NULL); mdb_tassert(txn, mx != NULL);
mx->mx_cursor.mc_signature = MDBX_MC_SIGNATURE; mx->mx_cursor.mc_signature = MDBX_MC_SIGNATURE_LIVE;
mc->mc_xcursor = mx; mc->mc_xcursor = mx;
mdb_xcursor_init0(mc); mdb_xcursor_init0(mc);
} else { } else {
@ -7850,14 +7855,23 @@ mdb_cursor_renew(MDB_txn *txn, MDB_cursor *mc)
return EINVAL; return EINVAL;
if (unlikely(txn->mt_signature != MDBX_MT_SIGNATURE if (unlikely(txn->mt_signature != MDBX_MT_SIGNATURE
|| mc->mc_signature != MDBX_MC_SIGNATURE)) || mc->mc_signature != MDBX_MC_SIGNATURE_LIVE))
return MDB_VERSION_MISMATCH; return MDB_VERSION_MISMATCH;
if (unlikely(!TXN_DBI_EXIST(txn, mc->mc_dbi, DB_VALID))) if (unlikely(!TXN_DBI_EXIST(txn, mc->mc_dbi, DB_VALID)))
return EINVAL; return EINVAL;
if (unlikely((mc->mc_flags & C_UNTRACK) || txn->mt_cursors)) if (unlikely((mc->mc_flags & C_UNTRACK) || txn->mt_cursors)) {
#if MDBX_MODE_ENABLED
MDB_cursor **prev = &mc->mc_txn->mt_cursors[mc->mc_dbi];
while (*prev && *prev != mc) prev = &(*prev)->mc_next;
if (*prev == mc)
*prev = mc->mc_next;
mc->mc_signature = 0;
#else
return EINVAL; return EINVAL;
#endif
}
if (unlikely(txn->mt_flags & MDB_TXN_BLOCKED)) if (unlikely(txn->mt_flags & MDB_TXN_BLOCKED))
return MDB_BAD_TXN; return MDB_BAD_TXN;
@ -7873,7 +7887,7 @@ mdb_cursor_count(MDB_cursor *mc, size_t *countp)
if (unlikely(mc == NULL || countp == NULL)) if (unlikely(mc == NULL || countp == NULL))
return EINVAL; return EINVAL;
if (unlikely(mc->mc_signature != MDBX_MC_SIGNATURE)) if (unlikely(mc->mc_signature != MDBX_MC_SIGNATURE_LIVE))
return MDB_VERSION_MISMATCH; return MDB_VERSION_MISMATCH;
if (unlikely(mc->mc_txn->mt_flags & MDB_TXN_BLOCKED)) if (unlikely(mc->mc_txn->mt_flags & MDB_TXN_BLOCKED))
@ -7932,12 +7946,11 @@ void
mdb_cursor_close(MDB_cursor *mc) mdb_cursor_close(MDB_cursor *mc)
{ {
if (mc) { if (mc) {
mdb_ensure(NULL, mc->mc_signature == MDBX_MC_SIGNATURE); mdb_ensure(NULL, mc->mc_signature == MDBX_MC_SIGNATURE_LIVE);
if (!mc->mc_backup) { if (!mc->mc_backup) {
/* Remove from txn, if tracked. /* Remove from txn, if tracked.
* A read-only txn (!C_UNTRACK) may have been freed already, * A read-only txn (!C_UNTRACK) may have been freed already,
* so do not peek inside it. Only write txns track cursors. * so do not peek inside it. Only write txns track cursors. */
*/
if ((mc->mc_flags & C_UNTRACK) && mc->mc_txn->mt_cursors) { if ((mc->mc_flags & C_UNTRACK) && mc->mc_txn->mt_cursors) {
MDB_cursor **prev = &mc->mc_txn->mt_cursors[mc->mc_dbi]; MDB_cursor **prev = &mc->mc_txn->mt_cursors[mc->mc_dbi];
while (*prev && *prev != mc) prev = &(*prev)->mc_next; while (*prev && *prev != mc) prev = &(*prev)->mc_next;
@ -7946,6 +7959,8 @@ mdb_cursor_close(MDB_cursor *mc)
} }
mc->mc_signature = 0; mc->mc_signature = 0;
free(mc); free(mc);
} else {
mc->mc_signature = MDBX_MC_SIGNATURE_CLOSED;
} }
} }
} }
@ -7953,7 +7968,7 @@ mdb_cursor_close(MDB_cursor *mc)
MDB_txn * MDB_txn *
mdb_cursor_txn(MDB_cursor *mc) mdb_cursor_txn(MDB_cursor *mc)
{ {
if (unlikely(!mc || mc->mc_signature != MDBX_MC_SIGNATURE)) if (unlikely(!mc || mc->mc_signature != MDBX_MC_SIGNATURE_LIVE))
return NULL; return NULL;
return mc->mc_txn; return mc->mc_txn;
} }
@ -7961,7 +7976,7 @@ mdb_cursor_txn(MDB_cursor *mc)
MDB_dbi MDB_dbi
mdb_cursor_dbi(MDB_cursor *mc) mdb_cursor_dbi(MDB_cursor *mc)
{ {
if (unlikely(!mc || mc->mc_signature != MDBX_MC_SIGNATURE)) if (unlikely(!mc || mc->mc_signature != MDBX_MC_SIGNATURE_LIVE))
return INT_MIN; return INT_MIN;
return mc->mc_dbi; return mc->mc_dbi;
} }

2
mdbx.c
View File

@ -363,7 +363,7 @@ int mdbx_cursor_eof(MDB_cursor *mc)
if (unlikely(mc == NULL)) if (unlikely(mc == NULL))
return EINVAL; return EINVAL;
if (unlikely(mc->mc_signature != MDBX_MC_SIGNATURE)) if (unlikely(mc->mc_signature != MDBX_MC_SIGNATURE_LIVE))
return MDB_VERSION_MISMATCH; return MDB_VERSION_MISMATCH;
if ((mc->mc_flags & C_INITIALIZED) == 0) if ((mc->mc_flags & C_INITIALIZED) == 0)