// ClickHouse/src/Storages/StorageMaterializedView.cpp

#include <Storages/StorageMaterializedView.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTCreateQuery.h>
2017-05-23 18:33:48 +00:00
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/InterpreterDropQuery.h>
#include <Interpreters/InterpreterRenameQuery.h>
#include <Interpreters/getTableExpressions.h>
#include <Interpreters/AddDefaultDatabaseVisitor.h>
#include <Interpreters/getHeaderForProcessingStage.h>
#include <Access/AccessFlags.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Storages/AlterCommands.h>
#include <Storages/StorageFactory.h>
#include <Storages/ReadInOrderOptimizer.h>
#include <Storages/SelectQueryDescription.h>
2017-07-13 20:58:19 +00:00
#include <Common/typeid_cast.h>
#include <Common/checkStackSize.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/QueryPlan/SettingQuotaAndLimitsStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
2021-03-04 17:38:12 +00:00
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
2021-07-23 19:33:59 +00:00
#include <Processors/Sinks/SinkToStorage.h>
namespace DB
{
/// Error codes thrown from this file. They are only declared here (extern),
/// so their values must be defined in another translation unit.
namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
    extern const int NOT_IMPLEMENTED;
    extern const int INCORRECT_QUERY;
    extern const int QUERY_IS_NOT_SUPPORTED_IN_MATERIALIZED_VIEW;
}
2020-03-18 17:38:52 +00:00
/// Builds the name of the implicit inner table that belongs to the view `view_id`.
/// If the view id carries a UUID the result is ".inner_id.<uuid>",
/// otherwise it is ".inner.<table name of the view>".
static inline String generateInnerTableName(const StorageID & view_id)
{
    if (view_id.hasUUID())
        return ".inner_id." + toString(view_id.uuid);
    return ".inner." + view_id.getTableName();
}
/// Erases from target_header every column whose name is absent from src_header.
/// src_header is only consulted, never modified.
static void removeNonCommonColumns(const Block & src_header, Block & target_header)
{
    /// Positions are collected first and erased in one call afterwards,
    /// so the header is not modified while it is being iterated.
    std::set<size_t> positions_to_erase;
    for (const auto & target_column : target_header)
    {
        const bool present_in_source = src_header.has(target_column.name);
        if (present_in_source)
            continue;

        positions_to_erase.insert(target_header.getPositionByName(target_column.name));
    }
    target_header.erase(positions_to_erase);
}
/// Constructs the view from a CREATE / ATTACH query.
///
/// Steps performed, in order:
///  - validates the query (SELECT present, a destination is known, exactly one SELECT in the union list);
///  - stores columns and the SELECT description as the in-memory metadata;
///  - rejects a view whose destination is the view itself;
///  - resolves target_table_id: the TO table, the already existing inner table (ATTACH),
///    or an inner table created right here (CREATE);
///  - registers the view as a dependency of the table it selects from, if there is one.
///
/// Throws INCORRECT_QUERY, QUERY_IS_NOT_SUPPORTED_IN_MATERIALIZED_VIEW or BAD_ARGUMENTS
/// for the corresponding validation failures.
///
/// Note that the storage keeps the *global* context, while `local_context` is used
/// for analysing the SELECT and for creating the inner table.
StorageMaterializedView::StorageMaterializedView(
    const StorageID & table_id_,
    ContextPtr local_context,
    const ASTCreateQuery & query,
    const ColumnsDescription & columns_,
    bool attach_)
    : IStorage(table_id_), WithMutableContext(local_context->getGlobalContext())
{
    StorageInMemoryMetadata storage_metadata;
    storage_metadata.setColumns(columns_);

    if (!query.select)
        throw Exception("SELECT query is not specified for " + getName(), ErrorCodes::INCORRECT_QUERY);

    /// If the destination table is not set, use inner table
    has_inner_table = query.to_table_id.empty();
    if (has_inner_table && !query.storage)
        throw Exception(
            "You must specify where to save results of a MaterializedView query: either ENGINE or an existing table in a TO clause",
            ErrorCodes::INCORRECT_QUERY);

    if (query.select->list_of_selects->children.size() != 1)
        throw Exception("UNION is not supported for MATERIALIZED VIEW", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_MATERIALIZED_VIEW);

    /// The SELECT is analysed on a clone, so the AST owned by `query` is passed on untouched.
    auto select = SelectQueryDescription::getSelectQueryFromASTForMatView(query.select->clone(), local_context);
    storage_metadata.setSelectQuery(select);
    setInMemoryMetadata(storage_metadata);

    /// A view must not write into itself: check both the inner-table UUID and the explicit TO name.
    bool point_to_itself_by_uuid = has_inner_table && query.to_inner_uuid != UUIDHelpers::Nil
                                                   && query.to_inner_uuid == table_id_.uuid;
    bool point_to_itself_by_name = !has_inner_table && query.to_table_id.database_name == table_id_.database_name
                                                    && query.to_table_id.table_name == table_id_.table_name;
    if (point_to_itself_by_uuid || point_to_itself_by_name)
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Materialized view {} cannot point to itself", table_id_.getFullTableName());

    if (!has_inner_table)
    {
        target_table_id = query.to_table_id;
    }
    else if (attach_)
    {
        /// If there is an ATTACH request, then the internal table must already be created.
        target_table_id = StorageID(getStorageID().database_name, generateInnerTableName(getStorageID()), query.to_inner_uuid);
    }
    else
    {
        /// We will create a query to create an internal table.
        auto create_context = Context::createCopy(local_context);
        auto manual_create_query = std::make_shared<ASTCreateQuery>();
        manual_create_query->database = getStorageID().database_name;
        manual_create_query->table = generateInnerTableName(getStorageID());
        manual_create_query->uuid = query.to_inner_uuid;

        /// The inner table reuses the column list and the storage definition of the view's query.
        auto new_columns_list = std::make_shared<ASTColumns>();
        new_columns_list->set(new_columns_list->columns, query.columns_list->columns->ptr());

        manual_create_query->set(manual_create_query->columns_list, new_columns_list);
        manual_create_query->set(manual_create_query->storage, query.storage->ptr());

        InterpreterCreateQuery create_interpreter(manual_create_query, create_context);
        create_interpreter.setInternal(true);
        create_interpreter.execute();

        /// Read the id back from the catalog instead of assembling it by hand.
        target_table_id = DatabaseCatalog::instance().getTable({manual_create_query->database, manual_create_query->table}, getContext())->getStorageID();
    }

    if (!select.select_table_id.empty())
        DatabaseCatalog::instance().addDependency(select.select_table_id, getStorageID());
}
/// Delegates the decision to the target table, passing the target table's own metadata
/// (the view's metadata snapshot argument is ignored).
/// The target table is resolved once, so the metadata and the object the call is made on
/// always belong to the same table instance.
QueryProcessingStage::Enum StorageMaterializedView::getQueryProcessingStage(
    ContextPtr local_context,
    QueryProcessingStage::Enum to_stage,
    const StorageMetadataPtr &,
    SelectQueryInfo & query_info) const
{
    const auto target_table = getTargetTable();
    return target_table->getQueryProcessingStage(local_context, to_stage, target_table->getInMemoryMetadataPtr(), query_info);
}
2020-08-03 13:54:14 +00:00
/// Pipe-returning overload of read(): fills a local QueryPlan through the QueryPlan overload
/// and converts it into a Pipe using optimization and pipeline settings taken from `local_context`.
Pipe StorageMaterializedView::read(
    const Names & column_names,
    const StorageMetadataPtr & metadata_snapshot,
    SelectQueryInfo & query_info,
    ContextPtr local_context,
    QueryProcessingStage::Enum processed_stage,
    const size_t max_block_size,
    const unsigned num_streams)
{
    QueryPlan plan;
    read(plan, column_names, metadata_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);
    return plan.convertToPipe(
        QueryPlanOptimizationSettings::fromContext(local_context),
        BuildQueryPipelineSettings::fromContext(local_context));
}
/// Reads from the target table into `query_plan`.
///
/// The target table is share-locked and read with its own metadata snapshot. If the read
/// initialized the plan, two steps may be appended:
///  - a conversion from the target table's header to the view's header (only for the columns
///    the two headers have in common, and only when their structures differ);
///  - a step that carries the table lock, so the lock is kept together with the plan.
void StorageMaterializedView::read(
    QueryPlan & query_plan,
    const Names & column_names,
    const StorageMetadataPtr & metadata_snapshot,
    SelectQueryInfo & query_info,
    ContextPtr local_context,
    QueryProcessingStage::Enum processed_stage,
    const size_t max_block_size,
    const unsigned num_streams)
{
    auto storage = getTargetTable();
    auto lock = storage->lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);
    auto target_metadata_snapshot = storage->getInMemoryMetadataPtr();

    /// The input order must be computed against the metadata of the table that is actually read.
    if (query_info.order_optimizer)
        query_info.input_order_info = query_info.order_optimizer->getInputOrder(target_metadata_snapshot, local_context);

    storage->read(query_plan, column_names, target_metadata_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);

    if (query_plan.isInitialized())
    {
        auto mv_header = getHeaderForProcessingStage(*this, column_names, metadata_snapshot, query_info, local_context, processed_stage);
        auto target_header = query_plan.getCurrentDataStream().header;

        /// No need to convert columns that do not exist in the MV
        removeNonCommonColumns(mv_header, target_header);

        /// No need to convert columns that do not exist in the result header.
        ///
        /// Distributed storage may process query up to the specific stage, and
        /// so the result header may not include all the columns from the
        /// materialized view.
        removeNonCommonColumns(target_header, mv_header);

        if (!blocksHaveEqualStructure(mv_header, target_header))
        {
            auto converting_actions = ActionsDAG::makeConvertingActions(target_header.getColumnsWithTypeAndName(),
                                                                        mv_header.getColumnsWithTypeAndName(),
                                                                        ActionsDAG::MatchColumnsMode::Name);
            auto converting_step = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), converting_actions);
            converting_step->setStepDescription("Convert target table structure to MaterializedView structure");
            query_plan.addStep(std::move(converting_step));
        }

        /// Default-constructed limits: the step is added for the lock it carries.
        StreamLocalLimits limits;
        SizeLimits leaf_limits;

        /// Add table lock for destination table.
        auto adding_limits_and_quota = std::make_unique<SettingQuotaAndLimitsStep>(
                query_plan.getCurrentDataStream(),
                storage,
                std::move(lock),
                limits,
                leaf_limits,
                nullptr,
                nullptr);

        adding_limits_and_quota->setStepDescription("Lock destination table for MaterializedView");
        query_plan.addStep(std::move(adding_limits_and_quota));
    }
}
2021-07-23 19:33:59 +00:00
/// Returns the sink of the target table. The target table is share-locked and written with
/// its own metadata snapshot (the view's snapshot argument is unused); the lock is attached
/// to the returned sink.
SinkToStoragePtr StorageMaterializedView::write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr local_context)
{
    auto storage = getTargetTable();
    auto lock = storage->lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);

    auto metadata_snapshot = storage->getInMemoryMetadataPtr();
    auto sink = storage->write(query, metadata_snapshot, local_context);

    sink->addTableLock(lock);
    return sink;
}
2020-01-22 11:30:11 +00:00
void StorageMaterializedView::drop()
2018-04-21 00:35:20 +00:00
{
2019-12-03 16:25:32 +00:00
auto table_id = getStorageID();
2020-06-17 14:06:22 +00:00
const auto & select_query = getInMemoryMetadataPtr()->getSelectQuery();
if (!select_query.select_table_id.empty())
DatabaseCatalog::instance().removeDependency(select_query.select_table_id, table_id);
2018-04-21 00:35:20 +00:00
dropInnerTableIfAny(true, getContext());
}
2021-04-11 07:44:40 +00:00
void StorageMaterializedView::dropInnerTableIfAny(bool no_delay, ContextPtr local_context)
{
if (has_inner_table && tryGetTargetTable())
2021-04-11 07:44:40 +00:00
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, getContext(), local_context, target_table_id, no_delay);
2018-06-09 15:48:22 +00:00
}
void StorageMaterializedView::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr local_context, TableExclusiveLockHolder &)
2018-06-09 15:48:22 +00:00
{
if (has_inner_table)
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Truncate, getContext(), local_context, target_table_id, true);
2018-04-21 00:35:20 +00:00
}
void StorageMaterializedView::checkStatementCanBeForwarded() const
{
if (!has_inner_table)
throw Exception(
2019-12-10 19:48:16 +00:00
"MATERIALIZED VIEW targets existing table " + target_table_id.getNameForLogs() + ". "
+ "Execute the statement directly on it.", ErrorCodes::INCORRECT_QUERY);
}
2020-06-17 13:39:26 +00:00
/// Forwards OPTIMIZE to the inner table, using the inner table's metadata snapshot
/// (the view's snapshot argument is unused).
/// Throws INCORRECT_QUERY via checkStatementCanBeForwarded() if the view has no inner table.
/// The target table is resolved once, so the snapshot and the table that is optimized
/// are guaranteed to belong together.
bool StorageMaterializedView::optimize(
    const ASTPtr & query,
    const StorageMetadataPtr & /*metadata_snapshot*/,
    const ASTPtr & partition,
    bool final,
    bool deduplicate,
    const Names & deduplicate_by_columns,
    ContextPtr local_context)
{
    checkStatementCanBeForwarded();
    auto storage_ptr = getTargetTable();
    auto metadata_snapshot = storage_ptr->getInMemoryMetadataPtr();
    return storage_ptr->optimize(query, metadata_snapshot, partition, final, deduplicate, deduplicate_by_columns, local_context);
}
void StorageMaterializedView::alter(
const AlterCommands & params,
ContextPtr local_context,
2020-06-18 16:10:47 +00:00
TableLockHolder &)
{
auto table_id = getStorageID();
2020-06-09 21:22:01 +00:00
StorageInMemoryMetadata new_metadata = getInMemoryMetadata();
2020-06-17 14:06:22 +00:00
StorageInMemoryMetadata old_metadata = getInMemoryMetadata();
params.apply(new_metadata, local_context);
2020-01-31 17:12:18 +00:00
/// start modify query
if (local_context->getSettingsRef().allow_experimental_alter_materialized_view_structure)
2020-01-31 17:12:18 +00:00
{
2020-06-09 21:22:01 +00:00
const auto & new_select = new_metadata.select;
2020-06-17 14:06:22 +00:00
const auto & old_select = old_metadata.getSelectQuery();
2020-01-31 17:12:18 +00:00
DatabaseCatalog::instance().updateDependency(old_select.select_table_id, table_id, new_select.select_table_id, table_id);
2020-01-31 17:12:18 +00:00
2020-06-15 18:08:05 +00:00
new_metadata.setSelectQuery(new_select);
2020-01-31 17:12:18 +00:00
}
/// end modify query
DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(local_context, table_id, new_metadata);
setInMemoryMetadata(new_metadata);
}
void StorageMaterializedView::checkAlterIsPossible(const AlterCommands & commands, ContextPtr local_context) const
{
const auto & settings = local_context->getSettingsRef();
if (settings.allow_experimental_alter_materialized_view_structure)
{
2020-01-31 17:12:18 +00:00
for (const auto & command : commands)
{
if (!command.isCommentAlter() && command.type != AlterCommand::MODIFY_QUERY)
throw Exception(
"Alter of type '" + alterTypeToString(command.type) + "' is not supported by storage " + getName(),
ErrorCodes::NOT_IMPLEMENTED);
}
}
else
{
for (const auto & command : commands)
{
if (!command.isCommentAlter())
throw Exception(
"Alter of type '" + alterTypeToString(command.type) + "' is not supported by storage " + getName(),
ErrorCodes::NOT_IMPLEMENTED);
}
}
}
/// Asks the inner table whether the mutation commands are possible.
/// Throws INCORRECT_QUERY via checkStatementCanBeForwarded() if the view has no inner table.
void StorageMaterializedView::checkMutationIsPossible(const MutationCommands & commands, const Settings & settings) const
{
    checkStatementCanBeForwarded();

    const auto inner_table = getTargetTable();
    inner_table->checkMutationIsPossible(commands, settings);
}
2020-08-03 13:54:14 +00:00
/// Forwards partition commands to the inner table and returns its result.
/// Throws INCORRECT_QUERY via checkStatementCanBeForwarded() if the view has no inner table.
/// NOTE(review): the view's metadata_snapshot is passed on as is, whereas optimize() uses the
/// target table's own snapshot — confirm that this difference is intended.
Pipe StorageMaterializedView::alterPartition(
    const StorageMetadataPtr & metadata_snapshot, const PartitionCommands & commands, ContextPtr local_context)
{
    checkStatementCanBeForwarded();

    const auto inner_table = getTargetTable();
    return inner_table->alterPartition(metadata_snapshot, commands, local_context);
}
2020-07-14 08:19:39 +00:00
/// Asks the inner table whether the partition commands are possible.
/// Throws INCORRECT_QUERY via checkStatementCanBeForwarded() if the view has no inner table.
void StorageMaterializedView::checkAlterPartitionIsPossible(
    const PartitionCommands & commands, const StorageMetadataPtr & metadata_snapshot, const Settings & settings) const
{
    checkStatementCanBeForwarded();

    const auto inner_table = getTargetTable();
    inner_table->checkAlterPartitionIsPossible(commands, metadata_snapshot, settings);
}
/// Forwards mutation commands to the inner table.
/// Throws INCORRECT_QUERY via checkStatementCanBeForwarded() if the view has no inner table.
void StorageMaterializedView::mutate(const MutationCommands & commands, ContextPtr local_context)
{
    checkStatementCanBeForwarded();

    const auto inner_table = getTargetTable();
    inner_table->mutate(commands, local_context);
}
2020-04-07 14:05:51 +00:00
void StorageMaterializedView::renameInMemory(const StorageID & new_table_id)
{
2020-03-18 17:38:52 +00:00
auto old_table_id = getStorageID();
2020-06-17 14:06:22 +00:00
auto metadata_snapshot = getInMemoryMetadataPtr();
2020-04-10 01:35:37 +00:00
bool from_atomic_to_atomic_database = old_table_id.hasUUID() && new_table_id.hasUUID();
2020-04-07 14:05:51 +00:00
2021-05-19 18:53:31 +00:00
if (!from_atomic_to_atomic_database && has_inner_table && tryGetTargetTable())
{
2020-04-08 01:02:00 +00:00
auto new_target_table_name = generateInnerTableName(new_table_id);
2019-12-10 19:48:16 +00:00
auto rename = std::make_shared<ASTRenameQuery>();
2019-12-10 19:48:16 +00:00
ASTRenameQuery::Table from;
2021-05-19 18:53:31 +00:00
assert(target_table_id.database_name == old_table_id.database_name);
2019-12-10 19:48:16 +00:00
from.database = target_table_id.database_name;
from.table = target_table_id.table_name;
2019-12-10 19:48:16 +00:00
ASTRenameQuery::Table to;
2021-05-19 18:53:31 +00:00
to.database = new_table_id.database_name;
2019-12-10 19:48:16 +00:00
to.table = new_target_table_name;
2019-12-10 19:48:16 +00:00
ASTRenameQuery::Element elem;
elem.from = from;
elem.to = to;
rename->elements.emplace_back(elem);
InterpreterRenameQuery(rename, getContext()).execute();
2021-05-19 18:53:31 +00:00
target_table_id.database_name = new_table_id.database_name;
2019-12-10 19:48:16 +00:00
target_table_id.table_name = new_target_table_name;
}
2020-04-07 14:05:51 +00:00
IStorage::renameInMemory(new_table_id);
2021-05-19 18:53:31 +00:00
if (from_atomic_to_atomic_database && has_inner_table)
{
assert(target_table_id.database_name == old_table_id.database_name);
target_table_id.database_name = new_table_id.database_name;
}
2020-06-17 14:06:22 +00:00
const auto & select_query = metadata_snapshot->getSelectQuery();
// TODO Actually we don't need to update dependency if MV has UUID, but then db and table name will be outdated
DatabaseCatalog::instance().updateDependency(select_query.select_table_id, old_table_id, select_query.select_table_id, getStorageID());
}
void StorageMaterializedView::shutdown()
{
2020-06-17 14:06:22 +00:00
auto metadata_snapshot = getInMemoryMetadataPtr();
const auto & select_query = metadata_snapshot->getSelectQuery();
/// Make sure the dependency is removed after DETACH TABLE
if (!select_query.select_table_id.empty())
DatabaseCatalog::instance().removeDependency(select_query.select_table_id, getStorageID());
}
/// Looks up the table identified by target_table_id in the database catalog via getTable().
/// The stack size is checked first.
StoragePtr StorageMaterializedView::getTargetTable() const
{
    checkStackSize();

    auto & catalog = DatabaseCatalog::instance();
    return catalog.getTable(target_table_id, getContext());
}
/// Same lookup as getTargetTable(), but through the catalog's tryGetTable().
/// Callers in this file test the result for null before using it.
StoragePtr StorageMaterializedView::tryGetTargetTable() const
{
    checkStackSize();

    auto & catalog = DatabaseCatalog::instance();
    return catalog.tryGetTable(target_table_id, getContext());
}
2019-04-04 13:13:59 +00:00
/// Returns the data paths reported by the target table,
/// or an empty list when the target table cannot be found.
Strings StorageMaterializedView::getDataPaths() const
{
    if (auto table = tryGetTargetTable())
        return table->getDataPaths();
    return {};
}
/// Returns the action lock of the inner table for the given action type.
/// A default-constructed ActionLock is returned when the view writes to an explicit TO table
/// or when the inner table cannot be found.
ActionLock StorageMaterializedView::getActionLock(StorageActionBlockType type)
{
    if (has_inner_table)
    {
        if (auto target_table = tryGetTargetTable())
            return target_table->getActionLock(type);
    }
    return ActionLock{};
}
/// Registers the "MaterializedView" engine in the storage factory.
void registerStorageMaterializedView(StorageFactory & factory)
{
    /// The local context is passed to the storage so that its settings are conveyed to the inner table.
    auto create_materialized_view = [](const StorageFactory::Arguments & args)
    {
        return StorageMaterializedView::create(
            args.table_id, args.getLocalContext(), args.query,
            args.columns, args.attach);
    };

    factory.registerStorage("MaterializedView", create_materialized_view);
}
}