diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index e8ba306aecb..9d54bc285e3 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -1500,7 +1500,7 @@ BackgroundProcessingPool & Context::getBackgroundPool() { auto lock = getLock(); if (!shared->background_pool) - shared->background_pool.emplace(settings.background_pool_size, getConfigRef()); + shared->background_pool.emplace(settings.background_pool_size); return *shared->background_pool; } @@ -1508,7 +1508,18 @@ BackgroundProcessingPool & Context::getBackgroundMovePool() { auto lock = getLock(); if (!shared->background_move_pool) - shared->background_move_pool.emplace(settings.background_move_pool_size, getConfigRef(), "BackgroundMovePool", "BgMoveProcPool"); + { + BackgroundProcessingPool::PoolSettings pool_settings; + auto & config = getConfigRef(); + pool_settings.thread_sleep_seconds = config.getDouble("background_move_processing_pool_thread_sleep_seconds", 10); + pool_settings.thread_sleep_seconds_random_part = config.getDouble("background_move_processing_pool_thread_sleep_seconds_random_part", 1.0); + pool_settings.thread_sleep_seconds_if_nothing_to_do = config.getDouble("background_move_processing_pool_thread_sleep_seconds_if_nothing_to_do", 0.1); + pool_settings.task_sleep_seconds_when_no_work_min = config.getDouble("background_move_processing_pool_task_sleep_seconds_when_no_work_min", 10); + pool_settings.task_sleep_seconds_when_no_work_max = config.getDouble("background_move_processing_pool_task_sleep_seconds_when_no_work_max", 600); + pool_settings.task_sleep_seconds_when_no_work_multiplier = config.getDouble("background_move_processing_pool_task_sleep_seconds_when_no_work_multiplier", 1.1); + pool_settings.task_sleep_seconds_when_no_work_random_part = config.getDouble("background_move_processing_pool_task_sleep_seconds_when_no_work_random_part", 1.0); + shared->background_move_pool.emplace(settings.background_move_pool_size, pool_settings, "BackgroundMovePool", "BgMoveProcPool"); + } return *shared->background_move_pool; } diff --git a/dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp b/dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp index 9a4aa1d9dca..44b83399afc 100644 --- a/dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp +++ b/dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp @@ -51,23 +51,16 @@ void BackgroundProcessingPoolTaskInfo::wake() BackgroundProcessingPool::BackgroundProcessingPool(int size_, - const Poco::Util::AbstractConfiguration & config, + const PoolSettings & pool_settings, const char * log_name, const char * thread_name_) : size(size_) , thread_name(thread_name_) + , settings(pool_settings) { logger = &Logger::get(log_name); LOG_INFO(logger, "Create " << log_name << " with " << size << " threads"); - thread_sleep_seconds = config.getDouble("background_processing_pool_thread_sleep_seconds", 10); - thread_sleep_seconds_random_part = config.getDouble("background_processing_pool_thread_sleep_seconds_random_part", 1.0); - thread_sleep_seconds_if_nothing_to_do = config.getDouble("background_processing_pool_thread_sleep_seconds_if_nothing_to_do", 0.1); - task_sleep_seconds_when_no_work_min = config.getDouble("background_processing_pool_task_sleep_seconds_when_no_work_min", 10); - task_sleep_seconds_when_no_work_max = config.getDouble("background_processing_pool_task_sleep_seconds_when_no_work_max", 600); - task_sleep_seconds_when_no_work_multiplier = config.getDouble("background_processing_pool_task_sleep_seconds_when_no_work_multiplier", 1.1); - task_sleep_seconds_when_no_work_random_part = config.getDouble("background_processing_pool_task_sleep_seconds_when_no_work_random_part", 1.0); - threads.resize(size); for (auto & thread : threads) thread = ThreadFromGlobalPool([this] { threadFunction(); }); @@ -147,7 +140,7 @@ void BackgroundProcessingPool::threadFunction() memory_tracker->setMetric(CurrentMetrics::MemoryTrackingInBackgroundProcessingPool); pcg64 rng(randomSeed()); - std::this_thread::sleep_for(std::chrono::duration(std::uniform_real_distribution(0, thread_sleep_seconds_random_part)(rng))); + std::this_thread::sleep_for(std::chrono::duration(std::uniform_real_distribution(0, settings.thread_sleep_seconds_random_part)(rng))); while (!shutdown) { @@ -182,8 +175,8 @@ void BackgroundProcessingPool::threadFunction() { std::unique_lock lock(tasks_mutex); wake_event.wait_for(lock, - std::chrono::duration(thread_sleep_seconds - + std::uniform_real_distribution(0, thread_sleep_seconds_random_part)(rng))); + std::chrono::duration(settings.thread_sleep_seconds + + std::uniform_real_distribution(0, settings.thread_sleep_seconds_random_part)(rng))); continue; } @@ -193,7 +186,7 @@ void BackgroundProcessingPool::threadFunction() { std::unique_lock lock(tasks_mutex); wake_event.wait_for(lock, std::chrono::microseconds( - min_time - current_time + std::uniform_int_distribution(0, thread_sleep_seconds_random_part * 1000000)(rng))); + min_time - current_time + std::uniform_int_distribution(0, settings.thread_sleep_seconds_random_part * 1000000)(rng))); } std::shared_lock rlock(task->rwlock); @@ -231,11 +224,11 @@ void BackgroundProcessingPool::threadFunction() Poco::Timestamp next_time_to_execute; /// current time if (task_result == TaskResult::ERROR) next_time_to_execute += 1000000 * (std::min( - task_sleep_seconds_when_no_work_max, - task_sleep_seconds_when_no_work_min * std::pow(task_sleep_seconds_when_no_work_multiplier, task->count_no_work_done)) - + std::uniform_real_distribution(0, task_sleep_seconds_when_no_work_random_part)(rng)); + settings.task_sleep_seconds_when_no_work_max, + settings.task_sleep_seconds_when_no_work_min * std::pow(settings.task_sleep_seconds_when_no_work_multiplier, task->count_no_work_done)) + + std::uniform_real_distribution(0, settings.task_sleep_seconds_when_no_work_random_part)(rng)); else if (task_result == TaskResult::NOTHING_TO_DO) - next_time_to_execute += 1000000 * thread_sleep_seconds_if_nothing_to_do; + next_time_to_execute += 1000000 * settings.thread_sleep_seconds_if_nothing_to_do; tasks.erase(task->iterator); task->iterator = tasks.emplace(next_time_to_execute, task); diff --git a/dbms/src/Storages/MergeTree/BackgroundProcessingPool.h b/dbms/src/Storages/MergeTree/BackgroundProcessingPool.h index 9929e380f25..619e267ffe5 100644 --- a/dbms/src/Storages/MergeTree/BackgroundProcessingPool.h +++ b/dbms/src/Storages/MergeTree/BackgroundProcessingPool.h @@ -47,8 +47,23 @@ public: using TaskHandle = std::shared_ptr; + struct PoolSettings + { + double thread_sleep_seconds = 10; + double thread_sleep_seconds_random_part = 1.0; + double thread_sleep_seconds_if_nothing_to_do = 0.1; + + /// For exponential backoff. + double task_sleep_seconds_when_no_work_min = 10; + double task_sleep_seconds_when_no_work_max = 600; + double task_sleep_seconds_when_no_work_multiplier = 1.1; + double task_sleep_seconds_when_no_work_random_part = 1.0; + + PoolSettings() noexcept {} + }; + BackgroundProcessingPool(int size_, - const Poco::Util::AbstractConfiguration & config, + const PoolSettings & pool_settings = {}, const char * log_name = "BackgroundProcessingPool", const char * thread_name_ = "BackgrProcPool"); @@ -88,15 +103,7 @@ protected: void threadFunction(); private: - double thread_sleep_seconds; - double thread_sleep_seconds_random_part; - double thread_sleep_seconds_if_nothing_to_do; - - /// For exponential backoff. - double task_sleep_seconds_when_no_work_min; - double task_sleep_seconds_when_no_work_max; - double task_sleep_seconds_when_no_work_multiplier; - double task_sleep_seconds_when_no_work_random_part; + PoolSettings settings; }; diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index fa3e9112297..e3cd882f47c 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -572,15 +572,17 @@ void checkTTLExpression(const ExpressionActionsPtr & ttl_expression, const Strin void MergeTreeData::setTTLExpressions(const ColumnsDescription::ColumnTTLs & new_column_ttls, const ASTPtr & new_ttl_table_ast, bool only_check) { - auto create_ttl_entry = [this](ASTPtr ttl_ast) -> TTLEntry + auto create_ttl_entry = [this](ASTPtr ttl_ast) { + TTLEntry result; + auto syntax_result = SyntaxAnalyzer(global_context).analyze(ttl_ast, getColumns().getAllPhysical()); - auto expr = ExpressionAnalyzer(ttl_ast, syntax_result, global_context).getActions(false); + result.expression = ExpressionAnalyzer(ttl_ast, syntax_result, global_context).getActions(false); + result.destination_type = PartDestinationType::DELETE; + result.result_column = ttl_ast->getColumnName(); - String result_column = ttl_ast->getColumnName(); - checkTTLExpression(expr, result_column); - - return {expr, result_column, PartDestinationType::DELETE, {}, {}}; + checkTTLExpression(result.expression, result.result_column); + return result; }; if (!new_column_ttls.empty()) @@ -3131,7 +3133,7 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const namespace { -inline DiskSpace::ReservationPtr returnReservationOrThrowError(UInt64 expected_size, DiskSpace::ReservationPtr reservation) +inline DiskSpace::ReservationPtr checkAndReturnReservation(UInt64 expected_size, DiskSpace::ReservationPtr reservation) { if (reservation) return reservation; @@ -3148,7 +3150,23 @@ DiskSpace::ReservationPtr MergeTreeData::reserveSpace(UInt64 expected_size) cons auto reservation = storage_policy->reserve(expected_size); - return returnReservationOrThrowError(expected_size, std::move(reservation)); + return checkAndReturnReservation(expected_size, std::move(reservation)); +} + +DiskSpace::ReservationPtr MergeTreeData::reserveSpace(UInt64 expected_size, DiskSpace::SpacePtr space) const +{ + expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size); + + auto reservation = tryReserveSpace(expected_size, space); + + return checkAndReturnReservation(expected_size, std::move(reservation)); +} + +DiskSpace::ReservationPtr MergeTreeData::tryReserveSpace(UInt64 expected_size, DiskSpace::SpacePtr space) const +{ + expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size); + + return space->reserve(expected_size); } DiskSpace::ReservationPtr MergeTreeData::reserveSpacePreferringMoveDestination(UInt64 expected_size, @@ -3159,7 +3177,7 @@ DiskSpace::ReservationPtr MergeTreeData::reserveSpacePreferringMoveDestination(U DiskSpace::ReservationPtr reservation = tryReserveSpacePreferringMoveDestination(expected_size, ttl_infos, time_of_move); - return returnReservationOrThrowError(expected_size, std::move(reservation)); + return checkAndReturnReservation(expected_size, std::move(reservation)); } DiskSpace::ReservationPtr MergeTreeData::tryReserveSpacePreferringMoveDestination(UInt64 expected_size, @@ -3198,22 +3216,6 @@ DiskSpace::ReservationPtr MergeTreeData::tryReserveSpacePreferringMoveDestinatio return reservation; } -DiskSpace::ReservationPtr MergeTreeData::reserveSpaceInSpecificSpace(UInt64 expected_size, DiskSpace::SpacePtr space) const -{ - expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size); - - auto reservation = tryReserveSpaceInSpecificSpace(expected_size, space); - - return returnReservationOrThrowError(expected_size, std::move(reservation)); -} - -DiskSpace::ReservationPtr MergeTreeData::tryReserveSpaceInSpecificSpace(UInt64 expected_size, DiskSpace::SpacePtr space) const -{ - expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size); - - return space->reserve(expected_size); -} - DiskSpace::SpacePtr MergeTreeData::TTLEntry::getDestination(const DiskSpace::StoragePolicyPtr & policy) const { if (destination_type == PartDestinationType::VOLUME) @@ -3439,7 +3441,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPartOnSameDisk( String dst_part_name = src_part->getNewName(dst_part_info); String tmp_dst_part_name = tmp_part_prefix + dst_part_name; - auto reservation = reserveSpaceInSpecificSpace(src_part->bytes_on_disk, src_part->disk); + auto reservation = reserveSpace(src_part->bytes_on_disk, src_part->disk); String dst_part_path = getFullPathOnDisk(reservation->getDisk()); Poco::Path dst_part_absolute_path = Poco::Path(dst_part_path + tmp_dst_part_name).absolute(); Poco::Path src_part_absolute_path = Poco::Path(src_part->getFullPath()).absolute(); diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.h b/dbms/src/Storages/MergeTree/MergeTreeData.h index 3525e38d2bc..561572929e6 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.h +++ b/dbms/src/Storages/MergeTree/MergeTreeData.h @@ -674,17 +674,21 @@ public: using PathsWithDisks = std::vector; PathsWithDisks getDataPathsWithDisks() const; - /// Reserves space at least 1MB + /// Reserves space at least 1MB. DiskSpace::ReservationPtr reserveSpace(UInt64 expected_size) const; + + /// Reserves space at least 1MB on specific disk or volume. + DiskSpace::ReservationPtr reserveSpace(UInt64 expected_size, DiskSpace::SpacePtr space) const; + DiskSpace::ReservationPtr tryReserveSpace(UInt64 expected_size, DiskSpace::SpacePtr space) const; + + + /// Reserves space at least 1MB preferring best destination according to `ttl_infos`. DiskSpace::ReservationPtr reserveSpacePreferringMoveDestination(UInt64 expected_size, const MergeTreeDataPart::TTLInfos & ttl_infos, time_t time_of_move) const; DiskSpace::ReservationPtr tryReserveSpacePreferringMoveDestination(UInt64 expected_size, const MergeTreeDataPart::TTLInfos & ttl_infos, time_t time_of_move) const; - DiskSpace::ReservationPtr reserveSpaceInSpecificSpace(UInt64 expected_size, DiskSpace::SpacePtr space) const; - DiskSpace::ReservationPtr tryReserveSpaceInSpecificSpace(UInt64 expected_size, DiskSpace::SpacePtr space) const; - /// Choose disk with max available free space /// Reserves 0 bytes DiskSpace::ReservationPtr makeEmptyReservationOnLargestDisk() { return storage_policy->makeEmptyReservationOnLargestDisk(); } @@ -735,7 +739,10 @@ public: ASTPtr entry_ast; + /// Returns destination disk or volume for this rule. DiskSpace::SpacePtr getDestination(const DiskSpace::StoragePolicyPtr & policy) const; + + /// Checks if given part already belongs destination disk or volume for this rule. bool isPartInDestination(const DiskSpace::StoragePolicyPtr & policy, const MergeTreeDataPart & part) const; }; diff --git a/dbms/src/Storages/MergeTree/MergeTreePartsMover.cpp b/dbms/src/Storages/MergeTree/MergeTreePartsMover.cpp index de987acac72..5d07ea1a8a2 100644 --- a/dbms/src/Storages/MergeTree/MergeTreePartsMover.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreePartsMover.cpp @@ -130,7 +130,7 @@ bool MergeTreePartsMover::selectPartsForMove( { auto destination = ttl_entry_ptr->getDestination(policy); if (destination && !ttl_entry_ptr->isPartInDestination(policy, *part)) - reservation = part->storage.tryReserveSpaceInSpecificSpace(part->bytes_on_disk, ttl_entry_ptr->getDestination(policy)); + reservation = part->storage.tryReserveSpace(part->bytes_on_disk, ttl_entry_ptr->getDestination(policy)); } if (reservation) diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index 4e4bea7b023..82f0263ef95 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -350,7 +350,7 @@ public: /// if we mutate part, than we should reserve space on the same disk, because mutations possible can create hardlinks if (is_mutation) - reserved_space = storage.tryReserveSpaceInSpecificSpace(total_size, future_part_.parts[0]->disk); + reserved_space = storage.tryReserveSpace(total_size, future_part_.parts[0]->disk); else { MergeTreeDataPart::TTLInfos ttl_infos; diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index cad4dacdfa8..43f195039cb 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -1147,7 +1147,7 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM /// Once we mutate part, we must reserve space on the same disk, because mutations can possibly create hardlinks. /// Can throw an exception. - DiskSpace::ReservationPtr reserved_space = reserveSpaceInSpecificSpace(estimated_space_for_result, source_part->disk); + DiskSpace::ReservationPtr reserved_space = reserveSpace(estimated_space_for_result, source_part->disk); auto table_lock = lockStructureForShare(false, RWLockImpl::NO_QUERY); diff --git a/dbms/tests/integration/test_ttl_move/configs/config.d/instant_moves.xml b/dbms/tests/integration/test_ttl_move/configs/config.d/instant_moves.xml index ac5005061e9..7b68c6946ca 100644 --- a/dbms/tests/integration/test_ttl_move/configs/config.d/instant_moves.xml +++ b/dbms/tests/integration/test_ttl_move/configs/config.d/instant_moves.xml @@ -1,4 +1,4 @@ - 0.5 - 0.5 + 0.5 + 0.5