Remove settings from ReadFromMergeTree.
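The ReadFromMergeTree::Settings struct is removed; its values are now passed to the constructor as individual arguments. MergeTreeReaderSettings are built from the Context by the step itself, the prewhere info is derived from the SelectQueryInfo via a helper, and the remaining query-level settings are read from context->getSettingsRef() at the point of use. MergeTreeDataSelectExecutor::getSampling now takes the parts vector by const reference.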

Nikolai Kochetov 2021-05-28 17:34:02 +03:00
parent 91e8c2c75d
commit 295a302bc8
4 changed files with 161 additions and 173 deletions

src/Processors/QueryPlan/ReadFromMergeTree.cpp

@@ -49,38 +49,63 @@ struct ReadFromMergeTree::AnalysisResult
ReadFromMergeTree::ReadType read_type = ReadFromMergeTree::ReadType::Default;
};
+ static MergeTreeReaderSettings getMergeTreeReaderSettings(const ContextPtr & context)
+ {
+ const auto & settings = context->getSettingsRef();
+ return {
+ .min_bytes_to_use_direct_io = settings.min_bytes_to_use_direct_io,
+ .min_bytes_to_use_mmap_io = settings.min_bytes_to_use_mmap_io,
+ .mmap_cache = context->getMMappedFileCache(),
+ .max_read_buffer_size = settings.max_read_buffer_size,
+ .save_marks_in_cache = true,
+ .checksum_on_read = settings.checksum_on_read,
+ };
+ }
+ static const PrewhereInfoPtr & getPrewhereInfo(const SelectQueryInfo & query_info)
+ {
+ return query_info.projection ? query_info.projection->prewhere_info
+ : query_info.prewhere_info;
+ }
ReadFromMergeTree::ReadFromMergeTree(
+ const SelectQueryInfo & query_info_,
+ std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read_,
+ ContextPtr context_,
+ MergeTreeData::DataPartsVector parts_,
+ Names real_column_names_,
+ Names virt_column_names_,
const MergeTreeData & data_,
- const SelectQueryInfo & query_info_,
StorageMetadataPtr metadata_snapshot_,
StorageMetadataPtr metadata_snapshot_base_,
- Names real_column_names_,
- MergeTreeData::DataPartsVector parts_,
- PrewhereInfoPtr prewhere_info_,
- Names virt_column_names_,
- Settings settings_,
- ContextPtr context_,
+ size_t max_block_size_,
+ size_t num_streams_,
+ bool sample_factor_column_queried_,
- std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read_,
Poco::Logger * log_)
: ISourceStep(DataStream{.header = MergeTreeBaseSelectProcessor::transformHeader(
metadata_snapshot_->getSampleBlockForColumns(real_column_names_, data_.getVirtuals(), data_.getStorageID()),
- prewhere_info_,
+ getPrewhereInfo(query_info_),
data_.getPartitionValueType(),
virt_column_names_)})
+ , query_info(std::move(query_info_))
+ , max_block_numbers_to_read(std::move(max_block_numbers_to_read_))
+ , context(std::move(context_))
+ , reader_settings(getMergeTreeReaderSettings(context_))
+ , prepared_parts(std::move(parts_))
+ , real_column_names(std::move(real_column_names_))
+ , virt_column_names(std::move(virt_column_names_))
, data(data_)
- , query_info(query_info_)
+ , prewhere_info(getPrewhereInfo(query_info))
, metadata_snapshot(std::move(metadata_snapshot_))
, metadata_snapshot_base(std::move(metadata_snapshot_base_))
- , real_column_names(std::move(real_column_names_))
- , prepared_parts(std::move(parts_))
- , prewhere_info(std::move(prewhere_info_))
- , virt_column_names(std::move(virt_column_names_))
- , settings(std::move(settings_))
- , context(std::move(context_))
+ , max_block_size(max_block_size_)
+ , requested_num_streams(num_streams_)
+ , preferred_block_size_bytes(context->getSettingsRef().preferred_block_size_bytes)
+ , preferred_max_column_in_block_size_bytes(context->getSettingsRef().preferred_max_column_in_block_size_bytes)
+ , sample_factor_column_queried(sample_factor_column_queried_)
- , max_block_numbers_to_read(std::move(max_block_numbers_to_read_))
, log(log_)
{
- if (settings.sample_factor_column_queried)
+ if (sample_factor_column_queried)
{
/// Only _sample_factor virtual column is added by ReadFromMergeTree
/// Other virtual columns are added by MergeTreeBaseSelectProcessor.
@@ -90,8 +115,11 @@ ReadFromMergeTree::ReadFromMergeTree(
}
Pipe ReadFromMergeTree::readFromPool(
- RangesInDataParts parts_with_range, Names required_columns,
- size_t used_max_streams, size_t min_marks_for_concurrent_read, bool use_uncompressed_cache)
+ RangesInDataParts parts_with_range,
+ Names required_columns,
+ size_t max_streams,
+ size_t min_marks_for_concurrent_read,
+ bool use_uncompressed_cache)
{
Pipes pipes;
size_t sum_marks = 0;
@@ -103,8 +131,11 @@ Pipe ReadFromMergeTree::readFromPool(
total_rows += part.getRowsCount();
}
+ const auto & settings = context->getSettingsRef();
+ MergeTreeReadPool::BackoffSettings backoff_settings(settings);
auto pool = std::make_shared<MergeTreeReadPool>(
- used_max_streams,
+ max_streams,
sum_marks,
min_marks_for_concurrent_read,
std::move(parts_with_range),
@@ -113,20 +144,20 @@ Pipe ReadFromMergeTree::readFromPool(
prewhere_info,
true,
required_columns,
- settings.backoff_settings,
+ backoff_settings,
settings.preferred_block_size_bytes,
false);
auto * logger = &Poco::Logger::get(data.getLogName() + " (SelectExecutor)");
- LOG_DEBUG(logger, "Reading approx. {} rows with {} streams", total_rows, used_max_streams);
+ LOG_DEBUG(logger, "Reading approx. {} rows with {} streams", total_rows, max_streams);
- for (size_t i = 0; i < used_max_streams; ++i)
+ for (size_t i = 0; i < max_streams; ++i)
{
auto source = std::make_shared<MergeTreeThreadSelectBlockInputProcessor>(
- i, pool, min_marks_for_concurrent_read, settings.max_block_size,
+ i, pool, min_marks_for_concurrent_read, max_block_size,
settings.preferred_block_size_bytes, settings.preferred_max_column_in_block_size_bytes,
data, metadata_snapshot, use_uncompressed_cache,
- prewhere_info, settings.reader_settings, virt_column_names);
+ prewhere_info, reader_settings, virt_column_names);
if (i == 0)
{
@@ -141,15 +172,22 @@ Pipe ReadFromMergeTree::readFromPool(
}
template<typename TSource>
- ProcessorPtr ReadFromMergeTree::createSource(const RangesInDataPart & part, const Names & required_columns, bool use_uncompressed_cache)
+ ProcessorPtr ReadFromMergeTree::createSource(
+ const RangesInDataPart & part,
+ const Names & required_columns,
+ bool use_uncompressed_cache)
{
return std::make_shared<TSource>(
- data, metadata_snapshot, part.data_part, settings.max_block_size, settings.preferred_block_size_bytes,
- settings.preferred_max_column_in_block_size_bytes, required_columns, part.ranges, use_uncompressed_cache,
- prewhere_info, true, settings.reader_settings, virt_column_names, part.part_index_in_query);
+ data, metadata_snapshot, part.data_part, max_block_size, preferred_block_size_bytes,
+ preferred_max_column_in_block_size_bytes, required_columns, part.ranges, use_uncompressed_cache,
+ prewhere_info, true, reader_settings, virt_column_names, part.part_index_in_query);
}
- Pipe ReadFromMergeTree::readInOrder(RangesInDataParts parts_with_range, Names required_columns, ReadType read_type, bool use_uncompressed_cache)
+ Pipe ReadFromMergeTree::readInOrder(
+ RangesInDataParts parts_with_range,
+ Names required_columns,
+ ReadType read_type,
+ bool use_uncompressed_cache)
{
Pipes pipes;
for (const auto & part : parts_with_range)
@@ -176,10 +214,11 @@ Pipe ReadFromMergeTree::readInOrder(RangesInDataParts parts_with_range, Names re
Pipe ReadFromMergeTree::read(
RangesInDataParts parts_with_range, Names required_columns, ReadType read_type,
- size_t used_max_streams, size_t min_marks_for_concurrent_read, bool use_uncompressed_cache)
+ size_t max_streams, size_t min_marks_for_concurrent_read, bool use_uncompressed_cache)
{
- if (read_type == ReadType::Default && used_max_streams > 1)
- return readFromPool(parts_with_range, required_columns, used_max_streams, min_marks_for_concurrent_read, use_uncompressed_cache);
+ if (read_type == ReadType::Default && max_streams > 1)
+ return readFromPool(parts_with_range, required_columns, max_streams,
+ min_marks_for_concurrent_read, use_uncompressed_cache);
auto pipe = readInOrder(parts_with_range, required_columns, read_type, use_uncompressed_cache);
@@ -195,7 +234,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreams(
RangesInDataParts && parts_with_ranges,
const Names & column_names)
{
- const auto & q_settings = context->getSettingsRef();
+ const auto & settings = context->getSettingsRef();
/// Count marks for each part.
std::vector<size_t> sum_marks_in_parts(parts_with_ranges.size());
@@ -219,35 +258,35 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreams(
index_granularity_bytes = data_settings->index_granularity_bytes;
const size_t max_marks_to_use_cache = MergeTreeDataSelectExecutor::roundRowsOrBytesToMarks(
- q_settings.merge_tree_max_rows_to_use_cache,
- q_settings.merge_tree_max_bytes_to_use_cache,
+ settings.merge_tree_max_rows_to_use_cache,
+ settings.merge_tree_max_bytes_to_use_cache,
data_settings->index_granularity,
index_granularity_bytes);
const size_t min_marks_for_concurrent_read = MergeTreeDataSelectExecutor::minMarksForConcurrentRead(
- q_settings.merge_tree_min_rows_for_concurrent_read,
- q_settings.merge_tree_min_bytes_for_concurrent_read,
+ settings.merge_tree_min_rows_for_concurrent_read,
+ settings.merge_tree_min_bytes_for_concurrent_read,
data_settings->index_granularity,
index_granularity_bytes,
sum_marks);
- bool use_uncompressed_cache = q_settings.use_uncompressed_cache;
+ bool use_uncompressed_cache = settings.use_uncompressed_cache;
if (sum_marks > max_marks_to_use_cache)
use_uncompressed_cache = false;
if (0 == sum_marks)
return {};
- size_t used_num_streams = settings.num_streams;
- if (used_num_streams > 1)
+ size_t num_streams = requested_num_streams;
+ if (num_streams > 1)
{
/// Reduce the number of num_streams if the data is small.
- if (sum_marks < used_num_streams * min_marks_for_concurrent_read && parts_with_ranges.size() < used_num_streams)
- used_num_streams = std::max((sum_marks + min_marks_for_concurrent_read - 1) / min_marks_for_concurrent_read, parts_with_ranges.size());
+ if (sum_marks < num_streams * min_marks_for_concurrent_read && parts_with_ranges.size() < num_streams)
+ num_streams = std::max((sum_marks + min_marks_for_concurrent_read - 1) / min_marks_for_concurrent_read, parts_with_ranges.size());
}
return read(std::move(parts_with_ranges), column_names, ReadType::Default,
- used_num_streams, min_marks_for_concurrent_read, use_uncompressed_cache);
+ num_streams, min_marks_for_concurrent_read, use_uncompressed_cache);
}
static ActionsDAGPtr createProjection(const Block & header)
@@ -265,7 +304,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
ActionsDAGPtr & out_projection,
const InputOrderInfoPtr & input_order_info)
{
- const auto & q_settings = context->getSettingsRef();
+ const auto & settings = context->getSettingsRef();
size_t sum_marks = 0;
size_t adaptive_parts = 0;
std::vector<size_t> sum_marks_in_parts(parts_with_ranges.size());
@@ -285,14 +324,14 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
index_granularity_bytes = data_settings->index_granularity_bytes;
const size_t max_marks_to_use_cache = MergeTreeDataSelectExecutor::roundRowsOrBytesToMarks(
- q_settings.merge_tree_max_rows_to_use_cache,
- q_settings.merge_tree_max_bytes_to_use_cache,
+ settings.merge_tree_max_rows_to_use_cache,
+ settings.merge_tree_max_bytes_to_use_cache,
data_settings->index_granularity,
index_granularity_bytes);
const size_t min_marks_for_concurrent_read = MergeTreeDataSelectExecutor::minMarksForConcurrentRead(
- q_settings.merge_tree_min_rows_for_concurrent_read,
- q_settings.merge_tree_min_bytes_for_concurrent_read,
+ settings.merge_tree_min_rows_for_concurrent_read,
+ settings.merge_tree_min_bytes_for_concurrent_read,
data_settings->index_granularity,
index_granularity_bytes,
sum_marks);
@@ -307,7 +346,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
return {};
/// Let's split ranges to avoid reading much data.
- auto split_ranges = [rows_granularity = data_settings->index_granularity, max_block_size = settings.max_block_size]
+ auto split_ranges = [rows_granularity = data_settings->index_granularity, max_block_size = max_block_size]
(const auto & ranges, int direction)
{
MarkRanges new_ranges;
@@ -352,12 +391,12 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
return new_ranges;
};
- const size_t min_marks_per_stream = (sum_marks - 1) / settings.num_streams + 1;
- bool need_preliminary_merge = (parts_with_ranges.size() > q_settings.read_in_order_two_level_merge_threshold);
+ const size_t min_marks_per_stream = (sum_marks - 1) / requested_num_streams + 1;
+ bool need_preliminary_merge = (parts_with_ranges.size() > settings.read_in_order_two_level_merge_threshold);
Pipes pipes;
- for (size_t i = 0; i < settings.num_streams && !parts_with_ranges.empty(); ++i)
+ for (size_t i = 0; i < requested_num_streams && !parts_with_ranges.empty(); ++i)
{
size_t need_marks = min_marks_per_stream;
RangesInDataParts new_parts;
@@ -398,7 +437,8 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
while (need_marks > 0)
{
if (part.ranges.empty())
throw Exception("Unexpected end of ranges while spreading marks among streams", ErrorCodes::LOGICAL_ERROR);
throw Exception("Unexpected end of ranges while spreading marks among streams",
ErrorCodes::LOGICAL_ERROR);
MarkRange & range = part.ranges.front();
@@ -423,7 +463,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
: ReadFromMergeTree::ReadType::InReverseOrder;
pipes.emplace_back(read(std::move(new_parts), column_names, read_type,
- settings.num_streams, min_marks_for_concurrent_read, use_uncompressed_cache));
+ requested_num_streams, min_marks_for_concurrent_read, use_uncompressed_cache));
}
if (need_preliminary_merge)
@@ -451,7 +491,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
pipe.getHeader(),
pipe.numOutputPorts(),
sort_description,
- settings.max_block_size);
+ max_block_size);
pipe.addTransform(std::move(transform));
}
@@ -573,7 +613,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
const Names & column_names,
ActionsDAGPtr & out_projection)
{
- const auto & q_settings = context->getSettingsRef();
+ const auto & settings = context->getSettingsRef();
const auto data_settings = data.getSettings();
size_t sum_marks = 0;
size_t adaptive_parts = 0;
@@ -591,8 +631,8 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
index_granularity_bytes = data_settings->index_granularity_bytes;
const size_t max_marks_to_use_cache = MergeTreeDataSelectExecutor::roundRowsOrBytesToMarks(
- q_settings.merge_tree_max_rows_to_use_cache,
- q_settings.merge_tree_max_bytes_to_use_cache,
+ settings.merge_tree_max_rows_to_use_cache,
+ settings.merge_tree_max_bytes_to_use_cache,
data_settings->index_granularity,
index_granularity_bytes);
@@ -600,9 +640,9 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
if (sum_marks > max_marks_to_use_cache)
use_uncompressed_cache = false;
- size_t used_num_streams = settings.num_streams;
- if (used_num_streams > q_settings.max_final_threads)
- used_num_streams = q_settings.max_final_threads;
+ size_t num_streams = requested_num_streams;
+ if (num_streams > settings.max_final_threads)
+ num_streams = settings.max_final_threads;
/// If setting do_not_merge_across_partitions_select_final is true than we won't merge parts from different partitions.
/// We have all parts in parts vector, where parts with same partition are nearby.
@@ -613,7 +653,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
auto it = parts_with_range.begin();
parts_to_merge_ranges.push_back(it);
- if (q_settings.do_not_merge_across_partitions_select_final)
+ if (settings.do_not_merge_across_partitions_select_final)
{
while (it != parts_with_range.end())
{
@@ -623,7 +663,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
}
/// We divide threads for each partition equally. But we will create at least the number of partitions threads.
/// (So, the total number of threads could be more than initial num_streams.
- used_num_streams /= (parts_to_merge_ranges.size() - 1);
+ num_streams /= (parts_to_merge_ranges.size() - 1);
}
else
{
@@ -650,7 +690,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
/// with level > 0 then we won't postprocess this part and if num_streams > 1 we
/// can use parallel select on such parts. We save such parts in one vector and then use
/// MergeTreeReadPool and MergeTreeThreadSelectBlockInputProcessor for parallel select.
- if (used_num_streams > 1 && q_settings.do_not_merge_across_partitions_select_final &&
+ if (num_streams > 1 && settings.do_not_merge_across_partitions_select_final &&
std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1 &&
parts_to_merge_ranges[range_index]->data_part->info.level > 0)
{
@@ -670,19 +710,8 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
if (new_parts.empty())
continue;
- // ReadFromMergeTree::Settings step_settings
- // {
- // .max_block_size = max_block_size,
- // .preferred_block_size_bytes = settings.preferred_block_size_bytes,
- // .preferred_max_column_in_block_size_bytes = settings.preferred_max_column_in_block_size_bytes,
- // .min_marks_for_concurrent_read = 0, /// this setting is not used for reading in order
- // .use_uncompressed_cache = use_uncompressed_cache,
- // .reader_settings = reader_settings,
- // .backoff_settings = MergeTreeReadPool::BackoffSettings(settings),
- // };
pipe = read(std::move(new_parts), column_names, ReadFromMergeTree::ReadType::InOrder,
- used_num_streams, 0, use_uncompressed_cache);
+ num_streams, 0, use_uncompressed_cache);
/// Drop temporary columns, added by 'sorting_key_expr'
if (!out_projection)
@@ -699,7 +728,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
/// If do_not_merge_across_partitions_select_final is true and there is only one part in partition
/// with level > 0 then we won't postprocess this part
- if (q_settings.do_not_merge_across_partitions_select_final &&
+ if (settings.do_not_merge_across_partitions_select_final &&
std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1 &&
parts_to_merge_ranges[range_index]->data_part->info.level > 0)
{
@@ -720,8 +749,8 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
addMergingFinal(
pipe,
- std::min<size_t>(used_num_streams, q_settings.max_final_threads),
- sort_description, data.merging_params, partition_key_columns, settings.max_block_size);
+ std::min<size_t>(num_streams, settings.max_final_threads),
+ sort_description, data.merging_params, partition_key_columns, max_block_size);
partition_pipes.emplace_back(std::move(pipe));
}
@@ -730,11 +759,11 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
{
RangesInDataParts new_parts;
- size_t num_streams_for_lonely_parts = used_num_streams * lonely_parts.size();
+ size_t num_streams_for_lonely_parts = num_streams * lonely_parts.size();
const size_t min_marks_for_concurrent_read = MergeTreeDataSelectExecutor::minMarksForConcurrentRead(
- q_settings.merge_tree_min_rows_for_concurrent_read,
- q_settings.merge_tree_min_bytes_for_concurrent_read,
+ settings.merge_tree_min_rows_for_concurrent_read,
+ settings.merge_tree_min_bytes_for_concurrent_read,
data_settings->index_granularity,
index_granularity_bytes,
sum_marks_in_lonely_parts);
@@ -767,6 +796,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
ReadFromMergeTree::AnalysisResult ReadFromMergeTree::selectRangesToRead(MergeTreeData::DataPartsVector parts) const
{
AnalysisResult result;
+ const auto & settings = context->getSettingsRef();
size_t total_parts = parts.size();
@@ -807,7 +837,7 @@ ReadFromMergeTree::AnalysisResult ReadFromMergeTree::selectRangesToRead(MergeTre
result.sampling = MergeTreeDataSelectExecutor::getSampling(
select, parts, metadata_snapshot, key_condition,
- data, log, settings.sample_factor_column_queried, metadata_snapshot->getColumns().getAllPhysical(), context);
+ data, log, sample_factor_column_queried, metadata_snapshot->getColumns().getAllPhysical(), context);
if (result.sampling.read_nothing)
return result;
@@ -824,9 +854,9 @@ ReadFromMergeTree::AnalysisResult ReadFromMergeTree::selectRangesToRead(MergeTre
query_info,
context,
key_condition,
- settings.reader_settings,
+ reader_settings,
log,
- settings.num_streams,
+ requested_num_streams,
result.index_stats,
true);
@@ -865,8 +895,7 @@ ReadFromMergeTree::AnalysisResult ReadFromMergeTree::selectRangesToRead(MergeTre
? query_info.input_order_info
: (query_info.projection ? query_info.projection->input_order_info : nullptr);
- const auto & q_settings = context->getSettingsRef();
- if ((q_settings.optimize_read_in_order || q_settings.optimize_aggregation_in_order) && input_order_info)
+ if ((settings.optimize_read_in_order || settings.optimize_aggregation_in_order) && input_order_info)
result.read_type = (input_order_info->direction > 0) ? ReadType::InOrder
: ReadType::InReverseOrder;
@@ -906,7 +935,7 @@ void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const Build
Pipe pipe;
- const auto & q_settings = context->getSettingsRef();
+ const auto & settings = context->getSettingsRef();
if (select.final())
{
@@ -927,7 +956,7 @@ void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const Build
column_names_to_read,
result_projection);
}
- else if ((q_settings.optimize_read_in_order || q_settings.optimize_aggregation_in_order) && input_order_info)
+ else if ((settings.optimize_read_in_order || settings.optimize_aggregation_in_order) && input_order_info)
{
size_t prefix_size = input_order_info->order_key_prefix_descr.size();
auto order_key_prefix_ast = metadata_snapshot->getSortingKey().expression_list_ast->clone();
@@ -983,7 +1012,7 @@ void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const Build
};
/// By the way, if a distributed query or query to a Merge table is made, then the `_sample_factor` column can have different values.
- if (settings.sample_factor_column_queried)
+ if (sample_factor_column_queried)
{
ColumnWithTypeAndName column;
column.name = "_sample_factor";
@@ -994,18 +1023,6 @@ void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const Build
append_actions(std::move(adding_column));
}
- // TODO There seems to be no place initializing remove_columns_actions
- // if (query_info.prewhere_info && query_info.prewhere_info->remove_columns_actions)
- // {
- // auto remove_columns_action = std::make_shared<ExpressionActions>(
- // query_info.prewhere_info->remove_columns_actions->getActionsDAG().clone());
- // pipe.addSimpleTransform([&](const Block & header)
- // {
- // return std::make_shared<ExpressionTransform>(header, remove_columns_action);
- // });
- // }
/// Extra columns may be returned (for example, if sampling is used).
/// Convert pipe to step header structure.
if (!isCompatibleHeader(cur_header, getOutputStream().header))

src/Processors/QueryPlan/ReadFromMergeTree.h

@@ -40,22 +40,6 @@ public:
using IndexStats = std::vector<IndexStat>;
- /// Part of settings which are needed for reading.
- struct Settings
- {
- UInt64 max_block_size;
- size_t num_streams;
- size_t preferred_block_size_bytes;
- size_t preferred_max_column_in_block_size_bytes;
- //size_t min_marks_for_concurrent_read;
- bool use_uncompressed_cache;
- bool force_primary_key;
- bool sample_factor_column_queried;
- MergeTreeReaderSettings reader_settings;
- MergeTreeReadPool::BackoffSettings backoff_settings;
- };
enum class ReadType
{
/// By default, read will use MergeTreeReadPool and return pipe with num_streams outputs.
@@ -72,17 +56,18 @@ };
};
ReadFromMergeTree(
+ const SelectQueryInfo & query_info_,
+ std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read_,
+ ContextPtr context_,
+ MergeTreeData::DataPartsVector parts_,
+ Names real_column_names_,
+ Names virt_column_names_,
const MergeTreeData & data_,
- const SelectQueryInfo & query_info_,
StorageMetadataPtr metadata_snapshot_,
StorageMetadataPtr metadata_snapshot_base_,
- Names real_column_names_,
- MergeTreeData::DataPartsVector parts_,
- PrewhereInfoPtr prewhere_info_,
- Names virt_column_names_,
- Settings settings_,
- ContextPtr context_,
+ size_t max_block_size_,
+ size_t num_streams_,
+ bool sample_factor_column_queried_,
- std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read_,
Poco::Logger * log_
);
@@ -97,23 +82,33 @@ public:
void describeIndexes(JSONBuilder::JSONMap & map) const override;
private:
+ SelectQueryInfo query_info;
+ std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read;
+ ContextPtr context;
+ const MergeTreeReaderSettings reader_settings;
+ MergeTreeData::DataPartsVector prepared_parts;
+ Names real_column_names;
+ Names virt_column_names;
const MergeTreeData & data;
- SelectQueryInfo query_info;
+ PrewhereInfoPtr prewhere_info;
StorageMetadataPtr metadata_snapshot;
StorageMetadataPtr metadata_snapshot_base;
- Names real_column_names;
- MergeTreeData::DataPartsVector prepared_parts;
- PrewhereInfoPtr prewhere_info;
- Names virt_column_names;
- Settings settings;
- ContextPtr context;
+ const size_t max_block_size;
+ const size_t requested_num_streams;
+ const size_t preferred_block_size_bytes;
+ const size_t preferred_max_column_in_block_size_bytes;
+ const bool sample_factor_column_queried;
- std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read;
Poco::Logger * log;
- Pipe read(RangesInDataParts parts_with_range, Names required_columns, ReadType read_type, size_t used_max_streams, size_t min_marks_for_concurrent_read, bool use_uncompressed_cache);
- Pipe readFromPool(RangesInDataParts parts_with_ranges, Names required_columns, size_t used_max_streams, size_t min_marks_for_concurrent_read, bool use_uncompressed_cache);
+ Pipe read(RangesInDataParts parts_with_range, Names required_columns, ReadType read_type, size_t max_streams, size_t min_marks_for_concurrent_read, bool use_uncompressed_cache);
+ Pipe readFromPool(RangesInDataParts parts_with_ranges, Names required_columns, size_t max_streams, size_t min_marks_for_concurrent_read, bool use_uncompressed_cache);
Pipe readInOrder(RangesInDataParts parts_with_range, Names required_columns, ReadType read_type, bool use_uncompressed_cache);
template<typename TSource>
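For reference, where each field of the deleted ReadFromMergeTree::Settings struct now comes from, reconstructed from the hunks above (the force_primary_key entry is an inference; its new use lies outside the shown context):

    max_block_size                            -> constructor argument, kept in the const member max_block_size
    num_streams                               -> constructor argument, kept as requested_num_streams
    preferred_block_size_bytes                -> context->getSettingsRef() in the constructor
    preferred_max_column_in_block_size_bytes  -> context->getSettingsRef() in the constructor
    use_uncompressed_cache                    -> recomputed from context->getSettingsRef() at each use
    force_primary_key                         -> presumably context->getSettingsRef() inside selectRangesToRead
    sample_factor_column_queried              -> constructor argument, kept in a const member
    reader_settings                           -> getMergeTreeReaderSettings(context) in the constructor
    backoff_settings                          -> MergeTreeReadPool::BackoffSettings(settings) inside readFromPool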

src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp

@@ -381,7 +381,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
MergeTreeDataSelectSamplingData MergeTreeDataSelectExecutor::getSampling(
const ASTSelectQuery & select,
- MergeTreeData::DataPartsVector & parts,
+ const MergeTreeData::DataPartsVector & parts,
const StorageMetadataPtr & metadata_snapshot,
KeyCondition & key_condition,
const MergeTreeData & data,
@@ -1189,43 +1189,19 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
selectColumnNames(column_names_to_return, data, real_column_names, virt_column_names, sample_factor_column_queried);
- const auto & settings = context->getSettingsRef();
- MergeTreeReaderSettings reader_settings =
- {
- .min_bytes_to_use_direct_io = settings.min_bytes_to_use_direct_io,
- .min_bytes_to_use_mmap_io = settings.min_bytes_to_use_mmap_io,
- .mmap_cache = context->getMMappedFileCache(),
- .max_read_buffer_size = settings.max_read_buffer_size,
- .save_marks_in_cache = true,
- .checksum_on_read = settings.checksum_on_read,
- };
- ReadFromMergeTree::Settings step_settings
- {
- .max_block_size = max_block_size,
- .num_streams = num_streams,
- .preferred_block_size_bytes = settings.preferred_block_size_bytes,
- .preferred_max_column_in_block_size_bytes = settings.preferred_max_column_in_block_size_bytes,
- .use_uncompressed_cache = settings.use_uncompressed_cache,
- .force_primary_key = settings.force_primary_key,
- .sample_factor_column_queried = sample_factor_column_queried,
- .reader_settings = reader_settings,
- .backoff_settings = MergeTreeReadPool::BackoffSettings(settings),
- };
auto read_from_merge_tree = std::make_unique<ReadFromMergeTree>(
+ query_info,
+ max_block_numbers_to_read,
+ context,
+ parts,
+ real_column_names,
+ virt_column_names,
data,
- query_info,
metadata_snapshot,
metadata_snapshot_base,
- real_column_names,
- parts,
- query_info.projection ? query_info.projection->prewhere_info : query_info.prewhere_info,
- virt_column_names,
- step_settings,
- context,
+ max_block_size,
+ num_streams,
+ sample_factor_column_queried,
- max_block_numbers_to_read,
log
);

src/Storages/MergeTree/MergeTreeDataSelectExecutor.h

@@ -176,7 +176,7 @@ public:
static MergeTreeDataSelectSamplingData getSampling(
const ASTSelectQuery & select,
- MergeTreeData::DataPartsVector & parts,
+ const MergeTreeData::DataPartsVector & parts,
const StorageMetadataPtr & metadata_snapshot,
KeyCondition & key_condition,
const MergeTreeData & data,