ClickHouse/dbms/programs/server/TCPHandler.h

202 lines
5.8 KiB
C++
Raw Normal View History

2012-03-09 15:46:52 +00:00
#pragma once
2017-08-09 14:33:07 +00:00
#include <Poco/Net/TCPServerConnection.h>
#include <Common/getFQDNOrHostName.h>
2017-08-09 14:33:07 +00:00
#include <Common/CurrentMetrics.h>
#include <Common/Stopwatch.h>
#include <Core/Protocol.h>
#include <Core/QueryProcessingStage.h>
#include <IO/Progress.h>
#include <DataStreams/BlockIO.h>
#include <Interpreters/InternalTextLogsQueue.h>
#include <Client/TimeoutSetter.h>
2012-03-09 15:46:52 +00:00
2017-08-09 14:33:07 +00:00
#include "IServer.h"
2012-03-09 15:46:52 +00:00
/// Metrics referenced by this header. They are only declared here (`extern`);
/// the definition is provided by another translation unit.
namespace CurrentMetrics
{
/// Metric handed to the CurrentMetrics::Increment member of TCPHandler (metric_increment).
extern const Metric TCPConnection;
}
2017-09-01 19:04:46 +00:00
/// Forward declaration: TCPHandler stores a `Poco::Logger *`.
/// NOTE(review): the inline TCPHandler constructor also calls Poco::Logger::get(), which needs the
/// complete type - presumably it arrives through one of the includes above; confirm.
namespace Poco { class Logger; }
2012-03-09 15:46:52 +00:00
namespace DB
{
2019-03-15 18:52:45 +00:00
/// Forward declaration: within this header the type is only taken by const reference
/// (see TCPHandler::sendTableColumns), so the full definition is not needed here.
class ColumnsDescription;
2012-03-11 08:52:56 +00:00
2016-07-07 01:57:48 +00:00
/// State of query processing.
2012-03-11 08:52:56 +00:00
struct QueryState
{
/// Identifier of the query.
String query_id;
QueryProcessingStage::Enum stage = QueryProcessingStage::Complete;
2017-10-03 14:52:08 +00:00
Protocol::Compression compression = Protocol::Compression::Disable;
/// A queue with internal logs that will be passed to client. It must be
/// destroyed after input/output blocks, because they may contain other
/// threads that use this queue.
InternalTextLogsQueuePtr logs_queue;
BlockOutputStreamPtr logs_block_out;
/// From where to read data for INSERT.
std::shared_ptr<ReadBuffer> maybe_compressed_in;
BlockInputStreamPtr block_in;
/// Where to write result data.
std::shared_ptr<WriteBuffer> maybe_compressed_out;
BlockOutputStreamPtr block_out;
/// Query text.
String query;
/// Streams of blocks, that are processing the query.
BlockIO io;
/// Is request cancelled
bool is_cancelled = false;
/// empty or not
bool is_empty = true;
/// Data was sent.
bool sent_all_data = false;
/// Request requires data from the client (INSERT, but not INSERT SELECT).
bool need_receive_data_for_insert = false;
/// Temporary tables read
bool temporary_tables_read = false;
2019-05-28 18:30:10 +00:00
/// Request requires data from client for function input()
bool need_receive_data_for_input = false;
/// temporary place for incoming data block for input()
Block block_for_input;
/// sample block from StorageInput
Block input_header;
/// To output progress, the difference after the previous sending of progress.
Progress progress;
/// Timeouts setter for current query
std::unique_ptr<TimeoutSetter> timeout_setter;
void reset()
{
*this = QueryState();
}
bool empty()
{
return is_empty;
}
2012-03-11 08:52:56 +00:00
};
2019-09-03 13:55:26 +00:00
/// Parameters of the most recent block input: the compression mode and the block header.
/// Plain aggregate with no behaviour of its own.
struct LastBlockInputParameters
{
    /// Compression mode; disabled by default.
    Protocol::Compression compression{Protocol::Compression::Disable};

