Merge pull request #914 from yandex/database-dictionary

DatabaseDictionary
This commit is contained in:
alexey-milovidov 2017-06-25 22:02:45 +03:00 committed by GitHub
commit e122cf0bcf
10 changed files with 346 additions and 87 deletions

View File

@ -0,0 +1,159 @@
#include <Databases/DatabaseDictionary.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExternalDictionaries.h>
#include <Storages/StorageDictionary.h>
#include <common/logger_useful.h>
namespace DB
{
namespace ErrorCodes
{
    /// Error codes are defined elsewhere in the project. Every code used in this file is
    /// declared here so the file does not depend on another header happening to declare it.
    extern const int TABLE_ALREADY_EXISTS;
    extern const int UNKNOWN_TABLE;
    extern const int LOGICAL_ERROR;
    extern const int NOT_IMPLEMENTED;   /// Used by all unsupported operations of DatabaseDictionary.
}
/// Creates a database called `name_` on top of the external dictionaries obtained from `context`.
/// The logger is named "DatabaseDictionary(<database name>)".
DatabaseDictionary::DatabaseDictionary(const String & name_, const Context & context)
    : name(name_)
    , external_dictionaries(context.getExternalDictionaries())
    , log(&Logger::get("DatabaseDictionary(" + name_ + ")"))
{
}
/// Nothing is loaded eagerly: the body is empty and all three arguments are ignored.
/// Table objects are built on request instead, by the private loadTables() overload and by tryGetTable().
void DatabaseDictionary::loadTables(Context & context, ThreadPool * thread_pool, bool has_force_restore_data_flag)
{
}
/// Builds a snapshot of the tables of this database: one StorageDictionary for every
/// registered dictionary that has a dictionary object and was not hidden by removeTable().
/// The dictionaries mutex is held for the whole scan.
Tables DatabaseDictionary::loadTables()
{
    const std::lock_guard<std::mutex> lock_dictionaries {external_dictionaries.dictionaries_mutex};

    Tables tables;
    for (const auto & pair : external_dictionaries.dictionaries)
    {
        /// Deliberately not called `name`: that would shadow the member holding the database name.
        const std::string & dictionary_name = pair.first;
        if (deleted_tables.count(dictionary_name))
            continue;

        /// Bound by reference instead of copied, the same way tryGetTable() does it.
        const auto & dict_ptr = pair.second.dict;
        if (!dict_ptr)
            continue;

        /// Keep whatever get() returns alive while the structure reference is in use,
        /// like StorageDictionary::create does with its dictionary handle.
        /// NOTE(review): presumably get() returns a ref-counted version handle - confirm.
        const auto & dictionary = dict_ptr->get();
        const DictionaryStructure & dictionary_structure = dictionary->getStructure();
        auto columns = StorageDictionary::getNamesAndTypes(dictionary_structure);

        /// The table and the dictionary it reads from share the same name.
        tables[dictionary_name] = StorageDictionary::create(
            dictionary_name, columns, {}, {}, {}, dictionary_structure, dictionary_name);
    }
    return tables;
}
/// Returns true when a dictionary called `table_name` is registered
/// and the corresponding table was not hidden by removeTable().
bool DatabaseDictionary::isTableExist(const String & table_name) const
{
    const std::lock_guard<std::mutex> lock_dictionaries {external_dictionaries.dictionaries_mutex};

    const bool is_registered = external_dictionaries.dictionaries.count(table_name) != 0;
    if (!is_registered)
        return false;

    const bool is_hidden = deleted_tables.count(table_name) != 0;
    return !is_hidden;
}
/// Returns a freshly created StorageDictionary for the dictionary called `table_name`.
/// Returns an empty pointer when the table was hidden by removeTable(), when no such
/// dictionary is registered, or when the registered entry has no dictionary object.
StoragePtr DatabaseDictionary::tryGetTable(const String & table_name)
{
    const std::lock_guard<std::mutex> lock_dictionaries {external_dictionaries.dictionaries_mutex};

    if (deleted_tables.count(table_name))
        return {};

    const auto it = external_dictionaries.dictionaries.find(table_name);
    if (it == external_dictionaries.dictionaries.end())
        return {};

    const auto & dict_ptr = it->second.dict;
    if (!dict_ptr)
        return {};

    /// Keep whatever get() returns alive while the structure reference is in use,
    /// like StorageDictionary::create does with its dictionary handle.
    /// NOTE(review): presumably get() returns a ref-counted version handle - confirm.
    const auto & dictionary = dict_ptr->get();
    const DictionaryStructure & dictionary_structure = dictionary->getStructure();
    auto columns = StorageDictionary::getNamesAndTypes(dictionary_structure);

    /// The table and the dictionary it reads from share the same name.
    return StorageDictionary::create(table_name, columns, {}, {}, {}, dictionary_structure, table_name);
}
/// Returns an iterator over a snapshot of the tables produced by loadTables()
/// at the moment of the call.
DatabaseIteratorPtr DatabaseDictionary::getIterator()
{
    DatabaseIteratorPtr iterator = std::make_unique<DatabaseSnaphotIterator>(loadTables());
    return iterator;
}
/// Returns false as soon as one registered dictionary has a dictionary object and
/// was not hidden by removeTable(); returns true otherwise.
bool DatabaseDictionary::empty() const
{
    const std::lock_guard<std::mutex> lock_dictionaries {external_dictionaries.dictionaries_mutex};

    for (const auto & entry : external_dictionaries.dictionaries)
    {
        if (!entry.second.dict)
            continue;

        if (deleted_tables.count(entry.first) == 0)
            return false;
    }

    return true;
}
/// Detaching is not supported by this database: always throws NOT_IMPLEMENTED.
StoragePtr DatabaseDictionary::detachTable(const String & table_name)
{
    const char * const message = "DatabaseDictionary: detachTable() is not supported";
    throw Exception(message, ErrorCodes::NOT_IMPLEMENTED);
}
/// Attaching is not supported by this database: always throws NOT_IMPLEMENTED.
void DatabaseDictionary::attachTable(const String & table_name, const StoragePtr & table)
{
    const char * const message = "DatabaseDictionary: attachTable() is not supported";
    throw Exception(message, ErrorCodes::NOT_IMPLEMENTED);
}
/// Creating tables is not supported by this database: always throws NOT_IMPLEMENTED.
void DatabaseDictionary::createTable(
    const String & table_name,
    const StoragePtr & table,
    const ASTPtr & query,
    const String & engine,
    const Settings & settings)
{
    const char * const message = "DatabaseDictionary: createTable() is not supported";
    throw Exception(message, ErrorCodes::NOT_IMPLEMENTED);
}
/// Hides the table `table_name` from this database by remembering its name in `deleted_tables`.
/// Throws UNKNOWN_TABLE when no such dictionary is registered or the table is already hidden.
void DatabaseDictionary::removeTable(const String & table_name)
{
    /// The existence check and the insertion run under one lock acquisition, so two
    /// concurrent calls for the same table cannot both pass the check.
    /// isTableExist() is not called here because it locks the same std::mutex itself.
    const std::lock_guard<std::mutex> lock_dictionaries {external_dictionaries.dictionaries_mutex};

    const bool exists = external_dictionaries.dictionaries.count(table_name) && !deleted_tables.count(table_name);
    if (!exists)
        throw Exception("Table " + name + "." + table_name + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);

    deleted_tables.insert(table_name);
}
/// Renaming is not supported by this database: always throws NOT_IMPLEMENTED.
void DatabaseDictionary::renameTable(
    const Context & context,
    const String & table_name,
    IDatabase & to_database,
    const String & to_table_name,
    const Settings & settings)
{
    const char * const message = "DatabaseDictionary: renameTable() is not supported";
    throw Exception(message, ErrorCodes::NOT_IMPLEMENTED);
}
/// Always returns 0, whatever `table_name` is.
time_t DatabaseDictionary::getTableMetadataModificationTime(const String & table_name)
{
    const time_t zero_time = 0;
    return zero_time;
}
/// A CREATE query cannot be produced for tables of this database: always throws NOT_IMPLEMENTED.
ASTPtr DatabaseDictionary::getCreateQuery(const String & table_name) const
{
    /// No return statement follows: it would be unreachable after the unconditional throw.
    throw Exception("DatabaseDictionary: getCreateQuery() is not supported", ErrorCodes::NOT_IMPLEMENTED);
}
/// Empty body: this function performs no work on shutdown.
void DatabaseDictionary::shutdown()
{
}
/// Empty body: dropping the database performs no work in this function.
void DatabaseDictionary::drop()
{
/// Additional actions to delete database are not required.
}
/// Altering tables is not supported by this database: always throws NOT_IMPLEMENTED.
void DatabaseDictionary::alterTable(
    const Context & context,
    const String & name,
    const NamesAndTypesList & columns,
    const NamesAndTypesList & materialized_columns,
    const NamesAndTypesList & alias_columns,
    const ColumnDefaults & column_defaults,
    const ASTModifier & engine_modifier)
{
    const char * const message = "DatabaseDictionary: alterTable() is not supported";
    throw Exception(message, ErrorCodes::NOT_IMPLEMENTED);
}
}

