This commit is contained in:
Alexander Tokmakov 2020-02-21 16:44:44 +03:00
parent e1f5a620b5
commit e117e5838e
7 changed files with 63 additions and 49 deletions

View File

@ -45,4 +45,12 @@ std::string getClusterName(const IAST & node)
throw Exception("Illegal expression instead of cluster name.", ErrorCodes::BAD_ARGUMENTS);
}
String getClusterNameAndMakeLiteral(ASTPtr & node)
{
String cluster_name = getClusterName(*node);
node = std::make_shared<ASTLiteral>(cluster_name);
return cluster_name;
}
}

View File

@ -1,13 +1,11 @@
#pragma once
#include <string>
#include <Parsers/IAST_fwd.h>
namespace DB
{
class IAST;
/// Get the cluster name from AST.
/** The name of the cluster is the name of the tag in the xml configuration.
* Usually it is parsed as an identifier. That is, it can contain underscores, but can not contain hyphens,
@ -16,6 +14,8 @@ class IAST;
* This name will be parsed as an expression with an operator minus - not at all what you need.
* Therefore, consider this case separately.
*/
std::string getClusterName(const IAST & node);
String getClusterName(const IAST & node);
String getClusterNameAndMakeLiteral(ASTPtr & node);
}

View File

@ -446,8 +446,10 @@ static StoragePtr create(const StorageFactory::Arguments & args)
}
ASTs & engine_args = args.engine_args;
size_t arg_num = 0;
size_t arg_cnt = engine_args.size();
if (engine_args.size() < min_num_params || engine_args.size() > max_num_params)
if (arg_cnt < min_num_params || arg_cnt > max_num_params)
{
String msg;
if (is_extended_storage_def)
@ -477,15 +479,16 @@ static StoragePtr create(const StorageFactory::Arguments & args)
if (replicated)
{
const auto * ast = engine_args[0]->as<ASTLiteral>();
const auto * ast = engine_args[arg_num]->as<ASTLiteral>();
if (ast && ast->value.getType() == Field::Types::String)
zookeeper_path = safeGet<String>(ast->value);
else
throw Exception(
"Path in ZooKeeper must be a string literal" + getMergeTreeVerboseHelp(is_extended_storage_def),
ErrorCodes::BAD_ARGUMENTS);
++arg_num;
ast = engine_args[1]->as<ASTLiteral>();
ast = engine_args[arg_num]->as<ASTLiteral>();
if (ast && ast->value.getType() == Field::Types::String)
replica_name = safeGet<String>(ast->value);
else
@ -497,39 +500,36 @@ static StoragePtr create(const StorageFactory::Arguments & args)
throw Exception(
"No replica name in config" + getMergeTreeVerboseHelp(is_extended_storage_def),
ErrorCodes::NO_REPLICA_NAME_GIVEN);
engine_args.erase(engine_args.begin(), engine_args.begin() + 2);
++arg_num;
}
if (merging_params.mode == MergeTreeData::MergingParams::Collapsing)
{
if (!tryGetIdentifierNameInto(engine_args.back(), merging_params.sign_column))
if (!tryGetIdentifierNameInto(engine_args[arg_cnt - 1], merging_params.sign_column))
throw Exception(
"Sign column name must be an unquoted string" + getMergeTreeVerboseHelp(is_extended_storage_def),
ErrorCodes::BAD_ARGUMENTS);
engine_args.pop_back();
--arg_cnt;
}
else if (merging_params.mode == MergeTreeData::MergingParams::Replacing)
{
/// If the last element is not index_granularity or replica_name (a literal), then this is the name of the version column.
if (!engine_args.empty() && !engine_args.back()->as<ASTLiteral>())
if (arg_cnt && !engine_args[arg_cnt - 1]->as<ASTLiteral>())
{
if (!tryGetIdentifierNameInto(engine_args.back(), merging_params.version_column))
if (!tryGetIdentifierNameInto(engine_args[arg_cnt - 1], merging_params.version_column))
throw Exception(
"Version column name must be an unquoted string" + getMergeTreeVerboseHelp(is_extended_storage_def),
ErrorCodes::BAD_ARGUMENTS);
engine_args.pop_back();
--arg_cnt;
}
}
else if (merging_params.mode == MergeTreeData::MergingParams::Summing)
{
/// If the last element is not index_granularity or replica_name (a literal), then this is a list of summable columns.
if (!engine_args.empty() && !engine_args.back()->as<ASTLiteral>())
if (arg_cnt && !engine_args[arg_cnt - 1]->as<ASTLiteral>())
{
merging_params.columns_to_sum = extractColumnNames(engine_args.back());
engine_args.pop_back();
merging_params.columns_to_sum = extractColumnNames(engine_args[arg_cnt - 1]);
--arg_cnt;
}
}
else if (merging_params.mode == MergeTreeData::MergingParams::Graphite)
@ -538,7 +538,7 @@ static StoragePtr create(const StorageFactory::Arguments & args)
String error_msg = "Last parameter of GraphiteMergeTree must be name (in single quotes) of element in configuration file with Graphite options";
error_msg += getMergeTreeVerboseHelp(is_extended_storage_def);
if (const auto * ast = engine_args.back()->as<ASTLiteral>())
if (const auto * ast = engine_args[arg_cnt - 1]->as<ASTLiteral>())
{
if (ast->value.getType() != Field::Types::String)
throw Exception(error_msg, ErrorCodes::BAD_ARGUMENTS);
@ -548,24 +548,24 @@ static StoragePtr create(const StorageFactory::Arguments & args)
else
throw Exception(error_msg, ErrorCodes::BAD_ARGUMENTS);
engine_args.pop_back();
--arg_cnt;
setGraphitePatternsFromConfig(args.context, graphite_config_name, merging_params.graphite_params);
}
else if (merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing)
{
if (!tryGetIdentifierNameInto(engine_args.back(), merging_params.version_column))
if (!tryGetIdentifierNameInto(engine_args[arg_cnt - 1], merging_params.version_column))
throw Exception(
"Version column name must be an unquoted string" + getMergeTreeVerboseHelp(is_extended_storage_def),
ErrorCodes::BAD_ARGUMENTS);
engine_args.pop_back();
--arg_cnt;
if (!tryGetIdentifierNameInto(engine_args.back(), merging_params.sign_column))
if (!tryGetIdentifierNameInto(engine_args[arg_cnt - 1], merging_params.sign_column))
throw Exception(
"Sign column name must be an unquoted string" + getMergeTreeVerboseHelp(is_extended_storage_def),
ErrorCodes::BAD_ARGUMENTS);
engine_args.pop_back();
--arg_cnt;
}
String date_column_name;
@ -614,31 +614,38 @@ static StoragePtr create(const StorageFactory::Arguments & args)
}
else
{
/// If there is an expression for sampling. MergeTree(date, [sample_key], primary_key, index_granularity)
if (engine_args.size() == 4)
{
sample_by_ast = engine_args[1];
engine_args.erase(engine_args.begin() + 1);
}
/// Now only three parameters remain - date (or partitioning expression), primary_key, index_granularity.
if (!tryGetIdentifierNameInto(engine_args[0], date_column_name))
/// Syntax: *MergeTree(..., date, [sample_key], primary_key, index_granularity, ...)
/// Get date:
if (!tryGetIdentifierNameInto(engine_args[arg_num], date_column_name))
throw Exception(
"Date column name must be an unquoted string" + getMergeTreeVerboseHelp(is_extended_storage_def),
ErrorCodes::BAD_ARGUMENTS);
++arg_num;
order_by_ast = engine_args[1];
/// If there is an expression for sampling
if (arg_cnt - arg_num == 3)
{
sample_by_ast = engine_args[arg_num];
++arg_num;
}
const auto * ast = engine_args.back()->as<ASTLiteral>();
/// Now only two parameters remain - primary_key, index_granularity.
order_by_ast = engine_args[arg_num];
++arg_num;
const auto * ast = engine_args[arg_num]->as<ASTLiteral>();
if (ast && ast->value.getType() == Field::Types::UInt64)
storage_settings->index_granularity = safeGet<UInt64>(ast->value);
else
throw Exception(
"Index granularity must be a positive integer" + getMergeTreeVerboseHelp(is_extended_storage_def),
ErrorCodes::BAD_ARGUMENTS);
++arg_num;
}
if (arg_num != arg_cnt)
throw Exception("Wrong number of engine arguments.", ErrorCodes::BAD_ARGUMENTS);
if (!args.attach && !indices_description.empty() && !args.local_context.getSettingsRef().allow_experimental_data_skipping_indices)
throw Exception("You must set the setting `allow_experimental_data_skipping_indices` to 1 " \
"before using data skipping indices.", ErrorCodes::BAD_ARGUMENTS);

