Review fixes.

This commit is contained in:
Vladimir Chebotarev 2019-12-05 11:05:07 +03:00
parent 065b681a8c
commit 7fd1668fde
9 changed files with 84 additions and 64 deletions

View File

@ -1500,7 +1500,7 @@ BackgroundProcessingPool & Context::getBackgroundPool()
{
auto lock = getLock();
if (!shared->background_pool)
shared->background_pool.emplace(settings.background_pool_size, getConfigRef());
shared->background_pool.emplace(settings.background_pool_size);
return *shared->background_pool;
}
@ -1508,7 +1508,18 @@ BackgroundProcessingPool & Context::getBackgroundMovePool()
{
auto lock = getLock();
if (!shared->background_move_pool)
shared->background_move_pool.emplace(settings.background_move_pool_size, getConfigRef(), "BackgroundMovePool", "BgMoveProcPool");
{
BackgroundProcessingPool::PoolSettings pool_settings;
auto & config = getConfigRef();
pool_settings.thread_sleep_seconds = config.getDouble("background_move_processing_pool_thread_sleep_seconds", 10);
pool_settings.thread_sleep_seconds_random_part = config.getDouble("background_move_processing_pool_thread_sleep_seconds_random_part", 1.0);
pool_settings.thread_sleep_seconds_if_nothing_to_do = config.getDouble("background_move_processing_pool_thread_sleep_seconds_if_nothing_to_do", 0.1);
pool_settings.task_sleep_seconds_when_no_work_min = config.getDouble("background_move_processing_pool_task_sleep_seconds_when_no_work_min", 10);
pool_settings.task_sleep_seconds_when_no_work_max = config.getDouble("background_move_processing_pool_task_sleep_seconds_when_no_work_max", 600);
pool_settings.task_sleep_seconds_when_no_work_multiplier = config.getDouble("background_move_processing_pool_task_sleep_seconds_when_no_work_multiplier", 1.1);
pool_settings.task_sleep_seconds_when_no_work_random_part = config.getDouble("background_move_processing_pool_task_sleep_seconds_when_no_work_random_part", 1.0);
shared->background_move_pool.emplace(settings.background_move_pool_size, pool_settings, "BackgroundMovePool", "BgMoveProcPool");
}
return *shared->background_move_pool;
}

View File

