This commit is contained in:
kssenii 2021-09-16 01:45:43 +03:00
parent 221c09589c
commit 1650c92407
14 changed files with 209 additions and 123 deletions

View File

@ -138,13 +138,17 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
const ASTFunction * engine = engine_define->engine;
if (!engine->arguments)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", engine_name);
ASTs & arguments = engine->arguments->children;
auto [common_configuration, storage_specific_args, with_named_collection] = getExternalDataSourceConfiguration(arguments, context, true);
StorageMySQLConfiguration configuration(common_configuration);
if (with_named_collection)
StorageMySQLConfiguration configuration;
ASTs & arguments = engine->arguments->children;
if (auto named_collection = getExternalDataSourceConfiguration(arguments, context, true))
{
auto [common_configuration, storage_specific_args] = named_collection.value();
configuration.set(common_configuration);
configuration.addresses = {std::make_pair(configuration.host, configuration.port)};
if (!storage_specific_args.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"MySQL database require mysql_hostname, mysql_database_name, mysql_username, mysql_password arguments.");
@ -155,8 +159,8 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"MySQL database require mysql_hostname, mysql_database_name, mysql_username, mysql_password arguments.");
arguments[1] = evaluateConstantExpressionOrIdentifierAsLiteral(arguments[1], context);
arguments[1] = evaluateConstantExpressionOrIdentifierAsLiteral(arguments[1], context);
const auto & host_port = safeGetLiteralValue<String>(arguments[0], engine_name);
if (engine_name == "MySQL")
@ -263,13 +267,16 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", engine_name);
ASTs & engine_args = engine->arguments->children;
auto [common_configuration, storage_specific_args, with_named_collection] = getExternalDataSourceConfiguration(engine_args, context, true);
StoragePostgreSQLConfiguration configuration(common_configuration);
auto use_table_cache = false;
StoragePostgreSQLConfiguration configuration;
if (with_named_collection)
if (auto named_collection = getExternalDataSourceConfiguration(engine_args, context, true))
{
auto [common_configuration, storage_specific_args] = named_collection.value();
configuration.set(common_configuration);
configuration.addresses = {std::make_pair(configuration.host, configuration.port)};
for (const auto & [arg_name, arg_value] : storage_specific_args)
{
if (arg_name == "use_table_cache")
@ -320,11 +327,13 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", engine_name);
ASTs & engine_args = engine->arguments->children;
auto [common_configuration, storage_specific_args, with_named_collection] = getExternalDataSourceConfiguration(engine_args, context, true);
StoragePostgreSQLConfiguration configuration(common_configuration);
StoragePostgreSQLConfiguration configuration;
if (with_named_collection)
if (auto named_collection = getExternalDataSourceConfiguration(engine_args, context, true))
{
auto [common_configuration, storage_specific_args] = named_collection.value();
configuration.set(common_configuration);
if (!storage_specific_args.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"MaterializedPostgreSQL Database requires only `host`, `port`, `database_name`, `username`, `password`.");

View File

@ -38,7 +38,19 @@ String ExternalDataSourceConfiguration::toString() const
}
std::tuple<ExternalDataSourceConfiguration, EngineArgs, bool>
void ExternalDataSourceConfiguration::set(const ExternalDataSourceConfiguration & conf)
{
host = conf.host;
port = conf.port;
username = conf.username;
password = conf.username;
database = conf.database;
table = conf.table;
schema = conf.schema;
}
std::optional<std::tuple<ExternalDataSourceConfiguration, EngineArgs>>
getExternalDataSourceConfiguration(const ASTs & args, ContextPtr context, bool is_database_engine)
{
if (args.empty())
@ -106,9 +118,9 @@ getExternalDataSourceConfiguration(const ASTs & args, ContextPtr context, bool i
}
}
return std::make_tuple(configuration, non_common_args, true);
return std::make_tuple(configuration, non_common_args);
}
return std::make_tuple(configuration, non_common_args, false);
return std::nullopt;
}
@ -205,7 +217,16 @@ ExternalDataSourcesByPriority getExternalDataSourceConfigurationByPriority(
}
std::tuple<URLBasedDataSourceConfiguration, EngineArgs, bool>
void URLBasedDataSourceConfiguration::set(const URLBasedDataSourceConfiguration & conf)
{
url = conf.url;
format = conf.format;
compression_method = conf.compression_method;
structure = conf.structure;
}
std::optional<std::tuple<URLBasedDataSourceConfiguration, EngineArgs>>
getURLBasedDataSourceConfiguration(const ASTs & args, ContextPtr context)
{
if (args.empty())
@ -277,9 +298,9 @@ getURLBasedDataSourceConfiguration(const ASTs & args, ContextPtr context)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Storage requires {}", configuration.url.empty() ? "url" : "format");
return std::make_tuple(configuration, non_common_args, true);
return std::make_tuple(configuration, non_common_args);
}
return std::make_tuple(configuration, non_common_args, false);
return std::nullopt;
}
}

