This commit is contained in:
Andrey Mironov 2015-09-01 19:09:12 +03:00
parent 1b394d9324
commit ef356f9267
3 changed files with 75 additions and 38 deletions

View File

@ -119,7 +119,10 @@ struct Settings
* (Чтобы большие запросы не вымывали кэш.) */ \
M(SettingUInt64, merge_tree_max_rows_to_use_cache, (1024 * 1024)) \
\
/** Distribute reading from MergeTree across threads uniformly, ensuring a stable average execution time of each thread within a single read. */ \
M(SettingBool, merge_tree_uniform_read_distribution, false) \
/** Reuse MergeTreeReaders after a read subtask has been fully read. */ \
M(SettingBool, merge_tree_uniform_read_reuse_readers, false) \
\
/** Minimum length of an expression expr = x1 OR ... expr = xN for optimization */ \
M(SettingUInt64, optimize_min_equality_disjunction_chain_length, 3) \

View File

@ -69,7 +69,14 @@ protected:
injectVirtualColumns(res);
/// The current task has no mark ranges left: drop it.
if (task->mark_ranges.empty())
{
/// When the merge_tree_uniform_read_reuse_readers setting is 0, both readers are reset together with the task;
/// otherwise they are left in place.
if (0 == storage.context.getSettings().merge_tree_uniform_read_reuse_readers)
{
reader = {};
pre_reader = {};
}
task = {};
}
}
return res;

View File

