ClickHouse/src/Server/TCPHandler.h

262 lines
8.2 KiB
C++
Raw Normal View History

2012-03-09 15:46:52 +00:00
#pragma once
2017-08-09 14:33:07 +00:00
#include <Poco/Net/TCPServerConnection.h>
2021-10-02 07:13:14 +00:00
#include <base/getFQDNOrHostName.h>
2022-02-15 08:25:07 +00:00
#include <Common/ProfileEvents.h>
2017-08-09 14:33:07 +00:00
#include <Common/CurrentMetrics.h>
#include <Common/Stopwatch.h>
2022-02-15 08:25:07 +00:00
#include <Common/ThreadStatus.h>
#include <Core/Protocol.h>
#include <Core/QueryProcessingStage.h>
#include <IO/Progress.h>
2021-03-03 17:47:27 +00:00
#include <IO/TimeoutSetter.h>
2021-10-15 20:18:20 +00:00
#include <QueryPipeline/BlockIO.h>
#include <Interpreters/InternalTextLogsQueue.h>
#include <Interpreters/Context_fwd.h>
2022-02-15 08:25:07 +00:00
#include <Interpreters/ProfileEventsExt.h>
2012-03-09 15:46:52 +00:00
#include <Storages/MergeTree/ParallelReplicasReadingCoordinator.h>
2017-08-09 14:33:07 +00:00
#include "IServer.h"
#include "base/types.h"
2012-03-09 15:46:52 +00:00
namespace CurrentMetrics
{
extern const Metric TCPConnection;
}
2017-09-01 19:04:46 +00:00
namespace Poco { class Logger; }
2012-03-09 15:46:52 +00:00
namespace DB
{
class Session;
2021-08-01 14:12:34 +00:00
struct Settings;
2019-03-15 18:52:45 +00:00
class ColumnsDescription;
2021-10-15 20:18:20 +00:00
struct ProfileInfo;
class TCPServer;
2022-02-15 08:25:07 +00:00
class NativeWriter;
class NativeReader;
2012-03-11 08:52:56 +00:00
2016-07-07 01:57:48 +00:00
/// State of query processing.
2012-03-11 08:52:56 +00:00
struct QueryState
{
/// Identifier of the query.
String query_id;
QueryProcessingStage::Enum stage = QueryProcessingStage::Complete;
2017-10-03 14:52:08 +00:00
Protocol::Compression compression = Protocol::Compression::Disable;
/// A queue with internal logs that will be passed to client. It must be
/// destroyed after input/output blocks, because they may contain other
/// threads that use this queue.
InternalTextLogsQueuePtr logs_queue;
2021-10-08 17:21:19 +00:00
std::unique_ptr<NativeWriter> logs_block_out;
2021-09-02 14:27:19 +00:00
InternalProfileEventsQueuePtr profile_queue;
2021-08-30 11:04:59 +00:00
std::unique_ptr<NativeWriter> profile_events_block_out;
/// From where to read data for INSERT.
std::shared_ptr<ReadBuffer> maybe_compressed_in;
2021-10-08 17:21:19 +00:00
std::unique_ptr<NativeReader> block_in;
/// Where to write result data.
std::shared_ptr<WriteBuffer> maybe_compressed_out;
2021-10-08 17:21:19 +00:00
std::unique_ptr<NativeWriter> block_out;
Block block_for_insert;
/// Query text.
String query;
/// Streams of blocks, that are processing the query.
BlockIO io;
/// Is request cancelled
bool is_cancelled = false;
bool is_connection_closed = false;
/// empty or not
bool is_empty = true;
/// Data was sent.
bool sent_all_data = false;
/// Request requires data from the client (INSERT, but not INSERT SELECT).
bool need_receive_data_for_insert = false;
2021-08-01 14:12:34 +00:00
/// Data was read.
bool read_all_data = false;
/// A state got uuids to exclude from a query
2021-08-01 14:12:34 +00:00
std::optional<std::vector<UUID>> part_uuids_to_ignore;
2019-05-28 18:30:10 +00:00
/// Request requires data from client for function input()
bool need_receive_data_for_input = false;
/// temporary place for incoming data block for input()
Block block_for_input;
/// sample block from StorageInput
Block input_header;
2021-08-01 14:12:34 +00:00
/// If true, the data packets will be skipped instead of reading. Used to recover after errors.
bool skipping_data = false;
/// To output progress, the difference after the previous sending of progress.
Progress progress;
/// Timeouts setter for current query
std::unique_ptr<TimeoutSetter> timeout_setter;
void reset()
{
*this = QueryState();
}
2021-03-16 18:41:29 +00:00
bool empty() const
{
return is_empty;
}
2012-03-11 08:52:56 +00:00
};
2019-09-03 13:55:26 +00:00
struct LastBlockInputParameters
{
Protocol::Compression compression = Protocol::Compression::Disable;
};
2012-03-09 15:46:52 +00:00
class TCPHandler : public Poco::Net::TCPServerConnection
{
public:
2020-12-02 21:05:51 +00:00
/** parse_proxy_protocol_ - if true, expect and parse the header of PROXY protocol in every connection
* and set the information about forwarded address accordingly.
* See https://github.com/wolfeidau/proxyv2/blob/master/docs/proxy-protocol.txt
2020-12-02 21:09:46 +00:00
*
* Note: immediate IP address is always used for access control (accept-list of IP networks),
* because it allows to check the IP ranges of the trusted proxy.
* Proxy-forwarded (original client) IP address is used for quota accounting if quota is keyed by forwarded IP.
2020-12-02 21:05:51 +00:00
*/
TCPHandler(IServer & server_, TCPServer & tcp_server_, const Poco::Net::StreamSocket & socket_, bool parse_proxy_protocol_, std::string server_display_name_);
TCPHandler: catch exceptions from the WriteBuffer in destructor For TCPHandler it is safe thing todo. Otherwise *San will report [1]: 2021.01.24 15:33:40.103996 [ 270 ] {} <Trace> BaseDaemon: Received signal -1 2021.01.24 15:33:40.110693 [ 270 ] {} <Fatal> BaseDaemon: (version 21.2.1.5789, build id: FF421B087D1E2EAA19FA17B5AB3AE413832744E0) (from thread 48318) Terminate called for uncaught exception: 2021.01.24 15:33:40.114845 [ 270 ] {} <Trace> BaseDaemon: Received signal 6 2021.01.24 15:33:40.138738 [ 218027 ] {} <Fatal> BaseDaemon: ######################################## 2021.01.24 15:33:40.138838 [ 218027 ] {} <Fatal> BaseDaemon: (version 21.2.1.5789, build id: FF421B087D1E2EAA19FA17B5AB3AE413832744E0) (from thread 48318) (no query) Received signal Aborted (6) 2021.01.24 15:33:40.138912 [ 218027 ] {} <Fatal> BaseDaemon: 2021.01.24 15:33:40.139277 [ 218027 ] {} <Fatal> BaseDaemon: Stack trace: 0x7f185474118b 0x7f1854720859 0xaddc0cc 0x2af9fab8 0x2af9fa04 0xa91758b 0x1e418bb5 0x20725b4f 0x20725d9e 0x266b47a3 0x269772f5 0x26971847 0x7f18548f6609 0x7f185481d293 2021.01.24 15:33:40.139637 [ 218027 ] {} <Fatal> BaseDaemon: 3. raise @ 0x4618b in /usr/lib/x86_64-linux-gnu/libc-2.31.so 2021.01.24 15:33:40.140113 [ 218027 ] {} <Fatal> BaseDaemon: 4. abort @ 0x25859 in /usr/lib/x86_64-linux-gnu/libc-2.31.so 2021.01.24 15:33:40.144121 [ 218027 ] {} <Fatal> BaseDaemon: 5. ./obj-x86_64-linux-gnu/../base/daemon/BaseDaemon.cpp:0: terminate_handler() @ 0xaddc0cc in /usr/bin/clickhouse 2021.01.24 15:33:40.151208 [ 218027 ] {} <Fatal> BaseDaemon: 6. ./obj-x86_64-linux-gnu/../contrib/libcxxabi/src/cxa_handlers.cpp:61: std::__terminate(void (*)()) @ 0x2af9fab8 in /usr/bin/clickhouse 2021.01.24 15:33:40.153085 [ 218027 ] {} <Fatal> BaseDaemon: 7. ./obj-x86_64-linux-gnu/../contrib/libcxxabi/src/cxa_handlers.cpp:0: std::terminate() @ 0x2af9fa04 in /usr/bin/clickhouse 2021.01.24 15:33:40.155209 [ 218027 ] {} <Fatal> BaseDaemon: 8. ? @ 0xa91758b in /usr/bin/clickhouse 2021.01.24 15:33:40.156621 [ 218027 ] {} <Fatal> BaseDaemon: 9. ./obj-x86_64-linux-gnu/../src/IO/WriteBufferFromPocoSocket.cpp:0: DB::WriteBufferFromPocoSocket::~WriteBufferFromPocoSocket() @ 0x1e418bb5 in /usr/bin/clickhouse 2021.01.24 15:33:40.161041 [ 218027 ] {} <Fatal> BaseDaemon: 10. ./obj-x86_64-linux-gnu/../contrib/libcxx/include/memory:2518: DB::TCPHandler::~TCPHandler() @ 0x20725b4f in /usr/bin/clickhouse 2021.01.24 15:33:40.164557 [ 218027 ] {} <Fatal> BaseDaemon: 11. ./obj-x86_64-linux-gnu/../src/Server/TCPHandler.h:101: DB::TCPHandler::~TCPHandler() @ 0x20725d9e in /usr/bin/clickhouse 2021.01.24 15:33:40.165921 [ 218027 ] {} <Fatal> BaseDaemon: 12. ./obj-x86_64-linux-gnu/../contrib/poco/Foundation/include/Poco/AtomicCounter.h:314: Poco::Net::TCPServerDispatcher::run() @ 0x266b47a3 in /usr/bin/clickhouse 2021.01.24 15:33:40.167347 [ 218027 ] {} <Fatal> BaseDaemon: 13. ./obj-x86_64-linux-gnu/../contrib/poco/Foundation/src/ThreadPool.cpp:0: Poco::PooledThread::run() @ 0x269772f5 in /usr/bin/clickhouse 2021.01.24 15:33:40.169401 [ 218027 ] {} <Fatal> BaseDaemon: 14. ./obj-x86_64-linux-gnu/../contrib/poco/Foundation/src/Thread_POSIX.cpp:0: Poco::ThreadImpl::runnableEntry(void*) @ 0x26971847 in /usr/bin/clickhouse 2021.01.24 15:33:40.169498 [ 218027 ] {} <Fatal> BaseDaemon: 15. start_thread @ 0x9609 in /usr/lib/x86_64-linux-gnu/libpthread-2.31.so 2021.01.24 15:33:40.169566 [ 218027 ] {} <Fatal> BaseDaemon: 16. __clone @ 0x122293 in /usr/lib/x86_64-linux-gnu/libc-2.31.so 2021.01.24 15:33:41.027601 [ 218027 ] {} <Fatal> BaseDaemon: Calculated checksum of the binary: 63D7491B39260494BA0D785E1860B427. There is no information about the reference checksum. [1]: https://clickhouse-test-reports.s3.yandex.net/19451/1e16bd6f337985a82fbdf4eded695dc6e663af58/stress_test_(address).html#fail1 v2: Fix catching errors in WriteBufferFromPocoSocket destructor
2021-01-28 04:07:51 +00:00
~TCPHandler() override;
2012-03-09 15:46:52 +00:00
void run() override;
2012-03-09 15:46:52 +00:00
/// This method is called right before the query execution.
virtual void customizeContext(ContextMutablePtr /*context*/) {}
2012-03-09 15:46:52 +00:00
private:
2017-08-09 11:57:09 +00:00
IServer & server;
TCPServer & tcp_server;
2020-12-02 21:05:51 +00:00
bool parse_proxy_protocol = false;
2017-09-01 19:04:46 +00:00
Poco::Logger * log;
2012-03-09 15:46:52 +00:00
String client_name;
UInt64 client_version_major = 0;
UInt64 client_version_minor = 0;
UInt64 client_version_patch = 0;
2020-09-17 12:15:05 +00:00
UInt64 client_tcp_protocol_version = 0;
2021-08-01 14:12:34 +00:00
/// Connection settings, which are extracted from a context.
bool send_exception_with_stack_trace = true;
Poco::Timespan send_timeout = DBMS_DEFAULT_SEND_TIMEOUT_SEC;
Poco::Timespan receive_timeout = DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC;
UInt64 poll_interval = DBMS_DEFAULT_POLL_INTERVAL;
UInt64 idle_connection_timeout = 3600;
UInt64 interactive_delay = 100000;
Poco::Timespan sleep_in_send_tables_status;
UInt64 unknown_packet_in_send_data = 0;
Poco::Timespan sleep_in_receive_cancel;
std::unique_ptr<Session> session;
2021-05-31 14:49:02 +00:00
ContextMutablePtr query_context;
2012-05-17 19:15:53 +00:00
/// Streams for reading/writing from/to client connection socket.
std::shared_ptr<ReadBuffer> in;
std::shared_ptr<WriteBuffer> out;
2012-05-21 06:49:05 +00:00
/// Time after the last check to stop the request and send the progress.
Stopwatch after_check_cancelled;
Stopwatch after_send_progress;
2012-05-30 06:46:57 +00:00
String default_database;
2012-12-06 17:32:48 +00:00
/// For inter-server secret (remote_server.*.secret)
2021-08-01 14:12:34 +00:00
bool is_interserver_mode = false;
String salt;
String cluster;
String cluster_secret;
2021-04-08 19:00:39 +00:00
std::mutex task_callback_mutex;
2021-12-12 02:35:33 +00:00
std::mutex fatal_error_mutex;
/// At the moment, only one ongoing query in the connection is supported at a time.
QueryState state;
2012-12-06 17:32:48 +00:00
/// Last block input parameters are saved to be able to receive unexpected data packet sent after exception.
LastBlockInputParameters last_block_in;
CurrentMetrics::Increment metric_increment{CurrentMetrics::TCPConnection};
2018-05-07 02:01:11 +00:00
2022-02-15 08:25:07 +00:00
ProfileEvents::ThreadIdToCountersSnapshot last_sent_snapshots;
2018-05-07 02:01:11 +00:00
/// It is the name of the server that will be sent to the client.
String server_display_name;
2012-05-09 08:16:09 +00:00
void runImpl();
2012-03-11 08:52:56 +00:00
2021-08-01 14:12:34 +00:00
void extractConnectionSettingsFromContext(const ContextPtr & context);
2020-12-02 21:05:51 +00:00
bool receiveProxyHeader();
void receiveHello();
bool receivePacket();
void receiveQuery();
void receiveIgnoredPartUUIDs();
2021-04-13 10:59:02 +00:00
String receiveReadTaskResponseAssumeLocked();
std::optional<PartitionReadResponse> receivePartitionMergeTreeReadTaskResponseAssumeLocked();
2019-10-19 20:36:35 +00:00
bool receiveData(bool scalar);
2021-08-01 14:12:34 +00:00
bool readDataNext();
void readData();
void skipData();
2021-04-10 02:35:07 +00:00
void receiveClusterNameAndSalt();
2012-05-17 19:15:53 +00:00
2021-08-01 14:12:34 +00:00
bool receiveUnexpectedData(bool throw_exception = true);
[[noreturn]] void receiveUnexpectedQuery();
2021-08-01 14:12:34 +00:00
[[noreturn]] void receiveUnexpectedIgnoredPartUUIDs();
[[noreturn]] void receiveUnexpectedHello();
2019-09-03 13:55:26 +00:00
[[noreturn]] void receiveUnexpectedTablesStatusRequest();
/// Process INSERT query
2021-08-01 14:12:34 +00:00
void processInsertQuery();
2012-05-17 19:15:53 +00:00
/// Process a request that does not require the receiving of data blocks from the client
void processOrdinaryQuery();
2012-03-11 08:52:56 +00:00
void processOrdinaryQueryWithProcessors();
2019-03-26 18:28:37 +00:00
void processTablesStatusRequest();
void sendHello();
void sendData(const Block & block); /// Write a block to the network.
void sendLogData(const Block & block);
void sendTableColumns(const ColumnsDescription & columns);
2018-08-24 07:30:53 +00:00
void sendException(const Exception & e, bool with_stack_trace);
void sendProgress();
void sendLogs();
void sendEndOfStream();
void sendPartUUIDs();
2021-04-10 02:21:18 +00:00
void sendReadTaskRequestAssumeLocked();
void sendMergeTreeReadTaskRequestAssumeLocked(PartitionReadRequest request);
2021-10-15 20:18:20 +00:00
void sendProfileInfo(const ProfileInfo & info);
2019-03-26 18:28:37 +00:00
void sendTotals(const Block & totals);
void sendExtremes(const Block & extremes);
2021-08-30 11:04:59 +00:00
void sendProfileEvents();
/// Creates state.block_in/block_out for blocks read/write, depending on whether compression is enabled.
void initBlockInput();
void initBlockOutput(const Block & block);
void initLogsBlockOutput(const Block & block);
2021-08-30 11:04:59 +00:00
void initProfileEventsBlockOutput(const Block & block);
2012-05-09 08:16:09 +00:00
bool isQueryCancelled();
2012-07-21 07:02:55 +00:00
/// This function is called from different threads.
void updateProgress(const Progress & value);
2012-03-09 15:46:52 +00:00
};
}