View File

@ -21,6 +21,8 @@ struct ExternalDataSourceConfiguration
std::vector<std::pair<String, UInt16>> addresses; /// Failover replicas.
String toString() const;
void set(const ExternalDataSourceConfiguration & conf);
};
using ExternalDataSourceConfigurationPtr = std::shared_ptr<ExternalDataSourceConfiguration>;
@ -28,27 +30,18 @@ using ExternalDataSourceConfigurationPtr = std::shared_ptr<ExternalDataSourceCon
struct StoragePostgreSQLConfiguration : ExternalDataSourceConfiguration
{
explicit StoragePostgreSQLConfiguration(const ExternalDataSourceConfiguration & common_configuration)
: ExternalDataSourceConfiguration(common_configuration) {}
String on_conflict;
};
struct StorageMySQLConfiguration : ExternalDataSourceConfiguration
{
explicit StorageMySQLConfiguration(const ExternalDataSourceConfiguration & common_configuration)
: ExternalDataSourceConfiguration(common_configuration) {}
bool replace_query = false;
String on_duplicate_clause;
};
struct StorageMongoDBConfiguration : ExternalDataSourceConfiguration
{
explicit StorageMongoDBConfiguration(const ExternalDataSourceConfiguration & common_configuration)
: ExternalDataSourceConfiguration(common_configuration) {}
String collection;
String options;
};
@ -66,7 +59,7 @@ using EngineArgs = std::vector<std::pair<String, DB::Field>>;
* If there are key-value arguments apart from common: `host`, `port`, `username`, `password`, `database`,
* i.e. storage-specific arguments, then return them back in a set: ExternalDataSource::EngineArgs.
*/
std::tuple<ExternalDataSourceConfiguration, EngineArgs, bool>
std::optional<std::tuple<ExternalDataSourceConfiguration, EngineArgs>>
getExternalDataSourceConfiguration(const ASTs & args, ContextPtr context, bool is_database_engine = false);
ExternalDataSourceConfiguration getExternalDataSourceConfiguration(
@ -96,18 +89,17 @@ struct URLBasedDataSourceConfiguration
String structure;
std::vector<std::pair<String, Field>> headers;
void set(const URLBasedDataSourceConfiguration & conf);
};
struct StorageS3Configuration : URLBasedDataSourceConfiguration
{
explicit StorageS3Configuration(const URLBasedDataSourceConfiguration & common_configuration)
: URLBasedDataSourceConfiguration(common_configuration) {}
String access_key_id;
String secret_access_key;
};
std::tuple<URLBasedDataSourceConfiguration, EngineArgs, bool>
std::optional<std::tuple<URLBasedDataSourceConfiguration, EngineArgs>>
getURLBasedDataSourceConfiguration(const ASTs & args, ContextPtr context);
}

View File

