/// ClickHouse/dbms/src/Storages/StorageFactory.cpp
#include <Poco/Util/Application.h>
#include <Poco/Util/AbstractConfiguration.h>
2012-05-22 19:32:56 +00:00
#include <DB/Parsers/ASTCreateQuery.h>
#include <DB/Parsers/ASTIdentifier.h>
2012-05-30 05:53:09 +00:00
#include <DB/Parsers/ASTLiteral.h>
2012-05-22 19:32:56 +00:00
#include <DB/Interpreters/Context.h>
#include <DB/Interpreters/reinterpretAsIdentifier.h>
2012-05-22 19:32:56 +00:00
2011-10-31 17:30:44 +00:00
#include <DB/Storages/StorageLog.h>
2012-06-25 00:17:19 +00:00
#include <DB/Storages/StorageTinyLog.h>
2011-10-31 17:55:06 +00:00
#include <DB/Storages/StorageMemory.h>
#include <DB/Storages/StorageBuffer.h>
#include <DB/Storages/StorageNull.h>
2012-05-30 05:53:09 +00:00
#include <DB/Storages/StorageMerge.h>
2012-07-18 19:30:33 +00:00
#include <DB/Storages/StorageMergeTree.h>
2012-05-22 19:32:56 +00:00
#include <DB/Storages/StorageDistributed.h>
2011-10-31 17:30:44 +00:00
#include <DB/Storages/StorageSystemNumbers.h>
#include <DB/Storages/StorageSystemOne.h>
#include <DB/Storages/StorageFactory.h>
#include <DB/Storages/StorageView.h>
#include <DB/Storages/StorageMaterializedView.h>
#include <DB/Storages/StorageChunks.h>
#include <DB/Storages/StorageChunkRef.h>
2013-02-11 09:19:48 +00:00
#include <DB/Storages/StorageChunkMerger.h>
2014-03-21 19:17:59 +00:00
#include <DB/Storages/StorageReplicatedMergeTree.h>
2011-10-31 17:30:44 +00:00
namespace DB
{
2014-03-21 19:17:59 +00:00
/// Returns true if `s` ends with `suffix` (an empty suffix always matches).
/// Compares raw bytes, so strings containing '\0' are handled correctly:
///  strncmp would stop at the first NUL byte and could report a false match.
static bool endsWith(const std::string & s, const std::string & suffix)
{
	return s.size() >= suffix.size()
		&& 0 == s.compare(s.size() - suffix.size(), suffix.size(), suffix);
}
/// Returns true if `s` begins with `prefix` (an empty prefix always matches).
/// Compares raw bytes, so strings containing '\0' are handled correctly:
///  strncmp would stop at the first NUL byte and could report a false match.
static bool startsWith(const std::string & s, const std::string & prefix)
{
	return s.size() >= prefix.size()
		&& 0 == s.compare(0, prefix.size(), prefix);
}
2014-03-18 17:20:44 +00:00
/** Для StorageMergeTree: достать первичный ключ в виде ASTExpressionList.
* Он может быть указан в кортеже: (CounterID, Date),
* или как один столбец: CounterID.
*/
/** For the MergeTree family of engines: obtain the primary key as an ASTExpressionList.
  * The key may be written as a tuple, e.g. (CounterID, Date),
  *  or as a single column, e.g. CounterID.
  * `storage_name` is currently unused.
  */
static ASTPtr extractPrimaryKey(const ASTPtr & node, const std::string & storage_name)
{
	const auto * tuple_func = typeid_cast<const ASTFunction *>(&*node);

	if (!tuple_func || tuple_func->name != "tuple")
	{
		/// A single column: wrap it into a one-element expression list.
		ASTExpressionList * wrapper = new ASTExpressionList;
		ASTPtr wrapper_holder = wrapper;	/// Take ownership before touching the node.
		wrapper->children.push_back(node);
		return wrapper_holder;
	}

	/// A tuple: return its first child (presumably the tuple's argument list) as-is.
	return tuple_func->children.at(0);
}
2011-10-31 17:30:44 +00:00
StoragePtr StorageFactory::get(
const String & name,
const String & data_path,
const String & table_name,
const String & database_name,
Context & local_context,
2012-05-22 19:32:56 +00:00
Context & context,
ASTPtr & query,
NamesAndTypesListPtr columns,
const NamesAndTypesList & materialized_columns,
const NamesAndTypesList & alias_columns,
const ColumnDefaults & column_defaults,
bool attach) const
2011-10-31 17:30:44 +00:00
{
if (name == "Log")
2012-05-22 19:32:56 +00:00
{
return StorageLog::create(
data_path, table_name, columns,
materialized_columns, alias_columns, column_defaults,
context.getSettings().max_compress_block_size);
2012-06-25 00:17:19 +00:00
}
else if (name == "Chunks")
{
return StorageChunks::create(
data_path, table_name, database_name, columns,
materialized_columns, alias_columns, column_defaults,
context, attach);
}
else if (name == "ChunkRef")
{
throw Exception("Table with storage ChunkRef must not be created manually.", ErrorCodes::TABLE_MUST_NOT_BE_CREATED_MANUALLY);
}
else if (name == "View")
{
return StorageView::create(
table_name, database_name, context, query, columns,
materialized_columns, alias_columns, column_defaults);
}
else if (name == "MaterializedView")
{
return StorageMaterializedView::create(
table_name, database_name, context, query, columns,
materialized_columns, alias_columns, column_defaults,
attach);
}
2013-02-11 09:19:48 +00:00
else if (name == "ChunkMerger")
{
ASTs & args_func = typeid_cast<ASTFunction &>(*typeid_cast<ASTCreateQuery &>(*query).storage).children;
2013-02-11 09:19:48 +00:00
do
{
if (args_func.size() != 1)
break;
ASTs & args = typeid_cast<ASTExpressionList &>(*args_func.at(0)).children;
if (args.size() < 3 || args.size() > 4)
2013-02-11 09:19:48 +00:00
break;
String source_database = reinterpretAsIdentifier(args[0], local_context).name;
String source_table_name_regexp = safeGet<const String &>(typeid_cast<ASTLiteral &>(*args[1]).value);
size_t chunks_to_merge = safeGet<UInt64>(typeid_cast<ASTLiteral &>(*args[2]).value);
2013-03-14 11:43:43 +00:00
String destination_name_prefix = "group_";
String destination_database = source_database;
2013-03-14 11:43:43 +00:00
if (args.size() > 3)
destination_name_prefix = typeid_cast<ASTIdentifier &>(*args[3]).name;
return StorageChunkMerger::create(
database_name, table_name, columns,
materialized_columns, alias_columns, column_defaults,
source_database, source_table_name_regexp,
destination_name_prefix, chunks_to_merge, context);
} while (false);
throw Exception("Storage ChunkMerger requires from 3 to 4 parameters:"
" source database, regexp for source table names, number of chunks to merge, [destination tables name prefix].",
2013-02-11 09:19:48 +00:00
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
}
2012-06-25 00:51:23 +00:00
else if (name == "TinyLog")
2012-06-25 00:17:19 +00:00
{
return StorageTinyLog::create(
data_path, table_name, columns,
materialized_columns, alias_columns, column_defaults,
attach, context.getSettings().max_compress_block_size);
2012-05-22 19:32:56 +00:00
}
2011-10-31 17:55:06 +00:00
else if (name == "Memory")
2012-05-22 19:32:56 +00:00
{
return StorageMemory::create(table_name, columns, materialized_columns, alias_columns, column_defaults);
2012-05-22 19:32:56 +00:00
}
else if (name == "Null")
{
return StorageNull::create(table_name, columns, materialized_columns, alias_columns, column_defaults);
}
2012-05-30 05:53:09 +00:00
else if (name == "Merge")
{
/** В запросе в качестве аргумента для движка указано имя БД, в которой находятся таблицы-источники,
* а также регексп для имён таблиц-источников.
*/
ASTs & args_func = typeid_cast<ASTFunction &>(*typeid_cast<ASTCreateQuery &>(*query).storage).children;
2012-05-30 05:53:09 +00:00
if (args_func.size() != 1)
throw Exception("Storage Merge requires exactly 2 parameters"
" - name of source database and regexp for table names.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
ASTs & args = typeid_cast<ASTExpressionList &>(*args_func.at(0)).children;
2012-05-30 05:53:09 +00:00
if (args.size() != 2)
throw Exception("Storage Merge requires exactly 2 parameters"
" - name of source database and regexp for table names.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
String source_database = reinterpretAsIdentifier(args[0], local_context).name;
String table_name_regexp = safeGet<const String &>(typeid_cast<ASTLiteral &>(*args[1]).value);
return StorageMerge::create(
table_name, columns,
materialized_columns, alias_columns, column_defaults,
source_database, table_name_regexp, context);
2012-05-30 05:53:09 +00:00
}
2012-05-22 19:32:56 +00:00
else if (name == "Distributed")
{
/** В запросе в качестве аргумента для движка указано имя конфигурационной секции,
2012-05-30 05:53:09 +00:00
* в которой задан список удалённых серверов, а также имя удалённой БД и имя удалённой таблицы.
2012-05-22 19:32:56 +00:00
*/
ASTs & args_func = typeid_cast<ASTFunction &>(*typeid_cast<ASTCreateQuery &>(*query).storage).children;
2012-05-22 20:18:45 +00:00
if (args_func.size() != 1)
throw Exception("Storage Distributed requires 3 parameters"
" - name of configuration section with list of remote servers, name of remote database, name of remote table.",
2012-05-22 20:18:45 +00:00
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
ASTs & args = typeid_cast<ASTExpressionList &>(*args_func.at(0)).children;
if (args.size() != 3 && args.size() != 4)
2014-08-21 12:07:29 +00:00
throw Exception("Storage Distributed requires 3 or 4 parameters"
" - name of configuration section with list of remote servers, name of remote database, name of remote table,"
" sharding key expression (optional).",
2012-05-22 19:32:56 +00:00
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
String cluster_name = typeid_cast<ASTIdentifier &>(*args[0]).name;
String remote_database = reinterpretAsIdentifier(args[1], local_context).name;
String remote_table = typeid_cast<ASTIdentifier &>(*args[2]).name;
const auto & sharding_key = args.size() == 4 ? args[3] : nullptr;
return StorageDistributed::create(
table_name, columns,
materialized_columns, alias_columns, column_defaults,
remote_database, remote_table, cluster_name,
context, sharding_key, data_path);
2012-05-22 19:32:56 +00:00
}
else if (name == "Buffer")
{
/** Buffer(db, table, num_buckets, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes)
*
* db, table - в какую таблицу сбрасывать данные из буфера.
* num_buckets - уровень параллелизма.
* min_time, max_time, min_rows, max_rows, min_bytes, max_bytes - условия вытеснения из буфера.
*/
ASTs & args_func = typeid_cast<ASTFunction &>(*typeid_cast<ASTCreateQuery &>(*query).storage).children;
if (args_func.size() != 1)
throw Exception("Storage Buffer requires 9 parameters: "
" destination database, destination table, num_buckets, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
ASTs & args = typeid_cast<ASTExpressionList &>(*args_func.at(0)).children;
if (args.size() != 9)
throw Exception("Storage Buffer requires 9 parameters: "
" destination_database, destination_table, num_buckets, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
String destination_database = reinterpretAsIdentifier(args[0], local_context).name;
String destination_table = typeid_cast<ASTIdentifier &>(*args[1]).name;
size_t num_buckets = apply_visitor(FieldVisitorConvertToNumber<size_t>(), typeid_cast<ASTLiteral &>(*args[2]).value);
time_t min_time = apply_visitor(FieldVisitorConvertToNumber<size_t>(), typeid_cast<ASTLiteral &>(*args[3]).value);
time_t max_time = apply_visitor(FieldVisitorConvertToNumber<size_t>(), typeid_cast<ASTLiteral &>(*args[4]).value);
size_t min_rows = apply_visitor(FieldVisitorConvertToNumber<size_t>(), typeid_cast<ASTLiteral &>(*args[5]).value);
size_t max_rows = apply_visitor(FieldVisitorConvertToNumber<size_t>(), typeid_cast<ASTLiteral &>(*args[6]).value);
size_t min_bytes = apply_visitor(FieldVisitorConvertToNumber<size_t>(), typeid_cast<ASTLiteral &>(*args[7]).value);
size_t max_bytes = apply_visitor(FieldVisitorConvertToNumber<size_t>(), typeid_cast<ASTLiteral &>(*args[8]).value);
return StorageBuffer::create(
table_name, columns, context,
num_buckets, {min_time, min_rows, min_bytes}, {max_time, max_rows, max_bytes},
destination_database, destination_table);
}
2014-03-21 19:17:59 +00:00
else if (endsWith(name, "MergeTree"))
2012-07-18 19:30:33 +00:00
{
2014-08-04 22:10:56 +00:00
/** Движки [Replicated][Summing|Collapsing|Aggregating|]MergeTree (8 комбинаций)
2014-03-21 19:17:59 +00:00
* В качестве аргумента для движка должно быть указано:
* - (для Replicated) Путь к таблице в ZooKeeper
* - (для Replicated) Имя реплики в ZooKeeper
2012-07-18 19:30:33 +00:00
* - имя столбца с датой;
2014-05-22 18:58:07 +00:00
* - (не обязательно) выражение для семплирования (запрос с SAMPLE x будет выбирать строки, у которых в этом столбце значение меньше, чем x*UINT32_MAX);
2014-03-21 19:17:59 +00:00
* - выражение для сортировки (либо скалярное выражение, либо tuple из нескольких);
* - index_granularity;
* - (для Collapsing) имя столбца, содержащего тип строчки с изменением "визита" (принимающего значения 1 и -1).
* Например: ENGINE = ReplicatedCollapsingMergeTree('/tables/mytable', 'rep02', EventDate, (CounterID, EventDate, intHash32(UniqID), VisitID), 8192, Sign).
2012-07-18 19:30:33 +00:00
*/
2014-03-21 19:17:59 +00:00
String name_part = name.substr(0, name.size() - strlen("MergeTree"));
2012-07-18 19:30:33 +00:00
2014-03-21 19:17:59 +00:00
bool replicated = startsWith(name_part, "Replicated");
if (replicated)
name_part = name_part.substr(strlen("Replicated"));
2012-07-18 19:30:33 +00:00
2014-03-21 19:17:59 +00:00
MergeTreeData::Mode mode = MergeTreeData::Ordinary;
2012-07-18 19:30:33 +00:00
2014-03-21 19:17:59 +00:00
if (name_part == "Collapsing")
mode = MergeTreeData::Collapsing;
else if (name_part == "Summing")
mode = MergeTreeData::Summing;
2014-05-28 14:54:42 +00:00
else if (name_part == "Aggregating")
mode = MergeTreeData::Aggregating;
2014-03-21 19:17:59 +00:00
else if (!name_part.empty())
throw Exception("Unknown storage " + name, ErrorCodes::UNKNOWN_STORAGE);
ASTs & args_func = typeid_cast<ASTFunction &>(*typeid_cast<ASTCreateQuery &>(*query).storage).children;
2012-08-16 17:27:40 +00:00
2014-03-24 13:59:04 +00:00
ASTs args;
2012-08-16 17:27:40 +00:00
2014-03-24 13:59:04 +00:00
if (args_func.size() == 1)
args = typeid_cast<ASTExpressionList &>(*args_func.at(0)).children;
2012-08-16 17:27:40 +00:00
2014-03-21 19:17:59 +00:00
size_t additional_params = (replicated ? 2 : 0) + (mode == MergeTreeData::Collapsing ? 1 : 0);
if (args.size() != additional_params + 3 && args.size() != additional_params + 4)
{
String params;
if (replicated)
2014-04-14 10:18:23 +00:00
params += "path in ZooKeeper, replica name or '', ";
2014-03-21 19:17:59 +00:00
params += "name of column with date, [name of column for sampling], primary key expression, index granularity";
if (mode == MergeTreeData::Collapsing)
params += ", sign column";
2014-03-24 13:59:04 +00:00
throw Exception("Storage " + name + " requires " + toString(additional_params + 3) + " or "
2014-03-21 19:17:59 +00:00
+ toString(additional_params + 4) +" parameters: " + params,
2012-08-16 17:27:40 +00:00
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
2014-03-21 19:17:59 +00:00
}
2012-08-16 17:27:40 +00:00
2014-03-21 19:17:59 +00:00
String zookeeper_path;
String replica_name;
String date_column_name;
ASTPtr primary_expr_list;
ASTPtr sampling_expression;
UInt64 index_granularity;
2012-08-16 17:27:40 +00:00
2014-03-21 19:17:59 +00:00
String sign_column_name;
2012-08-16 17:27:40 +00:00
2014-03-21 19:17:59 +00:00
if (replicated)
{
auto ast = typeid_cast<ASTLiteral *>(&*args[0]);
2014-03-21 19:17:59 +00:00
if (ast && ast->value.getType() == Field::Types::String)
zookeeper_path = safeGet<String>(ast->value);
else
throw Exception("Path in ZooKeeper must be a string literal", ErrorCodes::BAD_ARGUMENTS);
ast = typeid_cast<ASTLiteral *>(&*args[1]);
2014-03-21 19:17:59 +00:00
if (ast && ast->value.getType() == Field::Types::String)
replica_name = safeGet<String>(ast->value);
else
throw Exception("Replica name must be a string literal", ErrorCodes::BAD_ARGUMENTS);
2014-04-14 10:18:23 +00:00
if (replica_name.empty())
throw Exception("No replica name in config", ErrorCodes::NO_REPLICA_NAME_GIVEN);
2014-04-14 10:18:23 +00:00
2014-03-21 19:17:59 +00:00
args.erase(args.begin(), args.begin() + 2);
}
if (mode == MergeTreeData::Collapsing)
{
if (auto ast = typeid_cast<ASTIdentifier *>(&*args.back()))
2014-03-21 19:17:59 +00:00
sign_column_name = ast->name;
else
throw Exception("Sign column name must be an unquoted string", ErrorCodes::BAD_ARGUMENTS);
args.pop_back();
}
if (args.size() == 4)
{
sampling_expression = args[1];
args.erase(args.begin() + 1);
}
if (auto ast = typeid_cast<ASTIdentifier *>(&*args[0]))
2014-03-21 19:17:59 +00:00
date_column_name = ast->name;
else
throw Exception("Date column name must be an unquoted string", ErrorCodes::BAD_ARGUMENTS);
primary_expr_list = extractPrimaryKey(args[1], name);
auto ast = typeid_cast<ASTLiteral *>(&*args[2]);
2014-03-21 19:17:59 +00:00
if (ast && ast->value.getType() == Field::Types::UInt64)
index_granularity = safeGet<UInt64>(ast->value);
else
throw Exception("Index granularity must be a positive integer", ErrorCodes::BAD_ARGUMENTS);
if (replicated)
return StorageReplicatedMergeTree::create(
zookeeper_path, replica_name, attach, data_path, database_name, table_name,
columns, materialized_columns, alias_columns, column_defaults,
context, primary_expr_list, date_column_name,
2014-03-21 19:17:59 +00:00
sampling_expression, index_granularity, mode, sign_column_name);
else
return StorageMergeTree::create(
data_path, database_name, table_name,
columns, materialized_columns, alias_columns, column_defaults,
context, primary_expr_list, date_column_name,
2014-03-21 19:17:59 +00:00
sampling_expression, index_granularity, mode, sign_column_name);
2012-08-16 17:27:40 +00:00
}
2011-10-31 17:30:44 +00:00
else
throw Exception("Unknown storage " + name, ErrorCodes::UNKNOWN_STORAGE);
}
}