ClickHouse/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp

485 lines
19 KiB
C++
Raw Normal View History

2019-10-01 16:50:08 +00:00
#include <Storages/MergeTree/MergeTreeBaseSelectProcessor.h>
2018-02-13 19:34:15 +00:00
#include <Storages/MergeTree/MergeTreeRangeReader.h>
2019-10-10 16:30:30 +00:00
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
#include <Columns/FilterDescription.h>
2017-07-13 20:58:19 +00:00
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypeNothing.h>
2020-06-30 16:41:43 +00:00
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypeArray.h>
#include <Processors/Transforms/AggregatingTransform.h>
namespace DB
{
namespace ErrorCodes
{
    /// Only declared here (extern); these are the codes thrown from this file.
    extern const int ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER;
    extern const int LOGICAL_ERROR;
}
2019-10-01 16:50:08 +00:00
/// Initializes the common state of a MergeTree select source.
///
/// The output header of the source is computed by transformHeader() from `header`, the PREWHERE
/// description, the partition value type of the storage and the requested virtual columns.
/// If PREWHERE is present, its actions are wrapped into ExpressionActions (built with
/// `actions_settings`) and stored in `prewhere_actions`.
MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor(
    Block header,
    const MergeTreeData & storage_,
    const StorageMetadataPtr & metadata_snapshot_,
    const PrewhereInfoPtr & prewhere_info_,
    ExpressionActionsSettings actions_settings,
    UInt64 max_block_size_rows_,
    UInt64 preferred_block_size_bytes_,
    UInt64 preferred_max_column_in_block_size_bytes_,
    const MergeTreeReaderSettings & reader_settings_,
    bool use_uncompressed_cache_,
    const Names & virt_column_names_)
    : SourceWithProgress(transformHeader(std::move(header), prewhere_info_, storage_.getPartitionValueType(), virt_column_names_))
    , storage(storage_)
    , metadata_snapshot(metadata_snapshot_)
    , prewhere_info(prewhere_info_)
    , max_block_size_rows(max_block_size_rows_)
    , preferred_block_size_bytes(preferred_block_size_bytes_)
    , preferred_max_column_in_block_size_bytes(preferred_max_column_in_block_size_bytes_)
    , reader_settings(reader_settings_)
    , use_uncompressed_cache(use_uncompressed_cache_)
    , virt_column_names(virt_column_names_)
    , partition_value_type(storage.getPartitionValueType())
{
    /// Start from the full output header and remove every virtual column that is present in it.
    /// The names are visited from the last one to the first one.
    header_without_virtual_columns = getPort().getHeader();
    for (auto name_it = virt_column_names.rbegin(); name_it != virt_column_names.rend(); ++name_it)
    {
        if (header_without_virtual_columns.has(*name_it))
            header_without_virtual_columns.erase(*name_it);
    }

    /// Nothing else to prepare when the query has no PREWHERE.
    if (!prewhere_info)
        return;

    prewhere_actions = std::make_unique<PrewhereExprInfo>();
    auto & expr_info = *prewhere_actions;

    /// Alias actions and the row-level filter are optional; the PREWHERE actions are always built.
    if (prewhere_info->alias_actions)
        expr_info.alias_actions = std::make_shared<ExpressionActions>(prewhere_info->alias_actions, actions_settings);

    if (prewhere_info->row_level_filter)
        expr_info.row_level_filter = std::make_shared<ExpressionActions>(prewhere_info->row_level_filter, actions_settings);

    expr_info.prewhere_actions = std::make_shared<ExpressionActions>(prewhere_info->prewhere_actions, actions_settings);

    /// The remaining fields are copied as is.
    expr_info.row_level_column_name = prewhere_info->row_level_column_name;
    expr_info.prewhere_column_name = prewhere_info->prewhere_column_name;
    expr_info.remove_prewhere_column = prewhere_info->remove_prewhere_column;
    expr_info.need_filter = prewhere_info->need_filter;
}
2019-10-01 16:50:08 +00:00
/// Returns the next chunk that has rows, with the requested virtual columns appended.
/// Returns an empty chunk if the source was cancelled or if getNewTask() could not provide
/// another task after the current one was absent or finished.
Chunk MergeTreeBaseSelectProcessor::generate()
{
    while (!isCancelled())
    {
        const bool need_new_task = !task || task->isFinished();
        if (need_new_task && !getNewTask())
            return {};

        auto chunk = readFromPart();

        /// A read may yield no rows; in that case just try again.
        if (!chunk.hasRows())
            continue;

        injectVirtualColumns(chunk, task.get(), partition_value_type, virt_column_names);
        return chunk;
    }

    return {};
}
2019-10-01 16:50:08 +00:00
/// Creates the range reader(s) of `current_task`.
///
/// Three configurations are possible:
///  - no PREWHERE: a single range reader over `reader`;
///  - PREWHERE and `reader` has no columns: a single range reader over `pre_reader` that applies the PREWHERE actions;
///  - PREWHERE and `reader` has columns: a range reader over `reader`, chained after a PREWHERE range reader
///    over `pre_reader` when `pre_reader` exists.
/// NOTE(review): the last boolean argument of MergeTreeRangeReader presumably marks the last reader
/// of the chain - confirm against its declaration.
void MergeTreeBaseSelectProcessor::initializeRangeReaders(MergeTreeReadTask & current_task)
{
    if (!prewhere_info)
    {
        current_task.range_reader = MergeTreeRangeReader(reader.get(), nullptr, nullptr, true);
        return;
    }

    if (reader->getColumns().empty())
    {
        current_task.range_reader = MergeTreeRangeReader(pre_reader.get(), nullptr, prewhere_actions.get(), true);
        return;
    }

    MergeTreeRangeReader * prewhere_stage = nullptr;
    if (pre_reader)
    {
        current_task.pre_range_reader = MergeTreeRangeReader(pre_reader.get(), nullptr, prewhere_actions.get(), false);
        prewhere_stage = &current_task.pre_range_reader;
    }

    current_task.range_reader = MergeTreeRangeReader(reader.get(), prewhere_stage, nullptr, true);
}
2019-10-01 16:50:08 +00:00
/// Reads the next portion of rows for the current task.
///
/// The number of rows to request is derived from max_block_size_rows and, when the task has a size
/// predictor, from the preferred block / column sizes in bytes. The columns of the result are
/// reordered to follow header_without_virtual_columns. Returns an empty chunk if the read yielded no rows.
/// Throws LOGICAL_ERROR if the range reader returned a different number of columns than its sample block has.
Chunk MergeTreeBaseSelectProcessor::readFromPartImpl()
{
    auto & predictor = task->size_predictor;
    if (predictor)
        predictor->startBlock();

    const UInt64 max_rows = max_block_size_rows;
    const UInt64 preferred_bytes = preferred_block_size_bytes;
    const UInt64 preferred_max_column_bytes = preferred_max_column_in_block_size_bytes;
    const MergeTreeIndexGranularity & granularity = task->data_part->index_granularity;
    const double min_filtration_ratio = 0.00001;

    auto recommend_rows = [&](MergeTreeReadTask & current_task, MergeTreeRangeReader & current_reader) -> size_t
    {
        /// Without a predictor only the row limit applies.
        if (!current_task.size_predictor)
            return static_cast<size_t>(max_rows);

        /// Estimate from preferred_block_size_bytes; a zero estimate is returned as is.
        size_t wanted_rows = current_task.size_predictor->estimateNumRows(preferred_bytes);
        if (wanted_rows == 0)
            return wanted_rows;

        /// Never go below the number of rows in the current granule at this point.
        auto rows_in_granule = current_reader.numRowsInCurrentGranule();
        wanted_rows = std::max(rows_in_granule, wanted_rows);

        if (preferred_max_column_bytes != 0)
        {
            /// Estimate from preferred_max_column_in_block_size_bytes, scaled by the share of rows
            /// that is expected to pass the filter (bounded from below by min_filtration_ratio).
            auto rows_for_max_size_column
                = current_task.size_predictor->estimateNumRowsForMaxSizeColumn(preferred_max_column_bytes);
            double pass_ratio = std::max(min_filtration_ratio, 1.0 - current_task.size_predictor->filtered_rows_ratio);
            auto rows_for_max_size_column_scaled = static_cast<size_t>(rows_for_max_size_column / pass_ratio);

            /// With this limit the result is allowed to become smaller than the current granule.
            wanted_rows = std::min(wanted_rows, rows_for_max_size_column_scaled);
        }

        /// If the request fits into what is left of the current granule, use it unchanged.
        auto pending_in_granule = current_reader.numPendingRowsInCurrentGranule();
        if (pending_in_granule >= wanted_rows)
            return wanted_rows;

        return granularity.countMarksForRows(
            current_reader.currentMark(), wanted_rows, current_reader.numReadRowsInCurrentGranule());
    };

    UInt64 recommended_rows = recommend_rows(*task, task->range_reader);

    /// Always request at least one row and never more than the row limit.
    UInt64 rows_to_read = std::max(UInt64(1), std::min(max_rows, recommended_rows));

    auto read_result = task->range_reader.read(rows_to_read, task->mark_ranges);

    const auto & sample_block = task->range_reader.getSampleBlock();
    const bool got_rows = read_result.num_rows != 0;

    if (!got_rows)
    {
        /// No rows in the result: drop the returned columns.
        read_result.columns.clear();
    }
    else if (sample_block.columns() != read_result.columns.size())
    {
        throw Exception("Inconsistent number of columns got from MergeTreeRangeReader. "
                        "Have " + toString(sample_block.columns()) + " in sample block "
                        "and " + toString(read_result.columns.size()) + " columns in list", ErrorCodes::LOGICAL_ERROR);
    }

    /// TODO: check columns have the same types as in header.

    const auto rows_read = read_result.numReadRows();
    UInt64 num_filtered_rows = rows_read - read_result.num_rows;

    progress({ rows_read, read_result.numBytesRead() });

    if (predictor)
    {
        predictor->updateFilteredRowsRation(rows_read, num_filtered_rows);

        if (!read_result.columns.empty())
            predictor->update(sample_block, read_result.columns, read_result.num_rows);
    }

    if (!got_rows)
        return {};

    /// Reorder columns to the order of header_without_virtual_columns. TODO: maybe skip for default case.
    const size_t num_output_columns = header_without_virtual_columns.columns();

    Columns ordered_columns;
    ordered_columns.reserve(num_output_columns);

    for (size_t i = 0; i != num_output_columns; ++i)
    {
        const auto & column_name = header_without_virtual_columns.getByPosition(i).name;
        auto source_position = sample_block.getPositionByName(column_name);
        ordered_columns.emplace_back(std::move(read_result.columns[source_position]));
    }

    return Chunk(std::move(ordered_columns), read_result.num_rows);
}
2019-10-01 16:50:08 +00:00
/// Reads the next chunk of the current task, creating the task's range readers first
/// if its range reader is not initialized yet.
Chunk MergeTreeBaseSelectProcessor::readFromPart()
{
    const bool readers_ready = task->range_reader.isInitialized();
    if (!readers_ready)
        initializeRangeReaders(*task);

    return readFromPartImpl();
}
2019-10-31 11:32:24 +00:00
namespace
{
2019-10-31 13:18:21 +00:00
/// Simple interfaces to insert virtual columns.
2019-10-31 11:32:24 +00:00
struct VirtualColumnsInserter
{
2019-10-31 13:18:21 +00:00
virtual ~VirtualColumnsInserter() = default;
virtual void insertArrayOfStringsColumn(const ColumnPtr & column, const String & name) = 0;
2019-10-31 11:32:24 +00:00
virtual void insertStringColumn(const ColumnPtr & column, const String & name) = 0;
virtual void insertUInt64Column(const ColumnPtr & column, const String & name) = 0;
virtual void insertUUIDColumn(const ColumnPtr & column, const String & name) = 0;
2021-05-20 06:30:13 +00:00
virtual void insertPartitionValueColumn(
size_t rows,
2021-05-21 01:17:18 +00:00
const Row & partition_value,
2021-05-20 06:30:13 +00:00
const DataTypePtr & partition_value_type,
const String & name) = 0;
2019-10-31 11:32:24 +00:00
};
}
2021-04-27 08:15:59 +00:00
static void injectVirtualColumnsImpl(
size_t rows,
VirtualColumnsInserter & inserter,
MergeTreeReadTask * task,
const DataTypePtr & partition_value_type,
const Names & virtual_columns)
{
/// add virtual columns
/// Except _sample_factor, which is added from the outside.
if (!virtual_columns.empty())
{
if (unlikely(rows && !task))
throw Exception("Cannot insert virtual columns to non-empty chunk without specified task.",
ErrorCodes::LOGICAL_ERROR);
2018-11-28 15:05:28 +00:00
const IMergeTreeDataPart * part = nullptr;
if (rows)
{
part = task->data_part.get();
if (part->isProjectionPart())
part = part->getParentPart();
}
2019-09-23 19:22:02 +00:00
for (const auto & virtual_column_name : virtual_columns)
{
2019-09-23 19:22:02 +00:00
if (virtual_column_name == "_part")
{
ColumnPtr column;
if (rows)
column = DataTypeString().createColumnConst(rows, part->name)->convertToFullColumnIfConst();
else
column = DataTypeString().createColumn();
2019-10-31 11:32:24 +00:00
inserter.insertStringColumn(column, virtual_column_name);
}
2019-09-23 19:22:02 +00:00
else if (virtual_column_name == "_part_index")
{
ColumnPtr column;
if (rows)
column = DataTypeUInt64().createColumnConst(rows, task->part_index_in_query)->convertToFullColumnIfConst();
else
column = DataTypeUInt64().createColumn();
2019-10-31 11:32:24 +00:00
inserter.insertUInt64Column(column, virtual_column_name);
}
else if (virtual_column_name == "_part_uuid")
{
ColumnPtr column;
if (rows)
column = DataTypeUUID().createColumnConst(rows, task->data_part->uuid)->convertToFullColumnIfConst();
else
column = DataTypeUUID().createColumn();
inserter.insertUUIDColumn(column, virtual_column_name);
}
2019-09-23 19:22:02 +00:00
else if (virtual_column_name == "_partition_id")
{
ColumnPtr column;
if (rows)
column = DataTypeString().createColumnConst(rows, part->info.partition_id)->convertToFullColumnIfConst();
else
column = DataTypeString().createColumn();
2019-10-31 11:32:24 +00:00
inserter.insertStringColumn(column, virtual_column_name);
}
2021-04-27 08:15:59 +00:00
else if (virtual_column_name == "_partition_value")
{
if (rows)
inserter.insertPartitionValueColumn(rows, task->data_part->partition.value, partition_value_type, virtual_column_name);
else
inserter.insertPartitionValueColumn(rows, {}, partition_value_type, virtual_column_name);
}
}
}
}
namespace
{
2019-10-31 11:32:24 +00:00
struct VirtualColumnsInserterIntoBlock : public VirtualColumnsInserter
{
2019-10-31 11:32:24 +00:00
explicit VirtualColumnsInserterIntoBlock(Block & block_) : block(block_) {}
void insertArrayOfStringsColumn(const ColumnPtr & column, const String & name) final
{
block.insert({column, std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), name});
}
2019-10-31 11:32:24 +00:00
void insertStringColumn(const ColumnPtr & column, const String & name) final
{
2019-10-31 11:32:24 +00:00
block.insert({column, std::make_shared<DataTypeString>(), name});
}
void insertUInt64Column(const ColumnPtr & column, const String & name) final
{
block.insert({column, std::make_shared<DataTypeUInt64>(), name});
}
void insertUUIDColumn(const ColumnPtr & column, const String & name) final
{
block.insert({column, std::make_shared<DataTypeUUID>(), name});
}
2021-04-27 08:15:59 +00:00
void insertPartitionValueColumn(
2021-05-21 01:17:18 +00:00
size_t rows, const Row & partition_value, const DataTypePtr & partition_value_type, const String & name) final
2021-04-27 08:15:59 +00:00
{
ColumnPtr column;
if (rows)
column = partition_value_type->createColumnConst(rows, Tuple(partition_value.begin(), partition_value.end()))
->convertToFullColumnIfConst();
else
column = partition_value_type->createColumn();
block.insert({column, partition_value_type, name});
}
Block & block;
};
2019-10-31 11:32:24 +00:00
struct VirtualColumnsInserterIntoColumns : public VirtualColumnsInserter
{
2019-10-31 11:32:24 +00:00
explicit VirtualColumnsInserterIntoColumns(Columns & columns_) : columns(columns_) {}
void insertArrayOfStringsColumn(const ColumnPtr & column, const String &) final
{
columns.push_back(column);
}
2019-10-31 11:32:24 +00:00
void insertStringColumn(const ColumnPtr & column, const String &) final
{
columns.push_back(column);
}
void insertUInt64Column(const ColumnPtr & column, const String &) final
{
columns.push_back(column);
}
void insertUUIDColumn(const ColumnPtr & column, const String &) final
{
columns.push_back(column);
}
2021-04-27 08:15:59 +00:00
2021-05-20 06:30:13 +00:00
void insertPartitionValueColumn(
2021-05-21 01:17:18 +00:00
size_t rows, const Row & partition_value, const DataTypePtr & partition_value_type, const String &) final
2021-04-27 08:15:59 +00:00
{
ColumnPtr column;
if (rows)
column = partition_value_type->createColumnConst(rows, Tuple(partition_value.begin(), partition_value.end()))
->convertToFullColumnIfConst();
else
column = partition_value_type->createColumn();
columns.push_back(column);
}
Columns & columns;
};
}
2021-04-27 08:15:59 +00:00
/// Appends the requested virtual columns (with types and names) to `block`.
/// The number of rows of the new columns is the current number of rows in `block`;
/// `task` may be null only if the block has no rows.
void MergeTreeBaseSelectProcessor::injectVirtualColumns(
    Block & block, MergeTreeReadTask * task, const DataTypePtr & partition_value_type, const Names & virtual_columns)
{
    const size_t num_rows = block.rows();
    VirtualColumnsInserterIntoBlock block_inserter(block);
    injectVirtualColumnsImpl(num_rows, block_inserter, task, partition_value_type, virtual_columns);
}
2021-04-27 08:15:59 +00:00
/// Appends the requested virtual columns to the end of the columns of `chunk`.
/// The number of rows of the chunk does not change; `task` may be null only if the chunk has no rows.
void MergeTreeBaseSelectProcessor::injectVirtualColumns(
    Chunk & chunk, MergeTreeReadTask * task, const DataTypePtr & partition_value_type, const Names & virtual_columns)
{
    UInt64 num_rows = chunk.getNumRows();

    /// Take the columns out of the chunk, extend the list, and put it back.
    auto columns = chunk.detachColumns();

    VirtualColumnsInserterIntoColumns inserter{columns};
    injectVirtualColumnsImpl(num_rows, inserter, task, partition_value_type, virtual_columns);

    /// `columns` is not used afterwards, so hand it over instead of copying the whole list.
    chunk.setColumns(std::move(columns), num_rows);
}
2021-06-25 14:49:28 +00:00
/// Computes the output header of the source from the header of the columns that are read.
///
/// If PREWHERE is present, the header is passed through the alias actions, the row-level filter and
/// the PREWHERE actions (each of them only if set):
///  - the row-level filter column is always removed from the header;
///  - the PREWHERE column is removed if remove_prewhere_column is set, otherwise its column is
///    replaced with a column filled with ones.
/// After that the requested virtual columns are appended (empty, since there is no task for a header).
///
/// Throws ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER if a filter column has a type that cannot be used in boolean
/// context, or if a kept PREWHERE column is not an integer or floating-point number
/// (after removing Nullable and LowCardinality).
Block MergeTreeBaseSelectProcessor::transformHeader(
    Block block, const PrewhereInfoPtr & prewhere_info, const DataTypePtr & partition_value_type, const Names & virtual_columns)
{
    if (prewhere_info)
    {
        if (prewhere_info->alias_actions)
            block = prewhere_info->alias_actions->updateHeader(std::move(block));

        if (prewhere_info->row_level_filter)
        {
            block = prewhere_info->row_level_filter->updateHeader(std::move(block));

            auto & row_level_column = block.getByName(prewhere_info->row_level_column_name);
            if (!row_level_column.type->canBeUsedInBooleanContext())
            {
                /// The type is determined by the filter expression, so this is not an internal logic error.
                throw Exception("Invalid type for filter in PREWHERE: " + row_level_column.type->getName(),
                    ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER);
            }

            block.erase(prewhere_info->row_level_column_name);
        }

        if (prewhere_info->prewhere_actions)
            block = prewhere_info->prewhere_actions->updateHeader(std::move(block));

        auto & prewhere_column = block.getByName(prewhere_info->prewhere_column_name);
        if (!prewhere_column.type->canBeUsedInBooleanContext())
        {
            /// Same as above: report a wrong filter type with the dedicated error code.
            throw Exception("Invalid type for filter in PREWHERE: " + prewhere_column.type->getName(),
                ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER);
        }

        if (prewhere_info->remove_prewhere_column)
        {
            block.erase(prewhere_info->prewhere_column_name);
        }
        else
        {
            /// The column stays in the header: fill it with the constant 1 of the matching numeric kind.
            WhichDataType which(removeNullable(recursiveRemoveLowCardinality(prewhere_column.type)));
            if (which.isInt() || which.isUInt())
                prewhere_column.column = prewhere_column.type->createColumnConst(block.rows(), 1u)->convertToFullColumnIfConst();
            else if (which.isFloat())
                prewhere_column.column = prewhere_column.type->createColumnConst(block.rows(), 1.0f)->convertToFullColumnIfConst();
            else
                throw Exception("Illegal type " + prewhere_column.type->getName() + " of column for filter.",
                    ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER);
        }
    }

    injectVirtualColumns(block, nullptr, partition_value_type, virtual_columns);
    return block;
}
/// Creates a block size predictor for `data_part` over the union (without duplicates)
/// of the names in `task_columns.columns` and `task_columns.pre_columns`.
std::unique_ptr<MergeTreeBlockSizePredictor> MergeTreeBaseSelectProcessor::getSizePredictor(
    const MergeTreeData::DataPartPtr & data_part,
    const MergeTreeReadTaskColumns & task_columns,
    const Block & sample_block)
{
    const auto & main_names = task_columns.columns.getNames();
    const auto & prewhere_names = task_columns.pre_columns.getNames();

    /// A set is used to drop names that occur in both lists.
    NameSet unique_names(main_names.begin(), main_names.end());
    unique_names.insert(prewhere_names.begin(), prewhere_names.end());

    Names all_names(unique_names.begin(), unique_names.end());
    return std::make_unique<MergeTreeBlockSizePredictor>(data_part, std::move(all_names), sample_block);
}
2019-10-01 16:50:08 +00:00
/// The destructor is defaulted, but it is defined out of line, in this file.
MergeTreeBaseSelectProcessor::~MergeTreeBaseSelectProcessor() = default;
}