mdbx: explicity types inside LCK-file, reserve ABA-solver for 32-bit archs.

Change-Id: I2d1235365709858e2deb9f195f0fe8403721c705
This commit is contained in:
Leonid Yuriev 2019-09-23 15:32:29 +03:00
parent 3549744f40
commit b3c2118eb4
5 changed files with 46 additions and 42 deletions

View File

@ -481,7 +481,7 @@ __cold void mdbx_rthc_thread_dtor(void *ptr) {
mdbx_trace(">> pid %d, thread 0x%" PRIxPTR ", rthc %p", mdbx_getpid(), mdbx_trace(">> pid %d, thread 0x%" PRIxPTR ", rthc %p", mdbx_getpid(),
(uintptr_t)mdbx_thread_self(), ptr); (uintptr_t)mdbx_thread_self(), ptr);
const mdbx_pid_t self_pid = mdbx_getpid(); const uint32_t self_pid = mdbx_getpid();
for (unsigned i = 0; i < rthc_count; ++i) { for (unsigned i = 0; i < rthc_count; ++i) {
if (!rthc_table[i].key_valid) if (!rthc_table[i].key_valid)
continue; continue;
@ -580,7 +580,7 @@ __cold void mdbx_rthc_global_dtor(void) {
thread_key_delete(rthc_key); thread_key_delete(rthc_key);
#endif #endif
const mdbx_pid_t self_pid = mdbx_getpid(); const uint32_t self_pid = mdbx_getpid();
for (unsigned i = 0; i < rthc_count; ++i) { for (unsigned i = 0; i < rthc_count; ++i) {
if (!rthc_table[i].key_valid) if (!rthc_table[i].key_valid)
continue; continue;
@ -673,7 +673,7 @@ __cold void mdbx_rthc_remove(const mdbx_thread_key_t key) {
for (unsigned i = 0; i < rthc_count; ++i) { for (unsigned i = 0; i < rthc_count; ++i) {
if (rthc_table[i].key_valid && key == rthc_table[i].thr_tls_key) { if (rthc_table[i].key_valid && key == rthc_table[i].thr_tls_key) {
const mdbx_pid_t self_pid = mdbx_getpid(); const uint32_t self_pid = mdbx_getpid();
mdbx_trace("== [%i], %p ...%p, current-pid %d", i, rthc_table[i].begin, mdbx_trace("== [%i], %p ...%p, current-pid %d", i, rthc_table[i].begin,
rthc_table[i].end, self_pid); rthc_table[i].end, self_pid);
@ -756,7 +756,7 @@ static int uniq_peek(const mdbx_mmap_t *pending, mdbx_mmap_t *scan) {
static int uniq_poke(const mdbx_mmap_t *pending, mdbx_mmap_t *scan, static int uniq_poke(const mdbx_mmap_t *pending, mdbx_mmap_t *scan,
uint64_t *abra) { uint64_t *abra) {
if (*abra == 0) { if (*abra == 0) {
const mdbx_tid_t tid = mdbx_thread_self(); const size_t tid = mdbx_thread_self();
size_t uit = 0; size_t uit = 0;
memcpy(&uit, &tid, (sizeof(tid) < sizeof(uit)) ? sizeof(tid) : sizeof(uit)); memcpy(&uit, &tid, (sizeof(tid) < sizeof(uit)) ? sizeof(tid) : sizeof(uit));
*abra = *abra =
@ -3583,6 +3583,7 @@ static int mdbx_txn_renew0(MDBX_txn *txn, unsigned flags) {
if (flags & MDBX_RDONLY) { if (flags & MDBX_RDONLY) {
txn->mt_flags = MDBX_RDONLY | (env->me_flags & MDBX_NOTLS); txn->mt_flags = MDBX_RDONLY | (env->me_flags & MDBX_NOTLS);
MDBX_reader *r = txn->mt_ro_reader; MDBX_reader *r = txn->mt_ro_reader;
STATIC_ASSERT(sizeof(size_t) == sizeof(r->mr_tid));
if (likely(env->me_flags & MDBX_ENV_TXKEY)) { if (likely(env->me_flags & MDBX_ENV_TXKEY)) {
mdbx_assert(env, !(env->me_flags & MDBX_NOTLS)); mdbx_assert(env, !(env->me_flags & MDBX_NOTLS));
r = thread_rthc_get(env->me_txkey); r = thread_rthc_get(env->me_txkey);
@ -3599,7 +3600,7 @@ static int mdbx_txn_renew0(MDBX_txn *txn, unsigned flags) {
return MDBX_BAD_RSLOT; return MDBX_BAD_RSLOT;
} else if (env->me_lck) { } else if (env->me_lck) {
unsigned slot, nreaders; unsigned slot, nreaders;
const mdbx_tid_t tid = mdbx_thread_self(); const size_t tid = mdbx_thread_self();
mdbx_assert(env, env->me_lck->mti_magic_and_version == MDBX_LOCK_MAGIC); mdbx_assert(env, env->me_lck->mti_magic_and_version == MDBX_LOCK_MAGIC);
mdbx_assert(env, env->me_lck->mti_os_and_format == MDBX_LOCK_FORMAT); mdbx_assert(env, env->me_lck->mti_os_and_format == MDBX_LOCK_FORMAT);
@ -6614,7 +6615,7 @@ mdbx_env_set_geometry(MDBX_env *env, intptr_t size_lower, intptr_t size_now,
goto bailout; goto bailout;
/* Check if there are any reading threads that do not use the SRWL */ /* Check if there are any reading threads that do not use the SRWL */
const mdbx_pid_t CurrentTid = GetCurrentThreadId(); const size_t CurrentTid = GetCurrentThreadId();
const MDBX_reader *const begin = env->me_lck->mti_readers; const MDBX_reader *const begin = env->me_lck->mti_readers;
const MDBX_reader *const end = begin + env->me_lck->mti_numreaders; const MDBX_reader *const end = begin + env->me_lck->mti_numreaders;
for (const MDBX_reader *reader = begin; reader < end; ++reader) { for (const MDBX_reader *reader = begin; reader < end; ++reader) {
@ -12961,7 +12962,7 @@ int __cold mdbx_env_info_ex(const MDBX_env *env, const MDBX_txn *txn,
arg->mi_self_latter_reader_txnid = arg->mi_latter_reader_txnid = arg->mi_self_latter_reader_txnid = arg->mi_latter_reader_txnid =
arg->mi_recent_txnid; arg->mi_recent_txnid;
for (unsigned i = 0; i < arg->mi_numreaders; ++i) { for (unsigned i = 0; i < arg->mi_numreaders; ++i) {
const mdbx_pid_t pid = r[i].mr_pid; const uint32_t pid = r[i].mr_pid;
if (pid) { if (pid) {
const txnid_t txnid = r[i].mr_txnid; const txnid_t txnid = r[i].mr_txnid;
if (arg->mi_latter_reader_txnid > txnid) if (arg->mi_latter_reader_txnid > txnid)
@ -13517,11 +13518,11 @@ int __cold mdbx_reader_list(MDBX_env *env, MDBX_reader_list_func *func,
for (unsigned i = 0; i < snap_nreaders; i++) { for (unsigned i = 0; i < snap_nreaders; i++) {
const MDBX_reader *r = env->me_lck->mti_readers + i; const MDBX_reader *r = env->me_lck->mti_readers + i;
retry_reader:; retry_reader:;
const mdbx_pid_t pid = r->mr_pid; const uint32_t pid = r->mr_pid;
if (!pid) if (!pid)
continue; continue;
txnid_t txnid = r->mr_txnid; txnid_t txnid = r->mr_txnid;
const mdbx_tid_t tid = r->mr_tid; const size_t tid = r->mr_tid;
const pgno_t pages_used = r->mr_snapshot_pages_used; const pgno_t pages_used = r->mr_snapshot_pages_used;
const uint64_t reader_pages_retired = r->mr_snapshot_pages_retired; const uint64_t reader_pages_retired = r->mr_snapshot_pages_retired;
mdbx_compiler_barrier(); mdbx_compiler_barrier();
@ -13556,7 +13557,7 @@ int __cold mdbx_reader_list(MDBX_env *env, MDBX_reader_list_func *func,
reader_pages_retired)) reader_pages_retired))
: 0; : 0;
} }
rc = func(ctx, ++serial, i, pid, tid, txnid, lag, bytes_used, rc = func(ctx, ++serial, i, pid, (mdbx_tid_t)tid, txnid, lag, bytes_used,
bytes_retained); bytes_retained);
if (unlikely(rc != MDBX_SUCCESS)) if (unlikely(rc != MDBX_SUCCESS))
break; break;
@ -13568,7 +13569,7 @@ int __cold mdbx_reader_list(MDBX_env *env, MDBX_reader_list_func *func,
/* Insert pid into list if not already present. /* Insert pid into list if not already present.
* return -1 if already present. */ * return -1 if already present. */
static int __cold mdbx_pid_insert(mdbx_pid_t *ids, mdbx_pid_t pid) { static int __cold mdbx_pid_insert(uint32_t *ids, uint32_t pid) {
/* binary search of pid in list */ /* binary search of pid in list */
unsigned base = 0; unsigned base = 0;
unsigned cursor = 1; unsigned cursor = 1;
@ -13637,11 +13638,11 @@ int __cold mdbx_reader_check0(MDBX_env *env, int rdt_locked, int *dead) {
lck->mti_reader_check_timestamp = mdbx_osal_monotime(); lck->mti_reader_check_timestamp = mdbx_osal_monotime();
const unsigned snap_nreaders = lck->mti_numreaders; const unsigned snap_nreaders = lck->mti_numreaders;
mdbx_pid_t pidsbuf_onstask[142]; uint32_t pidsbuf_onstask[142];
mdbx_pid_t *const pids = uint32_t *const pids =
(snap_nreaders < ARRAY_LENGTH(pidsbuf_onstask)) (snap_nreaders < ARRAY_LENGTH(pidsbuf_onstask))
? pidsbuf_onstask ? pidsbuf_onstask
: mdbx_malloc((snap_nreaders + 1) * sizeof(mdbx_pid_t)); : mdbx_malloc((snap_nreaders + 1) * sizeof(uint32_t));
if (unlikely(!pids)) if (unlikely(!pids))
return MDBX_ENOMEM; return MDBX_ENOMEM;
@ -13649,7 +13650,7 @@ int __cold mdbx_reader_check0(MDBX_env *env, int rdt_locked, int *dead) {
int rc = MDBX_SUCCESS, count = 0; int rc = MDBX_SUCCESS, count = 0;
for (unsigned i = 0; i < snap_nreaders; i++) { for (unsigned i = 0; i < snap_nreaders; i++) {
const mdbx_pid_t pid = lck->mti_readers[i].mr_pid; const uint32_t pid = lck->mti_readers[i].mr_pid;
if (pid == 0) if (pid == 0)
continue /* skip empty */; continue /* skip empty */;
if (pid == env->me_pid) if (pid == env->me_pid)
@ -13810,15 +13811,15 @@ static txnid_t __cold mdbx_oomkick(MDBX_env *env, const txnid_t laggard) {
if (!env->me_oom_func) if (!env->me_oom_func)
break; break;
mdbx_pid_t pid = asleep->mr_pid; uint32_t pid = asleep->mr_pid;
mdbx_tid_t tid = asleep->mr_tid; size_t tid = asleep->mr_tid;
if (asleep->mr_txnid != laggard || pid <= 0) if (asleep->mr_txnid != laggard || pid <= 0)
continue; continue;
const txnid_t gap = const txnid_t gap =
mdbx_meta_txnid_stable(env, mdbx_meta_head(env)) - laggard; mdbx_meta_txnid_stable(env, mdbx_meta_head(env)) - laggard;
int rc = int rc =
env->me_oom_func(env, pid, tid, laggard, env->me_oom_func(env, pid, (mdbx_tid_t)tid, laggard,
(gap < UINT_MAX) ? (unsigned)gap : UINT_MAX, retry); (gap < UINT_MAX) ? (unsigned)gap : UINT_MAX, retry);
if (rc < 0) if (rc < 0)
break; break;

View File

@ -448,7 +448,7 @@ typedef struct MDBX_reader {
* anything; all we need to know is which version of the DB they * anything; all we need to know is which version of the DB they
* started from so we can avoid overwriting any data used in that * started from so we can avoid overwriting any data used in that
* particular version. */ * particular version. */
volatile txnid_t mr_txnid; volatile uint64_t /* txnid_t */ mr_txnid;
/* The information we store in a single slot of the reader table. /* The information we store in a single slot of the reader table.
* In addition to a transaction ID, we also record the process and * In addition to a transaction ID, we also record the process and
@ -460,15 +460,16 @@ typedef struct MDBX_reader {
* opening the lock file. */ * opening the lock file. */
/* The thread ID of the thread owning this txn. */ /* The thread ID of the thread owning this txn. */
union { #if MDBX_WORDBITS >= 64
volatile mdbx_tid_t mr_tid; volatile uint64_t mr_tid;
volatile uint64_t mr_tid_u64; #else
}; volatile uint32_t mr_tid;
volatile uint32_t mr_aba_curer; /* CSN to resolve ABA_problems on 32-bit arch,
unused for now */
#endif
/* The process ID of the process owning this reader txn. */ /* The process ID of the process owning this reader txn. */
union { volatile uint32_t mr_pid;
volatile mdbx_pid_t mr_pid;
volatile uint32_t mr_pid_u32;
};
/* The number of pages used in the reader's MVCC snapshot, /* The number of pages used in the reader's MVCC snapshot,
* i.e. the value of meta->mm_geo.next and txn->mt_next_pgno */ * i.e. the value of meta->mm_geo.next and txn->mt_next_pgno */
volatile pgno_t mr_snapshot_pages_used; volatile pgno_t mr_snapshot_pages_used;
@ -754,7 +755,7 @@ struct MDBX_txn {
* dirty/spilled pages. Thus commit(nested txn) has room to merge * dirty/spilled pages. Thus commit(nested txn) has room to merge
* dirtylist into mt_parent after freeing hidden mt_parent pages. */ * dirtylist into mt_parent after freeing hidden mt_parent pages. */
unsigned mt_dirtyroom; unsigned mt_dirtyroom;
mdbx_tid_t mt_owner; /* thread ID that owns this transaction */ size_t mt_owner; /* thread ID that owns this transaction */
mdbx_canary mt_canary; mdbx_canary mt_canary;
}; };
@ -878,7 +879,7 @@ struct MDBX_env {
mdbx_fastmutex_t me_dbi_lock; mdbx_fastmutex_t me_dbi_lock;
MDBX_dbi me_numdbs; /* number of DBs opened */ MDBX_dbi me_numdbs; /* number of DBs opened */
MDBX_dbi me_maxdbs; /* size of the DB table */ MDBX_dbi me_maxdbs; /* size of the DB table */
mdbx_pid_t me_pid; /* process ID of this env */ uint32_t me_pid; /* process ID of this env */
mdbx_thread_key_t me_txkey; /* thread-key for readers */ mdbx_thread_key_t me_txkey; /* thread-key for readers */
char *me_path; /* path to the DB files */ char *me_path; /* path to the DB files */
void *me_pbuf; /* scratch area for DUPSORT put() */ void *me_pbuf; /* scratch area for DUPSORT put() */
@ -903,9 +904,9 @@ struct MDBX_env {
unsigned me_maxgc_ov1page; unsigned me_maxgc_ov1page;
/* Max size of a node on a page */ /* Max size of a node on a page */
unsigned me_nodemax; unsigned me_nodemax;
unsigned me_maxkey_limit; /* max size of a key */ unsigned me_maxkey_limit; /* max size of a key */
mdbx_pid_t me_live_reader; /* have liveness lock in reader table */ uint32_t me_live_reader; /* have liveness lock in reader table */
void *me_userctx; /* User-settable context */ void *me_userctx; /* User-settable context */
volatile uint64_t *me_unsynced_timeout; volatile uint64_t *me_unsynced_timeout;
volatile uint64_t *me_autosync_period; volatile uint64_t *me_autosync_period;
volatile pgno_t *me_unsynced_pages; volatile pgno_t *me_unsynced_pages;

View File

@ -187,7 +187,7 @@ MDBX_INTERNAL_FUNC int mdbx_rpid_clear(MDBX_env *env) {
return lck_op(env->me_lfd, op_setlk, F_UNLCK, env->me_pid, 1); return lck_op(env->me_lfd, op_setlk, F_UNLCK, env->me_pid, 1);
} }
MDBX_INTERNAL_FUNC int mdbx_rpid_check(MDBX_env *env, mdbx_pid_t pid) { MDBX_INTERNAL_FUNC int mdbx_rpid_check(MDBX_env *env, uint32_t pid) {
assert(env->me_lfd != INVALID_HANDLE_VALUE); assert(env->me_lfd != INVALID_HANDLE_VALUE);
assert(pid > 0); assert(pid > 0);
return lck_op(env->me_lfd, op_getlk, F_WRLCK, pid, 1); return lck_op(env->me_lfd, op_getlk, F_WRLCK, pid, 1);

View File

@ -244,13 +244,13 @@ static int suspend_and_append(mdbx_handle_array_t **array,
MDBX_INTERNAL_FUNC int MDBX_INTERNAL_FUNC int
mdbx_suspend_threads_before_remap(MDBX_env *env, mdbx_handle_array_t **array) { mdbx_suspend_threads_before_remap(MDBX_env *env, mdbx_handle_array_t **array) {
const mdbx_pid_t CurrentTid = GetCurrentThreadId(); const size_t CurrentTid = GetCurrentThreadId();
int rc; int rc;
if (env->me_lck) { if (env->me_lck) {
/* Scan LCK for threads of the current process */ /* Scan LCK for threads of the current process */
const MDBX_reader *const begin = env->me_lck->mti_readers; const MDBX_reader *const begin = env->me_lck->mti_readers;
const MDBX_reader *const end = begin + env->me_lck->mti_numreaders; const MDBX_reader *const end = begin + env->me_lck->mti_numreaders;
const mdbx_tid_t WriteTxnOwner = env->me_txn0 ? env->me_txn0->mt_owner : 0; const size_t WriteTxnOwner = env->me_txn0 ? env->me_txn0->mt_owner : 0;
for (const MDBX_reader *reader = begin; reader < end; ++reader) { for (const MDBX_reader *reader = begin; reader < end; ++reader) {
if (reader->mr_pid != env->me_pid || !reader->mr_tid) { if (reader->mr_pid != env->me_pid || !reader->mr_tid) {
skip_lck: skip_lck:
@ -265,7 +265,7 @@ mdbx_suspend_threads_before_remap(MDBX_env *env, mdbx_handle_array_t **array) {
goto skip_lck; goto skip_lck;
} }
rc = suspend_and_append(array, reader->mr_tid); rc = suspend_and_append(array, (mdbx_tid_t)reader->mr_tid);
if (rc != MDBX_SUCCESS) { if (rc != MDBX_SUCCESS) {
bailout_lck: bailout_lck:
(void)mdbx_resume_threads_after_remap(*array); (void)mdbx_resume_threads_after_remap(*array);
@ -273,7 +273,7 @@ mdbx_suspend_threads_before_remap(MDBX_env *env, mdbx_handle_array_t **array) {
} }
} }
if (WriteTxnOwner && WriteTxnOwner != CurrentTid) { if (WriteTxnOwner && WriteTxnOwner != CurrentTid) {
rc = suspend_and_append(array, WriteTxnOwner); rc = suspend_and_append(array, (mdbx_tid_t)WriteTxnOwner);
if (rc != MDBX_SUCCESS) if (rc != MDBX_SUCCESS)
goto bailout_lck; goto bailout_lck;
} }
@ -585,7 +585,7 @@ MDBX_INTERNAL_FUNC int mdbx_rpid_clear(MDBX_env *env) {
* MDBX_RESULT_TRUE, if pid is live (unable to acquire lock) * MDBX_RESULT_TRUE, if pid is live (unable to acquire lock)
* MDBX_RESULT_FALSE, if pid is dead (lock acquired) * MDBX_RESULT_FALSE, if pid is dead (lock acquired)
* or otherwise the errcode. */ * or otherwise the errcode. */
MDBX_INTERNAL_FUNC int mdbx_rpid_check(MDBX_env *env, mdbx_pid_t pid) { MDBX_INTERNAL_FUNC int mdbx_rpid_check(MDBX_env *env, uint32_t pid) {
(void)env; (void)env;
HANDLE hProcess = OpenProcess(SYNCHRONIZE, FALSE, pid); HANDLE hProcess = OpenProcess(SYNCHRONIZE, FALSE, pid);
int rc; int rc;

View File

@ -624,7 +624,8 @@ MDBX_INTERNAL_FUNC int mdbx_msync(mdbx_mmap_t *map, size_t offset,
size_t length, int async); size_t length, int async);
MDBX_INTERNAL_FUNC int mdbx_check4nonlocal(mdbx_filehandle_t handle, int flags); MDBX_INTERNAL_FUNC int mdbx_check4nonlocal(mdbx_filehandle_t handle, int flags);
static __maybe_unused __inline mdbx_pid_t mdbx_getpid(void) { static __maybe_unused __inline uint32_t mdbx_getpid(void) {
STATIC_ASSERT(sizeof(mdbx_pid_t) <= sizeof(uint32_t));
#if defined(_WIN32) || defined(_WIN64) #if defined(_WIN32) || defined(_WIN64)
return GetCurrentProcessId(); return GetCurrentProcessId();
#else #else
@ -632,11 +633,12 @@ static __maybe_unused __inline mdbx_pid_t mdbx_getpid(void) {
#endif #endif
} }
static __maybe_unused __inline mdbx_tid_t mdbx_thread_self(void) { static __maybe_unused __inline size_t mdbx_thread_self(void) {
STATIC_ASSERT(sizeof(mdbx_tid_t) <= sizeof(size_t));
#if defined(_WIN32) || defined(_WIN64) #if defined(_WIN32) || defined(_WIN64)
return GetCurrentThreadId(); return GetCurrentThreadId();
#else #else
return pthread_self(); return (size_t)pthread_self();
#endif #endif
} }
@ -765,7 +767,7 @@ MDBX_INTERNAL_FUNC int mdbx_rpid_clear(MDBX_env *env);
/// MDBX_RESULT_FALSE (0) - если процесс-читатель с соответствующим pid /// MDBX_RESULT_FALSE (0) - если процесс-читатель с соответствующим pid
/// отсутствует или не работает с БД (индицирующая блокировка отсутствует). /// отсутствует или не работает с БД (индицирующая блокировка отсутствует).
/// Иначе (не 0 и не -1) - код ошибки. /// Иначе (не 0 и не -1) - код ошибки.
MDBX_INTERNAL_FUNC int mdbx_rpid_check(MDBX_env *env, mdbx_pid_t pid); MDBX_INTERNAL_FUNC int mdbx_rpid_check(MDBX_env *env, uint32_t pid);
#if defined(_WIN32) || defined(_WIN64) #if defined(_WIN32) || defined(_WIN64)
typedef union MDBX_srwlock { typedef union MDBX_srwlock {