@ -369,14 +369,30 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreads(
if (sum_marks > max_marks_to_use_cache)
use_uncompressed_cache = false;
MergeTreeReadPoolPtr pool = std::make_shared<MergeTreeReadPool>(
threads, sum_marks, min_marks_for_concurrent_read, parts, data, prewhere_actions, prewhere_column, true,
column_names);
BlockInputStreams res;
/// @todo remove old code
if (sum_marks > 0 && settings.merge_tree_uniform_read_distribution == 0)
if (sum_marks > 0 && settings.merge_tree_uniform_read_distribution == 1)
{
/// Uniform read distribution: a single read pool is created here and handed to every stream built below.
MergeTreeReadPoolPtr pool = std::make_shared<MergeTreeReadPool>(
threads, sum_marks, min_marks_for_concurrent_read, parts, data, prewhere_actions, prewhere_column, true,
column_names);
/// One MergeTreeThreadBlockInputStream per thread; each gets its own index `i` and the same pool.
for (std::size_t i = 0; i < threads; ++i)
res.emplace_back(new MergeTreeThreadBlockInputStream{
i, pool, min_marks_for_concurrent_read, max_block_size, data, use_uncompressed_cache, prewhere_actions,
prewhere_column, settings.min_bytes_to_use_direct_io, settings.max_read_buffer_size, virt_columns
});
/// Estimate the total number of rows - for the progress bar.
const std::size_t total_rows = data.index_granularity * sum_marks;
/// Set the approximate number of rows only on the first source.
/// NOTE(review): res.front() requires at least one stream, i.e. threads > 0 - confirm the caller guarantees this.
static_cast<IProfilingBlockInputStream &>(*res.front()).setTotalRowsApprox(total_rows);
LOG_TRACE(log, "Reading approx. " << total_rows);
}
else if (sum_marks > 0)
{
const size_t min_marks_per_thread = (sum_marks - 1) / threads + 1;
@ -458,22 +474,6 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreads(
if (!parts.empty())
throw Exception("Couldn't spread marks among threads", ErrorCodes::LOGICAL_ERROR);
}
else if (sum_marks > 0)
{
for (std::size_t i = 0; i < threads; ++i)
res.emplace_back(new MergeTreeThreadBlockInputStream{
i, pool, min_marks_for_concurrent_read, max_block_size, data, use_uncompressed_cache, prewhere_actions,
prewhere_column, settings.min_bytes_to_use_direct_io, settings.max_read_buffer_size, virt_columns
});
/// Оценим общее количество строк - для прогресс-бара.
const std::size_t total_rows = data.index_granularity * sum_marks;
/// Выставим приблизительное количество строк только для первого источника
static_cast<IProfilingBlockInputStream &>(*res.front()).setTotalRowsApprox(total_rows);
LOG_TRACE(log, "Reading approx. " << total_rows);
}
return res;
}
@ -503,31 +503,58 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreadsFinal
if (sum_marks > max_marks_to_use_cache)
use_uncompressed_cache = false;
MergeTreeReadPoolPtr pool = std::make_shared<MergeTreeReadPool>(
parts.size(), sum_marks, min_marks_for_read_task, parts, data, prewhere_actions, prewhere_column, true,
column_names, true);
BlockInputStreams to_merge;
for (const auto i : ext::range(0, parts.size()))
if (settings.merge_tree_uniform_read_distribution == 1)
{
BlockInputStreamPtr source_stream{
new MergeTreeThreadBlockInputStream{
i, pool, min_marks_for_read_task, max_block_size, data, use_uncompressed_cache, prewhere_actions,
prewhere_column, settings.min_bytes_to_use_direct_io, settings.max_read_buffer_size, virt_columns
}
};
MergeTreeReadPoolPtr pool = std::make_shared<MergeTreeReadPool>(
parts.size(), sum_marks, min_marks_for_read_task, parts, data, prewhere_actions, prewhere_column, true,
column_names, true);
to_merge.push_back(new ExpressionBlockInputStream(source_stream, data.getPrimaryExpression()));
/// One pool-backed source stream per part, each wrapped in an ExpressionBlockInputStream
/// that applies the primary expression of `data`.
for (const auto i : ext::range(0, parts.size()))
{
BlockInputStreamPtr source_stream{
new MergeTreeThreadBlockInputStream{
i, pool, min_marks_for_read_task, max_block_size, data, use_uncompressed_cache, prewhere_actions,
prewhere_column, settings.min_bytes_to_use_direct_io, settings.max_read_buffer_size, virt_columns
}
};
to_merge.push_back(new ExpressionBlockInputStream(source_stream, data.getPrimaryExpression()));
}
/// Estimate the total number of rows - for the progress bar.
const std::size_t total_rows = data.index_granularity * sum_marks;
/// Set the approximate number of rows only on the first source.
/// NOTE(review): to_merge.front() requires a non-empty to_merge, i.e. non-empty parts - confirm the caller guarantees this.
static_cast<IProfilingBlockInputStream &>(*to_merge.front()).setTotalRowsApprox(total_rows);
LOG_TRACE(log, "Reading approx. " << total_rows);
}
else
{
for (size_t part_index = 0; part_index < parts.size(); ++part_index)
{
RangesInDataPart & part = parts[part_index];
/// Оценим общее количество строк - для прогресс-бара.
const std::size_t total_rows = data.index_granularity * sum_marks;
BlockInputStreamPtr source_stream = new MergeTreeBlockInputStream(
data.getFullPath() + part.data_part->name + '/', max_block_size, column_names, data,
part.data_part, part.ranges, use_uncompressed_cache,
prewhere_actions, prewhere_column, true, settings.min_bytes_to_use_direct_io, settings.max_read_buffer_size);
/// Выставим приблизительное количество строк только для первого источника
static_cast<IProfilingBlockInputStream &>(*to_merge.front()).setTotalRowsApprox(total_rows);
for (const String & virt_column : virt_columns)
{
if (virt_column == "_part")
source_stream = new AddingConstColumnBlockInputStream<String>(
source_stream, new DataTypeString, part.data_part->name, "_part");
else if (virt_column == "_part_index")
source_stream = new AddingConstColumnBlockInputStream<UInt64>(
source_stream, new DataTypeUInt64, part.part_index_in_query, "_part_index");
}
LOG_TRACE(log, "Reading approx. " << total_rows);
to_merge.push_back(new ExpressionBlockInputStream(source_stream, data.getPrimaryExpression()));
}
}
BlockInputStreams res;
if (to_merge.size() == 1)