Merge pull request #16332 from ClickHouse/fix-queue-processing-of-very-large-entries

Fix processing of very large entries in queue
alexey-milovidov, 2020-10-29 09:09:29 +03:00, committed by GitHub
commit 0faf2bc7e3
2 changed files with 27 additions and 9 deletions

src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp

@@ -57,6 +57,7 @@ bool ReplicatedMergeTreeQueue::isVirtualPart(const MergeTreeData::DataPartPtr &
     return virtual_parts.getContainingPart(data_part->info) != data_part->name;
 }
 
+
 bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper)
 {
     auto queue_path = replica_path + "/queue";
@@ -68,6 +69,9 @@ bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper)
     {
         std::lock_guard pull_logs_lock(pull_logs_to_queue_mutex);
 
+        /// Reset batch size on initialization to recover from possible errors of too large batch size.
+        current_multi_batch_size = 1;
+
         String log_pointer_str = zookeeper->get(replica_path + "/log_pointer");
         log_pointer = log_pointer_str.empty() ? 0 : parse<UInt64>(log_pointer_str);
@@ -486,20 +490,21 @@ int32_t ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper
     {
         std::sort(log_entries.begin(), log_entries.end());
 
-        /// ZK contains a limit on the number or total size of operations in a multi-request.
-        /// If the limit is exceeded, the connection is simply closed.
-        /// The constant is selected with a margin. The default limit in ZK is 1 MB of data in total.
-        /// The average size of the node value in this case is less than 10 kilobytes.
-        static constexpr auto MAX_MULTI_OPS = 100;
-
-        for (size_t entry_idx = 0, num_entries = log_entries.size(); entry_idx < num_entries; entry_idx += MAX_MULTI_OPS)
+        for (size_t entry_idx = 0, num_entries = log_entries.size(); entry_idx < num_entries;)
         {
             auto begin = log_entries.begin() + entry_idx;
-            auto end = entry_idx + MAX_MULTI_OPS >= log_entries.size()
+            auto end = entry_idx + current_multi_batch_size >= log_entries.size()
                 ? log_entries.end()
-                : (begin + MAX_MULTI_OPS);
+                : (begin + current_multi_batch_size);
             auto last = end - 1;
 
+            /// Increment entry_idx before batch size increase (we copied at most current_multi_batch_size entries)
+            entry_idx += current_multi_batch_size;
+
+            /// Increase the batch size exponentially, so it will saturate to MAX_MULTI_OPS.
+            if (current_multi_batch_size < MAX_MULTI_OPS)
+                current_multi_batch_size = std::min<size_t>(MAX_MULTI_OPS, current_multi_batch_size * 2);
+
             String last_entry = *last;
             if (!startsWith(last_entry, "log-"))
                 throw Exception("Error in zookeeper data: unexpected node " + last_entry + " in " + zookeeper_path + "/log",
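
Taken out of the diff context, the new loop is a simple exponential ramp: the first multi-request covers one entry, the next two, and so on until it saturates at MAX_MULTI_OPS. A minimal standalone sketch of that schedule (the entry count and the printout are illustrative; the real code slices iterators over log_entries in exactly this pattern):

    #include <algorithm>
    #include <cstddef>
    #include <iostream>

    int main()
    {
        constexpr size_t MAX_MULTI_OPS = 100;
        const size_t num_entries = 500;        /// assumed backlog size, for illustration only

        size_t current_multi_batch_size = 1;   /// starts at 1, as after load()
        for (size_t entry_idx = 0; entry_idx < num_entries;)
        {
            const size_t end = std::min(entry_idx + current_multi_batch_size, num_entries);
            std::cout << "multi-request covers entries [" << entry_idx << ", " << end << ")\n";

            /// Advance before growing: this request copied at most current_multi_batch_size entries.
            entry_idx = end;

            /// Exponential growth, saturating at MAX_MULTI_OPS, as in the patch.
            if (current_multi_batch_size < MAX_MULTI_OPS)
                current_multi_batch_size = std::min(MAX_MULTI_OPS, current_multi_batch_size * 2);
        }
    }

With these numbers the batches are 1, 2, 4, 8, 16, 32, 64, then 100 per request: only the first seven requests are undersized, so a healthy replica pays a negligible cost for the slow start.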

src/Storages/MergeTree/ReplicatedMergeTreeQueue.h

@@ -259,6 +259,19 @@ private:
         ~CurrentlyExecuting();
     };
 
+    /// ZK contains a limit on the number or total size of operations in a multi-request.
+    /// If the limit is exceeded, the connection is simply closed.
+    /// The constant is selected with a margin. The default limit in ZK is 1 MB of data in total.
+    /// The average size of the node value in this case is less than 10 kilobytes.
+    static constexpr size_t MAX_MULTI_OPS = 100;
+
+    /// Very large queue entries may appear occasionally.
+    /// We cannot process MAX_MULTI_OPS at once because it will fail.
+    /// But we have to process more than one entry at once because otherwise lagged replicas keep up slowly.
+    /// Let's start with one entry per transaction and increase it exponentially towards MAX_MULTI_OPS.
+    /// It will allow to make some progress before failing and remain operational even in extreme cases.
+    size_t current_multi_batch_size = 1;
+
 public:
     ReplicatedMergeTreeQueue(StorageReplicatedMergeTree & storage_);
     ~ReplicatedMergeTreeQueue();
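
The header comment is the heart of the fix: a sum of entries can exceed the 1 MB limit even when each entry alone fits, and the slow start lets the replica localize the oversized batch and squeeze past it. Below is a hedged, self-contained simulation of that behavior; the entry sizes, the limit check, and the reconnect handling are stand-ins rather than the real ZooKeeper client, but the batching policy is the one from this patch:

    #include <algorithm>
    #include <cstddef>
    #include <iostream>
    #include <vector>

    int main()
    {
        constexpr size_t MAX_MULTI_OPS = 100;
        constexpr size_t ZK_LIMIT = 1'000'000;          /// stand-in for the default ~1 MB limit

        /// 300 ordinary ~5 KB entries plus one very large (but individually valid) entry.
        std::vector<size_t> entry_sizes(300, 5'000);
        entry_sizes[150] = 990'000;

        size_t batch = 1;                               /// current_multi_batch_size after load()
        size_t idx = 0;
        size_t requests = 0;
        while (idx < entry_sizes.size())
        {
            const size_t end = std::min(idx + batch, entry_sizes.size());
            size_t total = 0;
            for (size_t i = idx; i < end; ++i)
                total += entry_sizes[i];
            ++requests;

            if (total > ZK_LIMIT)
            {
                /// ZooKeeper would close the connection; load() then resets the batch size.
                std::cout << "batch [" << idx << ", " << end << ") too large, reconnecting\n";
                batch = 1;
                continue;                               /// retry the same position, one entry at a time
            }

            idx = end;
            if (batch < MAX_MULTI_OPS)
                batch = std::min(MAX_MULTI_OPS, batch * 2);
        }
        std::cout << "processed all entries in " << requests << " multi-requests\n";
    }

Two caveats the simulation makes visible: an entry that is by itself over the limit would still fail even at batch size 1 (the patch guarantees progress, not that any single entry fits), and pinning the batch size to 1 would also be safe but would make lagged replicas crawl through their backlog one entry per round trip, which is why the size is allowed to grow back toward MAX_MULTI_OPS after every success.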