dbms: Server: merged branch METR-16213 into master

Alexey Arno 2015-10-20 17:59:29 +03:00
parent 3d11e0aa75
commit 2660fbaa21
7 changed files with 250 additions and 191 deletions

View File

@@ -297,6 +297,8 @@ namespace ErrorCodes
     MONGODB_INIT_FAILED = 293,
     INVALID_BLOCK_EXTRA_INFO = 294,
     RECEIVED_EMPTY_DATA = 295,
+    NO_REMOTE_SHARD_FOUND = 296,
+    SHARD_HAS_NO_CONNECTIONS = 297,

     KEEPER_EXCEPTION = 999,
     POCO_EXCEPTION = 1000,

View File

@@ -13,7 +13,7 @@ namespace DB
 /// No connection is established to local nodes; instead, the query is executed on them directly.
 /// Therefore we store only the number of local nodes.
 /// In the config, a cluster consists of <node> or <shard> elements.
-class Cluster : private boost::noncopyable
+class Cluster
 {
 public:
     Cluster(const Settings & settings, const String & cluster_name);
@@ -22,28 +22,13 @@ public:
     Cluster(const Settings & settings, std::vector<std::vector<String>> names,
             const String & username, const String & password);

-    /// Number of ClickHouse server nodes located locally;
-    /// local nodes are accessed directly.
-    size_t getLocalNodesNum() const { return local_nodes_num; }
+    Cluster(const Cluster &) = delete;
+    Cluster & operator=(const Cluster &) = delete;

     /// Used to impose a limit on the timeout value.
     static Poco::Timespan saturate(const Poco::Timespan & v, const Poco::Timespan & limit);

 public:
-    /// Connections to remote servers.
-    ConnectionPools pools;
-
-    struct ShardInfo
-    {
-        /// contains names of directories for asynchronous write to StorageDistributed
-        std::vector<std::string> dir_names;
-        UInt32 shard_num;
-        int weight;
-        size_t num_local_nodes;
-    };
-    std::vector<ShardInfo> shard_info_vec;
-    std::vector<size_t> slot_to_shard;
-
     struct Address
     {
         /** In the config, addresses are either located in <node> elements:
@@ -73,26 +58,59 @@ public:
         Address(const String & host_port_, const String & user_, const String & password_);
     };

-private:
-    static bool isLocal(const Address & address);
+    using Addresses = std::vector<Address>;
+    using AddressesWithFailover = std::vector<Addresses>;
+
+    struct ShardInfo
+    {
+    public:
+        bool isLocal() const { return !local_addresses.empty(); }
+        bool hasRemoteConnections() const { return !pool.isNull(); }
+        size_t getLocalNodeCount() const { return local_addresses.size(); }
+
+    public:
+        /// contains names of directories for asynchronous write to StorageDistributed
+        std::vector<std::string> dir_names;
+        UInt32 shard_num;
+        int weight;
+        Addresses local_addresses;
+        mutable ConnectionPoolPtr pool;
+    };
+
+    using ShardsInfo = std::vector<ShardInfo>;

 public:
-    /// Array of shards. Each shard is the addresses of a single server.
-    typedef std::vector<Address> Addresses;
-
-    /// Array of shards. For each shard, an array of replica addresses (servers considered identical).
-    typedef std::vector<Addresses> AddressesWithFailover;
-
-    const Addresses & getShardsInfo() const { return addresses; }
-    const AddressesWithFailover & getShardsWithFailoverInfo() const { return addresses_with_failover; }
-    const Addresses & getLocalShardsInfo() const { return local_addresses; }
+    const ShardsInfo & getShardsInfo() const { return shards_info; }
+    const Addresses & getShardsAddresses() const { return addresses; }
+    const AddressesWithFailover & getShardsWithFailoverAddresses() const { return addresses_with_failover; }
+
+    const ShardInfo * getAnyRemoteShardInfo() const { return any_remote_shard_info; }
+
+    /// Number of remote shards.
+    size_t getRemoteShardCount() const { return remote_shard_count; }
+
+    /// Number of ClickHouse server nodes located locally;
+    /// local nodes are accessed directly.
+    size_t getLocalShardCount() const { return local_shard_count; }
+
+public:
+    std::vector<size_t> slot_to_shard;

 private:
-    Addresses addresses;
-    AddressesWithFailover addresses_with_failover;
-    Addresses local_addresses;
-
-    size_t local_nodes_num = 0;
+    void initMisc();
+
+private:
+    /// Description of the cluster's shards.
+    ShardsInfo shards_info;
+    /// Any remote shard.
+    ShardInfo * any_remote_shard_info = nullptr;
+
+    /// Array of shards. Each shard is the addresses of a single server.
+    Addresses addresses;
+    /// Array of shards. For each shard, an array of replica addresses (servers considered identical).
+    AddressesWithFailover addresses_with_failover;
+
+    size_t remote_shard_count = 0;
+    size_t local_shard_count = 0;
 };

 struct Clusters
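The reworked header replaces the flat list of connection pools and the getLocalNodesNum() counter with one ShardInfo descriptor per shard, carrying that shard's local replica addresses and, when remote replicas exist, a failover connection pool. A minimal, self-contained sketch of the shape callers now program against (toy stand-in types and hypothetical addresses, not the ClickHouse classes):

#include <cstddef>
#include <cstdio>
#include <string>
#include <vector>

/// Toy stand-ins for Cluster::ShardInfo and ConnectionPoolPtr.
struct ShardInfo
{
    std::vector<std::string> local_addresses; /// replicas served by this very server
    std::string pool;                         /// stand-in for the shard's connection pool; empty = none

    bool isLocal() const { return !local_addresses.empty(); }
    bool hasRemoteConnections() const { return !pool.empty(); }
    std::size_t getLocalNodeCount() const { return local_addresses.size(); }
};

int main()
{
    std::vector<ShardInfo> shards_info{
        {{"127.0.0.1:9000"}, ""},                /// a purely local shard
        {{}, "pool(host-2:9000, host-3:9000)"},  /// a remote shard behind a failover pool
    };

    /// The pattern used by the callers updated in this commit: branch per shard on locality.
    for (const auto & shard_info : shards_info)
    {
        if (shard_info.isLocal())
            std::printf("local shard, %zu local replica(s)\n", shard_info.getLocalNodeCount());
        else if (shard_info.hasRemoteConnections())
            std::printf("remote shard via %s\n", shard_info.pool.c_str());
    }
}

Purely local shards carry no pool and purely remote shards carry no local_addresses; a shard with neither is rejected by initMisc() later in this commit.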

View File

@@ -39,7 +39,7 @@ public:
     void write(const Block & block) override
     {
-        if (storage.getShardingKeyExpr() && storage.cluster.shard_info_vec.size() > 1)
+        if (storage.getShardingKeyExpr() && (storage.cluster.getShardsInfo().size() > 1))
             return writeSplit(block);

         writeImpl(block);
@@ -50,7 +50,7 @@ private:
     static std::vector<IColumn::Filter> createFiltersImpl(const size_t num_rows, const IColumn * column, const Cluster & cluster)
     {
         const auto total_weight = cluster.slot_to_shard.size();
-        const auto num_shards = cluster.shard_info_vec.size();
+        const auto num_shards = cluster.getShardsInfo().size();
         std::vector<IColumn::Filter> filters(num_shards);

         /** In C++, dividing a negative number by a positive one yields a negative remainder.
@@ -123,7 +123,7 @@ private:
         auto filters = createFilters(block);

-        const auto num_shards = storage.cluster.shard_info_vec.size();
+        const auto num_shards = storage.cluster.getShardsInfo().size();
         for (size_t i = 0; i < num_shards; ++i)
         {
             auto target_block = block.cloneEmpty();
@@ -138,9 +138,9 @@ private:
     void writeImpl(const Block & block, const size_t shard_id = 0)
     {
-        const auto & shard_info = storage.cluster.shard_info_vec[shard_id];
-        if (shard_info.num_local_nodes)
-            writeToLocal(block, shard_info.num_local_nodes);
+        const auto & shard_info = storage.cluster.getShardsInfo()[shard_id];
+        if (shard_info.getLocalNodeCount() > 0)
+            writeToLocal(block, shard_info.getLocalNodeCount());

         /// dir_names is empty if shard has only local addresses
         if (!shard_info.dir_names.empty())
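createFiltersImpl above maps each sharding-key value onto Cluster::slot_to_shard, which the Cluster constructor fills with `weight` consecutive slots per shard, and the comment it keeps notes that dividing a negative number by a positive one in C++ yields a negative remainder. A self-contained sketch of that slot arithmetic under those assumptions (the helper name and the example weights are illustrative, not the commit's code):

#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

/// Map a signed sharding-key value to a shard index via the weight-expanded
/// slot_to_shard table (each shard occupies `weight` consecutive slots).
std::size_t shardForKey(int64_t key, const std::vector<std::size_t> & slot_to_shard)
{
    const int64_t total_weight = static_cast<int64_t>(slot_to_shard.size());

    /// In C++, dividing a negative number by a positive one yields a negative
    /// remainder, so fold it back into [0, total_weight).
    int64_t slot = key % total_weight;
    if (slot < 0)
        slot += total_weight;

    return slot_to_shard[static_cast<std::size_t>(slot)];
}

int main()
{
    /// Two shards with weights 1 and 3: shard 0 owns slot {0}, shard 1 owns slots {1, 2, 3}.
    const std::vector<std::size_t> slot_to_shard{0, 1, 1, 1};

    assert(shardForKey(5, slot_to_shard) == 1);   ///  5 % 4 ==  1 -> shard 1
    assert(shardForKey(-7, slot_to_shard) == 1);  /// -7 % 4 == -3 -> slot 1 -> shard 1
    assert(shardForKey(8, slot_to_shard) == 0);   ///  8 % 4 ==  0 -> shard 0
    return 0;
}

With weights 1 and 3, the second shard owns three of the four slots and therefore receives roughly three quarters of the rows.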

View File

@@ -132,10 +132,15 @@ private:
         Settings settings = context.getSettings();
         NamesAndTypesList res;

-        /// Send the query to the first shard we come across.
+        /// Send the query to the first remote shard we come across.
+        const auto shard_info = cluster.getAnyRemoteShardInfo();
+        if (shard_info == nullptr)
+            throw Exception("No remote shard found", ErrorCodes::NO_REMOTE_SHARD_FOUND);
+        ConnectionPoolPtr pool = shard_info->pool;
+
         BlockInputStreamPtr input{
             new RemoteBlockInputStream{
-                cluster.pools.front().get(), query, &settings, nullptr,
+                pool.get(), query, &settings, nullptr,
                 Tables(), QueryProcessingStage::Complete, context}
         };
         input->readPrefix();

View File

@@ -8,34 +8,67 @@
 namespace DB
 {

+namespace
+{
+
+/// Default shard weight.
+static constexpr int default_weight = 1;
+
+inline bool isLocal(const Cluster::Address & address)
+{
+    /// If there is a replica such that:
+    /// - its port matches the port the server listens on;
+    /// - its host resolves to a set of addresses, one of which matches one of the addresses of the server's network interfaces,
+    /// then this shard must always be accessed without inter-process communication.
+    return isLocalAddress(address.resolved_address);
+}
+
+inline std::string addressToDirName(const Cluster::Address & address)
+{
+    return
+        escapeForFileName(address.user) +
+        (address.password.empty() ? "" : (':' + escapeForFileName(address.password))) + '@' +
+        escapeForFileName(address.resolved_address.host().toString()) + ':' +
+        std::to_string(address.resolved_address.port());
+}
+
+inline bool beginsWith(const std::string & str1, const char * str2)
+{
+    if (str2 == nullptr)
+        throw Exception("Passed null pointer to function beginsWith", ErrorCodes::LOGICAL_ERROR);
+    return 0 == strncmp(str1.data(), str2, strlen(str2));
+}
+
 /// For caching DNS requests.
-static Poco::Net::SocketAddress resolveSocketAddressImpl1(const String & host, UInt16 port)
+Poco::Net::SocketAddress resolveSocketAddressImpl1(const String & host, UInt16 port)
 {
     return Poco::Net::SocketAddress(host, port);
 }

-static Poco::Net::SocketAddress resolveSocketAddressImpl2(const String & host_and_port)
+Poco::Net::SocketAddress resolveSocketAddressImpl2(const String & host_and_port)
 {
     return Poco::Net::SocketAddress(host_and_port);
 }

-static Poco::Net::SocketAddress resolveSocketAddress(const String & host, UInt16 port)
+Poco::Net::SocketAddress resolveSocketAddress(const String & host, UInt16 port)
 {
     static SimpleCache<decltype(resolveSocketAddressImpl1), &resolveSocketAddressImpl1> cache;
     return cache(host, port);
 }

-static Poco::Net::SocketAddress resolveSocketAddress(const String & host_and_port)
+Poco::Net::SocketAddress resolveSocketAddress(const String & host_and_port)
 {
     static SimpleCache<decltype(resolveSocketAddressImpl2), &resolveSocketAddressImpl2> cache;
     return cache(host_and_port);
 }

+}
+
+/// Implementation of the Cluster::Address class
+
 Cluster::Address::Address(const String & config_prefix)
 {
-    auto & config = Poco::Util::Application::instance().config();
+    const auto & config = Poco::Util::Application::instance().config();

     host_name = config.getString(config_prefix + ".host");
     port = config.getInt(config_prefix + ".port");
@@ -51,7 +84,7 @@ Cluster::Address::Address(const String & host_port_, const String & user_, const String & password_)
     UInt16 default_port = Poco::Util::Application::instance().config().getInt("tcp_port", 0);

     /// It looks like the host_port_ string contains a port. Even if the condition holds, it does not necessarily mean a port is present (example: [::]).
-    if (nullptr != strchr(host_port_.c_str(), ':') || !default_port)
+    if ((nullptr != strchr(host_port_.c_str(), ':')) || !default_port)
     {
         resolved_address = resolveSocketAddress(host_port_);
         host_name = host_port_.substr(0, host_port_.find(':'));
@@ -65,19 +98,7 @@ Cluster::Address::Address(const String & host_port_, const String & user_, const String & password_)
     }
 }

-namespace
-{
-    inline std::string addressToDirName(const Cluster::Address & address)
-    {
-        return
-            escapeForFileName(address.user) +
-            (address.password.empty() ? "" : (':' + escapeForFileName(address.password))) + '@' +
-            escapeForFileName(address.resolved_address.host().toString()) + ':' +
-            std::to_string(address.resolved_address.port());
-    }
-}
+/// Implementation of the Clusters class

 Clusters::Clusters(const Settings & settings, const String & config_name)
 {
@@ -85,17 +106,16 @@ Clusters::Clusters(const Settings & settings, const String & config_name)
     Poco::Util::AbstractConfiguration::Keys config_keys;
     config.keys(config_name, config_keys);

-    for (Poco::Util::AbstractConfiguration::Keys::const_iterator it = config_keys.begin(); it != config_keys.end(); ++it)
+    for (const auto & key : config_keys)
         impl.emplace(std::piecewise_construct,
-            std::forward_as_tuple(*it),
-            std::forward_as_tuple(settings, config_name + "." + *it));
+            std::forward_as_tuple(key),
+            std::forward_as_tuple(settings, config_name + "." + key));
 }

+/// Implementation of the Cluster class

 Cluster::Cluster(const Settings & settings, const String & cluster_name)
 {
-    /// Create the cluster.
-
     Poco::Util::AbstractConfiguration & config = Poco::Util::Application::instance().config();
     Poco::Util::AbstractConfiguration::Keys config_keys;
     config.keys(cluster_name, config_keys);
@@ -104,35 +124,56 @@ Cluster::Cluster(const Settings & settings, const String & cluster_name)
     UInt32 current_shard_num = 1;

-    for (auto it = config_keys.begin(); it != config_keys.end(); ++it)
+    for (const auto & key : config_keys)
     {
-        if (0 == strncmp(it->c_str(), "node", strlen("node")))
+        if (beginsWith(key, "node"))
         {
-            const auto & prefix = config_prefix + *it;
-            const auto weight = config.getInt(prefix + ".weight", 1);
+            /// A shard without replicas.
+
+            const auto & prefix = config_prefix + key;
+            const auto weight = config.getInt(prefix + ".weight", default_weight);
             if (weight == 0)
                 continue;

             addresses.emplace_back(prefix);
             addresses.back().replica_num = 1;
+            const auto & address = addresses.back();

-            slot_to_shard.insert(std::end(slot_to_shard), weight, shard_info_vec.size());
-            if (const auto is_local = isLocal(addresses.back()))
-                shard_info_vec.push_back({{}, current_shard_num, weight, is_local});
+            ShardInfo info;
+            info.shard_num = current_shard_num;
+            info.weight = weight;
+
+            if (isLocal(address))
+                info.local_addresses.push_back(address);
             else
-                shard_info_vec.push_back({{addressToDirName(addresses.back())}, current_shard_num, weight, is_local});
+            {
+                info.dir_names.push_back(addressToDirName(address));
+                info.pool = new ConnectionPool(
+                    settings.distributed_connections_pool_size,
+                    address.host_name, address.port, address.resolved_address,
+                    "", address.user, address.password,
+                    "server", Protocol::Compression::Enable,
+                    saturate(settings.connect_timeout, settings.limits.max_execution_time),
+                    saturate(settings.receive_timeout, settings.limits.max_execution_time),
+                    saturate(settings.send_timeout, settings.limits.max_execution_time));
+            }
+
+            slot_to_shard.insert(std::end(slot_to_shard), weight, shards_info.size());
+            shards_info.push_back(info);
         }
-        else if (0 == strncmp(it->c_str(), "shard", strlen("shard")))
+        else if (beginsWith(key, "shard"))
         {
+            /// A shard with replicas.
+
             Poco::Util::AbstractConfiguration::Keys replica_keys;
-            config.keys(config_prefix + *it, replica_keys);
+            config.keys(config_prefix + key, replica_keys);

             addresses_with_failover.emplace_back();
             Addresses & replica_addresses = addresses_with_failover.back();
             UInt32 current_replica_num = 1;

-            const auto & partial_prefix = config_prefix + *it + ".";
-            const auto weight = config.getInt(partial_prefix + ".weight", 1);
+            const auto & partial_prefix = config_prefix + key + ".";
+            const auto weight = config.getInt(partial_prefix + ".weight", default_weight);
             if (weight == 0)
                 continue;
@@ -142,26 +183,20 @@ Cluster::Cluster(const Settings & settings, const String & cluster_name)
              * the first element of vector; otherwise we will just .emplace_back
              */
             std::vector<std::string> dir_names{};
-            size_t num_local_nodes = 0;

             auto first = true;
-            for (auto jt = replica_keys.begin(); jt != replica_keys.end(); ++jt)
+            for (const auto & replica_key : replica_keys)
             {
-                if (0 == strncmp(jt->data(), "weight", strlen("weight")) ||
-                    0 == strncmp(jt->data(), "internal_replication", strlen("internal_replication")))
+                if (beginsWith(replica_key, "weight") || beginsWith(replica_key, "internal_replication"))
                     continue;

-                if (0 == strncmp(jt->c_str(), "replica", strlen("replica")))
+                if (beginsWith(replica_key, "replica"))
                 {
-                    replica_addresses.emplace_back(partial_prefix + *jt);
+                    replica_addresses.emplace_back(partial_prefix + replica_key);
                     replica_addresses.back().replica_num = current_replica_num;
                     ++current_replica_num;

-                    if (isLocal(replica_addresses.back()))
-                    {
-                        ++num_local_nodes;
-                    }
-                    else
+                    if (!isLocal(replica_addresses.back()))
                     {
                         if (internal_replication)
                         {
@@ -178,40 +213,18 @@ Cluster::Cluster(const Settings & settings, const String & cluster_name)
                     }
                 }
                 else
-                    throw Exception("Unknown element in config: " + *jt, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
+                    throw Exception("Unknown element in config: " + replica_key, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
             }

-            slot_to_shard.insert(std::end(slot_to_shard), weight, shard_info_vec.size());
-            shard_info_vec.push_back({std::move(dir_names), current_shard_num, weight, num_local_nodes});
-        }
-        else
-            throw Exception("Unknown element in config: " + *it, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
-
-        ++current_shard_num;
-    }
-
-    /// Create the corresponding connection pools.
-
-    if (!addresses_with_failover.empty() && !addresses.empty())
-        throw Exception("There must be either 'node' or 'shard' elements in config", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
-
-    if (!addresses_with_failover.empty())
-    {
-        for (const auto & shard : addresses_with_failover)
-        {
+            Addresses shard_local_addresses;
+
             ConnectionPools replicas;
-            replicas.reserve(shard.size());
-
-            bool has_local_replica = false;
-
-            for (const auto & replica : shard)
+            replicas.reserve(replica_addresses.size());
+
+            for (const auto & replica : replica_addresses)
             {
                 if (isLocal(replica))
-                {
-                    has_local_replica = true;
-                    local_addresses.push_back(replica);
-                    break;
-                }
+                    shard_local_addresses.push_back(replica);
                 else
                 {
                     replicas.emplace_back(new ConnectionPool(
@@ -225,42 +238,31 @@ Cluster::Cluster(const Settings & settings, const String & cluster_name)
                 }
             }

-            if (has_local_replica)
-                ++local_nodes_num;
-            else
-                pools.emplace_back(new ConnectionPoolWithFailover(replicas, settings.load_balancing, settings.connections_with_failover_max_tries));
+            ConnectionPoolPtr shard_pool;
+            if (!replicas.empty())
+                shard_pool = new ConnectionPoolWithFailover(replicas, settings.load_balancing, settings.connections_with_failover_max_tries);
+
+            slot_to_shard.insert(std::end(slot_to_shard), weight, shards_info.size());
+            shards_info.push_back({std::move(dir_names), current_shard_num, weight, shard_local_addresses, shard_pool});
         }
+        else
+            throw Exception("Unknown element in config: " + key, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
+
+        if (!addresses_with_failover.empty() && !addresses.empty())
+            throw Exception("There must be either 'node' or 'shard' elements in config", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
+
+        ++current_shard_num;
     }
-    else if (!addresses.empty())
-    {
-        for (const auto & address : addresses)
-        {
-            if (isLocal(address))
-            {
-                local_addresses.push_back(address);
-                ++local_nodes_num;
-            }
-            else
-            {
-                pools.emplace_back(new ConnectionPool(
-                    settings.distributed_connections_pool_size,
-                    address.host_name, address.port, address.resolved_address,
-                    "", address.user, address.password,
-                    "server", Protocol::Compression::Enable,
-                    saturate(settings.connect_timeout, settings.limits.max_execution_time),
-                    saturate(settings.receive_timeout, settings.limits.max_execution_time),
-                    saturate(settings.send_timeout, settings.limits.max_execution_time)));
-            }
-        }
-    }
-    else
-        throw Exception("No addresses listed in config", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
+
+    initMisc();
 }

 Cluster::Cluster(const Settings & settings, std::vector<std::vector<String>> names,
                  const String & username, const String & password)
 {
+    UInt32 current_shard_num = 1;
+
     for (const auto & shard : names)
     {
         Addresses current;
@@ -284,8 +286,14 @@ Cluster::Cluster(const Settings & settings, std::vector<std::vector<String>> names,
                 saturate(settings.send_timeout, settings.limits.max_execution_time)));
         }

-        pools.emplace_back(new ConnectionPoolWithFailover(replicas, settings.load_balancing, settings.connections_with_failover_max_tries));
+        ConnectionPoolPtr shard_pool = new ConnectionPoolWithFailover(replicas, settings.load_balancing, settings.connections_with_failover_max_tries);
+
+        slot_to_shard.insert(std::end(slot_to_shard), default_weight, shards_info.size());
+        shards_info.push_back({{}, current_shard_num, default_weight, {}, shard_pool});
+        ++current_shard_num;
     }
+
+    initMisc();
 }
@@ -294,17 +302,35 @@ Poco::Timespan Cluster::saturate(const Poco::Timespan & v, const Poco::Timespan & limit)
     if (limit.totalMicroseconds() == 0)
         return v;
     else
-        return v > limit ? limit : v;
+        return (v > limit) ? limit : v;
 }

-bool Cluster::isLocal(const Address & address)
+void Cluster::initMisc()
 {
-    /// If there is a replica such that:
-    /// - its port matches the port the server listens on;
-    /// - its host resolves to a set of addresses, one of which matches one of the addresses of the server's network interfaces,
-    /// then this shard must always be accessed without inter-process communication.
-    return isLocalAddress(address.resolved_address);
-}
+    for (const auto & shard_info : shards_info)
+    {
+        if (!shard_info.isLocal() && !shard_info.hasRemoteConnections())
+            throw Exception("Found shard without any specified connection",
+                ErrorCodes::SHARD_HAS_NO_CONNECTIONS);
+    }
+
+    for (const auto & shard_info : shards_info)
+    {
+        if (shard_info.isLocal())
+            ++local_shard_count;
+        else
+            ++remote_shard_count;
+    }
+
+    for (auto & shard_info : shards_info)
+    {
+        if (!shard_info.isLocal())
+        {
+            any_remote_shard_info = &shard_info;
+            break;
+        }
+    }
 }

 }
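The addressToDirName helper moved into the anonymous namespace at the top of this file encodes a remote replica as user[:password]@host:port, with each component passed through escapeForFileName; per the ShardInfo comment, these names are the directories used for asynchronous writes to StorageDistributed. A self-contained sketch of the naming scheme (the escaping below is a simplified stand-in for escapeForFileName, and the address is hypothetical):

#include <cctype>
#include <cstdio>
#include <string>

/// Simplified stand-in for escapeForFileName: percent-encode anything unsafe in a file name.
static std::string escapeStub(const std::string & s)
{
    std::string res;
    for (unsigned char c : s)
    {
        if (std::isalnum(c) || c == '_' || c == '-')
            res += static_cast<char>(c);
        else
        {
            char buf[4];
            std::snprintf(buf, sizeof(buf), "%%%02X", c);
            res += buf;
        }
    }
    return res;
}

static std::string addressToDirName(const std::string & user, const std::string & password,
                                    const std::string & host, unsigned port)
{
    return escapeStub(user)
        + (password.empty() ? "" : (':' + escapeStub(password))) + '@'
        + escapeStub(host) + ':' + std::to_string(port);
}

int main()
{
    /// Hypothetical replica: user "default", no password, host "example-host", port 9000.
    /// Prints: default@example-host:9000
    std::printf("%s\n", addressToDirName("default", "", "example-host", 9000).c_str());
}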

View File

@@ -80,7 +80,7 @@ StorageDistributed::StorageDistributed(
     context(context_), cluster(cluster_),
     sharding_key_expr(sharding_key_ ? ExpressionAnalyzer(sharding_key_, context, nullptr, *columns).getActions(false) : nullptr),
     sharding_key_column_name(sharding_key_ ? sharding_key_->getColumnName() : String{}),
-    write_enabled(!data_path_.empty() && (cluster.getLocalNodesNum() + cluster.pools.size() < 2 || sharding_key_)),
+    write_enabled(!data_path_.empty() && (((cluster.getLocalShardCount() + cluster.getRemoteShardCount()) < 2) || sharding_key_)),
     path(data_path_.empty() ? "" : (data_path_ + escapeForFileName(name) + '/'))
 {
     createDirectoryMonitors();
@@ -104,7 +104,7 @@ StorageDistributed::StorageDistributed(
     context(context_), cluster(cluster_),
     sharding_key_expr(sharding_key_ ? ExpressionAnalyzer(sharding_key_, context, nullptr, *columns).getActions(false) : nullptr),
     sharding_key_column_name(sharding_key_ ? sharding_key_->getColumnName() : String{}),
-    write_enabled(!data_path_.empty() && (cluster.getLocalNodesNum() + cluster.pools.size() < 2 || sharding_key_)),
+    write_enabled(!data_path_.empty() && (((cluster.getLocalShardCount() + cluster.getRemoteShardCount()) < 2) || sharding_key_)),
     path(data_path_.empty() ? "" : (data_path_ + escapeForFileName(name) + '/'))
 {
     createDirectoryMonitors();
@@ -168,7 +168,7 @@ BlockInputStreams StorageDistributed::read(
     /// Makes no sense on remote servers, since the query is usually sent with a different user.
     new_settings.max_concurrent_queries_for_user = 0;

-    size_t result_size = (cluster.pools.size() * settings.max_parallel_replicas) + cluster.getLocalNodesNum();
+    size_t result_size = (cluster.getRemoteShardCount() * settings.max_parallel_replicas) + cluster.getLocalShardCount();

     processed_stage = result_size == 1 || settings.distributed_group_by_no_merge
         ? QueryProcessingStage::Complete
@@ -193,26 +193,31 @@ BlockInputStreams StorageDistributed::read(
     external_tables = context.getExternalTables();

     /// Loop over the shards.
-    for (auto & conn_pool : cluster.pools)
-        res.emplace_back(new RemoteBlockInputStream{
-            conn_pool, modified_query, &new_settings, throttler,
-            external_tables, processed_stage, context});
-
-    /// Add queries to the local ClickHouse.
-    if (cluster.getLocalNodesNum() > 0)
+    for (const auto & shard_info : cluster.getShardsInfo())
     {
-        DB::Context new_context = context;
-        new_context.setSettings(new_settings);
-
-        for (size_t i = 0; i < cluster.getLocalNodesNum(); ++i)
+        if (shard_info.isLocal())
         {
-            InterpreterSelectQuery interpreter(modified_query_ast, new_context, processed_stage);
-
-            /** Materialization is needed, because constants arrive from remote servers already materialized.
-             * If this is not done, different threads will get different column types (Const and non-Const),
-             * which is not allowed, since all the code assumes that every type in a block stream is the same.
-             */
-            res.emplace_back(new MaterializingBlockInputStream(interpreter.execute().in));
+            /// Add queries to the local ClickHouse.
+
+            DB::Context new_context = context;
+            new_context.setSettings(new_settings);
+
+            for (const auto & address : shard_info.local_addresses)
+            {
+                InterpreterSelectQuery interpreter(modified_query_ast, new_context, processed_stage);
+
+                /** Materialization is needed, because constants arrive from remote servers already materialized.
+                 * If this is not done, different threads will get different column types (Const and non-Const),
+                 * which is not allowed, since all the code assumes that every type in a block stream is the same.
+                 */
+                res.emplace_back(new MaterializingBlockInputStream(interpreter.execute().in));
+            }
+        }
+        else
+        {
+            res.emplace_back(new RemoteBlockInputStream{
+                shard_info.pool, modified_query, &new_settings, throttler,
+                external_tables, processed_stage, context});
         }
     }
@@ -274,26 +279,29 @@ BlockInputStreams StorageDistributed::describe(const Context & context, const Settings & settings)
     BlockInputStreams res;

     /// Loop over the shards.
-    for (auto & conn_pool : cluster.pools)
+    for (const auto & shard_info : cluster.getShardsInfo())
     {
-        auto stream = new RemoteBlockInputStream{conn_pool, query, &new_settings, throttler};
-        stream->reachAllReplicas();
-        stream->appendExtraInfo();
-        res.emplace_back(stream);
-    }
-
-    /// Add queries to the local ClickHouse.
-    if (cluster.getLocalNodesNum() > 0)
-    {
-        DB::Context new_context = context;
-        new_context.setSettings(new_settings);
-
-        const auto & local_addresses = cluster.getLocalShardsInfo();
-
-        for (const auto & address : local_addresses)
+        if (shard_info.isLocal())
         {
-            InterpreterDescribeQuery interpreter(ast, new_context);
-            BlockInputStreamPtr stream = new MaterializingBlockInputStream(interpreter.execute().in);
-            stream = new BlockExtraInfoInputStream(stream, toBlockExtraInfo(address));
+            /// Add queries to the local ClickHouse.
+
+            DB::Context new_context = context;
+            new_context.setSettings(new_settings);
+
+            for (const auto & address : shard_info.local_addresses)
+            {
+                InterpreterDescribeQuery interpreter(ast, new_context);
+                BlockInputStreamPtr stream = new MaterializingBlockInputStream(interpreter.execute().in);
+                stream = new BlockExtraInfoInputStream(stream, toBlockExtraInfo(address));
+                res.emplace_back(stream);
+            }
+        }
+
+        if (shard_info.hasRemoteConnections())
+        {
+            auto stream = new RemoteBlockInputStream{shard_info.pool, query, &new_settings, throttler};
+            stream->reachAllReplicas();
+            stream->appendExtraInfo();
             res.emplace_back(stream);
         }
     }
@@ -340,7 +348,7 @@ void StorageDistributed::requireDirectoryMonitor(const std::string & name)
 size_t StorageDistributed::getShardCount() const
 {
-    return cluster.pools.size();
+    return cluster.getRemoteShardCount();
 }

 }
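The read path above sizes its result as one stream per local shard plus max_parallel_replicas streams per remote shard; when exactly one stream results, or distributed_group_by_no_merge is set, the remote servers can run the query to completion. A self-contained sketch of that arithmetic (the cluster layout is hypothetical, and the non-Complete branch is assumed to be the usual mergeable stage, which the hunk does not show):

#include <cstddef>
#include <cstdio>

int main()
{
    /// Hypothetical layout: two remote shards, one local shard, default max_parallel_replicas.
    const std::size_t remote_shard_count = 2;
    const std::size_t local_shard_count = 1;
    const std::size_t max_parallel_replicas = 1;
    const bool distributed_group_by_no_merge = false;

    /// Same formula as StorageDistributed::read after this change.
    const std::size_t result_size = (remote_shard_count * max_parallel_replicas) + local_shard_count;

    const bool complete = (result_size == 1) || distributed_group_by_no_merge;
    std::printf("result_size = %zu, processed_stage = %s\n",
                result_size, complete ? "QueryProcessingStage::Complete" : "an earlier (mergeable) stage");
}

The write_enabled condition in the constructors leans on the same two counters: inserts through the table are only enabled when a data path is set and either the cluster has fewer than two shards in total or a sharding key is provided.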

View File

@@ -72,9 +72,9 @@ BlockInputStreams StorageSystemClusters::read(
     {
         const std::string cluster_name = entry.first;
         const Cluster & cluster = entry.second;
-        const auto & addresses = cluster.getShardsInfo();
-        const auto & addresses_with_failover = cluster.getShardsWithFailoverInfo();
-        const auto & shards_info = cluster.shard_info_vec;
+        const auto & addresses = cluster.getShardsAddresses();
+        const auto & addresses_with_failover = cluster.getShardsWithFailoverAddresses();
+        const auto & shards_info = cluster.getShardsInfo();

         if (!addresses.empty())
         {