This commit is contained in:
alesapin 2020-10-28 14:03:26 +03:00
parent c10370f98d
commit 3c31a5134e

View File

@ -490,7 +490,7 @@ int32_t ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper
{ {
std::sort(log_entries.begin(), log_entries.end()); std::sort(log_entries.begin(), log_entries.end());
for (size_t entry_idx = 0, num_entries = log_entries.size(); entry_idx < num_entries; entry_idx += current_multi_batch_size) for (size_t entry_idx = 0, num_entries = log_entries.size(); entry_idx < num_entries;)
{ {
auto begin = log_entries.begin() + entry_idx; auto begin = log_entries.begin() + entry_idx;
auto end = entry_idx + current_multi_batch_size >= log_entries.size() auto end = entry_idx + current_multi_batch_size >= log_entries.size()
@ -498,6 +498,9 @@ int32_t ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper
: (begin + current_multi_batch_size); : (begin + current_multi_batch_size);
auto last = end - 1; auto last = end - 1;
/// Increment entry_idx before batch size increase (we copied at most current_multi_batch_size entries)
entry_idx += current_multi_batch_size;
/// Increase the batch size exponentially, so it will saturate to MAX_MULTI_OPS. /// Increase the batch size exponentially, so it will saturate to MAX_MULTI_OPS.
if (current_multi_batch_size < MAX_MULTI_OPS) if (current_multi_batch_size < MAX_MULTI_OPS)
current_multi_batch_size = std::min<size_t>(MAX_MULTI_OPS, current_multi_batch_size * 2); current_multi_batch_size = std::min<size_t>(MAX_MULTI_OPS, current_multi_batch_size * 2);