DatabaseDictionary creates new storage for every create table query; drop table works untill restart

This commit is contained in:
Nikolai Kochetov 2017-06-23 18:55:45 +03:00
parent 4c114bc3a1
commit adb632319d
8 changed files with 129 additions and 110 deletions

View File

@ -1,13 +1,11 @@
#include <common/logger_useful.h>
#include <Databases/DatabaseDictionary.h>
#include <Databases/DatabasesCommon.h>
#include <Interpreters/Context.h>
#include <Storages/StorageDictionary.h>
#include <Interpreters/ExternalDictionaries.h>
#include <Storages/StorageDictionary.h>
#include <common/logger_useful.h>
namespace DB
{
namespace ErrorCodes
{
extern const int TABLE_ALREADY_EXISTS;
@ -15,62 +13,80 @@ extern const int UNKNOWN_TABLE;
extern const int LOGICAL_ERROR;
}
DatabaseDictionary::DatabaseDictionary(const String & name_, const Context & context)
: name(name_),
external_dictionaries(context.getExternalDictionaries()),
log(&Logger::get("DatabaseDictionary(" + name + ")"))
{
}
void DatabaseDictionary::loadTables(Context & context, ThreadPool * thread_pool, bool has_force_restore_data_flag)
{
log = &Logger::get("DatabaseDictionary(" + name + ")");
}
const auto & external_dictionaries = context.getExternalDictionaries();
const std::lock_guard<std::mutex> lock_dictionaries{external_dictionaries.dictionaries_mutex};
const std::lock_guard<std::mutex> lock_tables(mutex);
Tables DatabaseDictionary::loadTables()
{
const std::lock_guard<std::mutex> lock_dictionaries {external_dictionaries.dictionaries_mutex};
Tables tables;
for (const auto & pair : external_dictionaries.dictionaries)
{
const std::string & name = pair.first;
if (deleted_tables.count(name))
continue;
auto dict_ptr = pair.second.dict;
if (dict_ptr)
{
const DictionaryStructure & dictionary_structure = dict_ptr->get()->getStructure();
auto columns = StorageDictionary::getNamesAndTypes(dictionary_structure);
tables[name] =
StorageDictionary::create(name, context, columns, {}, {}, {}, dictionary_structure, name);
tables[name] = StorageDictionary::create(name, columns, {}, {}, {}, dictionary_structure, name);
}
}
for (const auto & pair : external_dictionaries.failed_dictionaries)
{
const std::string & name = pair.first;
const DictionaryStructure & dictionary_structure = pair.second.dict->getStructure();
auto columns = StorageDictionary::getNamesAndTypes(dictionary_structure);
tables[name] =
StorageDictionary::create(name, context, columns, {}, {}, {}, dictionary_structure, name);
}
return tables;
}
bool DatabaseDictionary::isTableExist(const String & table_name) const
{
std::lock_guard<std::mutex> lock(mutex);
return tables.count(table_name);
const std::lock_guard<std::mutex> lock_dictionaries {external_dictionaries.dictionaries_mutex};
return external_dictionaries.dictionaries.count(table_name) && !deleted_tables.count(table_name);
}
StoragePtr DatabaseDictionary::tryGetTable(const String & table_name)
{
std::lock_guard<std::mutex> lock(mutex);
auto it = tables.find(table_name);
if (it == tables.end())
const std::lock_guard<std::mutex> lock_dictionaries {external_dictionaries.dictionaries_mutex};
if (deleted_tables.count(table_name))
return {};
return it->second;
{
auto it = external_dictionaries.dictionaries.find(table_name);
if (it != external_dictionaries.dictionaries.end())
{
const auto & dict_ptr = it->second.dict;
if (dict_ptr)
{
const DictionaryStructure & dictionary_structure = dict_ptr->get()->getStructure();
auto columns = StorageDictionary::getNamesAndTypes(dictionary_structure);
return StorageDictionary::create(table_name, columns, {}, {}, {}, dictionary_structure, table_name);
}
}
}
return {};
}
DatabaseIteratorPtr DatabaseDictionary::getIterator()
{
std::lock_guard<std::mutex> lock(mutex);
return std::make_unique<DatabaseSnaphotIterator>(tables);
return std::make_unique<DatabaseSnaphotIterator>(loadTables());
}
bool DatabaseDictionary::empty() const
{
std::lock_guard<std::mutex> lock(mutex);
return tables.empty();
const std::lock_guard<std::mutex> lock_dictionaries {external_dictionaries.dictionaries_mutex};
for (const auto & pair : external_dictionaries.dictionaries)
if (pair.second.dict && !deleted_tables.count(pair.first))
return false;
return true;
}
StoragePtr DatabaseDictionary::detachTable(const String & table_name)
@ -83,23 +99,29 @@ void DatabaseDictionary::attachTable(const String & table_name, const StoragePtr
throw Exception("DatabaseDictionary: attachTable() is not supported", ErrorCodes::NOT_IMPLEMENTED);
}
void DatabaseDictionary::createTable(
const String & table_name, const StoragePtr & table, const ASTPtr & query, const String & engine, const Settings & settings)
void DatabaseDictionary::createTable(const String & table_name,
const StoragePtr & table,
const ASTPtr & query,
const String & engine,
const Settings & settings)
{
throw Exception("DatabaseDictionary: attachTable() is not supported", ErrorCodes::NOT_IMPLEMENTED);
throw Exception("DatabaseDictionary: createTable() is not supported", ErrorCodes::NOT_IMPLEMENTED);
}
void DatabaseDictionary::removeTable(const String & table_name)
{
std::lock_guard<std::mutex> lock(mutex);
auto it = tables.find(table_name);
if (it == tables.end())
throw Exception("Table " + name + "." + table_name + " doesn't exist.", ErrorCodes::TABLE_ALREADY_EXISTS);
tables.erase(it);
if (!isTableExist(table_name))
throw Exception("Table " + name + "." + table_name + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
const std::lock_guard<std::mutex> lock_dictionaries {external_dictionaries.dictionaries_mutex};
deleted_tables.insert(table_name);
}
void DatabaseDictionary::renameTable(
const Context & context, const String & table_name, IDatabase & to_database, const String & to_table_name, const Settings & settings)
void DatabaseDictionary::renameTable(const Context & context,
const String & table_name,
IDatabase & to_database,
const String & to_table_name,
const Settings & settings)
{
throw Exception("DatabaseDictionary: renameTable() is not supported", ErrorCodes::NOT_IMPLEMENTED);
}
@ -117,14 +139,6 @@ ASTPtr DatabaseDictionary::getCreateQuery(const String & table_name) const
void DatabaseDictionary::shutdown()
{
/// You can not hold a lock during shutdown.
/// Because inside `shutdown` function tables can work with database, and mutex is not recursive.
for (auto iterator = getIterator(); iterator->isValid(); iterator->next())
iterator->table()->shutdown();
std::lock_guard<std::mutex> lock(mutex);
tables.clear();
}
void DatabaseDictionary::drop()
@ -132,16 +146,14 @@ void DatabaseDictionary::drop()
/// Additional actions to delete database are not required.
}
void DatabaseDictionary::alterTable(
const Context & context,
const String & name,
const NamesAndTypesList & columns,
const NamesAndTypesList & materialized_columns,
const NamesAndTypesList & alias_columns,
const ColumnDefaults & column_defaults,
const ASTModifier & engine_modifier)
void DatabaseDictionary::alterTable(const Context & context,
const String & name,
const NamesAndTypesList & columns,
const NamesAndTypesList & materialized_columns,
const NamesAndTypesList & alias_columns,
const ColumnDefaults & column_defaults,
const ASTModifier & engine_modifier)
{
throw Exception("DatabaseDictionary: alterTable() is not supported", ErrorCodes::NOT_IMPLEMENTED);
}
}

