Fix backoff for failed background tasks in replicated merge tree

alesapin 2021-02-11 14:46:18 +03:00
parent 17af32a59b
commit 3253638969
8 changed files with 51 additions and 9 deletions
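
The gist of the change: background jobs now report success or failure through a bool return value, and the executor reschedules the next job immediately after a success but with a backoff delay after a failure. Below is a self-contained sketch of that pattern; BackoffScheduler, the delay formula, and the constants are illustrative assumptions for this note, not the actual BackgroundJobsExecutor implementation.

#include <algorithm>
#include <chrono>
#include <cstddef>
#include <functional>
#include <thread>

using BackgroundJobFunc = std::function<bool()>;

/// Sketch of the scheduling policy the patch introduces: success resets the
/// failure counter and reschedules right away, failure backs off with a
/// capped exponential delay. The constants and the sleep-based delay are
/// made up for illustration.
class BackoffScheduler
{
public:
    void runOnce(const BackgroundJobFunc & job)
    {
        if (job())
        {
            consecutive_failures = 0;   /// success: next job may start without delay
            return;
        }

        ++consecutive_failures;         /// failure: wait before the next attempt
        const size_t shift = std::min<size_t>(consecutive_failures, 10);
        const size_t delay_ms = std::min<size_t>(base_delay_ms << shift, max_delay_ms);
        std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
    }

private:
    size_t consecutive_failures = 0;
    static constexpr size_t base_delay_ms = 100;
    static constexpr size_t max_delay_ms = 10000;
};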

@@ -98,11 +98,21 @@ try
{
try /// We don't want exceptions in background pool
{
job();
bool job_success = job();
/// Job done, decrement metric and reset no_work counter
CurrentMetrics::values[pool_config.tasks_metric]--;
/// Job done, new empty space in pool, schedule background task
runTaskWithoutDelay();
if (job_success)
{
/// Job done, new empty space in pool, schedule background task
runTaskWithoutDelay();
}
else
{
/// Job done, but failed, schedule with backoff
scheduleTask(/* with_backoff = */ true);
}
}
catch (...)
{

@@ -36,10 +36,12 @@ enum class PoolType
FETCH,
};
using BackgroundJobFunc = std::function<bool()>;
/// Result from background job providers: the function which will be executed in a pool, plus the pool type.
struct JobAndPool
{
ThreadPool::Job job;
BackgroundJobFunc job;
PoolType pool_type;
};
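
With the job type changed from ThreadPool::Job (a void() callable) to BackgroundJobFunc = std::function<bool()>, every provider of background work now has to report whether its step succeeded; the providers updated in the hunks below follow this pattern. A minimal self-contained sketch of the contract, where MyStorage, has_work, and do_one_step are hypothetical names and the stand-in JobAndPool/PoolType merely mirror the declarations above:

#include <functional>
#include <optional>

enum class PoolType { MERGE_MUTATE, MOVE, FETCH };
using BackgroundJobFunc = std::function<bool()>;

struct JobAndPool
{
    BackgroundJobFunc job;
    PoolType pool_type;
};

/// Hypothetical storage illustrating the provider side of the contract.
struct MyStorage
{
    bool has_work = true;

    std::optional<JobAndPool> getDataProcessingJob()
    {
        if (!has_work)
            return {};    /// nothing to schedule; the executor will ask again later

        return JobAndPool{[this]() mutable
        {
            /// true  -> the executor reschedules the next job without delay
            /// false -> the executor reschedules with backoff
            return do_one_step();
        }, PoolType::MERGE_MUTATE};
    }

    bool do_one_step() { return true; }   /// pretend the step always succeeds
};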

@@ -3796,7 +3796,7 @@ std::optional<JobAndPool> MergeTreeData::getDataMovingJob()
return JobAndPool{[this, moving_tagger] () mutable
{
moveParts(moving_tagger);
return moveParts(moving_tagger);
}, PoolType::MOVE};
}

@@ -962,9 +962,11 @@ std::optional<JobAndPool> StorageMergeTree::getDataProcessingJob()
return JobAndPool{[this, metadata_snapshot, merge_entry, mutate_entry, share_lock] () mutable
{
if (merge_entry)
mergeSelectedParts(metadata_snapshot, false, {}, *merge_entry, share_lock);
return mergeSelectedParts(metadata_snapshot, false, {}, *merge_entry, share_lock);
else if (mutate_entry)
mutateSelectedPart(metadata_snapshot, *mutate_entry, share_lock);
return mutateSelectedPart(metadata_snapshot, *mutate_entry, share_lock);
__builtin_unreachable();
}, PoolType::MERGE_MUTATE};
}
else if (auto lock = time_after_previous_cleanup.compareAndRestartDeferred(1))
@@ -978,6 +980,7 @@ std::optional<JobAndPool> StorageMergeTree::getDataProcessingJob()
clearOldWriteAheadLogs();
clearOldMutations();
clearEmptyParts();
return true;
}, PoolType::MERGE_MUTATE};
}
return {};

@@ -2682,7 +2682,7 @@ std::optional<JobAndPool> StorageReplicatedMergeTree::getDataProcessingJob()
return JobAndPool{[this, selected_entry] () mutable
{
processQueueEntry(selected_entry);
return processQueueEntry(selected_entry);
}, pool_type};
}

@@ -32,7 +32,7 @@ protected:
std::optional<JobAndPool> getBackgroundJob() override
{
return JobAndPool{[] { std::this_thread::sleep_for(1s); counter++; }, PoolType::MERGE_MUTATE};
return JobAndPool{[] { std::this_thread::sleep_for(1s); counter++; return true; }, PoolType::MERGE_MUTATE};
}
};

@@ -0,0 +1,26 @@
DROP TABLE IF EXISTS i20203_1;
DROP TABLE IF EXISTS i20203_2;
CREATE TABLE i20203_1 (a Int8)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/01715_background_checker/i20203','r1')
ORDER BY tuple();
CREATE TABLE i20203_2 (a Int8)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/01715_background_checker/i20203','r2')
ORDER BY tuple();
DETACH TABLE i20203_2;
INSERT INTO i20203_1 VALUES (2);
DETACH TABLE i20203_1;
ATTACH TABLE i20203_2;
-- sleep 10 seconds
SELECT number FROM numbers(10) WHERE sleepEachRow(1) FORMAT Null;
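
-- r1 is detached and holds the only copy of the inserted part, so the fetch
-- entry in r2's replication queue keeps failing; with backoff on failed
-- queue entries, num_tries should stay well below the threshold checked here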
SELECT num_tries < 50
FROM system.replication_queue
WHERE table = 'i20203_2' AND database = currentDatabase();
DROP TABLE IF EXISTS i20203_1;
DROP TABLE IF EXISTS i20203_2;