Merge pull request #25064 from ucasfl/merge

Database argument for StorageMerge supports regular expressions.
This commit is contained in:
Kseniia Sumarokova 2021-07-05 14:14:44 +03:00 committed by GitHub
commit c72a1be0be
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 588 additions and 128 deletions

View File

@ -103,6 +103,24 @@ ASTPtr evaluateConstantExpressionForDatabaseName(const ASTPtr & node, ContextPtr
return res;
}
std::tuple<bool, ASTPtr> evaluateDatabaseNameForMergeEngine(const ASTPtr & node, ContextPtr context)
{
if (const auto * func = node->as<ASTFunction>(); func && func->name == "REGEXP")
{
if (func->arguments->children.size() != 1)
throw Exception("Arguments for REGEXP in Merge ENGINE should be 1", ErrorCodes::BAD_ARGUMENTS);
auto * literal = func->arguments->children[0]->as<ASTLiteral>();
if (!literal || literal->value.safeGet<String>().empty())
throw Exception("Argument for REGEXP in Merge ENGINE should be a non empty String Literal", ErrorCodes::BAD_ARGUMENTS);
return std::tuple{true, func->arguments->children[0]};
}
auto ast = evaluateConstantExpressionForDatabaseName(node, context);
return std::tuple{false, ast};
}
namespace
{
using Conjunction = ColumnsWithTypeAndName;

View File

@ -53,4 +53,6 @@ ASTPtr evaluateConstantExpressionForDatabaseName(const ASTPtr & node, ContextPtr
*/
std::optional<Blocks> evaluateExpressionOverConstantCondition(const ASTPtr & node, const ExpressionActionsPtr & target_expr, size_t & limit);
// Evaluate database name or regexp for StorageMerge and TableFunction merge.
// Returns a tuple (is_regexp, ast):
//  - if `node` is the function REGEXP with a single non-empty String literal argument,
//    is_regexp is true and ast is that literal (the pattern itself);
//  - otherwise is_regexp is false and ast is the result of
//    evaluateConstantExpressionForDatabaseName(node, context).
// Throws BAD_ARGUMENTS when REGEXP has a wrong number of arguments or an empty / non-literal argument.
std::tuple<bool, ASTPtr> evaluateDatabaseNameForMergeEngine(const ASTPtr & node, ContextPtr context);
}

View File

@ -47,13 +47,16 @@ StorageMerge::StorageMerge(
const StorageID & table_id_,
const ColumnsDescription & columns_,
const String & comment,
const String & source_database_,
const Strings & source_tables_,
const String & source_database_name_or_regexp_,
bool database_is_regexp_,
const DbToTableSetMap & source_databases_and_tables_,
ContextPtr context_)
: IStorage(table_id_)
, WithContext(context_->getGlobalContext())
, source_database(source_database_)
, source_tables(std::in_place, source_tables_.begin(), source_tables_.end())
, source_database_regexp(source_database_name_or_regexp_)
, source_databases_and_tables(source_databases_and_tables_)
, source_database_name_or_regexp(source_database_name_or_regexp_)
, database_is_regexp(database_is_regexp_)
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
@ -65,13 +68,16 @@ StorageMerge::StorageMerge(
const StorageID & table_id_,
const ColumnsDescription & columns_,
const String & comment,
const String & source_database_,
const String & source_database_name_or_regexp_,
bool database_is_regexp_,
const String & source_table_regexp_,
ContextPtr context_)
: IStorage(table_id_)
, WithContext(context_->getGlobalContext())
, source_database(source_database_)
, source_database_regexp(source_database_name_or_regexp_)
, source_table_regexp(source_table_regexp_)
, source_database_name_or_regexp(source_database_name_or_regexp_)
, database_is_regexp(database_is_regexp_)
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
@ -82,15 +88,18 @@ StorageMerge::StorageMerge(
template <typename F>
StoragePtr StorageMerge::getFirstTable(F && predicate) const
{
auto iterator = getDatabaseIterator(getContext());
auto database_table_iterators = getDatabaseIterators(getContext());
while (iterator->isValid())
for (auto & iterator : database_table_iterators)
{
const auto & table = iterator->table();
if (table.get() != this && predicate(table))
return table;
while (iterator->isValid())
{
const auto & table = iterator->table();
if (table.get() != this && predicate(table))
return table;
iterator->next();
iterator->next();
}
}
return {};
@ -112,7 +121,7 @@ bool StorageMerge::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, Cont
size_t i = 0;
for (const auto & table : selected_tables)
{
const auto & storage_ptr = std::get<0>(table);
const auto & storage_ptr = std::get<1>(table);
auto metadata_snapshot = storage_ptr->getInMemoryMetadataPtr();
if (storage_ptr->mayBenefitFromIndexForIn(left_in_operand, query_context, metadata_snapshot))
return true;
@ -145,22 +154,25 @@ QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(
auto stage_in_source_tables = QueryProcessingStage::FetchColumns;
DatabaseTablesIteratorPtr iterator = getDatabaseIterator(local_context);
DatabaseTablesIterators database_table_iterators = getDatabaseIterators(local_context);
size_t selected_table_size = 0;
while (iterator->isValid())
for (const auto & iterator : database_table_iterators)
{
const auto & table = iterator->table();
if (table && table.get() != this)
while (iterator->isValid())
{
++selected_table_size;
stage_in_source_tables = std::max(
stage_in_source_tables,
table->getQueryProcessingStage(local_context, to_stage, table->getInMemoryMetadataPtr(), query_info));
}
const auto & table = iterator->table();
if (table && table.get() != this)
{
++selected_table_size;
stage_in_source_tables = std::max(
stage_in_source_tables,
table->getQueryProcessingStage(local_context, to_stage, table->getInMemoryMetadataPtr(), query_info));
}
iterator->next();
iterator->next();
}
}
return selected_table_size == 1 ? stage_in_source_tables : std::min(stage_in_source_tables, QueryProcessingStage::WithMergeableState);
@ -178,13 +190,16 @@ Pipe StorageMerge::read(
{
Pipes pipes;
bool has_database_virtual_column = false;
bool has_table_virtual_column = false;
Names real_column_names;
real_column_names.reserve(column_names.size());
for (const auto & column_name : column_names)
{
if (column_name == "_table" && isVirtualColumn(column_name, metadata_snapshot))
if (column_name == "_database" && isVirtualColumn(column_name, metadata_snapshot))
has_database_virtual_column = true;
else if (column_name == "_table" && isVirtualColumn(column_name, metadata_snapshot))
has_table_virtual_column = true;
else
real_column_names.push_back(column_name);
@ -202,12 +217,24 @@ Pipe StorageMerge::read(
/** First we make list of selected tables to find out its size.
* This is necessary to correctly pass the recommended number of threads to each table.
*/
StorageListWithLocks selected_tables = getSelectedTables(local_context, query_info.query, has_table_virtual_column);
StorageListWithLocks selected_tables
= getSelectedTables(local_context, query_info.query, has_database_virtual_column, has_table_virtual_column);
if (selected_tables.empty())
/// FIXME: do we support sampling in this case?
return createSources(
{}, query_info, processed_stage, max_block_size, header, {}, {}, real_column_names, modified_context, 0, has_table_virtual_column);
{},
query_info,
processed_stage,
max_block_size,
header,
{},
{},
real_column_names,
modified_context,
0,
has_database_virtual_column,
has_table_virtual_column);
size_t tables_count = selected_tables.size();
Float64 num_streams_multiplier
@ -220,7 +247,7 @@ Pipe StorageMerge::read(
{
for (auto it = selected_tables.begin(); it != selected_tables.end(); ++it)
{
auto storage_ptr = std::get<0>(*it);
auto storage_ptr = std::get<1>(*it);
auto storage_metadata_snapshot = storage_ptr->getInMemoryMetadataPtr();
auto current_info = query_info.order_optimizer->getInputOrder(storage_metadata_snapshot, local_context);
if (it == selected_tables.begin())
@ -245,7 +272,7 @@ Pipe StorageMerge::read(
remaining_streams -= current_streams;
current_streams = std::max(size_t(1), current_streams);
const auto & storage = std::get<0>(table);
const auto & storage = std::get<1>(table);
/// If sampling requested, then check that table supports it.
if (query_info.query->as<ASTSelectQuery>()->sampleSize() && !storage->supportsSampling())
@ -293,9 +320,18 @@ Pipe StorageMerge::read(
}
auto source_pipe = createSources(
storage_metadata_snapshot, query_info, processed_stage,
max_block_size, header, aliases, table, required_columns.empty() ? real_column_names : required_columns,
modified_context, current_streams, has_table_virtual_column);
storage_metadata_snapshot,
query_info,
processed_stage,
max_block_size,
header,
aliases,
table,
required_columns.empty() ? real_column_names : required_columns,
modified_context,
current_streams,
has_database_virtual_column,
has_table_virtual_column);
pipes.emplace_back(std::move(source_pipe));
}
@ -321,10 +357,11 @@ Pipe StorageMerge::createSources(
Names & real_column_names,
ContextMutablePtr modified_context,
size_t streams_num,
bool has_database_virtual_column,
bool has_table_virtual_column,
bool concat_streams)
{
const auto & [storage, struct_lock, table_name] = storage_with_lock;
const auto & [database_name, storage, struct_lock, table_name] = storage_with_lock;
SelectQueryInfo modified_query_info = query_info;
modified_query_info.query = query_info.query->clone();
@ -336,6 +373,7 @@ Pipe StorageMerge::createSources(
modified_query_info.syntax_analyzer_result = std::make_shared<TreeRewriterResult>(std::move(new_analyzer_res));
VirtualColumnUtils::rewriteEntityInAst(modified_query_info.query, "_table", table_name);
VirtualColumnUtils::rewriteEntityInAst(modified_query_info.query, "_database", database_name);
Pipe pipe;
@ -369,7 +407,7 @@ Pipe StorageMerge::createSources(
}
else if (processed_stage > storage_stage)
{
modified_select.replaceDatabaseAndTable(source_database, table_name);
modified_select.replaceDatabaseAndTable(database_name, table_name);
/// Maximum permissible parallelism is streams_num
modified_context->setSetting("max_threads", streams_num);
@ -394,6 +432,24 @@ Pipe StorageMerge::createSources(
// Using concat instead.
pipe.addTransform(std::make_shared<ConcatProcessor>(pipe.getHeader(), pipe.numOutputPorts()));
if (has_database_virtual_column)
{
ColumnWithTypeAndName column;
column.name = "_database";
column.type = std::make_shared<DataTypeString>();
column.column = column.type->createColumnConst(0, Field(database_name));
auto adding_column_dag = ActionsDAG::makeAddingColumnActions(std::move(column));
auto adding_column_actions = std::make_shared<ExpressionActions>(
std::move(adding_column_dag),
ExpressionActionsSettings::fromContext(modified_context, CompileExpressions::yes));
pipe.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<ExpressionTransform>(stream_header, adding_column_actions);
});
}
if (has_table_virtual_column)
{
ColumnWithTypeAndName column;
@ -425,41 +481,67 @@ Pipe StorageMerge::createSources(
}
StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(
ContextPtr query_context,
const ASTPtr & query /* = nullptr */,
bool filter_by_virtual_column /* = false */) const
ContextPtr query_context,
const ASTPtr & query /* = nullptr */,
bool filter_by_database_virtual_column /* = false */,
bool filter_by_table_virtual_column /* = false */) const
{
assert(!filter_by_virtual_column || query);
assert(!filter_by_database_virtual_column || !filter_by_table_virtual_column || query);
const Settings & settings = query_context->getSettingsRef();
StorageListWithLocks selected_tables;
DatabaseTablesIteratorPtr iterator = getDatabaseIterator(getContext());
DatabaseTablesIterators database_table_iterators = getDatabaseIterators(getContext());
MutableColumnPtr database_name_virtual_column;
MutableColumnPtr table_name_virtual_column;
if (filter_by_virtual_column)
table_name_virtual_column = ColumnString::create();
while (iterator->isValid())
if (filter_by_database_virtual_column)
{
StoragePtr storage = iterator->table();
if (!storage)
continue;
if (query && query->as<ASTSelectQuery>()->prewhere() && !storage->supportsPrewhere())
throw Exception("Storage " + storage->getName() + " doesn't support PREWHERE.", ErrorCodes::ILLEGAL_PREWHERE);
if (storage.get() != this)
{
auto table_lock = storage->lockForShare(query_context->getCurrentQueryId(), settings.lock_acquire_timeout);
selected_tables.emplace_back(storage, std::move(table_lock), iterator->name());
if (filter_by_virtual_column)
table_name_virtual_column->insert(iterator->name());
}
iterator->next();
database_name_virtual_column = ColumnString::create();
}
if (filter_by_virtual_column)
if (filter_by_table_virtual_column)
{
table_name_virtual_column = ColumnString::create();
}
for (const auto & iterator : database_table_iterators)
{
if (filter_by_database_virtual_column)
database_name_virtual_column->insert(iterator->databaseName());
while (iterator->isValid())
{
StoragePtr storage = iterator->table();
if (!storage)
continue;
if (query && query->as<ASTSelectQuery>()->prewhere() && !storage->supportsPrewhere())
throw Exception("Storage " + storage->getName() + " doesn't support PREWHERE.", ErrorCodes::ILLEGAL_PREWHERE);
if (storage.get() != this)
{
auto table_lock = storage->lockForShare(query_context->getCurrentQueryId(), settings.lock_acquire_timeout);
selected_tables.emplace_back(iterator->databaseName(), storage, std::move(table_lock), iterator->name());
if (filter_by_table_virtual_column)
table_name_virtual_column->insert(iterator->name());
}
iterator->next();
}
}
if (filter_by_database_virtual_column)
{
/// Filter names of selected tables if there is a condition on "_database" virtual column in WHERE clause
Block virtual_columns_block
= Block{ColumnWithTypeAndName(std::move(database_name_virtual_column), std::make_shared<DataTypeString>(), "_database")};
VirtualColumnUtils::filterBlockWithQuery(query, virtual_columns_block, query_context);
auto values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_database");
/// Remove unused databases from the list
selected_tables.remove_if([&](const auto & elem) { return values.find(std::get<0>(elem)) == values.end(); });
}
if (filter_by_table_virtual_column)
{
/// Filter names of selected tables if there is a condition on "_table" virtual column in WHERE clause
Block virtual_columns_block = Block{ColumnWithTypeAndName(std::move(table_name_virtual_column), std::make_shared<DataTypeString>(), "_table")};
@ -467,13 +549,30 @@ StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(
auto values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_table");
/// Remove unused tables from the list
selected_tables.remove_if([&] (const auto & elem) { return values.find(std::get<2>(elem)) == values.end(); });
selected_tables.remove_if([&](const auto & elem) { return values.find(std::get<3>(elem)) == values.end(); });
}
return selected_tables;
}
DatabaseTablesIteratorPtr StorageMerge::getDatabaseIterator(ContextPtr local_context) const
/// Returns an iterator over the tables of `database_name` that are sources of this Merge table:
/// the tables listed for that database in `source_databases_and_tables` when the explicit list
/// is set, otherwise the tables whose names match `source_table_regexp`.
DatabaseTablesIteratorPtr StorageMerge::getDatabaseIterator(const String & database_name, ContextPtr local_context) const
{
    auto database = DatabaseCatalog::instance().getDatabase(database_name);

    /// The database name is captured by value so the predicate does not depend on the lifetime
    /// of the caller's string in case the iterator keeps the predicate around.
    auto table_name_match = [this, database_name](const String & table_name_) -> bool
    {
        if (source_databases_and_tables)
        {
            /// The explicit list may have no entry for this database (e.g. it matches the
            /// database regexp but had no matching tables when the list was built).
            /// Treat that as "no source tables" instead of letting map::at() throw std::out_of_range.
            const auto db_it = source_databases_and_tables->find(database_name);
            return db_it != source_databases_and_tables->end() && db_it->second.count(table_name_) != 0;
        }

        return source_table_regexp->match(table_name_);
    };

    return database->getTablesIterator(local_context, table_name_match);
}
StorageMerge::DatabaseTablesIterators StorageMerge::getDatabaseIterators(ContextPtr local_context) const
{
try
{
@ -485,17 +584,25 @@ DatabaseTablesIteratorPtr StorageMerge::getDatabaseIterator(ContextPtr local_con
throw;
}
auto database = DatabaseCatalog::instance().getDatabase(source_database);
DatabaseTablesIterators database_table_iterators;
auto table_name_match = [this](const String & table_name_) -> bool
/// database_name argument is not a regexp
if (!database_is_regexp)
database_table_iterators.emplace_back(getDatabaseIterator(source_database_name_or_regexp, local_context));
/// database_name argument is a regexp
else
{
if (source_tables)
return source_tables->count(table_name_);
else
return source_table_regexp->match(table_name_);
};
auto databases = DatabaseCatalog::instance().getDatabases();
return database->getTablesIterator(local_context, table_name_match);
for (const auto & db : databases)
{
if (source_database_regexp->match(db.first))
database_table_iterators.emplace_back(getDatabaseIterator(db.first, local_context));
}
}
return database_table_iterators;
}
@ -632,19 +739,23 @@ void registerStorageMerge(StorageFactory & factory)
" - name of source database and regexp for table names.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
engine_args[0] = evaluateConstantExpressionForDatabaseName(engine_args[0], args.getLocalContext());
engine_args[1] = evaluateConstantExpressionAsLiteral(engine_args[1], args.getLocalContext());
auto [is_regexp, database_ast] = evaluateDatabaseNameForMergeEngine(engine_args[0], args.getLocalContext());
String source_database = engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();
if (!is_regexp)
engine_args[0] = database_ast;
String source_database_name_or_regexp = database_ast->as<ASTLiteral &>().value.safeGet<String>();
engine_args[1] = evaluateConstantExpressionAsLiteral(engine_args[1], args.getLocalContext());
String table_name_regexp = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
return StorageMerge::create(args.table_id, args.columns, args.comment, source_database, table_name_regexp, args.getContext());
return StorageMerge::create(
args.table_id, args.columns, args.comment, source_database_name_or_regexp, is_regexp, table_name_regexp, args.getContext());
});
}
NamesAndTypesList StorageMerge::getVirtuals() const
{
NamesAndTypesList virtuals{{"_table", std::make_shared<DataTypeString>()}};
NamesAndTypesList virtuals{{"_database", std::make_shared<DataTypeString>()}, {"_table", std::make_shared<DataTypeString>()}};
auto first_table = getFirstTable([](auto && table) { return table; });
if (first_table)

View File

@ -49,20 +49,32 @@ public:
const ASTPtr & left_in_operand, ContextPtr query_context, const StorageMetadataPtr & metadata_snapshot) const override;
private:
String source_database;
std::optional<std::unordered_set<String>> source_tables;
std::optional<OptimizedRegularExpression> source_table_regexp;
using DbToTableSetMap = std::map<String, std::set<String>>;
using StorageWithLockAndName = std::tuple<StoragePtr, TableLockHolder, String>;
std::optional<OptimizedRegularExpression> source_database_regexp;
std::optional<OptimizedRegularExpression> source_table_regexp;
std::optional<DbToTableSetMap> source_databases_and_tables;
String source_database_name_or_regexp;
bool database_is_regexp = false;
/// (Database, Table, Lock, TableName)
using StorageWithLockAndName = std::tuple<String, StoragePtr, TableLockHolder, String>;
using StorageListWithLocks = std::list<StorageWithLockAndName>;
using DatabaseTablesIterators = std::vector<DatabaseTablesIteratorPtr>;
StorageMerge::StorageListWithLocks getSelectedTables(
ContextPtr query_context, const ASTPtr & query = nullptr, bool filter_by_virtual_column = false) const;
ContextPtr query_context,
const ASTPtr & query = nullptr,
bool filter_by_database_virtual_column = false,
bool filter_by_table_virtual_column = false) const;
template <typename F>
StoragePtr getFirstTable(F && predicate) const;
DatabaseTablesIteratorPtr getDatabaseIterator(ContextPtr context) const;
DatabaseTablesIteratorPtr getDatabaseIterator(const String & database_name, ContextPtr context) const;
DatabaseTablesIterators getDatabaseIterators(ContextPtr context) const;
NamesAndTypesList getVirtuals() const override;
ColumnSizeByName getColumnSizes() const override;
@ -72,15 +84,17 @@ protected:
const StorageID & table_id_,
const ColumnsDescription & columns_,
const String & comment,
const String & source_database_,
const Strings & source_tables_,
const String & source_database_name_or_regexp_,
bool database_is_regexp_,
const DbToTableSetMap & source_databases_and_tables_,
ContextPtr context_);
StorageMerge(
const StorageID & table_id_,
const ColumnsDescription & columns_,
const String & comment,
const String & source_database_,
const String & source_database_name_or_regexp_,
bool database_is_regexp_,
const String & source_table_regexp_,
ContextPtr context_);
@ -104,6 +118,7 @@ protected:
Names & real_column_names,
ContextMutablePtr modified_context,
size_t streams_num,
bool has_database_virtual_column,
bool has_table_virtual_column,
bool concat_streams = false);

View File

@ -18,17 +18,20 @@ namespace DB
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int UNKNOWN_TABLE;
extern const int BAD_ARGUMENTS;
}
namespace
{
[[noreturn]] void throwNoTablesMatchRegexp(const String & source_database, const String & source_table_regexp)
[[noreturn]] void throwNoTablesMatchRegexp(const String & source_database_regexp, const String & source_table_regexp)
{
throw Exception(
"Error while executing table function merge. In database " + source_database
+ " no one matches regular expression: " + source_table_regexp,
ErrorCodes::UNKNOWN_TABLE);
ErrorCodes::BAD_ARGUMENTS,
"Error while executing table function merge. Either there is no database, which matches regular expression `{}`, or there are "
"no tables in database matches `{}`, which fit tables expression: {}",
source_database_regexp,
source_database_regexp,
source_table_regexp);
}
}
@ -49,58 +52,72 @@ void TableFunctionMerge::parseArguments(const ASTPtr & ast_function, ContextPtr
" - name of source database and regexp for table names.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
args[0] = evaluateConstantExpressionForDatabaseName(args[0], context);
args[1] = evaluateConstantExpressionAsLiteral(args[1], context);
auto [is_regexp, database_ast] = evaluateDatabaseNameForMergeEngine(args[0], context);
source_database = args[0]->as<ASTLiteral &>().value.safeGet<String>();
database_is_regexp = is_regexp;
if (!is_regexp)
args[0] = database_ast;
source_database_name_or_regexp = database_ast->as<ASTLiteral &>().value.safeGet<String>();
args[1] = evaluateConstantExpressionAsLiteral(args[1], context);
source_table_regexp = args[1]->as<ASTLiteral &>().value.safeGet<String>();
}
const Strings & TableFunctionMerge::getSourceTables(ContextPtr context) const
const TableFunctionMerge::DbToTableSetMap & TableFunctionMerge::getSourceDatabasesAndTables(ContextPtr context) const
{
if (source_tables)
return *source_tables;
if (source_databases_and_tables)
return *source_databases_and_tables;
auto database = DatabaseCatalog::instance().getDatabase(source_database);
source_databases_and_tables.emplace();
OptimizedRegularExpression re(source_table_regexp);
auto table_name_match = [&](const String & table_name_) { return re.match(table_name_); };
auto access = context->getAccess();
bool granted_show_on_all_tables = access->isGranted(AccessType::SHOW_TABLES, source_database);
bool granted_select_on_all_tables = access->isGranted(AccessType::SELECT, source_database);
source_tables.emplace();
for (auto it = database->getTablesIterator(context, table_name_match); it->isValid(); it->next())
/// database_name is not a regexp
if (!database_is_regexp)
{
if (!it->table())
continue;
bool granted_show = granted_show_on_all_tables || access->isGranted(AccessType::SHOW_TABLES, source_database, it->name());
if (!granted_show)
continue;
if (!granted_select_on_all_tables)
access->checkAccess(AccessType::SELECT, source_database, it->name());
source_tables->emplace_back(it->name());
auto source_tables = getMatchedTablesWithAccess(source_database_name_or_regexp, source_table_regexp, context);
if (source_tables.empty())
throwNoTablesMatchRegexp(source_database_name_or_regexp, source_table_regexp);
(*source_databases_and_tables)[source_database_name_or_regexp] = source_tables;
}
if (source_tables->empty())
throwNoTablesMatchRegexp(source_database, source_table_regexp);
/// database_name is a regexp
else
{
OptimizedRegularExpression database_re(source_database_name_or_regexp);
auto databases = DatabaseCatalog::instance().getDatabases();
return *source_tables;
for (const auto & db : databases)
{
if (database_re.match(db.first))
{
auto source_tables = getMatchedTablesWithAccess(db.first, source_table_regexp, context);
if (!source_tables.empty())
(*source_databases_and_tables)[db.first] = source_tables;
}
}
if ((*source_databases_and_tables).empty())
throwNoTablesMatchRegexp(source_database_name_or_regexp, source_table_regexp);
}
return *source_databases_and_tables;
}
ColumnsDescription TableFunctionMerge::getActualTableStructure(ContextPtr context) const
{
for (const auto & table_name : getSourceTables(context))
for (const auto & db_with_tables : getSourceDatabasesAndTables(context))
{
auto storage = DatabaseCatalog::instance().tryGetTable(StorageID{source_database, table_name}, context);
if (storage)
return ColumnsDescription{storage->getInMemoryMetadataPtr()->getColumns().getAllPhysical()};
for (const auto & table : db_with_tables.second)
{
auto storage = DatabaseCatalog::instance().tryGetTable(StorageID{db_with_tables.first, table}, context);
if (storage)
return ColumnsDescription{storage->getInMemoryMetadataPtr()->getColumns().getAllPhysical()};
}
}
throwNoTablesMatchRegexp(source_database, source_table_regexp);
throwNoTablesMatchRegexp(source_database_name_or_regexp, source_table_regexp);
}
@ -110,14 +127,44 @@ StoragePtr TableFunctionMerge::executeImpl(const ASTPtr & /*ast_function*/, Cont
StorageID(getDatabaseName(), table_name),
getActualTableStructure(context),
String{},
source_database,
getSourceTables(context),
source_database_name_or_regexp,
database_is_regexp,
getSourceDatabasesAndTables(context),
context);
res->startup();
return res;
}
/// Collects the names of the tables in `database_name` whose names match `table_regexp`.
/// Tables for which SHOW TABLES is not granted are skipped. For the remaining tables,
/// unless SELECT is granted on the whole database, checkAccess(SELECT) is invoked per table.
TableFunctionMerge::TableSet
TableFunctionMerge::getMatchedTablesWithAccess(const String & database_name, const String & table_regexp, const ContextPtr & context)
{
    OptimizedRegularExpression name_pattern(table_regexp);
    auto matches_pattern = [&](const String & candidate) { return name_pattern.match(candidate); };

    auto access = context->getAccess();
    auto database = DatabaseCatalog::instance().getDatabase(database_name);

    /// Database-wide grants make the per-table checks below unnecessary.
    const bool show_granted_for_database = access->isGranted(AccessType::SHOW_TABLES, database_name);
    const bool select_granted_for_database = access->isGranted(AccessType::SELECT, database_name);

    TableSet matched;
    auto tables_it = database->getTablesIterator(context, matches_pattern);
    for (; tables_it->isValid(); tables_it->next())
    {
        if (!tables_it->table())
            continue;

        /// Tables the user is not allowed to see are left out of the result.
        if (!show_granted_for_database && !access->isGranted(AccessType::SHOW_TABLES, database_name, tables_it->name()))
            continue;

        if (!select_granted_for_database)
            access->checkAccess(AccessType::SELECT, database_name, tables_it->name());

        matched.insert(tables_it->name());
    }

    return matched;
}
void registerTableFunctionMerge(TableFunctionFactory & factory)
{

View File

@ -15,17 +15,22 @@ class TableFunctionMerge : public ITableFunction
public:
static constexpr auto name = "merge";
std::string getName() const override { return name; }
private:
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const override;
const char * getStorageTypeName() const override { return "Merge"; }
const Strings & getSourceTables(ContextPtr context) const;
using TableSet = std::set<String>;
using DbToTableSetMap = std::map<String, TableSet>;
const DbToTableSetMap & getSourceDatabasesAndTables(ContextPtr context) const;
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
static TableSet getMatchedTablesWithAccess(const String & database_name, const String & table_regexp, const ContextPtr & context);
String source_database;
String source_database_name_or_regexp;
String source_table_regexp;
mutable std::optional<Strings> source_tables;
bool database_is_regexp = false;
mutable std::optional<DbToTableSetMap> source_databases_and_tables;
};

View File

@ -38,7 +38,7 @@ def test_merge():
assert "it's necessary to have grant CREATE TEMPORARY TABLE ON *.*" in instance.query_and_get_error(select_query, user = 'A')
instance.query("GRANT CREATE TEMPORARY TABLE ON *.* TO A")
assert "no one matches regular expression" in instance.query_and_get_error(select_query, user = 'A')
assert "no tables in database matches" in instance.query_and_get_error(select_query, user = 'A')
instance.query("GRANT SELECT ON default.table1 TO A")
assert instance.query(select_query, user = 'A') == "1\n"

View File

@ -0,0 +1,195 @@
CREATE TABLE t_merge as 01902_db.t ENGINE=Merge(REGEXP(^01902_db), ^t)
SELECT _database, _table, n FROM 01902_db.t_merge ORDER BY _database, _table, n
01902_db t 0
01902_db t 1
01902_db t 2
01902_db t 3
01902_db t 4
01902_db t 5
01902_db t 6
01902_db t 7
01902_db t 8
01902_db t 9
01902_db1 t1 0
01902_db1 t1 1
01902_db1 t1 2
01902_db1 t1 3
01902_db1 t1 4
01902_db1 t1 5
01902_db1 t1 6
01902_db1 t1 7
01902_db1 t1 8
01902_db1 t1 9
01902_db2 t2 0
01902_db2 t2 1
01902_db2 t2 2
01902_db2 t2 3
01902_db2 t2 4
01902_db2 t2 5
01902_db2 t2 6
01902_db2 t2 7
01902_db2 t2 8
01902_db2 t2 9
01902_db3 t3 0
01902_db3 t3 1
01902_db3 t3 2
01902_db3 t3 3
01902_db3 t3 4
01902_db3 t3 5
01902_db3 t3 6
01902_db3 t3 7
01902_db3 t3 8
01902_db3 t3 9
SHOW CREATE TABLE 01902_db.t_merge
CREATE TABLE `01902_db`.t_merge\n(\n `n` Int8\n)\nENGINE = Merge(REGEXP(\'^01902_db\'), \'^t\')
SELECT _database, _table, n FROM merge(REGEXP(^01902_db), ^t) ORDER BY _database, _table, n
01902_db t 0
01902_db t 1
01902_db t 2
01902_db t 3
01902_db t 4
01902_db t 5
01902_db t 6
01902_db t 7
01902_db t 8
01902_db t 9
01902_db t_merge 0
01902_db t_merge 0
01902_db t_merge 0
01902_db t_merge 0
01902_db t_merge 1
01902_db t_merge 1
01902_db t_merge 1
01902_db t_merge 1
01902_db t_merge 2
01902_db t_merge 2
01902_db t_merge 2
01902_db t_merge 2
01902_db t_merge 3
01902_db t_merge 3
01902_db t_merge 3
01902_db t_merge 3
01902_db t_merge 4
01902_db t_merge 4
01902_db t_merge 4
01902_db t_merge 4
01902_db t_merge 5
01902_db t_merge 5
01902_db t_merge 5
01902_db t_merge 5
01902_db t_merge 6
01902_db t_merge 6
01902_db t_merge 6
01902_db t_merge 6
01902_db t_merge 7
01902_db t_merge 7
01902_db t_merge 7
01902_db t_merge 7
01902_db t_merge 8
01902_db t_merge 8
01902_db t_merge 8
01902_db t_merge 8
01902_db t_merge 9
01902_db t_merge 9
01902_db t_merge 9
01902_db t_merge 9
01902_db1 t1 0
01902_db1 t1 1
01902_db1 t1 2
01902_db1 t1 3
01902_db1 t1 4
01902_db1 t1 5
01902_db1 t1 6
01902_db1 t1 7
01902_db1 t1 8
01902_db1 t1 9
01902_db2 t2 0
01902_db2 t2 1
01902_db2 t2 2
01902_db2 t2 3
01902_db2 t2 4
01902_db2 t2 5
01902_db2 t2 6
01902_db2 t2 7
01902_db2 t2 8
01902_db2 t2 9
01902_db3 t3 0
01902_db3 t3 1
01902_db3 t3 2
01902_db3 t3 3
01902_db3 t3 4
01902_db3 t3 5
01902_db3 t3 6
01902_db3 t3 7
01902_db3 t3 8
01902_db3 t3 9
SELECT _database, _table, n FROM 01902_db.t_merge WHERE _database = 01902_db1 ORDER BY _database, _table, n
01902_db1 t1 0
01902_db1 t1 1
01902_db1 t1 2
01902_db1 t1 3
01902_db1 t1 4
01902_db1 t1 5
01902_db1 t1 6
01902_db1 t1 7
01902_db1 t1 8
01902_db1 t1 9
SELECT _database, _table, n FROM 01902_db.t_merge WHERE _table = t1 ORDER BY _database, _table, n
01902_db1 t1 0
01902_db1 t1 1
01902_db1 t1 2
01902_db1 t1 3
01902_db1 t1 4
01902_db1 t1 5
01902_db1 t1 6
01902_db1 t1 7
01902_db1 t1 8
01902_db1 t1 9
CREATE TABLE t_merge1 as 01902_db.t ENGINE=Merge(01902_db, ^t$)
SELECT _database, _table, n FROM 01902_db.t_merge1 ORDER BY _database, _table, n
01902_db t 0
01902_db t 1
01902_db t 2
01902_db t 3
01902_db t 4
01902_db t 5
01902_db t 6
01902_db t 7
01902_db t 8
01902_db t 9
SELECT _database, _table, n FROM merge(01902_db, ^t$) ORDER BY _database, _table, n
01902_db t 0
01902_db t 1
01902_db t 2
01902_db t 3
01902_db t 4
01902_db t 5
01902_db t 6
01902_db t 7
01902_db t 8
01902_db t 9
CREATE TABLE t_merge_1 as 01902_db.t ENGINE=Merge(currentDatabase(), ^t)
SELECT _database, _table, n FROM 01902_db.t_merge_1 ORDER BY _database, _table, n
01902_db1 t1 0
01902_db1 t1 1
01902_db1 t1 2
01902_db1 t1 3
01902_db1 t1 4
01902_db1 t1 5
01902_db1 t1 6
01902_db1 t1 7
01902_db1 t1 8
01902_db1 t1 9
SHOW CREATE TABLE 01902_db.t_merge_1
CREATE TABLE `01902_db`.t_merge_1\n(\n `n` Int8\n)\nENGINE = Merge(\'01902_db1\', \'^t\')
SELECT _database, _table, n FROM merge(currentDatabase(), ^t) ORDER BY _database, _table, n
01902_db1 t1 0
01902_db1 t1 1
01902_db1 t1 2
01902_db1 t1 3
01902_db1 t1 4
01902_db1 t1 5
01902_db1 t1 6
01902_db1 t1 7
01902_db1 t1 8
01902_db1 t1 9

View File

@ -0,0 +1,66 @@
DROP DATABASE IF EXISTS 01902_db;
DROP DATABASE IF EXISTS 01902_db1;
DROP DATABASE IF EXISTS 01902_db2;
DROP DATABASE IF EXISTS 01902_db3;
CREATE DATABASE 01902_db;
CREATE DATABASE 01902_db1;
CREATE DATABASE 01902_db2;
CREATE DATABASE 01902_db3;
CREATE TABLE 01902_db.t (n Int8) ENGINE=MergeTree ORDER BY n;
CREATE TABLE 01902_db1.t1 (n Int8) ENGINE=MergeTree ORDER BY n;
CREATE TABLE 01902_db2.t2 (n Int8) ENGINE=MergeTree ORDER BY n;
CREATE TABLE 01902_db3.t3 (n Int8) ENGINE=MergeTree ORDER BY n;
INSERT INTO 01902_db.t SELECT * FROM numbers(10);
INSERT INTO 01902_db1.t1 SELECT * FROM numbers(10);
INSERT INTO 01902_db2.t2 SELECT * FROM numbers(10);
INSERT INTO 01902_db3.t3 SELECT * FROM numbers(10);
SELECT 'CREATE TABLE t_merge as 01902_db.t ENGINE=Merge(REGEXP(^01902_db), ^t)';
CREATE TABLE 01902_db.t_merge as 01902_db.t ENGINE=Merge(REGEXP('^01902_db'), '^t');
SELECT 'SELECT _database, _table, n FROM 01902_db.t_merge ORDER BY _database, _table, n';
SELECT _database, _table, n FROM 01902_db.t_merge ORDER BY _database, _table, n;
SELECT 'SHOW CREATE TABLE 01902_db.t_merge';
SHOW CREATE TABLE 01902_db.t_merge;
SELECT 'SELECT _database, _table, n FROM merge(REGEXP(^01902_db), ^t) ORDER BY _database, _table, n';
SELECT _database, _table, n FROM merge(REGEXP('^01902_db'), '^t') ORDER BY _database, _table, n;
SELECT 'SELECT _database, _table, n FROM 01902_db.t_merge WHERE _database = 01902_db1 ORDER BY _database, _table, n';
SELECT _database, _table, n FROM 01902_db.t_merge WHERE _database = '01902_db1' ORDER BY _database, _table, n;
SELECT 'SELECT _database, _table, n FROM 01902_db.t_merge WHERE _table = t1 ORDER BY _database, _table, n';
SELECT _database, _table, n FROM 01902_db.t_merge WHERE _table = 't1' ORDER BY _database, _table, n;
-- not regexp
SELECT 'CREATE TABLE t_merge1 as 01902_db.t ENGINE=Merge(01902_db, ^t$)';
CREATE TABLE 01902_db.t_merge1 as 01902_db.t ENGINE=Merge('01902_db', '^t$');
SELECT 'SELECT _database, _table, n FROM 01902_db.t_merge1 ORDER BY _database, _table, n';
SELECT _database, _table, n FROM 01902_db.t_merge1 ORDER BY _database, _table, n;
SELECT 'SELECT _database, _table, n FROM merge(01902_db, ^t$) ORDER BY _database, _table, n';
SELECT _database, _table, n FROM merge('01902_db', '^t$') ORDER BY _database, _table, n;
USE 01902_db1;
SELECT 'CREATE TABLE t_merge_1 as 01902_db.t ENGINE=Merge(currentDatabase(), ^t)';
CREATE TABLE 01902_db.t_merge_1 as 01902_db.t ENGINE=Merge(currentDatabase(), '^t');
SELECT 'SELECT _database, _table, n FROM 01902_db.t_merge_1 ORDER BY _database, _table, n';
SELECT _database, _table, n FROM 01902_db.t_merge_1 ORDER BY _database, _table, n;
SELECT 'SHOW CREATE TABLE 01902_db.t_merge_1';
SHOW CREATE TABLE 01902_db.t_merge_1;
SELECT 'SELECT _database, _table, n FROM merge(currentDatabase(), ^t) ORDER BY _database, _table, n';
SELECT _database, _table, n FROM merge(currentDatabase(), '^t') ORDER BY _database, _table, n;
DROP DATABASE 01902_db;
DROP DATABASE 01902_db1;
DROP DATABASE 01902_db2;
DROP DATABASE 01902_db3;

View File

@ -511,6 +511,7 @@
"01914_exchange_dictionaries",
"01915_create_or_replace_dictionary",
"01925_test_storage_merge_aliases",
"01933_client_replxx_convert_history" /// Uses non unique history file
"01933_client_replxx_convert_history", /// Uses non unique history file
"01902_table_function_merge_db_repr"
]
}