View File

@ -253,10 +253,6 @@ StorageDistributed::StorageDistributed(
if (num_local_shards && remote_database == id_.database_name && remote_table == id_.table_name)
throw Exception("Distributed table " + id_.table_name + " looks at itself", ErrorCodes::INFINITE_LOOP);
}
if (remote_database.empty())
{
LOG_WARNING(log, "Name of remote database is empty. Default database will be used implicitly.");
}
}
@ -480,6 +476,9 @@ void StorageDistributed::alter(const AlterCommands & params, const Context & con
void StorageDistributed::startup()
{
if (remote_database.empty() && !remote_table_function_ptr)
LOG_WARNING(log, "Name of remote database is empty. Default database will be used implicitly.");
if (!volume)
return;
@ -723,7 +722,7 @@ void registerStorageDistributed(StorageFactory & factory)
"policy to store data in (optional).",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
String cluster_name = getClusterName(*engine_args[0]);
String cluster_name = getClusterNameAndMakeLiteral(engine_args[0]);
engine_args[1] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[1], args.local_context);
engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.local_context);

View File

@ -1 +1 @@
CREATE TABLE default.BannerDict (`BannerID` UInt64, `CompaignID` UInt64) ENGINE = ODBC(\'DSN=pgconn;Database=postgres\', somedb, bannerdict)
CREATE TABLE default.BannerDict (`BannerID` UInt64, `CompaignID` UInt64) ENGINE = ODBC(\'DSN=pgconn;Database=postgres\', \'somedb\', \'bannerdict\')

View File

@ -2,10 +2,10 @@ CREATE TABLE test_01083.file (`n` Int8) ENGINE = File(\'TSVWithNamesAndTypes\')
CREATE TABLE test_01083.buffer (`n` Int8) ENGINE = Buffer(\'test_01083\', \'file\', 16, 10, 200, 10000, 1000000, 10000000, 1000000000)
CREATE TABLE test_01083.merge (`n` Int8) ENGINE = Merge(\'test_01083\', \'distributed\')
CREATE TABLE test_01083.merge_tf AS merge(\'test_01083\', \'.*\')
CREATE TABLE test_01083.distributed (`n` Int8) ENGINE = Distributed(test_cluster, \'test_01083\', \'file\')
CREATE TABLE test_01083.distributed_tf AS cluster(\'test_cluster\', \'test_01083\', \'file\')
CREATE TABLE test_01083.distributed (`n` Int8) ENGINE = Distributed(\'test_shard_localhost\', \'test_01083\', \'file\')
CREATE TABLE test_01083.distributed_tf AS cluster(\'test_shard_localhost\', \'test_01083\', \'file\')
CREATE TABLE test_01083.url (`n` UInt64, `_path` String) ENGINE = URL(\'http://localhost:8123/?query=select+n,+_path+from+test_01083.file+format+CSV\', \'CSV\')
CREATE TABLE test_01083.rich_syntax AS remote(\'localhos{x|y|t}\', cluster(\'test_cluster\', remote(\'127.0.0.{1..4}\', \'test_01083\', \'url\')))
CREATE TABLE test_01083.rich_syntax AS remote(\'localhos{x|y|t}\', cluster(\'test_shard_localhost\', remote(\'127.0.0.{1..4}\', \'test_01083\', \'url\')))
1
1
1

View File

@ -6,8 +6,8 @@ CREATE TABLE file (n Int8) ENGINE = File(upper('tsv') || 'WithNames' || 'AndType
CREATE TABLE buffer (n Int8) ENGINE = Buffer(currentDatabase(), file, 16, 10, 200, 10000, 1000000, 10000000, 1000000000);
CREATE TABLE merge (n Int8) ENGINE = Merge('', lower('DISTRIBUTED'));
CREATE TABLE merge_tf as merge(currentDatabase(), '.*');
CREATE TABLE distributed (n Int8) ENGINE = Distributed(test_cluster, currentDatabase(), 'fi' || 'le');
CREATE TABLE distributed_tf as cluster('test' || '_' || 'cluster', '', 'fi' || 'le');
CREATE TABLE distributed (n Int8) ENGINE = Distributed(test_shard_localhost, currentDatabase(), 'fi' || 'le');
CREATE TABLE distributed_tf as cluster('test' || '_' || 'shard_localhost', '', 'fi' || 'le');
INSERT INTO file VALUES (1);
CREATE TABLE url (n UInt64, _path String) ENGINE=URL
@ -31,7 +31,7 @@ CREATE TABLE rich_syntax as remote
'localhos{x|y|t}',
cluster
(
'test' || '_' || 'cluster',
'test' || '_' || 'shard_localhost',
remote
(
'127.0.0.{1..4}',