ClickHouse/src/Storages/StorageExternalDistributed.cpp

341 lines
13 KiB
C++
Raw Normal View History

#include "StorageExternalDistributed.h"
2021-03-27 14:35:44 +00:00
#include <Storages/StorageFactory.h>
#include <Interpreters/evaluateConstantExpression.h>
2022-05-23 19:47:32 +00:00
#include <Interpreters/InterpreterSelectQuery.h>
#include <Core/PostgreSQL/PoolWithFailover.h>
2021-03-27 14:35:44 +00:00
#include <Parsers/ASTLiteral.h>
#include <Common/parseAddress.h>
2022-05-23 19:47:32 +00:00
#include <Processors/QueryPlan/QueryPlan.h>
2021-03-27 14:35:44 +00:00
#include <Common/parseRemoteDescription.h>
#include <Storages/StorageMySQL.h>
#include <Storages/MySQL/MySQLSettings.h>
2021-03-27 19:18:05 +00:00
#include <Storages/StoragePostgreSQL.h>
2021-04-21 12:32:57 +00:00
#include <Storages/StorageURL.h>
2021-09-03 11:16:32 +00:00
#include <Storages/ExternalDataSourceConfiguration.h>
2022-12-16 23:34:29 +00:00
#include <Storages/NamedCollectionsHelpers.h>
#include <Storages/checkAndGetLiteralArgument.h>
2022-04-27 15:05:45 +00:00
#include <Common/logger_useful.h>
2022-05-23 19:47:32 +00:00
#include <Processors/QueryPlan/UnionStep.h>
2021-03-27 14:35:44 +00:00
namespace DB
{
/// Error codes thrown from this file. They are only declared here (`extern`);
/// the definitions are not visible in this translation unit.
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int BAD_ARGUMENTS;
}
/// Creates an ExternalDistributed storage whose shards are MySQL or PostgreSQL tables.
///
/// `cluster_description` is split on ',' into shard descriptions. Every shard description is
/// parsed into a list of addresses which is handed, through a PoolWithFailover, to a single
/// underlying StorageMySQL / StoragePostgreSQL that is then added to `shards`.
///
/// Throws BAD_ARGUMENTS when `table_engine` has no matching case in the switch below.
/// If neither USE_MYSQL nor USE_LIBPQXX is enabled, no shard is created at all.
StorageExternalDistributed::StorageExternalDistributed(
    const StorageID & table_id_,
    ExternalStorageEngine table_engine,
    const String & cluster_description,
    const ExternalDataSourceConfiguration & configuration,
    const ColumnsDescription & columns_,
    const ConstraintsDescription & constraints_,
    const String & comment,
    ContextPtr context)
    : IStorage(table_id_)
{
    StorageInMemoryMetadata metadata;
    metadata.setColumns(columns_);
    metadata.setConstraints(constraints_);
    metadata.setComment(comment);
    setInMemoryMetadata(metadata);

    /// The same limit is applied both to the shard list and to the address list of every shard.
    size_t address_limit = context->getSettingsRef().glob_expansion_max_elements;
    std::vector<String> shard_specs
        = parseRemoteDescription(cluster_description, 0, cluster_description.size(), ',', address_limit);
    std::vector<std::pair<std::string, UInt16>> replica_addresses;

#if USE_MYSQL || USE_LIBPQXX
    /// One underlying storage per shard; the replicas of a shard are handled by that storage's PoolWithFailover.
    for (const auto & shard_spec : shard_specs)
    {
        StoragePtr shard_storage;

        switch (table_engine)
        {
#if USE_MYSQL
            case ExternalStorageEngine::MySQL:
            {
                /// 3306 is passed as the third argument (presumably the port used when the description has none).
                replica_addresses = parseRemoteDescriptionForExternalDatabase(shard_spec, address_limit, 3306);

                mysqlxx::PoolWithFailover mysql_pool(
                    configuration.database,
                    replica_addresses,
                    configuration.username,
                    configuration.password);

                /// The shard itself is created with an empty comment and default MySQLSettings.
                shard_storage = std::make_shared<StorageMySQL>(
                    table_id_,
                    std::move(mysql_pool),
                    configuration.database,
                    configuration.table,
                    /* replace_query = */ false,
                    /* on_duplicate_clause = */ "",
                    columns_,
                    constraints_,
                    String{},
                    context,
                    MySQLSettings{});
                break;
            }
#endif
#if USE_LIBPQXX
            case ExternalStorageEngine::PostgreSQL:
            {
                /// 5432 is passed as the third argument (presumably the port used when the description has none).
                replica_addresses = parseRemoteDescriptionForExternalDatabase(shard_spec, address_limit, 5432);

                StoragePostgreSQL::Configuration pg_configuration;
                pg_configuration.addresses = replica_addresses;
                pg_configuration.username = configuration.username;
                pg_configuration.password = configuration.password;
                pg_configuration.database = configuration.database;
                pg_configuration.table = configuration.table;
                pg_configuration.schema = configuration.schema;

                const auto & pg_settings = context->getSettingsRef();
                auto pg_pool = std::make_shared<postgres::PoolWithFailover>(
                    pg_configuration,
                    pg_settings.postgresql_connection_pool_size,
                    pg_settings.postgresql_connection_pool_wait_timeout,
                    POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES,
                    pg_settings.postgresql_connection_pool_auto_close_connection);

                /// The shard itself is created with an empty comment.
                shard_storage = std::make_shared<StoragePostgreSQL>(
                    table_id_, std::move(pg_pool), configuration.table, columns_, constraints_, String{});
                break;
            }
#endif
            default:
            {
                throw Exception(ErrorCodes::BAD_ARGUMENTS,
                    "Unsupported table engine. Supported engines are: MySQL, PostgreSQL, URL");
            }
        }

        shards.emplace(std::move(shard_storage));
    }
#else
    /// Nothing can be built without MySQL or PostgreSQL support; only silence unused warnings.
    (void)configuration;
    (void)cluster_description;
    (void)replica_addresses;
    (void)table_engine;
#endif
}
2021-04-21 12:32:57 +00:00
/// Creates an ExternalDistributed storage whose shards are URL tables.
///
/// `addresses_description` is split on ',' into per-shard descriptions. Each of them is split
/// again on '|': when that yields more than one option the shard becomes a
/// StorageURLWithFailover over all options, otherwise a plain StorageURL over the description.
StorageExternalDistributed::StorageExternalDistributed(
    const String & addresses_description,
    const StorageID & table_id,
    const String & format_name,
    const std::optional<FormatSettings> & format_settings,
    const String & compression_method,
    const ColumnsDescription & columns,
    const ConstraintsDescription & constraints,
    ContextPtr context)
    : IStorage(table_id)
{
    StorageInMemoryMetadata metadata;
    metadata.setColumns(columns);
    metadata.setConstraints(constraints);
    setInMemoryMetadata(metadata);

    size_t address_limit = context->getSettingsRef().glob_expansion_max_elements;

    /// First level: one entry per shard, failover options are still unsplit here.
    std::vector<String> shard_urls
        = parseRemoteDescription(addresses_description, 0, addresses_description.size(), ',', address_limit);

    for (const auto & shard_url : shard_urls)
    {
        /// Second level: the failover options of this particular shard.
        std::vector<String> failover_uris = parseRemoteDescription(shard_url, 0, shard_url.size(), '|', address_limit);

        StoragePtr shard_storage;
        if (failover_uris.size() <= 1)
        {
            /// No alternatives were given, so the description is used as the URL as is.
            shard_storage = std::make_shared<StorageURL>(
                shard_url, table_id, format_name, format_settings, columns, constraints, String{}, context, compression_method);

            LOG_DEBUG(&Poco::Logger::get("StorageURLDistributed"), "Adding URL: {}", shard_url);
        }
        else
        {
            shard_storage = std::make_shared<StorageURLWithFailover>(
                failover_uris,
                table_id,
                format_name,
                format_settings,
                columns,
                constraints,
                context,
                compression_method);
        }

        shards.emplace(std::move(shard_storage));
    }
}
2022-05-23 19:47:32 +00:00
/// Builds the query plan for reading from all shards.
///
/// Every shard contributes its own QueryPlan, built with the same arguments that were passed
/// to this method. The result placed into `query_plan` is:
///   - an empty source with the requested header, when there are no shards;
///   - the shard's own plan, when there is exactly one shard;
///   - a UnionStep over all shard plans otherwise.
void StorageExternalDistributed::read(
    QueryPlan & query_plan,
    const Names & column_names,
    const StorageSnapshotPtr & storage_snapshot,
    SelectQueryInfo & query_info,
    ContextPtr context,
    QueryProcessingStage::Enum processed_stage,
    size_t max_block_size,
    size_t num_streams)
{
    std::vector<std::unique_ptr<QueryPlan>> plans;
    for (const auto & shard : shards)
    {
        plans.emplace_back(std::make_unique<QueryPlan>());
        shard->read(
            *plans.back(),
            column_names,
            storage_snapshot,
            query_info,
            context,
            processed_stage,
            max_block_size,
            num_streams
        );
    }

    if (plans.empty())
    {
        auto header = storage_snapshot->getSampleBlockForColumns(column_names);
        InterpreterSelectQuery::addEmptySourceToQueryPlan(query_plan, header, query_info, context);
        /// The plan is complete at this point. Without returning here the code below would
        /// try to unite zero plans into a query plan that has just been filled in.
        return;
    }

    if (plans.size() == 1)
    {
        /// A single shard needs no union, its plan is used directly.
        query_plan = std::move(*plans.front());
        return;
    }

    DataStreams input_streams;
    input_streams.reserve(plans.size());
    for (auto & plan : plans)
        input_streams.emplace_back(plan->getCurrentDataStream());

    auto union_step = std::make_unique<UnionStep>(std::move(input_streams));
    query_plan.unitePlans(std::move(union_step), std::move(plans));
}
/// Registers the `ExternalDistributed` table engine in the storage factory.
///
/// The first engine argument is the name of the underlying engine: URL, MySQL or PostgreSQL.
/// Positional forms handled below:
///   ExternalDistributed('URL', 'cluster_description', 'format'[, 'compression_method'])
///   ExternalDistributed('MySQL' | 'PostgreSQL', 'cluster_description', 'database', 'table', 'user', 'password')
/// Both branches first try to obtain their configuration from a named collection and only
/// fall back to the positional form when that yields nothing.
///
/// Throws BAD_ARGUMENTS for an unknown engine name, fewer than two arguments, an unknown
/// key-value argument or a missing `description`, and NUMBER_OF_ARGUMENTS_DOESNT_MATCH when
/// the positional form has the wrong number of arguments.
void registerStorageExternalDistributed(StorageFactory & factory)
{
    factory.registerStorage("ExternalDistributed", [](const StorageFactory::Arguments & args)
    {
        ASTs & engine_args = args.engine_args;
        if (engine_args.size() < 2)
            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine ExternalDistributed must have at least 2 arguments: engine_name, named_collection and/or description");

        auto engine_name = checkAndGetLiteralArgument<String>(engine_args[0], "engine_name");
        StorageExternalDistributed::ExternalStorageEngine table_engine;
        if (engine_name == "URL")
            table_engine = StorageExternalDistributed::ExternalStorageEngine::URL;
        else if (engine_name == "MySQL")
            table_engine = StorageExternalDistributed::ExternalStorageEngine::MySQL;
        else if (engine_name == "PostgreSQL")
            table_engine = StorageExternalDistributed::ExternalStorageEngine::PostgreSQL;
        else
            throw Exception(ErrorCodes::BAD_ARGUMENTS,
                "External storage engine {} is not supported for StorageExternalDistributed. Supported engines are: MySQL, PostgreSQL, URL",
                engine_name);

        String cluster_description;

        if (engine_name == "URL")
        {
            StorageURL::Configuration configuration;
            if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args))
            {
                /// NOTE(review): `cluster_description` is left empty on this path; confirm whether
                /// it is supposed to be taken from the named collection.
                StorageURL::processNamedCollectionResult(configuration, *named_collection);
                StorageURL::collectHeaders(engine_args, configuration.headers, args.getLocalContext());
            }
            else
            {
                /// `engine_args[2]` is read below, so two arguments are not enough here.
                if (engine_args.size() < 3)
                    throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
                        "Storage ExternalDistributed with URL engine requires at least 3 parameters: "
                        "ExternalDistributed('URL', 'cluster_description', 'format'[, 'compression_method']).");

                for (auto & engine_arg : engine_args)
                    engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.getLocalContext());

                cluster_description = checkAndGetLiteralArgument<String>(engine_args[1], "cluster_description");
                configuration.format = checkAndGetLiteralArgument<String>(engine_args[2], "format");
                configuration.compression_method = "auto";
                if (engine_args.size() == 4)
                    configuration.compression_method = checkAndGetLiteralArgument<String>(engine_args[3], "compression_method");
            }

            auto format_settings = StorageURL::getFormatSettingsFromArgs(args);

            return std::make_shared<StorageExternalDistributed>(
                cluster_description,
                args.table_id,
                configuration.format,
                format_settings,
                configuration.compression_method,
                args.columns,
                args.constraints,
                args.getContext());
        }
        else
        {
            /// Everything after the engine name; only the named collection lookup needs this copy.
            ASTs inner_engine_args(engine_args.begin() + 1, engine_args.end());

            ExternalDataSourceConfiguration configuration;
            if (auto named_collection = getExternalDataSourceConfiguration(inner_engine_args, args.getLocalContext()))
            {
                auto [common_configuration, storage_specific_args, _] = named_collection.value();
                configuration.set(common_configuration);

                /// `description` is the only storage specific key accepted here.
                for (const auto & [name, value] : storage_specific_args)
                {
                    if (name == "description")
                        cluster_description = checkAndGetLiteralArgument<String>(value, "cluster_description");
                    else
                        throw Exception(ErrorCodes::BAD_ARGUMENTS,
                            "Unknown key-value argument {} for storage ExternalDistributed", name);
                }

                if (cluster_description.empty())
                    throw Exception(ErrorCodes::BAD_ARGUMENTS,
                        "Engine ExternalDistributed must have `description` key-value argument or named collection parameter");
            }
            else
            {
                if (engine_args.size() != 6)
                    throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
                        "Storage ExternalDistributed requires 6 parameters: "
                        "ExternalDistributed('engine_name', 'cluster_description', 'database', 'table', 'user', 'password').");

                cluster_description = checkAndGetLiteralArgument<String>(engine_args[1], "cluster_description");
                configuration.database = checkAndGetLiteralArgument<String>(engine_args[2], "database");
                configuration.table = checkAndGetLiteralArgument<String>(engine_args[3], "table");
                configuration.username = checkAndGetLiteralArgument<String>(engine_args[4], "username");
                configuration.password = checkAndGetLiteralArgument<String>(engine_args[5], "password");
            }

            return std::make_shared<StorageExternalDistributed>(
                args.table_id,
                table_engine,
                cluster_description,
                configuration,
                args.columns,
                args.constraints,
                args.comment,
                args.getContext());
        }
    },
    {
        .source_access_type = AccessType::SOURCES,
    });
}
}