View File

@ -0,0 +1,83 @@
#pragma once
#include <mutex>
#include <unordered_set>
#include <Databases/DatabasesCommon.h>
#include <Databases/IDatabase.h>
#include <Storages/IStorage.h>
namespace Poco
{
class Logger;
}
namespace DB
{
class ExternalDictionaries;
/** Database that stores StorageDictionary tables.
  * Tables are provided automatically for the external dictionaries;
  * they cannot be created, attached, detached, renamed or altered through this database.
  */
class DatabaseDictionary : public IDatabase
{
private:
/// Name of the database, copied from the constructor argument.
const String name;
/// NOTE(review): no member function defined in DatabaseDictionary.cpp locks this mutex;
/// they lock external_dictionaries.dictionaries_mutex instead - confirm whether it is still needed.
mutable std::mutex mutex;
/// Registry of external dictionaries, obtained from the Context passed to the constructor.
const ExternalDictionaries & external_dictionaries;
/// Names of tables hidden by removeTable(). Accessed while external_dictionaries.dictionaries_mutex is held.
std::unordered_set<String> deleted_tables;
/// Logger named "DatabaseDictionary(<database name>)".
Poco::Logger * log;
/// Builds a snapshot with one StorageDictionary per visible dictionary; used by getIterator().
Tables loadTables();
public:
DatabaseDictionary(const String & name_, const Context & context);
String getEngineName() const override
{
return "Dictionary";
}
/// Does nothing: tables are built on request.
void loadTables(Context & context, ThreadPool * thread_pool, bool has_force_restore_data_flag) override;
/// True when the dictionary is registered and the table was not hidden by removeTable().
bool isTableExist(const String & table_name) const override;
/// Returns a new StorageDictionary for the dictionary, or an empty pointer when it is not available.
StoragePtr tryGetTable(const String & table_name) override;
/// Iterates over a snapshot of the tables taken at the moment of the call.
DatabaseIteratorPtr getIterator() override;
bool empty() const override;
/// Not supported: throws NOT_IMPLEMENTED.
void createTable(const String & table_name,
const StoragePtr & table,
const ASTPtr & query,
const String & engine,
const Settings & settings) override;
/// Hides the table from this database; throws UNKNOWN_TABLE when it does not exist.
void removeTable(const String & table_name) override;
/// Not supported: throws NOT_IMPLEMENTED.
void attachTable(const String & table_name, const StoragePtr & table) override;
/// Not supported: throws NOT_IMPLEMENTED.
StoragePtr detachTable(const String & table_name) override;
/// Not supported: throws NOT_IMPLEMENTED.
void renameTable(const Context & context,
const String & table_name,
IDatabase & to_database,
const String & to_table_name,
const Settings & settings) override;
/// Always returns 0.
time_t getTableMetadataModificationTime(const String & table_name) override;
/// Not supported: throws NOT_IMPLEMENTED.
ASTPtr getCreateQuery(const String & table_name) const override;
void shutdown() override;
void drop() override;
/// Not supported: throws NOT_IMPLEMENTED.
void alterTable(const Context & context,
const String & name,
const NamesAndTypesList & columns,
const NamesAndTypesList & materialized_columns,
const NamesAndTypesList & alias_columns,
const ColumnDefaults & column_defaults,
const ASTModifier & engine_modifier) override;
};
}

