Merge pull request #21910 from kitaisreal/table-function-dictionary

Added table function dictionary
This commit is contained in:
Maksim Kita 2021-03-22 16:08:56 +03:00 committed by GitHub
commit 8be263062d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 255 additions and 106 deletions

View File

@ -75,27 +75,11 @@ namespace ErrorCodes
class FunctionDictHelper
{
public:
explicit FunctionDictHelper(const Context & context_) : context(context_), external_loader(context.getExternalDictionariesLoader()) {}
explicit FunctionDictHelper(const Context & context_) : context(context_) {}
std::shared_ptr<const IDictionaryBase> getDictionary(const String & dictionary_name)
{
String resolved_name = DatabaseCatalog::instance().resolveDictionaryName(dictionary_name);
bool can_load_dictionary = external_loader.hasDictionary(resolved_name);
if (!can_load_dictionary)
{
/// If dictionary not found. And database was not implicitly specified
/// we can qualify dictionary name with current database name.
/// It will help if dictionary is created with DDL and is in current database.
if (dictionary_name.find('.') == std::string::npos)
{
String dictionary_name_with_database = context.getCurrentDatabase() + '.' + dictionary_name;
resolved_name = DatabaseCatalog::instance().resolveDictionaryName(dictionary_name_with_database);
}
}
auto dict = external_loader.getDictionary(resolved_name);
auto dict = context.getExternalDictionariesLoader().getDictionary(dictionary_name, context);
if (!access_checked)
{
@ -134,31 +118,11 @@ public:
DictionaryStructure getDictionaryStructure(const String & dictionary_name) const
{
String resolved_name = DatabaseCatalog::instance().resolveDictionaryName(dictionary_name);
auto load_result = external_loader.getLoadResult(resolved_name);
if (!load_result.config)
{
/// If dictionary not found. And database was not implicitly specified
/// we can qualify dictionary name with current database name.
/// It will help if dictionary is created with DDL and is in current database.
if (dictionary_name.find('.') == std::string::npos)
{
String dictionary_name_with_database = context.getCurrentDatabase() + '.' + dictionary_name;
resolved_name = DatabaseCatalog::instance().resolveDictionaryName(dictionary_name_with_database);
load_result = external_loader.getLoadResult(resolved_name);
}
}
if (!load_result.config)
throw Exception("Dictionary " + backQuote(dictionary_name) + " not found", ErrorCodes::BAD_ARGUMENTS);
return ExternalDictionariesLoader::getDictionaryStructure(*load_result.config);
return context.getExternalDictionariesLoader().getDictionaryStructure(dictionary_name, context);
}
const Context & context;
private:
const ExternalDictionariesLoader & external_loader;
const Context & context;
/// Access cannot be not granted, since in this case checkAccess() will throw and access_checked will not be updated.
std::atomic<bool> access_checked = false;

View File

@ -910,31 +910,6 @@ String DatabaseCatalog::getPathForUUID(const UUID & uuid)
return toString(uuid).substr(0, uuid_prefix_len) + '/' + toString(uuid) + '/';
}
String DatabaseCatalog::resolveDictionaryName(const String & name) const
{
/// If it's dictionary from Atomic database, then we need to convert qualified name to UUID.
/// Try to split name and get id from associated StorageDictionary.
/// If something went wrong, return name as is.
/// TODO support dot in name for dictionaries in Atomic databases
auto pos = name.find('.');
if (pos == std::string::npos || name.find('.', pos + 1) != std::string::npos)
return name;
String maybe_database_name = name.substr(0, pos);
String maybe_table_name = name.substr(pos + 1);
auto db_and_table = tryGetDatabaseAndTable({maybe_database_name, maybe_table_name}, global_context);
if (!db_and_table.first)
return name;
assert(db_and_table.second);
if (db_and_table.first->getUUID() == UUIDHelpers::Nil)
return name;
if (db_and_table.second->getName() != "Dictionary")
return name;
return toString(db_and_table.second->getStorageID().uuid);
}
void DatabaseCatalog::waitTableFinallyDropped(const UUID & uuid)
{
if (uuid == UUIDHelpers::Nil)

View File

@ -192,9 +192,6 @@ public:
String getPathForDroppedMetadata(const StorageID & table_id) const;
void enqueueDroppedTableCleanup(StorageID table_id, StoragePtr table, String dropped_metadata_path, bool ignore_delay = false);
/// Try convert qualified dictionary name to persistent UUID
String resolveDictionaryName(const String & name) const;
void waitTableFinallyDropped(const UUID & uuid);
private:

View File

@ -758,8 +758,8 @@ static bool allowDictJoin(StoragePtr joined_storage, const Context & context, St
if (!dict)
return false;
dict_name = dict->resolvedDictionaryName();
auto dictionary = context.getExternalDictionariesLoader().getDictionary(dict_name);
dict_name = dict->dictionaryName();
auto dictionary = context.getExternalDictionariesLoader().getDictionary(dict_name, context);
if (!dictionary)
return false;

View File

@ -1,6 +1,10 @@
#include <Interpreters/ExternalDictionariesLoader.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/Context.h>
#include <Dictionaries/DictionaryFactory.h>
#include <Dictionaries/DictionaryStructure.h>
#include <Databases/IDatabase.h>
#include <Storages/IStorage.h>
#if !defined(ARCADIA_BUILD)
# include "config_core.h"
@ -13,10 +17,15 @@
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
/// Must not acquire Context lock in constructor to avoid possibility of deadlocks.
ExternalDictionariesLoader::ExternalDictionariesLoader(Context & context_)
ExternalDictionariesLoader::ExternalDictionariesLoader(Context & global_context_)
: ExternalLoader("external dictionary", &Poco::Logger::get("ExternalDictionariesLoader"))
, context(context_)
, global_context(global_context_)
{
setConfigSettings({"dictionary", "name", "database", "uuid"});
enableAsyncLoading(true);
@ -31,9 +40,88 @@ ExternalLoader::LoadablePtr ExternalDictionariesLoader::create(
/// For dictionaries from databases (created with DDL queries) we have to perform
/// additional checks, so we identify them here.
bool dictionary_from_database = !repository_name.empty();
return DictionaryFactory::instance().create(name, config, key_in_config, context, dictionary_from_database);
return DictionaryFactory::instance().create(name, config, key_in_config, global_context, dictionary_from_database);
}
ExternalDictionariesLoader::DictPtr ExternalDictionariesLoader::getDictionary(const std::string & dictionary_name, const Context & context) const
{
std::string resolved_dictionary_name = resolveDictionaryName(dictionary_name, context.getCurrentDatabase());
return std::static_pointer_cast<const IDictionaryBase>(load(resolved_dictionary_name));
}
ExternalDictionariesLoader::DictPtr ExternalDictionariesLoader::tryGetDictionary(const std::string & dictionary_name, const Context & context) const
{
std::string resolved_dictionary_name = resolveDictionaryName(dictionary_name, context.getCurrentDatabase());
return std::static_pointer_cast<const IDictionaryBase>(tryLoad(resolved_dictionary_name));
}
void ExternalDictionariesLoader::reloadDictionary(const std::string & dictionary_name, const Context & context) const
{
std::string resolved_dictionary_name = resolveDictionaryName(dictionary_name, context.getCurrentDatabase());
loadOrReload(resolved_dictionary_name);
}
DictionaryStructure ExternalDictionariesLoader::getDictionaryStructure(const std::string & dictionary_name, const Context & query_context) const
{
std::string resolved_name = resolveDictionaryName(dictionary_name, query_context.getCurrentDatabase());
auto load_result = getLoadResult(resolved_name);
if (!load_result.config)
throw Exception("Dictionary " + backQuote(dictionary_name) + " config not found", ErrorCodes::BAD_ARGUMENTS);
return ExternalDictionariesLoader::getDictionaryStructure(*load_result.config);
}
std::string ExternalDictionariesLoader::resolveDictionaryName(const std::string & dictionary_name, const std::string & current_database_name) const
{
std::string resolved_name = resolveDictionaryNameFromDatabaseCatalog(dictionary_name);
bool has_dictionary = has(resolved_name);
if (!has_dictionary)
{
/// If dictionary not found. And database was not implicitly specified
/// we can qualify dictionary name with current database name.
/// It will help if dictionary is created with DDL and is in current database.
if (dictionary_name.find('.') == std::string::npos)
{
String dictionary_name_with_database = current_database_name + '.' + dictionary_name;
resolved_name = resolveDictionaryNameFromDatabaseCatalog(dictionary_name_with_database);
has_dictionary = has(resolved_name);
}
}
if (!has_dictionary)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Dictionary ({}) not found", backQuote(dictionary_name));
return resolved_name;
}
std::string ExternalDictionariesLoader::resolveDictionaryNameFromDatabaseCatalog(const std::string & name) const
{
/// If it's dictionary from Atomic database, then we need to convert qualified name to UUID.
/// Try to split name and get id from associated StorageDictionary.
/// If something went wrong, return name as is.
auto pos = name.find('.');
if (pos == std::string::npos || name.find('.', pos + 1) != std::string::npos)
return name;
std::string maybe_database_name = name.substr(0, pos);
std::string maybe_table_name = name.substr(pos + 1);
auto [db, table] = DatabaseCatalog::instance().tryGetDatabaseAndTable({maybe_database_name, maybe_table_name}, global_context);
if (!db)
return name;
assert(table);
if (db->getUUID() == UUIDHelpers::Nil)
return name;
if (table->getName() != "Dictionary")
return name;
return toString(table->getStorageID().uuid);
}
DictionaryStructure
ExternalDictionariesLoader::getDictionaryStructure(const Poco::Util::AbstractConfiguration & config, const std::string & key_in_config)

View File

@ -1,9 +1,11 @@
#pragma once
#include <Dictionaries/IDictionary.h>
#include <Interpreters/ExternalLoader.h>
#include <memory>
#include <Common/quoteString.h>
#include <Interpreters/ExternalLoader.h>
#include <Dictionaries/IDictionary.h>
namespace DB
{
class Context;
@ -16,24 +18,18 @@ public:
using DictPtr = std::shared_ptr<const IDictionaryBase>;
/// Dictionaries will be loaded immediately and then will be updated in separate thread, each 'reload_period' seconds.
explicit ExternalDictionariesLoader(Context & context_);
explicit ExternalDictionariesLoader(Context & global_context_);
DictPtr getDictionary(const std::string & name) const
{
return std::static_pointer_cast<const IDictionaryBase>(load(name));
}
DictPtr getDictionary(const std::string & dictionary_name, const Context & context) const;
DictPtr tryGetDictionary(const std::string & name) const
{
return std::static_pointer_cast<const IDictionaryBase>(tryLoad(name));
}
DictPtr tryGetDictionary(const std::string & dictionary_name, const Context & context) const;
bool hasDictionary(const std::string & name) const
{
return has(name);
}
void reloadDictionary(const std::string & dictionary_name, const Context & context) const;
DictionaryStructure getDictionaryStructure(const std::string & dictionary_name, const Context & context) const;
static DictionaryStructure getDictionaryStructure(const Poco::Util::AbstractConfiguration & config, const std::string & key_in_config = "dictionary");
static DictionaryStructure getDictionaryStructure(const ObjectConfig & config);
static void resetAll();
@ -42,11 +38,16 @@ protected:
LoadablePtr create(const std::string & name, const Poco::Util::AbstractConfiguration & config,
const std::string & key_in_config, const std::string & repository_name) const override;
std::string resolveDictionaryName(const std::string & dictionary_name, const std::string & current_database_name) const;
/// Try convert qualified dictionary name to persistent UUID
std::string resolveDictionaryNameFromDatabaseCatalog(const std::string & name) const;
friend class StorageSystemDictionaries;
friend class DatabaseDictionary;
private:
Context & context;
Context & global_context;
};
}

View File

@ -278,8 +278,10 @@ BlockIO InterpreterSystemQuery::execute()
case Type::RELOAD_DICTIONARY:
{
context.checkAccess(AccessType::SYSTEM_RELOAD_DICTIONARY);
system_context.getExternalDictionariesLoader().loadOrReload(
DatabaseCatalog::instance().resolveDictionaryName(query.target_dictionary));
auto & external_dictionaries_loader = system_context.getExternalDictionariesLoader();
external_dictionaries_loader.reloadDictionary(query.target_dictionary, context);
ExternalDictionariesLoader::resetAll();
break;
}

View File

@ -89,7 +89,7 @@ struct StorageID
const String & config_prefix);
/// If dictionary has UUID, then use it as dictionary name in ExternalLoader to allow dictionary renaming.
/// DatabaseCatalog::resolveDictionaryName(...) should be used to access such dictionaries by name.
/// ExternalDictnariesLoader::resolveDictionaryName(...) should be used to access such dictionaries by name.
String getInternalDictionaryName() const;
private:

View File

@ -135,8 +135,7 @@ void optimizeGroupBy(ASTSelectQuery * select_query, const NameSet & source_colum
const auto & dict_name = dict_name_ast->value.safeGet<String>();
const auto & attr_name = attr_name_ast->value.safeGet<String>();
String resolved_name = DatabaseCatalog::instance().resolveDictionaryName(dict_name);
const auto & dict_ptr = context.getExternalDictionariesLoader().getDictionary(resolved_name);
const auto & dict_ptr = context.getExternalDictionariesLoader().getDictionary(dict_name, context);
if (!dict_ptr->isInjective(attr_name))
{
++i;

View File

@ -89,13 +89,6 @@ String StorageDictionary::generateNamesAndTypesDescription(const NamesAndTypesLi
return ss.str();
}
String StorageDictionary::resolvedDictionaryName() const
{
if (location == Location::SameDatabaseAndNameAsDictionary)
return dictionary_name;
return DatabaseCatalog::instance().resolveDictionaryName(dictionary_name);
}
StorageDictionary::StorageDictionary(
const StorageID & table_id_,
const String & dictionary_name_,
@ -140,7 +133,7 @@ Pipe StorageDictionary::read(
const size_t max_block_size,
const unsigned /*threads*/)
{
auto dictionary = context.getExternalDictionariesLoader().getDictionary(resolvedDictionaryName());
auto dictionary = context.getExternalDictionariesLoader().getDictionary(dictionary_name, context);
auto stream = dictionary->getBlockInputStream(column_names, max_block_size);
/// TODO: update dictionary interface for processors.
return Pipe(std::make_shared<SourceFromInputStream>(stream));
@ -160,8 +153,8 @@ void registerStorageDictionary(StorageFactory & factory)
if (!args.attach)
{
auto resolved = DatabaseCatalog::instance().resolveDictionaryName(dictionary_name);
const auto & dictionary = args.context.getExternalDictionariesLoader().getDictionary(resolved);
const auto & context = args.context;
const auto & dictionary = context.getExternalDictionariesLoader().getDictionary(dictionary_name, context);
const DictionaryStructure & dictionary_structure = dictionary->getStructure();
checkNamesAndTypesCompatibleWithDictionary(dictionary_name, args.columns, dictionary_structure);
}

View File

@ -7,10 +7,12 @@
namespace DB
{
struct DictionaryStructure;
class TableFunctionDictionary;
class StorageDictionary final : public ext::shared_ptr_helper<StorageDictionary>, public IStorage
{
friend struct ext::shared_ptr_helper<StorageDictionary>;
friend class TableFunctionDictionary;
public:
std::string getName() const override { return "Dictionary"; }
@ -30,7 +32,6 @@ public:
static String generateNamesAndTypesDescription(const NamesAndTypesList & list);
const String & dictionaryName() const { return dictionary_name; }
String resolvedDictionaryName() const;
/// Specifies where the table is located relative to the dictionary.
enum class Location

View File

@ -55,7 +55,7 @@ public:
/// Create storage according to the query.
StoragePtr execute(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription cached_columns_ = {}) const;
virtual ~ITableFunction() {}
virtual ~ITableFunction() = default;
private:
virtual StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription cached_columns) const = 0;

View File

@ -0,0 +1,63 @@
#include <TableFunctions/TableFunctionDictionary.h>
#include <Parsers/ASTLiteral.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExternalDictionariesLoader.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Storages/StorageDictionary.h>
#include <TableFunctions/TableFunctionFactory.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
void TableFunctionDictionary::parseArguments(const ASTPtr & ast_function, const Context & context)
{
// Parse args
ASTs & args_func = ast_function->children;
if (args_func.size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Table function ({}) must have arguments.", quoteString(getName()));
ASTs & args = args_func.at(0)->children;
if (args.size() != 1)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Table function ({}) requires 1 arguments", quoteString(getName()));
for (auto & arg : args)
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);
dictionary_name = args[0]->as<ASTLiteral &>().value.safeGet<String>();
}
ColumnsDescription TableFunctionDictionary::getActualTableStructure(const Context & context) const
{
const ExternalDictionariesLoader & external_loader = context.getExternalDictionariesLoader();
auto dictionary_structure = external_loader.getDictionaryStructure(dictionary_name, context);
auto result = ColumnsDescription(StorageDictionary::getNamesAndTypes(dictionary_structure));
return result;
}
StoragePtr TableFunctionDictionary::executeImpl(
const ASTPtr &, const Context & context, const std::string & table_name, ColumnsDescription) const
{
StorageID dict_id(getDatabaseName(), table_name);
auto dictionary_table_structure = getActualTableStructure(context);
return StorageDictionary::create(dict_id, dictionary_name, std::move(dictionary_table_structure), StorageDictionary::Location::Custom);
}
void registerTableFunctionDictionary(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionDictionary>();
}
}

View File

@ -0,0 +1,34 @@
#pragma once
#include <TableFunctions/ITableFunction.h>
namespace DB
{
class Context;
/* file(path, format, structure) - creates a temporary storage from file
*
* The file must be in the clickhouse data directory.
* The relative path begins with the clickhouse data directory.
*/
class TableFunctionDictionary final : public ITableFunction
{
public:
static constexpr auto name = "dictionary";
std::string getName() const override
{
return name;
}
void parseArguments(const ASTPtr & ast_function, const Context & context) override;
ColumnsDescription getActualTableStructure(const Context & context) const override;
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription) const override;
const char * getStorageTypeName() const override { return "Dictionary"; }
private:
String dictionary_name;
ColumnsDescription dictionary_columns;
};}

View File

@ -40,6 +40,8 @@ void registerTableFunctions()
#if USE_LIBPQXX
registerTableFunctionPostgreSQL(factory);
#endif
registerTableFunctionDictionary(factory);
}
}

View File

@ -41,6 +41,8 @@ void registerTableFunctionMySQL(TableFunctionFactory & factory);
void registerTableFunctionPostgreSQL(TableFunctionFactory & factory);
#endif
void registerTableFunctionDictionary(TableFunctionFactory & factory);
void registerTableFunctions();
}

View File

@ -12,6 +12,7 @@ SRCS(
ITableFunction.cpp
ITableFunctionFileLike.cpp
ITableFunctionXDBC.cpp
TableFunctionDictionary.cpp
TableFunctionFactory.cpp
TableFunctionFile.cpp
TableFunctionGenerateRandom.cpp

View File

@ -0,0 +1,2 @@
0 0
1 1

View File

@ -0,0 +1,25 @@
DROP TABLE IF EXISTS table_function_dictionary_source_table;
CREATE TABLE table_function_dictionary_source_table
(
id UInt64,
value UInt64
)
ENGINE = TinyLog;
INSERT INTO table_function_dictionary_source_table VALUES (0, 0);
INSERT INTO table_function_dictionary_source_table VALUES (1, 1);
DROP DICTIONARY IF EXISTS table_function_dictionary_test_dictionary;
CREATE DICTIONARY table_function_dictionary_test_dictionary
(
id UInt64,
value UInt64 DEFAULT 0
)
PRIMARY KEY id
SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() USER 'default' TABLE 'table_function_dictionary_source_table'))
LAYOUT(DIRECT());
SELECT * FROM dictionary('table_function_dictionary_test_dictionary');
DROP TABLE table_function_dictionary_source_table;
DROP DICTIONARY table_function_dictionary_test_dictionary;