add timeouts to remote streams

This commit is contained in:
Konstantin Podshumok 2019-03-08 02:11:41 +03:00
parent a3d2310d6f
commit f993ea6127
5 changed files with 29 additions and 13 deletions

View File

@ -7,6 +7,8 @@
#include <Interpreters/InternalTextLogsQueue.h>
#include <Storages/IStorage.h>
#include <IO/ConnectionTimeouts.h>
namespace DB
{
@ -61,17 +63,17 @@ RemoteBlockInputStream::RemoteBlockInputStream(
create_multiplexed_connections = [this, pool, throttler]()
{
const Settings & current_settings = context.getSettingsRef();
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
std::vector<IConnectionPool::Entry> connections;
if (main_table)
{
auto try_results = pool->getManyChecked(&current_settings, pool_mode, *main_table);
auto try_results = pool->getManyChecked(timeouts, &current_settings, pool_mode, *main_table);
connections.reserve(try_results.size());
for (auto & try_result : try_results)
connections.emplace_back(std::move(try_result.entry));
}
else
connections = pool->getMany(&current_settings, pool_mode);
connections = pool->getMany(timeouts, &current_settings, pool_mode);
return std::make_unique<MultiplexedConnections>(
std::move(connections), current_settings, throttler);
@ -283,12 +285,14 @@ void RemoteBlockInputStream::sendQuery()
{
multiplexed_connections = create_multiplexed_connections();
if (context.getSettingsRef().skip_unavailable_shards && 0 == multiplexed_connections->size())
const auto& settings = context.getSettingsRef();
if (settings.skip_unavailable_shards && 0 == multiplexed_connections->size())
return;
established = true;
multiplexed_connections->sendQuery(query, "", stage, &context.getClientInfo(), true);
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings);
multiplexed_connections->sendQuery(timeouts, query, "", stage, &context.getClientInfo(), true);
established = false;
sent_query = true;

View File

@ -96,6 +96,7 @@ private:
const String query;
Context context;
/// Temporary tables needed to be sent to remote servers
Tables external_tables;
QueryProcessingStage::Enum stage;
@ -118,7 +119,7 @@ private:
*/
std::atomic<bool> finished { false };
/** Cancel query request was sent to all replicas beacuse data is not needed anymore
/** Cancel query request was sent to all replicas because data is not needed anymore
* This behaviour may occur when:
* - data size is already satisfactory (when using LIMIT, for example)
* - an exception was thrown from client side

View File

@ -6,6 +6,7 @@
#include <Common/NetException.h>
#include <Common/CurrentThread.h>
#include <Interpreters/InternalTextLogsQueue.h>
#include <IO/ConnectionTimeouts.h>
namespace DB
@ -18,13 +19,16 @@ namespace ErrorCodes
}
RemoteBlockOutputStream::RemoteBlockOutputStream(Connection & connection_, const String & query_, const Settings * settings_)
RemoteBlockOutputStream::RemoteBlockOutputStream(Connection & connection_,
const ConnectionTimeouts & timeouts,
const String & query_,
const Settings * settings_)
: connection(connection_), query(query_), settings(settings_)
{
/** Send query and receive "header", that describe table structure.
* Header is needed to know, what structure is required for blocks to be passed to 'write' method.
*/
connection.sendQuery(query, "", QueryProcessingStage::Complete, settings, nullptr);
connection.sendQuery(timeouts, query, "", QueryProcessingStage::Complete, settings, nullptr);
while (true)
{

View File

@ -3,6 +3,7 @@
#include <Core/Block.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Common/Throttler.h>
#include <IO/ConnectionTimeouts.h>
namespace DB
@ -18,7 +19,10 @@ struct Settings;
class RemoteBlockOutputStream : public IBlockOutputStream
{
public:
RemoteBlockOutputStream(Connection & connection_, const String & query_, const Settings * settings_ = nullptr);
RemoteBlockOutputStream(Connection & connection_,
const ConnectionTimeouts & timeouts,
const String & query_,
const Settings * settings_ = nullptr);
Block getHeader() const override { return header; }

View File

@ -43,10 +43,13 @@ private:
public:
using ExceptionCallback = std::function<void()>;
UnionBlockInputStream(BlockInputStreams inputs, BlockInputStreamPtr additional_input_at_end, size_t max_threads,
ExceptionCallback exception_callback_ = ExceptionCallback()) :
output_queue(std::min(inputs.size(), max_threads)),
handler(*this),
UnionBlockInputStream(
BlockInputStreams inputs,
BlockInputStreamPtr additional_input_at_end,
size_t max_threads,
ExceptionCallback exception_callback_ = ExceptionCallback()
) :
output_queue(std::min(inputs.size(), max_threads)), handler(*this),
processor(inputs, additional_input_at_end, max_threads, handler),
exception_callback(exception_callback_)
{