View File

@ -1,7 +1,7 @@
#include <Databases/DatabaseFactory.h> #include <Databases/DatabaseFactory.h>
#include <Databases/DatabaseOrdinary.h> #include <Databases/DatabaseOrdinary.h>
#include <Databases/DatabaseMemory.h> #include <Databases/DatabaseMemory.h>
#include <Databases/DatabaseDictionary.h>
namespace DB namespace DB
{ {
@ -22,6 +22,8 @@ DatabasePtr DatabaseFactory::get(
return std::make_shared<DatabaseOrdinary>(database_name, path); return std::make_shared<DatabaseOrdinary>(database_name, path);
else if (engine_name == "Memory") else if (engine_name == "Memory")
return std::make_shared<DatabaseMemory>(database_name); return std::make_shared<DatabaseMemory>(database_name);
else if (engine_name == "Dictionary")
return std::make_shared<DatabaseDictionary>(database_name, context);
throw Exception("Unknown database engine: " + engine_name, ErrorCodes::UNKNOWN_DATABASE_ENGINE); throw Exception("Unknown database engine: " + engine_name, ErrorCodes::UNKNOWN_DATABASE_ENGINE);
} }

View File

@ -54,7 +54,7 @@ StoragePtr DatabaseMemory::detachTable(const String & table_name)
std::lock_guard<std::mutex> lock(mutex); std::lock_guard<std::mutex> lock(mutex);
auto it = tables.find(table_name); auto it = tables.find(table_name);
if (it == tables.end()) if (it == tables.end())
throw Exception("Table " + name + "." + table_name + " doesn't exist.", ErrorCodes::TABLE_ALREADY_EXISTS); throw Exception("Table " + name + "." + table_name + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
res = it->second; res = it->second;
tables.erase(it); tables.erase(it);
} }

