add timeouts to multiplexed connections

This commit is contained in:
Konstantin Podshumok 2019-03-02 02:04:33 +03:00
parent eb31345224
commit a3d2310d6f
2 changed files with 10 additions and 4 deletions

View File

@ -1,4 +1,5 @@
#include <Client/MultiplexedConnections.h>
#include <IO/ConnectionTimeouts.h>
namespace DB
{
@ -73,6 +74,7 @@ void MultiplexedConnections::sendExternalTablesData(std::vector<ExternalTablesDa
}
void MultiplexedConnections::sendQuery(
const ConnectionTimeouts & timeouts,
const String & query,
const String & query_id,
UInt64 stage,
@ -91,7 +93,7 @@ void MultiplexedConnections::sendQuery(
if (!replica.connection)
throw Exception("MultiplexedConnections: Internal error", ErrorCodes::LOGICAL_ERROR);
if (replica.connection->getServerRevision() < DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD)
if (replica.connection->getServerRevision(timeouts) < DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD)
{
/// Disable two-level aggregation due to version incompatibility.
modified_settings.group_by_two_level_threshold = 0;
@ -107,13 +109,15 @@ void MultiplexedConnections::sendQuery(
for (size_t i = 0; i < num_replicas; ++i)
{
modified_settings.parallel_replica_offset = i;
replica_states[i].connection->sendQuery(query, query_id, stage, &modified_settings, client_info, with_pending_data);
replica_states[i].connection->sendQuery(timeouts, query, query_id,
stage, &modified_settings, client_info, with_pending_data);
}
}
else
{
/// Use single replica.
replica_states[0].connection->sendQuery(query, query_id, stage, &modified_settings, client_info, with_pending_data);
replica_states[0].connection->sendQuery(timeouts, query, query_id, stage,
&modified_settings, client_info, with_pending_data);
}
sent_query = true;

View File

@ -1,9 +1,10 @@
#pragma once
#include <mutex>
#include <Common/Throttler.h>
#include <Client/Connection.h>
#include <Client/ConnectionPoolWithFailover.h>
#include <mutex>
#include <IO/ConnectionTimeouts.h>
namespace DB
{
@ -31,6 +32,7 @@ public:
/// Send request to replicas.
void sendQuery(
const ConnectionTimeouts & timeouts,
const String & query,
const String & query_id = "",
UInt64 stage = QueryProcessingStage::Complete,