This commit is contained in:
feng lv 2021-06-04 14:48:48 +00:00
parent c0ef021c6a
commit 32c45d14ab
4 changed files with 24 additions and 27 deletions

View File

@ -103,13 +103,9 @@ StorageMerge::StorageMerge(
const StorageID & table_id_,
const ColumnsDescription & columns_,
const String & comment,
const String & source_database_,
const Strings & source_tables_,
const std::unordered_map<String, std::unordered_set<String>> & source_databases_and_tables_,
ContextPtr context_)
: IStorage(table_id_)
, WithContext(context_->getGlobalContext())
, source_database(source_database_)
, source_tables(std::in_place, source_tables_.begin(), source_tables_.end())
: IStorage(table_id_), WithContext(context_->getGlobalContext()), source_databases_and_tables(source_databases_and_tables_)
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
@ -121,12 +117,12 @@ StorageMerge::StorageMerge(
const StorageID & table_id_,
const ColumnsDescription & columns_,
const String & comment,
const String & source_database_,
const String & source_database_regexp_,
const String & source_table_regexp_,
ContextPtr context_)
: IStorage(table_id_)
, WithContext(context_->getGlobalContext())
, source_database(source_database_)
, source_database_regexp(source_database_regexp_)
, source_table_regexp(source_table_regexp_)
{
StorageInMemoryMetadata storage_metadata;
@ -624,10 +620,11 @@ void registerStorageMerge(StorageFactory & factory)
engine_args[0] = evaluateConstantExpressionForDatabaseName(engine_args[0], args.getLocalContext());
engine_args[1] = evaluateConstantExpressionAsLiteral(engine_args[1], args.getLocalContext());
String source_database = engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();
String source_database_regexp = engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();
String table_name_regexp = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
return StorageMerge::create(args.table_id, args.columns, args.comment, source_database, table_name_regexp, args.getContext());
return StorageMerge::create(
args.table_id, args.columns, args.comment, source_database_regexp, table_name_regexp, args.getContext());
});
}

View File

@ -49,9 +49,9 @@ public:
const ASTPtr & left_in_operand, ContextPtr query_context, const StorageMetadataPtr & metadata_snapshot) const override;
private:
String source_database;
std::optional<std::unordered_set<String>> source_tables;
std::optional<OptimizedRegularExpression> source_database_regexp;
std::optional<OptimizedRegularExpression> source_table_regexp;
std::optional<std::unordered_map<String, std::unordered_set<String>>> source_databases_and_tables;
using StorageWithLockAndName = std::tuple<StoragePtr, TableLockHolder, String>;
using StorageListWithLocks = std::list<StorageWithLockAndName>;
@ -72,15 +72,14 @@ protected:
const StorageID & table_id_,
const ColumnsDescription & columns_,
const String & comment,
const String & source_database_,
const Strings & source_tables_,
const std::unordered_map<String, std::unordered_set<String>> & source_databases_and_tables_,
ContextPtr context_);
StorageMerge(
const StorageID & table_id_,
const ColumnsDescription & columns_,
const String & comment,
const String & source_database_,
const String & source_database_regexp_,
const String & source_table_regexp_,
ContextPtr context_);

View File

@ -52,20 +52,22 @@ void TableFunctionMerge::parseArguments(const ASTPtr & ast_function, ContextPtr
args[0] = evaluateConstantExpressionForDatabaseName(args[0], context);
args[1] = evaluateConstantExpressionAsLiteral(args[1], context);
source_database = args[0]->as<ASTLiteral &>().value.safeGet<String>();
source_database_regexp = args[0]->as<ASTLiteral &>().value.safeGet<String>();
source_table_regexp = args[1]->as<ASTLiteral &>().value.safeGet<String>();
}
const Strings & TableFunctionMerge::getSourceTables(ContextPtr context) const
const std::unordered_map<String, std::unordered_set<String>> & TableFunctionMerge::getSourceDatabasesAndTables(ContextPtr context) const
{
if (source_tables)
return *source_tables;
if (source_databases_and_tables)
return *source_databases_and_tables;
auto database = DatabaseCatalog::instance().getDatabase(source_database);
// auto database = DatabaseCatalog::instance().getDatabase(source_database);
OptimizedRegularExpression re(source_table_regexp);
auto table_name_match = [&](const String & table_name_) { return re.match(table_name_); };
OptimizedRegularExpression database_re(source_database_regexp);
OptimizedRegularExpression table_re(source_table_regexp);
auto database_name_match = [&](const String & database_name_) { return database_re.match(database_name_); };
auto table_name_match = [&](const String & table_name_) { return table_re.match(table_name_); };
auto access = context->getAccess();
bool granted_show_on_all_tables = access->isGranted(AccessType::SHOW_TABLES, source_database);
@ -110,8 +112,7 @@ StoragePtr TableFunctionMerge::executeImpl(const ASTPtr & /*ast_function*/, Cont
StorageID(getDatabaseName(), table_name),
getActualTableStructure(context),
String{},
source_database,
getSourceTables(context),
getSourceDatabasesAndTables(context),
context);
res->startup();

View File

@ -19,13 +19,13 @@ private:
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const override;
const char * getStorageTypeName() const override { return "Merge"; }
const Strings & getSourceTables(ContextPtr context) const;
const std::unordered_map<String, std::unordered_set<String>> & getSourceDatabasesAndTables(ContextPtr context) const;
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
String source_database;
String source_database_regexp;
String source_table_regexp;
mutable std::optional<Strings> source_tables;
mutable std::optional<std::unordered_map<String, std::unordered_set<String>>> source_databases_and_tables;
};