ClickHouse/src/Storages/StorageExternalDistributed.cpp

199 lines
7.6 KiB
C++
Raw Normal View History

#include "StorageExternalDistributed.h"
2021-03-27 14:35:44 +00:00
#include <Storages/StorageFactory.h>
#include <Interpreters/evaluateConstantExpression.h>
2022-05-23 19:47:32 +00:00
#include <Interpreters/InterpreterSelectQuery.h>
#include <Core/PostgreSQL/PoolWithFailover.h>
2021-03-27 14:35:44 +00:00
#include <Parsers/ASTLiteral.h>
#include <Common/parseAddress.h>
2022-05-23 19:47:32 +00:00
#include <Processors/QueryPlan/QueryPlan.h>
2021-03-27 14:35:44 +00:00
#include <Common/parseRemoteDescription.h>
#include <Storages/StorageMySQL.h>
#include <Storages/MySQL/MySQLSettings.h>
2021-03-27 19:18:05 +00:00
#include <Storages/StoragePostgreSQL.h>
2021-04-21 12:32:57 +00:00
#include <Storages/StorageURL.h>
#include <Storages/MySQL/MySQLHelpers.h>
2022-12-16 23:34:29 +00:00
#include <Storages/NamedCollectionsHelpers.h>
#include <Storages/checkAndGetLiteralArgument.h>
2022-04-27 15:05:45 +00:00
#include <Common/logger_useful.h>
2022-05-23 19:47:32 +00:00
#include <Processors/QueryPlan/UnionStep.h>
2021-03-27 14:35:44 +00:00
namespace DB
{
namespace ErrorCodes
{
/// Only declared here (extern); used in this file when the engine arguments are invalid.
extern const int BAD_ARGUMENTS;
}
/// Creates a storage that reads from a set of already constructed shard storages.
///
/// @param table_id_     Identifier forwarded to the IStorage base.
/// @param shards_       Shard storages; the set is moved into this object.
/// @param columns_      Columns stored in the in-memory metadata.
/// @param constraints_  Constraints stored in the in-memory metadata.
/// @param comment       Table comment stored in the in-memory metadata.
StorageExternalDistributed::StorageExternalDistributed(
    const StorageID & table_id_,
    std::unordered_set<StoragePtr> && shards_,
    const ColumnsDescription & columns_,
    const ConstraintsDescription & constraints_,
    const String & comment)
    : IStorage(table_id_)
    /// A named rvalue reference is an lvalue: without std::move the whole set would be copied.
    , shards(std::move(shards_))
{
    StorageInMemoryMetadata storage_metadata;
    storage_metadata.setColumns(columns_);
    storage_metadata.setConstraints(constraints_);
    storage_metadata.setComment(comment);
    setInMemoryMetadata(storage_metadata);
}
2022-05-23 19:47:32 +00:00
/// Builds the query plan for reading from all shards.
///
/// Every shard builds its own sub-plan with the same arguments that were passed to this call.
/// The outcome depends on the number of shards:
///  - no shards: an empty source with the header of the requested columns is added to `query_plan`;
///  - one shard: its plan is moved into `query_plan` as is;
///  - several shards: the sub-plans are united under a single UnionStep.
void StorageExternalDistributed::read(
    QueryPlan & query_plan,
    const Names & column_names,
    const StorageSnapshotPtr & storage_snapshot,
    SelectQueryInfo & query_info,
    ContextPtr context,
    QueryProcessingStage::Enum processed_stage,
    size_t max_block_size,
    size_t num_streams)
{
    std::vector<std::unique_ptr<QueryPlan>> plans;
    plans.reserve(shards.size());

    for (const auto & shard : shards)
    {
        plans.emplace_back(std::make_unique<QueryPlan>());
        shard->read(
            *plans.back(),
            column_names,
            storage_snapshot,
            query_info,
            context,
            processed_stage,
            max_block_size,
            num_streams);
    }

    if (plans.empty())
    {
        auto header = storage_snapshot->getSampleBlockForColumns(column_names);
        InterpreterSelectQuery::addEmptySourceToQueryPlan(query_plan, header, query_info, context);
        /// The plan is complete at this point. Without this return the code below would go on to
        /// build a UnionStep over zero input streams and unite it into the plan that was just filled.
        return;
    }

    if (plans.size() == 1)
    {
        query_plan = std::move(*plans.front());
        return;
    }

    DataStreams input_streams;
    input_streams.reserve(plans.size());
    for (auto & plan : plans)
        input_streams.emplace_back(plan->getCurrentDataStream());

    auto union_step = std::make_unique<UnionStep>(std::move(input_streams));
    query_plan.unitePlans(std::move(union_step), std::move(plans));
}
/// Registers the `ExternalDistributed` table engine in the storage factory.
///
/// Engine arguments (at least two are required):
///  - the first one is a string literal with the name of the underlying engine: "URL", and also
///    "MySQL" / "PostgreSQL" when the corresponding support is compiled in (USE_MYSQL / USE_LIBPQXX);
///  - the remaining ones are forwarded to getConfiguration() of that engine.
///
/// The addresses expression of the resulting configuration is split on ',' and one shard storage
/// is created per element. Throws BAD_ARGUMENTS for too few arguments or an unsupported engine name.
void registerStorageExternalDistributed(StorageFactory & factory)
{
    factory.registerStorage("ExternalDistributed", [](const StorageFactory::Arguments & args)
    {
        ASTs & engine_args = args.engine_args;
        if (engine_args.size() < 2)
            throw Exception(ErrorCodes::BAD_ARGUMENTS,
                            "Engine ExternalDistributed must have at least 2 arguments: "
                            "engine_name, named_collection and/or description");

        auto context = args.getLocalContext();
        const auto & settings = context->getSettingsRef();
        size_t max_addresses = settings.glob_expansion_max_elements;

        /// Splits the addresses expression into per-shard descriptions.
        /// Takes the expression by reference: the by-value parameter copied the string on every call.
        auto get_addresses = [&](const std::string & addresses_expr)
        {
            return parseRemoteDescription(addresses_expr, 0, addresses_expr.size(), ',', max_addresses);
        };

        std::unordered_set<StoragePtr> shards;

        /// Everything after the engine name belongs to the underlying engine.
        ASTs inner_engine_args(engine_args.begin() + 1, engine_args.end());

        auto engine_name = checkAndGetLiteralArgument<String>(engine_args[0], "engine_name");
        if (engine_name == "URL")
        {
            auto configuration = StorageURL::getConfiguration(inner_engine_args, context);
            auto shards_addresses = get_addresses(configuration.addresses_expr);
            auto format_settings = StorageURL::getFormatSettingsFromArgs(args);
            for (const auto & shard_address : shards_addresses)
            {
                /// Within one shard, '|' separates alternative URIs.
                auto uri_options = parseRemoteDescription(shard_address, 0, shard_address.size(), '|', max_addresses);
                if (uri_options.size() > 1)
                {
                    shards.insert(
                        std::make_shared<StorageURLWithFailover>(
                            uri_options, args.table_id, configuration.format, format_settings,
                            args.columns, args.constraints, context, configuration.compression_method));
                }
                else
                {
                    shards.insert(std::make_shared<StorageURL>(
                        shard_address, args.table_id, configuration.format, format_settings,
                        args.columns, args.constraints, String{}, context, configuration.compression_method));
                }
            }
        }
#if USE_MYSQL
        else if (engine_name == "MySQL")
        {
            MySQLSettings mysql_settings;
            auto configuration = StorageMySQL::getConfiguration(inner_engine_args, context, mysql_settings);
            auto shards_addresses = get_addresses(configuration.addresses_expr);
            for (const auto & shard_address : shards_addresses)
            {
                /// Per-shard copy of the common configuration; only the addresses differ.
                /// NOTE(review): 3306 is presumably the default port for addresses without one - confirm.
                auto current_configuration{configuration};
                current_configuration.addresses = parseRemoteDescriptionForExternalDatabase(shard_address, max_addresses, 3306);
                auto pool = createMySQLPoolWithFailover(current_configuration, mysql_settings);
                shards.insert(std::make_shared<StorageMySQL>(
                    args.table_id, std::move(pool), configuration.database, configuration.table,
                    /* replace_query = */ false, /* on_duplicate_clause = */ "",
                    args.columns, args.constraints, String{}, context, mysql_settings));
            }
        }
#endif
#if USE_LIBPQXX
        else if (engine_name == "PostgreSQL")
        {
            auto configuration = StoragePostgreSQL::getConfiguration(inner_engine_args, context);
            auto shards_addresses = get_addresses(configuration.addresses_expr);
            for (const auto & shard_address : shards_addresses)
            {
                /// Per-shard copy of the common configuration; only the addresses differ.
                /// NOTE(review): 5432 is presumably the default port for addresses without one - confirm.
                auto current_configuration{configuration};
                current_configuration.addresses = parseRemoteDescriptionForExternalDatabase(shard_address, max_addresses, 5432);
                auto pool = std::make_shared<postgres::PoolWithFailover>(
                    current_configuration,
                    settings.postgresql_connection_pool_size,
                    settings.postgresql_connection_pool_wait_timeout,
                    POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES,
                    settings.postgresql_connection_pool_auto_close_connection);
                shards.insert(std::make_shared<StoragePostgreSQL>(
                    args.table_id, std::move(pool), configuration.table, args.columns, args.constraints, String{}));
            }
        }
#endif
        else
        {
            throw Exception(
                ErrorCodes::BAD_ARGUMENTS,
                "External storage engine {} is not supported for StorageExternalDistributed. "
                "Supported engines are: MySQL, PostgreSQL, URL",
                engine_name);
        }

        return std::make_shared<StorageExternalDistributed>(
            args.table_id,
            std::move(shards),
            args.columns,
            args.constraints,
            args.comment);
    },
    {
        .source_access_type = AccessType::SOURCES,
    });
}
}