dbms: StorageDistributed: added failover [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2012-11-06 17:04:38 +00:00
parent 14af942eae
commit d7d23ac81b
7 changed files with 112 additions and 16 deletions

View File

@ -74,7 +74,7 @@ public:
}
}
throw DB::Exception("All connection tries failed. Log: \n\n" << fail_messages.rdbuf() << "\n",
throw DB::Exception("All connection tries failed. Log: \n\n" + fail_messages.str() + "\n",
ErrorCodes::ALL_CONNECTION_TRIES_FAILED);
}
@ -107,7 +107,4 @@ private:
};
typedef SharedPtr<ConnectionPool> ConnectionPoolPtr;
typedef std::vector<ConnectionPoolPtr> ConnectionPools;
}

View File

@ -141,6 +141,9 @@ namespace ErrorCodes
PARAMETERS_TO_AGGREGATE_FUNCTIONS_MUST_BE_LITERALS,
ZERO_ARRAY_OR_TUPLE_INDEX,
ALL_CONNECTION_TRIES_FAILED,
UNKNOWN_ELEMENT_IN_CONFIG,
EXCESSIVE_ELEMENT_IN_CONFIG,
NO_ELEMENTS_IN_CONFIG,
POCO_EXCEPTION = 1000,
STD_EXCEPTION,

View File

@ -31,6 +31,8 @@ struct Settings
size_t poll_interval;
/// Максимальное количество соединений с одним удалённым сервером в пуле.
size_t distributed_connections_pool_size;
/// Максимальное количество попыток соединения с репликами.
size_t connections_with_failover_max_tries;
Settings() :
max_block_size(DEFAULT_BLOCK_SIZE),
@ -43,7 +45,8 @@ struct Settings
receive_timeout(DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, 0),
send_timeout(DBMS_DEFAULT_SEND_TIMEOUT_SEC, 0),
poll_interval(DBMS_DEFAULT_POLL_INTERVAL),
distributed_connections_pool_size(DBMS_DEFAULT_DISTRIBUTED_CONNECTIONS_POOL_SIZE)
distributed_connections_pool_size(DBMS_DEFAULT_DISTRIBUTED_CONNECTIONS_POOL_SIZE),
connections_with_failover_max_tries(DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES)
{
}
@ -61,6 +64,7 @@ struct Settings
else if (name == "poll_interval") poll_interval = boost::get<UInt64>(value);
else if (name == "max_distributed_connections") max_distributed_connections = boost::get<UInt64>(value);
else if (name == "distributed_connections_pool_size") distributed_connections_pool_size = boost::get<UInt64>(value);
else if (name == "connections_with_failover_max_tries") connections_with_failover_max_tries = boost::get<UInt64>(value);
else
throw Exception("Unknown setting " + name, ErrorCodes::UNKNOWN_SETTING);
}

View File

