ClickHouse/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp

87 lines
3.4 KiB
C++
Raw Normal View History

2019-10-01 16:50:08 +00:00
#include <Storages/MergeTree/MergeTreeSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeBaseSelectProcessor.h>
2019-10-10 16:30:30 +00:00
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <Interpreters/Context.h>
2016-11-20 12:43:20 +00:00
namespace DB
{
2019-10-01 16:50:08 +00:00
/// Creates a processor that reads the given mark ranges of a single data part.
/// Block size limits, PREWHERE info, expression settings, reader settings, virtual column names
/// and the optional parallel-reading extension are forwarded to MergeTreeBaseSelectProcessor.
MergeTreeSelectProcessor::MergeTreeSelectProcessor(
    const MergeTreeData & storage_,
    const StorageSnapshotPtr & storage_snapshot_,
    const MergeTreeData::DataPartPtr & owned_data_part_,
    UInt64 max_block_size_rows_,
    size_t preferred_block_size_bytes_,
    size_t preferred_max_column_in_block_size_bytes_,
    Names required_columns_,
    MarkRanges mark_ranges_,
    bool use_uncompressed_cache_,
    const PrewhereInfoPtr & prewhere_info_,
    ExpressionActionsSettings actions_settings,
    const MergeTreeReaderSettings & reader_settings_,
    const Names & virt_column_names_,
    size_t part_index_in_query_,
    bool has_limit_below_one_block_,
    std::optional<ParallelReadingExtension> extension_)
    : MergeTreeBaseSelectProcessor{
        /// The header is built from required_columns_ here, before the list is moved into the member below.
        storage_snapshot_->getSampleBlockForColumns(required_columns_),
        storage_,
        storage_snapshot_,
        prewhere_info_,
        std::move(actions_settings),
        max_block_size_rows_,
        preferred_block_size_bytes_,
        preferred_max_column_in_block_size_bytes_,
        reader_settings_,
        use_uncompressed_cache_,
        virt_column_names_,
        extension_}
    , required_columns(std::move(required_columns_))
    , data_part(owned_data_part_)
    , sample_block(storage_snapshot_->metadata->getSampleBlock())
    , all_mark_ranges(std::move(mark_ranges_))
    , part_index_in_query(part_index_in_query_)
    , has_limit_below_one_block(has_limit_below_one_block_)
    , total_rows(data_part->index_granularity.getRowsCountInRanges(all_mark_ranges))
{
    /// A present extension means that parallel reading from replicas is enabled
    /// and this processor has to collaborate with the initiator.
    /// In that case the approximate row count is not reported,
    /// because it would otherwise be accounted multiple times.
    const bool parallel_reading_enabled = extension_.has_value();
    if (!parallel_reading_enabled)
        addTotalRowsApprox(total_rows);

    ordered_names = header_without_virtual_columns.getNames();
}
2021-08-04 15:18:03 +00:00
void MergeTreeSelectProcessor::initializeReaders()
2016-11-20 12:43:20 +00:00
{
task_columns = getReadTaskColumns(
storage, storage_snapshot, data_part,
required_columns, prewhere_info);
2017-04-05 20:34:19 +00:00
2021-08-04 15:18:03 +00:00
/// Will be used to distinguish between PREWHERE and WHERE columns when applying filter
2019-07-19 14:56:00 +00:00
const auto & column_names = task_columns.columns.getNames();
column_name_set = NameSet{column_names.begin(), column_names.end()};
if (use_uncompressed_cache)
owned_uncompressed_cache = storage.getContext()->getUncompressedCache();
owned_mark_cache = storage.getContext()->getMarkCache();
reader = data_part->getReader(task_columns.columns, storage_snapshot->getMetadataForQuery(),
all_mark_ranges, owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings);
if (prewhere_info)
pre_reader = data_part->getReader(task_columns.pre_columns, storage_snapshot->getMetadataForQuery(),
all_mark_ranges, owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings);
}
2019-10-01 16:50:08 +00:00
/// Drops both readers and the reference to the data part.
void MergeTreeSelectProcessor::finish()
{
    /// The files are closed here, ahead of the destruction of the object itself:
    /// when many sources exist but only a few of them are being read at the same time,
    /// the buffers of the finished ones do not keep wasting memory.
    reader.reset();
    pre_reader.reset();

    data_part.reset();
}
2019-10-01 16:50:08 +00:00
/// Destructor is defaulted here, in the source file, rather than in the class declaration.
/// NOTE(review): presumably so that member types can stay incomplete in the header - confirm against the .h file.
MergeTreeSelectProcessor::~MergeTreeSelectProcessor() = default;
2016-11-20 12:43:20 +00:00
}