View File

@ -342,7 +342,7 @@ void DatabaseOrdinary::renameTable(
StoragePtr table = tryGetTable(table_name); StoragePtr table = tryGetTable(table_name);
if (!table) if (!table)
throw Exception("Table " + name + "." + table_name + " doesn't exist.", ErrorCodes::TABLE_ALREADY_EXISTS); throw Exception("Table " + name + "." + table_name + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
/// Notify the table that it is renamed. If the table does not support renaming, exception is thrown. /// Notify the table that it is renamed. If the table does not support renaming, exception is thrown.
try try

View File

@ -46,6 +46,9 @@ public:
DatabaseSnaphotIterator(Tables & tables_) DatabaseSnaphotIterator(Tables & tables_)
: tables(tables_), it(tables.begin()) {} : tables(tables_), it(tables.begin()) {}
/// Takes over a temporary set of tables. Inside the constructor `tables_` is an lvalue,
/// so it has to be moved explicitly; otherwise the whole container is copied.
/// NOTE(review): assumes the `tables` member is stored by value - confirm against the class definition.
DatabaseSnaphotIterator(Tables && tables_)
    : tables(std::move(tables_)), it(tables.begin()) {}
void next() override void next() override
{ {
++it; ++it;

View File

@ -38,6 +38,7 @@ class ExternalDictionaries
{ {
private: private:
friend class StorageSystemDictionaries; friend class StorageSystemDictionaries;
friend class DatabaseDictionary;
mutable std::mutex dictionaries_mutex; mutable std::mutex dictionaries_mutex;

View File

@ -1,4 +1,3 @@
#include <sstream> #include <sstream>
#include <Parsers/ASTCreateQuery.h> #include <Parsers/ASTCreateQuery.h>
#include <DataTypes/DataTypesNumber.h> #include <DataTypes/DataTypesNumber.h>
@ -7,51 +6,63 @@
#include <Dictionaries/CacheDictionary.h> #include <Dictionaries/CacheDictionary.h>
#include <Storages/StorageDictionary.h> #include <Storages/StorageDictionary.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Interpreters/ExternalDictionaries.h>
#include <common/logger_useful.h> #include <common/logger_useful.h>
namespace DB { namespace DB
{
StoragePtr StorageDictionary::create( StoragePtr StorageDictionary::create(
const String & table_name_, const String & table_name,
const String & database_name_, Context & context,
Context & context_, ASTPtr & query,
ASTPtr & query_, NamesAndTypesListPtr columns,
NamesAndTypesListPtr columns_, const NamesAndTypesList & materialized_columns,
const NamesAndTypesList & materialized_columns_, const NamesAndTypesList & alias_columns,
const NamesAndTypesList & alias_columns_, const ColumnDefaults & column_defaults)
const ColumnDefaults & column_defaults_)
{ {
return make_shared( ASTCreateQuery & create = typeid_cast<ASTCreateQuery &>(*query);
table_name_, database_name_, context_, query_,
columns_, materialized_columns_, alias_columns_, column_defaults_,
context_.getExternalDictionaries()
);
}
StorageDictionary::StorageDictionary(
const String & table_name_,
const String & database_name_,
Context & context_,
ASTPtr & query_,
NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_,
const ExternalDictionaries & external_dictionaries_)
: IStorage{materialized_columns_, alias_columns_, column_defaults_}, table_name(table_name_),
database_name(database_name_), context(context_), columns(columns_),
external_dictionaries(external_dictionaries_)
{
logger = &Poco::Logger::get("StorageDictionary");
ASTCreateQuery & create = typeid_cast<ASTCreateQuery &>(*query_);
const ASTFunction & function = typeid_cast<const ASTFunction &> (*create.storage); const ASTFunction & function = typeid_cast<const ASTFunction &> (*create.storage);
if (function.arguments) { String dictionary_name;
if (function.arguments)
{
std::stringstream iss; std::stringstream iss;
function.arguments->format(IAST::FormatSettings(iss, false, false)); function.arguments->format(IAST::FormatSettings(iss, false, false));
dictionary_name = iss.str(); dictionary_name = iss.str();
} }
auto dictionary = external_dictionaries.getDictionary(dictionary_name);
checkNamesAndTypesCompatibleWithDictionary(dictionary); const auto & dictionary = context.getExternalDictionaries().getDictionary(dictionary_name);
const DictionaryStructure & dictionary_structure = dictionary->getStructure();
return make_shared(table_name, columns, materialized_columns, alias_columns,
column_defaults, dictionary_structure, dictionary_name);
}
StoragePtr StorageDictionary::create(
const String & table_name,
NamesAndTypesListPtr columns,
const NamesAndTypesList & materialized_columns,
const NamesAndTypesList & alias_columns,
const ColumnDefaults & column_defaults,
const DictionaryStructure & dictionary_structure,
const String & dictionary_name)
{
return make_shared(table_name, columns, materialized_columns, alias_columns,
column_defaults, dictionary_structure, dictionary_name);
}
StorageDictionary::StorageDictionary(
const String & table_name_,
NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_,
const DictionaryStructure & dictionary_structure_,
const String & dictionary_name_)
: IStorage{materialized_columns_, alias_columns_, column_defaults_}, table_name(table_name_),
columns(columns_), dictionary_name(dictionary_name_),
logger(&Poco::Logger::get("StorageDictionary"))
{
checkNamesAndTypesCompatibleWithDictionary(dictionary_structure_);
} }
BlockInputStreams StorageDictionary::read( BlockInputStreams StorageDictionary::read(
@ -63,47 +74,47 @@ BlockInputStreams StorageDictionary::read(
const unsigned threads) const unsigned threads)
{ {
processed_stage = QueryProcessingStage::Complete; processed_stage = QueryProcessingStage::Complete;
auto dictionary = external_dictionaries.getDictionary(dictionary_name); auto dictionary = context.getExternalDictionaries().getDictionary(dictionary_name);
return BlockInputStreams{dictionary->getBlockInputStream(column_names, max_block_size)}; return BlockInputStreams{dictionary->getBlockInputStream(column_names, max_block_size)};
} }
NamesAndTypes StorageDictionary::getNamesAndTypesFromDictionaryStructure(Ptr dictionary) const NamesAndTypesListPtr StorageDictionary::getNamesAndTypes(const DictionaryStructure & dictionaryStructure)
{ {
const DictionaryStructure & dictionaryStructure = dictionary->getStructure(); NamesAndTypesListPtr dictionaryNamesAndTypes = std::make_shared<NamesAndTypesList>();
NamesAndTypes dictionaryNamesAndTypes;
if (dictionaryStructure.id) if (dictionaryStructure.id)
dictionaryNamesAndTypes.push_back(NameAndTypePair(dictionaryStructure.id->name, dictionaryNamesAndTypes->push_back(NameAndTypePair(dictionaryStructure.id->name,
std::make_shared<DataTypeUInt64>())); std::make_shared<DataTypeUInt64>()));
if (dictionaryStructure.range_min) if (dictionaryStructure.range_min)
dictionaryNamesAndTypes.push_back(NameAndTypePair(dictionaryStructure.range_min->name, dictionaryNamesAndTypes->push_back(NameAndTypePair(dictionaryStructure.range_min->name,
std::make_shared<DataTypeUInt16>())); std::make_shared<DataTypeUInt16>()));
if (dictionaryStructure.range_max) if (dictionaryStructure.range_max)
dictionaryNamesAndTypes.push_back(NameAndTypePair(dictionaryStructure.range_max->name, dictionaryNamesAndTypes->push_back(NameAndTypePair(dictionaryStructure.range_max->name,
std::make_shared<DataTypeUInt16>())); std::make_shared<DataTypeUInt16>()));
if (dictionaryStructure.key) if (dictionaryStructure.key)
for (const auto & attribute : *dictionaryStructure.key) for (const auto & attribute : *dictionaryStructure.key)
dictionaryNamesAndTypes.push_back(NameAndTypePair(attribute.name, attribute.type)); dictionaryNamesAndTypes->push_back(NameAndTypePair(attribute.name, attribute.type));
for (const auto & attribute : dictionaryStructure.attributes) for (const auto & attribute : dictionaryStructure.attributes)
dictionaryNamesAndTypes.push_back(NameAndTypePair(attribute.name, attribute.type)); dictionaryNamesAndTypes->push_back(NameAndTypePair(attribute.name, attribute.type));
return dictionaryNamesAndTypes; return dictionaryNamesAndTypes;
} }
void StorageDictionary::checkNamesAndTypesCompatibleWithDictionary(Ptr dictionary) const void StorageDictionary::checkNamesAndTypesCompatibleWithDictionary(const DictionaryStructure & dictionaryStructure) const
{ {
auto dictionaryNamesAndTypes = getNamesAndTypesFromDictionaryStructure(dictionary); auto dictionaryNamesAndTypes = getNamesAndTypes(dictionaryStructure);
std::set<NameAndTypePair> namesAndTypesSet(dictionaryNamesAndTypes.begin(), dictionaryNamesAndTypes.end()); std::set<NameAndTypePair> namesAndTypesSet(dictionaryNamesAndTypes->begin(), dictionaryNamesAndTypes->end());
for (auto & column : *columns) { for (auto & column : *columns)
if (namesAndTypesSet.find(column) == namesAndTypesSet.end()) { {
if (namesAndTypesSet.find(column) == namesAndTypesSet.end())
{
std::string message = "Not found column "; std::string message = "Not found column ";
message += column.name + " " + column.type->getName(); message += column.name + " " + column.type->getName();
message += " in dictionary " + dictionary_name + ". "; message += " in dictionary " + dictionary_name + ". ";
message += "There are only columns "; message += "There are only columns ";
message += generateNamesAndTypesDescription(dictionaryNamesAndTypes.begin(), dictionaryNamesAndTypes.end()); message += generateNamesAndTypesDescription(dictionaryNamesAndTypes->begin(), dictionaryNamesAndTypes->end());
throw Exception(message); throw Exception(message);
} }
} }

View File

@ -1,24 +1,26 @@
#pragma once #pragma once
#include <ext/shared_ptr_helper.h>
#include <Parsers/ASTSelectQuery.h>
#include <Storages/IStorage.h> #include <Storages/IStorage.h>
#include <common/MultiVersion.h> #include <common/MultiVersion.h>
#include <common/logger_useful.h> #include <ext/shared_ptr_helper.h>
#include <Interpreters/ExternalDictionaries.h> namespace Poco
{
class Logger;
}
namespace DB namespace DB
{ {
class DictionaryStructure;
class IDictionaryBase;
class ExternalDictionaries;
class StorageDictionary : private ext::shared_ptr_helper<StorageDictionary>, public IStorage class StorageDictionary : private ext::shared_ptr_helper<StorageDictionary>, public IStorage
{ {
friend class ext::shared_ptr_helper<StorageDictionary>; friend class ext::shared_ptr_helper<StorageDictionary>;
public: public:
static StoragePtr create( static StoragePtr create(const String & table_name_,
const String & table_name_,
const String & database_name_,
Context & context_, Context & context_,
ASTPtr & query_, ASTPtr & query_,
NamesAndTypesListPtr columns_, NamesAndTypesListPtr columns_,
@ -26,56 +28,55 @@ public:
const NamesAndTypesList & alias_columns_, const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_); const ColumnDefaults & column_defaults_);
static StoragePtr create(const String & table_name,
NamesAndTypesListPtr columns,
const NamesAndTypesList & materialized_columns,
const NamesAndTypesList & alias_columns,
const ColumnDefaults & column_defaults,
const DictionaryStructure & dictionary_structure,
const String & dictionary_name);
std::string getName() const override { return "Dictionary"; } std::string getName() const override { return "Dictionary"; }
std::string getTableName() const override { return table_name; } std::string getTableName() const override { return table_name; }
const NamesAndTypesList & getColumnsListImpl() const override { return *columns; } const NamesAndTypesList & getColumnsListImpl() const override { return *columns; }
BlockInputStreams read(const Names & column_names,
BlockInputStreams read( const ASTPtr & query,
const Names & column_names,
const ASTPtr& query,
const Context & context, const Context & context,
QueryProcessingStage::Enum & processed_stage, QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE, size_t max_block_size = DEFAULT_BLOCK_SIZE,
unsigned threads = 1) override; unsigned threads = 1) override;
void drop() override {} void drop() override {}
static NamesAndTypesListPtr getNamesAndTypes(const DictionaryStructure & dictionaryStructure);
private: private:
using Ptr = MultiVersion<IDictionaryBase>::Version; using Ptr = MultiVersion<IDictionaryBase>::Version;
String select_database_name;
String select_table_name;
String table_name; String table_name;
String database_name;
ASTSelectQuery inner_query;
Context & context;
NamesAndTypesListPtr columns; NamesAndTypesListPtr columns;
String dictionary_name; String dictionary_name;
const ExternalDictionaries & external_dictionaries;
Poco::Logger * logger; Poco::Logger * logger;
StorageDictionary( StorageDictionary(const String & table_name_,
const String & table_name_,
const String & database_name_,
Context & context_,
ASTPtr & query_,
NamesAndTypesListPtr columns_, NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_, const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_, const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_, const ColumnDefaults & column_defaults_,
const ExternalDictionaries & external_dictionaries_); const DictionaryStructure & dictionary_structure_,
const String & dictionary_name_);
void checkNamesAndTypesCompatibleWithDictionary(Ptr dictionary) const; void checkNamesAndTypesCompatibleWithDictionary(const DictionaryStructure & dictionaryStructure) const;
NamesAndTypes getNamesAndTypesFromDictionaryStructure(Ptr dictionary) const;
template <class ForwardIterator> template <class ForwardIterator>
std::string generateNamesAndTypesDescription(ForwardIterator begin, ForwardIterator end) const { std::string generateNamesAndTypesDescription(ForwardIterator begin, ForwardIterator end) const
if (begin == end) { {
if (begin == end)
{
return ""; return "";
} }
std::string description; std::string description;
for (; begin != end; ++begin) { for (; begin != end; ++begin)
{
description += ", "; description += ", ";
description += begin->name; description += begin->name;
description += ' '; description += ' ';
@ -84,6 +85,4 @@ private:
return description.substr(2, description.size()); return description.substr(2, description.size());
} }
}; };
} }

View File

@ -282,8 +282,9 @@ StoragePtr StorageFactory::get(
} }
else if (name == "Dictionary") else if (name == "Dictionary")
{ {
return StorageDictionary::create( return StorageDictionary::create(
table_name, database_name, context, query, columns, table_name, context, query, columns,
materialized_columns, alias_columns, column_defaults); materialized_columns, alias_columns, column_defaults);
} }
else if (name == "TinyLog") else if (name == "TinyLog")