ClickHouse/programs/copier/Internals.cpp

298 lines
10 KiB
C++
Raw Normal View History

2020-02-19 15:01:08 +00:00
#include "Internals.h"
2020-03-03 09:49:06 +00:00
#include <Storages/MergeTree/MergeTreeData.h>
2020-06-10 11:16:31 +00:00
#include <Storages/extractKeyExpressionList.h>
2021-09-16 17:40:42 +00:00
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Transforms/SquashingChunksTransform.h>
2020-02-19 15:01:08 +00:00
2020-02-20 09:06:00 +00:00
namespace DB
2020-02-19 20:50:27 +00:00
{
2020-02-25 18:10:48 +00:00
/// Error codes referenced by the functions in this file.
/// Only declared here (`extern`); the definitions live outside this translation unit.
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
2020-02-19 15:01:08 +00:00
2020-02-20 10:01:02 +00:00
/// Poco::AutoPtr handle to an abstract Poco configuration object.
using ConfigurationPtr = Poco::AutoPtr<Poco::Util::AbstractConfiguration>;
2020-02-19 15:59:47 +00:00
/// Parses an XML document held in memory and returns it as a Poco XML configuration.
/// @param xml_data  Text of the XML document.
/// @return          Configuration object built from that text.
ConfigurationPtr getConfigurationFromXMLString(const std::string & xml_data)
{
    /// Poco's InputSource reads from a stream, so wrap the string in one.
    std::stringstream xml_stream(xml_data); // STYLE_CHECK_ALLOW_STD_STRING_STREAM
    Poco::XML::InputSource xml_source{xml_stream};

    ConfigurationPtr configuration{new Poco::Util::XMLConfiguration{&xml_source}};
    return configuration;
}
2020-02-19 15:59:47 +00:00
/// Builds a table name for use in queries, back-quoting each part when needed.
/// @param database  Database name; when empty, only the table name is returned.
/// @param table     Table name.
/// @return          `table` or `database.table` with the parts quoted by backQuoteIfNeed.
String getQuotedTable(const String & database, const String & table)
{
    String quoted_name;

    /// The "database." prefix is emitted only when a database is given.
    if (!database.empty())
        quoted_name = backQuoteIfNeed(database) + ".";

    quoted_name += backQuoteIfNeed(table);
    return quoted_name;
}
2020-02-19 15:59:47 +00:00
/// Overload taking a (database, table) pair; forwards to the two-argument version.
String getQuotedTable(const DatabaseAndTableName & db_and_table)
{
    const auto & database_name = db_and_table.first;
    const auto & table_name = db_and_table.second;
    return getQuotedTable(database_name, table_name);
}
// Creates AST representing 'ENGINE = Distributed(cluster, db, table, [sharding_key])
std::shared_ptr<ASTStorage> createASTStorageDistributed(
2020-02-19 15:45:49 +00:00
const String & cluster_name, const String & database, const String & table,
2020-02-19 15:59:47 +00:00
const ASTPtr & sharding_key_ast)
{
2020-02-19 15:01:08 +00:00
auto args = std::make_shared<ASTExpressionList>();
args->children.emplace_back(std::make_shared<ASTLiteral>(cluster_name));
args->children.emplace_back(std::make_shared<ASTIdentifier>(database));
args->children.emplace_back(std::make_shared<ASTIdentifier>(table));
if (sharding_key_ast)
args->children.emplace_back(sharding_key_ast);
auto engine = std::make_shared<ASTFunction>();
engine->name = "Distributed";
engine->arguments = args;
auto storage = std::make_shared<ASTStorage>();
storage->set(storage->engine, engine);
return storage;
}
2020-02-19 15:59:47 +00:00
/// Wraps a stream in a SquashingBlockInputStream whose two size limits are both
/// set to the maximum size_t value.
/// @param stream  Source stream to wrap.
/// @return        The wrapping squashing stream.
BlockInputStreamPtr squashStreamIntoOneBlock(const BlockInputStreamPtr & stream)
{
    constexpr size_t no_limit = std::numeric_limits<size_t>::max();
    return std::make_shared<SquashingBlockInputStream>(stream, no_limit, no_limit);
}
2021-09-16 17:40:42 +00:00
/// Runs the pipeline through a squashing transform and returns the block obtained
/// from a single pull.
/// @param pipeline  Pipeline to execute; taken by value and moved into a builder.
/// @return          The block filled by one call to PullingPipelineExecutor::pull.
Block getBlockWithAllStreamData(QueryPipeline pipeline)
{
    constexpr size_t no_limit = std::numeric_limits<size_t>::max();

    QueryPipelineBuilder pipeline_builder;
    pipeline_builder.init(std::move(pipeline));

    /// Both squashing limits are set to the maximum size_t value.
    auto squashing = std::make_shared<SquashingChunksTransform>(pipeline_builder.getHeader(), no_limit, no_limit);
    pipeline_builder.addTransform(squashing);

    auto squashed_pipeline = QueryPipelineBuilder::getPipeline(std::move(pipeline_builder));
    PullingPipelineExecutor executor(squashed_pipeline);

    Block result;
    executor.pull(result);
    return result;
}
2020-02-19 15:59:47 +00:00
bool isExtendedDefinitionStorage(const ASTPtr & storage_ast)
{
2020-02-19 15:45:49 +00:00
const auto & storage = storage_ast->as<ASTStorage &>();
2020-02-19 15:01:08 +00:00
return storage.partition_by || storage.order_by || storage.sample_by;
}
2020-02-19 15:59:47 +00:00
/// Extracts the partition key expression from a *MergeTree storage definition.
///
/// For an extended definition (see isExtendedDefinitionStorage) the PARTITION BY
/// expression is cloned, or the string literal 'all' is returned when there is none.
/// For the legacy definition the key is `toYYYYMM(<date column>)`, where the date
/// column is the first engine argument for MergeTree and the third for Replicated*
/// engines (after the two leading arguments).
///
/// @param storage_ast  AST node that must be an ASTStorage with a function engine.
/// @return             Newly created AST of the partition key.
/// @throws Exception(BAD_ARGUMENTS) if the engine name does not end with "MergeTree",
///         or the legacy definition has no arguments or too few of them.
ASTPtr extractPartitionKey(const ASTPtr & storage_ast)
{
    String storage_str = queryToString(storage_ast);

    const auto & storage = storage_ast->as<ASTStorage &>();
    const auto & engine = storage.engine->as<ASTFunction &>();

    if (!endsWith(engine.name, "MergeTree"))
    {
        throw Exception(
            "Unsupported engine was specified in " + storage_str + ", only *MergeTree engines are supported",
            ErrorCodes::BAD_ARGUMENTS);
    }

    if (isExtendedDefinitionStorage(storage_ast))
    {
        if (storage.partition_by)
            return storage.partition_by->clone();

        static const char * all = "all";
        return std::make_shared<ASTLiteral>(Field(all, strlen(all)));
    }

    const bool is_replicated = startsWith(engine.name, "Replicated");

    /// Position of the date column among the engine arguments. The required argument
    /// count is derived from it so the bounds check and the access cannot disagree.
    const size_t date_arg_index = is_replicated ? 2 : 0;
    const size_t min_args = date_arg_index + 1;

    if (!engine.arguments)
        throw Exception("Expected arguments in " + storage_str, ErrorCodes::BAD_ARGUMENTS);

    const ASTs & arguments = engine.arguments->children;

    if (arguments.size() < min_args)
        throw Exception("Expected at least " + toString(min_args) + " arguments in " + storage_str,
            ErrorCodes::BAD_ARGUMENTS);

    /// Only the date argument is cloned; the rest of the argument list is not needed.
    return makeASTFunction("toYYYYMM", arguments[date_arg_index]->clone());
}
2020-03-03 09:49:06 +00:00
/// Returns a clone of the PRIMARY KEY expression of a *MergeTree storage definition.
/// @param storage_ast  AST node that must be an ASTStorage with a function engine.
/// @return             Clone of the primary key AST, or nullptr when none is set.
/// @throws Exception(BAD_ARGUMENTS) if the engine name does not end with "MergeTree"
///         or the definition is not an extended one (see isExtendedDefinitionStorage).
ASTPtr extractPrimaryKey(const ASTPtr & storage_ast)
{
    /// Text form of the definition, used only in error messages.
    const String definition_text = queryToString(storage_ast);

    const auto & storage_def = storage_ast->as<ASTStorage &>();
    const auto & engine_func = storage_def.engine->as<ASTFunction &>();

    const bool is_merge_tree = endsWith(engine_func.name, "MergeTree");
    if (!is_merge_tree)
        throw Exception("Unsupported engine was specified in " + definition_text + ", only *MergeTree engines are supported",
            ErrorCodes::BAD_ARGUMENTS);

    const bool is_extended = isExtendedDefinitionStorage(storage_ast);
    if (!is_extended)
        throw Exception("Is not extended deginition storage " + definition_text + " Will be fixed later.",
            ErrorCodes::BAD_ARGUMENTS);

    if (!storage_def.primary_key)
        return nullptr;

    return storage_def.primary_key->clone();
}
/// Returns a clone of the ORDER BY expression of a *MergeTree storage definition.
/// @param storage_ast  AST node that must be an ASTStorage with a function engine.
/// @return             Clone of the ORDER BY AST.
/// @throws Exception(BAD_ARGUMENTS) if the engine name does not end with "MergeTree",
///         the definition is not an extended one, or ORDER BY is absent.
ASTPtr extractOrderBy(const ASTPtr & storage_ast)
{
    /// Text form of the definition, used only in error messages.
    const String definition_text = queryToString(storage_ast);

    const auto & storage_def = storage_ast->as<ASTStorage &>();
    const auto & engine_func = storage_def.engine->as<ASTFunction &>();

    const bool is_merge_tree = endsWith(engine_func.name, "MergeTree");
    if (!is_merge_tree)
        throw Exception("Unsupported engine was specified in " + definition_text + ", only *MergeTree engines are supported",
            ErrorCodes::BAD_ARGUMENTS);

    const bool is_extended = isExtendedDefinitionStorage(storage_ast);
    if (!is_extended)
        throw Exception("Is not extended deginition storage " + definition_text + " Will be fixed later.",
            ErrorCodes::BAD_ARGUMENTS);

    if (!storage_def.order_by)
        throw Exception("ORDER BY cannot be empty", ErrorCodes::BAD_ARGUMENTS);

    return storage_def.order_by->clone();
}
/// Wraps only identifiers with backticks.
2020-10-09 17:44:54 +00:00
std::string wrapIdentifiersWithBackticks(const ASTPtr & root)
2020-10-09 16:58:45 +00:00
{
2020-10-09 23:31:33 +00:00
if (auto identifier = std::dynamic_pointer_cast<ASTIdentifier>(root))
return backQuote(identifier->name());
2020-10-09 16:58:45 +00:00
2020-10-09 23:31:33 +00:00
if (auto function = std::dynamic_pointer_cast<ASTFunction>(root))
2020-10-09 17:44:54 +00:00
return function->name + '(' + wrapIdentifiersWithBackticks(function->arguments) + ')';
2020-10-09 16:58:45 +00:00
2020-10-09 17:44:54 +00:00
if (auto expression_list = std::dynamic_pointer_cast<ASTExpressionList>(root))
{
Names function_arguments(expression_list->children.size());
for (size_t i = 0; i < expression_list->children.size(); ++i)
function_arguments[i] = wrapIdentifiersWithBackticks(expression_list->children[0]);
return boost::algorithm::join(function_arguments, ", ");
}
2020-10-09 16:58:45 +00:00
throw Exception("Primary key could be represented only as columns or functions from columns.", ErrorCodes::BAD_ARGUMENTS);
}
2020-03-03 09:49:06 +00:00
/// Returns the primary key columns of a *MergeTree storage definition as text,
/// with identifiers wrapped in backticks.
///
/// When the definition has no explicit PRIMARY KEY, the ORDER BY expression list is
/// used instead. The primary key must be a prefix of the sorting key and must not
/// contain duplicate columns.
///
/// @param storage_ast  AST node of the storage definition.
/// @return             One string per primary key column, in key order.
/// @throws Exception(BAD_ARGUMENTS) if the primary key is longer than the sorting key,
///         differs from it at some position, or contains a duplicate column; also
///         propagates the errors of extractOrderBy / extractPrimaryKey /
///         wrapIdentifiersWithBackticks.
Names extractPrimaryKeyColumnNames(const ASTPtr & storage_ast)
{
    const auto sorting_key_ast = extractOrderBy(storage_ast);
    const auto primary_key_ast = extractPrimaryKey(storage_ast);

    const auto sorting_key_expr_list = extractKeyExpressionList(sorting_key_ast);
    const auto primary_key_expr_list = primary_key_ast
        ? extractKeyExpressionList(primary_key_ast) : sorting_key_expr_list->clone();

    /// Maybe we have to handle VersionedCollapsing engine separately. But in our case it looks pointless.

    const size_t primary_key_size = primary_key_expr_list->children.size();
    const size_t sorting_key_size = sorting_key_expr_list->children.size();

    if (primary_key_size > sorting_key_size)
        throw Exception("Primary key must be a prefix of the sorting key, but its length: "
            + toString(primary_key_size) + " is greater than the sorting key length: " + toString(sorting_key_size),
            ErrorCodes::BAD_ARGUMENTS);

    Names primary_key_columns;
    primary_key_columns.reserve(primary_key_size);
    NameSet primary_key_columns_set;

    /// Only the first primary_key_size sorting key elements take part in the comparison
    /// (primary_key_size <= sorting_key_size is guaranteed by the check above).
    for (size_t i = 0; i < primary_key_size; ++i)
    {
        /// Column name could be represented as a f_1(f_2(...f_n(column_name))).
        /// Each f_i could take one or more parameters.
        /// We will wrap identifiers with backticks to allow non-standard identifier names.
        const String sorting_key_column = sorting_key_expr_list->children[i]->getColumnName();
        const String pk_column = primary_key_expr_list->children[i]->getColumnName();

        if (pk_column != sorting_key_column)
            throw Exception("Primary key must be a prefix of the sorting key, but the column in the position "
                + toString(i) + " is " + sorting_key_column +", not " + pk_column,
                ErrorCodes::BAD_ARGUMENTS);

        if (!primary_key_columns_set.emplace(pk_column).second)
            throw Exception("Primary key contains duplicate columns", ErrorCodes::BAD_ARGUMENTS);

        primary_key_columns.push_back(wrapIdentifiersWithBackticks(primary_key_expr_list->children[i]));
    }

    return primary_key_columns;
}
2020-09-21 10:24:10 +00:00
bool isReplicatedTableEngine(const ASTPtr & storage_ast)
2020-03-03 13:15:23 +00:00
{
const auto & storage = storage_ast->as<ASTStorage &>();
const auto & engine = storage.engine->as<ASTFunction &>();
if (!endsWith(engine.name, "MergeTree"))
{
2020-09-21 10:24:10 +00:00
String storage_str = queryToString(storage_ast);
2020-03-03 13:15:23 +00:00
throw Exception(
"Unsupported engine was specified in " + storage_str + ", only *MergeTree engines are supported",
ErrorCodes::BAD_ARGUMENTS);
}
2020-09-21 10:24:10 +00:00
return startsWith(engine.name, "Replicated");
2020-03-03 13:15:23 +00:00
}
2020-02-19 15:59:47 +00:00
/// Computes the priority record of a shard from its replica addresses.
///
/// @param replicas        Addresses of the shard's replicas.
/// @param local_hostname  Host name compared against each replica's host name.
/// @param random          Value stored as-is in the `random` field of the result.
/// @return  A default-constructed ShardPriority when `replicas` is empty. Otherwise:
///          `is_remote` is 0 if some replica resolves to a local address and 1 if none does,
///          `hostname_difference` is the minimum getHostNameDifference over all replicas,
///          and `random` is the given value.
ShardPriority getReplicasPriority(const Cluster::Addresses & replicas, const std::string & local_hostname, UInt8 random)
{
    ShardPriority priority;

    if (replicas.empty())
        return priority;

    /// Resolution stops at the first replica found to be local.
    const bool has_local_replica = std::any_of(replicas.begin(), replicas.end(), [](const auto & replica)
    {
        return isLocalAddress(DNSResolver::instance().resolveHost(replica.host_name));
    });
    priority.is_remote = has_local_replica ? 0 : 1;

    size_t smallest_difference = std::numeric_limits<size_t>::max();
    for (const auto & replica : replicas)
        smallest_difference = std::min(getHostNameDifference(local_hostname, replica.host_name), smallest_difference);
    priority.hostname_difference = smallest_difference;

    priority.random = random;
    return priority;
}
}