2021-09-02 14:27:19 +00:00
|
|
|
#include <Common/ConcurrentBoundedQueue.h>
|
|
|
|
|
2021-10-15 20:18:20 +00:00
|
|
|
#include <QueryPipeline/ConnectionCollector.h>
|
|
|
|
#include <QueryPipeline/RemoteQueryExecutor.h>
|
|
|
|
#include <QueryPipeline/RemoteQueryExecutorReadContext.h>
|
2020-06-02 15:59:57 +00:00
|
|
|
|
2020-06-02 16:27:05 +00:00
|
|
|
#include <Columns/ColumnConst.h>
|
|
|
|
#include <Common/CurrentThread.h>
|
2021-08-30 11:04:59 +00:00
|
|
|
#include "Core/Protocol.h"
|
2021-12-09 10:39:28 +00:00
|
|
|
#include "IO/ReadHelpers.h"
|
2021-10-16 14:03:50 +00:00
|
|
|
#include <QueryPipeline/Pipe.h>
|
2020-06-02 16:27:05 +00:00
|
|
|
#include <Processors/Sources/SourceFromSingleChunk.h>
|
2021-11-25 15:41:50 +00:00
|
|
|
#include <Processors/Transforms/LimitsCheckingTransform.h>
|
2020-06-02 16:27:05 +00:00
|
|
|
#include <Storages/IStorage.h>
|
2020-09-10 19:55:36 +00:00
|
|
|
#include <Storages/SelectQueryInfo.h>
|
2020-06-02 16:27:05 +00:00
|
|
|
#include <Interpreters/castColumn.h>
|
|
|
|
#include <Interpreters/Cluster.h>
|
2020-12-10 22:05:02 +00:00
|
|
|
#include <Interpreters/Context.h>
|
2020-06-02 16:27:05 +00:00
|
|
|
#include <Interpreters/InternalTextLogsQueue.h>
|
2020-12-10 22:05:02 +00:00
|
|
|
#include <IO/ConnectionTimeoutsContext.h>
|
2021-01-19 19:21:06 +00:00
|
|
|
#include <Client/MultiplexedConnections.h>
|
|
|
|
#include <Client/HedgedConnections.h>
|
2020-11-20 17:23:53 +00:00
|
|
|
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
|
2021-12-09 10:39:28 +00:00
|
|
|
#include <IO/ReadBufferFromString.h>
|
2020-06-02 16:27:05 +00:00
|
|
|
|
2021-08-31 23:47:52 +00:00
|
|
|
|
2021-07-14 13:17:30 +00:00
|
|
|
namespace CurrentMetrics
|
|
|
|
{
|
2021-08-31 23:47:52 +00:00
|
|
|
extern const Metric SyncDrainedConnections;
|
|
|
|
extern const Metric ActiveSyncDrainedConnections;
|
2021-07-14 13:17:30 +00:00
|
|
|
}
|
|
|
|
|
2020-06-02 16:27:05 +00:00
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
/// Error codes used by the exceptions thrown in this file.
/// Only declared here; the definitions are not part of this file.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int UNKNOWN_PACKET_FROM_SERVER;
    extern const int DUPLICATED_PART_UUIDS;
    extern const int SYSTEM_ERROR;
}
|
|
|
|
|
2021-07-23 07:40:03 +00:00
|
|
|
/// Base constructor: remembers the query description (text, header, context, scalars,
/// external tables, processing stage) and the optional extension callbacks.
/// It does not install `create_connections`; the constructors that delegate to it do.
RemoteQueryExecutor::RemoteQueryExecutor(
    const String & query_,
    const Block & header_,
    ContextPtr context_,
    const Scalars & scalars_,
    const Tables & external_tables_,
    QueryProcessingStage::Enum stage_,
    std::optional<Extension> extension_)
    : header(header_)
    , query(query_)
    , context(context_)
    , scalars(scalars_)
    , external_tables(external_tables_)
    , stage(stage_)
    , task_iterator(extension_ ? extension_->task_iterator : nullptr)
    , parallel_reading_coordinator(extension_ ? extension_->parallel_reading_coordinator : nullptr)
{
}
|
|
|
|
|
2020-06-02 16:27:05 +00:00
|
|
|
/// Executes the query over a single, caller-owned connection.
/// `connection` is captured by reference inside `create_connections`, so it has to stay alive
/// for as long as this executor may call that factory.
RemoteQueryExecutor::RemoteQueryExecutor(
    Connection & connection,
    const String & query_,
    const Block & header_,
    ContextPtr context_,
    ThrottlerPtr throttler,
    const Scalars & scalars_,
    const Tables & external_tables_,
    QueryProcessingStage::Enum stage_,
    std::optional<Extension> extension_)
    : RemoteQueryExecutor(query_, header_, context_, scalars_, external_tables_, stage_, extension_)
{
    create_connections = [this, &connection, throttler, extension_]()
    {
        auto multiplexed = std::make_shared<MultiplexedConnections>(connection, context->getSettingsRef(), throttler);

        /// Replica info is forwarded only when the extension provides it.
        if (extension_ && extension_->replica_info)
            multiplexed->setReplicaInfo(*extension_->replica_info);

        return multiplexed;
    };
}
|
|
|
|
|
2021-07-23 07:40:03 +00:00
|
|
|
/// Executes the query over a single connection held by shared pointer.
/// The pointer is copied into `create_connections`, so the connection stays alive with the factory.
RemoteQueryExecutor::RemoteQueryExecutor(
    std::shared_ptr<Connection> connection_ptr,
    const String & query_,
    const Block & header_,
    ContextPtr context_,
    ThrottlerPtr throttler,
    const Scalars & scalars_,
    const Tables & external_tables_,
    QueryProcessingStage::Enum stage_,
    std::optional<Extension> extension_)
    : RemoteQueryExecutor(query_, header_, context_, scalars_, external_tables_, stage_, extension_)
{
    create_connections = [this, connection_ptr, throttler, extension_]()
    {
        auto multiplexed = std::make_shared<MultiplexedConnections>(connection_ptr, context->getSettingsRef(), throttler);

        /// Replica info is forwarded only when the extension provides it.
        if (extension_ && extension_->replica_info)
            multiplexed->setReplicaInfo(*extension_->replica_info);

        return multiplexed;
    };
}
|
|
|
|
|
2020-06-02 16:27:05 +00:00
|
|
|
/// Executes the query over a set of connections that the caller has already taken from `pool_`.
/// `connections_` is consumed: the entries are moved into the connection factory.
RemoteQueryExecutor::RemoteQueryExecutor(
    const ConnectionPoolWithFailoverPtr & pool_,
    std::vector<IConnectionPool::Entry> && connections_,
    const String & query_, const Block & header_, ContextPtr context_,
    const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_,
    QueryProcessingStage::Enum stage_, std::optional<Extension> extension_)
    : header(header_), query(query_), context(context_)
    , scalars(scalars_), external_tables(external_tables_), stage(stage_)
    , task_iterator(extension_ ? extension_->task_iterator : nullptr)
    , parallel_reading_coordinator(extension_ ? extension_->parallel_reading_coordinator : nullptr)
    , pool(pool_)
{
    /// Move the entries into the closure. A named rvalue reference is an lvalue, so a plain
    /// by-value capture would copy the whole vector.
    ///
    /// NOTE(review): the factory moves the entries out on its first call, so a second call
    /// (sendQuery() runs again after restartQueryWithoutDuplicatedUUIDs()) sees a moved-from,
    /// presumably empty, vector. This matches the previous behaviour; confirm it is intended.
    create_connections = [this, entries = std::move(connections_), throttler, extension_]() mutable
    {
        auto res = std::make_shared<MultiplexedConnections>(std::move(entries), context->getSettingsRef(), throttler);
        if (extension_ && extension_->replica_info)
            res->setReplicaInfo(*extension_->replica_info);
        return res;
    };
}
|
|
|
|
|
|
|
|
/// Executes the query over connections that are taken from `pool_` lazily, when
/// `create_connections` is invoked (i.e. on sendQuery()).
RemoteQueryExecutor::RemoteQueryExecutor(
    const ConnectionPoolWithFailoverPtr & pool_,
    const String & query_, const Block & header_, ContextPtr context_,
    const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_,
    QueryProcessingStage::Enum stage_, std::optional<Extension> extension_)
    : header(header_), query(query_), context(context_)
    , scalars(scalars_), external_tables(external_tables_), stage(stage_)
    , task_iterator(extension_ ? extension_->task_iterator : nullptr)
    , parallel_reading_coordinator(extension_ ? extension_->parallel_reading_coordinator : nullptr)
    , pool(pool_)
{
    create_connections = [this, throttler, extension_]() -> std::shared_ptr<IConnections>
    {
        const Settings & current_settings = context->getSettingsRef();
        auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);

#if defined(OS_LINUX)
        /// Hedged requests are compiled in on Linux only and used when the setting is enabled.
        if (current_settings.use_hedged_requests)
        {
            /// The table to check is passed only when a main table has been set.
            std::shared_ptr<QualifiedTableName> table_to_check = nullptr;
            if (main_table)
                table_to_check = std::make_shared<QualifiedTableName>(main_table.getQualifiedName());

            auto res = std::make_shared<HedgedConnections>(pool, context, timeouts, throttler, pool_mode, table_to_check);
            if (extension_ && extension_->replica_info)
                res->setReplicaInfo(*extension_->replica_info);
            return res;
        }
#endif

        std::vector<IConnectionPool::Entry> connection_entries;
        if (main_table)
        {
            /// With a main table set, use the checked variant and keep only the entries.
            auto try_results = pool->getManyChecked(timeouts, &current_settings, pool_mode, main_table.getQualifiedName());
            connection_entries.reserve(try_results.size());
            for (auto & try_result : try_results)
                connection_entries.emplace_back(std::move(try_result.entry));
        }
        else
            connection_entries = pool->getMany(timeouts, &current_settings, pool_mode);

        auto res = std::make_shared<MultiplexedConnections>(std::move(connection_entries), current_settings, throttler);
        if (extension_ && extension_->replica_info)
            res->setReplicaInfo(*extension_->replica_info);
        return res;
    };
}
|
|
|
|
|
|
|
|
RemoteQueryExecutor::~RemoteQueryExecutor()
{
    /// If the executor is destroyed while a query is being sent (`established`) or while it is
    /// still pending, the connections may be in the middle of an exchange. Disconnect them so
    /// that they are not left hanging in an out-of-sync state.
    const bool needs_disconnect = established || isQueryPending();
    if (needs_disconnect)
        connections->disconnect();
}
|
|
|
|
|
|
|
|
/** If we receive a block with slightly different column types, or with excessive columns,
|
|
|
|
* we will adapt it to expected structure.
|
|
|
|
*/
|
2021-03-24 18:36:31 +00:00
|
|
|
/// Builds a block that has exactly the columns of `header` (same names, types and order),
/// taking the data from `block` and casting it to the header types.
/// Columns that are constant in `header` become constant columns in the result.
/// If `header` is empty, `block` is returned unchanged.
static Block adaptBlockStructure(const Block & block, const Block & header)
{
    /// Special case when reader doesn't care about result structure. Deprecated and used only in Benchmark, PerformanceTest.
    if (!header)
        return block;

    Block res;
    res.info = block.info;

    /// The row count does not change while the result is being built.
    const size_t rows = block.rows();

    for (const auto & elem : header)
    {
        ColumnPtr column;

        if (elem.column && isColumnConst(*elem.column))
        {
            /// We expect constant column in block.
            /// If block is not empty, then get value for constant from it,
            /// because it may be different for remote server for functions like version(), uptime(), ...
            if (rows > 0 && block.has(elem.name))
            {
                /// Const column is passed as materialized. Get first value from it.
                ///
                /// TODO: check that column contains the same value.
                /// TODO: serialize const columns.
                auto col = block.getByName(elem.name);
                /// `col` is already a copy of the received column; reuse it instead of a second lookup.
                col.column = col.column->cut(0, 1);

                column = castColumn(col, elem.type);

                if (!isColumnConst(*column))
                    column = ColumnConst::create(column, rows);
                else
                    /// It is not possible now. Just in case we support const columns serialization.
                    column = column->cloneResized(rows);
            }
            else
                /// No value was received: fall back to the constant from the header.
                column = elem.column->cloneResized(rows);
        }
        else
            column = castColumn(block.getByName(elem.name), elem.type);

        res.insert({column, elem.type, elem.name});
    }
    return res;
}
|
|
|
|
|
|
|
|
void RemoteQueryExecutor::sendQuery()
|
|
|
|
{
|
|
|
|
if (sent_query)
|
|
|
|
return;
|
|
|
|
|
2021-01-19 19:21:06 +00:00
|
|
|
connections = create_connections();
|
2020-06-02 16:27:05 +00:00
|
|
|
|
2021-04-10 23:33:54 +00:00
|
|
|
const auto & settings = context->getSettingsRef();
|
2021-01-19 19:21:06 +00:00
|
|
|
if (settings.skip_unavailable_shards && 0 == connections->size())
|
2020-06-02 16:27:05 +00:00
|
|
|
return;
|
|
|
|
|
Fix "Unexpected packet Data received from client" error
Fix query cancelation in case of Distributed queries with LIMIT (when
the initator does not required to read all the data), since this cannot
be done until the query was sent (from the Query packet up to the empty
data Block), otherwise you will get:
2020.11.21 21:47:23.297161 [ 184 ] {} <Error> TCPHandler: Code: 101, e.displayText() = DB::Exception: Unexpected packet Data received from client, Stack trace:
0. /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/exception:129: Poco::Exception::Exception(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, int) @ 0x244f5bc9 in /usr/bin/clickhouse
1. /build/obj-x86_64-linux-gnu/../src/Common/Exception.cpp:40: DB::Exception::Exception(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, int) @ 0xa14a421 in /usr/bin/clickhouse
2. /build/obj-x86_64-linux-gnu/../src/Common/NetException.h:0: DB::TCPHandler::receiveUnexpectedData() @ 0x1e032a74 in /usr/bin/clickhouse
3. /build/obj-x86_64-linux-gnu/../src/Server/TCPHandler.cpp:824: DB::TCPHandler::receivePacket() @ 0x1e024685 in /usr/bin/clickhouse
4. /build/obj-x86_64-linux-gnu/../src/Server/TCPHandler.cpp:178: DB::TCPHandler::runImpl() @ 0x1e01736b in /usr/bin/clickhouse
5. /build/obj-x86_64-linux-gnu/../src/Server/TCPHandler.cpp:0: DB::TCPHandler::run() @ 0x1e035c1b in /usr/bin/clickhouse
6. /build/obj-x86_64-linux-gnu/../contrib/poco/Net/src/TCPServerConnection.cpp:57: Poco::Net::TCPServerConnection::start() @ 0x243559cf in /usr/bin/clickhouse
7. /build/obj-x86_64-linux-gnu/../contrib/poco/Net/src/TCPServerDispatcher.cpp:0: Poco::Net::TCPServerDispatcher::run() @ 0x24356521 in /usr/bin/clickhouse
8. /build/obj-x86_64-linux-gnu/../contrib/poco/Foundation/src/ThreadPool.cpp:0: Poco::PooledThread::run() @ 0x24609175 in /usr/bin/clickhouse
9. /build/obj-x86_64-linux-gnu/../contrib/poco/Foundation/src/Thread_POSIX.cpp:0: Poco::ThreadImpl::runnableEntry(void*) @ 0x24603cb7 in /usr/bin/clickhouse
10. start_thread @ 0x9669 in /usr/lib/x86_64-linux-gnu/libpthread-2.30.so
11. __clone @ 0x1222b3 in /usr/lib/x86_64-linux-gnu/libc-2.30.so
2020-11-21 21:20:00 +00:00
|
|
|
/// Query cannot be canceled in the middle of the send query,
|
2020-11-28 05:37:54 +00:00
|
|
|
/// since there are multiple packets:
|
Fix "Unexpected packet Data received from client" error
Fix query cancelation in case of Distributed queries with LIMIT (when
the initator does not required to read all the data), since this cannot
be done until the query was sent (from the Query packet up to the empty
data Block), otherwise you will get:
2020.11.21 21:47:23.297161 [ 184 ] {} <Error> TCPHandler: Code: 101, e.displayText() = DB::Exception: Unexpected packet Data received from client, Stack trace:
0. /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/exception:129: Poco::Exception::Exception(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, int) @ 0x244f5bc9 in /usr/bin/clickhouse
1. /build/obj-x86_64-linux-gnu/../src/Common/Exception.cpp:40: DB::Exception::Exception(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, int) @ 0xa14a421 in /usr/bin/clickhouse
2. /build/obj-x86_64-linux-gnu/../src/Common/NetException.h:0: DB::TCPHandler::receiveUnexpectedData() @ 0x1e032a74 in /usr/bin/clickhouse
3. /build/obj-x86_64-linux-gnu/../src/Server/TCPHandler.cpp:824: DB::TCPHandler::receivePacket() @ 0x1e024685 in /usr/bin/clickhouse
4. /build/obj-x86_64-linux-gnu/../src/Server/TCPHandler.cpp:178: DB::TCPHandler::runImpl() @ 0x1e01736b in /usr/bin/clickhouse
5. /build/obj-x86_64-linux-gnu/../src/Server/TCPHandler.cpp:0: DB::TCPHandler::run() @ 0x1e035c1b in /usr/bin/clickhouse
6. /build/obj-x86_64-linux-gnu/../contrib/poco/Net/src/TCPServerConnection.cpp:57: Poco::Net::TCPServerConnection::start() @ 0x243559cf in /usr/bin/clickhouse
7. /build/obj-x86_64-linux-gnu/../contrib/poco/Net/src/TCPServerDispatcher.cpp:0: Poco::Net::TCPServerDispatcher::run() @ 0x24356521 in /usr/bin/clickhouse
8. /build/obj-x86_64-linux-gnu/../contrib/poco/Foundation/src/ThreadPool.cpp:0: Poco::PooledThread::run() @ 0x24609175 in /usr/bin/clickhouse
9. /build/obj-x86_64-linux-gnu/../contrib/poco/Foundation/src/Thread_POSIX.cpp:0: Poco::ThreadImpl::runnableEntry(void*) @ 0x24603cb7 in /usr/bin/clickhouse
10. start_thread @ 0x9669 in /usr/lib/x86_64-linux-gnu/libpthread-2.30.so
11. __clone @ 0x1222b3 in /usr/lib/x86_64-linux-gnu/libc-2.30.so
2020-11-21 21:20:00 +00:00
|
|
|
/// - Query
|
|
|
|
/// - Data (multiple times)
|
|
|
|
///
|
|
|
|
/// And after the Cancel packet none Data packet can be sent, otherwise the remote side will throw:
|
|
|
|
///
|
|
|
|
/// Unexpected packet Data received from client
|
|
|
|
///
|
|
|
|
std::lock_guard guard(was_cancelled_mutex);
|
|
|
|
|
2020-06-02 16:27:05 +00:00
|
|
|
established = true;
|
2020-11-20 17:23:53 +00:00
|
|
|
was_cancelled = false;
|
2020-06-02 16:27:05 +00:00
|
|
|
|
|
|
|
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings);
|
2021-04-10 23:33:54 +00:00
|
|
|
ClientInfo modified_client_info = context->getClientInfo();
|
2020-06-02 16:27:05 +00:00
|
|
|
modified_client_info.query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
|
2021-09-25 13:32:25 +00:00
|
|
|
/// Set initial_query_id to query_id for the clickhouse-benchmark.
|
|
|
|
///
|
|
|
|
/// (since first query of clickhouse-benchmark will be issued as SECONDARY_QUERY,
|
|
|
|
/// due to it executes queries via RemoteBlockInputStream)
|
|
|
|
if (modified_client_info.initial_query_id.empty())
|
|
|
|
modified_client_info.initial_query_id = query_id;
|
2020-11-18 17:43:18 +00:00
|
|
|
if (CurrentThread::isInitialized())
|
|
|
|
{
|
|
|
|
modified_client_info.client_trace_context = CurrentThread::get().thread_trace_context;
|
|
|
|
}
|
2020-06-02 16:27:05 +00:00
|
|
|
|
2020-11-20 17:23:53 +00:00
|
|
|
{
|
|
|
|
std::lock_guard lock(duplicated_part_uuids_mutex);
|
|
|
|
if (!duplicated_part_uuids.empty())
|
2021-02-06 14:38:56 +00:00
|
|
|
connections->sendIgnoredPartUUIDs(duplicated_part_uuids);
|
2020-11-20 17:23:53 +00:00
|
|
|
}
|
|
|
|
|
2021-01-19 19:21:06 +00:00
|
|
|
connections->sendQuery(timeouts, query, query_id, stage, modified_client_info, true);
|
2020-06-02 16:27:05 +00:00
|
|
|
|
|
|
|
established = false;
|
|
|
|
sent_query = true;
|
|
|
|
|
|
|
|
if (settings.enable_scalar_subquery_optimization)
|
|
|
|
sendScalars();
|
|
|
|
sendExternalTables();
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Synchronous read: sends the query on first use, then receives packets until one of them
/// yields a block. Returns an empty block when the query was cancelled, when the stream has
/// ended, or when there are no connections and `skip_unavailable_shards` is set.
Block RemoteQueryExecutor::read()
{
    if (!sent_query)
    {
        sendQuery();

        if (context->getSettingsRef().skip_unavailable_shards && (0 == connections->size()))
            return {};
    }

    while (true)
    {
        if (was_cancelled)
            return Block();

        Packet packet = connections->receivePacket();

        /// Move the block out of the optional instead of copying it
        /// (same as the overload taking a ReadContext does).
        if (auto block = processPacket(std::move(packet)))
            return std::move(*block);
        else if (got_duplicated_part_uuids)
            return std::get<Block>(restartQueryWithoutDuplicatedUUIDs());
    }
}
|
|
|
|
|
2020-12-21 12:42:57 +00:00
|
|
|
std::variant<Block, int> RemoteQueryExecutor::read(std::unique_ptr<ReadContext> & read_context [[maybe_unused]])
|
2020-12-02 17:02:14 +00:00
|
|
|
{
|
2020-12-18 13:15:03 +00:00
|
|
|
|
|
|
|
#if defined(OS_LINUX)
|
2020-12-02 17:02:14 +00:00
|
|
|
if (!sent_query)
|
|
|
|
{
|
|
|
|
sendQuery();
|
|
|
|
|
2021-04-10 23:33:54 +00:00
|
|
|
if (context->getSettingsRef().skip_unavailable_shards && (0 == connections->size()))
|
2020-12-03 12:21:10 +00:00
|
|
|
return Block();
|
2020-12-02 17:02:14 +00:00
|
|
|
}
|
|
|
|
|
2021-02-02 23:21:07 +00:00
|
|
|
if (!read_context || resent_query)
|
2020-12-16 20:27:31 +00:00
|
|
|
{
|
|
|
|
std::lock_guard lock(was_cancelled_mutex);
|
2020-12-17 10:07:28 +00:00
|
|
|
if (was_cancelled)
|
|
|
|
return Block();
|
|
|
|
|
2021-01-19 19:21:06 +00:00
|
|
|
read_context = std::make_unique<ReadContext>(*connections);
|
2020-12-16 20:27:31 +00:00
|
|
|
}
|
2020-12-02 17:02:14 +00:00
|
|
|
|
2020-12-03 12:21:10 +00:00
|
|
|
do
|
|
|
|
{
|
2020-12-14 16:18:12 +00:00
|
|
|
if (!read_context->resumeRoutine())
|
2020-12-04 13:35:24 +00:00
|
|
|
return Block();
|
2020-12-02 17:02:14 +00:00
|
|
|
|
2021-01-04 07:59:01 +00:00
|
|
|
if (read_context->is_read_in_progress.load(std::memory_order_relaxed))
|
2020-12-02 17:02:14 +00:00
|
|
|
{
|
2020-12-03 12:21:10 +00:00
|
|
|
read_context->setTimer();
|
2021-01-19 19:21:06 +00:00
|
|
|
return read_context->epoll.getFileDescriptor();
|
2020-12-02 17:02:14 +00:00
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
Fix possible data-race in case of query cancellation with async_socket_for_remote
RemoteQueryExecutor::cancel() (that is called in another thread), steal
the fiber, and process any pending packets, to leave the connection in
the correct state (so that it can be reused for further queries).
However this requires processing pending packets, and this will update
the RemoteQueryExecutorReadContext::packet field, which can be read in
another thread by the RemoteQueryExecutor::read().
This was pretty tricky due to fibers, but AFAICS I understand this
correctly and this should fix the race.
Note, that if you will look at the logs from the #28854, you will see,
that all those data races was triggered after query cancellation.
Fixes: #28854
Refs: #18715
v2: fix daedlock in case of duplicated parts
v3: do not hold the mutex, since was_cancelled is atomic
2021-11-11 20:35:20 +00:00
|
|
|
/// We need to check that query was not cancelled again,
|
|
|
|
/// to avoid the race between cancel() thread and read() thread.
|
|
|
|
/// (since cancel() thread will steal the fiber and may update the packet).
|
|
|
|
if (was_cancelled)
|
|
|
|
return Block();
|
|
|
|
|
2020-12-03 12:21:10 +00:00
|
|
|
if (auto data = processPacket(std::move(read_context->packet)))
|
|
|
|
return std::move(*data);
|
2021-02-05 09:54:34 +00:00
|
|
|
else if (got_duplicated_part_uuids)
|
|
|
|
return restartQueryWithoutDuplicatedUUIDs(&read_context);
|
2020-06-02 16:27:05 +00:00
|
|
|
}
|
|
|
|
}
|
2020-12-02 17:02:14 +00:00
|
|
|
while (true);
|
2020-12-18 13:15:03 +00:00
|
|
|
#else
|
|
|
|
return read();
|
2020-12-14 16:16:08 +00:00
|
|
|
#endif
|
2020-12-18 13:15:03 +00:00
|
|
|
}
|
2020-12-02 17:02:14 +00:00
|
|
|
|
2021-02-05 09:54:34 +00:00
|
|
|
|
|
|
|
std::variant<Block, int> RemoteQueryExecutor::restartQueryWithoutDuplicatedUUIDs(std::unique_ptr<ReadContext> * read_context)
|
|
|
|
{
|
|
|
|
/// Cancel previous query and disconnect before retry.
|
|
|
|
cancel(read_context);
|
2021-02-06 15:23:41 +00:00
|
|
|
connections->disconnect();
|
2021-02-05 09:54:34 +00:00
|
|
|
|
|
|
|
/// Only resend once, otherwise throw an exception
|
|
|
|
if (!resent_query)
|
|
|
|
{
|
|
|
|
if (log)
|
|
|
|
LOG_DEBUG(log, "Found duplicate UUIDs, will retry query without those parts");
|
|
|
|
|
|
|
|
resent_query = true;
|
|
|
|
sent_query = false;
|
|
|
|
got_duplicated_part_uuids = false;
|
|
|
|
/// Consecutive read will implicitly send query first.
|
|
|
|
if (!read_context)
|
|
|
|
return read();
|
|
|
|
else
|
|
|
|
return read(*read_context);
|
|
|
|
}
|
|
|
|
throw Exception("Found duplicate uuids while processing query.", ErrorCodes::DUPLICATED_PART_UUIDS);
|
|
|
|
}
|
|
|
|
|
2020-12-02 17:02:14 +00:00
|
|
|
/// Handles one packet received from a replica.
/// Returns a block when the packet produces a result for the caller: a non-empty data block,
/// or an empty block once the last connection reported EndOfStream. Returns std::nullopt
/// when the caller should keep receiving packets.
/// Throws on an Exception packet and on a packet of unknown type.
std::optional<Block> RemoteQueryExecutor::processPacket(Packet packet)
{
    switch (packet.type)
    {
        case Protocol::Server::MergeTreeReadTaskRequest:
        {
            processMergeTreeReadTaskRequest(packet.request);
            break;
        }

        case Protocol::Server::ReadTaskRequest:
        {
            processReadTaskRequest();
            break;
        }

        case Protocol::Server::PartUUIDs:
        {
            /// Remember that duplicates were found; the read loop restarts the query.
            if (!setPartUUIDs(packet.part_uuids))
                got_duplicated_part_uuids = true;
            break;
        }

        case Protocol::Server::Data:
        {
            /// If the block is not empty and is not a header block
            if (packet.block && packet.block.rows() > 0)
                return adaptBlockStructure(packet.block, header);

            /// If the block is empty - we will receive other packets before EndOfStream.
            break;
        }

        case Protocol::Server::Exception:
        {
            got_exception_from_replica = true;
            packet.exception->rethrow();
            break;
        }

        case Protocol::Server::EndOfStream:
        {
            if (!connections->hasActiveConnections())
            {
                finished = true;
                return Block();
            }
            break;
        }

        case Protocol::Server::Progress:
        {
            /** We use the progress from a remote server.
              * We also include in ProcessList,
              * and we use it to check
              * constraints (for example, the minimum speed of query execution)
              * and quotas (for example, the number of lines to read).
              */
            if (progress_callback)
                progress_callback(packet.progress);
            break;
        }

        case Protocol::Server::ProfileInfo:
        {
            /// Use own (client-side) info about read bytes, it is more correct info than server-side one.
            if (profile_info_callback)
                profile_info_callback(packet.profile_info);
            break;
        }

        case Protocol::Server::Totals:
        {
            totals = packet.block;
            break;
        }

        case Protocol::Server::Extremes:
        {
            extremes = packet.block;
            break;
        }

        case Protocol::Server::Log:
        {
            /// Pass logs from remote server to client
            if (auto log_queue = CurrentThread::getInternalTextLogsQueue())
                log_queue->pushBlock(std::move(packet.block));
            break;
        }

        case Protocol::Server::ProfileEvents:
        {
            /// Pass profile events from remote server to client
            if (auto profile_queue = CurrentThread::getInternalProfileEventsQueue())
            {
                if (!profile_queue->emplace(std::move(packet.block)))
                    throw Exception(ErrorCodes::SYSTEM_ERROR, "Could not push into profile queue");
            }
            break;
        }

        default:
        {
            got_unknown_packet_from_replica = true;
            throw Exception(ErrorCodes::UNKNOWN_PACKET_FROM_SERVER, "Unknown packet {} from one of the following replicas: {}",
                toString(packet.type),
                connections->dumpAddresses());
        }
    }

    return {};
}
|
|
|
|
|
2020-11-20 17:23:53 +00:00
|
|
|
bool RemoteQueryExecutor::setPartUUIDs(const std::vector<UUID> & uuids)
|
|
|
|
{
|
2021-04-10 23:33:54 +00:00
|
|
|
auto query_context = context->getQueryContext();
|
|
|
|
auto duplicates = query_context->getPartUUIDs()->add(uuids);
|
2020-11-20 17:23:53 +00:00
|
|
|
|
|
|
|
if (!duplicates.empty())
|
|
|
|
{
|
|
|
|
std::lock_guard lock(duplicated_part_uuids_mutex);
|
|
|
|
duplicated_part_uuids.insert(duplicated_part_uuids.begin(), duplicates.begin(), duplicates.end());
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
2021-04-10 02:21:18 +00:00
|
|
|
void RemoteQueryExecutor::processReadTaskRequest()
|
2021-04-06 11:05:47 +00:00
|
|
|
{
|
2021-04-10 02:21:18 +00:00
|
|
|
if (!task_iterator)
|
|
|
|
throw Exception("Distributed task iterator is not initialized", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
auto response = (*task_iterator)();
|
2021-04-08 19:00:39 +00:00
|
|
|
connections->sendReadTaskResponse(response);
|
2021-04-06 11:05:47 +00:00
|
|
|
}
|
|
|
|
|
2021-12-09 10:39:28 +00:00
|
|
|
/// Passes a MergeTreeReadTaskRequest to the parallel reading coordinator and sends its
/// answer back over the connections.
/// Throws LOGICAL_ERROR if no coordinator was supplied.
void RemoteQueryExecutor::processMergeTreeReadTaskRequest(PartitionReadRequest request)
{
    if (!parallel_reading_coordinator)
        throw Exception("Coordinator for parallel reading from replicas is not initialized", ErrorCodes::LOGICAL_ERROR);

    auto coordinator_answer = parallel_reading_coordinator->handleRequest(std::move(request));
    connections->sendMergeTreeReadTaskResponse(coordinator_answer);
}
|
|
|
|
|
2020-12-07 13:47:11 +00:00
|
|
|
void RemoteQueryExecutor::finish(std::unique_ptr<ReadContext> * read_context)
|
2020-06-02 16:27:05 +00:00
|
|
|
{
|
|
|
|
/** If one of:
|
|
|
|
* - nothing started to do;
|
|
|
|
* - received all packets before EndOfStream;
|
|
|
|
* - received exception from one replica;
|
|
|
|
* - received an unknown packet from one replica;
|
|
|
|
* then you do not need to read anything.
|
|
|
|
*/
|
|
|
|
if (!isQueryPending() || hasThrownException())
|
|
|
|
return;
|
|
|
|
|
|
|
|
/** If you have not read all the data yet, but they are no longer needed.
|
|
|
|
* This may be due to the fact that the data is sufficient (for example, when using LIMIT).
|
|
|
|
*/
|
|
|
|
|
|
|
|
/// Send the request to abort the execution of the request, if not already sent.
|
2020-12-17 10:07:28 +00:00
|
|
|
tryCancel("Cancelling query because enough data has been read", read_context);
|
2021-11-29 20:09:55 +00:00
|
|
|
|
|
|
|
if (context->getSettingsRef().drain_timeout != Poco::Timespan(-1000000))
|
|
|
|
{
|
|
|
|
auto connections_left = ConnectionCollector::enqueueConnectionCleanup(pool, connections);
|
|
|
|
if (connections_left)
|
|
|
|
{
|
|
|
|
/// Drain connections synchronously and suppress errors.
|
|
|
|
CurrentMetrics::Increment metric_increment(CurrentMetrics::ActiveSyncDrainedConnections);
|
|
|
|
ConnectionCollector::drainConnections(*connections_left, /* throw_error= */ false);
|
|
|
|
CurrentMetrics::add(CurrentMetrics::SyncDrainedConnections, 1);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
else
|
2020-06-02 16:27:05 +00:00
|
|
|
{
|
2021-11-29 20:09:55 +00:00
|
|
|
/// Drain connections synchronously w/o suppressing errors.
|
2021-07-26 11:15:45 +00:00
|
|
|
CurrentMetrics::Increment metric_increment(CurrentMetrics::ActiveSyncDrainedConnections);
|
2021-11-29 20:09:55 +00:00
|
|
|
ConnectionCollector::drainConnections(*connections, /* throw_error= */ true);
|
2021-07-26 11:15:45 +00:00
|
|
|
CurrentMetrics::add(CurrentMetrics::SyncDrainedConnections, 1);
|
2020-06-02 16:27:05 +00:00
|
|
|
}
|
2021-11-29 20:09:55 +00:00
|
|
|
|
2021-07-14 13:17:30 +00:00
|
|
|
finished = true;
|
2020-06-02 16:27:05 +00:00
|
|
|
}
|
|
|
|
|
2020-12-04 13:35:24 +00:00
|
|
|
void RemoteQueryExecutor::cancel(std::unique_ptr<ReadContext> * read_context)
|
2020-06-02 16:27:05 +00:00
|
|
|
{
|
|
|
|
{
|
|
|
|
std::lock_guard lock(external_tables_mutex);
|
|
|
|
|
|
|
|
/// Stop sending external data.
|
|
|
|
for (auto & vec : external_tables_data)
|
|
|
|
for (auto & elem : vec)
|
|
|
|
elem->is_cancelled = true;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (!isQueryPending() || hasThrownException())
|
|
|
|
return;
|
|
|
|
|
2020-12-17 10:07:28 +00:00
|
|
|
tryCancel("Cancelling query", read_context);
|
2020-06-02 16:27:05 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
void RemoteQueryExecutor::sendScalars()
|
|
|
|
{
|
2021-01-19 19:21:06 +00:00
|
|
|
connections->sendScalarsData(scalars);
|
2020-06-02 16:27:05 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
void RemoteQueryExecutor::sendExternalTables()
|
|
|
|
{
|
2021-01-19 19:21:06 +00:00
|
|
|
size_t count = connections->size();
|
2020-06-02 16:27:05 +00:00
|
|
|
|
|
|
|
{
|
|
|
|
std::lock_guard lock(external_tables_mutex);
|
|
|
|
|
2020-11-20 17:23:53 +00:00
|
|
|
external_tables_data.clear();
|
2020-06-02 16:27:05 +00:00
|
|
|
external_tables_data.reserve(count);
|
|
|
|
|
2021-11-25 15:41:50 +00:00
|
|
|
StreamLocalLimits limits;
|
|
|
|
const auto & settings = context->getSettingsRef();
|
|
|
|
limits.mode = LimitsMode::LIMITS_TOTAL;
|
|
|
|
limits.speed_limits.max_execution_time = settings.max_execution_time;
|
|
|
|
limits.timeout_overflow_mode = settings.timeout_overflow_mode;
|
|
|
|
|
2020-06-02 16:27:05 +00:00
|
|
|
for (size_t i = 0; i < count; ++i)
|
|
|
|
{
|
|
|
|
ExternalTablesData res;
|
|
|
|
for (const auto & table : external_tables)
|
|
|
|
{
|
|
|
|
StoragePtr cur = table.second;
|
|
|
|
|
|
|
|
auto data = std::make_unique<ExternalTableData>();
|
|
|
|
data->table_name = table.first;
|
2021-11-25 15:41:50 +00:00
|
|
|
data->creating_pipe_callback = [cur, limits, context = this->context]()
|
2021-04-30 14:19:48 +00:00
|
|
|
{
|
|
|
|
SelectQueryInfo query_info;
|
|
|
|
auto metadata_snapshot = cur->getInMemoryMetadataPtr();
|
|
|
|
QueryProcessingStage::Enum read_from_table_stage = cur->getQueryProcessingStage(
|
2021-04-21 16:00:27 +00:00
|
|
|
context, QueryProcessingStage::Complete, metadata_snapshot, query_info);
|
2021-04-30 14:19:48 +00:00
|
|
|
|
|
|
|
Pipe pipe = cur->read(
|
|
|
|
metadata_snapshot->getColumns().getNamesOfPhysical(),
|
|
|
|
metadata_snapshot, query_info, context,
|
|
|
|
read_from_table_stage, DEFAULT_BLOCK_SIZE, 1);
|
|
|
|
|
|
|
|
if (pipe.empty())
|
|
|
|
return std::make_unique<Pipe>(
|
2020-06-16 15:51:29 +00:00
|
|
|
std::make_shared<SourceFromSingleChunk>(metadata_snapshot->getSampleBlock(), Chunk()));
|
2020-06-02 16:27:05 +00:00
|
|
|
|
2021-11-25 15:41:50 +00:00
|
|
|
pipe.addTransform(std::make_shared<LimitsCheckingTransform>(pipe.getHeader(), limits));
|
|
|
|
|
2021-04-30 14:19:48 +00:00
|
|
|
return std::make_unique<Pipe>(std::move(pipe));
|
|
|
|
};
|
|
|
|
|
|
|
|
data->pipe = data->creating_pipe_callback();
|
2020-06-02 16:27:05 +00:00
|
|
|
res.emplace_back(std::move(data));
|
|
|
|
}
|
|
|
|
external_tables_data.push_back(std::move(res));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-01-19 19:21:06 +00:00
|
|
|
connections->sendExternalTablesData(external_tables_data);
|
2020-06-02 16:27:05 +00:00
|
|
|
}
|
|
|
|
|
2020-12-17 10:07:28 +00:00
|
|
|
void RemoteQueryExecutor::tryCancel(const char * reason, std::unique_ptr<ReadContext> * read_context)
|
2020-06-02 16:27:05 +00:00
|
|
|
{
|
2021-07-14 13:17:30 +00:00
|
|
|
/// Flag was_cancelled is atomic because it is checked in read().
|
|
|
|
std::lock_guard guard(was_cancelled_mutex);
|
2020-06-02 16:27:05 +00:00
|
|
|
|
2021-07-14 13:17:30 +00:00
|
|
|
if (was_cancelled)
|
|
|
|
return;
|
2020-06-02 16:27:05 +00:00
|
|
|
|
2021-07-14 13:17:30 +00:00
|
|
|
was_cancelled = true;
|
2020-12-17 10:07:28 +00:00
|
|
|
|
2021-07-14 13:17:30 +00:00
|
|
|
if (read_context && *read_context)
|
2021-08-19 06:06:54 +00:00
|
|
|
{
|
|
|
|
/// The timer should be set for query cancellation to avoid query cancellation hung.
|
|
|
|
///
|
|
|
|
/// Since in case the remote server will abnormally terminated, neither
|
|
|
|
/// FIN nor RST packet will be sent, and the initiator will not know that
|
|
|
|
/// the connection died (unless tcp_keep_alive_timeout > 0).
|
|
|
|
///
|
|
|
|
/// Also note that it is possible to get this situation even when
|
|
|
|
/// enough data already had been read.
|
|
|
|
(*read_context)->setTimer();
|
2021-07-14 13:17:30 +00:00
|
|
|
(*read_context)->cancel();
|
2021-08-19 06:06:54 +00:00
|
|
|
}
|
2020-12-17 10:07:28 +00:00
|
|
|
|
2021-07-14 13:17:30 +00:00
|
|
|
connections->sendCancel();
|
2020-06-02 16:27:05 +00:00
|
|
|
|
|
|
|
if (log)
|
2021-01-19 19:21:06 +00:00
|
|
|
LOG_TRACE(log, "({}) {}", connections->dumpAddresses(), reason);
|
2020-06-02 16:27:05 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
bool RemoteQueryExecutor::isQueryPending() const
|
|
|
|
{
|
|
|
|
return sent_query && !finished;
|
|
|
|
}
|
|
|
|
|
|
|
|
bool RemoteQueryExecutor::hasThrownException() const
|
|
|
|
{
|
|
|
|
return got_exception_from_replica || got_unknown_packet_from_replica;
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|