mirror of
https://github.com/isar/libmdbx.git
synced 2025-01-08 05:24:12 +08:00
mdbx: rework MDBX_reader.
This commit is contained in:
parent
a3ed42b999
commit
34213c554a
54
src/bits.h
54
src/bits.h
@ -189,40 +189,35 @@ typedef uint16_t indx_t;
|
|||||||
|
|
||||||
#pragma pack(push, 1)
|
#pragma pack(push, 1)
|
||||||
|
|
||||||
/* The information we store in a single slot of the reader table.
|
/* The actual reader record, with cacheline padding. */
|
||||||
* In addition to a transaction ID, we also record the process and
|
typedef struct MDBX_reader {
|
||||||
* thread ID that owns a slot, so that we can detect stale information,
|
|
||||||
* e.g. threads or processes that went away without cleaning up.
|
|
||||||
* NOTE: We currently don't check for stale records. We simply re-init
|
|
||||||
* the table when we know that we're the only process opening the
|
|
||||||
* lock file. */
|
|
||||||
typedef struct MDB_rxbody {
|
|
||||||
/* Current Transaction ID when this transaction began, or (txnid_t)-1.
|
/* Current Transaction ID when this transaction began, or (txnid_t)-1.
|
||||||
* Multiple readers that start at the same time will probably have the
|
* Multiple readers that start at the same time will probably have the
|
||||||
* same ID here. Again, it's not important to exclude them from
|
* same ID here. Again, it's not important to exclude them from
|
||||||
* anything; all we need to know is which version of the DB they
|
* anything; all we need to know is which version of the DB they
|
||||||
* started from so we can avoid overwriting any data used in that
|
* started from so we can avoid overwriting any data used in that
|
||||||
* particular version. */
|
* particular version. */
|
||||||
volatile txnid_t mrb_txnid;
|
volatile txnid_t mr_txnid;
|
||||||
/* The process ID of the process owning this reader txn. */
|
|
||||||
volatile mdbx_pid_t mrb_pid;
|
/* The information we store in a single slot of the reader table.
|
||||||
/* The thread ID of the thread owning this txn. */
|
* In addition to a transaction ID, we also record the process and
|
||||||
volatile mdbx_tid_t mrb_tid;
|
* thread ID that owns a slot, so that we can detect stale information,
|
||||||
} MDB_rxbody;
|
* e.g. threads or processes that went away without cleaning up.
|
||||||
|
*
|
||||||
|
* NOTE: We currently don't check for stale records.
|
||||||
|
* We simply re-init the table when we know that we're the only process
|
||||||
|
* opening the lock file. */
|
||||||
|
|
||||||
|
/* The process ID of the process owning this reader txn. */
|
||||||
|
volatile mdbx_pid_t mr_pid;
|
||||||
|
/* The thread ID of the thread owning this txn. */
|
||||||
|
volatile mdbx_tid_t mr_tid;
|
||||||
|
|
||||||
/* The actual reader record, with cacheline padding. */
|
|
||||||
typedef struct MDB_reader {
|
|
||||||
union {
|
|
||||||
MDB_rxbody mrx;
|
|
||||||
/* shorthand for mrb_txnid */
|
|
||||||
#define mr_txnid mru.mrx.mrb_txnid
|
|
||||||
#define mr_pid mru.mrx.mrb_pid
|
|
||||||
#define mr_tid mru.mrx.mrb_tid
|
|
||||||
/* cache line alignment */
|
/* cache line alignment */
|
||||||
char pad[(sizeof(MDB_rxbody) + MDBX_CACHELINE_SIZE - 1) &
|
uint8_t pad[~(MDBX_CACHELINE_SIZE - 1) &
|
||||||
~(MDBX_CACHELINE_SIZE - 1)];
|
(sizeof(txnid_t) + sizeof(mdbx_pid_t) + sizeof(mdbx_tid_t) +
|
||||||
} mru;
|
MDBX_CACHELINE_SIZE - 1)];
|
||||||
} MDB_reader;
|
} MDBX_reader;
|
||||||
|
|
||||||
/* Information about a single database in the environment. */
|
/* Information about a single database in the environment. */
|
||||||
typedef struct MDB_db {
|
typedef struct MDB_db {
|
||||||
@ -345,7 +340,7 @@ typedef struct MDBX_lockinfo {
|
|||||||
/* Mutex protecting access to this table. */
|
/* Mutex protecting access to this table. */
|
||||||
MDBX_OSAL_LOCK mti_rmutex;
|
MDBX_OSAL_LOCK mti_rmutex;
|
||||||
#endif
|
#endif
|
||||||
MDB_reader mti_readers[1];
|
MDBX_reader mti_readers[1];
|
||||||
} MDBX_lockinfo;
|
} MDBX_lockinfo;
|
||||||
|
|
||||||
#pragma pack(pop)
|
#pragma pack(pop)
|
||||||
@ -390,7 +385,7 @@ struct MDB_txn {
|
|||||||
/* For write txns: Modified pages. Sorted when not MDB_WRITEMAP. */
|
/* For write txns: Modified pages. Sorted when not MDB_WRITEMAP. */
|
||||||
MDB_ID2L dirty_list;
|
MDB_ID2L dirty_list;
|
||||||
/* For read txns: This thread/txn's reader table slot, or NULL. */
|
/* For read txns: This thread/txn's reader table slot, or NULL. */
|
||||||
MDB_reader *reader;
|
MDBX_reader *reader;
|
||||||
} mt_u;
|
} mt_u;
|
||||||
/* Array of records for each DB known in the environment. */
|
/* Array of records for each DB known in the environment. */
|
||||||
MDB_dbx *mt_dbxs;
|
MDB_dbx *mt_dbxs;
|
||||||
@ -773,7 +768,8 @@ static __inline MDB_meta *mdbx_meta_head(MDB_env *env) {
|
|||||||
void mdbx_rthc_dtor(void *rthc);
|
void mdbx_rthc_dtor(void *rthc);
|
||||||
void mdbx_rthc_lock(void);
|
void mdbx_rthc_lock(void);
|
||||||
void mdbx_rthc_unlock(void);
|
void mdbx_rthc_unlock(void);
|
||||||
int mdbx_rthc_alloc(mdbx_thread_key_t *key, MDB_reader *begin, MDB_reader *end);
|
int mdbx_rthc_alloc(mdbx_thread_key_t *key, MDBX_reader *begin,
|
||||||
|
MDBX_reader *end);
|
||||||
void mdbx_rthc_remove(mdbx_thread_key_t key);
|
void mdbx_rthc_remove(mdbx_thread_key_t key);
|
||||||
void mdbx_rthc_cleanup(void);
|
void mdbx_rthc_cleanup(void);
|
||||||
|
|
||||||
|
32
src/mdbx.c
32
src/mdbx.c
@ -42,8 +42,8 @@
|
|||||||
/* rthc (tls keys and destructors) */
|
/* rthc (tls keys and destructors) */
|
||||||
|
|
||||||
typedef struct rthc_entry_t {
|
typedef struct rthc_entry_t {
|
||||||
MDB_reader *begin;
|
MDBX_reader *begin;
|
||||||
MDB_reader *end;
|
MDBX_reader *end;
|
||||||
mdbx_thread_key_t key;
|
mdbx_thread_key_t key;
|
||||||
} rthc_entry_t;
|
} rthc_entry_t;
|
||||||
|
|
||||||
@ -59,7 +59,7 @@ static rthc_entry_t rthc_table_static[RTHC_INITIAL_LIMIT];
|
|||||||
static rthc_entry_t *rthc_table = rthc_table_static;
|
static rthc_entry_t *rthc_table = rthc_table_static;
|
||||||
|
|
||||||
__cold void mdbx_rthc_dtor(void *ptr) {
|
__cold void mdbx_rthc_dtor(void *ptr) {
|
||||||
MDB_reader *rthc = (MDB_reader *)ptr;
|
MDBX_reader *rthc = (MDBX_reader *)ptr;
|
||||||
|
|
||||||
mdbx_rthc_lock();
|
mdbx_rthc_lock();
|
||||||
const mdbx_pid_t self_pid = mdbx_getpid();
|
const mdbx_pid_t self_pid = mdbx_getpid();
|
||||||
@ -80,7 +80,7 @@ __cold void mdbx_rthc_cleanup(void) {
|
|||||||
const mdbx_pid_t self_pid = mdbx_getpid();
|
const mdbx_pid_t self_pid = mdbx_getpid();
|
||||||
for (unsigned i = 0; i < rthc_count; ++i) {
|
for (unsigned i = 0; i < rthc_count; ++i) {
|
||||||
mdbx_thread_key_t key = rthc_table[i].key;
|
mdbx_thread_key_t key = rthc_table[i].key;
|
||||||
MDB_reader *rthc = mdbx_thread_rthc_get(key);
|
MDBX_reader *rthc = mdbx_thread_rthc_get(key);
|
||||||
if (rthc) {
|
if (rthc) {
|
||||||
mdbx_thread_rthc_set(key, NULL);
|
mdbx_thread_rthc_set(key, NULL);
|
||||||
if (rthc->mr_pid == self_pid) {
|
if (rthc->mr_pid == self_pid) {
|
||||||
@ -92,8 +92,8 @@ __cold void mdbx_rthc_cleanup(void) {
|
|||||||
mdbx_rthc_unlock();
|
mdbx_rthc_unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
__cold int mdbx_rthc_alloc(mdbx_thread_key_t *key, MDB_reader *begin,
|
__cold int mdbx_rthc_alloc(mdbx_thread_key_t *key, MDBX_reader *begin,
|
||||||
MDB_reader *end) {
|
MDBX_reader *end) {
|
||||||
#ifndef NDEBUG
|
#ifndef NDEBUG
|
||||||
*key = (mdbx_thread_key_t)0xBADBADBAD;
|
*key = (mdbx_thread_key_t)0xBADBADBAD;
|
||||||
#endif /* NDEBUG */
|
#endif /* NDEBUG */
|
||||||
@ -136,7 +136,7 @@ __cold void mdbx_rthc_remove(mdbx_thread_key_t key) {
|
|||||||
for (unsigned i = 0; i < rthc_count; ++i) {
|
for (unsigned i = 0; i < rthc_count; ++i) {
|
||||||
if (key == rthc_table[i].key) {
|
if (key == rthc_table[i].key) {
|
||||||
const mdbx_pid_t self_pid = mdbx_getpid();
|
const mdbx_pid_t self_pid = mdbx_getpid();
|
||||||
for (MDB_reader *rthc = rthc_table[i].begin; rthc < rthc_table[i].end;
|
for (MDBX_reader *rthc = rthc_table[i].begin; rthc < rthc_table[i].end;
|
||||||
++rthc)
|
++rthc)
|
||||||
if (rthc->mr_pid == self_pid)
|
if (rthc->mr_pid == self_pid)
|
||||||
rthc->mr_pid = 0;
|
rthc->mr_pid = 0;
|
||||||
@ -1396,7 +1396,7 @@ static txnid_t mdbx_find_oldest(MDB_env *env, int *laggard) {
|
|||||||
txnid_t oldest = mdbx_meta_lt(a, b) ? b->mm_txnid : a->mm_txnid;
|
txnid_t oldest = mdbx_meta_lt(a, b) ? b->mm_txnid : a->mm_txnid;
|
||||||
|
|
||||||
int i, reader;
|
int i, reader;
|
||||||
const MDB_reader *const r = env->me_lck->mti_readers;
|
const MDBX_reader *const r = env->me_lck->mti_readers;
|
||||||
for (reader = -1, i = env->me_lck->mti_numreaders; --i >= 0;) {
|
for (reader = -1, i = env->me_lck->mti_numreaders; --i >= 0;) {
|
||||||
if (r[i].mr_pid) {
|
if (r[i].mr_pid) {
|
||||||
mdbx_jitter4testing(true);
|
mdbx_jitter4testing(true);
|
||||||
@ -2115,7 +2115,7 @@ static int mdbx_txn_renew0(MDB_txn *txn, unsigned flags) {
|
|||||||
|
|
||||||
if (flags & MDB_TXN_RDONLY) {
|
if (flags & MDB_TXN_RDONLY) {
|
||||||
txn->mt_flags = MDB_TXN_RDONLY;
|
txn->mt_flags = MDB_TXN_RDONLY;
|
||||||
MDB_reader *r = txn->mt_u.reader;
|
MDBX_reader *r = txn->mt_u.reader;
|
||||||
if (likely(env->me_flags & MDB_ENV_TXKEY)) {
|
if (likely(env->me_flags & MDB_ENV_TXKEY)) {
|
||||||
mdbx_assert(env, !(env->me_flags & MDB_NOTLS));
|
mdbx_assert(env, !(env->me_flags & MDB_NOTLS));
|
||||||
r = mdbx_thread_rthc_get(env->me_txkey);
|
r = mdbx_thread_rthc_get(env->me_txkey);
|
||||||
@ -3890,7 +3890,7 @@ static int __cold mdbx_setup_lck(MDB_env *env, char *lck_pathname, int mode) {
|
|||||||
return err;
|
return err;
|
||||||
|
|
||||||
if (rc == MDBX_RESULT_TRUE) {
|
if (rc == MDBX_RESULT_TRUE) {
|
||||||
off_t wanna = roundup2((env->me_maxreaders - 1) * sizeof(MDB_reader) +
|
off_t wanna = roundup2((env->me_maxreaders - 1) * sizeof(MDBX_reader) +
|
||||||
sizeof(MDBX_lockinfo),
|
sizeof(MDBX_lockinfo),
|
||||||
env->me_os_psize);
|
env->me_os_psize);
|
||||||
#ifndef NDEBUG
|
#ifndef NDEBUG
|
||||||
@ -3907,7 +3907,7 @@ static int __cold mdbx_setup_lck(MDB_env *env, char *lck_pathname, int mode) {
|
|||||||
size = wanna;
|
size = wanna;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
env->me_maxreaders = (size - sizeof(MDBX_lockinfo)) / sizeof(MDB_reader) + 1;
|
env->me_maxreaders = (size - sizeof(MDBX_lockinfo)) / sizeof(MDBX_reader) + 1;
|
||||||
|
|
||||||
void *addr = NULL;
|
void *addr = NULL;
|
||||||
err = mdbx_mmap(&addr, size, true, env->me_lfd);
|
err = mdbx_mmap(&addr, size, true, env->me_lfd);
|
||||||
@ -4185,7 +4185,7 @@ static void __cold mdbx_env_close0(MDB_env *env) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
mdbx_munmap((void *)env->me_lck,
|
mdbx_munmap((void *)env->me_lck,
|
||||||
(env->me_maxreaders - 1) * sizeof(MDB_reader) +
|
(env->me_maxreaders - 1) * sizeof(MDBX_reader) +
|
||||||
sizeof(MDBX_lockinfo));
|
sizeof(MDBX_lockinfo));
|
||||||
env->me_lck = NULL;
|
env->me_lck = NULL;
|
||||||
env->me_pid = 0;
|
env->me_pid = 0;
|
||||||
@ -8666,7 +8666,7 @@ int __cold mdbx_env_info(MDB_env *env, MDBX_envinfo *arg, size_t bytes) {
|
|||||||
return MDBX_EINVAL;
|
return MDBX_EINVAL;
|
||||||
|
|
||||||
MDB_meta *m1, *m2;
|
MDB_meta *m1, *m2;
|
||||||
MDB_reader *r;
|
MDBX_reader *r;
|
||||||
unsigned i;
|
unsigned i;
|
||||||
|
|
||||||
m1 = METAPAGE_1(env);
|
m1 = METAPAGE_1(env);
|
||||||
@ -9137,7 +9137,7 @@ int __cold mdbx_reader_list(MDB_env *env, MDB_msg_func *func, void *ctx) {
|
|||||||
return MDBX_EBADSIGN;
|
return MDBX_EBADSIGN;
|
||||||
|
|
||||||
unsigned snap_nreaders = env->me_lck->mti_numreaders;
|
unsigned snap_nreaders = env->me_lck->mti_numreaders;
|
||||||
MDB_reader *mr = env->me_lck->mti_readers;
|
MDBX_reader *mr = env->me_lck->mti_readers;
|
||||||
for (unsigned i = 0; i < snap_nreaders; i++) {
|
for (unsigned i = 0; i < snap_nreaders; i++) {
|
||||||
if (mr[i].mr_pid) {
|
if (mr[i].mr_pid) {
|
||||||
txnid_t txnid = mr[i].mr_txnid;
|
txnid_t txnid = mr[i].mr_txnid;
|
||||||
@ -9221,7 +9221,7 @@ int __cold mdbx_reader_check0(MDB_env *env, int rdt_locked, int *dead) {
|
|||||||
pids[0] = 0;
|
pids[0] = 0;
|
||||||
|
|
||||||
int rc = MDBX_RESULT_FALSE, count = 0;
|
int rc = MDBX_RESULT_FALSE, count = 0;
|
||||||
MDB_reader *mr = env->me_lck->mti_readers;
|
MDBX_reader *mr = env->me_lck->mti_readers;
|
||||||
|
|
||||||
for (unsigned i = 0; i < snap_nreaders; i++) {
|
for (unsigned i = 0; i < snap_nreaders; i++) {
|
||||||
const mdbx_pid_t pid = mr[i].mr_pid;
|
const mdbx_pid_t pid = mr[i].mr_pid;
|
||||||
@ -9586,7 +9586,7 @@ static txnid_t __cold mdbx_oomkick(MDB_env *env, txnid_t oldest) {
|
|||||||
return snap;
|
return snap;
|
||||||
}
|
}
|
||||||
|
|
||||||
MDB_reader *r;
|
MDBX_reader *r;
|
||||||
mdbx_tid_t tid;
|
mdbx_tid_t tid;
|
||||||
mdbx_pid_t pid;
|
mdbx_pid_t pid;
|
||||||
int rc;
|
int rc;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user