@ -51,23 +51,16 @@ void BackgroundProcessingPoolTaskInfo::wake()
BackgroundProcessingPool::BackgroundProcessingPool(int size_,
const Poco::Util::AbstractConfiguration & config,
const PoolSettings & pool_settings,
const char * log_name,
const char * thread_name_)
: size(size_)
, thread_name(thread_name_)
, settings(pool_settings)
{
logger = &Logger::get(log_name);
LOG_INFO(logger, "Create " << log_name << " with " << size << " threads");
thread_sleep_seconds = config.getDouble("background_processing_pool_thread_sleep_seconds", 10);
thread_sleep_seconds_random_part = config.getDouble("background_processing_pool_thread_sleep_seconds_random_part", 1.0);
thread_sleep_seconds_if_nothing_to_do = config.getDouble("background_processing_pool_thread_sleep_seconds_if_nothing_to_do", 0.1);
task_sleep_seconds_when_no_work_min = config.getDouble("background_processing_pool_task_sleep_seconds_when_no_work_min", 10);
task_sleep_seconds_when_no_work_max = config.getDouble("background_processing_pool_task_sleep_seconds_when_no_work_max", 600);
task_sleep_seconds_when_no_work_multiplier = config.getDouble("background_processing_pool_task_sleep_seconds_when_no_work_multiplier", 1.1);
task_sleep_seconds_when_no_work_random_part = config.getDouble("background_processing_pool_task_sleep_seconds_when_no_work_random_part", 1.0);
threads.resize(size);
for (auto & thread : threads)
thread = ThreadFromGlobalPool([this] { threadFunction(); });
@ -147,7 +140,7 @@ void BackgroundProcessingPool::threadFunction()
memory_tracker->setMetric(CurrentMetrics::MemoryTrackingInBackgroundProcessingPool);
pcg64 rng(randomSeed());
std::this_thread::sleep_for(std::chrono::duration<double>(std::uniform_real_distribution<double>(0, thread_sleep_seconds_random_part)(rng)));
std::this_thread::sleep_for(std::chrono::duration<double>(std::uniform_real_distribution<double>(0, settings.thread_sleep_seconds_random_part)(rng)));
while (!shutdown)
{
@ -182,8 +175,8 @@ void BackgroundProcessingPool::threadFunction()
{
std::unique_lock lock(tasks_mutex);
wake_event.wait_for(lock,
std::chrono::duration<double>(thread_sleep_seconds
+ std::uniform_real_distribution<double>(0, thread_sleep_seconds_random_part)(rng)));
std::chrono::duration<double>(settings.thread_sleep_seconds
+ std::uniform_real_distribution<double>(0, settings.thread_sleep_seconds_random_part)(rng)));
continue;
}
@ -193,7 +186,7 @@ void BackgroundProcessingPool::threadFunction()
{
std::unique_lock lock(tasks_mutex);
wake_event.wait_for(lock, std::chrono::microseconds(
min_time - current_time + std::uniform_int_distribution<uint64_t>(0, thread_sleep_seconds_random_part * 1000000)(rng)));
min_time - current_time + std::uniform_int_distribution<uint64_t>(0, settings.thread_sleep_seconds_random_part * 1000000)(rng)));
}
std::shared_lock rlock(task->rwlock);
@ -231,11 +224,11 @@ void BackgroundProcessingPool::threadFunction()
Poco::Timestamp next_time_to_execute; /// current time
if (task_result == TaskResult::ERROR)
next_time_to_execute += 1000000 * (std::min(
task_sleep_seconds_when_no_work_max,
task_sleep_seconds_when_no_work_min * std::pow(task_sleep_seconds_when_no_work_multiplier, task->count_no_work_done))
+ std::uniform_real_distribution<double>(0, task_sleep_seconds_when_no_work_random_part)(rng));
settings.task_sleep_seconds_when_no_work_max,
settings.task_sleep_seconds_when_no_work_min * std::pow(settings.task_sleep_seconds_when_no_work_multiplier, task->count_no_work_done))
+ std::uniform_real_distribution<double>(0, settings.task_sleep_seconds_when_no_work_random_part)(rng));
else if (task_result == TaskResult::NOTHING_TO_DO)
next_time_to_execute += 1000000 * thread_sleep_seconds_if_nothing_to_do;
next_time_to_execute += 1000000 * settings.thread_sleep_seconds_if_nothing_to_do;
tasks.erase(task->iterator);
task->iterator = tasks.emplace(next_time_to_execute, task);

View File

@ -47,8 +47,23 @@ public:
using TaskHandle = std::shared_ptr<TaskInfo>;
struct PoolSettings
{
double thread_sleep_seconds = 10;
double thread_sleep_seconds_random_part = 1.0;
double thread_sleep_seconds_if_nothing_to_do = 0.1;
/// For exponential backoff.
double task_sleep_seconds_when_no_work_min = 10;
double task_sleep_seconds_when_no_work_max = 600;
double task_sleep_seconds_when_no_work_multiplier = 1.1;
double task_sleep_seconds_when_no_work_random_part = 1.0;
PoolSettings() noexcept {}
};
BackgroundProcessingPool(int size_,
const Poco::Util::AbstractConfiguration & config,
const PoolSettings & pool_settings = {},
const char * log_name = "BackgroundProcessingPool",
const char * thread_name_ = "BackgrProcPool");
@ -88,15 +103,7 @@ protected:
void threadFunction();
private:
double thread_sleep_seconds;
double thread_sleep_seconds_random_part;
double thread_sleep_seconds_if_nothing_to_do;
/// For exponential backoff.
double task_sleep_seconds_when_no_work_min;
double task_sleep_seconds_when_no_work_max;
double task_sleep_seconds_when_no_work_multiplier;
double task_sleep_seconds_when_no_work_random_part;
PoolSettings settings;
};

View File

