ClickHouse/src/Server/TCPHandler.cpp

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

2342 lines
83 KiB
C++
Raw Normal View History

#include "Interpreters/AsynchronousInsertQueue.h"
#include "Interpreters/Context_fwd.h"
#include "Interpreters/SquashingTransform.h"
#include "Parsers/ASTInsertQuery.h"
2021-08-31 13:50:56 +00:00
#include <algorithm>
#include <exception>
2021-08-31 13:50:56 +00:00
#include <iterator>
2021-09-02 14:27:19 +00:00
#include <memory>
2021-08-31 13:50:56 +00:00
#include <mutex>
#include <vector>
2021-10-31 15:11:46 +00:00
#include <string_view>
2013-01-13 22:13:54 +00:00
#include <Poco/Net/NetException.h>
#include <Poco/Net/SocketAddress.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <Common/CurrentThread.h>
#include <Common/Stopwatch.h>
#include <Common/NetException.h>
2018-08-31 00:59:48 +00:00
#include <Common/setThreadName.h>
#include <Common/OpenSSLHelpers.h>
#include <IO/Progress.h>
2018-12-28 18:15:26 +00:00
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
#include <IO/ReadBufferFromPocoSocket.h>
#include <IO/WriteBufferFromPocoSocket.h>
2020-12-02 21:05:51 +00:00
#include <IO/LimitReadBuffer.h>
2018-09-20 20:51:21 +00:00
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
2021-10-15 20:18:20 +00:00
#include <Formats/NativeReader.h>
#include <Formats/NativeWriter.h>
#include <Interpreters/executeQuery.h>
#include <Interpreters/TablesStatus.h>
#include <Interpreters/InternalTextLogsQueue.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
2021-08-01 14:12:34 +00:00
#include <Interpreters/Session.h>
#include <Server/TCPServer.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
2021-04-12 21:42:52 +00:00
#include <Storages/StorageS3Cluster.h>
#include <Core/ExternalTable.h>
#include <Core/ServerSettings.h>
#include <Access/AccessControl.h>
2021-08-01 14:12:34 +00:00
#include <Access/Credentials.h>
#include <DataTypes/DataTypeLowCardinality.h>
2018-12-21 12:17:30 +00:00
#include <Compression/CompressionFactory.h>
2022-04-27 15:05:45 +00:00
#include <Common/logger_useful.h>
#include <Common/CurrentMetrics.h>
#include <Common/thread_local_rng.h>
#include <fmt/format.h>
2020-05-20 19:01:36 +00:00
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
#include <Processors/Executors/PushingPipelineExecutor.h>
2021-09-10 14:52:24 +00:00
#include <Processors/Executors/PushingAsyncPipelineExecutor.h>
2021-09-15 19:35:48 +00:00
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/Sinks/SinkToStorage.h>
2019-03-26 18:28:37 +00:00
2023-03-15 00:09:29 +00:00
#if USE_SSL
# include <Poco/Net/SecureStreamSocket.h>
# include <Poco/Net/SecureStreamSocketImpl.h>
#endif
2023-11-13 09:09:23 +00:00
#include <Core/Protocol.h>
#include <Storages/MergeTree/RequestResponse.h>
2012-03-09 15:46:52 +00:00
#include "TCPHandler.h"
2023-11-13 09:09:23 +00:00
#include <Common/config_version.h>
2021-10-31 15:11:46 +00:00
using namespace std::literals;
using namespace DB;
2021-10-31 15:11:46 +00:00
namespace CurrentMetrics
{
extern const Metric QueryThread;
extern const Metric ReadTaskRequestsSent;
extern const Metric MergeTreeReadTaskRequestsSent;
extern const Metric MergeTreeAllRangesAnnouncementsSent;
}
namespace ProfileEvents
{
extern const Event ReadTaskRequestsSent;
extern const Event MergeTreeReadTaskRequestsSent;
extern const Event MergeTreeAllRangesAnnouncementsSent;
extern const Event ReadTaskRequestsSentElapsedMicroseconds;
extern const Event MergeTreeReadTaskRequestsSentElapsedMicroseconds;
extern const Event MergeTreeAllRangesAnnouncementsSentElapsedMicroseconds;
}
2023-08-22 03:52:57 +00:00
namespace DB::ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int ATTEMPT_TO_READ_AFTER_EOF;
extern const int CLIENT_HAS_CONNECTED_TO_WRONG_PORT;
extern const int UNKNOWN_EXCEPTION;
extern const int UNKNOWN_PACKET_FROM_CLIENT;
extern const int POCO_EXCEPTION;
extern const int SOCKET_TIMEOUT;
extern const int UNEXPECTED_PACKET_FROM_CLIENT;
extern const int UNKNOWN_PROTOCOL;
extern const int AUTHENTICATION_FAILED;
extern const int QUERY_WAS_CANCELLED;
extern const int CLIENT_INFO_DOES_NOT_MATCH;
extern const int TIMEOUT_EXCEEDED;
extern const int SUPPORT_IS_DISABLED;
extern const int UNSUPPORTED_METHOD;
2023-08-22 03:52:57 +00:00
}
namespace
{
NameToNameMap convertToQueryParameters(const Settings & passed_params)
{
NameToNameMap query_parameters;
for (const auto & param : passed_params)
{
std::string value;
ReadBufferFromOwnString buf(param.getValueString());
readQuoted(value, buf);
query_parameters.emplace(param.getName(), value);
}
return query_parameters;
}
2023-08-30 06:14:39 +00:00
// This function corrects the wrong client_name from the old client.
2023-08-29 23:50:41 +00:00
// Old clients 28.7 and some intermediate versions of 28.7 were sending different ClientInfo.client_name
// "ClickHouse client" was sent with the hello message.
// "ClickHouse" or "ClickHouse " was sent with the query message.
2023-08-30 06:14:39 +00:00
void correctQueryClientInfo(const ClientInfo & session_client_info, ClientInfo & client_info)
2023-08-29 23:50:41 +00:00
{
if (client_info.getVersionNumber() <= VersionNumber(23, 8, 1) &&
2023-08-30 06:14:39 +00:00
session_client_info.client_name == "ClickHouse client" &&
2023-08-29 23:50:41 +00:00
(client_info.client_name == "ClickHouse" || client_info.client_name == "ClickHouse "))
{
client_info.client_name = "ClickHouse client";
}
}
2023-08-22 03:52:57 +00:00
void validateClientInfo(const ClientInfo & session_client_info, const ClientInfo & client_info)
{
// Secondary query may contain different client_info.
// In the case of select from distributed table or 'select * from remote' from non-tcp handler. Server sends the initial client_info data.
//
// Example 1: curl -q -s --max-time 60 -sS "http://127.0.0.1:8123/?" -d "SELECT 1 FROM remote('127.0.0.1', system.one)"
// HTTP handler initiates TCP connection with remote 127.0.0.1 (session on remote 127.0.0.1 use TCP interface)
// HTTP handler sends client_info with HTTP interface and HTTP data by TCP protocol in Protocol::Client::Query message.
//
// Example 2: select * from <distributed_table> --host shard_1 // distributed table has 2 shards: shard_1, shard_2
// shard_1 receives a message with 'ClickHouse client' client_name
// shard_1 initiates TCP connection with shard_2 with 'ClickHouse server' client_name.
// shard_1 sends 'ClickHouse client' client_name in Protocol::Client::Query message to shard_2.
if (client_info.query_kind == ClientInfo::QueryKind::SECONDARY_QUERY)
return;
if (session_client_info.interface != client_info.interface)
{
throw Exception(
DB::ErrorCodes::CLIENT_INFO_DOES_NOT_MATCH,
"Client info's interface does not match: {} not equal to {}",
toString(session_client_info.interface),
toString(client_info.interface));
}
if (session_client_info.interface == ClientInfo::Interface::TCP)
{
if (session_client_info.client_name != client_info.client_name)
throw Exception(
DB::ErrorCodes::CLIENT_INFO_DOES_NOT_MATCH,
"Client info's client_name does not match: {} not equal to {}",
session_client_info.client_name,
client_info.client_name);
// TCP handler got patch version 0 always for backward compatibility.
if (!session_client_info.clientVersionEquals(client_info, false))
throw Exception(
DB::ErrorCodes::CLIENT_INFO_DOES_NOT_MATCH,
"Client info's version does not match: {} not equal to {}",
session_client_info.getVersionStr(),
client_info.getVersionStr());
// os_user, quota_key, client_trace_context can be different.
}
}
}
2012-03-09 15:46:52 +00:00
namespace DB
{
TCPHandler::TCPHandler(
IServer & server_,
TCPServer & tcp_server_,
const Poco::Net::StreamSocket & socket_,
bool parse_proxy_protocol_,
std::string server_display_name_,
std::string host_name_,
const ProfileEvents::Event & read_event_,
const ProfileEvents::Event & write_event_)
: Poco::Net::TCPServerConnection(socket_)
, server(server_)
, tcp_server(tcp_server_)
, parse_proxy_protocol(parse_proxy_protocol_)
2024-01-23 17:04:50 +00:00
, log(getLogger("TCPHandler"))
, read_event(read_event_)
, write_event(write_event_)
2024-02-02 14:04:27 +00:00
, server_display_name(std::move(server_display_name_))
, host_name(std::move(host_name_))
{
}
TCPHandler::TCPHandler(
IServer & server_,
TCPServer & tcp_server_,
const Poco::Net::StreamSocket & socket_,
TCPProtocolStackData & stack_data,
std::string server_display_name_,
std::string host_name_,
const ProfileEvents::Event & read_event_,
const ProfileEvents::Event & write_event_)
: Poco::Net::TCPServerConnection(socket_)
, server(server_)
, tcp_server(tcp_server_)
2024-01-23 17:04:50 +00:00
, log(getLogger("TCPHandler"))
, forwarded_for(stack_data.forwarded_for)
, certificate(stack_data.certificate)
, read_event(read_event_)
, write_event(write_event_)
, default_database(stack_data.default_database)
2024-02-02 14:04:27 +00:00
, server_display_name(std::move(server_display_name_))
, host_name(std::move(host_name_))
{
if (!forwarded_for.empty())
LOG_TRACE(log, "Forwarded client address: {}", forwarded_for);
}
TCPHandler: catch exceptions from the WriteBuffer in destructor For TCPHandler it is safe thing todo. Otherwise *San will report [1]: 2021.01.24 15:33:40.103996 [ 270 ] {} <Trace> BaseDaemon: Received signal -1 2021.01.24 15:33:40.110693 [ 270 ] {} <Fatal> BaseDaemon: (version 21.2.1.5789, build id: FF421B087D1E2EAA19FA17B5AB3AE413832744E0) (from thread 48318) Terminate called for uncaught exception: 2021.01.24 15:33:40.114845 [ 270 ] {} <Trace> BaseDaemon: Received signal 6 2021.01.24 15:33:40.138738 [ 218027 ] {} <Fatal> BaseDaemon: ######################################## 2021.01.24 15:33:40.138838 [ 218027 ] {} <Fatal> BaseDaemon: (version 21.2.1.5789, build id: FF421B087D1E2EAA19FA17B5AB3AE413832744E0) (from thread 48318) (no query) Received signal Aborted (6) 2021.01.24 15:33:40.138912 [ 218027 ] {} <Fatal> BaseDaemon: 2021.01.24 15:33:40.139277 [ 218027 ] {} <Fatal> BaseDaemon: Stack trace: 0x7f185474118b 0x7f1854720859 0xaddc0cc 0x2af9fab8 0x2af9fa04 0xa91758b 0x1e418bb5 0x20725b4f 0x20725d9e 0x266b47a3 0x269772f5 0x26971847 0x7f18548f6609 0x7f185481d293 2021.01.24 15:33:40.139637 [ 218027 ] {} <Fatal> BaseDaemon: 3. raise @ 0x4618b in /usr/lib/x86_64-linux-gnu/libc-2.31.so 2021.01.24 15:33:40.140113 [ 218027 ] {} <Fatal> BaseDaemon: 4. abort @ 0x25859 in /usr/lib/x86_64-linux-gnu/libc-2.31.so 2021.01.24 15:33:40.144121 [ 218027 ] {} <Fatal> BaseDaemon: 5. ./obj-x86_64-linux-gnu/../base/daemon/BaseDaemon.cpp:0: terminate_handler() @ 0xaddc0cc in /usr/bin/clickhouse 2021.01.24 15:33:40.151208 [ 218027 ] {} <Fatal> BaseDaemon: 6. ./obj-x86_64-linux-gnu/../contrib/libcxxabi/src/cxa_handlers.cpp:61: std::__terminate(void (*)()) @ 0x2af9fab8 in /usr/bin/clickhouse 2021.01.24 15:33:40.153085 [ 218027 ] {} <Fatal> BaseDaemon: 7. ./obj-x86_64-linux-gnu/../contrib/libcxxabi/src/cxa_handlers.cpp:0: std::terminate() @ 0x2af9fa04 in /usr/bin/clickhouse 2021.01.24 15:33:40.155209 [ 218027 ] {} <Fatal> BaseDaemon: 8. ? @ 0xa91758b in /usr/bin/clickhouse 2021.01.24 15:33:40.156621 [ 218027 ] {} <Fatal> BaseDaemon: 9. ./obj-x86_64-linux-gnu/../src/IO/WriteBufferFromPocoSocket.cpp:0: DB::WriteBufferFromPocoSocket::~WriteBufferFromPocoSocket() @ 0x1e418bb5 in /usr/bin/clickhouse 2021.01.24 15:33:40.161041 [ 218027 ] {} <Fatal> BaseDaemon: 10. ./obj-x86_64-linux-gnu/../contrib/libcxx/include/memory:2518: DB::TCPHandler::~TCPHandler() @ 0x20725b4f in /usr/bin/clickhouse 2021.01.24 15:33:40.164557 [ 218027 ] {} <Fatal> BaseDaemon: 11. ./obj-x86_64-linux-gnu/../src/Server/TCPHandler.h:101: DB::TCPHandler::~TCPHandler() @ 0x20725d9e in /usr/bin/clickhouse 2021.01.24 15:33:40.165921 [ 218027 ] {} <Fatal> BaseDaemon: 12. ./obj-x86_64-linux-gnu/../contrib/poco/Foundation/include/Poco/AtomicCounter.h:314: Poco::Net::TCPServerDispatcher::run() @ 0x266b47a3 in /usr/bin/clickhouse 2021.01.24 15:33:40.167347 [ 218027 ] {} <Fatal> BaseDaemon: 13. ./obj-x86_64-linux-gnu/../contrib/poco/Foundation/src/ThreadPool.cpp:0: Poco::PooledThread::run() @ 0x269772f5 in /usr/bin/clickhouse 2021.01.24 15:33:40.169401 [ 218027 ] {} <Fatal> BaseDaemon: 14. ./obj-x86_64-linux-gnu/../contrib/poco/Foundation/src/Thread_POSIX.cpp:0: Poco::ThreadImpl::runnableEntry(void*) @ 0x26971847 in /usr/bin/clickhouse 2021.01.24 15:33:40.169498 [ 218027 ] {} <Fatal> BaseDaemon: 15. start_thread @ 0x9609 in /usr/lib/x86_64-linux-gnu/libpthread-2.31.so 2021.01.24 15:33:40.169566 [ 218027 ] {} <Fatal> BaseDaemon: 16. __clone @ 0x122293 in /usr/lib/x86_64-linux-gnu/libc-2.31.so 2021.01.24 15:33:41.027601 [ 218027 ] {} <Fatal> BaseDaemon: Calculated checksum of the binary: 63D7491B39260494BA0D785E1860B427. There is no information about the reference checksum. [1]: https://clickhouse-test-reports.s3.yandex.net/19451/1e16bd6f337985a82fbdf4eded695dc6e663af58/stress_test_(address).html#fail1 v2: Fix catching errors in WriteBufferFromPocoSocket destructor
2021-01-28 04:07:51 +00:00
TCPHandler::~TCPHandler()
{
try
{
state.reset();
if (out)
out->next();
TCPHandler: catch exceptions from the WriteBuffer in destructor For TCPHandler it is safe thing todo. Otherwise *San will report [1]: 2021.01.24 15:33:40.103996 [ 270 ] {} <Trace> BaseDaemon: Received signal -1 2021.01.24 15:33:40.110693 [ 270 ] {} <Fatal> BaseDaemon: (version 21.2.1.5789, build id: FF421B087D1E2EAA19FA17B5AB3AE413832744E0) (from thread 48318) Terminate called for uncaught exception: 2021.01.24 15:33:40.114845 [ 270 ] {} <Trace> BaseDaemon: Received signal 6 2021.01.24 15:33:40.138738 [ 218027 ] {} <Fatal> BaseDaemon: ######################################## 2021.01.24 15:33:40.138838 [ 218027 ] {} <Fatal> BaseDaemon: (version 21.2.1.5789, build id: FF421B087D1E2EAA19FA17B5AB3AE413832744E0) (from thread 48318) (no query) Received signal Aborted (6) 2021.01.24 15:33:40.138912 [ 218027 ] {} <Fatal> BaseDaemon: 2021.01.24 15:33:40.139277 [ 218027 ] {} <Fatal> BaseDaemon: Stack trace: 0x7f185474118b 0x7f1854720859 0xaddc0cc 0x2af9fab8 0x2af9fa04 0xa91758b 0x1e418bb5 0x20725b4f 0x20725d9e 0x266b47a3 0x269772f5 0x26971847 0x7f18548f6609 0x7f185481d293 2021.01.24 15:33:40.139637 [ 218027 ] {} <Fatal> BaseDaemon: 3. raise @ 0x4618b in /usr/lib/x86_64-linux-gnu/libc-2.31.so 2021.01.24 15:33:40.140113 [ 218027 ] {} <Fatal> BaseDaemon: 4. abort @ 0x25859 in /usr/lib/x86_64-linux-gnu/libc-2.31.so 2021.01.24 15:33:40.144121 [ 218027 ] {} <Fatal> BaseDaemon: 5. ./obj-x86_64-linux-gnu/../base/daemon/BaseDaemon.cpp:0: terminate_handler() @ 0xaddc0cc in /usr/bin/clickhouse 2021.01.24 15:33:40.151208 [ 218027 ] {} <Fatal> BaseDaemon: 6. ./obj-x86_64-linux-gnu/../contrib/libcxxabi/src/cxa_handlers.cpp:61: std::__terminate(void (*)()) @ 0x2af9fab8 in /usr/bin/clickhouse 2021.01.24 15:33:40.153085 [ 218027 ] {} <Fatal> BaseDaemon: 7. ./obj-x86_64-linux-gnu/../contrib/libcxxabi/src/cxa_handlers.cpp:0: std::terminate() @ 0x2af9fa04 in /usr/bin/clickhouse 2021.01.24 15:33:40.155209 [ 218027 ] {} <Fatal> BaseDaemon: 8. ? @ 0xa91758b in /usr/bin/clickhouse 2021.01.24 15:33:40.156621 [ 218027 ] {} <Fatal> BaseDaemon: 9. ./obj-x86_64-linux-gnu/../src/IO/WriteBufferFromPocoSocket.cpp:0: DB::WriteBufferFromPocoSocket::~WriteBufferFromPocoSocket() @ 0x1e418bb5 in /usr/bin/clickhouse 2021.01.24 15:33:40.161041 [ 218027 ] {} <Fatal> BaseDaemon: 10. ./obj-x86_64-linux-gnu/../contrib/libcxx/include/memory:2518: DB::TCPHandler::~TCPHandler() @ 0x20725b4f in /usr/bin/clickhouse 2021.01.24 15:33:40.164557 [ 218027 ] {} <Fatal> BaseDaemon: 11. ./obj-x86_64-linux-gnu/../src/Server/TCPHandler.h:101: DB::TCPHandler::~TCPHandler() @ 0x20725d9e in /usr/bin/clickhouse 2021.01.24 15:33:40.165921 [ 218027 ] {} <Fatal> BaseDaemon: 12. ./obj-x86_64-linux-gnu/../contrib/poco/Foundation/include/Poco/AtomicCounter.h:314: Poco::Net::TCPServerDispatcher::run() @ 0x266b47a3 in /usr/bin/clickhouse 2021.01.24 15:33:40.167347 [ 218027 ] {} <Fatal> BaseDaemon: 13. ./obj-x86_64-linux-gnu/../contrib/poco/Foundation/src/ThreadPool.cpp:0: Poco::PooledThread::run() @ 0x269772f5 in /usr/bin/clickhouse 2021.01.24 15:33:40.169401 [ 218027 ] {} <Fatal> BaseDaemon: 14. ./obj-x86_64-linux-gnu/../contrib/poco/Foundation/src/Thread_POSIX.cpp:0: Poco::ThreadImpl::runnableEntry(void*) @ 0x26971847 in /usr/bin/clickhouse 2021.01.24 15:33:40.169498 [ 218027 ] {} <Fatal> BaseDaemon: 15. start_thread @ 0x9609 in /usr/lib/x86_64-linux-gnu/libpthread-2.31.so 2021.01.24 15:33:40.169566 [ 218027 ] {} <Fatal> BaseDaemon: 16. __clone @ 0x122293 in /usr/lib/x86_64-linux-gnu/libc-2.31.so 2021.01.24 15:33:41.027601 [ 218027 ] {} <Fatal> BaseDaemon: Calculated checksum of the binary: 63D7491B39260494BA0D785E1860B427. There is no information about the reference checksum. [1]: https://clickhouse-test-reports.s3.yandex.net/19451/1e16bd6f337985a82fbdf4eded695dc6e663af58/stress_test_(address).html#fail1 v2: Fix catching errors in WriteBufferFromPocoSocket destructor
2021-01-28 04:07:51 +00:00
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
2012-03-09 15:46:52 +00:00
void TCPHandler::runImpl()
{
2018-08-31 00:59:48 +00:00
setThreadName("TCPHandler");
ThreadStatus thread_status;
2018-08-31 00:59:48 +00:00
2021-08-01 14:12:34 +00:00
extractConnectionSettingsFromContext(server.context());
2021-08-01 14:12:34 +00:00
socket().setReceiveTimeout(receive_timeout);
socket().setSendTimeout(send_timeout);
socket().setNoDelay(true);
in = std::make_shared<ReadBufferFromPocoSocket>(socket(), read_event);
out = std::make_shared<WriteBufferFromPocoSocket>(socket(), write_event);
2020-12-02 21:05:51 +00:00
/// Support for PROXY protocol
if (parse_proxy_protocol && !receiveProxyHeader())
return;
2016-01-06 20:37:50 +00:00
if (in->eof())
{
LOG_INFO(log, "Client has not sent any data.");
2016-01-06 20:37:50 +00:00
return;
}
/// User will be authenticated here. It will also set settings from user profile into connection_context.
2013-08-10 09:04:45 +00:00
try
{
receiveHello();
2023-07-30 22:09:03 +00:00
/// In interserver mode queries are executed without a session context.
if (!is_interserver_mode)
session->makeSessionContext();
sendHello();
2022-08-03 20:36:52 +00:00
if (client_tcp_protocol_version >= DBMS_MIN_PROTOCOL_VERSION_WITH_ADDENDUM)
2022-08-03 19:44:08 +00:00
receiveAddendum();
2023-07-30 22:09:03 +00:00
if (!is_interserver_mode)
{
/// If session created, then settings in session context has been updated.
/// So it's better to update the connection settings for flexibility.
extractConnectionSettingsFromContext(session->sessionContext());
/// When connecting, the default database could be specified.
if (!default_database.empty())
session->sessionContext()->setCurrentDatabase(default_database);
}
2013-08-10 09:04:45 +00:00
}
2017-03-09 04:26:17 +00:00
catch (const Exception & e) /// Typical for an incorrect username, password, or address.
2013-08-10 09:04:45 +00:00
{
if (e.code() == ErrorCodes::CLIENT_HAS_CONNECTED_TO_WRONG_PORT)
{
LOG_DEBUG(log, "Client has connected to wrong port.");
return;
}
2016-01-06 20:37:50 +00:00
if (e.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF)
{
LOG_INFO(log, "Client has gone away.");
2016-01-06 20:37:50 +00:00
return;
}
2013-08-10 09:04:45 +00:00
try
{
/// We try to send error information to the client.
2021-08-01 14:12:34 +00:00
sendException(e, send_exception_with_stack_trace);
2013-08-10 09:04:45 +00:00
}
2023-09-25 20:19:09 +00:00
catch (...) {} // NOLINT(bugprone-empty-catch)
2013-08-10 09:04:45 +00:00
throw;
}
while (tcp_server.isOpen())
2012-03-09 15:46:52 +00:00
{
2019-07-04 22:57:26 +00:00
/// We are waiting for a packet from the client. Thus, every `poll_interval` seconds check whether we need to shut down.
2019-07-04 22:23:45 +00:00
{
Stopwatch idle_time;
UInt64 timeout_ms = std::min(poll_interval, idle_connection_timeout) * 1000000;
while (tcp_server.isOpen() && !server.isCancelled() && !static_cast<ReadBufferFromPocoSocket &>(*in).poll(timeout_ms))
2019-07-04 22:23:45 +00:00
{
if (idle_time.elapsedSeconds() > idle_connection_timeout)
2019-07-04 23:03:20 +00:00
{
LOG_TRACE(log, "Closing idle connection");
2019-07-04 22:23:45 +00:00
return;
2019-07-04 23:03:20 +00:00
}
2019-07-04 22:23:45 +00:00
}
}
/// If we need to shut down, or client disconnects.
if (!tcp_server.isOpen() || server.isCancelled() || in->eof())
2022-08-05 11:33:27 +00:00
{
LOG_TEST(log, "Closing connection (open: {}, cancelled: {}, eof: {})", tcp_server.isOpen(), server.isCancelled(), in->eof());
2012-08-16 17:50:54 +00:00
break;
2022-08-05 11:33:27 +00:00
}
2012-05-09 08:16:09 +00:00
state.reset();
/// Initialized later.
std::optional<CurrentThread::QueryScope> query_scope;
OpenTelemetry::TracingContextHolderPtr thread_trace_context;
2017-03-09 04:26:17 +00:00
/** An exception during the execution of request (it must be sent over the network to the client).
* The client will be able to accept it, if it did not happen while sending another packet and the client has not disconnected yet.
*/
2022-04-15 23:56:45 +00:00
std::unique_ptr<DB::Exception> exception;
bool network_error = false;
bool query_duration_already_logged = false;
2023-08-15 13:37:09 +00:00
auto log_query_duration = [this, &query_duration_already_logged]()
{
if (query_duration_already_logged)
return;
query_duration_already_logged = true;
auto elapsed_sec = state.watch.elapsedSeconds();
/// We already logged more detailed info if we read some rows
if (elapsed_sec < 1.0 && state.progress.read_rows)
return;
LOG_DEBUG(log, "Processed in {} sec.", elapsed_sec);
};
2012-05-08 05:42:05 +00:00
try
{
/// If a user passed query-local timeouts, reset socket to initial state at the end of the query
SCOPE_EXIT({state.timeout_setter.reset();});
/** If Query - process it. If Ping or Cancel - go back to the beginning.
* There may come settings for a separate query that modify `query_context`.
* It's possible to receive part uuids packet before the query, so then receivePacket has to be called twice.
*/
if (!receivePacket())
continue;
/** If part_uuids got received in previous packet, trying to read again.
*/
2021-08-01 14:12:34 +00:00
if (state.empty() && state.part_uuids_to_ignore && !receivePacket())
continue;
2022-07-07 09:44:19 +00:00
/// Set up tracing context for this query on current thread
thread_trace_context = std::make_unique<OpenTelemetry::TracingContextHolder>("TCPHandler",
query_context->getClientInfo().client_trace_context,
query_context->getSettingsRef(),
query_context->getOpenTelemetrySpanLog());
2023-03-12 13:31:08 +00:00
thread_trace_context->root_span.kind = OpenTelemetry::SERVER;
2023-03-03 00:22:04 +00:00
query_scope.emplace(query_context, /* fatal_error_callback */ [this]
{
std::lock_guard lock(fatal_error_mutex);
sendLogs();
});
2021-08-01 14:12:34 +00:00
/// If query received, then settings in query_context has been updated.
/// So it's better to update the connection settings for flexibility.
extractConnectionSettingsFromContext(query_context);
/// Sync timeouts on client and server during current query to avoid dangling queries on server
state.timeout_setter = std::make_unique<TimeoutSetter>(socket(), send_timeout, receive_timeout);
2018-08-24 07:30:53 +00:00
/// Should we send internal logs to client?
2019-07-10 12:19:17 +00:00
const auto client_logs_level = query_context->getSettingsRef().send_logs_level;
2020-09-17 12:15:05 +00:00
if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_SERVER_LOGS
2019-08-09 13:02:01 +00:00
&& client_logs_level != LogsLevel::none)
{
state.logs_queue = std::make_shared<InternalTextLogsQueue>();
2019-07-10 12:19:17 +00:00
state.logs_queue->max_priority = Poco::Logger::parseLevel(client_logs_level.toString());
2022-07-13 08:15:37 +00:00
state.logs_queue->setSourceRegexp(query_context->getSettingsRef().send_logs_source_regexp);
2019-08-09 13:02:01 +00:00
CurrentThread::attachInternalTextLogsQueue(state.logs_queue, client_logs_level);
}
2021-11-08 13:43:34 +00:00
if (client_tcp_protocol_version >= DBMS_MIN_PROTOCOL_VERSION_WITH_INCREMENTAL_PROFILE_EVENTS)
{
state.profile_queue = std::make_shared<InternalProfileEventsQueue>(std::numeric_limits<int>::max());
CurrentThread::attachInternalProfileEventsQueue(state.profile_queue);
}
2021-08-01 14:12:34 +00:00
query_context->setExternalTablesInitializer([this] (ContextPtr context)
2018-08-24 00:07:25 +00:00
{
if (context != query_context)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected context in external tables initializer");
/// Get blocks of temporary tables
2021-08-01 14:12:34 +00:00
readData();
/// Reset the input stream, as we received an empty block while receiving external table data.
/// So, the stream has been marked as cancelled and we can't read from it anymore.
state.block_in.reset();
2018-11-26 00:56:50 +00:00
state.maybe_compressed_in.reset(); /// For more accurate accounting by MemoryTracker.
});
2019-05-28 18:30:10 +00:00
/// Send structure of columns to client for function input()
query_context->setInputInitializer([this] (ContextPtr context, const StoragePtr & input_storage)
2019-05-28 18:30:10 +00:00
{
if (context != query_context)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected context in Input initializer");
2019-05-28 18:30:10 +00:00
auto metadata_snapshot = input_storage->getInMemoryMetadataPtr();
2019-05-28 18:30:10 +00:00
state.need_receive_data_for_input = true;
/// Send ColumnsDescription for input storage.
2020-09-17 12:15:05 +00:00
if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA
2019-05-28 18:30:10 +00:00
&& query_context->getSettingsRef().input_format_defaults_for_omitted_fields)
{
sendTableColumns(metadata_snapshot->getColumns());
2019-05-28 18:30:10 +00:00
}
/// Send block to the client - input storage structure.
state.input_header = metadata_snapshot->getSampleBlock();
2019-05-28 18:30:10 +00:00
sendData(state.input_header);
sendTimezone();
2019-05-28 18:30:10 +00:00
});
2021-08-01 14:12:34 +00:00
query_context->setInputBlocksReaderCallback([this] (ContextPtr context) -> Block
2019-05-28 18:30:10 +00:00
{
if (context != query_context)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected context in InputBlocksReader");
2019-05-28 18:30:10 +00:00
2021-08-01 14:12:34 +00:00
if (!readDataNext())
2019-05-28 18:30:10 +00:00
{
state.block_in.reset();
state.maybe_compressed_in.reset();
return Block();
2019-05-28 20:16:24 +00:00
}
2019-05-28 18:30:10 +00:00
return state.block_for_input;
});
customizeContext(query_context);
2021-04-12 17:07:01 +00:00
/// This callback is needed for requesting read tasks inside pipeline for distributed processing
2021-04-13 10:59:02 +00:00
query_context->setReadTaskCallback([this]() -> String
{
Stopwatch watch;
CurrentMetrics::Increment callback_metric_increment(CurrentMetrics::ReadTaskRequestsSent);
2021-04-08 19:00:39 +00:00
std::lock_guard lock(task_callback_mutex);
if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED)
return {};
2021-04-10 02:21:18 +00:00
sendReadTaskRequestAssumeLocked();
ProfileEvents::increment(ProfileEvents::ReadTaskRequestsSent);
auto res = receiveReadTaskResponseAssumeLocked();
ProfileEvents::increment(ProfileEvents::ReadTaskRequestsSentElapsedMicroseconds, watch.elapsedMicroseconds());
return res;
});
2023-02-03 13:34:18 +00:00
query_context->setMergeTreeAllRangesCallback([this](InitialAllRangesAnnouncement announcement)
{
Stopwatch watch;
CurrentMetrics::Increment callback_metric_increment(CurrentMetrics::MergeTreeAllRangesAnnouncementsSent);
2023-02-03 13:34:18 +00:00
std::lock_guard lock(task_callback_mutex);
if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED)
2023-02-03 13:34:18 +00:00
return;
sendMergeTreeAllRangesAnnouncementAssumeLocked(announcement);
ProfileEvents::increment(ProfileEvents::MergeTreeAllRangesAnnouncementsSent);
ProfileEvents::increment(ProfileEvents::MergeTreeAllRangesAnnouncementsSentElapsedMicroseconds, watch.elapsedMicroseconds());
2023-02-03 13:34:18 +00:00
});
query_context->setMergeTreeReadTaskCallback([this](ParallelReadRequest request) -> std::optional<ParallelReadResponse>
{
Stopwatch watch;
CurrentMetrics::Increment callback_metric_increment(CurrentMetrics::MergeTreeReadTaskRequestsSent);
std::lock_guard lock(task_callback_mutex);
if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED)
return std::nullopt;
sendMergeTreeReadTaskRequestAssumeLocked(std::move(request));
ProfileEvents::increment(ProfileEvents::MergeTreeReadTaskRequestsSent);
auto res = receivePartitionMergeTreeReadTaskResponseAssumeLocked();
ProfileEvents::increment(ProfileEvents::MergeTreeReadTaskRequestsSentElapsedMicroseconds, watch.elapsedMicroseconds());
return res;
});
/// Processing Query
2023-11-07 14:58:25 +00:00
std::tie(state.parsed_query, state.io) = executeQuery(state.query, query_context, QueryFlags{}, state.stage);
2012-05-09 15:15:45 +00:00
after_check_cancelled.restart();
after_send_progress.restart();
2023-02-25 00:18:34 +00:00
auto finish_or_cancel = [this]()
{
if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED)
2023-02-25 00:18:34 +00:00
state.io.onCancelOrConnectionLoss();
else
state.io.onFinish();
};
2021-09-15 19:35:48 +00:00
if (state.io.pipeline.pushing())
2020-10-10 01:43:07 +00:00
{
2022-10-21 03:33:17 +00:00
/// FIXME: check explicitly that insert query suggests to receive data via native protocol,
2020-10-10 01:43:07 +00:00
state.need_receive_data_for_insert = true;
2021-08-01 14:12:34 +00:00
processInsertQuery();
2023-02-25 00:18:34 +00:00
finish_or_cancel();
2020-10-10 01:43:07 +00:00
}
2021-09-16 17:40:42 +00:00
else if (state.io.pipeline.pulling())
{
processOrdinaryQueryWithProcessors();
2023-02-25 00:18:34 +00:00
finish_or_cancel();
2021-09-16 17:40:42 +00:00
}
2021-09-17 11:40:03 +00:00
else if (state.io.pipeline.completed())
2019-05-28 18:30:10 +00:00
{
2021-09-22 13:29:58 +00:00
{
2022-10-21 03:33:17 +00:00
CompletedPipelineExecutor executor(state.io.pipeline);
/// Should not check for cancel in case of input.
if (!state.need_receive_data_for_input)
2021-09-22 13:29:58 +00:00
{
2022-10-21 03:33:17 +00:00
auto callback = [this]()
{
std::scoped_lock lock(task_callback_mutex, fatal_error_mutex);
2023-01-10 15:34:52 +00:00
if (getQueryCancellationStatus() == CancellationStatus::FULLY_CANCELLED)
return true;
2021-09-22 13:29:58 +00:00
2022-10-21 03:33:17 +00:00
sendProgress();
sendSelectProfileEvents();
sendLogs();
2021-09-22 13:29:58 +00:00
2022-10-21 03:33:17 +00:00
return false;
};
2021-09-22 13:29:58 +00:00
2022-10-21 03:33:17 +00:00
executor.setCancelCallback(callback, interactive_delay / 1000);
}
executor.execute();
2021-09-22 13:29:58 +00:00
}
2023-02-25 00:18:34 +00:00
finish_or_cancel();
std::lock_guard lock(task_callback_mutex);
/// Send final progress after calling onFinish(), since it will update the progress.
///
2022-08-08 04:55:41 +00:00
/// NOTE: we cannot send Progress for regular INSERT (with VALUES)
2022-04-17 23:02:49 +00:00
/// without breaking protocol compatibility, but it can be done
/// by increasing revision.
sendProgress();
2022-08-08 05:12:08 +00:00
sendSelectProfileEvents();
2019-05-28 18:30:10 +00:00
}
else
{
2023-02-25 00:18:34 +00:00
finish_or_cancel();
}
/// Do it before sending end of stream, to have a chance to show log message in client.
query_scope->logPeakMemoryUsage();
2023-08-15 13:37:09 +00:00
log_query_duration();
if (state.is_connection_closed)
break;
{
std::lock_guard lock(task_callback_mutex);
sendLogs();
sendEndOfStream();
}
Fix memory tracking for two-level GROUP BY when not all rows read from Aggregator (TCP) Example of such cases: - SELECT GROUP BY LIMIT - SELECT GROUP BY with subsequent MEMORY_LIMIT_EXCEEDED error And it should be two-level aggregation, since otherwise there will be only one hashtable which will be cleared correctly, only if you have two-level GROUP BY some of hashtables will not be cleared since nobody consume rows. Before this patch: 09:39.015292 [ 3070801 ] {609a0610-e377-4132-9cf3-f49454cf3c96} <Information> executeQuery: Read 1000000 rows, 7.63 MiB in 0.707 sec., 1413826 rows/sec., 10.79 MiB/sec. 09:39.015348 [ 3070801 ] {609a0610-e377-4132-9cf3-f49454cf3c96} <Debug> MemoryTracker: Peak memory usage (for query): 51.93 MiB. 09:39.015942 [ 3070801 ] {} <Trace> Aggregator: Destroying aggregate states <-- **problem** 09:39.017057 [ 3070801 ] {} <Trace> Aggregator: Destroying aggregate states <-- 09:39.017961 [ 3070801 ] {} <Debug> MemoryTracker: Peak memory usage (for query): 51.93 MiB. 09:39.018029 [ 3070801 ] {} <Information> TCPHandler: Processed in 0.711 sec. After this patch: 16:24.544030 [ 3087333 ] {79da208a-b3c0-48d4-9943-c974a3cbb6ea} <Information> executeQuery: Read 1000000 rows, 7.63 MiB in 0.599 sec., 1670199 rows/sec., 12.74 MiB/sec. 16:24.544084 [ 3087333 ] {79da208a-b3c0-48d4-9943-c974a3cbb6ea} <Debug> MemoryTracker: Peak memory usage (for query): 72.11 MiB. 16:24.544398 [ 3087333 ] {79da208a-b3c0-48d4-9943-c974a3cbb6ea} <Trace> Aggregator: Destroying aggregate states 16:24.545485 [ 3087333 ] {79da208a-b3c0-48d4-9943-c974a3cbb6ea} <Trace> Aggregator: Destroying aggregate states 16:24.547053 [ 3087333 ] {} <Debug> MemoryTracker: Peak memory usage (for query): 72.11 MiB. 16:24.547093 [ 3087333 ] {} <Information> TCPHandler: Processed in 0.603 sec.
2020-05-18 18:13:56 +00:00
/// QueryState should be cleared before QueryScope, since otherwise
/// the MemoryTracker will be wrong for possible deallocations.
/// (i.e. deallocations from the Aggregator with two-level aggregation)
2012-12-06 17:32:48 +00:00
state.reset();
2023-05-05 18:21:25 +00:00
last_sent_snapshots = ProfileEvents::ThreadIdToCountersSnapshot{};
Fix memory tracking for two-level GROUP BY when not all rows read from Aggregator (TCP) Example of such cases: - SELECT GROUP BY LIMIT - SELECT GROUP BY with subsequent MEMORY_LIMIT_EXCEEDED error And it should be two-level aggregation, since otherwise there will be only one hashtable which will be cleared correctly, only if you have two-level GROUP BY some of hashtables will not be cleared since nobody consume rows. Before this patch: 09:39.015292 [ 3070801 ] {609a0610-e377-4132-9cf3-f49454cf3c96} <Information> executeQuery: Read 1000000 rows, 7.63 MiB in 0.707 sec., 1413826 rows/sec., 10.79 MiB/sec. 09:39.015348 [ 3070801 ] {609a0610-e377-4132-9cf3-f49454cf3c96} <Debug> MemoryTracker: Peak memory usage (for query): 51.93 MiB. 09:39.015942 [ 3070801 ] {} <Trace> Aggregator: Destroying aggregate states <-- **problem** 09:39.017057 [ 3070801 ] {} <Trace> Aggregator: Destroying aggregate states <-- 09:39.017961 [ 3070801 ] {} <Debug> MemoryTracker: Peak memory usage (for query): 51.93 MiB. 09:39.018029 [ 3070801 ] {} <Information> TCPHandler: Processed in 0.711 sec. After this patch: 16:24.544030 [ 3087333 ] {79da208a-b3c0-48d4-9943-c974a3cbb6ea} <Information> executeQuery: Read 1000000 rows, 7.63 MiB in 0.599 sec., 1670199 rows/sec., 12.74 MiB/sec. 16:24.544084 [ 3087333 ] {79da208a-b3c0-48d4-9943-c974a3cbb6ea} <Debug> MemoryTracker: Peak memory usage (for query): 72.11 MiB. 16:24.544398 [ 3087333 ] {79da208a-b3c0-48d4-9943-c974a3cbb6ea} <Trace> Aggregator: Destroying aggregate states 16:24.545485 [ 3087333 ] {79da208a-b3c0-48d4-9943-c974a3cbb6ea} <Trace> Aggregator: Destroying aggregate states 16:24.547053 [ 3087333 ] {} <Debug> MemoryTracker: Peak memory usage (for query): 72.11 MiB. 16:24.547093 [ 3087333 ] {} <Information> TCPHandler: Processed in 0.603 sec.
2020-05-18 18:13:56 +00:00
query_scope.reset();
thread_trace_context.reset();
2012-03-19 12:57:56 +00:00
}
catch (const Exception & e)
2012-03-19 12:57:56 +00:00
{
2023-11-15 23:13:05 +00:00
/// Authentication failure with interserver secret
/// - early exit without trying to send the exception to the client.
/// Because the server should not try to skip (parse, decompress) the remaining packets sent by the client,
/// as it will lead to additional work and unneeded exposure to unauthenticated connections.
/// Note that the exception AUTHENTICATION_FAILED can be here in two cases:
/// 1. The authentication in receiveHello is skipped with "interserver secret",
/// postponed to receiving the query, and then failed.
/// 2. Receiving exception from a query using a table function to authenticate with another server.
/// In this case, the user is already authenticated with this server,
/// is_interserver_mode is false, and we can send the exception to the client normally.
if (is_interserver_mode && e.code() == ErrorCodes::AUTHENTICATION_FAILED)
throw;
state.io.onException();
2022-04-15 23:56:45 +00:00
exception.reset(e.clone());
2012-05-28 19:34:55 +00:00
if (e.code() == ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT)
throw;
/// If there is UNEXPECTED_PACKET_FROM_CLIENT emulate network_error
/// to break the loop, but do not throw to send the exception to
/// the client.
if (e.code() == ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT)
network_error = true;
/// If a timeout occurred, try to inform client about it and close the session
if (e.code() == ErrorCodes::SOCKET_TIMEOUT)
network_error = true;
2022-08-23 14:10:44 +00:00
if (network_error)
LOG_TEST(log, "Going to close connection due to exception: {}", e.message());
2012-05-08 05:42:05 +00:00
}
2013-01-13 22:13:54 +00:00
catch (const Poco::Net::NetException & e)
{
/** We can get here if there was an error during connection to the client,
* or in connection with a remote server that was used to process the request.
* It is not possible to distinguish between these two cases.
* Although in one of them, we have to send exception to the client, but in the other - we can not.
* We will try to send exception to the client in any case - see below.
*/
state.io.onException();
2022-04-15 23:56:45 +00:00
exception = std::make_unique<DB::Exception>(Exception::CreateFromPocoTag{}, e);
2013-01-13 22:13:54 +00:00
}
catch (const Poco::Exception & e)
2012-05-08 05:42:05 +00:00
{
state.io.onException();
2022-04-15 23:56:45 +00:00
exception = std::make_unique<DB::Exception>(Exception::CreateFromPocoTag{}, e);
2012-05-08 05:42:05 +00:00
}
2020-07-17 14:25:13 +00:00
// Server should die on std logic errors in debug, like with assert()
// or ErrorCodes::LOGICAL_ERROR. This helps catch these errors in
// tests.
2022-08-05 11:33:27 +00:00
#ifdef ABORT_ON_LOGICAL_ERROR
catch (const std::logic_error & e)
{
2020-07-17 14:20:33 +00:00
state.io.onException();
2022-04-15 23:56:45 +00:00
exception = std::make_unique<DB::Exception>(Exception::CreateFromSTDTag{}, e);
2020-07-17 14:20:33 +00:00
sendException(*exception, send_exception_with_stack_trace);
2020-07-17 14:25:13 +00:00
std::abort();
}
2020-07-17 14:25:13 +00:00
#endif
2013-01-13 22:13:54 +00:00
catch (const std::exception & e)
2012-05-08 05:42:05 +00:00
{
state.io.onException();
2022-04-15 23:56:45 +00:00
exception = std::make_unique<DB::Exception>(Exception::CreateFromSTDTag{}, e);
2012-05-08 05:42:05 +00:00
}
catch (...)
{
state.io.onException();
exception = std::make_unique<DB::Exception>(Exception(ErrorCodes::UNKNOWN_EXCEPTION, "Unknown exception"));
2012-03-19 12:57:56 +00:00
}
2013-01-13 22:13:54 +00:00
try
{
if (exception)
{
if (thread_trace_context)
2022-07-07 09:44:32 +00:00
thread_trace_context->root_span.addAttribute(*exception);
try
{
/// Try to send logs to client, but it could be risky too
/// Assume that we can't break output here
sendLogs();
}
catch (...)
{
tryLogCurrentException(log, "Can't send logs to client");
}
const auto & e = *exception;
LOG_ERROR(log, getExceptionMessageAndPattern(e, send_exception_with_stack_trace));
2018-08-24 07:30:53 +00:00
sendException(*exception, send_exception_with_stack_trace);
}
2013-01-13 22:13:54 +00:00
}
catch (...)
{
/** Could not send exception information to the client. */
network_error = true;
LOG_WARNING(log, "Client has gone away.");
2013-01-13 22:13:54 +00:00
}
/// Interserver authentication is done only after we read the query.
/// This fact can be abused by producing exception before or while we read the query.
2023-11-23 12:12:16 +00:00
/// To avoid any potential exploits, we simply close connection on any exceptions
/// that happen before the first query is authenticated with the cluster secret.
if (is_interserver_mode && exception && !is_interserver_authenticated)
exception->rethrow();
try
{
2021-08-01 14:12:34 +00:00
/// A query packet is always followed by one or more data packets.
/// If some of those data packets are left, try to skip them.
if (exception && !state.empty() && !state.read_all_data)
skipData();
}
catch (...)
{
network_error = true;
2021-08-01 14:12:34 +00:00
LOG_WARNING(log, "Can't skip data packets after query failure.");
}
2023-08-15 13:37:09 +00:00
log_query_duration();
2022-08-08 04:55:41 +00:00
/// QueryState should be cleared before QueryScope, since otherwise
/// the MemoryTracker will be wrong for possible deallocations.
/// (i.e. deallocations from the Aggregator with two-level aggregation)
state.reset();
query_scope.reset();
thread_trace_context.reset();
/// It is important to destroy query context here. We do not want it to live arbitrarily longer than the query.
query_context.reset();
2022-05-18 19:57:20 +00:00
if (is_interserver_mode)
{
/// We don't really have session in interserver mode, new one is created for each query. It's better to reset it now.
session.reset();
}
if (network_error)
break;
2012-03-19 12:57:56 +00:00
}
2012-03-11 08:52:56 +00:00
}
2021-08-01 14:12:34 +00:00
void TCPHandler::extractConnectionSettingsFromContext(const ContextPtr & context)
{
const auto & settings = context->getSettingsRef();
send_exception_with_stack_trace = settings.calculate_text_stack_trace;
send_timeout = settings.send_timeout;
receive_timeout = settings.receive_timeout;
poll_interval = settings.poll_interval;
idle_connection_timeout = settings.idle_connection_timeout;
interactive_delay = settings.interactive_delay;
sleep_in_send_tables_status = settings.sleep_in_send_tables_status_ms;
unknown_packet_in_send_data = settings.unknown_packet_in_send_data;
sleep_after_receiving_query = settings.sleep_after_receiving_query_ms;
2021-08-01 14:12:34 +00:00
}
bool TCPHandler::readDataNext()
2019-05-28 18:30:10 +00:00
{
Stopwatch watch(CLOCK_MONOTONIC_COARSE);
2021-08-01 14:12:34 +00:00
/// Poll interval should not be greater than receive_timeout
2022-08-05 11:33:27 +00:00
constexpr UInt64 min_timeout_us = 5000; // 5 ms
UInt64 timeout_us = std::max(min_timeout_us, std::min(poll_interval * 1000000, static_cast<UInt64>(receive_timeout.totalMicroseconds())));
2021-08-01 14:12:34 +00:00
bool read_ok = false;
2019-05-28 18:30:10 +00:00
/// We are waiting for a packet from the client. Thus, every `POLL_INTERVAL` seconds check whether we need to shut down.
while (true)
{
2022-08-05 11:33:27 +00:00
if (static_cast<ReadBufferFromPocoSocket &>(*in).poll(timeout_us))
2021-08-01 14:12:34 +00:00
{
/// If client disconnected.
if (in->eof())
{
LOG_INFO(log, "Client has dropped the connection, cancel the query.");
state.is_connection_closed = true;
state.cancellation_status = CancellationStatus::FULLY_CANCELLED;
2021-08-01 14:12:34 +00:00
break;
}
/// We accept and process data.
read_ok = receivePacket();
2021-04-06 19:18:45 +00:00
break;
2021-08-01 14:12:34 +00:00
}
2019-05-28 18:30:10 +00:00
/// Do we need to shut down?
if (server.isCancelled())
2021-08-01 14:12:34 +00:00
break;
2019-05-28 18:30:10 +00:00
/** Have we waited for data for too long?
* If we periodically poll, the receive_timeout of the socket itself does not work.
* Therefore, an additional check is added.
*/
2021-04-29 16:11:20 +00:00
Float64 elapsed = watch.elapsedSeconds();
2021-08-01 14:12:34 +00:00
if (elapsed > static_cast<Float64>(receive_timeout.totalSeconds()))
2019-05-28 18:30:10 +00:00
{
2020-11-10 18:22:26 +00:00
throw Exception(ErrorCodes::SOCKET_TIMEOUT,
"Timeout exceeded while receiving data from client. Waited for {} seconds, timeout is {} seconds.",
2021-08-01 14:12:34 +00:00
static_cast<size_t>(elapsed), receive_timeout.totalSeconds());
2019-05-28 18:30:10 +00:00
}
}
2021-08-01 14:12:34 +00:00
if (read_ok)
Send profile events for INSERT queries (previously only SELECT was supported) Reproducer: echo "1" | clickhouse-client --query="insert into function null('foo String') format TSV" --print-profile-events --profile-events-delay-ms=-1 However, clickhouse-local is differnt, it does sent the periodically, but only if query was long enough, i.e.: # yes | head -n100000 | clickhouse-local --query="insert into function null('foo String') format TSV" --print-profile-events --profile-events-delay-ms=-1 [s1.ch] 2022.05.20 15:20:27 [ 0 ] ContextLock: 10 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] DiskReadElapsedMicroseconds: 29 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] IOBufferAllocBytes: 200000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] IOBufferAllocs: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertQuery: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertedBytes: 1000000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertedRows: 100000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] MemoryTrackerUsage: 1521975 (gauge) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSCPUVirtualTimeMicroseconds: 102148 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSReadChars: 135700 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSWriteChars: 8 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] Query: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] RWLockAcquiredReadLocks: 2 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] ReadBufferFromFileDescriptorRead: 5 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] ReadBufferFromFileDescriptorReadBytes: 134464 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] RealTimeMicroseconds: 293747 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] SoftPageFaults: 382 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] TableFunctionExecute: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] UserTimeMicroseconds: 102148 (increment) v2: Proper support ProfileEvents in INSERTs (with protocol change) v3: Receive profile events on INSERT queries Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-05-20 15:19:40 +00:00
{
2021-08-01 14:12:34 +00:00
sendLogs();
Send profile events for INSERT queries (previously only SELECT was supported) Reproducer: echo "1" | clickhouse-client --query="insert into function null('foo String') format TSV" --print-profile-events --profile-events-delay-ms=-1 However, clickhouse-local is differnt, it does sent the periodically, but only if query was long enough, i.e.: # yes | head -n100000 | clickhouse-local --query="insert into function null('foo String') format TSV" --print-profile-events --profile-events-delay-ms=-1 [s1.ch] 2022.05.20 15:20:27 [ 0 ] ContextLock: 10 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] DiskReadElapsedMicroseconds: 29 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] IOBufferAllocBytes: 200000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] IOBufferAllocs: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertQuery: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertedBytes: 1000000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertedRows: 100000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] MemoryTrackerUsage: 1521975 (gauge) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSCPUVirtualTimeMicroseconds: 102148 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSReadChars: 135700 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSWriteChars: 8 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] Query: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] RWLockAcquiredReadLocks: 2 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] ReadBufferFromFileDescriptorRead: 5 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] ReadBufferFromFileDescriptorReadBytes: 134464 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] RealTimeMicroseconds: 293747 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] SoftPageFaults: 382 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] TableFunctionExecute: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] UserTimeMicroseconds: 102148 (increment) v2: Proper support ProfileEvents in INSERTs (with protocol change) v3: Receive profile events on INSERT queries Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-05-20 15:19:40 +00:00
sendInsertProfileEvents();
}
2021-08-01 14:12:34 +00:00
else
state.read_all_data = true;
2019-05-28 18:30:10 +00:00
2021-08-01 14:12:34 +00:00
return read_ok;
2019-05-28 18:30:10 +00:00
}
2021-08-01 14:12:34 +00:00
void TCPHandler::readData()
2012-05-21 06:49:05 +00:00
{
2021-08-01 14:12:34 +00:00
sendLogs();
2021-08-01 14:12:34 +00:00
while (readDataNext())
;
2023-02-21 19:04:40 +00:00
if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED)
2023-02-21 19:04:40 +00:00
throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Query was cancelled");
2019-05-28 18:30:10 +00:00
}
2021-08-01 14:12:34 +00:00
void TCPHandler::skipData()
2019-05-28 18:30:10 +00:00
{
2021-08-01 14:12:34 +00:00
state.skipping_data = true;
SCOPE_EXIT({ state.skipping_data = false; });
2021-08-01 14:12:34 +00:00
while (readDataNext())
;
2023-02-21 19:04:40 +00:00
if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED)
2023-02-21 19:04:40 +00:00
throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Query was cancelled");
}
void TCPHandler::startInsertQuery()
{
/// Send ColumnsDescription for insertion table
if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA)
{
const auto & table_id = query_context->getInsertionTable();
if (query_context->getSettingsRef().input_format_defaults_for_omitted_fields)
{
if (!table_id.empty())
{
auto storage_ptr = DatabaseCatalog::instance().getTable(table_id, query_context);
sendTableColumns(storage_ptr->getInMemoryMetadataPtr()->getColumns());
}
}
}
/// Send block to the client - table structure.
sendData(state.io.pipeline.getHeader());
sendLogs();
}
AsynchronousInsertQueue::PushResult TCPHandler::processAsyncInsertQuery(AsynchronousInsertQueue & insert_queue)
{
using PushResult = AsynchronousInsertQueue::PushResult;
startInsertQuery();
SquashingTransform squashing(0, query_context->getSettingsRef().async_insert_max_data_size);
while (readDataNext())
{
auto result = squashing.add(std::move(state.block_for_insert));
if (result)
{
return PushResult
{
.status = PushResult::TOO_MUCH_DATA,
.insert_block = std::move(result),
};
}
}
auto result = squashing.add({});
return insert_queue.pushQueryWithBlock(state.parsed_query, std::move(result), query_context);
}
2021-08-01 14:12:34 +00:00
void TCPHandler::processInsertQuery()
{
2021-09-15 19:35:48 +00:00
size_t num_threads = state.io.pipeline.getNumThreads();
2021-09-10 14:52:24 +00:00
auto run_executor = [&](auto & executor, Block processed_data)
{
/// Made above the rest of the lines,
/// so that in case of `start` function throws an exception,
/// client receive exception before sending data.
executor.start();
if (processed_data)
executor.push(std::move(processed_data));
else
startInsertQuery();
2021-09-10 14:52:24 +00:00
while (readDataNext())
executor.push(std::move(state.block_for_insert));
if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED)
2023-02-21 19:04:40 +00:00
executor.cancel();
else
executor.finish();
};
Block processed_block;
const auto & settings = query_context->getSettingsRef();
auto * insert_queue = query_context->getAsynchronousInsertQueue();
const auto & insert_query = assert_cast<const ASTInsertQuery &>(*state.parsed_query);
bool async_insert_enabled = settings.async_insert;
if (insert_query.table_id)
if (auto table = DatabaseCatalog::instance().tryGetTable(insert_query.table_id, query_context))
async_insert_enabled |= table->areAsynchronousInsertsEnabled();
if (insert_queue && async_insert_enabled && !insert_query.select)
{
auto result = processAsyncInsertQuery(*insert_queue);
if (result.status == AsynchronousInsertQueue::PushResult::OK)
{
if (settings.wait_for_async_insert)
{
size_t timeout_ms = settings.wait_for_async_insert_timeout.totalMilliseconds();
auto wait_status = result.future.wait_for(std::chrono::milliseconds(timeout_ms));
if (wait_status == std::future_status::deferred)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: got future in deferred state");
if (wait_status == std::future_status::timeout)
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Wait for async insert timeout ({} ms) exceeded)", timeout_ms);
result.future.get();
}
sendInsertProfileEvents();
return;
}
else if (result.status == AsynchronousInsertQueue::PushResult::TOO_MUCH_DATA)
{
LOG_DEBUG(log, "Setting async_insert=1, but INSERT query will be executed synchronously because it has too much data");
processed_block = std::move(result.insert_block);
}
}
if (num_threads > 1)
{
PushingAsyncPipelineExecutor executor(state.io.pipeline);
run_executor(executor, std::move(processed_block));
}
2021-09-10 14:52:24 +00:00
else
{
2021-09-15 19:35:48 +00:00
PushingPipelineExecutor executor(state.io.pipeline);
run_executor(executor, processed_block);
2021-09-10 14:52:24 +00:00
}
Send profile events for INSERT queries (previously only SELECT was supported) Reproducer: echo "1" | clickhouse-client --query="insert into function null('foo String') format TSV" --print-profile-events --profile-events-delay-ms=-1 However, clickhouse-local is differnt, it does sent the periodically, but only if query was long enough, i.e.: # yes | head -n100000 | clickhouse-local --query="insert into function null('foo String') format TSV" --print-profile-events --profile-events-delay-ms=-1 [s1.ch] 2022.05.20 15:20:27 [ 0 ] ContextLock: 10 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] DiskReadElapsedMicroseconds: 29 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] IOBufferAllocBytes: 200000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] IOBufferAllocs: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertQuery: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertedBytes: 1000000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertedRows: 100000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] MemoryTrackerUsage: 1521975 (gauge) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSCPUVirtualTimeMicroseconds: 102148 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSReadChars: 135700 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSWriteChars: 8 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] Query: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] RWLockAcquiredReadLocks: 2 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] ReadBufferFromFileDescriptorRead: 5 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] ReadBufferFromFileDescriptorReadBytes: 134464 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] RealTimeMicroseconds: 293747 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] SoftPageFaults: 382 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] TableFunctionExecute: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] UserTimeMicroseconds: 102148 (increment) v2: Proper support ProfileEvents in INSERTs (with protocol change) v3: Receive profile events on INSERT queries Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-05-20 15:19:40 +00:00
sendInsertProfileEvents();
2012-05-21 06:49:05 +00:00
}
void TCPHandler::processOrdinaryQueryWithProcessors()
2019-03-26 18:28:37 +00:00
{
auto & pipeline = state.io.pipeline;
2021-02-05 10:15:02 +00:00
if (query_context->getSettingsRef().allow_experimental_query_deduplication)
2022-12-07 17:00:10 +00:00
{
std::lock_guard lock(task_callback_mutex);
2021-02-05 10:15:02 +00:00
sendPartUUIDs();
2022-12-07 17:00:10 +00:00
}
2019-03-26 18:28:37 +00:00
/// Send header-block, to allow client to prepare output format for data to send.
{
2020-05-14 21:03:38 +00:00
const auto & header = pipeline.getHeader();
2019-03-26 18:28:37 +00:00
if (header)
2022-12-07 17:00:10 +00:00
{
std::lock_guard lock(task_callback_mutex);
2019-03-26 18:28:37 +00:00
sendData(header);
2022-12-07 17:00:10 +00:00
}
2019-03-26 18:28:37 +00:00
}
/// Defer locking to cover a part of the scope below and everything after it
std::unique_lock progress_lock(task_callback_mutex, std::defer_lock);
2019-03-26 18:28:37 +00:00
{
2023-10-21 01:14:22 +00:00
PullingAsyncPipelineExecutor executor(pipeline);
2020-05-14 21:03:38 +00:00
CurrentMetrics::Increment query_thread_metric_increment{CurrentMetrics::QueryThread};
2020-05-14 21:03:38 +00:00
Block block;
2021-08-01 14:12:34 +00:00
while (executor.pull(block, interactive_delay / 1000))
2019-03-26 18:28:37 +00:00
{
std::unique_lock lock(task_callback_mutex);
2021-04-10 02:21:18 +00:00
auto cancellation_status = getQueryCancellationStatus();
if (cancellation_status == CancellationStatus::FULLY_CANCELLED)
2019-03-26 18:28:37 +00:00
{
/// Several callback like callback for parallel reading could be called from inside the pipeline
/// and we have to unlock the mutex from our side to prevent deadlock.
lock.unlock();
2020-01-23 10:04:18 +00:00
/// A packet was received requesting to stop execution of the request.
2020-05-14 21:03:38 +00:00
executor.cancel();
2020-01-23 10:04:18 +00:00
break;
}
else if (cancellation_status == CancellationStatus::READ_CANCELLED)
{
executor.cancelReading();
}
2019-03-26 18:28:37 +00:00
2021-08-01 14:12:34 +00:00
if (after_send_progress.elapsed() / 1000 >= interactive_delay)
2020-01-23 10:04:18 +00:00
{
/// Some time passed and there is a progress.
after_send_progress.restart();
sendProgress();
Send profile events for INSERT queries (previously only SELECT was supported) Reproducer: echo "1" | clickhouse-client --query="insert into function null('foo String') format TSV" --print-profile-events --profile-events-delay-ms=-1 However, clickhouse-local is differnt, it does sent the periodically, but only if query was long enough, i.e.: # yes | head -n100000 | clickhouse-local --query="insert into function null('foo String') format TSV" --print-profile-events --profile-events-delay-ms=-1 [s1.ch] 2022.05.20 15:20:27 [ 0 ] ContextLock: 10 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] DiskReadElapsedMicroseconds: 29 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] IOBufferAllocBytes: 200000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] IOBufferAllocs: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertQuery: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertedBytes: 1000000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertedRows: 100000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] MemoryTrackerUsage: 1521975 (gauge) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSCPUVirtualTimeMicroseconds: 102148 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSReadChars: 135700 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSWriteChars: 8 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] Query: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] RWLockAcquiredReadLocks: 2 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] ReadBufferFromFileDescriptorRead: 5 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] ReadBufferFromFileDescriptorReadBytes: 134464 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] RealTimeMicroseconds: 293747 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] SoftPageFaults: 382 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] TableFunctionExecute: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] UserTimeMicroseconds: 102148 (increment) v2: Proper support ProfileEvents in INSERTs (with protocol change) v3: Receive profile events on INSERT queries Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-05-20 15:19:40 +00:00
sendSelectProfileEvents();
2020-01-23 10:04:18 +00:00
}
2019-03-26 18:28:37 +00:00
2020-01-23 10:04:18 +00:00
sendLogs();
2019-04-05 10:52:07 +00:00
2020-05-14 21:03:38 +00:00
if (block)
2020-01-23 10:04:18 +00:00
{
if (!state.io.null_format)
sendData(block);
2019-03-26 18:28:37 +00:00
}
2020-01-23 10:04:18 +00:00
}
2019-03-26 18:28:37 +00:00
/// This lock wasn't acquired before and we make .lock() call here
/// so everything under this line is covered even together
/// with sendProgress() out of the scope
progress_lock.lock();
2020-01-23 10:04:18 +00:00
/** If data has run out, we will send the profiling data and total values to
* the last zero block to be able to use
* this information in the suffix output of stream.
* If the request was interrupted, then `sendTotals` and other methods could not be called,
* because we have not read all the data yet,
* and there could be ongoing calculations in other threads at the same time.
*/
if (getQueryCancellationStatus() != CancellationStatus::FULLY_CANCELLED)
2020-01-23 10:04:18 +00:00
{
2020-05-14 21:03:38 +00:00
sendTotals(executor.getTotalsBlock());
sendExtremes(executor.getExtremesBlock());
sendProfileInfo(executor.getProfileInfo());
2020-01-23 10:04:18 +00:00
sendProgress();
sendLogs();
Send profile events for INSERT queries (previously only SELECT was supported) Reproducer: echo "1" | clickhouse-client --query="insert into function null('foo String') format TSV" --print-profile-events --profile-events-delay-ms=-1 However, clickhouse-local is differnt, it does sent the periodically, but only if query was long enough, i.e.: # yes | head -n100000 | clickhouse-local --query="insert into function null('foo String') format TSV" --print-profile-events --profile-events-delay-ms=-1 [s1.ch] 2022.05.20 15:20:27 [ 0 ] ContextLock: 10 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] DiskReadElapsedMicroseconds: 29 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] IOBufferAllocBytes: 200000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] IOBufferAllocs: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertQuery: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertedBytes: 1000000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertedRows: 100000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] MemoryTrackerUsage: 1521975 (gauge) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSCPUVirtualTimeMicroseconds: 102148 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSReadChars: 135700 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSWriteChars: 8 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] Query: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] RWLockAcquiredReadLocks: 2 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] ReadBufferFromFileDescriptorRead: 5 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] ReadBufferFromFileDescriptorReadBytes: 134464 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] RealTimeMicroseconds: 293747 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] SoftPageFaults: 382 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] TableFunctionExecute: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] UserTimeMicroseconds: 102148 (increment) v2: Proper support ProfileEvents in INSERTs (with protocol change) v3: Receive profile events on INSERT queries Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-05-20 15:19:40 +00:00
sendSelectProfileEvents();
2019-05-14 12:53:20 +00:00
}
2020-01-23 10:04:18 +00:00
if (state.is_connection_closed)
return;
2020-01-23 10:04:18 +00:00
sendData({});
last_sent_snapshots.clear();
2019-03-26 18:28:37 +00:00
}
2020-03-03 19:01:20 +00:00
sendProgress();
2019-03-26 18:28:37 +00:00
}
2012-07-21 07:02:55 +00:00
void TCPHandler::processTablesStatusRequest()
{
TablesStatusRequest request;
2020-09-17 12:15:05 +00:00
request.read(*in, client_tcp_protocol_version);
2022-08-04 16:04:06 +00:00
ContextPtr context_to_resolve_table_names;
if (is_interserver_mode)
{
/// In interserver mode session context does not exists, because authentication is done for each query.
/// We also cannot create query context earlier, because it cannot be created before authentication,
/// but query is not received yet. So we have to do this trick.
ContextMutablePtr fake_interserver_context = Context::createCopy(server.context());
if (!default_database.empty())
fake_interserver_context->setCurrentDatabase(default_database);
context_to_resolve_table_names = fake_interserver_context;
}
else
{
2022-08-04 16:04:06 +00:00
assert(session);
context_to_resolve_table_names = session->sessionContext();
}
TablesStatusResponse response;
for (const QualifiedTableName & table_name: request.tables)
{
2021-08-01 14:12:34 +00:00
auto resolved_id = context_to_resolve_table_names->tryResolveStorageID({table_name.database, table_name.table});
StoragePtr table = DatabaseCatalog::instance().tryGetTable(resolved_id, context_to_resolve_table_names);
if (!table)
continue;
TableStatus status;
if (auto * replicated_table = dynamic_cast<StorageReplicatedMergeTree *>(table.get()))
{
status.is_replicated = true;
status.absolute_delay = static_cast<UInt32>(replicated_table->getAbsoluteDelay());
}
else
2023-02-19 22:15:09 +00:00
status.is_replicated = false;
response.table_states_by_id.emplace(table_name, std::move(status));
}
2021-02-17 17:34:52 +00:00
writeVarUInt(Protocol::Server::TablesStatusResponse, *out);
2021-01-19 19:21:06 +00:00
/// For testing hedged requests
if (unlikely(sleep_in_send_tables_status.totalMilliseconds()))
2021-01-19 19:21:06 +00:00
{
2021-02-17 17:34:52 +00:00
out->next();
2021-08-01 14:12:34 +00:00
std::chrono::milliseconds ms(sleep_in_send_tables_status.totalMilliseconds());
2021-03-22 21:21:52 +00:00
std::this_thread::sleep_for(ms);
2021-01-19 19:21:06 +00:00
}
2020-09-17 12:15:05 +00:00
response.write(*out, client_tcp_protocol_version);
}
2019-09-03 13:55:26 +00:00
void TCPHandler::receiveUnexpectedTablesStatusRequest()
{
TablesStatusRequest skip_request;
2020-09-17 12:15:05 +00:00
skip_request.read(*in, client_tcp_protocol_version);
throw NetException(ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT, "Unexpected packet TablesStatusRequest received from client");
}
void TCPHandler::sendPartUUIDs()
{
auto uuids = query_context->getPartUUIDs()->get();
if (!uuids.empty())
{
for (const auto & uuid : uuids)
LOG_TRACE(log, "Sending UUID: {}", toString(uuid));
writeVarUInt(Protocol::Server::PartUUIDs, *out);
writeVectorBinary(uuids, *out);
out->next();
}
}
2021-03-22 17:12:31 +00:00
2021-04-10 02:21:18 +00:00
void TCPHandler::sendReadTaskRequestAssumeLocked()
2021-03-22 17:12:31 +00:00
{
writeVarUInt(Protocol::Server::ReadTaskRequest, *out);
2021-03-22 17:12:31 +00:00
out->next();
}
void TCPHandler::sendMergeTreeAllRangesAnnouncementAssumeLocked(InitialAllRangesAnnouncement announcement)
2023-02-03 13:34:18 +00:00
{
writeVarUInt(Protocol::Server::MergeTreeAllRangesAnnouncement, *out);
2023-02-03 13:34:18 +00:00
announcement.serialize(*out);
out->next();
}
void TCPHandler::sendMergeTreeReadTaskRequestAssumeLocked(ParallelReadRequest request)
{
writeVarUInt(Protocol::Server::MergeTreeReadTaskRequest, *out);
request.serialize(*out);
out->next();
}
2021-10-15 20:18:20 +00:00
void TCPHandler::sendProfileInfo(const ProfileInfo & info)
2013-05-22 14:57:43 +00:00
{
writeVarUInt(Protocol::Server::ProfileInfo, *out);
2019-03-26 18:28:37 +00:00
info.write(*out);
out->next();
2013-05-22 14:57:43 +00:00
}
2019-03-26 18:28:37 +00:00
void TCPHandler::sendTotals(const Block & totals)
{
if (totals)
{
initBlockOutput(totals);
writeVarUInt(Protocol::Server::Totals, *out);
writeStringBinary("", *out);
state.block_out->write(totals);
state.maybe_compressed_out->next();
out->next();
}
}
2019-03-26 18:28:37 +00:00
void TCPHandler::sendExtremes(const Block & extremes)
{
if (extremes)
{
initBlockOutput(extremes);
writeVarUInt(Protocol::Server::Extremes, *out);
writeStringBinary("", *out);
state.block_out->write(extremes);
state.maybe_compressed_out->next();
out->next();
}
}
2021-08-30 11:04:59 +00:00
void TCPHandler::sendProfileEvents()
{
2022-03-01 07:54:23 +00:00
Block block;
ProfileEvents::getProfileEvents(host_name, state.profile_queue, block, last_sent_snapshots);
2022-03-05 06:22:56 +00:00
if (block.rows() != 0)
2021-09-07 12:07:24 +00:00
{
2021-09-02 14:27:19 +00:00
initProfileEventsBlockOutput(block);
writeVarUInt(Protocol::Server::ProfileEvents, *out);
writeStringBinary("", *out);
state.profile_events_block_out->write(block);
out->next();
}
2021-08-30 11:04:59 +00:00
}
Send profile events for INSERT queries (previously only SELECT was supported) Reproducer: echo "1" | clickhouse-client --query="insert into function null('foo String') format TSV" --print-profile-events --profile-events-delay-ms=-1 However, clickhouse-local is differnt, it does sent the periodically, but only if query was long enough, i.e.: # yes | head -n100000 | clickhouse-local --query="insert into function null('foo String') format TSV" --print-profile-events --profile-events-delay-ms=-1 [s1.ch] 2022.05.20 15:20:27 [ 0 ] ContextLock: 10 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] DiskReadElapsedMicroseconds: 29 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] IOBufferAllocBytes: 200000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] IOBufferAllocs: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertQuery: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertedBytes: 1000000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertedRows: 100000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] MemoryTrackerUsage: 1521975 (gauge) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSCPUVirtualTimeMicroseconds: 102148 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSReadChars: 135700 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSWriteChars: 8 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] Query: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] RWLockAcquiredReadLocks: 2 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] ReadBufferFromFileDescriptorRead: 5 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] ReadBufferFromFileDescriptorReadBytes: 134464 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] RealTimeMicroseconds: 293747 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] SoftPageFaults: 382 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] TableFunctionExecute: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] UserTimeMicroseconds: 102148 (increment) v2: Proper support ProfileEvents in INSERTs (with protocol change) v3: Receive profile events on INSERT queries Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-05-20 15:19:40 +00:00
void TCPHandler::sendSelectProfileEvents()
{
if (client_tcp_protocol_version < DBMS_MIN_PROTOCOL_VERSION_WITH_INCREMENTAL_PROFILE_EVENTS)
return;
sendProfileEvents();
}
void TCPHandler::sendInsertProfileEvents()
{
if (client_tcp_protocol_version < DBMS_MIN_PROTOCOL_VERSION_WITH_PROFILE_EVENTS_IN_INSERT)
return;
if (query_kind != ClientInfo::QueryKind::INITIAL_QUERY)
return;
Send profile events for INSERT queries (previously only SELECT was supported) Reproducer: echo "1" | clickhouse-client --query="insert into function null('foo String') format TSV" --print-profile-events --profile-events-delay-ms=-1 However, clickhouse-local is differnt, it does sent the periodically, but only if query was long enough, i.e.: # yes | head -n100000 | clickhouse-local --query="insert into function null('foo String') format TSV" --print-profile-events --profile-events-delay-ms=-1 [s1.ch] 2022.05.20 15:20:27 [ 0 ] ContextLock: 10 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] DiskReadElapsedMicroseconds: 29 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] IOBufferAllocBytes: 200000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] IOBufferAllocs: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertQuery: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertedBytes: 1000000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] InsertedRows: 100000 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] MemoryTrackerUsage: 1521975 (gauge) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSCPUVirtualTimeMicroseconds: 102148 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSReadChars: 135700 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] OSWriteChars: 8 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] Query: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] RWLockAcquiredReadLocks: 2 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] ReadBufferFromFileDescriptorRead: 5 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] ReadBufferFromFileDescriptorReadBytes: 134464 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] RealTimeMicroseconds: 293747 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] SoftPageFaults: 382 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] TableFunctionExecute: 1 (increment) [s1.ch] 2022.05.20 15:20:27 [ 0 ] UserTimeMicroseconds: 102148 (increment) v2: Proper support ProfileEvents in INSERTs (with protocol change) v3: Receive profile events on INSERT queries Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-05-20 15:19:40 +00:00
sendProfileEvents();
}
2021-08-30 11:04:59 +00:00
void TCPHandler::sendTimezone()
{
2023-04-12 15:45:11 +00:00
if (client_tcp_protocol_version < DBMS_MIN_PROTOCOL_VERSION_WITH_TIMEZONE_UPDATES)
return;
const String & tz = query_context->getSettingsRef().session_timezone.value;
2023-04-12 15:45:11 +00:00
LOG_DEBUG(log, "TCPHandler::sendTimezone(): {}", tz);
writeVarUInt(Protocol::Server::TimezoneUpdate, *out);
writeStringBinary(tz, *out);
out->next();
}
2020-12-02 21:05:51 +00:00
bool TCPHandler::receiveProxyHeader()
{
if (in->eof())
{
LOG_WARNING(log, "Client has not sent any data.");
return false;
}
String forwarded_address;
/// Only PROXYv1 is supported.
/// Validation of protocol is not fully performed.
2023-02-22 16:54:35 +00:00
LimitReadBuffer limit_in(*in, 107, /* trow_exception */ true, /* exact_limit */ {}); /// Maximum length from the specs.
2020-12-02 21:05:51 +00:00
assertString("PROXY ", limit_in);
if (limit_in.eof())
{
LOG_WARNING(log, "Incomplete PROXY header is received.");
return false;
}
/// TCP4 / TCP6 / UNKNOWN
if ('T' == *limit_in.position())
{
assertString("TCP", limit_in);
if (limit_in.eof())
{
LOG_WARNING(log, "Incomplete PROXY header is received.");
return false;
}
if ('4' != *limit_in.position() && '6' != *limit_in.position())
{
LOG_WARNING(log, "Unexpected protocol in PROXY header is received.");
return false;
}
++limit_in.position();
assertChar(' ', limit_in);
/// Read the first field and ignore other.
readStringUntilWhitespace(forwarded_address, limit_in);
/// Skip until \r\n
while (!limit_in.eof() && *limit_in.position() != '\r')
++limit_in.position();
assertString("\r\n", limit_in);
}
else if (checkString("UNKNOWN", limit_in))
{
/// This is just a health check, there is no subsequent data in this connection.
while (!limit_in.eof() && *limit_in.position() != '\r')
++limit_in.position();
assertString("\r\n", limit_in);
return false;
}
else
{
LOG_WARNING(log, "Unexpected protocol in PROXY header is received.");
return false;
}
LOG_TRACE(log, "Forwarded client address from PROXY header: {}", forwarded_address);
2022-05-21 00:05:02 +00:00
forwarded_for = std::move(forwarded_address);
2020-12-02 21:05:51 +00:00
return true;
}
namespace
{
std::string formatHTTPErrorResponseWhenUserIsConnectedToWrongPort(const Poco::Util::AbstractConfiguration& config)
{
std::string result = fmt::format(
"HTTP/1.0 400 Bad Request\r\n\r\n"
"Port {} is for clickhouse-client program\r\n",
config.getString("tcp_port"));
if (config.has("http_port"))
{
result += fmt::format(
"You must use port {} for HTTP.\r\n",
config.getString("http_port"));
}
return result;
}
[[ maybe_unused ]] String createChallenge()
{
#if USE_SSL
pcg64_fast rng(randomSeed());
UInt64 rand = rng();
return encodeSHA256(&rand, sizeof(rand));
#else
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Can't generate challenge, because ClickHouse was built without OpenSSL");
#endif
}
}
2022-05-21 00:05:02 +00:00
std::unique_ptr<Session> TCPHandler::makeSession()
{
auto interface = is_interserver_mode ? ClientInfo::Interface::TCP_INTERSERVER : ClientInfo::Interface::TCP;
auto res = std::make_unique<Session>(server.context(), interface, socket().secure(), certificate);
2022-05-21 00:05:02 +00:00
res->setForwardedFor(forwarded_for);
res->setClientName(client_name);
res->setClientVersion(client_version_major, client_version_minor, client_version_patch, client_tcp_protocol_version);
res->setConnectionClientVersion(client_version_major, client_version_minor, client_version_patch, client_tcp_protocol_version);
res->setClientInterface(interface);
2022-05-21 00:05:02 +00:00
return res;
}
String TCPHandler::prepareStringForSshValidation(String username, String challenge)
{
String output;
output.append(std::to_string(client_tcp_protocol_version));
output.append(default_database);
output.append(username);
output.append(challenge);
return output;
}
2012-05-21 06:49:05 +00:00
void TCPHandler::receiveHello()
2012-05-16 18:03:00 +00:00
{
2017-03-09 00:56:38 +00:00
/// Receive `hello` packet.
2012-05-16 18:03:00 +00:00
UInt64 packet_type = 0;
String user;
2013-08-10 09:04:45 +00:00
String password;
String default_db;
2012-05-21 06:49:05 +00:00
readVarUInt(packet_type, *in);
2012-05-16 18:03:00 +00:00
if (packet_type != Protocol::Client::Hello)
{
2017-03-09 00:56:38 +00:00
/** If you accidentally accessed the HTTP protocol for a port destined for an internal TCP protocol,
* Then instead of the packet type, there will be G (GET) or P (POST), in most cases.
*/
if (packet_type == 'G' || packet_type == 'P')
{
writeString(formatHTTPErrorResponseWhenUserIsConnectedToWrongPort(server.config()), *out);
throw Exception(ErrorCodes::CLIENT_HAS_CONNECTED_TO_WRONG_PORT, "Client has connected to wrong port");
}
else
2023-04-01 16:23:59 +00:00
throw NetException(ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT,
"Unexpected packet from client (expected Hello, got {})", packet_type);
}
2012-05-21 06:49:05 +00:00
readStringBinary(client_name, *in);
readVarUInt(client_version_major, *in);
readVarUInt(client_version_minor, *in);
// NOTE For backward compatibility of the protocol, client cannot send its version_patch.
2020-09-17 12:15:05 +00:00
readVarUInt(client_tcp_protocol_version, *in);
readStringBinary(default_db, *in);
if (!default_db.empty())
default_database = default_db;
2016-07-31 03:53:16 +00:00
readStringBinary(user, *in);
readStringBinary(password, *in);
if (user.empty())
throw NetException(ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT, "Unexpected packet from client (no user in Hello package)");
2020-05-23 22:24:01 +00:00
LOG_DEBUG(log, "Connected {} version {}.{}.{}, revision: {}{}{}.",
2020-05-23 22:21:29 +00:00
client_name,
client_version_major, client_version_minor, client_version_patch,
2020-09-17 12:15:05 +00:00
client_tcp_protocol_version,
2020-05-23 22:21:29 +00:00
(!default_database.empty() ? ", database: " + default_database : ""),
(!user.empty() ? ", user: " + user : "")
);
is_interserver_mode = (user == EncodedUserInfo::USER_INTERSERVER_MARKER) && password.empty();
2021-08-01 14:12:34 +00:00
if (is_interserver_mode)
{
if (client_tcp_protocol_version < DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET_V2)
LOG_WARNING(LogFrequencyLimiter(log, 10),
"Using deprecated interserver protocol because the client is too old. Consider upgrading all nodes in cluster.");
receiveClusterNameAndSalt();
2021-08-01 14:12:34 +00:00
return;
}
2021-08-01 14:12:34 +00:00
is_ssh_based_auth = startsWith(user, EncodedUserInfo::SSH_KEY_AUTHENTICAION_MARKER) && password.empty();
if (is_ssh_based_auth)
{
user.erase(0, String(EncodedUserInfo::SSH_KEY_AUTHENTICAION_MARKER).size());
}
2022-05-21 00:05:02 +00:00
session = makeSession();
const auto & client_info = session->getClientInfo();
2023-03-15 00:09:29 +00:00
#if USE_SSL
2023-03-14 22:49:10 +00:00
/// Authentication with SSL user certificate
if (dynamic_cast<Poco::Net::SecureStreamSocketImpl*>(socket().impl()))
{
Poco::Net::SecureStreamSocket secure_socket(socket());
if (secure_socket.havePeerCertificate())
{
try
{
session->authenticate(
SSLCertificateCredentials{user, secure_socket.peerCertificate().commonName()},
getClientAddress(client_info));
return;
}
catch (const Exception & e)
{
if (e.code() != DB::ErrorCodes::AUTHENTICATION_FAILED)
throw;
tryLogCurrentException(log, "SSL authentication failed, falling back to password authentication");
}
}
}
/// Perform handshake for SSH authentication
if (is_ssh_based_auth)
{
if (session->getAuthenticationTypeOrLogInFailure(user) != AuthenticationType::SSH_KEY)
throw Exception(ErrorCodes::AUTHENTICATION_FAILED, "Expected authentication with SSH key");
if (client_tcp_protocol_version < DBMS_MIN_REVISION_WITH_SSH_AUTHENTICATION)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Cannot authenticate user with SSH key, because client version is too old");
readVarUInt(packet_type, *in);
if (packet_type != Protocol::Client::SSHChallengeRequest)
throw Exception(ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT, "Server expected to receive a packet for requesting a challenge string");
auto challenge = createChallenge();
writeVarUInt(Protocol::Server::SSHChallenge, *out);
writeStringBinary(challenge, *out);
out->next();
String signature;
readVarUInt(packet_type, *in);
if (packet_type != Protocol::Client::SSHChallengeResponse)
throw Exception(ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT, "Server expected to receive a packet with a response for a challenge");
readStringBinary(signature, *in);
auto cred = SshCredentials(user, signature, prepareStringForSshValidation(user, challenge));
session->authenticate(cred, getClientAddress(client_info));
return;
}
2023-03-15 00:09:29 +00:00
#endif
session->authenticate(user, password, getClientAddress(client_info));
2012-05-16 18:03:00 +00:00
}
2022-08-03 19:44:08 +00:00
void TCPHandler::receiveAddendum()
{
2022-08-03 20:36:52 +00:00
if (client_tcp_protocol_version >= DBMS_MIN_PROTOCOL_VERSION_WITH_QUOTA_KEY)
readStringBinary(quota_key, *in);
2023-07-30 22:09:03 +00:00
if (!is_interserver_mode)
session->setQuotaClientKey(quota_key);
2022-08-03 19:44:08 +00:00
}
2012-05-16 18:03:00 +00:00
void TCPHandler::receiveUnexpectedHello()
{
UInt64 skip_uint_64;
String skip_string;
readStringBinary(skip_string, *in);
readVarUInt(skip_uint_64, *in);
readVarUInt(skip_uint_64, *in);
2020-04-15 01:58:10 +00:00
readVarUInt(skip_uint_64, *in);
readStringBinary(skip_string, *in);
readStringBinary(skip_string, *in);
readStringBinary(skip_string, *in);
throw NetException(ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT, "Unexpected packet Hello received from client");
}
2012-05-21 06:49:05 +00:00
void TCPHandler::sendHello()
2012-03-11 08:52:56 +00:00
{
2012-05-21 06:49:05 +00:00
writeVarUInt(Protocol::Server::Hello, *out);
2023-08-09 03:02:50 +00:00
writeStringBinary(VERSION_NAME, *out);
writeVarUInt(VERSION_MAJOR, *out);
writeVarUInt(VERSION_MINOR, *out);
2020-09-17 12:15:05 +00:00
writeVarUInt(DBMS_TCP_PROTOCOL_VERSION, *out);
if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE)
writeStringBinary(DateLUT::instance().getTimeZone(), *out);
2020-09-17 12:15:05 +00:00
if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME)
2024-02-02 14:04:27 +00:00
writeStringBinary(server_display_name, *out);
2020-09-17 12:15:05 +00:00
if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_VERSION_PATCH)
2023-08-09 03:02:50 +00:00
writeVarUInt(VERSION_PATCH, *out);
if (client_tcp_protocol_version >= DBMS_MIN_PROTOCOL_VERSION_WITH_PASSWORD_COMPLEXITY_RULES)
{
auto rules = server.context()->getAccessControl().getPasswordComplexityRules();
writeVarUInt(rules.size(), *out);
for (const auto & [original_pattern, exception_message] : rules)
{
writeStringBinary(original_pattern, *out);
writeStringBinary(exception_message, *out);
}
}
if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET_V2)
{
chassert(!nonce.has_value());
/// Contains lots of stuff (including time), so this should be enough for NONCE.
nonce.emplace(thread_local_rng());
writeIntBinary(nonce.value(), *out);
}
2012-05-21 06:49:05 +00:00
out->next();
2012-03-11 08:52:56 +00:00
}
2012-05-17 19:15:53 +00:00
2012-05-21 06:49:05 +00:00
bool TCPHandler::receivePacket()
2012-03-11 08:52:56 +00:00
{
UInt64 packet_type = 0;
2021-04-06 19:18:45 +00:00
readVarUInt(packet_type, *in);
2021-03-19 21:49:18 +00:00
switch (packet_type)
{
case Protocol::Client::IgnoredPartUUIDs:
/// Part uuids packet if any comes before query.
2021-08-01 14:12:34 +00:00
if (!state.empty() || state.part_uuids_to_ignore)
receiveUnexpectedIgnoredPartUUIDs();
receiveIgnoredPartUUIDs();
return true;
2021-08-01 14:12:34 +00:00
case Protocol::Client::Query:
if (!state.empty())
receiveUnexpectedQuery();
receiveQuery();
return true;
case Protocol::Client::Data:
2019-10-19 20:36:35 +00:00
case Protocol::Client::Scalar:
2021-08-01 14:12:34 +00:00
if (state.skipping_data)
return receiveUnexpectedData(false);
if (state.empty())
2021-08-01 14:12:34 +00:00
receiveUnexpectedData(true);
2019-10-19 20:36:35 +00:00
return receiveData(packet_type == Protocol::Client::Scalar);
case Protocol::Client::Ping:
writeVarUInt(Protocol::Server::Pong, *out);
out->next();
return false;
case Protocol::Client::Cancel:
decreaseCancellationStatus("Received 'Cancel' packet from the client, canceling the query.");
return false;
case Protocol::Client::Hello:
receiveUnexpectedHello();
case Protocol::Client::TablesStatusRequest:
if (!state.empty())
2019-09-03 13:55:26 +00:00
receiveUnexpectedTablesStatusRequest();
processTablesStatusRequest();
out->next();
return false;
default:
throw Exception(ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT, "Unknown packet {} from client", toString(packet_type));
2012-03-09 15:46:52 +00:00
}
}
2023-03-15 19:53:58 +00:00
void TCPHandler::receiveIgnoredPartUUIDs()
{
2021-08-01 14:12:34 +00:00
readVectorBinary(state.part_uuids_to_ignore.emplace(), *in);
}
2023-03-15 19:53:58 +00:00
2021-08-01 14:12:34 +00:00
void TCPHandler::receiveUnexpectedIgnoredPartUUIDs()
{
std::vector<UUID> skip_part_uuids;
readVectorBinary(skip_part_uuids, *in);
throw NetException(ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT, "Unexpected packet IgnoredPartUUIDs received from client");
}
2023-03-15 19:53:58 +00:00
2021-04-13 10:59:02 +00:00
String TCPHandler::receiveReadTaskResponseAssumeLocked()
{
UInt64 packet_type = 0;
readVarUInt(packet_type, *in);
if (packet_type != Protocol::Client::ReadTaskResponse)
2021-04-10 02:21:18 +00:00
{
if (packet_type == Protocol::Client::Cancel)
{
decreaseCancellationStatus("Received 'Cancel' packet from the client, canceling the read task.");
2021-04-10 02:21:18 +00:00
return {};
}
else
{
throw Exception(ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT, "Received {} packet after requesting read task",
Protocol::Client::toString(packet_type));
2021-04-10 02:21:18 +00:00
}
}
UInt64 version;
readVarUInt(version, *in);
if (version != DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION)
throw Exception(ErrorCodes::UNKNOWN_PROTOCOL, "Protocol version for distributed processing mismatched");
String response;
readStringBinary(response, *in);
return response;
}
2023-03-16 14:23:17 +00:00
2023-02-03 13:34:18 +00:00
std::optional<ParallelReadResponse> TCPHandler::receivePartitionMergeTreeReadTaskResponseAssumeLocked()
{
UInt64 packet_type = 0;
readVarUInt(packet_type, *in);
if (packet_type != Protocol::Client::MergeTreeReadTaskResponse)
{
if (packet_type == Protocol::Client::Cancel)
{
decreaseCancellationStatus("Received 'Cancel' packet from the client, canceling the MergeTree read task.");
return std::nullopt;
}
else
{
throw Exception(ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT, "Received {} packet after requesting read task",
Protocol::Client::toString(packet_type));
}
}
2023-02-03 13:34:18 +00:00
ParallelReadResponse response;
response.deserialize(*in);
return response;
}
2023-03-15 19:53:58 +00:00
void TCPHandler::receiveClusterNameAndSalt()
{
2021-04-06 19:18:45 +00:00
readStringBinary(cluster, *in);
readStringBinary(salt, *in, 32);
}
2012-05-17 19:15:53 +00:00
2012-05-21 06:49:05 +00:00
void TCPHandler::receiveQuery()
2012-03-11 08:52:56 +00:00
{
UInt64 stage = 0;
UInt64 compression = 0;
state.is_empty = false;
2016-07-31 03:53:16 +00:00
readStringBinary(state.query_id, *in);
2022-05-17 16:22:52 +00:00
/// In interserver mode,
Do not allow to reuse previous credentials in case of inter-server secret Before this patch INSERT via Buffer/Kafka may re-use previously set user for that connection, while this is not correct, it should reset the user, and use global context. Note, before [1] there was a fallback to default user, but that code had been removed, and now it got back. [1]: 0159c74f217ec764060c480819e3ccc9d5a99a63 ("Secure inter-cluster query execution (with initial_user as current query user) [v3]") Also note, that context for Buffer table (and others) cannot be changed, since they don't have any user only profile. I've tested this patch manually using the following: create table dist (key Int) engine=Distributed(test_cluster_two_shards_secure, default, data, key); create table buffer (key Int) engine=Buffer(default, dist, 1, 0, 0, 0, 0, 0, 0); create table data (key Int) engine=Memory(); # to start the connection with readonly user $ clickhouse-client --user readonly -q 'select * from dist' $ clickhouse-client -q 'insert into buffer values (1)' # before this patch this produces errors like: # 2021.09.27 23:46:48.384920 [ 19474 ] {} <Error> default.dist.DirectoryMonitor: Code: 164. DB::Exception: Received from 127.0.0.2:9000. DB::Exception: readonly: Cannot execute query in readonly mode. Stack trace: v2: reset the authentication instead of using default user (as suggested by @vitlibar) v3: reset Session::user and introduce ClientInfo::resetAuthentication (as suggested by @vitlibar) v4: reset the session every time in interserver mode (suggested by @vitlibar)
2021-09-30 07:15:38 +00:00
/// initial_user can be empty in case of Distributed INSERT via Buffer/Kafka,
2022-04-17 23:02:49 +00:00
/// (i.e. when the INSERT is done with the global context without user),
Do not allow to reuse previous credentials in case of inter-server secret Before this patch INSERT via Buffer/Kafka may re-use previously set user for that connection, while this is not correct, it should reset the user, and use global context. Note, before [1] there was a fallback to default user, but that code had been removed, and now it got back. [1]: 0159c74f217ec764060c480819e3ccc9d5a99a63 ("Secure inter-cluster query execution (with initial_user as current query user) [v3]") Also note, that context for Buffer table (and others) cannot be changed, since they don't have any user only profile. I've tested this patch manually using the following: create table dist (key Int) engine=Distributed(test_cluster_two_shards_secure, default, data, key); create table buffer (key Int) engine=Buffer(default, dist, 1, 0, 0, 0, 0, 0, 0); create table data (key Int) engine=Memory(); # to start the connection with readonly user $ clickhouse-client --user readonly -q 'select * from dist' $ clickhouse-client -q 'insert into buffer values (1)' # before this patch this produces errors like: # 2021.09.27 23:46:48.384920 [ 19474 ] {} <Error> default.dist.DirectoryMonitor: Code: 164. DB::Exception: Received from 127.0.0.2:9000. DB::Exception: readonly: Cannot execute query in readonly mode. Stack trace: v2: reset the authentication instead of using default user (as suggested by @vitlibar) v3: reset Session::user and introduce ClientInfo::resetAuthentication (as suggested by @vitlibar) v4: reset the session every time in interserver mode (suggested by @vitlibar)
2021-09-30 07:15:38 +00:00
/// so it is better to reset session to avoid using old user.
if (is_interserver_mode)
{
2022-05-21 00:05:02 +00:00
session = makeSession();
Do not allow to reuse previous credentials in case of inter-server secret Before this patch INSERT via Buffer/Kafka may re-use previously set user for that connection, while this is not correct, it should reset the user, and use global context. Note, before [1] there was a fallback to default user, but that code had been removed, and now it got back. [1]: 0159c74f217ec764060c480819e3ccc9d5a99a63 ("Secure inter-cluster query execution (with initial_user as current query user) [v3]") Also note, that context for Buffer table (and others) cannot be changed, since they don't have any user only profile. I've tested this patch manually using the following: create table dist (key Int) engine=Distributed(test_cluster_two_shards_secure, default, data, key); create table buffer (key Int) engine=Buffer(default, dist, 1, 0, 0, 0, 0, 0, 0); create table data (key Int) engine=Memory(); # to start the connection with readonly user $ clickhouse-client --user readonly -q 'select * from dist' $ clickhouse-client -q 'insert into buffer values (1)' # before this patch this produces errors like: # 2021.09.27 23:46:48.384920 [ 19474 ] {} <Error> default.dist.DirectoryMonitor: Code: 164. DB::Exception: Received from 127.0.0.2:9000. DB::Exception: readonly: Cannot execute query in readonly mode. Stack trace: v2: reset the authentication instead of using default user (as suggested by @vitlibar) v3: reset Session::user and introduce ClientInfo::resetAuthentication (as suggested by @vitlibar) v4: reset the session every time in interserver mode (suggested by @vitlibar)
2021-09-30 07:15:38 +00:00
}
2021-08-01 14:12:34 +00:00
/// Read client info.
ClientInfo client_info = session->getClientInfo();
2020-09-17 12:15:05 +00:00
if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_CLIENT_INFO)
2023-08-22 03:52:57 +00:00
{
2020-09-17 12:15:05 +00:00
client_info.read(*in, client_tcp_protocol_version);
2023-08-30 06:14:39 +00:00
correctQueryClientInfo(session->getClientInfo(), client_info);
const auto & config_ref = Context::getGlobalContextInstance()->getServerSettings();
if (config_ref.validate_tcp_client_information)
validateClientInfo(session->getClientInfo(), client_info);
2023-08-22 03:52:57 +00:00
}
/// Per query settings are also passed via TCP.
/// We need to check them before applying due to they can violate the settings constraints.
auto settings_format = (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS)
? SettingsWriteFormat::STRINGS_WITH_FLAGS
: SettingsWriteFormat::BINARY;
Settings passed_settings;
passed_settings.read(*in, settings_format);
/// Interserver secret.
std::string received_hash;
2020-09-17 14:55:41 +00:00
if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET)
{
readStringBinary(received_hash, *in, 32);
}
readVarUInt(stage, *in);
state.stage = QueryProcessingStage::Enum(stage);
readVarUInt(compression, *in);
state.compression = static_cast<Protocol::Compression>(compression);
2021-08-01 14:12:34 +00:00
last_block_in.compression = state.compression;
readStringBinary(state.query, *in);
Settings passed_params;
if (client_tcp_protocol_version >= DBMS_MIN_PROTOCOL_VERSION_WITH_PARAMETERS)
passed_params.read(*in, settings_format);
2021-08-01 14:12:34 +00:00
if (is_interserver_mode)
{
2022-05-18 12:06:52 +00:00
client_info.interface = ClientInfo::Interface::TCP_INTERSERVER;
#if USE_SSL
String cluster_secret;
try
{
cluster_secret = server.context()->getCluster(cluster)->getSecret();
}
catch (const Exception & e)
{
auto exception = Exception::createRuntime(ErrorCodes::AUTHENTICATION_FAILED, e.message());
session->onAuthenticationFailure(/* user_name= */ std::nullopt, socket().peerAddress(), exception);
throw exception; /// NOLINT
}
2022-05-18 12:06:52 +00:00
if (salt.empty() || cluster_secret.empty())
2022-05-17 16:22:52 +00:00
{
auto exception = Exception(ErrorCodes::AUTHENTICATION_FAILED, "Interserver authentication failed (no salt/cluster secret)");
session->onAuthenticationFailure(/* user_name= */ std::nullopt, socket().peerAddress(), exception);
throw exception; /// NOLINT
}
if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET_V2 && !nonce.has_value())
{
auto exception = Exception(ErrorCodes::AUTHENTICATION_FAILED, "Interserver authentication failed (no nonce)");
session->onAuthenticationFailure(/* user_name= */ std::nullopt, socket().peerAddress(), exception);
2022-05-18 19:57:20 +00:00
throw exception; /// NOLINT
2022-05-17 16:22:52 +00:00
}
std::string data(salt);
// For backward compatibility
if (nonce.has_value())
data += std::to_string(nonce.value());
data += cluster_secret;
data += state.query;
data += state.query_id;
data += client_info.initial_user;
std::string calculated_hash = encodeSHA256(data);
2022-05-17 16:22:52 +00:00
assert(calculated_hash.size() == 32);
2022-05-17 16:22:52 +00:00
/// TODO maybe also check that peer address actually belongs to the cluster?
if (calculated_hash != received_hash)
2022-05-17 16:22:52 +00:00
{
auto exception = Exception(ErrorCodes::AUTHENTICATION_FAILED, "Interserver authentication failed");
2022-05-23 19:55:17 +00:00
session->onAuthenticationFailure(/* user_name */ std::nullopt, socket().peerAddress(), exception);
2022-05-18 19:57:20 +00:00
throw exception; /// NOLINT
2022-05-17 16:22:52 +00:00
}
2022-05-18 12:06:52 +00:00
/// NOTE Usually we get some fields of client_info (including initial_address and initial_user) from user input,
/// so we should not rely on that. However, in this particular case we got client_info from other clickhouse-server, so it's ok.
Do not allow to reuse previous credentials in case of inter-server secret Before this patch INSERT via Buffer/Kafka may re-use previously set user for that connection, while this is not correct, it should reset the user, and use global context. Note, before [1] there was a fallback to default user, but that code had been removed, and now it got back. [1]: 0159c74f217ec764060c480819e3ccc9d5a99a63 ("Secure inter-cluster query execution (with initial_user as current query user) [v3]") Also note, that context for Buffer table (and others) cannot be changed, since they don't have any user only profile. I've tested this patch manually using the following: create table dist (key Int) engine=Distributed(test_cluster_two_shards_secure, default, data, key); create table buffer (key Int) engine=Buffer(default, dist, 1, 0, 0, 0, 0, 0, 0); create table data (key Int) engine=Memory(); # to start the connection with readonly user $ clickhouse-client --user readonly -q 'select * from dist' $ clickhouse-client -q 'insert into buffer values (1)' # before this patch this produces errors like: # 2021.09.27 23:46:48.384920 [ 19474 ] {} <Error> default.dist.DirectoryMonitor: Code: 164. DB::Exception: Received from 127.0.0.2:9000. DB::Exception: readonly: Cannot execute query in readonly mode. Stack trace: v2: reset the authentication instead of using default user (as suggested by @vitlibar) v3: reset Session::user and introduce ClientInfo::resetAuthentication (as suggested by @vitlibar) v4: reset the session every time in interserver mode (suggested by @vitlibar)
2021-09-30 07:15:38 +00:00
if (client_info.initial_user.empty())
{
LOG_DEBUG(log, "User (no user, interserver mode) (client: {})", getClientAddress(client_info).toString());
Do not allow to reuse previous credentials in case of inter-server secret Before this patch INSERT via Buffer/Kafka may re-use previously set user for that connection, while this is not correct, it should reset the user, and use global context. Note, before [1] there was a fallback to default user, but that code had been removed, and now it got back. [1]: 0159c74f217ec764060c480819e3ccc9d5a99a63 ("Secure inter-cluster query execution (with initial_user as current query user) [v3]") Also note, that context for Buffer table (and others) cannot be changed, since they don't have any user only profile. I've tested this patch manually using the following: create table dist (key Int) engine=Distributed(test_cluster_two_shards_secure, default, data, key); create table buffer (key Int) engine=Buffer(default, dist, 1, 0, 0, 0, 0, 0, 0); create table data (key Int) engine=Memory(); # to start the connection with readonly user $ clickhouse-client --user readonly -q 'select * from dist' $ clickhouse-client -q 'insert into buffer values (1)' # before this patch this produces errors like: # 2021.09.27 23:46:48.384920 [ 19474 ] {} <Error> default.dist.DirectoryMonitor: Code: 164. DB::Exception: Received from 127.0.0.2:9000. DB::Exception: readonly: Cannot execute query in readonly mode. Stack trace: v2: reset the authentication instead of using default user (as suggested by @vitlibar) v3: reset Session::user and introduce ClientInfo::resetAuthentication (as suggested by @vitlibar) v4: reset the session every time in interserver mode (suggested by @vitlibar)
2021-09-30 07:15:38 +00:00
}
else
{
LOG_DEBUG(log, "User (initial, interserver mode): {} (client: {})", client_info.initial_user, getClientAddress(client_info).toString());
/// In case of inter-server mode authorization is done with the
/// initial address of the client, not the real address from which
/// the query was come, since the real address is the address of
/// the initiator server, while we are interested in client's
/// address.
2021-08-01 14:12:34 +00:00
session->authenticate(AlwaysAllowCredentials{client_info.initial_user}, client_info.initial_address);
}
is_interserver_authenticated = true;
#else
2023-01-24 00:30:26 +00:00
auto exception = Exception(ErrorCodes::AUTHENTICATION_FAILED,
"Inter-server secret support is disabled, because ClickHouse was built without SSL library");
2022-05-23 19:55:17 +00:00
session->onAuthenticationFailure(/* user_name */ std::nullopt, socket().peerAddress(), exception);
2022-05-18 19:57:20 +00:00
throw exception; /// NOLINT
#endif
}
2021-08-01 14:12:34 +00:00
query_context = session->makeQueryContext(std::move(client_info));
/// Sets the default database if it wasn't set earlier for the session context.
2022-08-04 16:04:06 +00:00
if (is_interserver_mode && !default_database.empty())
2021-08-01 14:12:34 +00:00
query_context->setCurrentDatabase(default_database);
if (state.part_uuids_to_ignore)
query_context->getIgnoredPartUUIDs()->add(*state.part_uuids_to_ignore);
query_context->setProgressCallback([this] (const Progress & value) { return this->updateProgress(value); });
2022-05-06 15:04:03 +00:00
query_context->setFileProgressCallback([this](const FileProgress & value) { this->updateProgress(Progress(value)); });
///
/// Settings
///
auto settings_changes = passed_settings.changes();
query_kind = query_context->getClientInfo().query_kind;
2021-08-01 14:12:34 +00:00
if (query_kind == ClientInfo::QueryKind::INITIAL_QUERY)
{
/// Throw an exception if the passed settings violate the constraints.
2023-07-30 22:09:03 +00:00
query_context->checkSettingsConstraints(settings_changes, SettingSource::QUERY);
}
else
{
/// Quietly clamp to the constraints if it's not an initial query.
2023-07-30 22:09:03 +00:00
query_context->clampToSettingsConstraints(settings_changes, SettingSource::QUERY);
}
query_context->applySettingsChanges(settings_changes);
2020-09-08 13:19:27 +00:00
2021-08-01 14:12:34 +00:00
/// Use the received query id, or generate a random default. It is convenient
/// to also generate the default OpenTelemetry trace id at the same time, and
/// set the trace parent.
/// Notes:
/// 1) ClientInfo might contain upstream trace id, so we decide whether to use
/// the default ids after we have received the ClientInfo.
/// 2) There is the opentelemetry_start_trace_probability setting that
/// controls when we start a new trace. It can be changed via Native protocol,
/// so we have to apply the changes first.
query_context->setCurrentQueryId(state.query_id);
query_context->addQueryParameters(convertToQueryParameters(passed_params));
/// For testing hedged requests
if (unlikely(sleep_after_receiving_query.totalMilliseconds()))
{
std::chrono::milliseconds ms(sleep_after_receiving_query.totalMilliseconds());
std::this_thread::sleep_for(ms);
}
2012-03-11 08:52:56 +00:00
}
void TCPHandler::receiveUnexpectedQuery()
{
UInt64 skip_uint_64;
String skip_string;
readStringBinary(skip_string, *in);
ClientInfo skip_client_info;
2020-09-17 12:15:05 +00:00
if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_CLIENT_INFO)
skip_client_info.read(*in, client_tcp_protocol_version);
Settings skip_settings;
2020-09-17 12:15:05 +00:00
auto settings_format = (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS) ? SettingsWriteFormat::STRINGS_WITH_FLAGS
: SettingsWriteFormat::BINARY;
skip_settings.read(*in, settings_format);
std::string skip_hash;
2020-09-17 14:55:41 +00:00
bool interserver_secret = client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET;
if (interserver_secret)
readStringBinary(skip_hash, *in, 32);
readVarUInt(skip_uint_64, *in);
2021-08-01 14:12:34 +00:00
readVarUInt(skip_uint_64, *in);
2021-08-01 14:12:34 +00:00
last_block_in.compression = static_cast<Protocol::Compression>(skip_uint_64);
readStringBinary(skip_string, *in);
if (client_tcp_protocol_version >= DBMS_MIN_PROTOCOL_VERSION_WITH_PARAMETERS)
skip_settings.read(*in, settings_format);
throw NetException(ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT, "Unexpected packet Query received from client");
}
2012-05-21 06:49:05 +00:00
2019-10-19 20:36:35 +00:00
bool TCPHandler::receiveData(bool scalar)
{
initBlockInput();
2017-03-09 00:56:38 +00:00
/// The name of the temporary table for writing data, default to empty string
2020-02-12 18:14:12 +00:00
auto temporary_id = StorageID::createEmpty();
readStringBinary(temporary_id.table_name, *in);
2017-03-09 00:56:38 +00:00
/// Read one block from the network and write it down
Block block = state.block_in->read();
2021-08-01 14:12:34 +00:00
if (!block)
{
2021-08-01 14:12:34 +00:00
state.read_all_data = true;
return false;
}
2021-02-10 22:23:27 +00:00
2021-08-01 14:12:34 +00:00
if (scalar)
{
/// Scalar value
query_context->addScalar(temporary_id.table_name, block);
}
else if (!state.need_receive_data_for_insert && !state.need_receive_data_for_input)
{
/// Data for external tables
2021-02-10 22:23:27 +00:00
2021-08-01 14:12:34 +00:00
auto resolved = query_context->tryResolveStorageID(temporary_id, Context::ResolveExternal);
StoragePtr storage;
/// If such a table does not exist, create it.
if (resolved)
2021-02-10 22:23:27 +00:00
{
2021-08-01 14:12:34 +00:00
storage = DatabaseCatalog::instance().getTable(resolved, query_context);
2021-02-10 22:23:27 +00:00
}
else
{
2021-08-01 14:12:34 +00:00
NamesAndTypesList columns = block.getNamesAndTypesList();
auto temporary_table = TemporaryTableHolder(query_context, ColumnsDescription{columns}, {});
storage = temporary_table.getTable();
query_context->addExternalTable(temporary_id.table_name, std::move(temporary_table));
}
2021-08-01 14:12:34 +00:00
auto metadata_snapshot = storage->getInMemoryMetadataPtr();
/// The data will be written directly to the table.
QueryPipeline temporary_table_out(storage->write(ASTPtr(), metadata_snapshot, query_context, /*async_insert=*/false));
PushingPipelineExecutor executor(temporary_table_out);
executor.start();
executor.push(block);
executor.finish();
2021-08-01 14:12:34 +00:00
}
else if (state.need_receive_data_for_input)
{
/// 'input' table function.
state.block_for_input = block;
}
else
2021-08-01 14:12:34 +00:00
{
/// INSERT query.
state.block_for_insert = block;
2021-08-01 14:12:34 +00:00
}
return true;
}
2023-03-15 19:53:58 +00:00
2021-08-01 14:12:34 +00:00
bool TCPHandler::receiveUnexpectedData(bool throw_exception)
{
String skip_external_table_name;
readStringBinary(skip_external_table_name, *in);
std::shared_ptr<ReadBuffer> maybe_compressed_in;
if (last_block_in.compression == Protocol::Compression::Enable)
maybe_compressed_in = std::make_shared<CompressedReadBuffer>(*in, /* allow_different_codecs */ true);
else
maybe_compressed_in = in;
2021-10-08 17:21:19 +00:00
auto skip_block_in = std::make_shared<NativeReader>(*maybe_compressed_in, client_tcp_protocol_version);
2022-05-20 15:29:54 +00:00
bool read_ok = !!skip_block_in->read();
2021-08-01 14:12:34 +00:00
if (!read_ok)
state.read_all_data = true;
2021-08-01 14:12:34 +00:00
if (throw_exception)
throw NetException(ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT, "Unexpected packet Data received from client");
2021-08-01 14:12:34 +00:00
return read_ok;
}
void TCPHandler::initBlockInput()
2012-03-11 08:52:56 +00:00
{
2012-03-19 12:57:56 +00:00
if (!state.block_in)
{
/// 'allow_different_codecs' is set to true, because some parts of compressed data can be precompressed in advance
/// with another codec that the rest of the data. Example: data sent by Distributed tables.
2012-05-21 06:49:05 +00:00
if (state.compression == Protocol::Compression::Enable)
state.maybe_compressed_in = std::make_shared<CompressedReadBuffer>(*in, /* allow_different_codecs */ true);
2012-05-21 06:49:05 +00:00
else
state.maybe_compressed_in = in;
Block header;
2021-09-16 17:40:42 +00:00
if (state.io.pipeline.pushing())
header = state.io.pipeline.getHeader();
2019-05-28 18:30:10 +00:00
else if (state.need_receive_data_for_input)
header = state.input_header;
2021-10-08 17:21:19 +00:00
state.block_in = std::make_unique<NativeReader>(
2012-03-19 12:57:56 +00:00
*state.maybe_compressed_in,
header,
2020-09-17 12:15:05 +00:00
client_tcp_protocol_version);
2012-03-19 12:57:56 +00:00
}
}
2023-03-16 14:23:17 +00:00
void TCPHandler::initBlockOutput(const Block & block)
{
if (!state.block_out)
2012-03-19 12:57:56 +00:00
{
2021-08-01 14:12:34 +00:00
const Settings & query_settings = query_context->getSettingsRef();
if (!state.maybe_compressed_out)
{
2020-05-04 00:11:49 +00:00
std::string method = Poco::toUpper(query_settings.network_compression_method.toString());
2018-12-21 12:17:30 +00:00
std::optional<int> level;
if (method == "ZSTD")
2020-05-04 00:11:49 +00:00
level = query_settings.network_zstd_compression_level;
2018-12-21 12:17:30 +00:00
if (state.compression == Protocol::Compression::Enable)
{
CompressionCodecFactory::instance().validateCodec(method, level, !query_settings.allow_suspicious_codecs, query_settings.allow_experimental_codecs, query_settings.enable_deflate_qpl_codec, query_settings.enable_zstd_qat_codec);
state.maybe_compressed_out = std::make_shared<CompressedWriteBuffer>(
2021-05-24 03:43:25 +00:00
*out, CompressionCodecFactory::instance().get(method, level));
}
else
state.maybe_compressed_out = out;
}
2021-10-08 17:21:19 +00:00
state.block_out = std::make_unique<NativeWriter>(
*state.maybe_compressed_out,
2020-09-17 12:15:05 +00:00
client_tcp_protocol_version,
block.cloneEmpty(),
2021-08-01 14:12:34 +00:00
!query_settings.low_cardinality_allow_in_native_format);
2012-03-19 12:57:56 +00:00
}
}
2012-03-11 08:52:56 +00:00
void TCPHandler::initLogsBlockOutput(const Block & block)
{
if (!state.logs_block_out)
{
/// Use uncompressed stream since log blocks usually contain only one row
2021-08-01 14:12:34 +00:00
const Settings & query_settings = query_context->getSettingsRef();
2021-10-08 17:21:19 +00:00
state.logs_block_out = std::make_unique<NativeWriter>(
*out,
2020-09-17 12:15:05 +00:00
client_tcp_protocol_version,
block.cloneEmpty(),
2021-08-01 14:12:34 +00:00
!query_settings.low_cardinality_allow_in_native_format);
}
}
2023-03-15 19:53:58 +00:00
2021-08-30 11:04:59 +00:00
void TCPHandler::initProfileEventsBlockOutput(const Block & block)
{
if (!state.profile_events_block_out)
{
const Settings & query_settings = query_context->getSettingsRef();
state.profile_events_block_out = std::make_unique<NativeWriter>(
*out,
2020-09-17 12:15:05 +00:00
client_tcp_protocol_version,
block.cloneEmpty(),
2021-08-01 14:12:34 +00:00
!query_settings.low_cardinality_allow_in_native_format);
}
}
2023-03-17 20:05:10 +00:00
void TCPHandler::decreaseCancellationStatus(const std::string & log_message)
{
2023-03-17 20:05:10 +00:00
auto prev_status = magic_enum::enum_name(state.cancellation_status);
bool partial_result_on_first_cancel = false;
if (query_context)
{
const auto & settings = query_context->getSettingsRef();
partial_result_on_first_cancel = settings.partial_result_on_first_cancel;
}
2023-03-15 13:05:38 +00:00
if (partial_result_on_first_cancel && state.cancellation_status == CancellationStatus::NOT_CANCELLED)
{
state.cancellation_status = CancellationStatus::READ_CANCELLED;
}
else
{
state.cancellation_status = CancellationStatus::FULLY_CANCELLED;
}
2023-03-17 20:05:10 +00:00
auto current_status = magic_enum::enum_name(state.cancellation_status);
LOG_INFO(log, "Change cancellation status from {} to {}. Log message: {}", prev_status, current_status, log_message);
}
QueryState::CancellationStatus TCPHandler::getQueryCancellationStatus()
2012-05-09 08:16:09 +00:00
{
if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED || state.sent_all_data)
return CancellationStatus::FULLY_CANCELLED;
2021-08-01 14:12:34 +00:00
if (after_check_cancelled.elapsed() / 1000 < interactive_delay)
return state.cancellation_status;
2012-05-09 15:15:45 +00:00
after_check_cancelled.restart();
2017-03-09 00:56:38 +00:00
/// During request execution the only packet that can come from the client is stopping the query.
if (static_cast<ReadBufferFromPocoSocket &>(*in).poll(0))
2012-05-09 08:16:09 +00:00
{
if (in->eof())
{
LOG_INFO(log, "Client has dropped the connection, cancel the query.");
state.cancellation_status = CancellationStatus::FULLY_CANCELLED;
state.is_connection_closed = true;
return CancellationStatus::FULLY_CANCELLED;
}
2012-05-09 08:16:09 +00:00
UInt64 packet_type = 0;
2012-05-21 06:49:05 +00:00
readVarUInt(packet_type, *in);
2012-05-09 08:16:09 +00:00
switch (packet_type)
{
case Protocol::Client::Cancel:
if (state.empty())
throw NetException(ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT, "Unexpected packet Cancel received from client");
2021-07-14 13:17:30 +00:00
decreaseCancellationStatus("Query was cancelled.");
return state.cancellation_status;
2012-05-09 08:16:09 +00:00
default:
throw NetException(ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT, "Unknown packet from client {}", toString(packet_type));
2012-05-09 08:16:09 +00:00
}
}
return state.cancellation_status;
2012-05-09 08:16:09 +00:00
}
2023-03-15 19:53:58 +00:00
void TCPHandler::sendData(const Block & block)
2012-03-19 12:57:56 +00:00
{
initBlockOutput(block);
2012-03-11 08:52:56 +00:00
size_t prev_bytes_written_out = out->count();
size_t prev_bytes_written_compressed_out = state.maybe_compressed_out->count();
2012-05-21 06:49:05 +00:00
try
2021-02-17 17:34:52 +00:00
{
2021-04-19 14:12:08 +00:00
/// For testing hedged requests
2021-04-19 17:55:18 +00:00
if (unknown_packet_in_send_data)
2021-04-19 14:12:08 +00:00
{
constexpr UInt64 marker = (1ULL<<63) - 1;
2021-04-19 17:55:18 +00:00
--unknown_packet_in_send_data;
if (unknown_packet_in_send_data == 0)
writeVarUInt(marker, *out);
2021-04-19 14:12:08 +00:00
}
writeVarUInt(Protocol::Server::Data, *out);
/// Send external table name (empty name is the main table)
writeStringBinary("", *out);
/// For testing hedged requests
2021-08-01 14:12:34 +00:00
if (block.rows() > 0 && query_context->getSettingsRef().sleep_in_send_data_ms.totalMilliseconds())
{
out->next();
2021-08-01 14:12:34 +00:00
std::chrono::milliseconds ms(query_context->getSettingsRef().sleep_in_send_data_ms.totalMilliseconds());
std::this_thread::sleep_for(ms);
}
state.block_out->write(block);
state.maybe_compressed_out->next();
2021-02-17 17:34:52 +00:00
out->next();
}
catch (...)
{
/// In case of unsuccessful write, if the buffer with written data was not flushed,
/// we will rollback write to avoid breaking the protocol.
/// (otherwise the client will not be able to receive exception after unfinished data
/// as it will expect the continuation of the data).
/// It looks like hangs on client side or a message like "Data compressed with different methods".
2021-02-17 17:34:52 +00:00
if (state.compression == Protocol::Compression::Enable)
{
auto extra_bytes_written_compressed = state.maybe_compressed_out->count() - prev_bytes_written_compressed_out;
if (state.maybe_compressed_out->offset() >= extra_bytes_written_compressed)
state.maybe_compressed_out->position() -= extra_bytes_written_compressed;
}
auto extra_bytes_written_out = out->count() - prev_bytes_written_out;
if (out->offset() >= extra_bytes_written_out)
out->position() -= extra_bytes_written_out;
throw;
}
2012-03-19 12:57:56 +00:00
}
2023-03-15 19:53:58 +00:00
void TCPHandler::sendLogData(const Block & block)
{
initLogsBlockOutput(block);
writeVarUInt(Protocol::Server::Log, *out);
/// Send log tag (empty tag is the default tag)
writeStringBinary("", *out);
state.logs_block_out->write(block);
out->next();
}
void TCPHandler::sendTableColumns(const ColumnsDescription & columns)
{
writeVarUInt(Protocol::Server::TableColumns, *out);
/// Send external table name (empty name is the main table)
writeStringBinary("", *out);
writeStringBinary(columns.toString(), *out);
out->next();
}
2018-08-24 07:30:53 +00:00
void TCPHandler::sendException(const Exception & e, bool with_stack_trace)
2012-03-19 12:57:56 +00:00
{
state.io.setAllDataSent();
2012-05-21 06:49:05 +00:00
writeVarUInt(Protocol::Server::Exception, *out);
2018-08-24 07:30:53 +00:00
writeException(e, *out, with_stack_trace);
2012-05-21 06:49:05 +00:00
out->next();
2012-03-19 12:57:56 +00:00
}
2012-05-21 06:49:05 +00:00
void TCPHandler::sendEndOfStream()
2012-05-08 11:19:00 +00:00
{
2012-07-15 21:43:04 +00:00
state.sent_all_data = true;
state.io.setAllDataSent();
2012-05-21 06:49:05 +00:00
writeVarUInt(Protocol::Server::EndOfStream, *out);
out->next();
2012-05-08 11:19:00 +00:00
}
2012-05-21 06:49:05 +00:00
void TCPHandler::updateProgress(const Progress & value)
2012-03-19 12:57:56 +00:00
{
state.progress.incrementPiecewiseAtomically(value);
}
2012-05-09 15:15:45 +00:00
void TCPHandler::sendProgress()
{
2012-05-21 06:49:05 +00:00
writeVarUInt(Protocol::Server::Progress, *out);
2022-05-06 15:04:03 +00:00
auto increment = state.progress.fetchValuesAndResetPiecewiseAtomically();
2022-08-08 04:55:41 +00:00
UInt64 current_elapsed_ns = state.watch.elapsedNanoseconds();
increment.elapsed_ns = current_elapsed_ns - state.prev_elapsed_ns;
state.prev_elapsed_ns = current_elapsed_ns;
2020-09-17 12:15:05 +00:00
increment.write(*out, client_tcp_protocol_version);
2012-05-21 06:49:05 +00:00
out->next();
2012-03-11 08:52:56 +00:00
}
2012-03-09 15:46:52 +00:00
void TCPHandler::sendLogs()
{
if (!state.logs_queue)
return;
MutableColumns logs_columns;
MutableColumns curr_logs_columns;
size_t rows = 0;
for (; state.logs_queue->tryPop(curr_logs_columns); ++rows)
{
if (rows == 0)
{
logs_columns = std::move(curr_logs_columns);
}
else
{
for (size_t j = 0; j < logs_columns.size(); ++j)
logs_columns[j]->insertRangeFrom(*curr_logs_columns[j], 0, curr_logs_columns[j]->size());
}
}
if (rows > 0)
{
Block block = InternalTextLogsQueue::getSampleBlock();
block.setColumns(std::move(logs_columns));
sendLogData(block);
}
}
2012-03-09 15:46:52 +00:00
void TCPHandler::run()
{
try
{
runImpl();
2020-10-10 17:47:34 +00:00
LOG_DEBUG(log, "Done processing connection.");
2012-03-09 15:46:52 +00:00
}
catch (Poco::Exception & e)
{
2017-03-09 00:56:38 +00:00
/// Timeout - not an error.
2021-10-31 15:11:46 +00:00
if (e.what() == "Timeout"sv)
2012-08-02 18:02:57 +00:00
{
2020-05-23 22:24:01 +00:00
LOG_DEBUG(log, "Poco::Exception. Code: {}, e.code() = {}, e.displayText() = {}, e.what() = {}", ErrorCodes::POCO_EXCEPTION, e.code(), e.displayText(), e.what());
2012-08-02 18:02:57 +00:00
}
else
throw;
2012-03-09 15:46:52 +00:00
}
}
Poco::Net::SocketAddress TCPHandler::getClientAddress(const ClientInfo & client_info)
{
/// Extract the last entry from comma separated list of forwarded_for addresses.
/// Only the last proxy can be trusted (if any).
String forwarded_address = client_info.getLastForwardedFor();
if (!forwarded_address.empty() && server.config().getBool("auth_use_forwarded_address", false))
return Poco::Net::SocketAddress(forwarded_address, socket().peerAddress().port());
else
return socket().peerAddress();
}
2012-03-09 15:46:52 +00:00
}