mdbx: rework lck/body setup.

This commit is contained in:
Leo Yuriev 2017-04-24 15:51:21 +03:00
parent 19d877635c
commit 0d59cd4fe2
2 changed files with 127 additions and 129 deletions

View File

@ -655,7 +655,7 @@ static int mdbx_page_merge(MDB_cursor *csrc, MDB_cursor *cdst);
static int mdbx_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata,
pgno_t newpgno, unsigned nflags);
static int mdbx_env_read_header(MDB_env *env, MDB_meta *meta);
static int mdbx_read_header(MDB_env *env, MDB_meta *meta);
static int mdbx_env_sync0(MDB_env *env, unsigned flags, MDB_meta *pending);
static void mdbx_env_close0(MDB_env *env);
@ -3264,7 +3264,7 @@ fail:
* @param[in] env the environment handle
* @param[out] meta address of where to store the meta information
* @return 0 on success, non-zero on failure. */
static int __cold mdbx_env_read_header(MDB_env *env, MDB_meta *meta) {
static int __cold mdbx_read_header(MDB_env *env, MDB_meta *meta) {
MDB_metabuf pbuf;
MDB_page *p;
MDB_meta *m;
@ -3612,16 +3612,14 @@ static int __cold mdbx_env_map(MDB_env *env, void *addr, size_t usedsize) {
#endif
#ifdef MADV_DONTDUMP
if (!(flags & MDBX_PAGEPERTURB)) {
if (!(flags & MDBX_PAGEPERTURB))
(void)madvise(env->me_map, env->me_mapsize, MADV_DONTDUMP);
}
#endif
#ifdef MADV_REMOVE
if (flags & MDB_WRITEMAP) {
if (flags & MDB_WRITEMAP)
(void)madvise(env->me_map + usedsize, env->me_mapsize - usedsize,
MADV_REMOVE);
}
#else
(void)usedsize;
#endif
@ -3739,14 +3737,17 @@ int __cold mdbx_env_get_maxreaders(MDB_env *env, unsigned *readers) {
}
/* Further setup required for opening an LMDB environment */
static int __cold mdbx_env_open2(MDB_env *env, MDB_meta *meta) {
int newenv = 0;
int rc = mdbx_env_read_header(env, meta);
if (unlikely(rc != MDB_SUCCESS)) {
if (rc != ENOENT)
return rc;
mdbx_debug("new mdbenv");
newenv = 1;
static int __cold mdbx_setup_body(MDB_env *env, MDB_meta *meta, int lck_rc) {
int rc = MDBX_RESULT_FALSE;
int err = mdbx_read_header(env, meta);
if (unlikely(err != MDB_SUCCESS)) {
if (lck_rc != /* lck exclusive */ MDBX_RESULT_TRUE || err != MDBX_ENODATA ||
(env->me_flags & MDB_RDONLY))
return err;
mdbx_debug("create new database");
rc = /* new database */ MDBX_RESULT_TRUE;
env->me_psize = env->me_os_psize;
if (env->me_psize > MAX_PAGESIZE)
env->me_psize = MAX_PAGESIZE;
@ -3771,84 +3772,77 @@ static int __cold mdbx_env_open2(MDB_env *env, MDB_meta *meta) {
meta->mm_mapsize = env->me_mapsize;
}
if (newenv) {
if (rc == MDBX_RESULT_TRUE) {
/* mdbx_env_map() may grow the datafile. Write the metapages
* first, so the file will be valid if initialization fails. */
rc = mdbx_env_init_meta(env, meta);
if (unlikely(rc != MDB_SUCCESS))
return rc;
err = mdbx_env_init_meta(env, meta);
if (unlikely(err != MDB_SUCCESS))
return err;
rc = mdbx_ftruncate(env->me_fd, env->me_mapsize);
if (unlikely(rc != MDB_SUCCESS))
return rc;
err = mdbx_ftruncate(env->me_fd, env->me_mapsize);
if (unlikely(err != MDB_SUCCESS))
return err;
}
const size_t usedsize = (meta->mm_last_pg + 1) * env->me_psize;
rc = mdbx_env_map(env, NULL, usedsize);
if (rc)
return rc;
err = mdbx_env_map(env, NULL, usedsize);
if (err)
return err;
mdbx_env_setup_limits(env, env->me_psize);
return MDB_SUCCESS;
return rc;
}
/****************************************************************************/
/* Open and/or initialize the lock region for the environment. */
static int __cold mdbx_env_setup_locks(MDB_env *env, char *lpath, int mode,
int *excl) {
static int __cold mdbx_setup_locks(MDB_env *env, char *lck_pathname, int mode) {
off_t size;
assert(env->me_fd != INVALID_HANDLE_VALUE);
assert(env->me_lfd == INVALID_HANDLE_VALUE);
int rc = mdbx_openfile(lpath, O_RDWR | O_CREAT, mode, &env->me_lfd);
if (rc != MDB_SUCCESS) {
if (rc == EROFS && (env->me_flags & MDB_RDONLY)) {
env->me_lfd = INVALID_HANDLE_VALUE;
rc = MDB_SUCCESS;
} else {
return rc;
}
int err = mdbx_openfile(lck_pathname, O_RDWR | O_CREAT, mode, &env->me_lfd);
if (err != MDB_SUCCESS) {
if (err != EROFS || (env->me_flags & MDB_RDONLY) == 0)
return err;
/* LY: without-lck mode (e.g. on read-only filesystem) */
env->me_lfd = INVALID_HANDLE_VALUE;
}
/* Try to get exclusive lock. If we succeed, then
* nobody is using the lock region and we should initialize it. */
rc = mdbx_lck_seize(env);
if (rc == MDBX_RESULT_TRUE)
*excl = true;
else if (rc == MDBX_RESULT_FALSE)
*excl = false;
else
const int rc = mdbx_lck_seize(env);
if (MDBX_IS_ERROR(rc))
return rc;
rc = mdbx_filesize(env->me_lfd, &size);
if (unlikely(rc != MDB_SUCCESS))
return rc;
err = mdbx_filesize(env->me_lfd, &size);
if (unlikely(err != MDB_SUCCESS))
return err;
if (*excl > 0) {
if (rc == MDBX_RESULT_TRUE) {
off_t wanna = roundup2((env->me_maxreaders - 1) * sizeof(MDB_reader) +
sizeof(MDBX_lockinfo),
env->me_os_psize);
if (size != wanna) {
rc = mdbx_ftruncate(env->me_lfd, wanna);
if (unlikely(rc != MDB_SUCCESS))
return rc;
err = mdbx_ftruncate(env->me_lfd, wanna);
if (unlikely(err != MDB_SUCCESS))
return err;
size = wanna;
}
}
env->me_maxreaders = (size - sizeof(MDBX_lockinfo)) / sizeof(MDB_reader) + 1;
void *addr = NULL;
rc = mdbx_mmap(&addr, size, true, env->me_lfd);
if (unlikely(rc != MDB_SUCCESS))
return rc;
err = mdbx_mmap(&addr, size, true, env->me_lfd);
if (unlikely(err != MDB_SUCCESS))
return err;
env->me_txns = addr;
if (!(env->me_flags & MDB_NOTLS)) {
rc = mdbx_rthc_alloc(&env->me_txkey, &env->me_txns->mti_readers[0],
&env->me_txns->mti_readers[env->me_maxreaders]);
if (unlikely(rc != MDB_SUCCESS))
return rc;
err = mdbx_rthc_alloc(&env->me_txkey, &env->me_txns->mti_readers[0],
&env->me_txns->mti_readers[env->me_maxreaders]);
if (unlikely(err != MDB_SUCCESS))
return err;
env->me_flags |= MDB_ENV_TXKEY;
}
@ -3875,11 +3869,12 @@ static int __cold mdbx_env_setup_locks(MDB_env *env, char *lpath, int mode,
return errno;
#endif
if (*excl > 0) {
if (rc == MDBX_RESULT_TRUE) {
/* LY: exlcusive mode, init lck */
memset(env->me_txns, 0, sizeof(MDBX_lockinfo));
rc = mdbx_lck_init(env);
if (rc)
return rc;
err = mdbx_lck_init(env);
if (err)
return err;
env->me_txns->mti_magic = MDB_MAGIC;
env->me_txns->mti_format = MDB_LOCK_FORMAT;
@ -3896,7 +3891,7 @@ static int __cold mdbx_env_setup_locks(MDB_env *env, char *lpath, int mode,
}
}
return MDB_SUCCESS;
return rc;
}
/** The name of the lock file in the DB environment */
@ -3922,8 +3917,8 @@ static int __cold mdbx_env_setup_locks(MDB_env *env, char *lpath, int mode,
int __cold mdbx_env_open_ex(MDB_env *env, const char *path, unsigned flags,
mode_t mode, int *exclusive) {
int oflags, rc, len, excl = -1;
char *lpath, *dpath;
int oflags, rc, len;
char *lck_pathname, *dxb_pathname;
if (unlikely(!env || !path))
return MDBX_EINVAL;
@ -3941,18 +3936,18 @@ int __cold mdbx_env_open_ex(MDB_env *env, const char *path, unsigned flags,
} else {
rc = len + sizeof(LOCKNAME) + len + sizeof(DATANAME);
}
lpath = malloc(rc);
if (!lpath)
return ENOMEM;
lck_pathname = malloc(rc);
if (!lck_pathname)
return MDBX_ENOMEM;
if (flags & MDB_NOSUBDIR) {
dpath = lpath + len + sizeof(LOCKSUFF);
sprintf(lpath, "%s" LOCKSUFF, path);
strcpy(dpath, path);
dxb_pathname = lck_pathname + len + sizeof(LOCKSUFF);
sprintf(lck_pathname, "%s" LOCKSUFF, path);
strcpy(dxb_pathname, path);
} else {
dpath = lpath + len + sizeof(LOCKNAME);
sprintf(lpath, "%s" LOCKNAME, path);
sprintf(dpath, "%s" DATANAME, path);
dxb_pathname = lck_pathname + len + sizeof(LOCKNAME);
sprintf(lck_pathname, "%s" LOCKNAME, path);
sprintf(dxb_pathname, "%s" DATANAME, path);
}
rc = MDB_SUCCESS;
@ -3986,59 +3981,62 @@ int __cold mdbx_env_open_ex(MDB_env *env, const char *path, unsigned flags,
else
oflags = O_RDWR | O_CREAT;
rc = mdbx_openfile(dpath, oflags, mode, &env->me_fd);
rc = mdbx_openfile(dxb_pathname, oflags, mode, &env->me_fd);
if (rc != MDB_SUCCESS)
goto bailout;
rc = mdbx_env_setup_locks(env, lpath, mode, &excl);
if (rc)
const int lck_rc = mdbx_setup_locks(env, lck_pathname, mode);
if (MDBX_IS_ERROR(lck_rc)) {
rc = lck_rc;
goto bailout;
}
MDB_meta meta;
rc = mdbx_env_open2(env, &meta);
if (rc == MDB_SUCCESS) {
mdbx_debug("opened dbenv %p", (void *)env);
if (excl > 0) {
env->me_txns->mti_envmode = env->me_flags;
if (exclusive == NULL || *exclusive < 2) {
/* LY: downgrade lock only if exclusive access not requested.
* in case exclusive==1, just leave value as is. */
rc = mdbx_lck_downgrade(env);
if (rc != MDB_SUCCESS)
goto bailout;
excl = 0;
}
} else {
if (exclusive) {
/* LY: just indicate that is not an exclusive access. */
*exclusive = 0;
}
if ((env->me_txns->mti_envmode ^ env->me_flags) &
(MDB_WRITEMAP | MDB_NOSYNC | MDB_NOMETASYNC | MDB_MAPASYNC)) {
/* LY: Current mode/flags incompatible with requested. */
rc = MDB_INCOMPATIBLE;
const int dxb_rc = mdbx_setup_body(env, &meta, lck_rc);
if (MDBX_IS_ERROR(dxb_rc)) {
rc = dxb_rc;
goto bailout;
}
mdbx_debug("opened dbenv %p", (void *)env);
if (lck_rc == MDBX_RESULT_TRUE) {
env->me_txns->mti_envmode = env->me_flags;
if (exclusive == NULL || *exclusive < 2) {
/* LY: downgrade lock only if exclusive access not requested.
* in case exclusive==1, just leave value as is. */
rc = mdbx_lck_downgrade(env);
if (rc != MDB_SUCCESS)
goto bailout;
}
}
if (!(flags & MDB_RDONLY)) {
MDB_txn *txn;
int tsize = sizeof(MDB_txn),
size = tsize +
env->me_maxdbs * (sizeof(MDB_db) + sizeof(MDB_cursor *) +
sizeof(unsigned) + 1);
if ((env->me_pbuf = calloc(1, env->me_psize)) &&
(txn = calloc(1, size))) {
txn->mt_dbs = (MDB_db *)((char *)txn + tsize);
txn->mt_cursors = (MDB_cursor **)(txn->mt_dbs + env->me_maxdbs);
txn->mt_dbiseqs = (unsigned *)(txn->mt_cursors + env->me_maxdbs);
txn->mt_dbflags = (unsigned char *)(txn->mt_dbiseqs + env->me_maxdbs);
txn->mt_env = env;
txn->mt_dbxs = env->me_dbxs;
txn->mt_flags = MDB_TXN_FINISHED;
env->me_txn0 = txn;
} else {
rc = ENOMEM;
}
} else {
if (exclusive) {
/* LY: just indicate that is not an exclusive access. */
*exclusive = 0;
}
if ((env->me_txns->mti_envmode ^ env->me_flags) &
(MDB_WRITEMAP | MDB_NOSYNC | MDB_NOMETASYNC | MDB_MAPASYNC)) {
/* LY: Current mode/flags incompatible with requested. */
rc = MDB_INCOMPATIBLE;
goto bailout;
}
}
if (!(flags & MDB_RDONLY)) {
MDB_txn *txn;
int tsize = sizeof(MDB_txn),
size = tsize +
env->me_maxdbs * (sizeof(MDB_db) + sizeof(MDB_cursor *) +
sizeof(unsigned) + 1);
if ((env->me_pbuf = calloc(1, env->me_psize)) && (txn = calloc(1, size))) {
txn->mt_dbs = (MDB_db *)((char *)txn + tsize);
txn->mt_cursors = (MDB_cursor **)(txn->mt_dbs + env->me_maxdbs);
txn->mt_dbiseqs = (unsigned *)(txn->mt_cursors + env->me_maxdbs);
txn->mt_dbflags = (unsigned char *)(txn->mt_dbiseqs + env->me_maxdbs);
txn->mt_env = env;
txn->mt_dbxs = env->me_dbxs;
txn->mt_flags = MDB_TXN_FINISHED;
env->me_txn0 = txn;
} else {
rc = MDBX_ENOMEM;
}
}
@ -4063,7 +4061,7 @@ int __cold mdbx_env_open_ex(MDB_env *env, const char *path, unsigned flags,
bailout:
if (rc)
mdbx_env_close0(env);
free(lpath);
free(lck_pathname);
return rc;
}
@ -8464,24 +8462,24 @@ int __cold mdbx_env_copyfd(MDB_env *env, mdbx_filehandle_t fd) {
int __cold mdbx_env_copy2(MDB_env *env, const char *path, unsigned flags) {
int rc, len;
char *lpath;
char *lck_pathname;
mdbx_filehandle_t newfd = INVALID_HANDLE_VALUE;
if (env->me_flags & MDB_NOSUBDIR) {
lpath = (char *)path;
lck_pathname = (char *)path;
} else {
len = strlen(path);
len += sizeof(DATANAME);
lpath = malloc(len);
if (!lpath)
return ENOMEM;
sprintf(lpath, "%s" DATANAME, path);
lck_pathname = malloc(len);
if (!lck_pathname)
return MDBX_ENOMEM;
sprintf(lck_pathname, "%s" DATANAME, path);
}
/* The destination path must exist, but the destination file must not.
* We don't want the OS to cache the writes, since the source data is
* already in the OS cache. */
rc = mdbx_openfile(lpath, O_WRONLY | O_CREAT | O_EXCL, 0666, &newfd);
rc = mdbx_openfile(lck_pathname, O_WRONLY | O_CREAT | O_EXCL, 0666, &newfd);
if (rc == MDB_SUCCESS) {
if (env->me_psize >= env->me_os_psize) {
#ifdef F_NOCACHE /* __APPLE__ */
@ -8496,7 +8494,7 @@ int __cold mdbx_env_copy2(MDB_env *env, const char *path, unsigned flags) {
}
if (!(env->me_flags & MDB_NOSUBDIR))
free(lpath);
free(lck_pathname);
if (newfd != INVALID_HANDLE_VALUE) {
int err = mdbx_closefile(newfd);

View File

@ -333,12 +333,12 @@ int mdbx_pread(mdbx_filehandle_t fd, void *buf, size_t bytes, off_t offset) {
ov.Offset = (DWORD)offset;
ov.OffsetHigh = HIGH_DWORD(offset);
DWORD read;
DWORD read = 0;
if (unlikely(!ReadFile(fd, buf, (DWORD)bytes, &read, &ov))) {
int rc = GetLastError();
if (rc == ERROR_HANDLE_EOF && read == 0 && offset == 0)
return MDBX_ENODATA;
return rc;
if (rc == ERROR_HANDLE_EOF)
return (read == 0 && offset == 0) ? MDBX_ENODATA : ERROR_READ_FAULT;
return (rc == MDB_SUCCESS) ? /* paranoia */ ERROR_READ_FAULT : rc;
}
return (read == bytes) ? MDB_SUCCESS : ERROR_READ_FAULT;
#else