@ -2,6 +2,7 @@
#include <DB/Storages/IStorage.h>
#include <DB/Client/ConnectionPool.h>
#include <DB/Client/ConnectionPoolWithFailover.h>
#include <DB/Interpreters/Settings.h>
@ -17,12 +18,26 @@ namespace DB
class StorageDistributed : public IStorage
{
public:
/// Массив шардов. Каждый шард - адреса одного сервера.
typedef std::vector<Poco::Net::SocketAddress> Addresses;
/// Массив шардов. Для каждого шарда - массив адресов реплик (серверов, считающихся идентичными).
typedef std::vector<Addresses> AddressesWithFailover;
StorageDistributed(
const std::string & name_, /// Имя таблицы.
NamesAndTypesListPtr columns_, /// Список столбцов.
const Addresses & addresses_, /// Адреса удалённых серверов.
const Addresses & addresses, /// Адреса удалённых серверов.
const String & remote_database_, /// БД на удалённых серверах.
const String & remote_table_, /// Имя таблицы на удалённых серверах.
const DataTypeFactory & data_type_factory_,
const Settings & settings);
/// Использовать реплики для отказоустойчивости.
StorageDistributed(
const std::string & name_, /// Имя таблицы.
NamesAndTypesListPtr columns_, /// Список столбцов.
const AddressesWithFailover & addresses, /// Адреса удалённых серверов с учётом реплик.
const String & remote_database_, /// БД на удалённых серверах.
const String & remote_table_, /// Имя таблицы на удалённых серверах.
const DataTypeFactory & data_type_factory_,
@ -48,7 +63,6 @@ public:
private:
String name;
NamesAndTypesListPtr columns;
Addresses addresses;
String remote_database;
String remote_table;
const DataTypeFactory & data_type_factory;

View File

@ -88,6 +88,8 @@ int Server::main(const std::vector<std::string> & args)
settings.max_distributed_connections = config.getInt("max_distributed_connections", settings.max_distributed_connections);
settings.distributed_connections_pool_size =
config.getInt("distributed_connections_pool_size", settings.distributed_connections_pool_size);
settings.connections_with_failover_max_tries =
config.getInt("connections_with_failover_max_tries", settings.connections_with_failover_max_tries);
global_context.setSettings(settings);

View File

@ -11,12 +11,12 @@ namespace DB
StorageDistributed::StorageDistributed(
const std::string & name_,
NamesAndTypesListPtr columns_,
const StorageDistributed::Addresses & addresses_,
const StorageDistributed::Addresses & addresses,
const String & remote_database_,
const String & remote_table_,
const DataTypeFactory & data_type_factory_,
const Settings & settings)
: name(name_), columns(columns_), addresses(addresses_),
: name(name_), columns(columns_),
remote_database(remote_database_), remote_table(remote_table_),
data_type_factory(data_type_factory_)
{
@ -27,6 +27,33 @@ StorageDistributed::StorageDistributed(
settings.connect_timeout, settings.receive_timeout, settings.send_timeout));
}
StorageDistributed::StorageDistributed(
const std::string & name_,
NamesAndTypesListPtr columns_,
const StorageDistributed::AddressesWithFailover & addresses,
const String & remote_database_,
const String & remote_table_,
const DataTypeFactory & data_type_factory_,
const Settings & settings)
: name(name_), columns(columns_),
remote_database(remote_database_), remote_table(remote_table_),
data_type_factory(data_type_factory_)
{
for (AddressesWithFailover::const_iterator it = addresses.begin(); it != addresses.end(); ++it)
{
ConnectionPools replicas;
replicas.reserve(it->size());
for (Addresses::const_iterator jt = it->begin(); jt != it->end(); ++jt)
replicas.push_back(new ConnectionPool(
settings.distributed_connections_pool_size,
jt->host().toString(), jt->port(), "", data_type_factory, "server", Protocol::Compression::Enable,
settings.connect_timeout, settings.receive_timeout, settings.send_timeout));
pools.push_back(new ConnectionPoolWithFailover(replicas, settings.connections_with_failover_max_tries));
}
}
BlockInputStreams StorageDistributed::read(
const Names & column_names,

View File

@ -89,7 +89,22 @@ StoragePtr StorageFactory::get(
String remote_database = dynamic_cast<ASTIdentifier &>(*args[1]).name;
String remote_table = dynamic_cast<ASTIdentifier &>(*args[2]).name;
/** В конфиге адреса либо находятся в узлах <node>:
* <node>
* <host>example01-01-1</host>
* <port>9000</port>
* </node>
* ...
* либо в узлах <shard>, и внутри - <replica>
* <shard>
* <replica>
* <host>example01-01-1</host>
* <port>9000</port>
* </replica>
* </shard>
*/
StorageDistributed::Addresses addresses;
StorageDistributed::AddressesWithFailover addresses_with_failover;
Poco::Util::AbstractConfiguration & config = Poco::Util::Application::instance().config();
Poco::Util::AbstractConfiguration::Keys config_keys;
@ -98,12 +113,46 @@ StoragePtr StorageFactory::get(
String config_prefix = "remote_servers." + config_name + ".";
for (Poco::Util::AbstractConfiguration::Keys::const_iterator it = config_keys.begin(); it != config_keys.end(); ++it)
{
if (0 == strncmp(it->c_str(), "node", strlen("node")))
{
addresses.push_back(Poco::Net::SocketAddress(
config.getString(config_prefix + *it + ".host"),
config.getInt(config_prefix + *it + ".port")));
}
else if (0 == strncmp(it->c_str(), "shard", strlen("shard")))
{
Poco::Util::AbstractConfiguration::Keys replica_keys;
config.keys(config_prefix + *it, replica_keys);
addresses_with_failover.push_back(StorageDistributed::Addresses());
StorageDistributed::Addresses & replica_addresses = addresses_with_failover.back();
for (Poco::Util::AbstractConfiguration::Keys::const_iterator jt = replica_keys.begin(); jt != replica_keys.end(); ++jt)
{
if (0 == strncmp(jt->c_str(), "replica", strlen("replica")))
replica_addresses.push_back(Poco::Net::SocketAddress(
config.getString(config_prefix + *it + "." + *jt + ".host"),
config.getInt(config_prefix + *it + "." + *jt + ".port")));
else
throw Exception("Unknown element in config: " + *jt, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
}
}
else
throw Exception("Unknown element in config: " + *it, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
}
if (!addresses_with_failover.empty() && !addresses.empty())
throw Exception("There must be either 'node' or 'shard' elements in config", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
if (!addresses_with_failover.empty())
return new StorageDistributed(table_name, columns, addresses_with_failover, remote_database, remote_table,
context.getDataTypeFactory(), context.getSettings());
else if (!addresses.empty())
return new StorageDistributed(table_name, columns, addresses, remote_database, remote_table,
context.getDataTypeFactory(), context.getSettings());
else
throw Exception("No addresses listed in config", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
}
else if (name == "MergeTree")
{