ClickHouse/src/Storages/StorageMerge.cpp

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

1207 lines
48 KiB
C++
Raw Normal View History

2023-09-19 11:27:31 +00:00
#include <algorithm>
2023-09-25 22:20:40 +00:00
#include <functional>
2023-09-19 11:27:31 +00:00
#include <Analyzer/ConstantNode.h>
#include <Analyzer/TableNode.h>
#include <Analyzer/Utils.h>
#include <Columns/ColumnSet.h>
#include <Columns/ColumnString.h>
#include <Core/SortDescription.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/IDataType.h>
#include <Databases/IDatabase.h>
#include <IO/WriteBufferFromString.h>
2020-11-06 14:07:56 +00:00
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionActions.h>
2023-09-19 11:27:31 +00:00
#include <Interpreters/IdentifierSemantic.h>
#include <Interpreters/InterpreterSelectQuery.h>
2023-02-14 11:20:01 +00:00
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
2023-09-19 11:27:31 +00:00
#include <Interpreters/TreeRewriter.h>
2021-06-25 14:30:58 +00:00
#include <Interpreters/addTypeConversionToAST.h>
2023-09-19 11:27:31 +00:00
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/getHeaderForProcessingStage.h>
2021-06-25 14:30:58 +00:00
#include <Interpreters/replaceAliasColumnsInQuery.h>
2023-09-19 11:27:31 +00:00
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
2023-09-19 11:27:31 +00:00
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSelectQuery.h>
#include <Planner/Utils.h>
#include <Processors/ConcatProcessor.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
2022-05-20 19:49:31 +00:00
#include <Processors/QueryPlan/QueryPlan.h>
2023-09-19 11:27:31 +00:00
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
2023-09-19 11:27:31 +00:00
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/MaterializingTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <QueryPipeline/narrowPipe.h>
#include <Storages/AlterCommands.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageMerge.h>
#include <Storages/StorageView.h>
#include <Storages/VirtualColumnUtils.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <base/defines.h>
#include <base/range.h>
2023-09-19 11:27:31 +00:00
#include <Common/Exception.h>
#include <Common/assert_cast.h>
#include <Common/checkStackSize.h>
#include <Common/typeid_cast.h>
2012-05-30 05:53:09 +00:00
namespace
{

using namespace DB;

/// True when the default kind is either Default or Materialized.
bool columnIsPhysical(ColumnDefaultKind kind)
{
    const bool is_default = kind == ColumnDefaultKind::Default;
    const bool is_materialized = kind == ColumnDefaultKind::Materialized;
    return is_default || is_materialized;
}

/// Two default kinds are considered compatible when they are equal
/// or when both fall on the same side of columnIsPhysical().
bool columnDefaultKindHasSameType(ColumnDefaultKind lhs, ColumnDefaultKind rhs)
{
    return lhs == rhs || columnIsPhysical(lhs) == columnIsPhysical(rhs);
}

}
2012-05-30 05:53:09 +00:00
namespace DB
{
/// Error codes declared for use in this file; the definitions live elsewhere.
namespace ErrorCodes
{
    extern const int ALTER_OF_COLUMN_IS_FORBIDDEN;
    extern const int BAD_ARGUMENTS;
    extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
    extern const int ILLEGAL_PREWHERE;
    extern const int NOT_IMPLEMENTED;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    extern const int SAMPLING_NOT_SUPPORTED;
}
/// Creates a Merge storage from a DBToTableSetMap of source databases and tables.
/// If `columns_` is empty, the column list is taken from getColumnsDescriptionFromSourceTables().
/// Only the global context obtained from `context_` is retained.
StorageMerge::StorageMerge(
    const StorageID & table_id_,
    const ColumnsDescription & columns_,
    const String & comment,
    const String & source_database_name_or_regexp_,
    bool database_is_regexp_,
    const DBToTableSetMap & source_databases_and_tables_,
    ContextPtr context_)
    : IStorage(table_id_)
    , WithContext(context_->getGlobalContext())
    , source_database_regexp(source_database_name_or_regexp_)
    , source_databases_and_tables(source_databases_and_tables_)
    , source_database_name_or_regexp(source_database_name_or_regexp_)
    , database_is_regexp(database_is_regexp_)
{
    StorageInMemoryMetadata metadata;

    if (columns_.empty())
        metadata.setColumns(getColumnsDescriptionFromSourceTables());
    else
        metadata.setColumns(columns_);

    metadata.setComment(comment);
    setInMemoryMetadata(metadata);
}
/// Creates a Merge storage whose source tables are selected by a table name regexp.
/// If `columns_` is empty, the column list is taken from getColumnsDescriptionFromSourceTables().
/// Only the global context obtained from `context_` is retained.
StorageMerge::StorageMerge(
    const StorageID & table_id_,
    const ColumnsDescription & columns_,
    const String & comment,
    const String & source_database_name_or_regexp_,
    bool database_is_regexp_,
    const String & source_table_regexp_,
    ContextPtr context_)
    : IStorage(table_id_)
    , WithContext(context_->getGlobalContext())
    , source_database_regexp(source_database_name_or_regexp_)
    , source_table_regexp(source_table_regexp_)
    , source_database_name_or_regexp(source_database_name_or_regexp_)
    , database_is_regexp(database_is_regexp_)
{
    StorageInMemoryMetadata metadata;

    if (columns_.empty())
        metadata.setColumns(getColumnsDescriptionFromSourceTables());
    else
        metadata.setColumns(columns_);

    metadata.setComment(comment);
    setInMemoryMetadata(metadata);
}
/// Returns the columns of the first non-null source table.
/// Throws CANNOT_EXTRACT_TABLE_STRUCTURE when no such table is found.
ColumnsDescription StorageMerge::getColumnsDescriptionFromSourceTables() const
{
    /// The predicate yields the table pointer itself, so the first non-null table is accepted.
    const auto first_table = getFirstTable([](auto && candidate) { return candidate; });

    if (first_table)
        return first_table->getInMemoryMetadataPtr()->getColumns();

    throw Exception{ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "There are no tables satisfied provided regexp, you must specify table structure manually"};
}
/// Walks all tables produced by getDatabaseIterators() and returns the first one
/// for which `predicate(table)` is truthy. Returns an empty pointer if none matches.
///
/// Null tables and this storage itself are skipped before the predicate is called,
/// so predicates may dereference the table they receive without an extra check.
template <typename F>
StoragePtr StorageMerge::getFirstTable(F && predicate) const
{
    auto database_table_iterators = getDatabaseIterators(getContext());

    for (auto & iterator : database_table_iterators)
    {
        for (; iterator->isValid(); iterator->next())
        {
            const auto & table = iterator->table();

            /// The iterator may yield a null table; never hand it to the predicate.
            if (!table || table.get() == this)
                continue;

            if (predicate(table))
                return table;
        }
    }

    return {};
}
/// Calls `func` on every table that getFirstTable() would offer to a predicate.
template <typename F>
void StorageMerge::forEachTable(F && func) const
{
    /// A predicate that never matches makes getFirstTable() visit all tables.
    auto visit_and_continue = [&func](const auto & table)
    {
        func(table);
        return false;
    };

    getFirstTable(visit_and_continue);
}
bool StorageMerge::isRemote() const
{
2020-06-02 02:06:16 +00:00
auto first_remote_table = getFirstTable([](const StoragePtr & table) { return table && table->isRemote(); });
return first_remote_table != nullptr;
}
bool StorageMerge::tableSupportsPrewhere() const
{
/// NOTE: This check is used during query analysis as condition for applying
/// "move to PREWHERE" optimization. However, it contains a logical race:
/// If new table that matches regexp for current storage and doesn't support PREWHERE
/// will appear after this check and before calling "read" method, the optimized query may fail.
/// Since it's quite rare case, we just ignore this possibility.
///
/// NOTE: Type can be different, and in this case, PREWHERE cannot be
/// applied for those columns, but there a separate method to return
/// supported columns for PREWHERE - supportedPrewhereColumns().
return getFirstTable([](const auto & table) { return !table->canMoveConditionsToPrewhere(); }) == nullptr;
}
/// Conditions may be moved to PREWHERE exactly when tableSupportsPrewhere() holds.
bool StorageMerge::canMoveConditionsToPrewhere() const
{
return tableSupportsPrewhere();
}
std::optional<NameSet> StorageMerge::supportedPrewhereColumns() const
{
bool supports_prewhere = true;
const auto & metadata = getInMemoryMetadata();
const auto & columns = metadata.getColumns();
NameSet supported_columns;
std::unordered_map<std::string, std::pair<const IDataType *, ColumnDefaultKind>> column_info;
for (const auto & name_type : columns.getAll())
{
const auto & column_default = columns.getDefault(name_type.name).value_or(ColumnDefault{});
column_info.emplace(name_type.name, std::make_pair(
name_type.type.get(),
column_default.kind));
supported_columns.emplace(name_type.name);
}
forEachTable([&](const StoragePtr & table)
{
const auto & table_metadata_ptr = table->getInMemoryMetadataPtr();
if (!table_metadata_ptr)
supports_prewhere = false;
if (!supports_prewhere)
return;
const auto & table_columns = table_metadata_ptr->getColumns();
for (const auto & column : table_columns.getAll())
{
const auto & column_default = table_columns.getDefault(column.name).value_or(ColumnDefault{});
const auto & [root_type, src_default_kind] = column_info[column.name];
if ((root_type && !root_type->equals(*column.type)) ||
!columnDefaultKindHasSameType(src_default_kind, column_default.kind))
2022-12-30 15:48:57 +00:00
{
supported_columns.erase(column.name);
2022-12-30 15:48:57 +00:00
}
}
});
return supported_columns;
}
2017-07-21 20:59:01 +00:00
bool StorageMerge::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, ContextPtr query_context, const StorageMetadataPtr & /*metadata_snapshot*/) const
2018-03-16 09:00:04 +00:00
{
/// It's beneficial if it is true for at least one table.
StorageListWithLocks selected_tables = getSelectedTables(query_context);
2018-03-16 09:00:04 +00:00
size_t i = 0;
for (const auto & table : selected_tables)
{
2021-06-07 09:14:29 +00:00
const auto & storage_ptr = std::get<1>(table);
auto metadata_snapshot = storage_ptr->getInMemoryMetadataPtr();
if (storage_ptr->mayBenefitFromIndexForIn(left_in_operand, query_context, metadata_snapshot))
2018-03-16 09:00:04 +00:00
return true;
++i;
/// For simplicity reasons, check only first ten tables.
if (i > 10)
break;
}
return false;
}
/// Computes the processing stage this storage can reach: the maximum stage reported by
/// the source tables, capped at WithMergeableState unless exactly one table is selected.
/// Also sets `query_info.ignore_projections`.
QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(
    ContextPtr local_context,
    QueryProcessingStage::Enum to_stage,
    const StorageSnapshotPtr &,
    SelectQueryInfo & query_info) const
{
    /// With a JOIN, the first stage (the one that includes the JOIN) always has to run
    /// on the initiator, because the shards receive the query without the JOIN and
    /// without its columns (see removeJoin()). Returning FetchColumns achieves that.
    const auto * select = query_info.query->as<ASTSelectQuery>();
    if (select && hasJoin(*select))
        return QueryProcessingStage::FetchColumns;

    DatabaseTablesIterators database_table_iterators = getDatabaseIterators(local_context);

    /// TODO: Find a way to support projections for StorageMerge
    query_info.ignore_projections = true;

    auto max_source_stage = QueryProcessingStage::FetchColumns;
    size_t source_tables_count = 0;

    for (const auto & iterator : database_table_iterators)
    {
        for (; iterator->isValid(); iterator->next())
        {
            const auto & table = iterator->table();
            if (!table || table.get() == this)
                continue;

            ++source_tables_count;

            const auto table_snapshot = table->getStorageSnapshot(table->getInMemoryMetadataPtr(), local_context);
            const auto table_stage = table->getQueryProcessingStage(local_context, to_stage, table_snapshot, query_info);
            max_source_stage = std::max(max_source_stage, table_stage);
        }
    }

    if (source_tables_count == 1)
        return max_source_stage;

    return std::min(max_source_stage, QueryProcessingStage::WithMergeableState);
}
2022-05-20 19:49:31 +00:00
/// Adds a ReadFromMerge step to `query_plan`.
/// The `_database` and `_table` virtual columns are split off from the requested names,
/// the source tables are selected, and (when an order optimizer is present) the input
/// order shared by all selected tables is stored in `query_info.input_order_info`.
void StorageMerge::read(
    QueryPlan & query_plan,
    const Names & column_names,
    const StorageSnapshotPtr & storage_snapshot,
    SelectQueryInfo & query_info,
    ContextPtr local_context,
    QueryProcessingStage::Enum processed_stage,
    const size_t max_block_size,
    size_t num_streams)
{
    /** Just in case, turn off optimization "transfer to PREWHERE",
      * since there is no certainty that it works when one of table is MergeTree and other is not.
      */
    auto modified_context = Context::createCopy(local_context);
    modified_context->setSetting("optimize_move_to_prewhere", false);

    bool has_database_virtual_column = false;
    bool has_table_virtual_column = false;

    Names real_column_names;
    real_column_names.reserve(column_names.size());

    for (const auto & column_name : column_names)
    {
        const bool is_database_virtual
            = column_name == "_database" && isVirtualColumn(column_name, storage_snapshot->metadata);
        const bool is_table_virtual
            = !is_database_virtual && column_name == "_table" && isVirtualColumn(column_name, storage_snapshot->metadata);

        if (is_database_virtual)
            has_database_virtual_column = true;
        else if (is_table_virtual)
            has_table_virtual_column = true;
        else
            real_column_names.push_back(column_name);
    }

    StorageListWithLocks selected_tables
        = getSelectedTables(modified_context, query_info.query, has_database_virtual_column, has_table_virtual_column);

    if (query_info.order_optimizer)
    {
        /// Reading in order is kept only if every selected table reports the same input order.
        InputOrderInfoPtr common_order_info;
        bool is_first_table = true;

        for (const auto & selected_table : selected_tables)
        {
            auto storage_ptr = std::get<1>(selected_table);
            auto storage_metadata_snapshot = storage_ptr->getInMemoryMetadataPtr();
            auto current_info = query_info.order_optimizer->getInputOrder(storage_metadata_snapshot, modified_context);

            if (is_first_table)
            {
                common_order_info = current_info;
                is_first_table = false;
            }
            else if (!current_info || (common_order_info && *current_info != *common_order_info))
            {
                common_order_info.reset();
            }

            if (!common_order_info)
                break;
        }

        query_info.input_order_info = common_order_info;
    }

    query_plan.addInterpreterContext(modified_context);

    /// What will be result structure depending on query processed stage in source tables?
    Block common_header = getHeaderForProcessingStage(column_names, storage_snapshot, query_info, local_context, processed_stage);

    auto read_step = std::make_unique<ReadFromMerge>(
        common_header,
        std::move(selected_tables),
        real_column_names,
        has_database_virtual_column,
        has_table_virtual_column,
        max_block_size,
        num_streams,
        shared_from_this(),
        storage_snapshot,
        query_info,
        std::move(modified_context),
        processed_stage);

    query_plan.addStep(std::move(read_step));
}
/// Query plan step that reads from the tables selected for a Merge storage.
/// The constructor stores its arguments in the corresponding members and then
/// calls createChildPlans().
ReadFromMerge::ReadFromMerge(
Block common_header_,
2022-07-27 12:00:55 +00:00
StorageListWithLocks selected_tables_,
2022-07-26 14:43:05 +00:00
Names column_names_,
2022-07-27 12:00:55 +00:00
bool has_database_virtual_column_,
bool has_table_virtual_column_,
2022-07-26 14:43:05 +00:00
size_t max_block_size,
size_t num_streams,
StoragePtr storage,
StorageSnapshotPtr storage_snapshot,
const SelectQueryInfo & query_info_,
ContextMutablePtr context_,
QueryProcessingStage::Enum processed_stage)
2023-02-24 12:46:09 +00:00
/// The base step gets a copy of the header as its output stream header;
/// the header itself is moved into `common_header` below.
: SourceStepWithFilter(DataStream{.header = common_header_})
2022-07-26 14:43:05 +00:00
, required_max_block_size(max_block_size)
, requested_num_streams(num_streams)
, common_header(std::move(common_header_))
2022-07-27 12:00:55 +00:00
, selected_tables(std::move(selected_tables_))
2022-07-26 14:43:05 +00:00
, column_names(std::move(column_names_))
2022-07-27 12:00:55 +00:00
, has_database_virtual_column(has_database_virtual_column_)
, has_table_virtual_column(has_table_virtual_column_)
2022-07-26 14:43:05 +00:00
, storage_merge(std::move(storage))
, merge_storage_snapshot(std::move(storage_snapshot))
, query_info(query_info_)
, context(std::move(context_))
, common_processed_stage(processed_stage)
{
2023-09-19 11:27:31 +00:00
/// Child plans are created at construction time; initializePipeline() asserts one per selected table.
createChildPlans();
2022-07-25 19:41:43 +00:00
}
/// Builds one pipeline per selected table from the child plans, unites them into `pipeline`,
/// and narrows the result unless reading in order was requested.
/// An empty NullSource pipeline is produced when there is nothing to read.
void ReadFromMerge::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
    if (selected_tables.empty())
    {
        pipeline.init(Pipe(std::make_shared<NullSource>(output_stream->header)));
        return;
    }

    QueryPlanResourceHolder resources;
    std::vector<std::unique_ptr<QueryPipelineBuilder>> pipelines;

    chassert(selected_tables.size() == child_plans.size());
    chassert(selected_tables.size() == table_aliases.size());

    size_t table_index = 0;
    for (const auto & table : selected_tables)
    {
        auto & plan = child_plans.at(table_index);

        const auto storage = std::get<1>(table);
        const auto storage_metadata_snapshot = storage->getInMemoryMetadataPtr();
        const auto nested_storage_snaphsot = storage->getStorageSnapshot(storage_metadata_snapshot, context);

        auto modified_query_info = getModifiedQueryInfo(query_info, context, table, nested_storage_snaphsot);

        auto source_pipeline = createSources(
            plan,
            nested_storage_snaphsot,
            modified_query_info,
            common_processed_stage,
            common_header,
            table_aliases.at(table_index),
            table,
            context);

        /// Keep the storage and its lock alive for as long as the pipeline uses them.
        if (source_pipeline && source_pipeline->initialized())
        {
            resources.storage_holders.push_back(std::get<1>(table));
            resources.table_locks.push_back(std::get<2>(table));
            pipelines.emplace_back(std::move(source_pipeline));
        }

        ++table_index;
    }

    if (pipelines.empty())
    {
        pipeline.init(Pipe(std::make_shared<NullSource>(output_stream->header)));
        return;
    }

    pipeline = QueryPipelineBuilder::unitePipelines(std::move(pipelines));

    if (!query_info.input_order_info)
    {
        const size_t tables_count = selected_tables.size();
        const size_t multiplier_setting = static_cast<size_t>(context->getSettingsRef().max_streams_multiplier_for_merge_tables);
        const Float64 num_streams_multiplier = std::min(tables_count, std::max(1UL, multiplier_setting));
        const size_t num_streams = static_cast<size_t>(requested_num_streams * num_streams_multiplier);

        // It's possible to have many tables read from merge, resize(num_streams) might open too many files at the same time.
        // Using narrowPipe instead. But in case of reading in order of primary key, we cannot do it,
        // because narrowPipe doesn't preserve order.
        pipeline.narrow(num_streams);
    }

    pipeline.addResources(std::move(resources));
}
void ReadFromMerge::createChildPlans()
{
if (selected_tables.empty())
return;
size_t tables_count = selected_tables.size();
Float64 num_streams_multiplier
2022-11-17 18:44:26 +00:00
= std::min(static_cast<size_t>(tables_count), std::max(1UL, static_cast<size_t>(context->getSettingsRef().max_streams_multiplier_for_merge_tables)));
2022-09-11 01:21:34 +00:00
size_t num_streams = static_cast<size_t>(requested_num_streams * num_streams_multiplier);
2019-07-07 00:20:38 +00:00
size_t remaining_streams = num_streams;
if (order_info)
{
2022-11-17 18:44:26 +00:00
query_info.input_order_info = order_info;
}
else if (query_info.order_optimizer)
{
InputOrderInfoPtr input_sorting_info;
for (auto it = selected_tables.begin(); it != selected_tables.end(); ++it)
{
2021-06-07 09:14:29 +00:00
auto storage_ptr = std::get<1>(*it);
2020-06-17 11:05:11 +00:00
auto storage_metadata_snapshot = storage_ptr->getInMemoryMetadataPtr();
2022-07-25 19:41:43 +00:00
auto current_info = query_info.order_optimizer->getInputOrder(storage_metadata_snapshot, context);
if (it == selected_tables.begin())
input_sorting_info = current_info;
else if (!current_info || (input_sorting_info && *current_info != *input_sorting_info))
input_sorting_info.reset();
if (!input_sorting_info)
break;
}
2020-05-13 13:49:10 +00:00
query_info.input_order_info = input_sorting_info;
}
for (const auto & table : selected_tables)
2012-05-30 05:53:09 +00:00
{
2018-11-26 00:56:50 +00:00
size_t current_need_streams = tables_count >= num_streams ? 1 : (num_streams / tables_count);
size_t current_streams = std::min(current_need_streams, remaining_streams);
remaining_streams -= current_streams;
2023-03-10 10:54:46 +00:00
current_streams = std::max(1uz, current_streams);
2021-06-07 09:14:29 +00:00
const auto & storage = std::get<1>(table);
2017-07-21 20:59:01 +00:00
2023-02-14 11:20:01 +00:00
bool sampling_requested = query_info.query->as<ASTSelectQuery>()->sampleSize() != nullptr;
if (query_info.table_expression_modifiers)
sampling_requested = query_info.table_expression_modifiers->hasSampleSizeRatio();
/// If sampling requested, then check that table supports it.
2023-02-14 11:20:01 +00:00
if (sampling_requested && !storage->supportsSampling())
2023-10-08 20:54:33 +00:00
throw Exception(ErrorCodes::SAMPLING_NOT_SUPPORTED, "Illegal SAMPLE: table {} doesn't support sampling", storage->getStorageID().getNameForLogs());
2023-09-19 11:27:31 +00:00
auto & aliases = table_aliases.emplace_back();
2020-06-18 09:22:54 +00:00
auto storage_metadata_snapshot = storage->getInMemoryMetadataPtr();
2022-07-25 19:41:43 +00:00
auto nested_storage_snaphsot = storage->getStorageSnapshot(storage_metadata_snapshot, context);
2021-06-24 23:25:06 +00:00
2023-02-14 11:20:01 +00:00
auto modified_query_info = getModifiedQueryInfo(query_info, context, table, nested_storage_snaphsot);
2021-11-09 11:27:17 +00:00
Names column_names_as_aliases;
2023-02-14 11:20:01 +00:00
if (!context->getSettingsRef().allow_experimental_analyzer)
2021-06-24 23:25:06 +00:00
{
2023-02-14 11:20:01 +00:00
auto storage_columns = storage_metadata_snapshot->getColumns();
auto syntax_result = TreeRewriter(context).analyzeSelect(
modified_query_info.query, TreeRewriterResult({}, storage, nested_storage_snaphsot));
2021-11-09 11:27:17 +00:00
2023-02-14 11:20:01 +00:00
bool with_aliases = common_processed_stage == QueryProcessingStage::FetchColumns && !storage_columns.getAliases().empty();
if (with_aliases)
2021-06-25 14:30:58 +00:00
{
2023-02-14 11:20:01 +00:00
ASTPtr required_columns_expr_list = std::make_shared<ASTExpressionList>();
ASTPtr column_expr;
2021-06-24 23:25:06 +00:00
2023-09-19 11:27:31 +00:00
auto sample_block = merge_storage_snapshot->getMetadataForQuery()->getSampleBlock();
2023-02-14 11:20:01 +00:00
for (const auto & column : column_names)
2021-06-24 23:25:06 +00:00
{
2023-02-14 11:20:01 +00:00
const auto column_default = storage_columns.getDefault(column);
bool is_alias = column_default && column_default->kind == ColumnDefaultKind::Alias;
if (is_alias)
{
column_expr = column_default->expression->clone();
replaceAliasColumnsInQuery(column_expr, storage_metadata_snapshot->getColumns(),
syntax_result->array_join_result_to_source, context);
2023-09-21 10:54:09 +00:00
const auto & column_description = storage_columns.get(column);
2023-02-14 11:20:01 +00:00
column_expr = addTypeConversionToAST(std::move(column_expr), column_description.type->getName(),
storage_metadata_snapshot->getColumns().getAll(), context);
column_expr = setAlias(column_expr, column);
auto type = sample_block.getByName(column).type;
aliases.push_back({ .name = column, .type = type, .expression = column_expr->clone() });
}
else
column_expr = std::make_shared<ASTIdentifier>(column);
required_columns_expr_list->children.emplace_back(std::move(column_expr));
2021-06-24 23:25:06 +00:00
}
2023-02-14 11:20:01 +00:00
syntax_result = TreeRewriter(context).analyze(
required_columns_expr_list, storage_columns.getAllPhysical(), storage, storage->getStorageSnapshot(storage_metadata_snapshot, context));
2023-02-14 11:20:01 +00:00
auto alias_actions = ExpressionAnalyzer(required_columns_expr_list, syntax_result, context).getActionsDAG(true);
2021-11-09 11:27:17 +00:00
2023-02-14 11:20:01 +00:00
column_names_as_aliases = alias_actions->getRequiredColumns().getNames();
if (column_names_as_aliases.empty())
column_names_as_aliases.push_back(ExpressionActions::getSmallestColumn(storage_metadata_snapshot->getColumns().getAllPhysical()).name);
}
2021-06-24 23:25:06 +00:00
}
2020-06-18 09:22:54 +00:00
2023-09-19 11:27:31 +00:00
child_plans.emplace_back(createPlanForTable(
2021-07-09 14:11:44 +00:00
nested_storage_snaphsot,
2021-11-09 11:27:17 +00:00
modified_query_info,
2022-07-25 19:41:43 +00:00
common_processed_stage,
required_max_block_size,
2021-06-07 09:14:29 +00:00
table,
2022-07-27 12:00:55 +00:00
column_names_as_aliases.empty() ? column_names : column_names_as_aliases,
2022-07-25 19:41:43 +00:00
context,
2023-09-19 11:27:31 +00:00
current_streams));
2012-05-30 05:53:09 +00:00
}
}
2023-02-14 11:20:01 +00:00
/// Returns a copy of `query_info` adjusted for reading from one concrete source table.
/// When `table_expression` is set, the query tree is rewritten to point at the source table;
/// otherwise the AST is cloned, JOIN is removed, and (for non-Merge source tables)
/// `_table` / `_database` are rewritten to the names of the source table.
SelectQueryInfo ReadFromMerge::getModifiedQueryInfo(const SelectQueryInfo & query_info,
    const ContextPtr & modified_context,
    const StorageWithLockAndName & storage_with_lock_and_name,
    const StorageSnapshotPtr & storage_snapshot)
{
    const auto & storage = std::get<1>(storage_with_lock_and_name);
    const auto & storage_lock = std::get<2>(storage_with_lock_and_name);
    const StorageID current_storage_id = storage->getStorageID();

    SelectQueryInfo modified_query_info = query_info;

    if (!modified_query_info.table_expression)
    {
        const bool is_storage_merge_engine = storage->as<StorageMerge>() != nullptr;

        modified_query_info.query = query_info.query->clone();

        /// Original query could contain JOIN but we need only the first joined table and its columns.
        auto & modified_select = modified_query_info.query->as<ASTSelectQuery &>();
        TreeRewriterResult new_analyzer_res = *modified_query_info.syntax_analyzer_result;
        removeJoin(modified_select, new_analyzer_res, modified_context);
        modified_query_info.syntax_analyzer_result = std::make_shared<TreeRewriterResult>(std::move(new_analyzer_res));

        if (!is_storage_merge_engine)
        {
            VirtualColumnUtils::rewriteEntityInAst(modified_query_info.query, "_table", current_storage_id.table_name);
            VirtualColumnUtils::rewriteEntityInAst(modified_query_info.query, "_database", current_storage_id.database_name);
        }

        return modified_query_info;
    }

    /// Replace the table expression in the query tree with a node for the source table.
    auto replacement_table_expression = std::make_shared<TableNode>(storage, storage_lock, storage_snapshot);
    if (query_info.table_expression_modifiers)
        replacement_table_expression->setTableExpressionModifiers(*query_info.table_expression_modifiers);

    modified_query_info.query_tree
        = modified_query_info.query_tree->cloneAndReplace(modified_query_info.table_expression, replacement_table_expression);
    modified_query_info.table_expression = replacement_table_expression;
    modified_query_info.planner_context->getOrCreateTableExpressionData(replacement_table_expression);

    auto get_column_options = GetColumnsOptions(GetColumnsOptions::All).withExtendedObjects().withVirtuals();
    if (storage_snapshot->storage.supportsSubcolumns())
        get_column_options.withSubcolumns();

    /// `_table` / `_database` that the source table does not provide itself become constants.
    std::unordered_map<std::string, QueryTreeNodePtr> column_name_to_node;

    const bool has_own_table_column = static_cast<bool>(storage_snapshot->tryGetColumn(get_column_options, "_table"));
    if (!has_own_table_column)
        column_name_to_node.emplace("_table", std::make_shared<ConstantNode>(current_storage_id.table_name));

    const bool has_own_database_column = static_cast<bool>(storage_snapshot->tryGetColumn(get_column_options, "_database"));
    if (!has_own_database_column)
        column_name_to_node.emplace("_database", std::make_shared<ConstantNode>(current_storage_id.database_name));

    if (!column_name_to_node.empty())
        replaceColumns(modified_query_info.query_tree, replacement_table_expression, column_name_to_node);

    modified_query_info.query = queryNodeToSelectQuery(modified_query_info.query_tree);

    return modified_query_info;
}
2023-09-25 22:20:40 +00:00
/// Applies `func` to every ReadFromMergeTree step in the subtree rooted at `node`, children first.
/// All children are always visited. `func` is not applied to `node` itself if any child
/// subtree returned false. Returns true only if everything that was applied returned true.
bool recursivelyApplyToReadingSteps(QueryPlan::Node * node, const std::function<bool(ReadFromMergeTree &)> & func)
{
    bool children_ok = true;
    for (auto * child : node->children)
    {
        /// Evaluate the child unconditionally: a failure elsewhere must not skip it.
        const bool child_ok = recursivelyApplyToReadingSteps(child, func);
        children_ok = children_ok && child_ok;
    }

    // This code is mainly meant to be used to call `requestReadingInOrder` on child steps.
    // In this case it is ok if one child will read in order and other will not (though I don't know when it is possible),
    // the only important part is to acknowledge this at the parent and don't rely on any particular ordering of input data.
    if (!children_ok)
        return false;

    auto * read_from_merge_tree = typeid_cast<ReadFromMergeTree *>(node->step.get());
    if (!read_from_merge_tree)
        return true;

    return func(*read_from_merge_tree);
}
2022-07-25 19:41:43 +00:00
/// Builds the pipeline for one source table from its child plan.
/// Returns an uninitialized builder if the plan is not initialized. Otherwise the pipeline is
/// materialized when required, optionally concatenated into one stream, extended with the
/// `_database` / `_table` constant columns if they were requested and are missing, and finally
/// converted to the structure given by `header`.
QueryPipelineBuilderPtr ReadFromMerge::createSources(
    QueryPlan & plan,
    const StorageSnapshotPtr & storage_snapshot,
    SelectQueryInfo & modified_query_info,
    const QueryProcessingStage::Enum & processed_stage,
    const Block & header,
    const Aliases & aliases,
    const StorageWithLockAndName & storage_with_lock,
    ContextMutablePtr modified_context,
    bool concat_streams) const
{
    if (!plan.isInitialized())
        return std::make_unique<QueryPipelineBuilder>();

    const auto & database_name = std::get<0>(storage_with_lock);
    const auto & storage = std::get<1>(storage_with_lock);
    const auto & table_name = std::get<3>(storage_with_lock);

    const bool allow_experimental_analyzer = modified_context->getSettingsRef().allow_experimental_analyzer;

    const auto storage_stage
        = storage->getQueryProcessingStage(modified_context, QueryProcessingStage::Complete, storage_snapshot, modified_query_info);

    QueryPipelineBuilderPtr builder = plan.buildQueryPipeline(
        QueryPlanOptimizationSettings::fromContext(modified_context), BuildQueryPipelineSettings::fromContext(modified_context));

    const bool needs_materialization = processed_stage > storage_stage
        || (allow_experimental_analyzer && processed_stage != QueryProcessingStage::FetchColumns);

    if (needs_materialization)
    {
        /** Materialization is needed, since from distributed storage the constants come materialized.
          * If you do not do this, different types (Const and non-Const) columns will be produced in different threads,
          * And this is not allowed, since all code is based on the assumption that in the block stream all types are the same.
          */
        builder->addSimpleTransform([](const Block & stream_header) { return std::make_shared<MaterializingTransform>(stream_header); });
    }

    if (!builder->initialized())
        return builder;

    if (concat_streams && builder->getNumStreams() > 1)
    {
        // It's possible to have many tables read from merge, resize(1) might open too many files at the same time.
        // Using concat instead.
        builder->addTransform(std::make_shared<ConcatProcessor>(builder->getHeader(), builder->getNumStreams()));
    }

    /// Appends a constant LowCardinality(String) column with the given name and value to every stream.
    const auto add_constant_string_column = [&](const String & column_name, const String & value)
    {
        ColumnWithTypeAndName column;
        column.name = column_name;
        column.type = std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>());
        column.column = column.type->createColumnConst(0, Field(value));

        auto adding_column_dag = ActionsDAG::makeAddingColumnActions(std::move(column));
        auto adding_column_actions = std::make_shared<ExpressionActions>(
            std::move(adding_column_dag), ExpressionActionsSettings::fromContext(modified_context, CompileExpressions::yes));

        builder->addSimpleTransform([&](const Block & stream_header)
        { return std::make_shared<ExpressionTransform>(stream_header, adding_column_actions); });
    };

    /// Add virtual columns if we don't already have them.
    /// Both checks use the header as it was before any virtual column was added.
    const Block pipe_header = builder->getHeader();

    if (has_database_virtual_column && !pipe_header.has("_database"))
        add_constant_string_column("_database", database_name);

    if (has_table_virtual_column && !pipe_header.has("_table"))
        add_constant_string_column("_table", table_name);

    /// Subordinary tables could have different but convertible types, like numeric types of different width.
    /// We must return streams with structure equals to structure of Merge table.
    convertingSourceStream(header, storage_snapshot->metadata, aliases, modified_context, *builder, processed_stage);

    return builder;
}
/// Builds the query plan for reading one underlying table of the Merge table.
///
/// Depending on how `processed_stage` compares to the stage reported by the underlying storage,
/// the plan is either produced by the storage itself (`IStorage::read`, or an interpreter over
/// the view query for StorageView without the analyzer) or by a full interpreter for the rewritten query.
///
/// Returns a default-constructed (uninitialized) plan when the storage produced nothing to read.
/// `real_column_names` is taken by value because it may be extended with one physical column.
QueryPlan ReadFromMerge::createPlanForTable(
    const StorageSnapshotPtr & storage_snapshot,
    SelectQueryInfo & modified_query_info,
    const QueryProcessingStage::Enum & processed_stage,
    UInt64 max_block_size,
    const StorageWithLockAndName & storage_with_lock,
    Names real_column_names,
    ContextMutablePtr modified_context,
    size_t streams_num)
{
    const auto & [database_name, storage, _, table_name] = storage_with_lock;
    auto & modified_select = modified_query_info.query->as<ASTSelectQuery &>();

    /// NOTE: It may not work correctly in some cases, because query was analyzed without final.
    /// However, it's needed for MaterializedMySQL and it's unlikely that someone will use it with Merge tables.
    const bool query_has_final = InterpreterSelectQuery::isQueryWithFinal(modified_query_info);
    if (!query_has_final && storage->needRewriteQueryWithFinal(real_column_names))
        modified_select.setFinal();

    const bool use_analyzer = modified_context->getSettingsRef().allow_experimental_analyzer;
    const bool is_fetch_columns_stage = processed_stage == QueryProcessingStage::FetchColumns;

    const auto source_stage = storage->getQueryProcessingStage(
        modified_context, QueryProcessingStage::Complete, storage_snapshot, modified_query_info);

    QueryPlan plan;

    if (processed_stage <= source_stage || (use_analyzer && is_fetch_columns_stage))
    {
        /// If there are only virtual columns in query, you must request at least one other column.
        if (real_column_names.empty())
            real_column_names.push_back(
                ExpressionActions::getSmallestColumn(storage_snapshot->metadata->getColumns().getAllPhysical()).name);

        auto * view = dynamic_cast<StorageView *>(storage.get());
        if (view && !use_analyzer)
        {
            /// For view storage, we need to rewrite the `modified_query_info.view_query` to optimize read.
            /// The most intuitive way is to use InterpreterSelectQuery.

            /// Intercept the settings
            modified_context->setSetting("max_threads", streams_num);
            modified_context->setSetting("max_streams_to_max_threads_ratio", 1);
            modified_context->setSetting("max_block_size", max_block_size);

            InterpreterSelectQuery interpreter(
                modified_query_info.query,
                modified_context,
                storage,
                view->getInMemoryMetadataPtr(),
                SelectQueryOptions(processed_stage));
            interpreter.buildQueryPlan(plan);
        }
        else
        {
            /// Ordinary storage (or any storage when the analyzer is enabled): let it build the reading step.
            storage->read(
                plan,
                real_column_names,
                storage_snapshot,
                modified_query_info,
                modified_context,
                processed_stage,
                max_block_size,
                UInt32(streams_num));
        }

        /// The storage may decide there is nothing to read.
        if (!plan.isInitialized())
            return {};

        applyFilters(plan);
        return plan;
    }

    if (processed_stage > source_stage || (use_analyzer && !is_fetch_columns_stage))
    {
        /// Maximum permissible parallelism is streams_num
        modified_context->setSetting("max_threads", streams_num);
        modified_context->setSetting("max_streams_to_max_threads_ratio", 1);

        if (use_analyzer)
        {
            InterpreterSelectQueryAnalyzer interpreter(
                modified_query_info.query_tree,
                modified_context,
                SelectQueryOptions(processed_stage).ignoreProjections());

            auto & planner = interpreter.getPlanner();
            planner.buildQueryPlanIfNeeded();
            plan = std::move(planner).extractQueryPlan();
        }
        else
        {
            modified_select.replaceDatabaseAndTable(database_name, table_name);

            /// TODO: Find a way to support projections for StorageMerge
            InterpreterSelectQuery interpreter{
                modified_query_info.query,
                modified_context,
                SelectQueryOptions(processed_stage).ignoreProjections()};
            interpreter.buildQueryPlan(plan);
        }
    }

    return plan;
}
/// Collects all underlying tables of the Merge table together with their share locks.
///
/// - `query_context` provides the query id and the lock acquire timeout used for table locks.
/// - `query` (may be nullptr) is used for the PREWHERE support check and for filtering
///   by the `_database` / `_table` virtual columns.
/// - `filter_by_database_virtual_column` / `filter_by_table_virtual_column` enable removal of tables
///   that cannot match the conditions on the corresponding virtual column.
///
/// Throws ILLEGAL_PREWHERE if the query has PREWHERE and some underlying storage does not support it.
/// The Merge table itself is never included in the result.
StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(
    ContextPtr query_context,
    const ASTPtr & query /* = nullptr */,
    bool filter_by_database_virtual_column /* = false */,
    bool filter_by_table_virtual_column /* = false */) const
{
    /// FIXME: filtering does not work with allow_experimental_analyzer due to
    /// different column names there (it has "table_name._table" not just
    /// "_table")

    /// NOTE(review): as written this only requires `query` when BOTH filters are enabled;
    /// it looks like it was meant to require it when EITHER is enabled - confirm before tightening.
    assert(!filter_by_database_virtual_column || !filter_by_table_virtual_column || query);

    const Settings & settings = query_context->getSettingsRef();
    StorageListWithLocks selected_tables;
    DatabaseTablesIterators database_table_iterators = getDatabaseIterators(getContext());

    MutableColumnPtr database_name_virtual_column;
    MutableColumnPtr table_name_virtual_column;
    if (filter_by_database_virtual_column)
        database_name_virtual_column = ColumnString::create();
    if (filter_by_table_virtual_column)
        table_name_virtual_column = ColumnString::create();

    for (const auto & iterator : database_table_iterators)
    {
        if (filter_by_database_virtual_column)
            database_name_virtual_column->insert(iterator->databaseName());

        /// The iterator is advanced in the loop header so that every `continue` moves forward.
        /// Previously a null table was skipped without calling next(), which looped forever.
        for (; iterator->isValid(); iterator->next())
        {
            StoragePtr storage = iterator->table();
            if (!storage)
                continue;

            if (query && query->as<ASTSelectQuery>()->prewhere() && !storage->supportsPrewhere())
                throw Exception(ErrorCodes::ILLEGAL_PREWHERE, "Storage {} doesn't support PREWHERE.", storage->getName());

            /// Do not read from the Merge table itself.
            if (storage.get() == this)
                continue;

            auto table_lock = storage->lockForShare(query_context->getCurrentQueryId(), settings.lock_acquire_timeout);
            selected_tables.emplace_back(iterator->databaseName(), storage, std::move(table_lock), iterator->name());

            if (filter_by_table_virtual_column)
                table_name_virtual_column->insert(iterator->name());
        }
    }

    if (filter_by_database_virtual_column)
    {
        /// Filter names of selected tables if there is a condition on "_database" virtual column in WHERE clause
        Block virtual_columns_block
            = Block{ColumnWithTypeAndName(std::move(database_name_virtual_column), std::make_shared<DataTypeString>(), "_database")};
        VirtualColumnUtils::filterBlockWithQuery(query, virtual_columns_block, query_context);
        auto values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_database");

        /// Remove unused databases from the list
        selected_tables.remove_if([&](const auto & elem) { return values.find(std::get<0>(elem)) == values.end(); });
    }

    if (filter_by_table_virtual_column)
    {
        /// Filter names of selected tables if there is a condition on "_table" virtual column in WHERE clause
        Block virtual_columns_block
            = Block{ColumnWithTypeAndName(std::move(table_name_virtual_column), std::make_shared<DataTypeString>(), "_table")};
        VirtualColumnUtils::filterBlockWithQuery(query, virtual_columns_block, query_context);
        auto values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_table");

        /// Remove unused tables from the list
        selected_tables.remove_if([&](const auto & elem) { return values.find(std::get<3>(elem)) == values.end(); });
    }

    return selected_tables;
}
2021-06-25 13:51:17 +00:00
/// Returns an iterator over the tables of `database_name` that belong to this Merge table.
///
/// A table matches either by membership in the explicit `source_databases_and_tables` mapping
/// (when it is set) or by `source_table_regexp` otherwise.
DatabaseTablesIteratorPtr StorageMerge::getDatabaseIterator(const String & database_name, ContextPtr local_context) const
{
    auto database = DatabaseCatalog::instance().getDatabase(database_name);

    auto is_source_table = [this, database_name](const String & candidate) -> bool
    {
        /// No explicit list of tables: fall back to the table name regexp.
        if (!source_databases_and_tables)
            return source_table_regexp->match(candidate);

        const auto tables_of_database = source_databases_and_tables->find(database_name);
        if (tables_of_database == source_databases_and_tables->end())
            return false;

        return tables_of_database->second.contains(candidate);
    };

    return database->getTablesIterator(local_context, is_source_table);
}
2021-06-07 09:14:29 +00:00
/// Returns table iterators for every source database of this Merge table:
/// a single iterator when the database argument is a plain name,
/// or one iterator per matching database when it is a regexp.
StorageMerge::DatabaseTablesIterators StorageMerge::getDatabaseIterators(ContextPtr local_context) const
{
    try
    {
        checkStackSize();
    }
    catch (Exception & e)
    {
        e.addMessage("while getting table iterator of Merge table. Maybe caused by two Merge tables that will endlessly try to read each other's data");
        throw;
    }

    DatabaseTablesIterators iterators;

    if (database_is_regexp)
    {
        /// database_name argument is a regexp
        const auto all_databases = DatabaseCatalog::instance().getDatabases();
        for (const auto & name_and_database : all_databases)
        {
            const auto & candidate_name = name_and_database.first;
            if (source_database_regexp->match(candidate_name))
                iterators.emplace_back(getDatabaseIterator(candidate_name, local_context));
        }
    }
    else
    {
        /// database_name argument is not a regexp
        iterators.emplace_back(getDatabaseIterator(source_database_name_or_regexp, local_context));
    }

    return iterators;
}
void StorageMerge::checkAlterIsPossible(const AlterCommands & commands, ContextPtr local_context) const
2019-12-26 18:17:05 +00:00
{
2023-03-27 15:38:52 +00:00
std::optional<NameDependencies> name_deps{};
2019-12-26 18:17:05 +00:00
for (const auto & command : commands)
{
if (command.type != AlterCommand::Type::ADD_COLUMN && command.type != AlterCommand::Type::MODIFY_COLUMN
&& command.type != AlterCommand::Type::DROP_COLUMN && command.type != AlterCommand::Type::COMMENT_COLUMN
&& command.type != AlterCommand::Type::COMMENT_TABLE)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Alter of type '{}' is not supported by storage {}",
command.type, getName());
if (command.type == AlterCommand::Type::DROP_COLUMN && !command.clear)
{
2023-03-27 15:38:52 +00:00
if (!name_deps)
name_deps = getDependentViewsByColumn(local_context);
const auto & deps_mv = name_deps.value()[command.column_name];
if (!deps_mv.empty())
{
2023-01-17 00:19:44 +00:00
throw Exception(ErrorCodes::ALTER_OF_COLUMN_IS_FORBIDDEN,
"Trying to ALTER DROP column {} which is referenced by materialized view {}",
backQuoteIfNeed(command.column_name), toString(deps_mv));
}
}
2019-12-26 18:17:05 +00:00
}
}
2019-03-05 10:12:20 +00:00
void StorageMerge::alter(
2021-10-25 17:49:49 +00:00
const AlterCommands & params, ContextPtr local_context, AlterLockHolder &)
2013-09-23 12:01:19 +00:00
{
2019-12-03 16:25:32 +00:00
auto table_id = getStorageID();
2016-05-13 21:08:19 +00:00
2019-12-26 18:17:05 +00:00
StorageInMemoryMetadata storage_metadata = getInMemoryMetadata();
params.apply(storage_metadata, local_context);
DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(local_context, table_id, storage_metadata);
setInMemoryMetadata(storage_metadata);
2013-09-23 12:01:19 +00:00
}
2022-07-25 19:41:43 +00:00
void ReadFromMerge::convertingSourceStream(
const Block & header,
const StorageMetadataPtr & metadata_snapshot,
2021-06-24 23:25:06 +00:00
const Aliases & aliases,
ContextPtr local_context,
2023-02-15 14:03:52 +00:00
QueryPipelineBuilder & builder,
const QueryProcessingStage::Enum & processed_stage)
{
2022-07-25 19:41:43 +00:00
Block before_block_header = builder.getHeader();
2020-11-17 17:16:55 +00:00
2021-06-24 23:25:06 +00:00
auto storage_sample_block = metadata_snapshot->getSampleBlock();
2022-07-25 19:41:43 +00:00
auto pipe_columns = builder.getHeader().getNamesAndTypesList();
2020-11-17 17:16:55 +00:00
2021-06-24 23:25:06 +00:00
for (const auto & alias : aliases)
2020-08-03 13:54:14 +00:00
{
2021-06-24 23:25:06 +00:00
pipe_columns.emplace_back(NameAndTypePair(alias.name, alias.type));
ASTPtr expr = alias.expression;
2021-06-24 23:25:06 +00:00
auto syntax_result = TreeRewriter(local_context).analyze(expr, pipe_columns);
auto expression_analyzer = ExpressionAnalyzer{alias.expression, syntax_result, local_context};
auto dag = std::make_shared<ActionsDAG>(pipe_columns);
auto actions_dag = expression_analyzer.getActionsDAG(true, false);
auto actions = std::make_shared<ExpressionActions>(actions_dag, ExpressionActionsSettings::fromContext(local_context, CompileExpressions::yes));
2022-07-25 19:41:43 +00:00
builder.addSimpleTransform([&](const Block & stream_header)
2021-06-24 23:25:06 +00:00
{
return std::make_shared<ExpressionTransform>(stream_header, actions);
});
}
2020-11-17 17:16:55 +00:00
2023-02-14 11:20:01 +00:00
ActionsDAG::MatchColumnsMode convert_actions_match_columns_mode = ActionsDAG::MatchColumnsMode::Name;
2021-07-15 10:35:37 +00:00
2023-02-15 14:03:52 +00:00
if (local_context->getSettingsRef().allow_experimental_analyzer && processed_stage != QueryProcessingStage::FetchColumns)
2023-02-14 11:20:01 +00:00
convert_actions_match_columns_mode = ActionsDAG::MatchColumnsMode::Position;
auto convert_actions_dag = ActionsDAG::makeConvertingActions(builder.getHeader().getColumnsWithTypeAndName(),
header.getColumnsWithTypeAndName(),
convert_actions_match_columns_mode);
auto actions = std::make_shared<ExpressionActions>(
std::move(convert_actions_dag),
ExpressionActionsSettings::fromContext(local_context, CompileExpressions::yes));
builder.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<ExpressionTransform>(stream_header, actions);
});
}
/// Asks every reading step of every initialized child plan to read in the given order.
/// Returns false (and does not remember the order) if the request is rejected by any of them
/// or if reverse order is requested for a query with FINAL.
bool ReadFromMerge::requestReadingInOrder(InputOrderInfoPtr order_info_)
{
    /// Disable read-in-order optimization for reverse order with final.
    /// Otherwise, it can lead to incorrect final behavior because the implementation may rely on the reading in direct order).
    const bool is_reverse_order = order_info_->direction != 1;
    if (is_reverse_order && InterpreterSelectQuery::isQueryWithFinal(query_info))
        return false;

    auto request_read_in_order = [order_info_](ReadFromMergeTree & reading_step)
    {
        return reading_step.requestReadingInOrder(
            order_info_->used_prefix_of_sorting_key_size, order_info_->direction, order_info_->limit);
    };

    /// The request is deliberately sent to all child plans, even after one of them has refused.
    bool all_accepted = true;
    for (const auto & child_plan : child_plans)
    {
        if (!child_plan.isInitialized())
            continue;

        if (!recursivelyApplyToReadingSteps(child_plan.getRootNode(), request_read_in_order))
            all_accepted = false;
    }

    if (!all_accepted)
        return false;

    order_info = order_info_;
    query_info.input_order_info = order_info;
    return true;
}
2023-09-25 22:20:40 +00:00
void ReadFromMerge::applyFilters(const QueryPlan & plan) const
2023-09-19 11:27:31 +00:00
{
auto apply_filters = [this](ReadFromMergeTree & read_from_merge_tree)
{
size_t filters_dags_size = filter_dags.size();
for (size_t i = 0; i < filters_dags_size; ++i)
read_from_merge_tree.addFilter(filter_dags[i], filter_nodes.nodes[i]);
read_from_merge_tree.applyFilters();
return true;
};
2023-09-25 22:20:40 +00:00
recursivelyApplyToReadingSteps(plan.getRootNode(), apply_filters);
}
void ReadFromMerge::applyFilters()
{
2023-09-19 11:27:31 +00:00
for (const auto & plan : child_plans)
if (plan.isInitialized())
2023-09-25 22:20:40 +00:00
applyFilters(plan);
2023-09-19 11:27:31 +00:00
}
/// Returns per-column sizes accumulated over the tables visited by forEachTable.
IStorage::ColumnSizeByName StorageMerge::getColumnSizes() const
{
    ColumnSizeByName total_sizes;

    forEachTable([&total_sizes](const auto & table)
    {
        const auto table_sizes = table->getColumnSizes();
        for (const auto & name_and_size : table_sizes)
            total_sizes[name_and_size.first].add(name_and_size.second);
    });

    return total_sizes;
}
/// Interprets the database argument of the Merge engine.
///
/// Returns {true, literal} when the argument is `REGEXP('...')` with a single non-empty string literal,
/// and {false, evaluated constant expression} otherwise.
/// Throws BAD_ARGUMENTS for a malformed REGEXP(...) call.
std::tuple<bool /* is_regexp */, ASTPtr> StorageMerge::evaluateDatabaseName(const ASTPtr & node, ContextPtr context_)
{
    const auto * func = node->as<ASTFunction>();
    const bool is_regexp_call = func && func->name == "REGEXP";

    if (!is_regexp_call)
    {
        /// Plain database name (possibly a constant expression).
        auto ast = evaluateConstantExpressionForDatabaseName(node, context_);
        return {false, ast};
    }

    const auto & regexp_arguments = func->arguments->children;
    if (regexp_arguments.size() != 1)
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "REGEXP in Merge ENGINE takes only one argument");

    auto * literal = regexp_arguments[0]->as<ASTLiteral>();
    const bool is_non_empty_string = literal
        && literal->value.getType() == Field::Types::Which::String
        && !literal->value.safeGet<String>().empty();
    if (!is_non_empty_string)
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Argument for REGEXP in Merge ENGINE should be a non empty String Literal");

    return {true, regexp_arguments[0]};
}
/// Registers the "Merge" table engine in the storage factory.
void registerStorageMerge(StorageFactory & factory)
{
    auto create_storage = [](const StorageFactory::Arguments & args)
    {
        /** In query, the name of database is specified as table engine argument which contains source tables,
          *  as well as regex for source-table names.
          */
        ASTs & engine_args = args.engine_args;

        if (engine_args.size() != 2)
            throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
                "Storage Merge requires exactly 2 parameters - name "
                "of source database and regexp for table names.");

        /// First argument: database name or REGEXP('...'). A plain name is stored back in evaluated form.
        auto [is_regexp, database_ast] = StorageMerge::evaluateDatabaseName(engine_args[0], args.getLocalContext());
        if (!is_regexp)
            engine_args[0] = database_ast;

        String source_database_name_or_regexp = checkAndGetLiteralArgument<String>(database_ast, "database_name");

        /// Second argument: regexp for table names, evaluated to a literal.
        engine_args[1] = evaluateConstantExpressionAsLiteral(engine_args[1], args.getLocalContext());
        String table_name_regexp = checkAndGetLiteralArgument<String>(engine_args[1], "table_name_regexp");

        return std::make_shared<StorageMerge>(
            args.table_id,
            args.columns,
            args.comment,
            source_database_name_or_regexp,
            is_regexp,
            table_name_regexp,
            args.getContext());
    };

    factory.registerStorage("Merge", create_storage,
    {
        .supports_schema_inference = true
    });
}
/// Returns the virtual columns of the Merge table: `_database` and `_table`
/// (both LowCardinality(String)), followed by the virtual columns of the first underlying table, if any.
NamesAndTypesList StorageMerge::getVirtuals() const
{
    auto make_name_type = []
    {
        return std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>());
    };

    NamesAndTypesList result{
        {"_database", make_name_type()},
        {"_table", make_name_type()}};

    if (auto first_table = getFirstTable([](auto && table) { return table; }))
    {
        auto inherited_virtuals = first_table->getVirtuals();
        result.insert(result.end(), inherited_virtuals.begin(), inherited_virtuals.end());
    }

    return result;
}
2013-09-23 12:01:19 +00:00
}