@ -572,15 +572,17 @@ void checkTTLExpression(const ExpressionActionsPtr & ttl_expression, const Strin
void MergeTreeData::setTTLExpressions(const ColumnsDescription::ColumnTTLs & new_column_ttls,
const ASTPtr & new_ttl_table_ast, bool only_check)
{
auto create_ttl_entry = [this](ASTPtr ttl_ast) -> TTLEntry
auto create_ttl_entry = [this](ASTPtr ttl_ast)
{
TTLEntry result;
auto syntax_result = SyntaxAnalyzer(global_context).analyze(ttl_ast, getColumns().getAllPhysical());
auto expr = ExpressionAnalyzer(ttl_ast, syntax_result, global_context).getActions(false);
result.expression = ExpressionAnalyzer(ttl_ast, syntax_result, global_context).getActions(false);
result.destination_type = PartDestinationType::DELETE;
result.result_column = ttl_ast->getColumnName();
String result_column = ttl_ast->getColumnName();
checkTTLExpression(expr, result_column);
return {expr, result_column, PartDestinationType::DELETE, {}, {}};
checkTTLExpression(result.expression, result.result_column);
return result;
};
if (!new_column_ttls.empty())
@ -3131,7 +3133,7 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const
namespace
{
inline DiskSpace::ReservationPtr returnReservationOrThrowError(UInt64 expected_size, DiskSpace::ReservationPtr reservation)
inline DiskSpace::ReservationPtr checkAndReturnReservation(UInt64 expected_size, DiskSpace::ReservationPtr reservation)
{
if (reservation)
return reservation;
@ -3148,7 +3150,23 @@ DiskSpace::ReservationPtr MergeTreeData::reserveSpace(UInt64 expected_size) cons
auto reservation = storage_policy->reserve(expected_size);
return returnReservationOrThrowError(expected_size, std::move(reservation));
return checkAndReturnReservation(expected_size, std::move(reservation));
}
DiskSpace::ReservationPtr MergeTreeData::reserveSpace(UInt64 expected_size, DiskSpace::SpacePtr space) const
{
expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);
auto reservation = tryReserveSpace(expected_size, space);
return checkAndReturnReservation(expected_size, std::move(reservation));
}
DiskSpace::ReservationPtr MergeTreeData::tryReserveSpace(UInt64 expected_size, DiskSpace::SpacePtr space) const
{
expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);
return space->reserve(expected_size);
}
DiskSpace::ReservationPtr MergeTreeData::reserveSpacePreferringMoveDestination(UInt64 expected_size,
@ -3159,7 +3177,7 @@ DiskSpace::ReservationPtr MergeTreeData::reserveSpacePreferringMoveDestination(U
DiskSpace::ReservationPtr reservation = tryReserveSpacePreferringMoveDestination(expected_size, ttl_infos, time_of_move);
return returnReservationOrThrowError(expected_size, std::move(reservation));
return checkAndReturnReservation(expected_size, std::move(reservation));
}
DiskSpace::ReservationPtr MergeTreeData::tryReserveSpacePreferringMoveDestination(UInt64 expected_size,
@ -3198,22 +3216,6 @@ DiskSpace::ReservationPtr MergeTreeData::tryReserveSpacePreferringMoveDestinatio
return reservation;
}
DiskSpace::ReservationPtr MergeTreeData::reserveSpaceInSpecificSpace(UInt64 expected_size, DiskSpace::SpacePtr space) const
{
expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);
auto reservation = tryReserveSpaceInSpecificSpace(expected_size, space);
return returnReservationOrThrowError(expected_size, std::move(reservation));
}
DiskSpace::ReservationPtr MergeTreeData::tryReserveSpaceInSpecificSpace(UInt64 expected_size, DiskSpace::SpacePtr space) const
{
expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);
return space->reserve(expected_size);
}
DiskSpace::SpacePtr MergeTreeData::TTLEntry::getDestination(const DiskSpace::StoragePolicyPtr & policy) const
{
if (destination_type == PartDestinationType::VOLUME)
@ -3439,7 +3441,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPartOnSameDisk(
String dst_part_name = src_part->getNewName(dst_part_info);
String tmp_dst_part_name = tmp_part_prefix + dst_part_name;
auto reservation = reserveSpaceInSpecificSpace(src_part->bytes_on_disk, src_part->disk);
auto reservation = reserveSpace(src_part->bytes_on_disk, src_part->disk);
String dst_part_path = getFullPathOnDisk(reservation->getDisk());
Poco::Path dst_part_absolute_path = Poco::Path(dst_part_path + tmp_dst_part_name).absolute();
Poco::Path src_part_absolute_path = Poco::Path(src_part->getFullPath()).absolute();

View File

@ -674,17 +674,21 @@ public:
using PathsWithDisks = std::vector<PathWithDisk>;
PathsWithDisks getDataPathsWithDisks() const;
/// Reserves space at least 1MB
/// Reserves space at least 1MB.
DiskSpace::ReservationPtr reserveSpace(UInt64 expected_size) const;
/// Reserves space at least 1MB on specific disk or volume.
DiskSpace::ReservationPtr reserveSpace(UInt64 expected_size, DiskSpace::SpacePtr space) const;
DiskSpace::ReservationPtr tryReserveSpace(UInt64 expected_size, DiskSpace::SpacePtr space) const;
/// Reserves space at least 1MB preferring best destination according to `ttl_infos`.
DiskSpace::ReservationPtr reserveSpacePreferringMoveDestination(UInt64 expected_size,
const MergeTreeDataPart::TTLInfos & ttl_infos,
time_t time_of_move) const;
DiskSpace::ReservationPtr tryReserveSpacePreferringMoveDestination(UInt64 expected_size,
const MergeTreeDataPart::TTLInfos & ttl_infos,
time_t time_of_move) const;
DiskSpace::ReservationPtr reserveSpaceInSpecificSpace(UInt64 expected_size, DiskSpace::SpacePtr space) const;
DiskSpace::ReservationPtr tryReserveSpaceInSpecificSpace(UInt64 expected_size, DiskSpace::SpacePtr space) const;
/// Choose disk with max available free space
/// Reserves 0 bytes
DiskSpace::ReservationPtr makeEmptyReservationOnLargestDisk() { return storage_policy->makeEmptyReservationOnLargestDisk(); }
@ -735,7 +739,10 @@ public:
ASTPtr entry_ast;
/// Returns destination disk or volume for this rule.
DiskSpace::SpacePtr getDestination(const DiskSpace::StoragePolicyPtr & policy) const;
/// Checks if given part already belongs destination disk or volume for this rule.
bool isPartInDestination(const DiskSpace::StoragePolicyPtr & policy, const MergeTreeDataPart & part) const;
};

View File

@ -130,7 +130,7 @@ bool MergeTreePartsMover::selectPartsForMove(
{
auto destination = ttl_entry_ptr->getDestination(policy);
if (destination && !ttl_entry_ptr->isPartInDestination(policy, *part))
reservation = part->storage.tryReserveSpaceInSpecificSpace(part->bytes_on_disk, ttl_entry_ptr->getDestination(policy));
reservation = part->storage.tryReserveSpace(part->bytes_on_disk, ttl_entry_ptr->getDestination(policy));
}
if (reservation)

View File

@ -350,7 +350,7 @@ public:
/// if we mutate part, than we should reserve space on the same disk, because mutations possible can create hardlinks
if (is_mutation)
reserved_space = storage.tryReserveSpaceInSpecificSpace(total_size, future_part_.parts[0]->disk);
reserved_space = storage.tryReserveSpace(total_size, future_part_.parts[0]->disk);
else
{
MergeTreeDataPart::TTLInfos ttl_infos;

View File

@ -1147,7 +1147,7 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM
/// Once we mutate part, we must reserve space on the same disk, because mutations can possibly create hardlinks.
/// Can throw an exception.
DiskSpace::ReservationPtr reserved_space = reserveSpaceInSpecificSpace(estimated_space_for_result, source_part->disk);
DiskSpace::ReservationPtr reserved_space = reserveSpace(estimated_space_for_result, source_part->disk);
auto table_lock = lockStructureForShare(false, RWLockImpl::NO_QUERY);

View File

@ -1,4 +1,4 @@
<yandex>
<background_processing_pool_thread_sleep_seconds>0.5</background_processing_pool_thread_sleep_seconds>
<background_processing_pool_task_sleep_seconds_when_no_work_max>0.5</background_processing_pool_task_sleep_seconds_when_no_work_max>
<background_move_processing_pool_thread_sleep_seconds>0.5</background_move_processing_pool_thread_sleep_seconds>
<background_move_processing_pool_task_sleep_seconds_when_no_work_max>0.5</background_move_processing_pool_task_sleep_seconds_when_no_work_max>
</yandex>