View File

@ -1,38 +1,43 @@
#pragma once
#include <mutex>
#include <unordered_set>
#include <Databases/DatabasesCommon.h>
#include <Databases/IDatabase.h>
#include <Storages/IStorage.h>
namespace Poco {
namespace Poco
{
class Logger;
}
namespace DB
{
class ExternalDictionaries;
/* Database to store StorageDictionary tables
* automatically creates tables for all dictionaries
*/
class DatabaseDictionary : public IDatabase
{
protected:
private:
const String name;
mutable std::mutex mutex;
Tables tables;
const ExternalDictionaries & external_dictionaries;
std::unordered_set<String> deleted_tables;
Poco::Logger * log;
public:
Tables loadTables();
DatabaseDictionary(const String & name_) : name(name_) {}
public:
DatabaseDictionary(const String & name_, const Context & context);
String getEngineName() const override {
return "Dictionary";
}
void loadTables(Context & context, ThreadPool * thread_pool, bool has_force_restore_data_flag) override;
bool isTableExist(const String & table_name) const override;
@ -42,16 +47,22 @@ public:
bool empty() const override;
void createTable(
const String & table_name, const StoragePtr & table, const ASTPtr & query, const String & engine, const Settings & settings) override;
void createTable(const String & table_name,
const StoragePtr & table,
const ASTPtr & query,
const String & engine,
const Settings & settings) override;
void removeTable(const String & table_name) override;
void attachTable(const String & table_name, const StoragePtr & table) override;
StoragePtr detachTable(const String & table_name) override;
void renameTable(
const Context & context, const String & table_name, IDatabase & to_database, const String & to_table_name, const Settings & settings) override;
void renameTable(const Context & context,
const String & table_name,
IDatabase & to_database,
const String & to_table_name,
const Settings & settings) override;
time_t getTableMetadataModificationTime(const String & table_name) override;
@ -60,15 +71,12 @@ public:
void shutdown() override;
void drop() override;
void alterTable(
const Context & context,
const String & name,
const NamesAndTypesList & columns,
const NamesAndTypesList & materialized_columns,
const NamesAndTypesList & alias_columns,
const ColumnDefaults & column_defaults,
const ASTModifier & engine_modifier) override;
void alterTable(const Context & context,
const String & name,
const NamesAndTypesList & columns,
const NamesAndTypesList & materialized_columns,
const NamesAndTypesList & alias_columns,
const ColumnDefaults & column_defaults,
const ASTModifier & engine_modifier) override;
};
}

View File

@ -3,7 +3,6 @@
#include <Databases/DatabaseMemory.h>
#include <Databases/DatabaseDictionary.h>
namespace DB
{
@ -24,7 +23,7 @@ DatabasePtr DatabaseFactory::get(
else if (engine_name == "Memory")
return std::make_shared<DatabaseMemory>(database_name);
else if (engine_name == "Dictionary")
return std::make_shared<DatabaseDictionary>(database_name);
return std::make_shared<DatabaseDictionary>(database_name, context);
throw Exception("Unknown database engine: " + engine_name, ErrorCodes::UNKNOWN_DATABASE_ENGINE);
}

View File

@ -54,7 +54,7 @@ StoragePtr DatabaseMemory::detachTable(const String & table_name)
std::lock_guard<std::mutex> lock(mutex);
auto it = tables.find(table_name);
if (it == tables.end())
throw Exception("Table " + name + "." + table_name + " doesn't exist.", ErrorCodes::TABLE_ALREADY_EXISTS);
throw Exception("Table " + name + "." + table_name + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
res = it->second;
tables.erase(it);
}

View File

@ -342,7 +342,7 @@ void DatabaseOrdinary::renameTable(
StoragePtr table = tryGetTable(table_name);
if (!table)
throw Exception("Table " + name + "." + table_name + " doesn't exist.", ErrorCodes::TABLE_ALREADY_EXISTS);
throw Exception("Table " + name + "." + table_name + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
/// Notify the table that it is renamed. If the table does not support renaming, exception is thrown.
try

View File

@ -46,6 +46,9 @@ public:
DatabaseSnaphotIterator(Tables & tables_)
: tables(tables_), it(tables.begin()) {}
DatabaseSnaphotIterator(Tables && tables_)
: tables(tables_), it(tables.begin()) {}
void next() override
{
++it;

View File

@ -9,7 +9,8 @@
#include <Interpreters/ExternalDictionaries.h>
#include <common/logger_useful.h>
namespace DB {
namespace DB
{
StoragePtr StorageDictionary::create(
const String & table_name,
@ -23,7 +24,8 @@ StoragePtr StorageDictionary::create(
ASTCreateQuery & create = typeid_cast<ASTCreateQuery &>(*query);
const ASTFunction & function = typeid_cast<const ASTFunction &> (*create.storage);
String dictionary_name;
if (function.arguments) {
if (function.arguments)
{
std::stringstream iss;
function.arguments->format(IAST::FormatSettings(iss, false, false));
dictionary_name = iss.str();
@ -31,13 +33,12 @@ StoragePtr StorageDictionary::create(
const auto & dictionary = context.getExternalDictionaries().getDictionary(dictionary_name);
const DictionaryStructure & dictionary_structure = dictionary->getStructure();
return make_shared(table_name, context, columns, materialized_columns, alias_columns,
return make_shared(table_name, columns, materialized_columns, alias_columns,
column_defaults, dictionary_structure, dictionary_name);
}
StoragePtr StorageDictionary::create(
const String & table_name,
Context & context,
NamesAndTypesListPtr columns,
const NamesAndTypesList & materialized_columns,
const NamesAndTypesList & alias_columns,
@ -45,13 +46,12 @@ StoragePtr StorageDictionary::create(
const DictionaryStructure & dictionary_structure,
const String & dictionary_name)
{
return make_shared(table_name, context, columns, materialized_columns, alias_columns,
return make_shared(table_name, columns, materialized_columns, alias_columns,
column_defaults, dictionary_structure, dictionary_name);
}
StorageDictionary::StorageDictionary(
const String & table_name_,
Context & context_,
NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,
@ -59,7 +59,7 @@ StorageDictionary::StorageDictionary(
const DictionaryStructure & dictionary_structure_,
const String & dictionary_name_)
: IStorage{materialized_columns_, alias_columns_, column_defaults_}, table_name(table_name_),
context(context_), columns(columns_), dictionary_name(dictionary_name_),
columns(columns_), dictionary_name(dictionary_name_),
logger(&Poco::Logger::get("StorageDictionary"))
{
checkNamesAndTypesCompatibleWithDictionary(dictionary_structure_);
@ -106,8 +106,10 @@ void StorageDictionary::checkNamesAndTypesCompatibleWithDictionary(const Diction
auto dictionaryNamesAndTypes = getNamesAndTypes(dictionaryStructure);
std::set<NameAndTypePair> namesAndTypesSet(dictionaryNamesAndTypes->begin(), dictionaryNamesAndTypes->end());
for (auto & column : *columns) {
if (namesAndTypesSet.find(column) == namesAndTypesSet.end()) {
for (auto & column : *columns)
{
if (namesAndTypesSet.find(column) == namesAndTypesSet.end())
{
std::string message = "Not found column ";
message += column.name + " " + column.type->getName();
message += " in dictionary " + dictionary_name + ". ";

View File

@ -1,24 +1,26 @@
#pragma once
#include <ext/shared_ptr_helper.h>
#include <Storages/IStorage.h>
#include <common/MultiVersion.h>
#include <ext/shared_ptr_helper.h>
namespace Poco { class Logger; }
namespace Poco
{
class Logger;
}
namespace DB
{
class DictionaryStructure;
class IDictionaryBase;
class ExternalDictionaries;
class StorageDictionary : private ext::shared_ptr_helper<StorageDictionary>, public IStorage
{
friend class ext::shared_ptr_helper<StorageDictionary>;
public:
static StoragePtr create(
const String & table_name_,
static StoragePtr create(const String & table_name_,
Context & context_,
ASTPtr & query_,
NamesAndTypesListPtr columns_,
@ -26,9 +28,7 @@ public:
const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_);
static StoragePtr create(
const String & table_name,
Context & context,
static StoragePtr create(const String & table_name,
NamesAndTypesListPtr columns,
const NamesAndTypesList & materialized_columns,
const NamesAndTypesList & alias_columns,
@ -39,31 +39,25 @@ public:
std::string getName() const override { return "Dictionary"; }
std::string getTableName() const override { return table_name; }
const NamesAndTypesList & getColumnsListImpl() const override { return *columns; }
BlockInputStreams read(
const Names & column_names,
const ASTPtr& query,
BlockInputStreams read(const Names & column_names,
const ASTPtr & query,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,
unsigned threads = 1) override;
void drop() override {}
static NamesAndTypesListPtr getNamesAndTypes(const DictionaryStructure & dictionaryStructure);
private:
using Ptr = MultiVersion<IDictionaryBase>::Version;
String table_name;
Context & context;
NamesAndTypesListPtr columns;
String dictionary_name;
Poco::Logger * logger;
StorageDictionary(
const String & table_name_,
Context & context_,
StorageDictionary(const String & table_name_,
NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,
@ -74,12 +68,15 @@ private:
void checkNamesAndTypesCompatibleWithDictionary(const DictionaryStructure & dictionaryStructure) const;
template <class ForwardIterator>
std::string generateNamesAndTypesDescription(ForwardIterator begin, ForwardIterator end) const {
if (begin == end) {
std::string generateNamesAndTypesDescription(ForwardIterator begin, ForwardIterator end) const
{
if (begin == end)
{
return "";
}
std::string description;
for (; begin != end; ++begin) {
for (; begin != end; ++begin)
{
description += ", ";
description += begin->name;
description += ' ';
@ -88,6 +85,4 @@ private:
return description.substr(2, description.size());
}
};
}