Merge branch 'master' into timezone_00719

This commit is contained in:
mergify[bot] 2021-08-11 03:52:50 +00:00 committed by GitHub
commit 127e050580
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 77 additions and 11 deletions

View File

@ -146,6 +146,9 @@ try
catch (...) /// Exception while we looking for a task, reschedule catch (...) /// Exception while we looking for a task, reschedule
{ {
tryLogCurrentException(__PRETTY_FUNCTION__); tryLogCurrentException(__PRETTY_FUNCTION__);
/// Why do we scheduleTask again?
/// To retry on exception, since it may be some temporary exception.
scheduleTask(/* with_backoff = */ true); scheduleTask(/* with_backoff = */ true);
} }
@ -180,10 +183,16 @@ void IBackgroundJobExecutor::triggerTask()
} }
void IBackgroundJobExecutor::backgroundTaskFunction() void IBackgroundJobExecutor::backgroundTaskFunction()
try
{ {
if (!scheduleJob()) if (!scheduleJob())
scheduleTask(/* with_backoff = */ true); scheduleTask(/* with_backoff = */ true);
} }
catch (...) /// Catch any exception to avoid thread termination.
{
tryLogCurrentException(__PRETTY_FUNCTION__);
scheduleTask(/* with_backoff = */ true);
}
IBackgroundJobExecutor::~IBackgroundJobExecutor() IBackgroundJobExecutor::~IBackgroundJobExecutor()
{ {

View File

@ -958,11 +958,21 @@ std::shared_ptr<StorageMergeTree::MergeMutateSelectedEntry> StorageMergeTree::se
} }
if (!commands_for_size_validation.empty()) if (!commands_for_size_validation.empty())
{
try
{ {
MutationsInterpreter interpreter( MutationsInterpreter interpreter(
shared_from_this(), metadata_snapshot, commands_for_size_validation, getContext(), false); shared_from_this(), metadata_snapshot, commands_for_size_validation, getContext(), false);
commands_size += interpreter.evaluateCommandsSize(); commands_size += interpreter.evaluateCommandsSize();
} }
catch (...)
{
MergeTreeMutationEntry & entry = it->second;
entry.latest_fail_time = time(nullptr);
entry.latest_fail_reason = getCurrentExceptionMessage(false);
continue;
}
}
if (current_ast_elements + commands_size >= max_ast_elements) if (current_ast_elements + commands_size >= max_ast_elements)
break; break;
@ -971,6 +981,8 @@ std::shared_ptr<StorageMergeTree::MergeMutateSelectedEntry> StorageMergeTree::se
commands.insert(commands.end(), it->second.commands.begin(), it->second.commands.end()); commands.insert(commands.end(), it->second.commands.begin(), it->second.commands.end());
} }
if (!commands.empty())
{
auto new_part_info = part->info; auto new_part_info = part->info;
new_part_info.mutation = current_mutations_by_version.rbegin()->first; new_part_info.mutation = current_mutations_by_version.rbegin()->first;
@ -982,6 +994,8 @@ std::shared_ptr<StorageMergeTree::MergeMutateSelectedEntry> StorageMergeTree::se
tagger = std::make_unique<CurrentlyMergingPartsTagger>(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, metadata_snapshot, true); tagger = std::make_unique<CurrentlyMergingPartsTagger>(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, metadata_snapshot, true);
return std::make_shared<MergeMutateSelectedEntry>(future_part, std::move(tagger), commands); return std::make_shared<MergeMutateSelectedEntry>(future_part, std::move(tagger), commands);
} }
}
return {}; return {};
} }
@ -1036,6 +1050,7 @@ bool StorageMergeTree::scheduleDataProcessingJob(IBackgroundJobExecutor & execut
auto share_lock = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations); auto share_lock = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
bool has_mutations;
{ {
std::unique_lock lock(currently_processing_in_background_mutex); std::unique_lock lock(currently_processing_in_background_mutex);
if (merger_mutator.merges_blocker.isCancelled()) if (merger_mutator.merges_blocker.isCancelled())
@ -1044,6 +1059,15 @@ bool StorageMergeTree::scheduleDataProcessingJob(IBackgroundJobExecutor & execut
merge_entry = selectPartsToMerge(metadata_snapshot, false, {}, false, nullptr, share_lock, lock); merge_entry = selectPartsToMerge(metadata_snapshot, false, {}, false, nullptr, share_lock, lock);
if (!merge_entry) if (!merge_entry)
mutate_entry = selectPartsToMutate(metadata_snapshot, nullptr, share_lock); mutate_entry = selectPartsToMutate(metadata_snapshot, nullptr, share_lock);
has_mutations = !current_mutations_by_version.empty();
}
if (!mutate_entry && has_mutations)
{
/// Notify in case of errors
std::lock_guard lock(mutation_wait_mutex);
mutation_wait_event.notify_all();
} }
if (merge_entry) if (merge_entry)

View File

@ -0,0 +1,33 @@
SET mutations_sync=2;
DROP TABLE IF EXISTS rep_data;
CREATE TABLE rep_data
(
p Int,
t DateTime,
INDEX idx t TYPE minmax GRANULARITY 1
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/rep_data', '1')
PARTITION BY p
ORDER BY t
SETTINGS number_of_free_entries_in_pool_to_execute_mutation=0;
INSERT INTO rep_data VALUES (1, now());
ALTER TABLE rep_data MATERIALIZE INDEX idx IN PARTITION ID 'NO_SUCH_PART'; -- { serverError 248 }
ALTER TABLE rep_data MATERIALIZE INDEX idx IN PARTITION ID '1';
ALTER TABLE rep_data MATERIALIZE INDEX idx IN PARTITION ID '2';
DROP TABLE IF EXISTS data;
CREATE TABLE data
(
p Int,
t DateTime,
INDEX idx t TYPE minmax GRANULARITY 1
)
ENGINE = MergeTree
PARTITION BY p
ORDER BY t
SETTINGS number_of_free_entries_in_pool_to_execute_mutation=0;
INSERT INTO data VALUES (1, now());
ALTER TABLE data MATERIALIZE INDEX idx IN PARTITION ID 'NO_SUCH_PART'; -- { serverError 341 }
ALTER TABLE data MATERIALIZE INDEX idx IN PARTITION ID '1';
ALTER TABLE data MATERIALIZE INDEX idx IN PARTITION ID '2';