    /// Header (sample block); default-constructed until assigned.
    Block header;
};
2012-03-11 08:52:56 +00:00
2012-03-09 15:46:52 +00:00
class TCPHandler : public Poco::Net::TCPServerConnection
{
public:
2017-08-09 11:57:09 +00:00
TCPHandler(IServer & server_, const Poco::Net::StreamSocket & socket_)
: Poco::Net::TCPServerConnection(socket_)
, server(server_)
2017-09-01 19:48:43 +00:00
, log(&Poco::Logger::get("TCPHandler"))
2017-08-09 11:57:09 +00:00
, connection_context(server.context())
, query_context(server.context())
{
2018-11-26 00:56:50 +00:00
server_display_name = server.config().getString("display_name", getFQDNOrHostName());
}
2012-03-09 15:46:52 +00:00
void run() override;
2012-03-09 15:46:52 +00:00
/// This method is called right before the query execution.
virtual void customizeContext(DB::Context & /*context*/) {}
2012-03-09 15:46:52 +00:00
private:
2017-08-09 11:57:09 +00:00
IServer & server;
2017-09-01 19:04:46 +00:00
Poco::Logger * log;
2012-03-09 15:46:52 +00:00
String client_name;
UInt64 client_version_major = 0;
UInt64 client_version_minor = 0;
UInt64 client_version_patch = 0;
UInt64 client_revision = 0;
Context connection_context;
std::optional<Context> query_context;
2012-05-17 19:15:53 +00:00
/// Streams for reading/writing from/to client connection socket.
std::shared_ptr<ReadBuffer> in;
std::shared_ptr<WriteBuffer> out;
2012-05-21 06:49:05 +00:00
/// Time after the last check to stop the request and send the progress.
Stopwatch after_check_cancelled;
Stopwatch after_send_progress;
2012-05-30 06:46:57 +00:00
String default_database;
2012-12-06 17:32:48 +00:00
/// At the moment, only one ongoing query in the connection is supported at a time.
QueryState state;
2012-12-06 17:32:48 +00:00
/// Last block input parameters are saved to be able to receive unexpected data packet sent after exception.
LastBlockInputParameters last_block_in;
CurrentMetrics::Increment metric_increment{CurrentMetrics::TCPConnection};
2018-05-07 02:01:11 +00:00
/// It is the name of the server that will be sent to the client.
String server_display_name;
2012-05-09 08:16:09 +00:00
void runImpl();
2012-03-11 08:52:56 +00:00
void receiveHello();
bool receivePacket();
void receiveQuery();
2019-10-19 20:36:35 +00:00
bool receiveData(bool scalar);
2019-05-28 18:30:10 +00:00
bool readDataNext(const size_t & poll_interval, const int & receive_timeout);
void readData(const Settings & global_settings);
2019-05-28 18:30:10 +00:00
std::tuple<size_t, int> getReadTimeouts(const Settings & global_settings);
2012-05-17 19:15:53 +00:00
[[noreturn]] void receiveUnexpectedData();
[[noreturn]] void receiveUnexpectedQuery();
[[noreturn]] void receiveUnexpectedHello();
2019-09-03 13:55:26 +00:00
[[noreturn]] void receiveUnexpectedTablesStatusRequest();
/// Process INSERT query
void processInsertQuery(const Settings & global_settings);
2012-05-17 19:15:53 +00:00
/// Process a request that does not require the receiving of data blocks from the client
void processOrdinaryQuery();
2012-03-11 08:52:56 +00:00
2019-03-26 18:28:37 +00:00
void processOrdinaryQueryWithProcessors(size_t num_threads);
void processTablesStatusRequest();
void sendHello();
void sendData(const Block & block); /// Write a block to the network.
void sendLogData(const Block & block);
void sendTableColumns(const ColumnsDescription & columns);
2018-08-24 07:30:53 +00:00
void sendException(const Exception & e, bool with_stack_trace);
void sendProgress();
void sendLogs();
void sendEndOfStream();
2019-03-26 18:28:37 +00:00
void sendProfileInfo(const BlockStreamProfileInfo & info);
void sendTotals(const Block & totals);
void sendExtremes(const Block & extremes);
/// Creates state.block_in/block_out for blocks read/write, depending on whether compression is enabled.
void initBlockInput();
void initBlockOutput(const Block & block);
void initLogsBlockOutput(const Block & block);
2012-05-09 08:16:09 +00:00
bool isQueryCancelled();
2012-07-21 07:02:55 +00:00
/// This function is called from different threads.
void updateProgress(const Progress & value);
2012-03-09 15:46:52 +00:00
};
}