Merge branch 'master' of github.com:yandex/ClickHouse

This commit is contained in:
Alexey Milovidov 2017-06-26 11:34:02 +03:00
commit 219740155a
11 changed files with 347 additions and 88 deletions

View File

@ -0,0 +1,159 @@
#include <Databases/DatabaseDictionary.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExternalDictionaries.h>
#include <Storages/StorageDictionary.h>
#include <common/logger_useful.h>
namespace DB
{
namespace ErrorCodes
{
extern const int TABLE_ALREADY_EXISTS;
extern const int UNKNOWN_TABLE;
extern const int LOGICAL_ERROR;
}
DatabaseDictionary::DatabaseDictionary(const String & name_, const Context & context)
: name(name_),
external_dictionaries(context.getExternalDictionaries()),
log(&Logger::get("DatabaseDictionary(" + name + ")"))
{
}
void DatabaseDictionary::loadTables(Context & context, ThreadPool * thread_pool, bool has_force_restore_data_flag)
{
}
Tables DatabaseDictionary::loadTables()
{
const std::lock_guard<std::mutex> lock_dictionaries {external_dictionaries.dictionaries_mutex};
Tables tables;
for (const auto & pair : external_dictionaries.dictionaries)
{
const std::string & name = pair.first;
if (deleted_tables.count(name))
continue;
auto dict_ptr = pair.second.dict;
if (dict_ptr)
{
const DictionaryStructure & dictionary_structure = dict_ptr->get()->getStructure();
auto columns = StorageDictionary::getNamesAndTypes(dictionary_structure);
tables[name] = StorageDictionary::create(name, columns, {}, {}, {}, dictionary_structure, name);
}
}
return tables;
}
bool DatabaseDictionary::isTableExist(const String & table_name) const
{
const std::lock_guard<std::mutex> lock_dictionaries {external_dictionaries.dictionaries_mutex};
return external_dictionaries.dictionaries.count(table_name) && !deleted_tables.count(table_name);
}
StoragePtr DatabaseDictionary::tryGetTable(const String & table_name)
{
const std::lock_guard<std::mutex> lock_dictionaries {external_dictionaries.dictionaries_mutex};
if (deleted_tables.count(table_name))
return {};
{
auto it = external_dictionaries.dictionaries.find(table_name);
if (it != external_dictionaries.dictionaries.end())
{
const auto & dict_ptr = it->second.dict;
if (dict_ptr)
{
const DictionaryStructure & dictionary_structure = dict_ptr->get()->getStructure();
auto columns = StorageDictionary::getNamesAndTypes(dictionary_structure);
return StorageDictionary::create(table_name, columns, {}, {}, {}, dictionary_structure, table_name);
}
}
}
return {};
}
DatabaseIteratorPtr DatabaseDictionary::getIterator()
{
return std::make_unique<DatabaseSnaphotIterator>(loadTables());
}
bool DatabaseDictionary::empty() const
{
const std::lock_guard<std::mutex> lock_dictionaries {external_dictionaries.dictionaries_mutex};
for (const auto & pair : external_dictionaries.dictionaries)
if (pair.second.dict && !deleted_tables.count(pair.first))
return false;
return true;
}
StoragePtr DatabaseDictionary::detachTable(const String & table_name)
{
throw Exception("DatabaseDictionary: detachTable() is not supported", ErrorCodes::NOT_IMPLEMENTED);
}
void DatabaseDictionary::attachTable(const String & table_name, const StoragePtr & table)
{
throw Exception("DatabaseDictionary: attachTable() is not supported", ErrorCodes::NOT_IMPLEMENTED);
}
void DatabaseDictionary::createTable(const String & table_name,
const StoragePtr & table,
const ASTPtr & query,
const String & engine,
const Settings & settings)
{
throw Exception("DatabaseDictionary: createTable() is not supported", ErrorCodes::NOT_IMPLEMENTED);
}
void DatabaseDictionary::removeTable(const String & table_name)
{
if (!isTableExist(table_name))
throw Exception("Table " + name + "." + table_name + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
const std::lock_guard<std::mutex> lock_dictionaries {external_dictionaries.dictionaries_mutex};
deleted_tables.insert(table_name);
}
void DatabaseDictionary::renameTable(const Context & context,
const String & table_name,
IDatabase & to_database,
const String & to_table_name,
const Settings & settings)
{
throw Exception("DatabaseDictionary: renameTable() is not supported", ErrorCodes::NOT_IMPLEMENTED);
}
time_t DatabaseDictionary::getTableMetadataModificationTime(const String & table_name)
{
return static_cast<time_t>(0);
}
ASTPtr DatabaseDictionary::getCreateQuery(const String & table_name) const
{
throw Exception("DatabaseDictionary: getCreateQuery() is not supported", ErrorCodes::NOT_IMPLEMENTED);
return nullptr;
}
void DatabaseDictionary::shutdown()
{
}
void DatabaseDictionary::drop()
{
/// Additional actions to delete database are not required.
}
void DatabaseDictionary::alterTable(const Context & context,
const String & name,
const NamesAndTypesList & columns,
const NamesAndTypesList & materialized_columns,
const NamesAndTypesList & alias_columns,
const ColumnDefaults & column_defaults,
const ASTModifier & engine_modifier)
{
throw Exception("DatabaseDictionary: alterTable() is not supported", ErrorCodes::NOT_IMPLEMENTED);
}
}

View File

@ -0,0 +1,83 @@
#pragma once
#include <mutex>
#include <unordered_set>
#include <Databases/DatabasesCommon.h>
#include <Databases/IDatabase.h>
#include <Storages/IStorage.h>
namespace Poco
{
class Logger;
}
namespace DB
{
class ExternalDictionaries;
/* Database to store StorageDictionary tables
* automatically creates tables for all dictionaries
*/
class DatabaseDictionary : public IDatabase
{
private:
const String name;
mutable std::mutex mutex;
const ExternalDictionaries & external_dictionaries;
std::unordered_set<String> deleted_tables;
Poco::Logger * log;
Tables loadTables();
public:
DatabaseDictionary(const String & name_, const Context & context);
String getEngineName() const override
{
return "Dictionary";
}
void loadTables(Context & context, ThreadPool * thread_pool, bool has_force_restore_data_flag) override;
bool isTableExist(const String & table_name) const override;
StoragePtr tryGetTable(const String & table_name) override;
DatabaseIteratorPtr getIterator() override;
bool empty() const override;
void createTable(const String & table_name,
const StoragePtr & table,
const ASTPtr & query,
const String & engine,
const Settings & settings) override;
void removeTable(const String & table_name) override;
void attachTable(const String & table_name, const StoragePtr & table) override;
StoragePtr detachTable(const String & table_name) override;
void renameTable(const Context & context,
const String & table_name,
IDatabase & to_database,
const String & to_table_name,
const Settings & settings) override;
time_t getTableMetadataModificationTime(const String & table_name) override;
ASTPtr getCreateQuery(const String & table_name) const override;
void shutdown() override;
void drop() override;
void alterTable(const Context & context,
const String & name,
const NamesAndTypesList & columns,
const NamesAndTypesList & materialized_columns,
const NamesAndTypesList & alias_columns,
const ColumnDefaults & column_defaults,
const ASTModifier & engine_modifier) override;
};
}

View File

@ -1,7 +1,7 @@
#include <Databases/DatabaseFactory.h>
#include <Databases/DatabaseOrdinary.h>
#include <Databases/DatabaseMemory.h>
#include <Databases/DatabaseDictionary.h>
namespace DB
{
@ -22,6 +22,8 @@ DatabasePtr DatabaseFactory::get(
return std::make_shared<DatabaseOrdinary>(database_name, path);
else if (engine_name == "Memory")
return std::make_shared<DatabaseMemory>(database_name);
else if (engine_name == "Dictionary")
return std::make_shared<DatabaseDictionary>(database_name, context);
throw Exception("Unknown database engine: " + engine_name, ErrorCodes::UNKNOWN_DATABASE_ENGINE);
}

View File

@ -54,7 +54,7 @@ StoragePtr DatabaseMemory::detachTable(const String & table_name)
std::lock_guard<std::mutex> lock(mutex);
auto it = tables.find(table_name);
if (it == tables.end())
throw Exception("Table " + name + "." + table_name + " doesn't exist.", ErrorCodes::TABLE_ALREADY_EXISTS);
throw Exception("Table " + name + "." + table_name + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
res = it->second;
tables.erase(it);
}

View File

@ -342,7 +342,7 @@ void DatabaseOrdinary::renameTable(
StoragePtr table = tryGetTable(table_name);
if (!table)
throw Exception("Table " + name + "." + table_name + " doesn't exist.", ErrorCodes::TABLE_ALREADY_EXISTS);
throw Exception("Table " + name + "." + table_name + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
/// Notify the table that it is renamed. If the table does not support renaming, exception is thrown.
try

View File

@ -46,6 +46,9 @@ public:
DatabaseSnaphotIterator(Tables & tables_)
: tables(tables_), it(tables.begin()) {}
DatabaseSnaphotIterator(Tables && tables_)
: tables(tables_), it(tables.begin()) {}
void next() override
{
++it;

View File

@ -38,6 +38,7 @@ class ExternalDictionaries
{
private:
friend class StorageSystemDictionaries;
friend class DatabaseDictionary;
mutable std::mutex dictionaries_mutex;

View File

@ -1,4 +1,3 @@
#include <sstream>
#include <Parsers/ASTCreateQuery.h>
#include <DataTypes/DataTypesNumber.h>
@ -7,51 +6,63 @@
#include <Dictionaries/CacheDictionary.h>
#include <Storages/StorageDictionary.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExternalDictionaries.h>
#include <common/logger_useful.h>
namespace DB {
namespace DB
{
StoragePtr StorageDictionary::create(
const String & table_name_,
const String & database_name_,
Context & context_,
ASTPtr & query_,
NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_)
const String & table_name,
Context & context,
ASTPtr & query,
NamesAndTypesListPtr columns,
const NamesAndTypesList & materialized_columns,
const NamesAndTypesList & alias_columns,
const ColumnDefaults & column_defaults)
{
return make_shared(
table_name_, database_name_, context_, query_,
columns_, materialized_columns_, alias_columns_, column_defaults_,
context_.getExternalDictionaries()
);
}
StorageDictionary::StorageDictionary(
const String & table_name_,
const String & database_name_,
Context & context_,
ASTPtr & query_,
NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_,
const ExternalDictionaries & external_dictionaries_)
: IStorage{materialized_columns_, alias_columns_, column_defaults_}, table_name(table_name_),
database_name(database_name_), context(context_), columns(columns_),
external_dictionaries(external_dictionaries_)
{
logger = &Poco::Logger::get("StorageDictionary");
ASTCreateQuery & create = typeid_cast<ASTCreateQuery &>(*query_);
ASTCreateQuery & create = typeid_cast<ASTCreateQuery &>(*query);
const ASTFunction & function = typeid_cast<const ASTFunction &> (*create.storage);
if (function.arguments) {
String dictionary_name;
if (function.arguments)
{
std::stringstream iss;
function.arguments->format(IAST::FormatSettings(iss, false, false));
dictionary_name = iss.str();
}
auto dictionary = external_dictionaries.getDictionary(dictionary_name);
checkNamesAndTypesCompatibleWithDictionary(dictionary);
const auto & dictionary = context.getExternalDictionaries().getDictionary(dictionary_name);
const DictionaryStructure & dictionary_structure = dictionary->getStructure();
return make_shared(table_name, columns, materialized_columns, alias_columns,
column_defaults, dictionary_structure, dictionary_name);
}
StoragePtr StorageDictionary::create(
const String & table_name,
NamesAndTypesListPtr columns,
const NamesAndTypesList & materialized_columns,
const NamesAndTypesList & alias_columns,
const ColumnDefaults & column_defaults,
const DictionaryStructure & dictionary_structure,
const String & dictionary_name)
{
return make_shared(table_name, columns, materialized_columns, alias_columns,
column_defaults, dictionary_structure, dictionary_name);
}
StorageDictionary::StorageDictionary(
const String & table_name_,
NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_,
const DictionaryStructure & dictionary_structure_,
const String & dictionary_name_)
: IStorage{materialized_columns_, alias_columns_, column_defaults_}, table_name(table_name_),
columns(columns_), dictionary_name(dictionary_name_),
logger(&Poco::Logger::get("StorageDictionary"))
{
checkNamesAndTypesCompatibleWithDictionary(dictionary_structure_);
}
BlockInputStreams StorageDictionary::read(
@ -63,47 +74,47 @@ BlockInputStreams StorageDictionary::read(
const unsigned threads)
{
processed_stage = QueryProcessingStage::Complete;
auto dictionary = external_dictionaries.getDictionary(dictionary_name);
auto dictionary = context.getExternalDictionaries().getDictionary(dictionary_name);
return BlockInputStreams{dictionary->getBlockInputStream(column_names, max_block_size)};
}
NamesAndTypes StorageDictionary::getNamesAndTypesFromDictionaryStructure(Ptr dictionary) const
NamesAndTypesListPtr StorageDictionary::getNamesAndTypes(const DictionaryStructure & dictionaryStructure)
{
const DictionaryStructure & dictionaryStructure = dictionary->getStructure();
NamesAndTypes dictionaryNamesAndTypes;
NamesAndTypesListPtr dictionaryNamesAndTypes = std::make_shared<NamesAndTypesList>();
if (dictionaryStructure.id)
dictionaryNamesAndTypes.push_back(NameAndTypePair(dictionaryStructure.id->name,
dictionaryNamesAndTypes->push_back(NameAndTypePair(dictionaryStructure.id->name,
std::make_shared<DataTypeUInt64>()));
if (dictionaryStructure.range_min)
dictionaryNamesAndTypes.push_back(NameAndTypePair(dictionaryStructure.range_min->name,
dictionaryNamesAndTypes->push_back(NameAndTypePair(dictionaryStructure.range_min->name,
std::make_shared<DataTypeUInt16>()));
if (dictionaryStructure.range_max)
dictionaryNamesAndTypes.push_back(NameAndTypePair(dictionaryStructure.range_max->name,
dictionaryNamesAndTypes->push_back(NameAndTypePair(dictionaryStructure.range_max->name,
std::make_shared<DataTypeUInt16>()));
if (dictionaryStructure.key)
for (const auto & attribute : *dictionaryStructure.key)
dictionaryNamesAndTypes.push_back(NameAndTypePair(attribute.name, attribute.type));
dictionaryNamesAndTypes->push_back(NameAndTypePair(attribute.name, attribute.type));
for (const auto & attribute : dictionaryStructure.attributes)
dictionaryNamesAndTypes.push_back(NameAndTypePair(attribute.name, attribute.type));
dictionaryNamesAndTypes->push_back(NameAndTypePair(attribute.name, attribute.type));
return dictionaryNamesAndTypes;
}
void StorageDictionary::checkNamesAndTypesCompatibleWithDictionary(Ptr dictionary) const
void StorageDictionary::checkNamesAndTypesCompatibleWithDictionary(const DictionaryStructure & dictionaryStructure) const
{
auto dictionaryNamesAndTypes = getNamesAndTypesFromDictionaryStructure(dictionary);
std::set<NameAndTypePair> namesAndTypesSet(dictionaryNamesAndTypes.begin(), dictionaryNamesAndTypes.end());
auto dictionaryNamesAndTypes = getNamesAndTypes(dictionaryStructure);
std::set<NameAndTypePair> namesAndTypesSet(dictionaryNamesAndTypes->begin(), dictionaryNamesAndTypes->end());
for (auto & column : *columns) {
if (namesAndTypesSet.find(column) == namesAndTypesSet.end()) {
for (auto & column : *columns)
{
if (namesAndTypesSet.find(column) == namesAndTypesSet.end())
{
std::string message = "Not found column ";
message += column.name + " " + column.type->getName();
message += " in dictionary " + dictionary_name + ". ";
message += "There are only columns ";
message += generateNamesAndTypesDescription(dictionaryNamesAndTypes.begin(), dictionaryNamesAndTypes.end());
message += generateNamesAndTypesDescription(dictionaryNamesAndTypes->begin(), dictionaryNamesAndTypes->end());
throw Exception(message);
}
}

View File

@ -1,24 +1,26 @@
#pragma once
#include <ext/shared_ptr_helper.h>
#include <Parsers/ASTSelectQuery.h>
#include <Storages/IStorage.h>
#include <common/MultiVersion.h>
#include <common/logger_useful.h>
#include <ext/shared_ptr_helper.h>
#include <Interpreters/ExternalDictionaries.h>
namespace Poco
{
class Logger;
}
namespace DB
{
class DictionaryStructure;
class IDictionaryBase;
class ExternalDictionaries;
class StorageDictionary : private ext::shared_ptr_helper<StorageDictionary>, public IStorage
{
friend class ext::shared_ptr_helper<StorageDictionary>;
public:
static StoragePtr create(
const String & table_name_,
const String & database_name_,
static StoragePtr create(const String & table_name_,
Context & context_,
ASTPtr & query_,
NamesAndTypesListPtr columns_,
@ -26,56 +28,55 @@ public:
const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_);
static StoragePtr create(const String & table_name,
NamesAndTypesListPtr columns,
const NamesAndTypesList & materialized_columns,
const NamesAndTypesList & alias_columns,
const ColumnDefaults & column_defaults,
const DictionaryStructure & dictionary_structure,
const String & dictionary_name);
std::string getName() const override { return "Dictionary"; }
std::string getTableName() const override { return table_name; }
const NamesAndTypesList & getColumnsListImpl() const override { return *columns; }
BlockInputStreams read(
const Names & column_names,
const ASTPtr& query,
BlockInputStreams read(const Names & column_names,
const ASTPtr & query,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,
unsigned threads = 1) override;
void drop() override {}
static NamesAndTypesListPtr getNamesAndTypes(const DictionaryStructure & dictionaryStructure);
private:
using Ptr = MultiVersion<IDictionaryBase>::Version;
String select_database_name;
String select_table_name;
String table_name;
String database_name;
ASTSelectQuery inner_query;
Context & context;
NamesAndTypesListPtr columns;
String dictionary_name;
const ExternalDictionaries & external_dictionaries;
Poco::Logger * logger;
StorageDictionary(
const String & table_name_,
const String & database_name_,
Context & context_,
ASTPtr & query_,
StorageDictionary(const String & table_name_,
NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_,
const ExternalDictionaries & external_dictionaries_);
const DictionaryStructure & dictionary_structure_,
const String & dictionary_name_);
void checkNamesAndTypesCompatibleWithDictionary(Ptr dictionary) const;
NamesAndTypes getNamesAndTypesFromDictionaryStructure(Ptr dictionary) const;
void checkNamesAndTypesCompatibleWithDictionary(const DictionaryStructure & dictionaryStructure) const;
template <class ForwardIterator>
std::string generateNamesAndTypesDescription(ForwardIterator begin, ForwardIterator end) const {
if (begin == end) {
std::string generateNamesAndTypesDescription(ForwardIterator begin, ForwardIterator end) const
{
if (begin == end)
{
return "";
}
std::string description;
for (; begin != end; ++begin) {
for (; begin != end; ++begin)
{
description += ", ";
description += begin->name;
description += ' ';
@ -84,6 +85,4 @@ private:
return description.substr(2, description.size());
}
};
}

View File

@ -282,8 +282,9 @@ StoragePtr StorageFactory::get(
}
else if (name == "Dictionary")
{
return StorageDictionary::create(
table_name, database_name, context, query, columns,
table_name, context, query, columns,
materialized_columns, alias_columns, column_defaults);
}
else if (name == "TinyLog")

View File

@ -57,7 +57,7 @@ There are no quorum writes. You can't write data with confirmation that it was r
Each block of data is written atomically. The INSERT query is divided into blocks up to max_insert_block_size = 1048576 rows. In other words, if the INSERT query has less than 1048576 rows, it is made atomically.
Blocks of data are duplicated. For multiple writes of the same data block (data blocks of the same size containing the same rows in the same order), the block is only written once. The reason for this is in case of network failures when the client application doesn't know if the data was written to the DB, so the INSERT query can simply be repeated. It doesn't matter which replica INSERTs were sent to with identical data - INSERTs are idempotent. This only works for the last 100 blocks inserted in a table.
Blocks of data are deduplicated. For multiple writes of the same data block (data blocks of the same size containing the same rows in the same order), the block is only written once. The reason for this is in case of network failures when the client application doesn't know if the data was written to the DB, so the INSERT query can simply be repeated. It doesn't matter which replica INSERTs were sent to with identical data - INSERTs are idempotent. This only works for the last 100 blocks inserted in a table.
During replication, only the source data to insert is transferred over the network. Further data transformation (merging) is coordinated and performed on all the replicas in the same way. This minimizes network usage, which means that replication works well when replicas reside in different datacenters. (Note that duplicating data in different datacenters is the main goal of replication.)