dbms: added setting 'skip_unavailable_shards' [#METR-17059].

This commit is contained in:
Alexey Milovidov 2015-06-28 09:38:18 +03:00
parent cdf90e9510
commit 4cebee36ea
9 changed files with 27 additions and 18 deletions

View File

@ -26,13 +26,13 @@ class IConnectionPool : private boost::noncopyable
{
public:
typedef PoolBase<Connection>::Entry Entry;
virtual Entry get(Settings * settings = nullptr) = 0;
virtual Entry get(const Settings * settings = nullptr) = 0;
/** Выделяет до указанного количества соединений для работы.
* Соединения предоставляют доступ к разным репликам одного шарда.
* Выкидывает исключение, если не удалось выделить ни одного соединения.
*/
virtual std::vector<Entry> getMany(Settings * settings = nullptr)
virtual std::vector<Entry> getMany(const Settings * settings = nullptr)
{
return std::vector<Entry>{ get(settings) };
}
@ -89,7 +89,7 @@ public:
/** Выделяет соединение для работы. */
Entry get(Settings * settings = nullptr) override
Entry get(const Settings * settings = nullptr) override
{
if (settings)
return Base::get(settings->queue_max_wait_ms.totalMilliseconds());

View File

@ -21,11 +21,11 @@ namespace DB
*
* Замечание: если один из вложенных пулов заблокируется из-за переполнения, то этот пул тоже заблокируется.
*/
class ConnectionPoolWithFailover : public PoolWithFailoverBase<IConnectionPool, Settings *>, public IConnectionPool
class ConnectionPoolWithFailover : public PoolWithFailoverBase<IConnectionPool>, public IConnectionPool
{
public:
typedef IConnectionPool::Entry Entry;
typedef PoolWithFailoverBase<IConnectionPool, Settings *> Base;
typedef PoolWithFailoverBase<IConnectionPool> Base;
ConnectionPoolWithFailover(ConnectionPools & nested_pools_,
LoadBalancing load_balancing,
@ -52,7 +52,7 @@ public:
}
/** Выделяет соединение для работы. */
Entry get(Settings * settings = nullptr) override
Entry get(const Settings * settings = nullptr) override
{
applyLoadBalancing(settings);
return Base::get(settings);
@ -61,14 +61,14 @@ public:
/** Выделяет до указанного количества соединений для работы.
* Соединения предоставляют доступ к разным репликам одного шарда.
*/
std::vector<Entry> getMany(Settings * settings = nullptr) override
std::vector<Entry> getMany(const Settings * settings = nullptr) override
{
applyLoadBalancing(settings);
return Base::getMany(settings);
}
protected:
bool tryGet(ConnectionPoolPtr pool, Settings * settings, Entry & out_entry, std::stringstream & fail_message) override
bool tryGet(ConnectionPoolPtr pool, const Settings * settings, Entry & out_entry, std::stringstream & fail_message) override
{
try
{
@ -90,7 +90,7 @@ private:
std::vector<size_t> hostname_differences; /// Расстояния от имени этого хоста до имен хостов пулов.
LoadBalancing default_load_balancing;
void applyLoadBalancing(Settings * settings)
void applyLoadBalancing(const Settings * settings)
{
LoadBalancing load_balancing = default_load_balancing;
if (settings)

View File

@ -21,17 +21,17 @@ class ParallelReplicas final : private boost::noncopyable
{
public:
/// Принимает готовое соединение.
ParallelReplicas(Connection * connection_, Settings * settings_, ThrottlerPtr throttler_);
ParallelReplicas(Connection * connection_, const Settings * settings_, ThrottlerPtr throttler_);
/// Принимает пул, из которого нужно будет достать одно или несколько соединений.
ParallelReplicas(IConnectionPool * pool_, Settings * settings_, ThrottlerPtr throttler_);
ParallelReplicas(IConnectionPool * pool_, const Settings * settings_, ThrottlerPtr throttler_);
/// Отправить на реплики всё содержимое внешних таблиц.
void sendExternalTablesData(std::vector<ExternalTablesData> & data);
/// Отправить запрос на реплики.
void sendQuery(const String & query, const String & query_id = "",
UInt64 stage = QueryProcessingStage::Complete, bool with_pending_data = false);
UInt64 stage = QueryProcessingStage::Complete, bool with_pending_data = false);
/// Получить пакет от какой-нибудь реплики.
Connection::Packet receivePacket();
@ -82,7 +82,7 @@ private:
void invalidateReplica(ReplicaMap::iterator it);
private:
Settings * settings;
const Settings * settings;
ReplicaMap replica_map;
/// Если не nullptr, то используется, чтобы ограничить сетевой трафик.

View File

@ -136,6 +136,9 @@ protected:
{
createParallelReplicas();
if (settings.skip_unavailable_shards && 0 == parallel_replicas->size())
return Block();
established = true;
parallel_replicas->sendQuery(query, "", stage, true);

View File

@ -15,7 +15,7 @@ namespace DB
class RemoteBlockOutputStream : public IBlockOutputStream
{
public:
RemoteBlockOutputStream(Connection & connection_, const String & query_, Settings * settings_ = nullptr)
RemoteBlockOutputStream(Connection & connection_, const String & query_, const Settings * settings_ = nullptr)
: connection(connection_), query(query_), settings(settings_)
{
}
@ -105,7 +105,7 @@ public:
private:
Connection & connection;
String query;
Settings * settings;
const Settings * settings;
Block sample_block;
};

View File

@ -97,6 +97,9 @@ struct Settings
M(SettingUInt64, parallel_replicas_count, 0) \
M(SettingUInt64, parallel_replica_offset, 0) \
\
/** Тихо пропускать недоступные шарды. */ \
M(SettingBool, skip_unavailable_shards, false) \
\
/** Тонкие настройки для чтения из MergeTree */ \
\
/** Если из одного файла читается хотя бы столько строк, чтение можно распараллелить. */ \

View File

@ -3,7 +3,7 @@
namespace DB
{
ParallelReplicas::ParallelReplicas(Connection * connection_, Settings * settings_, ThrottlerPtr throttler_)
ParallelReplicas::ParallelReplicas(Connection * connection_, const Settings * settings_, ThrottlerPtr throttler_)
: settings(settings_), throttler(throttler_),
active_replica_count(1),
supports_parallel_execution(false)
@ -11,7 +11,7 @@ ParallelReplicas::ParallelReplicas(Connection * connection_, Settings * settings
registerReplica(connection_);
}
ParallelReplicas::ParallelReplicas(IConnectionPool * pool_, Settings * settings_, ThrottlerPtr throttler_)
ParallelReplicas::ParallelReplicas(IConnectionPool * pool_, const Settings * settings_, ThrottlerPtr throttler_)
: settings(settings_), throttler(throttler_)
{
if (pool_ == nullptr)
@ -37,7 +37,8 @@ ParallelReplicas::ParallelReplicas(IConnectionPool * pool_, Settings * settings_
supports_parallel_execution = false;
pool_entry = pool_->get(settings);
registerReplica(&*pool_entry);
if (!pool_entry.isNull())
registerReplica(&*pool_entry);
}
}

View File

@ -0,0 +1 @@
SELECT count() FROM remote('{127,1}.0.0.{1,2}', system.one) SETTINGS skip_unavailable_shards = 1;