This commit is contained in:
Alexey Milovidov 2024-08-09 21:42:16 +02:00
parent b421594791
commit ae5982f92a
2 changed files with 22 additions and 17 deletions

View File

@ -79,11 +79,16 @@ std::vector<String> parseRemoteDescription(
/// Look for the corresponding closing bracket /// Look for the corresponding closing bracket
for (m = i + 1; m < r; ++m) for (m = i + 1; m < r; ++m)
{ {
if (description[m] == '{') ++cnt; if (description[m] == '{')
if (description[m] == '}') --cnt; ++cnt;
if (description[m] == '.' && description[m-1] == '.') last_dot = m; if (description[m] == '}')
if (description[m] == separator) have_splitter = true; --cnt;
if (cnt == 0) break; if (description[m] == '.' && description[m-1] == '.')
last_dot = m;
if (description[m] == separator)
have_splitter = true;
if (cnt == 0)
break;
} }
if (cnt != 0) if (cnt != 0)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table function '{}': incorrect brace sequence in first argument", func_name); throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table function '{}': incorrect brace sequence in first argument", func_name);

View File

@ -111,15 +111,17 @@ void registerStorageExternalDistributed(StorageFactory & factory)
std::unordered_set<StoragePtr> shards; std::unordered_set<StoragePtr> shards;
ASTs inner_engine_args(engine_args.begin() + 1, engine_args.end()); ASTs inner_engine_args(engine_args.begin() + 1, engine_args.end());
String addresses_expr = checkAndGetLiteralArgument<String>(engine_args[1], "addresses");
Strings shards_addresses = get_addresses(addresses_expr);
auto engine_name = checkAndGetLiteralArgument<String>(engine_args[0], "engine_name"); auto engine_name = checkAndGetLiteralArgument<String>(engine_args[0], "engine_name");
if (engine_name == "URL") if (engine_name == "URL")
{ {
auto configuration = StorageURL::getConfiguration(inner_engine_args, context);
auto shards_addresses = get_addresses(configuration.addresses_expr);
auto format_settings = StorageURL::getFormatSettingsFromArgs(args); auto format_settings = StorageURL::getFormatSettingsFromArgs(args);
for (const auto & shard_address : shards_addresses) for (const auto & shard_address : shards_addresses)
{ {
inner_engine_args.at(0) = std::make_shared<ASTLiteral>(shard_address);
auto configuration = StorageURL::getConfiguration(inner_engine_args, context);
auto uri_options = parseRemoteDescription(shard_address, 0, shard_address.size(), '|', max_addresses); auto uri_options = parseRemoteDescription(shard_address, 0, shard_address.size(), '|', max_addresses);
if (uri_options.size() > 1) if (uri_options.size() > 1)
{ {
@ -140,13 +142,12 @@ void registerStorageExternalDistributed(StorageFactory & factory)
else if (engine_name == "MySQL") else if (engine_name == "MySQL")
{ {
MySQLSettings mysql_settings; MySQLSettings mysql_settings;
auto configuration = StorageMySQL::getConfiguration(inner_engine_args, context, mysql_settings);
auto shards_addresses = get_addresses(configuration.addresses_expr);
for (const auto & shard_address : shards_addresses) for (const auto & shard_address : shards_addresses)
{ {
auto current_configuration{configuration}; inner_engine_args.at(0) = std::make_shared<ASTLiteral>(shard_address);
current_configuration.addresses = parseRemoteDescriptionForExternalDatabase(shard_address, max_addresses, 3306); auto configuration = StorageMySQL::getConfiguration(inner_engine_args, context, mysql_settings);
auto pool = createMySQLPoolWithFailover(current_configuration, mysql_settings); configuration.addresses = parseRemoteDescriptionForExternalDatabase(shard_address, max_addresses, 3306);
auto pool = createMySQLPoolWithFailover(configuration, mysql_settings);
shards.insert(std::make_shared<StorageMySQL>( shards.insert(std::make_shared<StorageMySQL>(
args.table_id, std::move(pool), configuration.database, configuration.table, args.table_id, std::move(pool), configuration.database, configuration.table,
/* replace_query = */ false, /* on_duplicate_clause = */ "", /* replace_query = */ false, /* on_duplicate_clause = */ "",
@ -157,14 +158,13 @@ void registerStorageExternalDistributed(StorageFactory & factory)
#if USE_LIBPQXX #if USE_LIBPQXX
else if (engine_name == "PostgreSQL") else if (engine_name == "PostgreSQL")
{ {
auto configuration = StoragePostgreSQL::getConfiguration(inner_engine_args, context);
auto shards_addresses = get_addresses(configuration.addresses_expr);
for (const auto & shard_address : shards_addresses) for (const auto & shard_address : shards_addresses)
{ {
auto current_configuration{configuration}; inner_engine_args.at(0) = std::make_shared<ASTLiteral>(shard_address);
current_configuration.addresses = parseRemoteDescriptionForExternalDatabase(shard_address, max_addresses, 5432); auto configuration = StoragePostgreSQL::getConfiguration(inner_engine_args, context);
configuration.addresses = parseRemoteDescriptionForExternalDatabase(shard_address, max_addresses, 5432);
auto pool = std::make_shared<postgres::PoolWithFailover>( auto pool = std::make_shared<postgres::PoolWithFailover>(
current_configuration, configuration,
settings.postgresql_connection_pool_size, settings.postgresql_connection_pool_size,
settings.postgresql_connection_pool_wait_timeout, settings.postgresql_connection_pool_wait_timeout,
settings.postgresql_connection_pool_retries, settings.postgresql_connection_pool_retries,