@ -32,10 +32,7 @@ StorageExternalDistributed::StorageExternalDistributed(
const StorageID & table_id_,
ExternalStorageEngine table_engine,
const String & cluster_description,
const String & remote_database,
const String & remote_table,
const String & username,
const String & password,
const ExternalDataSourceConfiguration & configuration,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const String & comment,
@ -67,15 +64,16 @@ StorageExternalDistributed::StorageExternalDistributed(
addresses = parseRemoteDescriptionForExternalDatabase(shard_description, max_addresses, 3306);
mysqlxx::PoolWithFailover pool(
remote_database,
configuration.database,
addresses,
username, password);
configuration.username,
configuration.password);
shard = StorageMySQL::create(
table_id_,
std::move(pool),
remote_database,
remote_table,
configuration.database,
configuration.table,
/* replace_query = */ false,
/* on_duplicate_clause = */ "",
columns_,
@ -91,24 +89,16 @@ StorageExternalDistributed::StorageExternalDistributed(
case ExternalStorageEngine::PostgreSQL:
{
addresses = parseRemoteDescriptionForExternalDatabase(shard_description, max_addresses, 5432);
ExternalDataSourceConfiguration configuration
{
.host = "",
.port = 0,
.username = username,
.password = password,
.database = remote_database,
.table = remote_table,
.schema = "",
.addresses = addresses
};
StoragePostgreSQLConfiguration postgres_conf;
postgres_conf.set(configuration);
postgres_conf.addresses = addresses;
auto pool = std::make_shared<postgres::PoolWithFailover>(
StoragePostgreSQLConfiguration(configuration),
postgres_conf,
context->getSettingsRef().postgresql_connection_pool_size,
context->getSettingsRef().postgresql_connection_pool_wait_timeout);
shard = StoragePostgreSQL::create(table_id_, std::move(pool), remote_table, columns_, constraints_, String{});
shard = StoragePostgreSQL::create(table_id_, std::move(pool), configuration.table, columns_, constraints_, String{});
break;
}
#endif
@ -123,13 +113,10 @@ StorageExternalDistributed::StorageExternalDistributed(
}
#else
(void)table_engine;
(void)remote_database;
(void)remote_table;
(void)username;
(void)password;
(void)shards_descriptions;
(void)configuration;
(void)cluster_description;
(void)addresses;
(void)table_engine;
#endif
}
@ -217,64 +204,113 @@ void registerStorageExternalDistributed(StorageFactory & factory)
factory.registerStorage("ExternalDistributed", [](const StorageFactory::Arguments & args)
{
ASTs & engine_args = args.engine_args;
if (engine_args.size() < 2)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine ExternalDistributed must have at least 2 arguments: engine_name, named_collection and/or description");
if (engine_args.size() != 6)
throw Exception(
"Storage MySQLiDistributed requires 5 parameters: ExternalDistributed('engine_name', 'cluster_description', database, table, 'user', 'password').",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.getLocalContext());
const String & engine_name = engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();
const String & addresses_description = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
auto engine_name = engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();
StorageExternalDistributed::ExternalStorageEngine table_engine;
if (engine_name == "URL")
{
table_engine = StorageExternalDistributed::ExternalStorageEngine::URL;
else if (engine_name == "MySQL")
table_engine = StorageExternalDistributed::ExternalStorageEngine::MySQL;
else if (engine_name == "PostgreSQL")
table_engine = StorageExternalDistributed::ExternalStorageEngine::PostgreSQL;
else
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"External storage engine {} is not supported for StorageExternalDistributed. Supported engines are: MySQL, PostgreSQL, URL",
engine_name);
ASTs inner_engine_args(engine_args.begin() + 1, engine_args.end());
String cluster_description;
if (engine_name == "URL")
{
URLBasedDataSourceConfiguration configuration;
if (auto named_collection = getURLBasedDataSourceConfiguration(inner_engine_args, args.getLocalContext()))
{
auto [common_configuration, storage_specific_args] = named_collection.value();
configuration.set(common_configuration);
for (const auto & [name, value] : storage_specific_args)
{
if (name == "description")
cluster_description = value.safeGet<String>();
else
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Unknown key-value argument {} for table engine URL", name);
}
if (cluster_description.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Engine ExternalDistribued must have `description` key-value arguement or named collection parameter");
}
else
{
for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.getLocalContext());
cluster_description = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
configuration.format = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
configuration.compression_method = "auto";
if (engine_args.size() == 4)
configuration.compression_method = engine_args[3]->as<ASTLiteral &>().value.safeGet<String>();
}
const String & format_name = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
String compression_method = "auto";
if (engine_args.size() == 4)
compression_method = engine_args[3]->as<ASTLiteral &>().value.safeGet<String>();
auto format_settings = StorageURL::getFormatSettingsFromArgs(args);
return StorageExternalDistributed::create(
addresses_description,
cluster_description,
args.table_id,
format_name,
configuration.format,
format_settings,
compression_method,
configuration.compression_method,
args.columns,
args.constraints,
args.getContext());
}
else
{
if (engine_name == "MySQL")
table_engine = StorageExternalDistributed::ExternalStorageEngine::MySQL;
else if (engine_name == "PostgreSQL")
table_engine = StorageExternalDistributed::ExternalStorageEngine::PostgreSQL;
else
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"External storage engine {} is not supported for StorageExternalDistributed. Supported engines are: MySQL, PostgreSQL, URL",
engine_name);
ExternalDataSourceConfiguration configuration;
if (auto named_collection = getExternalDataSourceConfiguration(inner_engine_args, args.getLocalContext()))
{
auto [common_configuration, storage_specific_args] = named_collection.value();
configuration.set(common_configuration);
for (const auto & [name, value] : storage_specific_args)
{
if (name == "description")
cluster_description = value.safeGet<String>();
else
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Unknown key-value argument {} for table function URL", name);
}
if (cluster_description.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Engine ExternalDistribued must have `description` key-value arguement or named collection parameter");
}
else
{
if (engine_args.size() != 6)
throw Exception(
"Storage ExternalDistributed requires 5 parameters: "
"ExternalDistributed('engine_name', 'cluster_description', 'database', 'table', 'user', 'password').",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
cluster_description = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
configuration.database = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
configuration.table = engine_args[3]->as<ASTLiteral &>().value.safeGet<String>();
configuration.username = engine_args[4]->as<ASTLiteral &>().value.safeGet<String>();
configuration.password = engine_args[5]->as<ASTLiteral &>().value.safeGet<String>();
}
const String & remote_database = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
const String & remote_table = engine_args[3]->as<ASTLiteral &>().value.safeGet<String>();
const String & username = engine_args[4]->as<ASTLiteral &>().value.safeGet<String>();
const String & password = engine_args[5]->as<ASTLiteral &>().value.safeGet<String>();
return StorageExternalDistributed::create(
args.table_id,
table_engine,
addresses_description,
remote_database,
remote_table,
username,
password,
cluster_description,
configuration,
args.columns,
args.constraints,
args.comment,

View File

@ -11,11 +11,13 @@
namespace DB
{
struct ExternalDataSourceConfiguration;
/// Storages MySQL and PostgreSQL use ConnectionPoolWithFailover and support multiple replicas.
/// This class unites multiple storages with replicas into multiple shards with replicas.
/// A query to external database is passed to one replica on each shard, the result is united.
/// Replicas on each shard have the same priority, traversed replicas are moved to the end of the queue.
/// TODO: try `load_balancing` setting for replicas priorities same way as for table function `remote`
/// Similar approach is used for URL storage.
class StorageExternalDistributed final : public shared_ptr_helper<StorageExternalDistributed>, public DB::IStorage
{
friend struct shared_ptr_helper<StorageExternalDistributed>;
@ -44,10 +46,7 @@ protected:
const StorageID & table_id_,
ExternalStorageEngine table_engine,
const String & cluster_description,
const String & remote_database_,
const String & remote_table_,
const String & username,
const String & password,
const ExternalDataSourceConfiguration & configuration,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const String & comment,

View File

@ -106,11 +106,12 @@ Pipe StorageMongoDB::read(
StorageMongoDBConfiguration StorageMongoDB::getConfiguration(ASTs engine_args, ContextPtr context)
{
auto [common_configuration, storage_specific_args, with_named_collection] = getExternalDataSourceConfiguration(engine_args, context);
StorageMongoDBConfiguration configuration(common_configuration);
if (with_named_collection)
StorageMongoDBConfiguration configuration;
if (auto named_collection = getExternalDataSourceConfiguration(engine_args, context))
{
auto [common_configuration, storage_specific_args] = named_collection.value();
configuration.set(common_configuration);
for (const auto & [arg_name, arg_value] : storage_specific_args)
{
if (arg_name == "collection")

View File

@ -237,12 +237,14 @@ SinkToStoragePtr StorageMySQL::write(const ASTPtr & /*query*/, const StorageMeta
StorageMySQLConfiguration StorageMySQL::getConfiguration(ASTs engine_args, ContextPtr context_)
{
auto [common_configuration, storage_specific_args, with_named_collection] = getExternalDataSourceConfiguration(engine_args, context_);
StorageMySQLConfiguration configuration(common_configuration);
StorageMySQLConfiguration configuration;
if (with_named_collection)
if (auto named_collection = getExternalDataSourceConfiguration(engine_args, context_))
{
auto [common_configuration, storage_specific_args] = named_collection.value();
configuration.set(common_configuration);
configuration.addresses = {std::make_pair(configuration.host, configuration.port)};
for (const auto & [arg_name, arg_value] : storage_specific_args)
{
if (arg_name == "replace_query")

View File

@ -386,12 +386,14 @@ SinkToStoragePtr StoragePostgreSQL::write(
StoragePostgreSQLConfiguration StoragePostgreSQL::getConfiguration(ASTs engine_args, ContextPtr context)
{
auto [common_configuration, storage_specific_args, with_named_collection] = getExternalDataSourceConfiguration(engine_args, context);
StoragePostgreSQLConfiguration configuration(common_configuration);
if (with_named_collection)
StoragePostgreSQLConfiguration configuration;
if (auto named_collection = getExternalDataSourceConfiguration(engine_args, context))
{
auto [common_configuration, storage_specific_args] = named_collection.value();
configuration.set(common_configuration);
configuration.addresses = {std::make_pair(configuration.host, configuration.port)};
for (const auto & [arg_name, arg_value] : storage_specific_args)
{
if (arg_name == "on_conflict")

View File

@ -735,11 +735,13 @@ void StorageS3::updateClientAndAuthSettings(ContextPtr ctx, StorageS3::ClientAut
StorageS3Configuration StorageS3::getConfiguration(ASTs & engine_args, ContextPtr local_context)
{
auto [common_configuration, storage_specific_args, with_named_collection] = getURLBasedDataSourceConfiguration(engine_args, local_context);
StorageS3Configuration configuration(common_configuration);
StorageS3Configuration configuration;
if (with_named_collection)
if (auto named_collection = getURLBasedDataSourceConfiguration(engine_args, local_context))
{
auto [common_configuration, storage_specific_args] = named_collection.value();
configuration.set(common_configuration);
for (const auto & [arg_name, arg_value] : storage_specific_args)
{
if (arg_name == "access_key_id")

View File

@ -385,11 +385,13 @@ FormatSettings StorageURL::getFormatSettingsFromArgs(const StorageFactory::Argum
URLBasedDataSourceConfiguration StorageURL::getConfiguration(ASTs & args, ContextPtr local_context)
{
auto [common_configuration, storage_specific_args, with_named_collection] = getURLBasedDataSourceConfiguration(args, local_context);
URLBasedDataSourceConfiguration configuration(common_configuration);
URLBasedDataSourceConfiguration configuration;
if (with_named_collection)
if (auto named_collection = getURLBasedDataSourceConfiguration(args, local_context))
{
auto [common_configuration, storage_specific_args] = named_collection.value();
configuration.set(common_configuration);
if (!storage_specific_args.empty())
{
String illegal_args;

View File

@ -38,11 +38,13 @@ void TableFunctionS3::parseArguments(const ASTPtr & ast_function, ContextPtr con
throw Exception("Table function '" + getName() + "' must have arguments.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
ASTs & args = args_func.at(0)->children;
auto [common_configuration, storage_specific_args, with_named_collection] = getURLBasedDataSourceConfiguration(args, context);
StorageS3Configuration configuration(common_configuration);
StorageS3Configuration configuration;
if (with_named_collection)
if (auto named_collection = getURLBasedDataSourceConfiguration(args, context))
{
auto [common_configuration, storage_specific_args] = named_collection.value();
configuration.set(common_configuration);
for (const auto & [arg_name, arg_value] : storage_specific_args)
{
if (arg_name == "access_key_id")

View File

@ -24,11 +24,13 @@ void TableFunctionURL::parseArguments(const ASTPtr & ast_function, ContextPtr co
const auto & func_args = ast_function->as<ASTFunction &>();
if (!func_args.arguments)
throw Exception("Table function 'URL' must have arguments.", ErrorCodes::BAD_ARGUMENTS);
auto [common_configuration, storage_specific_args, with_named_collection] = getURLBasedDataSourceConfiguration(func_args.arguments->children, context);
URLBasedDataSourceConfiguration configuration(common_configuration);
if (with_named_collection)
URLBasedDataSourceConfiguration configuration;
if (auto with_named_collection = getURLBasedDataSourceConfiguration(func_args.arguments->children, context))
{
auto [common_configuration, storage_specific_args] = with_named_collection.value();
configuration.set(common_configuration);
if (!storage_specific_args.empty())
{
String illegal_args;

View File

@ -21,5 +21,13 @@
<database>postgres</database>
<table>test_table</table>
</postgres3>
<postgres4>
<user>postgres</user>
<password>mysecretpassword</password>
<host>postgres1</host>
<port>5432</port>
<database>postgres</database>
<table>test_replicas</table>
</postgres4>
</named_collections>
</yandex>

View File

@ -6,7 +6,7 @@ from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/named_collections.xml'], with_postgres=True)
node2 = cluster.add_instance('node2', with_postgres_cluster=True)
node2 = cluster.add_instance('node2', main_configs=['configs/named_collections.xml'], with_postgres_cluster=True)
@pytest.fixture(scope="module")
@ -274,6 +274,14 @@ def test_postgres_distributed(started_cluster):
result = node2.query("SELECT DISTINCT(name) FROM test_shards ORDER BY name")
assert(result == 'host1\nhost3\n')
node2.query('''
CREATE TABLE test_shards2
(id UInt32, name String, age UInt32, money UInt32)
ENGINE = ExternalDistributed('PostgreSQL', postgres4, description='postgres{1|2}:5432,postgres{3|4}:5432'); ''')
result = node2.query("SELECT DISTINCT(name) FROM test_shards2 ORDER BY name")
assert(result == 'host1\nhost3\n')
# Check all replicas are traversed
query = "SELECT name FROM ("
for i in range (3):