Merge remote-tracking branch 'upstream/master' into METR-23466

This commit is contained in:
proller 2016-11-25 03:22:30 +03:00
commit 26fe10a7b4
6 changed files with 78 additions and 9 deletions

View File

@ -253,8 +253,6 @@ void SystemLog<LogElement>::prepareTable()
{
String description = backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(table_name);
auto lock = context.getLock();
table = context.tryGetTable(database_name, table_name);
if (table)

View File

@ -30,6 +30,11 @@ public:
*/
size_t getMaxPartsSizeForMerge();
/** For explicitly passed size of pool and number of used tasks.
* This method could be used to calculate threshold depending on number of tasks in replication queue.
*/
size_t getMaxPartsSizeForMerge(size_t pool_size, size_t pool_used);
/** Выбирает, какие куски слить. Использует кучу эвристик.
*
* can_merge - функция, определяющая, можно ли объединить пару соседних кусков.

View File

@ -21,7 +21,7 @@ struct MergeTreeSettings
size_t max_bytes_to_merge_at_min_space_in_pool = 1024 * 1024;
/// How many tasks of merging parts are allowed simultaneously in ReplicatedMergeTree queue.
size_t max_replicated_merges_in_queue = 6;
size_t max_replicated_merges_in_queue = 16;
/// How many seconds to keep obsolete parts.
time_t old_parts_lifetime = 8 * 60;

View File

@ -85,12 +85,20 @@ size_t MergeTreeDataMerger::getMaxPartsSizeForMerge()
{
size_t total_threads_in_pool = pool.getNumberOfThreads();
size_t busy_threads_in_pool = CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask].load(std::memory_order_relaxed);
size_t free_threads_in_pool = 1 + total_threads_in_pool - busy_threads_in_pool; /// 1 is current thread
return getMaxPartsSizeForMerge(total_threads_in_pool, busy_threads_in_pool == 0 ? 0 : busy_threads_in_pool - 1); /// 1 is current thread
}
size_t MergeTreeDataMerger::getMaxPartsSizeForMerge(size_t pool_size, size_t pool_used)
{
if (pool_used > pool_size)
throw Exception("Logical error: invalid arguments passed to getMaxPartsSizeForMerge: pool_used > pool_size", ErrorCodes::LOGICAL_ERROR);
size_t max_size = interpolateExponential(
data.settings.max_bytes_to_merge_at_min_space_in_pool,
data.settings.max_bytes_to_merge_at_max_space_in_pool,
static_cast<double>(free_threads_in_pool) / total_threads_in_pool);
static_cast<double>(pool_size - pool_used) / pool_size);
return std::min(max_size, static_cast<size_t>(DiskSpaceMonitor::getUnreservedFreeSpace(data.full_path) / DISK_USAGE_COEFFICIENT_TO_SELECT));
}

View File

@ -1695,6 +1695,10 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
std::lock_guard<std::mutex> merge_selecting_lock(merge_selecting_mutex);
/** If many merges is already queued, then will queue only small enough merges.
* Otherwise merge queue could be filled with only large merges,
* and in the same time, many small parts could be created and won't be merged.
*/
size_t merges_queued = queue.countMerges();
if (merges_queued >= data.settings.max_replicated_merges_in_queue)
@ -1708,10 +1712,10 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
MergeTreeData::DataPartsVector parts;
String merged_name;
size_t disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path);
if (merger.selectPartsToMerge(
parts, merged_name, false, std::min(disk_space, data.settings.max_bytes_to_merge_at_max_space_in_pool), can_merge)
parts, merged_name, false,
merger.getMaxPartsSizeForMerge(data.settings.max_replicated_merges_in_queue, merges_queued),
can_merge)
&& createLogEntryToMergeParts(parts, merged_name))
{
success = true;
@ -2060,6 +2064,12 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
void StorageReplicatedMergeTree::shutdown()
{
/** This must be done before waiting for restarting_thread.
* Because restarting_thread will wait for finishing of tasks in background pool,
* and parts are fetched in that tasks.
*/
fetcher.cancel();
if (restarting_thread)
{
restarting_thread->stop();
@ -2071,7 +2081,6 @@ void StorageReplicatedMergeTree::shutdown()
endpoint_holder->cancel();
endpoint_holder = nullptr;
}
fetcher.cancel();
if (disk_space_monitor_endpoint_holder)
{

View File

@ -0,0 +1,49 @@
#!/usr/bin/perl
use strict;
use warnings;
use Data::Dumper;
my @current_stack = ();
my $grouped_stacks = {};
while (my $line = <>)
{
chomp $line;
if ($line =~ '^#')
{
$line =~ s/^#\d+\s+//;
$line =~ s/ \([^\)]+=[^\)]+\) / /g;
push @current_stack, $line;
}
if ($line eq '')
{
my $group = \$grouped_stacks;
for my $frame (reverse @current_stack)
{
$$group->{count} ||= 0;
++$$group->{count};
$group = \$$group->{children}{$frame};
}
@current_stack = ();
}
}
sub print_group
{
my $group = shift;
my $level = shift || 0;
for my $key (sort { $group->{children}{$b}{count} <=> $group->{children}{$a}{count} } keys %{$group->{children}})
{
my $count = $group->{count};
print(('| ' x $level) . $count . (' ' x (5 - (length $count))) . $key . "\n");
print_group($group->{children}{$key}, $level + 1);
}
}
print_group($grouped_stacks);