Get rid of background processing pool

This commit is contained in:
alesapin 2020-10-14 15:44:10 +03:00
parent 4014e0f08d
commit 0b14a31ba9
7 changed files with 29 additions and 95 deletions

View File

@ -18,7 +18,6 @@
#include <Databases/IDatabase.h>
#include <Storages/IStorage.h>
#include <Storages/MarkCache.h>
#include <Storages/MergeTree/BackgroundProcessingPool.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Storages/CompressionCodecSelector.h>
@ -331,8 +330,6 @@ struct ContextShared
ConfigurationPtr users_config; /// Config with the users, profiles and quotas sections.
InterserverIOHandler interserver_io_handler; /// Handler for interserver communication.
std::optional<BackgroundSchedulePool> buffer_flush_schedule_pool; /// A thread pool that can do background flush for Buffer tables.
std::optional<BackgroundProcessingPool> background_pool; /// The thread pool for the background work performed by the tables.
std::optional<BackgroundProcessingPool> background_move_pool; /// The thread pool for the background moves performed by the tables.
std::optional<BackgroundSchedulePool> schedule_pool; /// A thread pool that can run different jobs in background (used in replicated tables)
std::optional<BackgroundSchedulePool> distributed_schedule_pool; /// A thread pool that can run different jobs in background (used for distributed sends)
std::optional<BackgroundSchedulePool> message_broker_schedule_pool; /// A thread pool that can run different jobs in background (used in kafka streaming)
@ -433,8 +430,6 @@ struct ContextShared
external_dictionaries_loader.reset();
external_models_loader.reset();
buffer_flush_schedule_pool.reset();
background_pool.reset();
background_move_pool.reset();
schedule_pool.reset();
distributed_schedule_pool.reset();
ddl_worker.reset();
@ -1369,45 +1364,6 @@ void Context::dropCaches() const
shared->mark_cache->reset();
}
BackgroundProcessingPool & Context::getBackgroundPool()
{
auto lock = getLock();
if (!shared->background_pool)
{
BackgroundProcessingPool::PoolSettings pool_settings;
const auto & config = getConfigRef();
pool_settings.thread_sleep_seconds = config.getDouble("background_processing_pool_thread_sleep_seconds", 10);
pool_settings.thread_sleep_seconds_random_part = config.getDouble("background_processing_pool_thread_sleep_seconds_random_part", 1.0);
pool_settings.thread_sleep_seconds_if_nothing_to_do = config.getDouble("background_processing_pool_thread_sleep_seconds_if_nothing_to_do", 0.1);
pool_settings.task_sleep_seconds_when_no_work_min = config.getDouble("background_processing_pool_task_sleep_seconds_when_no_work_min", 10);
pool_settings.task_sleep_seconds_when_no_work_max = config.getDouble("background_processing_pool_task_sleep_seconds_when_no_work_max", 600);
pool_settings.task_sleep_seconds_when_no_work_multiplier = config.getDouble("background_processing_pool_task_sleep_seconds_when_no_work_multiplier", 1.1);
pool_settings.task_sleep_seconds_when_no_work_random_part = config.getDouble("background_processing_pool_task_sleep_seconds_when_no_work_random_part", 1.0);
shared->background_pool.emplace(settings.background_pool_size, pool_settings);
}
return *shared->background_pool;
}
BackgroundProcessingPool & Context::getBackgroundMovePool()
{
auto lock = getLock();
if (!shared->background_move_pool)
{
BackgroundProcessingPool::PoolSettings pool_settings;
const auto & config = getConfigRef();
pool_settings.thread_sleep_seconds = config.getDouble("background_move_processing_pool_thread_sleep_seconds", 10);
pool_settings.thread_sleep_seconds_random_part = config.getDouble("background_move_processing_pool_thread_sleep_seconds_random_part", 1.0);
pool_settings.thread_sleep_seconds_if_nothing_to_do = config.getDouble("background_move_processing_pool_thread_sleep_seconds_if_nothing_to_do", 0.1);
pool_settings.task_sleep_seconds_when_no_work_min = config.getDouble("background_move_processing_pool_task_sleep_seconds_when_no_work_min", 10);
pool_settings.task_sleep_seconds_when_no_work_max = config.getDouble("background_move_processing_pool_task_sleep_seconds_when_no_work_max", 600);
pool_settings.task_sleep_seconds_when_no_work_multiplier = config.getDouble("background_move_processing_pool_task_sleep_seconds_when_no_work_multiplier", 1.1);
pool_settings.task_sleep_seconds_when_no_work_random_part = config.getDouble("background_move_processing_pool_task_sleep_seconds_when_no_work_random_part", 1.0);
pool_settings.tasks_metric = CurrentMetrics::BackgroundMovePoolTask;
shared->background_move_pool.emplace(settings.background_move_pool_size, pool_settings, "BackgroundMovePool", "BgMoveProcPool");
}
return *shared->background_move_pool;
}
BackgroundSchedulePool & Context::getBufferFlushSchedulePool()
{
auto lock = getLock();

View File

@ -62,7 +62,6 @@ class EmbeddedDictionaries;
class ExternalDictionariesLoader;
class ExternalModelsLoader;
class InterserverIOHandler;
class BackgroundProcessingPool;
class BackgroundSchedulePool;
class MergeList;
class Cluster;
@ -508,8 +507,6 @@ public:
void dropCaches() const;
BackgroundSchedulePool & getBufferFlushSchedulePool();
BackgroundProcessingPool & getBackgroundPool();
BackgroundProcessingPool & getBackgroundMovePool();
BackgroundSchedulePool & getSchedulePool();
BackgroundSchedulePool & getMessageBrokerSchedulePool();
BackgroundSchedulePool & getDistributedSchedulePool();

View File

@ -3,11 +3,38 @@
#include <Storages/MergeTree/MergeTreeData.h>
#include <Common/ThreadPool.h>
#include <Core/BackgroundSchedulePool.h>
#include <Storages/MergeTree/BackgroundProcessingPool.h>
#include <pcg_random.hpp>
namespace CurrentMetrics
{
extern const Metric BackgroundPoolTask;
}
namespace DB
{
enum PoolType
{
MERGE_MUTATING,
MOVING,
}
struct PoolSettings
{
double thread_sleep_seconds = 10;
double thread_sleep_seconds_random_part = 1.0;
double thread_sleep_seconds_if_nothing_to_do = 0.1;
/// For exponential backoff.
double task_sleep_seconds_when_no_work_min = 10;
double task_sleep_seconds_when_no_work_max = 600;
double task_sleep_seconds_when_no_work_multiplier = 1.1;
double task_sleep_seconds_when_no_work_random_part = 1.0;
CurrentMetrics::Metric tasks_metric = CurrentMetrics::BackgroundPoolTask;
PoolSettings() noexcept {}
};
class BackgroundJobsExecutor
{
@ -19,7 +46,7 @@ private:
ThreadPool move_pool;
std::atomic<size_t> errors_count{0};
pcg64 rng;
BackgroundProcessingPool::PoolSettings settings;
PoolSettings settings;
BackgroundSchedulePool::TaskHolder data_processing_task;
BackgroundSchedulePool::TaskHolder data_moving_task;

View File

@ -762,23 +762,6 @@ bool StorageMergeTree::partIsAssignedToBackgroundOperation(const DataPartPtr & p
return currently_merging_mutating_parts.count(part);
}
BackgroundProcessingPoolTaskResult StorageMergeTree::movePartsTask()
{
try
{
if (!selectPartsAndMove())
return BackgroundProcessingPoolTaskResult::NOTHING_TO_DO;
return BackgroundProcessingPoolTaskResult::SUCCESS;
}
catch (...)
{
tryLogCurrentException(log);
return BackgroundProcessingPoolTaskResult::ERROR;
}
}
std::optional<StorageMergeTree::MergeMutateSelectedEntry> StorageMergeTree::selectPartsToMutate(const StorageMetadataPtr & metadata_snapshot, String */* disable_reason */)
{
auto table_lock_holder = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);

View File

@ -13,7 +13,6 @@
#include <Storages/MergeTree/MergeTreeMutationEntry.h>
#include <Storages/MergeTree/MergeTreeMutationStatus.h>
#include <Disks/StoragePolicy.h>
#include <Storages/MergeTree/BackgroundProcessingPool.h>
#include <Common/SimpleIncrement.h>
#include <Core/BackgroundSchedulePool.h>
#include <Storages/MergeTree/BackgroundJobsExecutor.h>
@ -131,8 +130,6 @@ private:
ActionLock stopMergesAndWait();
BackgroundProcessingPoolTaskResult movePartsTask();
/// Allocate block number for new mutation, write mutation to disk
/// and into in-memory structures. Wake up merge-mutation task.
Int64 startMutation(const MutationCommands & commands, String & mutation_file_name);

View File

@ -2626,23 +2626,6 @@ bool StorageReplicatedMergeTree::partIsAssignedToBackgroundOperation(const DataP
return queue.isVirtualPart(part);
}
BackgroundProcessingPoolTaskResult StorageReplicatedMergeTree::movePartsTask()
{
try
{
if (!selectPartsAndMove())
return BackgroundProcessingPoolTaskResult::NOTHING_TO_DO;
return BackgroundProcessingPoolTaskResult::SUCCESS;
}
catch (...)
{
tryLogCurrentException(log);
return BackgroundProcessingPoolTaskResult::ERROR;
}
}
void StorageReplicatedMergeTree::mergeSelectingTask()
{
if (!is_leader)

View File

@ -16,7 +16,6 @@
#include <Storages/MergeTree/ReplicatedMergeTreePartCheckThread.h>
#include <Storages/MergeTree/ReplicatedMergeTreeTableMetadata.h>
#include <Storages/MergeTree/EphemeralLockInZooKeeper.h>
#include <Storages/MergeTree/BackgroundProcessingPool.h>
#include <Storages/MergeTree/DataPartsExchange.h>
#include <Storages/MergeTree/ReplicatedMergeTreeAddress.h>
#include <Storages/MergeTree/LeaderElection.h>
@ -284,10 +283,6 @@ private:
BackgroundSchedulePool::TaskHolder mutations_updating_task;
/// A task which move parts to another disks/volumes
/// Transparent for replication.
BackgroundProcessingPool::TaskHandle move_parts_task_handle;
/// A task that selects parts to merge.
BackgroundSchedulePool::TaskHolder merge_selecting_task;
/// It is acquired for each iteration of the selection of parts to merge or each OPTIMIZE query.
@ -423,10 +418,6 @@ private:
bool processQueueEntry(ReplicatedMergeTreeQueue::SelectedEntry & entry);
/// Perform moves of parts to another disks.
/// Local operation, doesn't interact with replicationg queue.
BackgroundProcessingPoolTaskResult movePartsTask();
/// Postcondition:
/// either leader_election is fully initialized (node in ZK is created and the watching thread is launched)
/// or an exception is thrown and leader_election is destroyed.