add waiting for fsync in WAL

Anton Popov 2020-09-11 02:24:16 +03:00
parent af4089c606
commit f2a5216e97
5 changed files with 26 additions and 8 deletions

View File

@@ -17,7 +17,7 @@ public:
FileSyncGuard(const DiskPtr & disk_, int fd_) : disk(disk_), fd(fd_) {}
FileSyncGuard(const DiskPtr & disk_, const String & path)
- : disk(disk_), fd(disk_->open(path, O_RDONLY)) {}
+ : disk(disk_), fd(disk_->open(path, O_RDWR)) {}
~FileSyncGuard()
{
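
The hunk above switches the path-based FileSyncGuard constructor from O_RDONLY to O_RDWR, presumably so that fsync can always be issued on the descriptor the guard holds. A minimal standalone sketch of the same RAII idea using plain POSIX calls; the class name and error handling are illustrative, and the real guard goes through the DiskPtr abstraction rather than raw open/fsync:

// Illustrative RAII sync guard: open a descriptor up front, fsync + close it on scope exit.
// Plain POSIX stand-in for the DiskPtr-based FileSyncGuard; not the actual ClickHouse class.
#include <fcntl.h>
#include <unistd.h>
#include <stdexcept>
#include <string>

class PosixFileSyncGuard
{
public:
    explicit PosixFileSyncGuard(const std::string & path)
        : fd(::open(path.c_str(), O_RDWR))   // O_RDWR rather than O_RDONLY, as in the hunk above
    {
        if (fd < 0)
            throw std::runtime_error("Cannot open file " + path);
    }

    ~PosixFileSyncGuard()
    {
        // Flush file contents to stable storage before releasing the descriptor.
        ::fsync(fd);
        ::close(fd);
    }

    PosixFileSyncGuard(const PosixFileSyncGuard &) = delete;
    PosixFileSyncGuard & operator=(const PosixFileSyncGuard &) = delete;

private:
    int fd = -1;
};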

View File

@@ -47,6 +47,7 @@ struct Settings;
M(Bool, fsync_part_directory, false, "Do fsync for part directory after all part operations (writes, renames, etc.).", 0) \
M(UInt64, write_ahead_log_bytes_to_fsync, 100ULL * 1024 * 1024, "Amount of bytes, accumulated in WAL to do fsync.", 0) \
M(UInt64, write_ahead_log_interval_ms_to_fsync, 100, "Interval in milliseconds after which fsync for WAL is being done.", 0) \
+ M(Bool, in_memory_parts_insert_sync, false, "If true, insert of part with in-memory format will wait for fsync of WAL.", 0) \
\
/** Inserts settings. */ \
M(UInt64, parts_to_delay_insert, 150, "If table contains at least that many active parts in single partition, artificially slow down insert into table.", 0) \
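
For context, the settings block above is an X-macro list: each M(type, name, default, description, flags) entry is expanded by the surrounding settings machinery into a typed setting with a default value. A self-contained sketch of that pattern, with the new in_memory_parts_insert_sync entry included; the macro and struct names are made up, the flags column is omitted, and this is not ClickHouse's real settings framework:

// Self-contained sketch of the X-macro pattern behind this settings list.
// MY_WAL_SETTINGS and WalSettings are illustrative names only.
#include <cstdint>

#define MY_WAL_SETTINGS(M) \
    M(bool,     fsync_part_directory,                 false,                "Do fsync for part directory after all part operations") \
    M(uint64_t, write_ahead_log_bytes_to_fsync,       100ULL * 1024 * 1024, "Amount of bytes accumulated in WAL to do fsync") \
    M(uint64_t, write_ahead_log_interval_ms_to_fsync, 100,                  "Interval in milliseconds after which fsync for WAL is done") \
    M(bool,     in_memory_parts_insert_sync,          false,                "If true, insert of an in-memory part waits for fsync of WAL")

// Each entry expands into a typed member with its default value.
struct WalSettings
{
#define DECLARE_SETTING_MEMBER(TYPE, NAME, DEFAULT, DESCRIPTION) TYPE NAME = DEFAULT;
    MY_WAL_SETTINGS(DECLARE_SETTING_MEMBER)
#undef DECLARE_SETTING_MEMBER
};

// Usage: WalSettings{}.write_ahead_log_bytes_to_fsync is 100 MiB by default.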

View File

@@ -33,6 +33,7 @@ MergeTreeWriteAheadLog::MergeTreeWriteAheadLog(
std::lock_guard lock(write_mutex);
out->sync();
sync_scheduled = false;
+ sync_cv.notify_all();
});
}
@@ -50,7 +51,7 @@ void MergeTreeWriteAheadLog::init()
void MergeTreeWriteAheadLog::addPart(const Block & block, const String & part_name)
{
- std::lock_guard lock(write_mutex);
+ std::unique_lock lock(write_mutex);
auto part_info = MergeTreePartInfo::fromPartName(part_name, storage.format_version);
min_block_number = std::min(min_block_number, part_info.min_block);
@@ -70,7 +71,7 @@ void MergeTreeWriteAheadLog::addPart(const Block & block, const String & part_na
void MergeTreeWriteAheadLog::dropPart(const String & part_name)
{
- std::lock_guard lock(write_mutex);
+ std::unique_lock lock(write_mutex);
writeIntBinary(static_cast<UInt8>(0), *out);
writeIntBinary(static_cast<UInt8>(ActionType::DROP_PART), *out);
@@ -78,7 +79,7 @@ void MergeTreeWriteAheadLog::dropPart(const String & part_name)
sync(lock);
}
- void MergeTreeWriteAheadLog::rotate(const std::lock_guard<std::mutex> &)
+ void MergeTreeWriteAheadLog::rotate(const std::unique_lock<std::mutex> &)
{
String new_name = String(WAL_FILE_NAME) + "_"
+ toString(min_block_number) + "_"
@@ -90,7 +91,7 @@ void MergeTreeWriteAheadLog::rotate(const std::lock_guard<std::mutex> &)
MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const StorageMetadataPtr & metadata_snapshot)
{
- std::lock_guard lock(write_mutex);
+ std::unique_lock lock(write_mutex);
MergeTreeData::MutableDataPartsVector parts;
auto in = disk->readFile(path, DBMS_DEFAULT_BUFFER_SIZE);
@@ -185,7 +186,7 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const Stor
return result;
}
- void MergeTreeWriteAheadLog::sync(const std::lock_guard<std::mutex> &)
+ void MergeTreeWriteAheadLog::sync(std::unique_lock<std::mutex> & lock)
{
size_t bytes_to_sync = storage.getSettings()->write_ahead_log_bytes_to_fsync;
time_t time_to_sync = storage.getSettings()->write_ahead_log_interval_ms_to_fsync;
@@ -201,6 +202,9 @@ void MergeTreeWriteAheadLog::sync(const std::lock_guard<std::mutex> &)
sync_task->scheduleAfter(time_to_sync);
sync_scheduled = true;
}
+ if (storage.getSettings()->in_memory_parts_insert_sync)
+ sync_cv.wait(lock, [this] { return !sync_scheduled; });
}
std::optional<MergeTreeWriteAheadLog::MinMaxBlockNumber>
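
Taken together, the changes in this file are: addPart(), dropPart() and restore() switch from std::lock_guard to std::unique_lock because std::condition_variable::wait needs a lock it can release, the background sync task calls sync_cv.notify_all() once the buffer is fsynced, and sync() now blocks the inserting thread until sync_scheduled is cleared when in_memory_parts_insert_sync is enabled. Below is a standalone sketch of that deferred-fsync-with-optional-wait pattern; a detached std::thread stands in for BackgroundSchedulePool, the constants mirror the default setting values, and all names are illustrative rather than the actual ClickHouse code:

// Standalone sketch of the WAL sync pattern above: fsync immediately on a large backlog,
// otherwise schedule a deferred fsync, and optionally block the insert until it has run.
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>

class WalSyncSketch
{
public:
    // Mirrors addPart()/dropPart(): append a record, then decide how to make it durable.
    void addRecord(size_t record_bytes, bool in_memory_parts_insert_sync)
    {
        std::unique_lock lock(write_mutex);   // unique_lock, not lock_guard: cv.wait must be able to unlock it
        bytes_written += record_bytes;        // stands in for serializing the block into the WAL buffer
        sync(lock, in_memory_parts_insert_sync);
    }

private:
    void sync(std::unique_lock<std::mutex> & lock, bool wait_for_sync)
    {
        constexpr size_t bytes_to_sync = 100ULL * 1024 * 1024;         // write_ahead_log_bytes_to_fsync
        constexpr auto time_to_sync = std::chrono::milliseconds(100);  // write_ahead_log_interval_ms_to_fsync

        if (bytes_written - bytes_at_last_sync > bytes_to_sync)
        {
            fsyncBuffer();                    // large backlog: sync right away
            bytes_at_last_sync = bytes_written;
        }
        else if (!sync_scheduled)
        {
            sync_scheduled = true;            // small backlog: defer the fsync to a background task
            std::thread([this, time_to_sync]
            {
                std::this_thread::sleep_for(time_to_sync);
                std::lock_guard task_lock(write_mutex);
                fsyncBuffer();
                sync_scheduled = false;
                sync_cv.notify_all();         // wake inserts waiting for durability
            }).detach();                      // detached for brevity; the real task is owned by a task holder
        }

        // in_memory_parts_insert_sync: block this insert until the scheduled fsync has completed.
        if (wait_for_sync)
            sync_cv.wait(lock, [this] { return !sync_scheduled; });
    }

    void fsyncBuffer() { /* out->sync() in the real code */ }

    std::mutex write_mutex;
    std::condition_variable sync_cv;
    size_t bytes_written = 0;
    size_t bytes_at_last_sync = 0;
    bool sync_scheduled = false;
};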

View File

@@ -44,8 +44,8 @@ public:
private:
void init();
- void rotate(const std::lock_guard<std::mutex> & lock);
- void sync(const std::lock_guard<std::mutex> & lock);
+ void rotate(const std::unique_lock<std::mutex> & lock);
+ void sync(std::unique_lock<std::mutex> & lock);
const MergeTreeData & storage;
DiskPtr disk;
@@ -60,6 +60,7 @@ private:
BackgroundSchedulePool & pool;
BackgroundSchedulePoolTaskHolder sync_task;
+ std::condition_variable sync_cv;
size_t bytes_at_last_sync = 0;
bool sync_scheduled = false;

View File

@@ -1,5 +1,17 @@
#!/bin/bash
+ : '
+ A simple test for durability. It starts up a ClickHouse server in a QEMU VM and runs
+ inserts via the clickhouse benchmark tool. Then it kills the VM at a random moment and
+ checks whether the table contains broken parts. With fsync enabled, no broken parts
+ should appear.
+ Usage:
+ ./install.sh
+ ./durability-test.sh <table name> <file with create query> <file with insert query>
+ '
URL=http://cloud-images.ubuntu.com/bionic/current
IMAGE=bionic-server-cloudimg-amd64.img
SSH_PORT=11022