Merge branch 'ClickHouse:master' into master

This commit is contained in:
OnePiece 2021-08-31 16:38:26 +08:00 committed by GitHub
commit 5a4580c454
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 52 additions and 22 deletions

View File

@ -1,5 +1,5 @@
#!/bin/bash
# shellcheck disable=SC2086
# shellcheck disable=SC2086,SC2001
set -eux
set -o pipefail
@ -76,7 +76,10 @@ function filter_exists_and_template
local path
for path in "$@"; do
if [ -e "$path" ]; then
echo "$path" | sed -n 's/\.sql\.j2$/.gen.sql/'
# SC2001 shellcheck suggests:
# echo ${path//.sql.j2/.gen.sql}
# but it doesn't allow to use regex
echo "$path" | sed 's/\.sql\.j2$/.gen.sql/'
else
echo "'$path' does not exists" >&2
fi

View File

@ -84,28 +84,55 @@ bool incrementMetricIfLessThanMax(std::atomic<Int64> & atomic_value, Int64 max_v
}
/// This is a RAII class which only decrements metric.
/// It is added because after all other fixes a bug non-executing merges was occurred again.
/// Last hypothesis: task was successfully added to pool, however, was not executed because of internal exception in it.
class ParanoidMetricDecrementor
{
public:
explicit ParanoidMetricDecrementor(CurrentMetrics::Metric metric_) : metric(metric_) {}
void alarm() { is_alarmed = true; }
void decrement()
{
if (is_alarmed.exchange(false))
{
CurrentMetrics::values[metric]--;
}
}
~ParanoidMetricDecrementor() { decrement(); }
private:
CurrentMetrics::Metric metric;
std::atomic_bool is_alarmed = false;
};
void IBackgroundJobExecutor::execute(JobAndPool job_and_pool)
try
{
auto & pool_config = pools_configs[job_and_pool.pool_type];
const auto max_pool_size = pool_config.get_max_pool_size();
auto metric_decrementor = std::make_shared<ParanoidMetricDecrementor>(pool_config.tasks_metric);
/// If corresponding pool is not full increment metric and assign new job
if (incrementMetricIfLessThanMax(CurrentMetrics::values[pool_config.tasks_metric], max_pool_size))
{
metric_decrementor->alarm();
try /// this try required because we have to manually decrement metric
{
/// Synchronize pool size, because config could be reloaded
pools[job_and_pool.pool_type].setMaxThreads(max_pool_size);
pools[job_and_pool.pool_type].setQueueSize(max_pool_size);
pools[job_and_pool.pool_type].scheduleOrThrowOnError([this, pool_config, job{std::move(job_and_pool.job)}] ()
pools[job_and_pool.pool_type].scheduleOrThrowOnError([this, metric_decrementor, job{std::move(job_and_pool.job)}] ()
{
try /// We don't want exceptions in background pool
{
bool job_success = job();
/// Job done, decrement metric and reset no_work counter
CurrentMetrics::values[pool_config.tasks_metric]--;
metric_decrementor->decrement();
if (job_success)
{
@ -121,7 +148,7 @@ try
}
catch (...)
{
CurrentMetrics::values[pool_config.tasks_metric]--;
metric_decrementor->decrement();
tryLogCurrentException(__PRETTY_FUNCTION__);
scheduleTask(/* with_backoff = */ true);
}
@ -133,7 +160,7 @@ try
catch (...)
{
/// With our Pool settings scheduleOrThrowOnError shouldn't throw exceptions, but for safety catch added here
CurrentMetrics::values[pool_config.tasks_metric]--;
metric_decrementor->decrement();
tryLogCurrentException(__PRETTY_FUNCTION__);
scheduleTask(/* with_backoff = */ true);
}

View File

@ -192,7 +192,7 @@ void ReplicatedMergeTreePartCheckThread::searchForMissingPartAndFetchIfPossible(
if (missing_part_search_result == MissingPartSearchResult::LostForever)
{
auto lost_part_info = MergeTreePartInfo::fromPartName(part_name, storage.format_version);
if (lost_part_info.level != 0)
if (lost_part_info.level != 0 || lost_part_info.mutation != 0)
{
Strings source_parts;
bool part_in_queue = storage.queue.checkPartInQueueAndGetSourceParts(part_name, source_parts);