2012-03-19 12:57:56 +00:00
|
|
|
#include <iomanip>
|
2018-08-14 20:29:42 +00:00
|
|
|
#include <ext/scope_guard.h>
|
2013-01-13 22:13:54 +00:00
|
|
|
#include <Poco/Net/NetException.h>
|
2018-06-01 19:39:32 +00:00
|
|
|
#include <Common/CurrentThread.h>
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <Common/Stopwatch.h>
|
2018-08-20 02:34:00 +00:00
|
|
|
#include <Common/NetException.h>
|
2018-08-31 00:59:48 +00:00
|
|
|
#include <Common/setThreadName.h>
|
2020-09-14 21:55:43 +00:00
|
|
|
#include <Common/OpenSSLHelpers.h>
|
2017-11-24 13:55:31 +00:00
|
|
|
#include <IO/Progress.h>
|
2018-12-28 18:15:26 +00:00
|
|
|
#include <Compression/CompressedReadBuffer.h>
|
|
|
|
#include <Compression/CompressedWriteBuffer.h>
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <IO/ReadBufferFromPocoSocket.h>
|
|
|
|
#include <IO/WriteBufferFromPocoSocket.h>
|
2020-12-02 21:05:51 +00:00
|
|
|
#include <IO/LimitReadBuffer.h>
|
2018-09-20 20:51:21 +00:00
|
|
|
#include <IO/ReadHelpers.h>
|
|
|
|
#include <IO/WriteHelpers.h>
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <IO/copyData.h>
|
|
|
|
#include <DataStreams/AsynchronousBlockInputStream.h>
|
|
|
|
#include <DataStreams/NativeBlockInputStream.h>
|
|
|
|
#include <DataStreams/NativeBlockOutputStream.h>
|
|
|
|
#include <Interpreters/executeQuery.h>
|
2017-04-17 16:02:48 +00:00
|
|
|
#include <Interpreters/TablesStatus.h>
|
2018-08-14 20:29:42 +00:00
|
|
|
#include <Interpreters/InternalTextLogsQueue.h>
|
2020-11-09 15:07:38 +00:00
|
|
|
#include <Interpreters/OpenTelemetrySpanLog.h>
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <Storages/StorageMemory.h>
|
2017-04-17 16:02:48 +00:00
|
|
|
#include <Storages/StorageReplicatedMergeTree.h>
|
2020-11-20 17:23:53 +00:00
|
|
|
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
|
2018-08-20 02:34:00 +00:00
|
|
|
#include <Core/ExternalTable.h>
|
2018-11-14 15:23:00 +00:00
|
|
|
#include <Storages/ColumnDefault.h>
|
2018-12-06 11:24:07 +00:00
|
|
|
#include <DataTypes/DataTypeLowCardinality.h>
|
2018-12-21 12:17:30 +00:00
|
|
|
#include <Compression/CompressionFactory.h>
|
2019-07-08 16:11:38 +00:00
|
|
|
#include <common/logger_useful.h>
|
2018-07-09 16:31:24 +00:00
|
|
|
|
2020-05-20 19:01:36 +00:00
|
|
|
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
|
2019-03-26 18:28:37 +00:00
|
|
|
|
2012-03-09 15:46:52 +00:00
|
|
|
#include "TCPHandler.h"
|
|
|
|
|
2020-04-16 12:31:57 +00:00
|
|
|
#if !defined(ARCADIA_BUILD)
|
|
|
|
# include <Common/config_version.h>
|
|
|
|
#endif
|
|
|
|
|
2018-02-16 20:53:47 +00:00
|
|
|
|
2012-03-09 15:46:52 +00:00
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
2016-01-11 21:46:36 +00:00
|
|
|
namespace ErrorCodes
|
|
|
|
{
|
2020-02-25 18:02:41 +00:00
|
|
|
extern const int LOGICAL_ERROR;
|
|
|
|
extern const int ATTEMPT_TO_READ_AFTER_EOF;
|
2017-04-01 07:20:54 +00:00
|
|
|
extern const int CLIENT_HAS_CONNECTED_TO_WRONG_PORT;
|
|
|
|
extern const int UNKNOWN_DATABASE;
|
|
|
|
extern const int UNKNOWN_EXCEPTION;
|
|
|
|
extern const int UNKNOWN_PACKET_FROM_CLIENT;
|
|
|
|
extern const int POCO_EXCEPTION;
|
|
|
|
extern const int SOCKET_TIMEOUT;
|
|
|
|
extern const int UNEXPECTED_PACKET_FROM_CLIENT;
|
2020-09-14 21:55:43 +00:00
|
|
|
extern const int SUPPORT_IS_DISABLED;
|
2016-01-11 21:46:36 +00:00
|
|
|
}
|
|
|
|
|
2012-03-09 15:46:52 +00:00
|
|
|
|
|
|
|
void TCPHandler::runImpl()
|
|
|
|
{
|
2018-08-31 00:59:48 +00:00
|
|
|
setThreadName("TCPHandler");
|
2019-01-15 18:39:54 +00:00
|
|
|
ThreadStatus thread_status;
|
2018-08-31 00:59:48 +00:00
|
|
|
|
2017-08-09 11:57:09 +00:00
|
|
|
connection_context = server.context();
|
2019-07-08 00:51:43 +00:00
|
|
|
connection_context.makeSessionContext();
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2019-07-30 22:22:45 +00:00
|
|
|
/// These timeouts can be changed after receiving query.
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2019-07-30 22:22:45 +00:00
|
|
|
auto global_receive_timeout = connection_context.getSettingsRef().receive_timeout;
|
|
|
|
auto global_send_timeout = connection_context.getSettingsRef().send_timeout;
|
|
|
|
|
|
|
|
socket().setReceiveTimeout(global_receive_timeout);
|
|
|
|
socket().setSendTimeout(global_send_timeout);
|
2017-04-01 07:20:54 +00:00
|
|
|
socket().setNoDelay(true);
|
|
|
|
|
|
|
|
in = std::make_shared<ReadBufferFromPocoSocket>(socket());
|
|
|
|
out = std::make_shared<WriteBufferFromPocoSocket>(socket());
|
|
|
|
|
2020-12-02 21:05:51 +00:00
|
|
|
/// Support for PROXY protocol
|
|
|
|
if (parse_proxy_protocol && !receiveProxyHeader())
|
|
|
|
return;
|
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
if (in->eof())
|
|
|
|
{
|
2020-12-11 10:51:48 +00:00
|
|
|
LOG_INFO(log, "Client has not sent any data.");
|
2017-04-01 07:20:54 +00:00
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
2019-07-30 22:22:45 +00:00
|
|
|
/// User will be authenticated here. It will also set settings from user profile into connection_context.
|
2017-04-01 07:20:54 +00:00
|
|
|
try
|
|
|
|
{
|
|
|
|
receiveHello();
|
|
|
|
}
|
|
|
|
catch (const Exception & e) /// Typical for an incorrect username, password, or address.
|
|
|
|
{
|
|
|
|
if (e.code() == ErrorCodes::CLIENT_HAS_CONNECTED_TO_WRONG_PORT)
|
|
|
|
{
|
|
|
|
LOG_DEBUG(log, "Client has connected to wrong port.");
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (e.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF)
|
|
|
|
{
|
2020-12-11 10:51:48 +00:00
|
|
|
LOG_INFO(log, "Client has gone away.");
|
2017-04-01 07:20:54 +00:00
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
try
|
|
|
|
{
|
2017-04-17 16:02:48 +00:00
|
|
|
/// We try to send error information to the client.
|
2018-08-24 07:30:53 +00:00
|
|
|
sendException(e, connection_context.getSettingsRef().calculate_text_stack_trace);
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
|
|
|
catch (...) {}
|
|
|
|
|
|
|
|
throw;
|
|
|
|
}
|
|
|
|
|
|
|
|
/// When connecting, the default database can be specified.
|
|
|
|
if (!default_database.empty())
|
|
|
|
{
|
2020-02-10 13:10:17 +00:00
|
|
|
if (!DatabaseCatalog::instance().isDatabaseExist(default_database))
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
2019-12-20 16:53:37 +00:00
|
|
|
Exception e("Database " + backQuote(default_database) + " doesn't exist", ErrorCodes::UNKNOWN_DATABASE);
|
2020-05-23 22:24:01 +00:00
|
|
|
LOG_ERROR(log, "Code: {}, e.displayText() = {}, Stack trace:\n\n{}", e.code(), e.displayText(), e.getStackTraceString());
|
2018-08-24 07:30:53 +00:00
|
|
|
sendException(e, connection_context.getSettingsRef().calculate_text_stack_trace);
|
2017-04-01 07:20:54 +00:00
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
connection_context.setCurrentDatabase(default_database);
|
|
|
|
}
|
|
|
|
|
2019-07-30 22:22:45 +00:00
|
|
|
Settings connection_settings = connection_context.getSettings();
|
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
sendHello();
|
|
|
|
|
|
|
|
connection_context.setProgressCallback([this] (const Progress & value) { return this->updateProgress(value); });
|
|
|
|
|
2020-03-08 21:29:00 +00:00
|
|
|
while (true)
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
2019-07-04 22:57:26 +00:00
|
|
|
/// We are waiting for a packet from the client. Thus, every `poll_interval` seconds check whether we need to shut down.
|
2019-07-04 22:23:45 +00:00
|
|
|
{
|
|
|
|
Stopwatch idle_time;
|
2019-07-30 22:27:14 +00:00
|
|
|
while (!server.isCancelled() && !static_cast<ReadBufferFromPocoSocket &>(*in).poll(
|
|
|
|
std::min(connection_settings.poll_interval, connection_settings.idle_connection_timeout) * 1000000))
|
2019-07-04 22:23:45 +00:00
|
|
|
{
|
2019-07-30 22:22:45 +00:00
|
|
|
if (idle_time.elapsedSeconds() > connection_settings.idle_connection_timeout)
|
2019-07-04 23:03:20 +00:00
|
|
|
{
|
|
|
|
LOG_TRACE(log, "Closing idle connection");
|
2019-07-04 22:23:45 +00:00
|
|
|
return;
|
2019-07-04 23:03:20 +00:00
|
|
|
}
|
2019-07-04 22:23:45 +00:00
|
|
|
}
|
|
|
|
}
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2017-04-17 16:02:48 +00:00
|
|
|
/// If we need to shut down, or client disconnects.
|
2017-08-09 14:33:07 +00:00
|
|
|
if (server.isCancelled() || in->eof())
|
2017-04-01 07:20:54 +00:00
|
|
|
break;
|
|
|
|
|
2019-07-04 23:03:20 +00:00
|
|
|
/// Set context of request.
|
|
|
|
query_context = connection_context;
|
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
Stopwatch watch;
|
|
|
|
state.reset();
|
|
|
|
|
2018-09-07 23:22:02 +00:00
|
|
|
/// Initialized later.
|
|
|
|
std::optional<CurrentThread::QueryScope> query_scope;
|
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
/** An exception during the execution of request (it must be sent over the network to the client).
|
2017-04-17 16:02:48 +00:00
|
|
|
* The client will be able to accept it, if it did not happen while sending another packet and the client has not disconnected yet.
|
|
|
|
*/
|
2020-01-02 06:56:53 +00:00
|
|
|
std::optional<DB::Exception> exception;
|
2018-04-04 12:11:38 +00:00
|
|
|
bool network_error = false;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2020-03-19 23:48:53 +00:00
|
|
|
bool send_exception_with_stack_trace = true;
|
2018-08-24 07:30:53 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
try
|
|
|
|
{
|
2018-03-28 14:07:28 +00:00
|
|
|
/// If a user passed query-local timeouts, reset socket to initial state at the end of the query
|
|
|
|
SCOPE_EXIT({state.timeout_setter.reset();});
|
|
|
|
|
2017-04-17 16:02:48 +00:00
|
|
|
/** If Query - process it. If Ping or Cancel - go back to the beginning.
|
|
|
|
* There may come settings for a separate query that modify `query_context`.
|
2020-11-20 17:23:53 +00:00
|
|
|
* It's possible to receive part uuids packet before the query, so then receivePacket has to be called twice.
|
2017-04-17 16:02:48 +00:00
|
|
|
*/
|
2017-04-01 07:20:54 +00:00
|
|
|
if (!receivePacket())
|
|
|
|
continue;
|
|
|
|
|
2020-11-20 17:23:53 +00:00
|
|
|
/** If part_uuids got received in previous packet, trying to read again.
|
|
|
|
*/
|
|
|
|
if (state.empty() && state.part_uuids && !receivePacket())
|
|
|
|
continue;
|
|
|
|
|
2019-03-06 21:32:26 +00:00
|
|
|
query_scope.emplace(*query_context);
|
2018-06-01 19:39:32 +00:00
|
|
|
|
2019-03-06 21:32:26 +00:00
|
|
|
send_exception_with_stack_trace = query_context->getSettingsRef().calculate_text_stack_trace;
|
2018-08-24 07:30:53 +00:00
|
|
|
|
2018-06-06 20:57:07 +00:00
|
|
|
/// Should we send internal logs to client?
|
2019-07-10 12:19:17 +00:00
|
|
|
const auto client_logs_level = query_context->getSettingsRef().send_logs_level;
|
2020-09-17 12:15:05 +00:00
|
|
|
if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_SERVER_LOGS
|
2019-08-09 13:02:01 +00:00
|
|
|
&& client_logs_level != LogsLevel::none)
|
2018-06-06 20:57:07 +00:00
|
|
|
{
|
2018-06-15 17:32:35 +00:00
|
|
|
state.logs_queue = std::make_shared<InternalTextLogsQueue>();
|
2019-07-10 12:19:17 +00:00
|
|
|
state.logs_queue->max_priority = Poco::Logger::parseLevel(client_logs_level.toString());
|
2019-08-09 13:02:01 +00:00
|
|
|
CurrentThread::attachInternalTextLogsQueue(state.logs_queue, client_logs_level);
|
2020-06-20 11:17:15 +00:00
|
|
|
CurrentThread::setFatalErrorCallback([this]{ sendLogs(); });
|
2018-06-06 20:57:07 +00:00
|
|
|
}
|
|
|
|
|
2019-07-30 22:22:45 +00:00
|
|
|
query_context->setExternalTablesInitializer([&connection_settings, this] (Context & context)
|
2018-08-24 00:07:25 +00:00
|
|
|
{
|
2019-03-06 21:32:26 +00:00
|
|
|
if (&context != &*query_context)
|
2018-06-01 15:32:27 +00:00
|
|
|
throw Exception("Unexpected context in external tables initializer", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
|
|
|
|
/// Get blocks of temporary tables
|
2019-07-30 22:22:45 +00:00
|
|
|
readData(connection_settings);
|
2018-06-01 15:32:27 +00:00
|
|
|
|
|
|
|
/// Reset the input stream, as we received an empty block while receiving external table data.
|
|
|
|
/// So, the stream has been marked as cancelled and we can't read from it anymore.
|
|
|
|
state.block_in.reset();
|
2018-11-26 00:56:50 +00:00
|
|
|
state.maybe_compressed_in.reset(); /// For more accurate accounting by MemoryTracker.
|
2019-12-07 23:41:51 +00:00
|
|
|
|
|
|
|
state.temporary_tables_read = true;
|
2018-06-01 15:32:27 +00:00
|
|
|
});
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2019-05-28 18:30:10 +00:00
|
|
|
/// Send structure of columns to client for function input()
|
|
|
|
query_context->setInputInitializer([this] (Context & context, const StoragePtr & input_storage)
|
|
|
|
{
|
2019-05-30 20:12:44 +00:00
|
|
|
if (&context != &query_context.value())
|
2019-05-28 18:30:10 +00:00
|
|
|
throw Exception("Unexpected context in Input initializer", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
|
2020-06-17 16:39:58 +00:00
|
|
|
auto metadata_snapshot = input_storage->getInMemoryMetadataPtr();
|
2019-05-28 18:30:10 +00:00
|
|
|
state.need_receive_data_for_input = true;
|
|
|
|
|
|
|
|
/// Send ColumnsDescription for input storage.
|
2020-09-17 12:15:05 +00:00
|
|
|
if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA
|
2019-05-28 18:30:10 +00:00
|
|
|
&& query_context->getSettingsRef().input_format_defaults_for_omitted_fields)
|
|
|
|
{
|
2020-06-17 16:39:58 +00:00
|
|
|
sendTableColumns(metadata_snapshot->getColumns());
|
2019-05-28 18:30:10 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Send block to the client - input storage structure.
|
2020-06-17 16:39:58 +00:00
|
|
|
state.input_header = metadata_snapshot->getSampleBlock();
|
2019-05-28 18:30:10 +00:00
|
|
|
sendData(state.input_header);
|
|
|
|
});
|
|
|
|
|
2019-09-04 12:25:20 +00:00
|
|
|
query_context->setInputBlocksReaderCallback([&connection_settings, this] (Context & context) -> Block
|
2019-05-28 18:30:10 +00:00
|
|
|
{
|
2019-05-30 20:12:44 +00:00
|
|
|
if (&context != &query_context.value())
|
2019-05-28 18:30:10 +00:00
|
|
|
throw Exception("Unexpected context in InputBlocksReader", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
|
|
|
|
size_t poll_interval;
|
|
|
|
int receive_timeout;
|
2019-09-04 12:25:20 +00:00
|
|
|
std::tie(poll_interval, receive_timeout) = getReadTimeouts(connection_settings);
|
2019-05-28 18:30:10 +00:00
|
|
|
if (!readDataNext(poll_interval, receive_timeout))
|
|
|
|
{
|
|
|
|
state.block_in.reset();
|
|
|
|
state.maybe_compressed_in.reset();
|
|
|
|
return Block();
|
2019-05-28 20:16:24 +00:00
|
|
|
}
|
2019-05-28 18:30:10 +00:00
|
|
|
return state.block_for_input;
|
|
|
|
});
|
|
|
|
|
2019-03-06 21:32:26 +00:00
|
|
|
customizeContext(*query_context);
|
2019-03-06 16:41:35 +00:00
|
|
|
|
2020-09-17 12:15:05 +00:00
|
|
|
bool may_have_embedded_data = client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_CLIENT_SUPPORT_EMBEDDED_DATA;
|
2017-04-17 16:02:48 +00:00
|
|
|
/// Processing Query
|
2019-03-06 21:32:26 +00:00
|
|
|
state.io = executeQuery(state.query, *query_context, false, state.stage, may_have_embedded_data);
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
after_check_cancelled.restart();
|
|
|
|
after_send_progress.restart();
|
|
|
|
|
2020-10-10 01:43:07 +00:00
|
|
|
if (state.io.out)
|
|
|
|
{
|
|
|
|
state.need_receive_data_for_insert = true;
|
2019-07-30 22:22:45 +00:00
|
|
|
processInsertQuery(connection_settings);
|
2020-10-10 01:43:07 +00:00
|
|
|
}
|
|
|
|
else if (state.need_receive_data_for_input) // It implies pipeline execution
|
2019-05-28 18:30:10 +00:00
|
|
|
{
|
|
|
|
/// It is special case for input(), all works for reading data from client will be done in callbacks.
|
2020-05-27 18:20:26 +00:00
|
|
|
auto executor = state.io.pipeline.execute();
|
|
|
|
executor->execute(state.io.pipeline.getNumThreads());
|
2019-05-28 18:30:10 +00:00
|
|
|
}
|
2019-03-26 18:28:37 +00:00
|
|
|
else if (state.io.pipeline.initialized())
|
2020-03-15 21:22:55 +00:00
|
|
|
processOrdinaryQueryWithProcessors();
|
2020-10-10 01:43:07 +00:00
|
|
|
else if (state.io.in)
|
2017-04-01 07:20:54 +00:00
|
|
|
processOrdinaryQuery();
|
|
|
|
|
2020-10-10 09:24:12 +00:00
|
|
|
state.io.onFinish();
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2018-09-26 21:19:49 +00:00
|
|
|
/// Do it before sending end of stream, to have a chance to show log message in client.
|
|
|
|
query_scope->logPeakMemoryUsage();
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2020-10-24 03:41:47 +00:00
|
|
|
if (state.is_connection_closed)
|
|
|
|
break;
|
|
|
|
|
2018-09-26 21:19:49 +00:00
|
|
|
sendLogs();
|
2018-06-06 20:57:07 +00:00
|
|
|
sendEndOfStream();
|
2018-09-07 23:22:02 +00:00
|
|
|
|
2020-05-18 18:13:56 +00:00
|
|
|
/// QueryState should be cleared before QueryScope, since otherwise
|
|
|
|
/// the MemoryTracker will be wrong for possible deallocations.
|
|
|
|
/// (i.e. deallocations from the Aggregator with two-level aggregation)
|
2017-04-01 07:20:54 +00:00
|
|
|
state.reset();
|
2020-05-18 18:13:56 +00:00
|
|
|
query_scope.reset();
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
|
|
|
catch (const Exception & e)
|
|
|
|
{
|
|
|
|
state.io.onException();
|
2020-01-02 06:56:53 +00:00
|
|
|
exception.emplace(e);
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
if (e.code() == ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT)
|
|
|
|
throw;
|
2018-04-04 12:11:38 +00:00
|
|
|
|
2020-08-27 00:00:43 +00:00
|
|
|
/// If there is UNEXPECTED_PACKET_FROM_CLIENT emulate network_error
|
|
|
|
/// to break the loop, but do not throw to send the exception to
|
|
|
|
/// the client.
|
|
|
|
if (e.code() == ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT)
|
|
|
|
network_error = true;
|
|
|
|
|
2018-04-04 12:11:38 +00:00
|
|
|
/// If a timeout occurred, try to inform client about it and close the session
|
|
|
|
if (e.code() == ErrorCodes::SOCKET_TIMEOUT)
|
|
|
|
network_error = true;
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
|
|
|
catch (const Poco::Net::NetException & e)
|
|
|
|
{
|
2017-04-17 16:02:48 +00:00
|
|
|
/** We can get here if there was an error during connection to the client,
|
|
|
|
* or in connection with a remote server that was used to process the request.
|
|
|
|
* It is not possible to distinguish between these two cases.
|
|
|
|
* Although in one of them, we have to send exception to the client, but in the other - we can not.
|
|
|
|
* We will try to send exception to the client in any case - see below.
|
|
|
|
*/
|
2017-04-01 07:20:54 +00:00
|
|
|
state.io.onException();
|
2020-06-04 17:49:14 +00:00
|
|
|
exception.emplace(Exception::CreateFromPocoTag{}, e);
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
|
|
|
catch (const Poco::Exception & e)
|
|
|
|
{
|
|
|
|
state.io.onException();
|
2020-06-04 17:49:14 +00:00
|
|
|
exception.emplace(Exception::CreateFromPocoTag{}, e);
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
2020-07-17 14:25:13 +00:00
|
|
|
// Server should die on std logic errors in debug, like with assert()
|
|
|
|
// or ErrorCodes::LOGICAL_ERROR. This helps catch these errors in
|
|
|
|
// tests.
|
|
|
|
#ifndef NDEBUG
|
2020-07-15 12:08:01 +00:00
|
|
|
catch (const std::logic_error & e)
|
|
|
|
{
|
2020-07-17 14:20:33 +00:00
|
|
|
state.io.onException();
|
|
|
|
exception.emplace(Exception::CreateFromSTDTag{}, e);
|
|
|
|
sendException(*exception, send_exception_with_stack_trace);
|
2020-07-17 14:25:13 +00:00
|
|
|
std::abort();
|
2020-07-15 12:08:01 +00:00
|
|
|
}
|
2020-07-17 14:25:13 +00:00
|
|
|
#endif
|
2017-04-01 07:20:54 +00:00
|
|
|
catch (const std::exception & e)
|
|
|
|
{
|
|
|
|
state.io.onException();
|
2020-06-04 17:49:14 +00:00
|
|
|
exception.emplace(Exception::CreateFromSTDTag{}, e);
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
|
|
|
catch (...)
|
|
|
|
{
|
|
|
|
state.io.onException();
|
2020-01-02 06:56:53 +00:00
|
|
|
exception.emplace("Unknown exception", ErrorCodes::UNKNOWN_EXCEPTION);
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
try
|
|
|
|
{
|
|
|
|
if (exception)
|
2018-06-06 20:57:07 +00:00
|
|
|
{
|
2018-06-09 15:52:59 +00:00
|
|
|
try
|
|
|
|
{
|
|
|
|
/// Try to send logs to client, but it could be risky too
|
|
|
|
/// Assume that we can't break output here
|
|
|
|
sendLogs();
|
|
|
|
}
|
|
|
|
catch (...)
|
|
|
|
{
|
|
|
|
tryLogCurrentException(log, "Can't send logs to client");
|
|
|
|
}
|
|
|
|
|
2020-08-26 22:43:47 +00:00
|
|
|
const auto & e = *exception;
|
|
|
|
LOG_ERROR(log, "Code: {}, e.displayText() = {}, Stack trace:\n\n{}", e.code(), e.displayText(), e.getStackTraceString());
|
2018-08-24 07:30:53 +00:00
|
|
|
sendException(*exception, send_exception_with_stack_trace);
|
2018-06-06 20:57:07 +00:00
|
|
|
}
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
|
|
|
catch (...)
|
|
|
|
{
|
2017-04-17 16:02:48 +00:00
|
|
|
/** Could not send exception information to the client. */
|
2017-04-01 07:20:54 +00:00
|
|
|
network_error = true;
|
|
|
|
LOG_WARNING(log, "Client has gone away.");
|
|
|
|
}
|
|
|
|
|
2019-12-07 23:41:51 +00:00
|
|
|
try
|
|
|
|
{
|
|
|
|
if (exception && !state.temporary_tables_read)
|
|
|
|
query_context->initializeExternalTablesIfSet();
|
|
|
|
}
|
|
|
|
catch (...)
|
|
|
|
{
|
|
|
|
network_error = true;
|
|
|
|
LOG_WARNING(log, "Can't read external tables after query failure.");
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
try
|
|
|
|
{
|
2020-05-18 18:13:56 +00:00
|
|
|
/// QueryState should be cleared before QueryScope, since otherwise
|
|
|
|
/// the MemoryTracker will be wrong for possible deallocations.
|
|
|
|
/// (i.e. deallocations from the Aggregator with two-level aggregation)
|
2017-04-01 07:20:54 +00:00
|
|
|
state.reset();
|
2020-05-18 18:13:56 +00:00
|
|
|
query_scope.reset();
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
|
|
|
catch (...)
|
|
|
|
{
|
2017-04-17 16:02:48 +00:00
|
|
|
/** During the processing of request, there was an exception that we caught and possibly sent to client.
|
|
|
|
* When destroying the request pipeline execution there was a second exception.
|
|
|
|
* For example, a pipeline could run in multiple threads, and an exception could occur in each of them.
|
|
|
|
* Ignore it.
|
|
|
|
*/
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
watch.stop();
|
|
|
|
|
2020-10-10 17:47:34 +00:00
|
|
|
LOG_DEBUG(log, "Processed in {} sec.", watch.elapsedSeconds());
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2019-03-06 21:32:26 +00:00
|
|
|
/// It is important to destroy query context here. We do not want it to live arbitrarily longer than the query.
|
|
|
|
query_context.reset();
|
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
if (network_error)
|
|
|
|
break;
|
|
|
|
}
|
2012-03-11 08:52:56 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2019-05-28 18:30:10 +00:00
|
|
|
bool TCPHandler::readDataNext(const size_t & poll_interval, const int & receive_timeout)
|
|
|
|
{
|
|
|
|
Stopwatch watch(CLOCK_MONOTONIC_COARSE);
|
|
|
|
|
|
|
|
/// We are waiting for a packet from the client. Thus, every `POLL_INTERVAL` seconds check whether we need to shut down.
|
|
|
|
while (true)
|
|
|
|
{
|
|
|
|
if (static_cast<ReadBufferFromPocoSocket &>(*in).poll(poll_interval))
|
|
|
|
break;
|
|
|
|
|
|
|
|
/// Do we need to shut down?
|
|
|
|
if (server.isCancelled())
|
|
|
|
return false;
|
|
|
|
|
|
|
|
/** Have we waited for data for too long?
|
|
|
|
* If we periodically poll, the receive_timeout of the socket itself does not work.
|
|
|
|
* Therefore, an additional check is added.
|
|
|
|
*/
|
|
|
|
double elapsed = watch.elapsedSeconds();
|
|
|
|
if (elapsed > receive_timeout)
|
|
|
|
{
|
2020-11-10 18:22:26 +00:00
|
|
|
throw Exception(ErrorCodes::SOCKET_TIMEOUT,
|
|
|
|
"Timeout exceeded while receiving data from client. Waited for {} seconds, timeout is {} seconds.",
|
|
|
|
static_cast<size_t>(elapsed), receive_timeout);
|
2019-05-28 18:30:10 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// If client disconnected.
|
|
|
|
if (in->eof())
|
2020-10-24 03:41:47 +00:00
|
|
|
{
|
|
|
|
LOG_INFO(log, "Client has dropped the connection, cancel the query.");
|
|
|
|
state.is_connection_closed = true;
|
2019-05-28 18:30:10 +00:00
|
|
|
return false;
|
2020-10-24 03:41:47 +00:00
|
|
|
}
|
2019-05-28 18:30:10 +00:00
|
|
|
|
|
|
|
/// We accept and process data. And if they are over, then we leave.
|
|
|
|
if (!receivePacket())
|
|
|
|
return false;
|
|
|
|
|
|
|
|
sendLogs();
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2019-09-04 12:25:20 +00:00
|
|
|
std::tuple<size_t, int> TCPHandler::getReadTimeouts(const Settings & connection_settings)
|
2012-05-21 06:49:05 +00:00
|
|
|
{
|
2019-03-06 21:32:26 +00:00
|
|
|
const auto receive_timeout = query_context->getSettingsRef().receive_timeout.value;
|
2018-04-04 12:11:38 +00:00
|
|
|
|
|
|
|
/// Poll interval should not be greater than receive_timeout
|
2019-08-09 13:02:01 +00:00
|
|
|
const size_t default_poll_interval = connection_settings.poll_interval * 1000000;
|
2018-04-04 12:11:38 +00:00
|
|
|
size_t current_poll_interval = static_cast<size_t>(receive_timeout.totalMicroseconds());
|
|
|
|
constexpr size_t min_poll_interval = 5000; // 5 ms
|
|
|
|
size_t poll_interval = std::max(min_poll_interval, std::min(default_poll_interval, current_poll_interval));
|
|
|
|
|
2019-05-28 18:30:10 +00:00
|
|
|
return std::make_tuple(poll_interval, receive_timeout.totalSeconds());
|
|
|
|
}
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
|
2019-09-04 12:25:20 +00:00
|
|
|
void TCPHandler::readData(const Settings & connection_settings)
|
2019-05-28 18:30:10 +00:00
|
|
|
{
|
|
|
|
size_t poll_interval;
|
|
|
|
int receive_timeout;
|
2018-04-04 12:11:38 +00:00
|
|
|
|
2019-09-04 12:25:20 +00:00
|
|
|
std::tie(poll_interval, receive_timeout) = getReadTimeouts(connection_settings);
|
2019-05-28 18:30:10 +00:00
|
|
|
sendLogs();
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2020-10-24 03:41:47 +00:00
|
|
|
while (readDataNext(poll_interval, receive_timeout))
|
|
|
|
;
|
2014-03-04 15:31:56 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2019-07-30 22:22:45 +00:00
|
|
|
void TCPHandler::processInsertQuery(const Settings & connection_settings)
|
2014-03-04 15:31:56 +00:00
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
/** Made above the rest of the lines, so that in case of `writePrefix` function throws an exception,
|
|
|
|
* client receive exception before sending data.
|
|
|
|
*/
|
|
|
|
state.io.out->writePrefix();
|
|
|
|
|
2018-12-04 20:03:04 +00:00
|
|
|
/// Send ColumnsDescription for insertion table
|
2020-09-17 12:15:05 +00:00
|
|
|
if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA)
|
2018-11-15 15:03:13 +00:00
|
|
|
{
|
2020-03-02 20:23:58 +00:00
|
|
|
const auto & table_id = query_context->getInsertionTable();
|
2019-05-01 21:43:05 +00:00
|
|
|
if (query_context->getSettingsRef().input_format_defaults_for_omitted_fields)
|
2019-07-17 18:30:17 +00:00
|
|
|
{
|
2020-03-02 20:23:58 +00:00
|
|
|
if (!table_id.empty())
|
2020-06-17 16:39:58 +00:00
|
|
|
{
|
|
|
|
auto storage_ptr = DatabaseCatalog::instance().getTable(table_id, *query_context);
|
|
|
|
sendTableColumns(storage_ptr->getInMemoryMetadataPtr()->getColumns());
|
|
|
|
}
|
2019-07-17 18:30:17 +00:00
|
|
|
}
|
2018-11-15 15:03:13 +00:00
|
|
|
}
|
2018-11-14 15:23:00 +00:00
|
|
|
|
2018-12-04 20:03:04 +00:00
|
|
|
/// Send block to the client - table structure.
|
2018-12-20 14:26:54 +00:00
|
|
|
sendData(state.io.out->getHeader());
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2019-07-30 22:22:45 +00:00
|
|
|
readData(connection_settings);
|
2017-04-01 07:20:54 +00:00
|
|
|
state.io.out->writeSuffix();
|
2012-05-21 06:49:05 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
void TCPHandler::processOrdinaryQuery()
|
|
|
|
{
|
2020-11-19 15:52:11 +00:00
|
|
|
OpenTelemetrySpanHolder span(__PRETTY_FUNCTION__);
|
2020-11-09 15:07:38 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
/// Pull query execution result, if exists, and send it to network.
|
|
|
|
if (state.io.in)
|
|
|
|
{
|
2020-11-20 17:23:53 +00:00
|
|
|
sendPartUUIDs();
|
|
|
|
|
2019-12-11 13:30:27 +00:00
|
|
|
/// This allows the client to prepare output format
|
|
|
|
if (Block header = state.io.in->getHeader())
|
|
|
|
sendData(header);
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2019-12-11 13:30:27 +00:00
|
|
|
/// Use of async mode here enables reporting progress and monitoring client cancelling the query
|
2017-04-01 07:20:54 +00:00
|
|
|
AsynchronousBlockInputStream async_in(state.io.in);
|
|
|
|
|
2019-12-11 13:30:27 +00:00
|
|
|
async_in.readPrefix();
|
2017-04-01 07:20:54 +00:00
|
|
|
while (true)
|
|
|
|
{
|
2019-12-11 13:30:27 +00:00
|
|
|
if (isQueryCancelled())
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
2019-12-11 13:30:27 +00:00
|
|
|
async_in.cancel(false);
|
|
|
|
break;
|
|
|
|
}
|
2018-06-06 20:57:07 +00:00
|
|
|
|
2019-12-11 13:30:27 +00:00
|
|
|
if (after_send_progress.elapsed() / 1000 >= query_context->getSettingsRef().interactive_delay)
|
|
|
|
{
|
2020-03-04 15:16:32 +00:00
|
|
|
/// Some time passed.
|
2019-12-11 13:30:27 +00:00
|
|
|
after_send_progress.restart();
|
|
|
|
sendProgress();
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
|
|
|
|
2019-12-11 13:30:27 +00:00
|
|
|
sendLogs();
|
|
|
|
|
|
|
|
if (async_in.poll(query_context->getSettingsRef().interactive_delay / 1000))
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
2019-12-11 13:30:27 +00:00
|
|
|
const auto block = async_in.read();
|
|
|
|
if (!block)
|
|
|
|
break;
|
2019-11-06 12:34:51 +00:00
|
|
|
|
2019-12-11 13:30:27 +00:00
|
|
|
if (!state.io.null_format)
|
|
|
|
sendData(block);
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
2019-12-11 13:30:27 +00:00
|
|
|
}
|
|
|
|
async_in.readSuffix();
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2019-12-11 14:54:19 +00:00
|
|
|
/** When the data has run out, we send the profiling data and totals up to the terminating empty block,
|
2019-12-11 13:30:27 +00:00
|
|
|
* so that this information can be used in the suffix output of stream.
|
|
|
|
* If the request has been interrupted, then sendTotals and other methods should not be called,
|
|
|
|
* because we have not read all the data.
|
|
|
|
*/
|
|
|
|
if (!isQueryCancelled())
|
|
|
|
{
|
|
|
|
sendTotals(state.io.in->getTotals());
|
|
|
|
sendExtremes(state.io.in->getExtremes());
|
|
|
|
sendProfileInfo(state.io.in->getProfileInfo());
|
2019-12-12 04:46:39 +00:00
|
|
|
sendProgress();
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
|
|
|
|
2020-10-24 03:41:47 +00:00
|
|
|
if (state.is_connection_closed)
|
|
|
|
return;
|
|
|
|
|
2019-12-11 14:54:19 +00:00
|
|
|
sendData({});
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
|
|
|
|
Send progress after final update (like logs)
Otherwise you will get non completed progress on TCP, HTTP does not
affected.
Final updateProgress called from the executeQueryImpl::finish_callback:
(gdb) bt
0 DB::TCPHandler::updateProgress (this=0x7fffb581e000, value=...) at TCPHandler.cpp:1178
1 0x00007ffff7c62038 in DB::TCPHandler::<lambda(const DB::Progress&)>::operator()(const DB::Progress &) const (...) at TCPHandler.cpp:127
4 0x00007fffe9a86671 in DB::<lambda(DB::IBlockInputStream*, DB::IBlockOutputStream*)>::operator()(DB::IBlockInputStream *, DB::IBlockOutputStream *) (__closure=0x7fffb588f300,
stream_in=0x7fffb5800290, stream_out=0x0) at executeQuery.cpp:450
5 0x00007fffe9a8b948 in std::_Function_handler<void(DB::IBlockInputStream*, DB::IBlockOutputStream*), DB::executeQueryImpl(...)::<lambda(DB::IBlockInputStream*, DB::IBlockOutputStream*)> >::_M_invoke(const std::_Any_data &, DB::IBlockInputStream *&&, DB::IBlockOutputStream *&&) (...) at std_function.h:300
6 0x00007ffff7c7482f in std::function<>::operator()(DB::IBlockInputStream*, DB::IBlockOutputStream*) const (..) at std_function.h:688
7 0x00007ffff7c6f82b in DB::BlockIO::onFinish (this=0x7fffb5820738) at BlockIO.h:43
8 0x00007ffff7c65eb0 in DB::TCPHandler::processOrdinaryQuery (this=0x7fffb581e000) at TCPHandler.cpp:540
9 0x00007ffff7c63a50 in DB::TCPHandler::runImpl (this=0x7fffb581e000) at TCPHandler.cpp:269
10 0x00007ffff7c6a6fd in DB::TCPHandler::run (this=0x7fffb581e000) at TCPHandler.cpp:1226
...
2020-03-03 19:01:20 +00:00
|
|
|
sendProgress();
|
2012-07-21 07:02:55 +00:00
|
|
|
}
|
|
|
|
|
2019-12-11 13:30:27 +00:00
|
|
|
|
2020-03-15 21:22:55 +00:00
|
|
|
void TCPHandler::processOrdinaryQueryWithProcessors()
|
2019-03-26 18:28:37 +00:00
|
|
|
{
|
|
|
|
auto & pipeline = state.io.pipeline;
|
|
|
|
|
2020-11-20 17:23:53 +00:00
|
|
|
sendPartUUIDs();
|
|
|
|
|
2019-03-26 18:28:37 +00:00
|
|
|
/// Send header-block, to allow client to prepare output format for data to send.
|
|
|
|
{
|
2020-05-14 21:03:38 +00:00
|
|
|
const auto & header = pipeline.getHeader();
|
2019-03-26 18:28:37 +00:00
|
|
|
|
|
|
|
if (header)
|
|
|
|
sendData(header);
|
|
|
|
}
|
|
|
|
|
|
|
|
{
|
2020-05-20 19:01:36 +00:00
|
|
|
PullingAsyncPipelineExecutor executor(pipeline);
|
2020-05-14 21:03:38 +00:00
|
|
|
CurrentMetrics::Increment query_thread_metric_increment{CurrentMetrics::QueryThread};
|
2019-07-09 12:46:50 +00:00
|
|
|
|
2020-05-14 21:03:38 +00:00
|
|
|
Block block;
|
|
|
|
while (executor.pull(block, query_context->getSettingsRef().interactive_delay / 1000))
|
2019-03-26 18:28:37 +00:00
|
|
|
{
|
2020-01-23 10:04:18 +00:00
|
|
|
if (isQueryCancelled())
|
2019-03-26 18:28:37 +00:00
|
|
|
{
|
2020-01-23 10:04:18 +00:00
|
|
|
/// A packet was received requesting to stop execution of the request.
|
2020-05-14 21:03:38 +00:00
|
|
|
executor.cancel();
|
2020-01-23 10:04:18 +00:00
|
|
|
break;
|
|
|
|
}
|
2019-03-26 18:28:37 +00:00
|
|
|
|
2020-01-23 10:04:18 +00:00
|
|
|
if (after_send_progress.elapsed() / 1000 >= query_context->getSettingsRef().interactive_delay)
|
|
|
|
{
|
|
|
|
/// Some time passed and there is a progress.
|
|
|
|
after_send_progress.restart();
|
|
|
|
sendProgress();
|
|
|
|
}
|
2019-03-26 18:28:37 +00:00
|
|
|
|
2020-01-23 10:04:18 +00:00
|
|
|
sendLogs();
|
2019-04-05 10:52:07 +00:00
|
|
|
|
2020-05-14 21:03:38 +00:00
|
|
|
if (block)
|
2020-01-23 10:04:18 +00:00
|
|
|
{
|
|
|
|
if (!state.io.null_format)
|
|
|
|
sendData(block);
|
2019-03-26 18:28:37 +00:00
|
|
|
}
|
2020-01-23 10:04:18 +00:00
|
|
|
}
|
2019-03-26 18:28:37 +00:00
|
|
|
|
2020-01-23 10:04:18 +00:00
|
|
|
/** If data has run out, we will send the profiling data and total values to
|
|
|
|
* the last zero block to be able to use
|
|
|
|
* this information in the suffix output of stream.
|
|
|
|
* If the request was interrupted, then `sendTotals` and other methods could not be called,
|
|
|
|
* because we have not read all the data yet,
|
|
|
|
* and there could be ongoing calculations in other threads at the same time.
|
|
|
|
*/
|
|
|
|
if (!isQueryCancelled())
|
|
|
|
{
|
2020-05-14 21:03:38 +00:00
|
|
|
sendTotals(executor.getTotalsBlock());
|
|
|
|
sendExtremes(executor.getExtremesBlock());
|
|
|
|
sendProfileInfo(executor.getProfileInfo());
|
2020-01-23 10:04:18 +00:00
|
|
|
sendProgress();
|
|
|
|
sendLogs();
|
2019-05-14 12:53:20 +00:00
|
|
|
}
|
2020-01-23 10:04:18 +00:00
|
|
|
|
2020-10-24 03:41:47 +00:00
|
|
|
if (state.is_connection_closed)
|
|
|
|
return;
|
|
|
|
|
2020-01-23 10:04:18 +00:00
|
|
|
sendData({});
|
2019-03-26 18:28:37 +00:00
|
|
|
}
|
|
|
|
|
Send progress after final update (like logs)
Otherwise you will get non completed progress on TCP, HTTP does not
affected.
Final updateProgress called from the executeQueryImpl::finish_callback:
(gdb) bt
0 DB::TCPHandler::updateProgress (this=0x7fffb581e000, value=...) at TCPHandler.cpp:1178
1 0x00007ffff7c62038 in DB::TCPHandler::<lambda(const DB::Progress&)>::operator()(const DB::Progress &) const (...) at TCPHandler.cpp:127
4 0x00007fffe9a86671 in DB::<lambda(DB::IBlockInputStream*, DB::IBlockOutputStream*)>::operator()(DB::IBlockInputStream *, DB::IBlockOutputStream *) (__closure=0x7fffb588f300,
stream_in=0x7fffb5800290, stream_out=0x0) at executeQuery.cpp:450
5 0x00007fffe9a8b948 in std::_Function_handler<void(DB::IBlockInputStream*, DB::IBlockOutputStream*), DB::executeQueryImpl(...)::<lambda(DB::IBlockInputStream*, DB::IBlockOutputStream*)> >::_M_invoke(const std::_Any_data &, DB::IBlockInputStream *&&, DB::IBlockOutputStream *&&) (...) at std_function.h:300
6 0x00007ffff7c7482f in std::function<>::operator()(DB::IBlockInputStream*, DB::IBlockOutputStream*) const (..) at std_function.h:688
7 0x00007ffff7c6f82b in DB::BlockIO::onFinish (this=0x7fffb5820738) at BlockIO.h:43
8 0x00007ffff7c65eb0 in DB::TCPHandler::processOrdinaryQuery (this=0x7fffb581e000) at TCPHandler.cpp:540
9 0x00007ffff7c63a50 in DB::TCPHandler::runImpl (this=0x7fffb581e000) at TCPHandler.cpp:269
10 0x00007ffff7c6a6fd in DB::TCPHandler::run (this=0x7fffb581e000) at TCPHandler.cpp:1226
...
2020-03-03 19:01:20 +00:00
|
|
|
sendProgress();
|
2019-03-26 18:28:37 +00:00
|
|
|
}
|
|
|
|
|
2012-07-21 07:02:55 +00:00
|
|
|
|
2017-04-17 16:02:48 +00:00
|
|
|
void TCPHandler::processTablesStatusRequest()
|
|
|
|
{
|
|
|
|
TablesStatusRequest request;
|
2020-09-17 12:15:05 +00:00
|
|
|
request.read(*in, client_tcp_protocol_version);
|
2017-04-17 16:02:48 +00:00
|
|
|
|
|
|
|
TablesStatusResponse response;
|
|
|
|
for (const QualifiedTableName & table_name: request.tables)
|
|
|
|
{
|
2020-03-04 20:29:52 +00:00
|
|
|
auto resolved_id = connection_context.tryResolveStorageID({table_name.database, table_name.table});
|
2020-05-28 23:01:18 +00:00
|
|
|
StoragePtr table = DatabaseCatalog::instance().tryGetTable(resolved_id, connection_context);
|
2017-04-20 16:19:30 +00:00
|
|
|
if (!table)
|
|
|
|
continue;
|
|
|
|
|
2017-04-17 16:02:48 +00:00
|
|
|
TableStatus status;
|
|
|
|
if (auto * replicated_table = dynamic_cast<StorageReplicatedMergeTree *>(table.get()))
|
|
|
|
{
|
|
|
|
status.is_replicated = true;
|
|
|
|
status.absolute_delay = replicated_table->getAbsoluteDelay();
|
|
|
|
}
|
|
|
|
else
|
|
|
|
status.is_replicated = false;
|
|
|
|
|
|
|
|
response.table_states_by_id.emplace(table_name, std::move(status));
|
|
|
|
}
|
|
|
|
|
|
|
|
writeVarUInt(Protocol::Server::TablesStatusResponse, *out);
|
2020-09-17 12:15:05 +00:00
|
|
|
response.write(*out, client_tcp_protocol_version);
|
2017-04-17 16:02:48 +00:00
|
|
|
}
|
|
|
|
|
2019-09-03 13:55:26 +00:00
|
|
|
void TCPHandler::receiveUnexpectedTablesStatusRequest()
|
2019-09-03 09:36:16 +00:00
|
|
|
{
|
|
|
|
TablesStatusRequest skip_request;
|
2020-09-17 12:15:05 +00:00
|
|
|
skip_request.read(*in, client_tcp_protocol_version);
|
2019-09-03 09:36:16 +00:00
|
|
|
|
|
|
|
throw NetException("Unexpected packet TablesStatusRequest received from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
|
|
|
|
}
|
2017-04-17 16:02:48 +00:00
|
|
|
|
2020-11-20 17:23:53 +00:00
|
|
|
void TCPHandler::sendPartUUIDs()
|
|
|
|
{
|
|
|
|
auto uuids = query_context->getPartUUIDs()->get();
|
|
|
|
if (!uuids.empty())
|
|
|
|
{
|
|
|
|
for (const auto & uuid : uuids)
|
|
|
|
LOG_TRACE(log, "Sending UUID: {}", toString(uuid));
|
|
|
|
|
|
|
|
writeVarUInt(Protocol::Server::PartUUIDs, *out);
|
|
|
|
writeVectorBinary(uuids, *out);
|
|
|
|
out->next();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-03-26 18:28:37 +00:00
|
|
|
void TCPHandler::sendProfileInfo(const BlockStreamProfileInfo & info)
|
2013-05-22 14:57:43 +00:00
|
|
|
{
|
2019-01-23 14:48:50 +00:00
|
|
|
writeVarUInt(Protocol::Server::ProfileInfo, *out);
|
2019-03-26 18:28:37 +00:00
|
|
|
info.write(*out);
|
2019-01-23 14:48:50 +00:00
|
|
|
out->next();
|
2013-05-22 14:57:43 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2019-03-26 18:28:37 +00:00
|
|
|
void TCPHandler::sendTotals(const Block & totals)
|
2013-09-05 20:22:43 +00:00
|
|
|
{
|
2019-01-23 14:48:50 +00:00
|
|
|
if (totals)
|
|
|
|
{
|
|
|
|
initBlockOutput(totals);
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2019-01-23 14:48:50 +00:00
|
|
|
writeVarUInt(Protocol::Server::Totals, *out);
|
|
|
|
writeStringBinary("", *out);
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2019-01-23 14:48:50 +00:00
|
|
|
state.block_out->write(totals);
|
|
|
|
state.maybe_compressed_out->next();
|
|
|
|
out->next();
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
2013-09-05 20:22:43 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2019-03-26 18:28:37 +00:00
|
|
|
void TCPHandler::sendExtremes(const Block & extremes)
|
2013-09-07 02:03:13 +00:00
|
|
|
{
|
2019-01-23 14:48:50 +00:00
|
|
|
if (extremes)
|
|
|
|
{
|
|
|
|
initBlockOutput(extremes);
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2019-01-23 14:48:50 +00:00
|
|
|
writeVarUInt(Protocol::Server::Extremes, *out);
|
|
|
|
writeStringBinary("", *out);
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2019-01-23 14:48:50 +00:00
|
|
|
state.block_out->write(extremes);
|
|
|
|
state.maybe_compressed_out->next();
|
|
|
|
out->next();
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
2013-09-07 02:03:13 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2020-12-02 21:05:51 +00:00
|
|
|
bool TCPHandler::receiveProxyHeader()
|
|
|
|
{
|
|
|
|
if (in->eof())
|
|
|
|
{
|
|
|
|
LOG_WARNING(log, "Client has not sent any data.");
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
String forwarded_address;
|
|
|
|
|
|
|
|
/// Only PROXYv1 is supported.
|
|
|
|
/// Validation of protocol is not fully performed.
|
|
|
|
|
|
|
|
LimitReadBuffer limit_in(*in, 107, true); /// Maximum length from the specs.
|
|
|
|
|
|
|
|
assertString("PROXY ", limit_in);
|
|
|
|
|
|
|
|
if (limit_in.eof())
|
|
|
|
{
|
|
|
|
LOG_WARNING(log, "Incomplete PROXY header is received.");
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
/// TCP4 / TCP6 / UNKNOWN
|
|
|
|
if ('T' == *limit_in.position())
|
|
|
|
{
|
|
|
|
assertString("TCP", limit_in);
|
|
|
|
|
|
|
|
if (limit_in.eof())
|
|
|
|
{
|
|
|
|
LOG_WARNING(log, "Incomplete PROXY header is received.");
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
if ('4' != *limit_in.position() && '6' != *limit_in.position())
|
|
|
|
{
|
|
|
|
LOG_WARNING(log, "Unexpected protocol in PROXY header is received.");
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
++limit_in.position();
|
|
|
|
assertChar(' ', limit_in);
|
|
|
|
|
|
|
|
/// Read the first field and ignore other.
|
|
|
|
readStringUntilWhitespace(forwarded_address, limit_in);
|
|
|
|
|
|
|
|
/// Skip until \r\n
|
|
|
|
while (!limit_in.eof() && *limit_in.position() != '\r')
|
|
|
|
++limit_in.position();
|
|
|
|
assertString("\r\n", limit_in);
|
|
|
|
}
|
|
|
|
else if (checkString("UNKNOWN", limit_in))
|
|
|
|
{
|
|
|
|
/// This is just a health check, there is no subsequent data in this connection.
|
|
|
|
|
|
|
|
while (!limit_in.eof() && *limit_in.position() != '\r')
|
|
|
|
++limit_in.position();
|
|
|
|
assertString("\r\n", limit_in);
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
LOG_WARNING(log, "Unexpected protocol in PROXY header is received.");
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
LOG_TRACE(log, "Forwarded client address from PROXY header: {}", forwarded_address);
|
|
|
|
connection_context.getClientInfo().forwarded_for = forwarded_address;
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2012-05-21 06:49:05 +00:00
|
|
|
void TCPHandler::receiveHello()
|
2012-05-16 18:03:00 +00:00
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
/// Receive `hello` packet.
|
|
|
|
UInt64 packet_type = 0;
|
2020-09-14 21:55:43 +00:00
|
|
|
String user;
|
2017-04-01 07:20:54 +00:00
|
|
|
String password;
|
|
|
|
|
|
|
|
readVarUInt(packet_type, *in);
|
|
|
|
if (packet_type != Protocol::Client::Hello)
|
|
|
|
{
|
|
|
|
/** If you accidentally accessed the HTTP protocol for a port destined for an internal TCP protocol,
|
2017-04-17 16:02:48 +00:00
|
|
|
* Then instead of the packet type, there will be G (GET) or P (POST), in most cases.
|
2017-04-01 07:20:54 +00:00
|
|
|
*/
|
|
|
|
if (packet_type == 'G' || packet_type == 'P')
|
|
|
|
{
|
|
|
|
writeString("HTTP/1.0 400 Bad Request\r\n\r\n"
|
|
|
|
"Port " + server.config().getString("tcp_port") + " is for clickhouse-client program.\r\n"
|
|
|
|
"You must use port " + server.config().getString("http_port") + " for HTTP.\r\n",
|
|
|
|
*out);
|
|
|
|
|
|
|
|
throw Exception("Client has connected to wrong port", ErrorCodes::CLIENT_HAS_CONNECTED_TO_WRONG_PORT);
|
|
|
|
}
|
|
|
|
else
|
|
|
|
throw NetException("Unexpected packet from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
|
|
|
|
}
|
|
|
|
|
|
|
|
readStringBinary(client_name, *in);
|
|
|
|
readVarUInt(client_version_major, *in);
|
|
|
|
readVarUInt(client_version_minor, *in);
|
2018-07-31 21:36:18 +00:00
|
|
|
// NOTE For backward compatibility of the protocol, client cannot send its version_patch.
|
2020-09-17 12:15:05 +00:00
|
|
|
readVarUInt(client_tcp_protocol_version, *in);
|
2017-04-01 07:20:54 +00:00
|
|
|
readStringBinary(default_database, *in);
|
|
|
|
readStringBinary(user, *in);
|
|
|
|
readStringBinary(password, *in);
|
|
|
|
|
2020-09-14 21:55:43 +00:00
|
|
|
if (user.empty())
|
|
|
|
throw NetException("Unexpected packet from client (no user in Hello package)", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
|
|
|
|
|
2020-05-23 22:24:01 +00:00
|
|
|
LOG_DEBUG(log, "Connected {} version {}.{}.{}, revision: {}{}{}.",
|
2020-05-23 22:21:29 +00:00
|
|
|
client_name,
|
|
|
|
client_version_major, client_version_minor, client_version_patch,
|
2020-09-17 12:15:05 +00:00
|
|
|
client_tcp_protocol_version,
|
2020-05-23 22:21:29 +00:00
|
|
|
(!default_database.empty() ? ", database: " + default_database : ""),
|
2020-09-14 21:55:43 +00:00
|
|
|
(!user.empty() ? ", user: " + user : "")
|
|
|
|
);
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2020-09-14 21:55:43 +00:00
|
|
|
if (user != USER_INTERSERVER_MARKER)
|
|
|
|
{
|
|
|
|
connection_context.setUser(user, password, socket().peerAddress());
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
receiveClusterNameAndSalt();
|
|
|
|
}
|
2012-05-16 18:03:00 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2019-09-03 09:36:16 +00:00
|
|
|
void TCPHandler::receiveUnexpectedHello()
|
|
|
|
{
|
|
|
|
UInt64 skip_uint_64;
|
|
|
|
String skip_string;
|
|
|
|
|
|
|
|
readStringBinary(skip_string, *in);
|
|
|
|
readVarUInt(skip_uint_64, *in);
|
|
|
|
readVarUInt(skip_uint_64, *in);
|
2020-04-15 01:58:10 +00:00
|
|
|
readVarUInt(skip_uint_64, *in);
|
2019-09-03 09:36:16 +00:00
|
|
|
readStringBinary(skip_string, *in);
|
|
|
|
readStringBinary(skip_string, *in);
|
|
|
|
readStringBinary(skip_string, *in);
|
|
|
|
|
|
|
|
throw NetException("Unexpected packet Hello received from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2012-05-21 06:49:05 +00:00
|
|
|
void TCPHandler::sendHello()
|
2012-03-11 08:52:56 +00:00
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
writeVarUInt(Protocol::Server::Hello, *out);
|
|
|
|
writeStringBinary(DBMS_NAME, *out);
|
|
|
|
writeVarUInt(DBMS_VERSION_MAJOR, *out);
|
|
|
|
writeVarUInt(DBMS_VERSION_MINOR, *out);
|
2020-09-17 12:15:05 +00:00
|
|
|
writeVarUInt(DBMS_TCP_PROTOCOL_VERSION, *out);
|
|
|
|
if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE)
|
2017-04-01 07:20:54 +00:00
|
|
|
writeStringBinary(DateLUT::instance().getTimeZone(), *out);
|
2020-09-17 12:15:05 +00:00
|
|
|
if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME)
|
2018-03-08 07:36:58 +00:00
|
|
|
writeStringBinary(server_display_name, *out);
|
2020-09-17 12:15:05 +00:00
|
|
|
if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_VERSION_PATCH)
|
2018-07-31 21:36:18 +00:00
|
|
|
writeVarUInt(DBMS_VERSION_PATCH, *out);
|
2017-04-01 07:20:54 +00:00
|
|
|
out->next();
|
2012-03-11 08:52:56 +00:00
|
|
|
}
|
|
|
|
|
2012-05-17 19:15:53 +00:00
|
|
|
|
2012-05-21 06:49:05 +00:00
|
|
|
bool TCPHandler::receivePacket()
|
2012-03-11 08:52:56 +00:00
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
UInt64 packet_type = 0;
|
|
|
|
readVarUInt(packet_type, *in);
|
|
|
|
|
2019-09-03 09:36:16 +00:00
|
|
|
// std::cerr << "Server got packet: " << Protocol::Client::toString(packet_type) << "\n";
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
switch (packet_type)
|
|
|
|
{
|
2020-11-20 17:23:53 +00:00
|
|
|
case Protocol::Client::IgnoredPartUUIDs:
|
|
|
|
/// Part uuids packet if any comes before query.
|
|
|
|
receiveIgnoredPartUUIDs();
|
|
|
|
return true;
|
2017-04-01 07:20:54 +00:00
|
|
|
case Protocol::Client::Query:
|
|
|
|
if (!state.empty())
|
2019-09-03 09:36:16 +00:00
|
|
|
receiveUnexpectedQuery();
|
2017-04-01 07:20:54 +00:00
|
|
|
receiveQuery();
|
|
|
|
return true;
|
|
|
|
|
|
|
|
case Protocol::Client::Data:
|
2019-10-19 20:36:35 +00:00
|
|
|
case Protocol::Client::Scalar:
|
2017-04-01 07:20:54 +00:00
|
|
|
if (state.empty())
|
2019-09-03 09:36:16 +00:00
|
|
|
receiveUnexpectedData();
|
2019-10-19 20:36:35 +00:00
|
|
|
return receiveData(packet_type == Protocol::Client::Scalar);
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
case Protocol::Client::Ping:
|
|
|
|
writeVarUInt(Protocol::Server::Pong, *out);
|
|
|
|
out->next();
|
|
|
|
return false;
|
|
|
|
|
|
|
|
case Protocol::Client::Cancel:
|
|
|
|
return false;
|
|
|
|
|
|
|
|
case Protocol::Client::Hello:
|
2019-09-03 09:36:16 +00:00
|
|
|
receiveUnexpectedHello();
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2017-04-17 16:02:48 +00:00
|
|
|
case Protocol::Client::TablesStatusRequest:
|
|
|
|
if (!state.empty())
|
2019-09-03 13:55:26 +00:00
|
|
|
receiveUnexpectedTablesStatusRequest();
|
2017-04-17 16:02:48 +00:00
|
|
|
processTablesStatusRequest();
|
|
|
|
out->next();
|
|
|
|
return false;
|
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
default:
|
2018-04-04 12:11:38 +00:00
|
|
|
throw Exception("Unknown packet " + toString(packet_type) + " from client", ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT);
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
2012-03-09 15:46:52 +00:00
|
|
|
}
|
|
|
|
|
2020-11-20 17:23:53 +00:00
|
|
|
void TCPHandler::receiveIgnoredPartUUIDs()
|
|
|
|
{
|
|
|
|
state.part_uuids = true;
|
|
|
|
std::vector<UUID> uuids;
|
|
|
|
readVectorBinary(uuids, *in);
|
|
|
|
|
|
|
|
if (!uuids.empty())
|
|
|
|
query_context->getIgnoredPartUUIDs()->add(uuids);
|
|
|
|
}
|
|
|
|
|
2020-09-14 21:55:43 +00:00
|
|
|
void TCPHandler::receiveClusterNameAndSalt()
|
|
|
|
{
|
|
|
|
readStringBinary(cluster, *in);
|
|
|
|
readStringBinary(salt, *in, 32);
|
|
|
|
|
|
|
|
try
|
|
|
|
{
|
|
|
|
if (salt.empty())
|
|
|
|
throw NetException("Empty salt is not allowed", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
|
|
|
|
|
|
|
|
cluster_secret = query_context->getCluster(cluster)->getSecret();
|
|
|
|
}
|
|
|
|
catch (const Exception & e)
|
|
|
|
{
|
|
|
|
try
|
|
|
|
{
|
|
|
|
/// We try to send error information to the client.
|
|
|
|
sendException(e, connection_context.getSettingsRef().calculate_text_stack_trace);
|
|
|
|
}
|
|
|
|
catch (...) {}
|
|
|
|
|
|
|
|
throw;
|
|
|
|
}
|
|
|
|
}
|
2012-05-17 19:15:53 +00:00
|
|
|
|
2012-05-21 06:49:05 +00:00
|
|
|
void TCPHandler::receiveQuery()
|
2012-03-11 08:52:56 +00:00
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
UInt64 stage = 0;
|
|
|
|
UInt64 compression = 0;
|
|
|
|
|
|
|
|
state.is_empty = false;
|
|
|
|
readStringBinary(state.query_id, *in);
|
|
|
|
|
|
|
|
/// Client info
|
2020-02-28 18:55:21 +00:00
|
|
|
ClientInfo & client_info = query_context->getClientInfo();
|
2020-09-17 12:15:05 +00:00
|
|
|
if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_CLIENT_INFO)
|
|
|
|
client_info.read(*in, client_tcp_protocol_version);
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2020-02-28 18:55:21 +00:00
|
|
|
/// For better support of old clients, that does not send ClientInfo.
|
|
|
|
if (client_info.query_kind == ClientInfo::QueryKind::NO_QUERY)
|
|
|
|
{
|
|
|
|
client_info.query_kind = ClientInfo::QueryKind::INITIAL_QUERY;
|
|
|
|
client_info.client_name = client_name;
|
|
|
|
client_info.client_version_major = client_version_major;
|
|
|
|
client_info.client_version_minor = client_version_minor;
|
|
|
|
client_info.client_version_patch = client_version_patch;
|
2020-09-17 12:15:05 +00:00
|
|
|
client_info.client_tcp_protocol_version = client_tcp_protocol_version;
|
2020-02-28 18:55:21 +00:00
|
|
|
}
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2020-02-28 18:55:21 +00:00
|
|
|
/// Set fields, that are known apriori.
|
|
|
|
client_info.interface = ClientInfo::Interface::TCP;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2020-02-28 18:55:21 +00:00
|
|
|
/// Per query settings are also passed via TCP.
|
|
|
|
/// We need to check them before applying due to they can violate the settings constraints.
|
2020-09-17 12:15:05 +00:00
|
|
|
auto settings_format = (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS) ? SettingsWriteFormat::STRINGS_WITH_FLAGS
|
2020-07-20 09:57:17 +00:00
|
|
|
: SettingsWriteFormat::BINARY;
|
2020-02-28 18:55:21 +00:00
|
|
|
Settings passed_settings;
|
2020-07-20 09:57:17 +00:00
|
|
|
passed_settings.read(*in, settings_format);
|
2020-09-14 21:55:43 +00:00
|
|
|
|
|
|
|
/// Interserver secret.
|
|
|
|
std::string received_hash;
|
2020-09-17 14:55:41 +00:00
|
|
|
if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET)
|
2020-09-14 21:55:43 +00:00
|
|
|
{
|
|
|
|
readStringBinary(received_hash, *in, 32);
|
|
|
|
}
|
|
|
|
|
|
|
|
readVarUInt(stage, *in);
|
|
|
|
state.stage = QueryProcessingStage::Enum(stage);
|
|
|
|
|
|
|
|
readVarUInt(compression, *in);
|
|
|
|
state.compression = static_cast<Protocol::Compression>(compression);
|
|
|
|
|
|
|
|
readStringBinary(state.query, *in);
|
|
|
|
|
|
|
|
/// It is OK to check only when query != INITIAL_QUERY,
|
|
|
|
/// since only in that case the actions will be done.
|
|
|
|
if (!cluster.empty() && client_info.query_kind != ClientInfo::QueryKind::INITIAL_QUERY)
|
|
|
|
{
|
|
|
|
#if USE_SSL
|
|
|
|
std::string data(salt);
|
|
|
|
data += cluster_secret;
|
|
|
|
data += state.query;
|
|
|
|
data += state.query_id;
|
|
|
|
data += client_info.initial_user;
|
|
|
|
|
2020-09-10 18:24:53 +00:00
|
|
|
if (received_hash.size() != 32)
|
2020-09-14 21:55:43 +00:00
|
|
|
throw NetException("Unexpected hash received from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
|
|
|
|
|
2020-09-10 18:24:53 +00:00
|
|
|
std::string calculated_hash = encodeSHA256(data);
|
2020-09-14 21:55:43 +00:00
|
|
|
|
|
|
|
if (calculated_hash != received_hash)
|
|
|
|
throw NetException("Hash mismatch", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
|
|
|
|
/// TODO: change error code?
|
|
|
|
|
|
|
|
/// initial_user can be empty in case of Distributed INSERT via Buffer/Kafka,
|
|
|
|
/// i.e. when the INSERT is done with the global context (w/o user).
|
|
|
|
if (!client_info.initial_user.empty())
|
|
|
|
{
|
2020-09-17 14:14:46 +00:00
|
|
|
query_context->setUserWithoutCheckingPassword(client_info.initial_user, client_info.initial_address);
|
2020-09-14 21:55:43 +00:00
|
|
|
LOG_DEBUG(log, "User (initial): {}", query_context->getUserName());
|
|
|
|
}
|
|
|
|
/// No need to update connection_context, since it does not requires user (it will not be used for query execution)
|
|
|
|
#else
|
|
|
|
throw Exception(
|
|
|
|
"Inter-server secret support is disabled, because ClickHouse was built without SSL library",
|
|
|
|
ErrorCodes::SUPPORT_IS_DISABLED);
|
|
|
|
#endif
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
query_context->setInitialRowPolicy();
|
|
|
|
}
|
|
|
|
|
|
|
|
///
|
|
|
|
/// Settings
|
|
|
|
///
|
2020-02-28 18:55:21 +00:00
|
|
|
auto settings_changes = passed_settings.changes();
|
|
|
|
if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY)
|
|
|
|
{
|
|
|
|
/// Throw an exception if the passed settings violate the constraints.
|
|
|
|
query_context->checkSettingsConstraints(settings_changes);
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
/// Quietly clamp to the constraints if it's not an initial query.
|
|
|
|
query_context->clampToSettingsConstraints(settings_changes);
|
|
|
|
}
|
2019-12-16 19:03:03 +00:00
|
|
|
query_context->applySettingsChanges(settings_changes);
|
2020-09-08 13:19:27 +00:00
|
|
|
|
|
|
|
// Use the received query id, or generate a random default. It is convenient
|
|
|
|
// to also generate the default OpenTelemetry trace id at the same time, and
|
2020-09-17 12:15:05 +00:00
|
|
|
// set the trace parent.
|
2020-09-08 13:19:27 +00:00
|
|
|
// Why is this done here and not earlier:
|
|
|
|
// 1) ClientInfo might contain upstream trace id, so we decide whether to use
|
|
|
|
// the default ids after we have received the ClientInfo.
|
|
|
|
// 2) There is the opentelemetry_start_trace_probability setting that
|
|
|
|
// controls when we start a new trace. It can be changed via Native protocol,
|
|
|
|
// so we have to apply the changes first.
|
|
|
|
query_context->setCurrentQueryId(state.query_id);
|
2018-03-28 14:07:28 +00:00
|
|
|
|
2020-10-19 21:26:10 +00:00
|
|
|
// Set parameters of initial query.
|
|
|
|
if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY)
|
|
|
|
{
|
|
|
|
/// 'Current' fields was set at receiveHello.
|
|
|
|
client_info.initial_user = client_info.current_user;
|
|
|
|
client_info.initial_query_id = client_info.current_query_id;
|
|
|
|
client_info.initial_address = client_info.current_address;
|
|
|
|
}
|
|
|
|
|
2018-03-28 14:07:28 +00:00
|
|
|
/// Sync timeouts on client and server during current query to avoid dangling queries on server
|
2018-03-29 13:33:48 +00:00
|
|
|
/// NOTE: We use settings.send_timeout for the receive timeout and vice versa (change arguments ordering in TimeoutSetter),
|
|
|
|
/// because settings.send_timeout is client-side setting which has opposite meaning on the server side.
|
2018-03-28 14:07:28 +00:00
|
|
|
/// NOTE: these settings are applied only for current connection (not for distributed tables' connections)
|
2020-09-08 13:19:27 +00:00
|
|
|
const Settings & settings = query_context->getSettingsRef();
|
2018-03-29 13:33:48 +00:00
|
|
|
state.timeout_setter = std::make_unique<TimeoutSetter>(socket(), settings.receive_timeout, settings.send_timeout);
|
2012-03-11 08:52:56 +00:00
|
|
|
}
|
|
|
|
|
2019-09-03 09:36:16 +00:00
|
|
|
void TCPHandler::receiveUnexpectedQuery()
|
|
|
|
{
|
|
|
|
UInt64 skip_uint_64;
|
|
|
|
String skip_string;
|
|
|
|
|
|
|
|
readStringBinary(skip_string, *in);
|
|
|
|
|
2020-03-13 14:50:26 +00:00
|
|
|
ClientInfo skip_client_info;
|
2020-09-17 12:15:05 +00:00
|
|
|
if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_CLIENT_INFO)
|
|
|
|
skip_client_info.read(*in, client_tcp_protocol_version);
|
2019-09-03 09:36:16 +00:00
|
|
|
|
2020-03-13 14:50:26 +00:00
|
|
|
Settings skip_settings;
|
2020-09-17 12:15:05 +00:00
|
|
|
auto settings_format = (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS) ? SettingsWriteFormat::STRINGS_WITH_FLAGS
|
2020-07-20 09:57:17 +00:00
|
|
|
: SettingsWriteFormat::BINARY;
|
|
|
|
skip_settings.read(*in, settings_format);
|
2019-09-03 09:36:16 +00:00
|
|
|
|
2020-09-14 21:55:43 +00:00
|
|
|
std::string skip_hash;
|
2020-09-17 14:55:41 +00:00
|
|
|
bool interserver_secret = client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET;
|
2020-09-14 21:55:43 +00:00
|
|
|
if (interserver_secret)
|
|
|
|
readStringBinary(skip_hash, *in, 32);
|
|
|
|
|
2019-09-03 09:36:16 +00:00
|
|
|
readVarUInt(skip_uint_64, *in);
|
|
|
|
readVarUInt(skip_uint_64, *in);
|
|
|
|
readStringBinary(skip_string, *in);
|
|
|
|
|
|
|
|
throw NetException("Unexpected packet Query received from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
|
|
|
|
}
|
2012-05-21 06:49:05 +00:00
|
|
|
|
2019-10-19 20:36:35 +00:00
|
|
|
bool TCPHandler::receiveData(bool scalar)
|
2013-09-05 20:22:43 +00:00
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
initBlockInput();
|
|
|
|
|
|
|
|
/// The name of the temporary table for writing data, default to empty string
|
2020-02-12 18:14:12 +00:00
|
|
|
auto temporary_id = StorageID::createEmpty();
|
|
|
|
readStringBinary(temporary_id.table_name, *in);
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
/// Read one block from the network and write it down
|
|
|
|
Block block = state.block_in->read();
|
|
|
|
|
|
|
|
if (block)
|
|
|
|
{
|
2019-10-19 20:36:35 +00:00
|
|
|
if (scalar)
|
2020-02-12 18:14:12 +00:00
|
|
|
query_context->addScalar(temporary_id.table_name, block);
|
2019-10-19 20:36:35 +00:00
|
|
|
else
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
2019-10-19 20:36:35 +00:00
|
|
|
/// If there is an insert request, then the data should be written directly to `state.io.out`.
|
|
|
|
/// Otherwise, we write the blocks in the temporary `external_table_name` table.
|
|
|
|
if (!state.need_receive_data_for_insert && !state.need_receive_data_for_input)
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
2020-02-12 18:14:12 +00:00
|
|
|
auto resolved = query_context->tryResolveStorageID(temporary_id, Context::ResolveExternal);
|
2019-10-19 20:36:35 +00:00
|
|
|
StoragePtr storage;
|
|
|
|
/// If such a table does not exist, create it.
|
2020-03-10 19:36:17 +00:00
|
|
|
if (resolved)
|
2020-05-28 23:01:18 +00:00
|
|
|
storage = DatabaseCatalog::instance().getTable(resolved, *query_context);
|
2020-03-10 19:36:17 +00:00
|
|
|
else
|
2019-10-19 20:36:35 +00:00
|
|
|
{
|
|
|
|
NamesAndTypesList columns = block.getNamesAndTypesList();
|
2020-05-29 02:08:48 +00:00
|
|
|
auto temporary_table = TemporaryTableHolder(*query_context, ColumnsDescription{columns}, {});
|
2020-03-10 19:36:17 +00:00
|
|
|
storage = temporary_table.getTable();
|
|
|
|
query_context->addExternalTable(temporary_id.table_name, std::move(temporary_table));
|
|
|
|
}
|
2020-06-15 19:08:58 +00:00
|
|
|
auto metadata_snapshot = storage->getInMemoryMetadataPtr();
|
2019-10-19 20:36:35 +00:00
|
|
|
/// The data will be written directly to the table.
|
2020-06-15 19:08:58 +00:00
|
|
|
state.io.out = storage->write(ASTPtr(), metadata_snapshot, *query_context);
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
2019-10-19 20:36:35 +00:00
|
|
|
if (state.need_receive_data_for_input)
|
|
|
|
state.block_for_input = block;
|
|
|
|
else
|
|
|
|
state.io.out->write(block);
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
else
|
|
|
|
return false;
|
2013-09-05 20:22:43 +00:00
|
|
|
}
|
|
|
|
|
2019-09-03 09:36:16 +00:00
|
|
|
void TCPHandler::receiveUnexpectedData()
|
|
|
|
{
|
|
|
|
String skip_external_table_name;
|
|
|
|
readStringBinary(skip_external_table_name, *in);
|
|
|
|
|
|
|
|
std::shared_ptr<ReadBuffer> maybe_compressed_in;
|
|
|
|
|
|
|
|
if (last_block_in.compression == Protocol::Compression::Enable)
|
2021-01-06 00:24:42 +00:00
|
|
|
maybe_compressed_in = std::make_shared<CompressedReadBuffer>(*in, /* allow_different_codecs */ true);
|
2019-09-03 09:36:16 +00:00
|
|
|
else
|
|
|
|
maybe_compressed_in = in;
|
|
|
|
|
|
|
|
auto skip_block_in = std::make_shared<NativeBlockInputStream>(
|
|
|
|
*maybe_compressed_in,
|
|
|
|
last_block_in.header,
|
2020-09-17 12:15:05 +00:00
|
|
|
client_tcp_protocol_version);
|
2019-09-03 09:36:16 +00:00
|
|
|
|
2020-03-19 23:48:53 +00:00
|
|
|
skip_block_in->read();
|
2019-09-03 09:36:16 +00:00
|
|
|
throw NetException("Unexpected packet Data received from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
|
|
|
|
}
|
2013-09-05 20:22:43 +00:00
|
|
|
|
|
|
|
void TCPHandler::initBlockInput()
|
2012-03-11 08:52:56 +00:00
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
if (!state.block_in)
|
|
|
|
{
|
2021-01-06 00:24:42 +00:00
|
|
|
/// 'allow_different_codecs' is set to true, because some parts of compressed data can be precompressed in advance
|
|
|
|
/// with another codec that the rest of the data. Example: data sent by Distributed tables.
|
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
if (state.compression == Protocol::Compression::Enable)
|
2021-01-06 00:24:42 +00:00
|
|
|
state.maybe_compressed_in = std::make_shared<CompressedReadBuffer>(*in, /* allow_different_codecs */ true);
|
2017-04-01 07:20:54 +00:00
|
|
|
else
|
|
|
|
state.maybe_compressed_in = in;
|
|
|
|
|
2018-12-06 11:24:07 +00:00
|
|
|
Block header;
|
|
|
|
if (state.io.out)
|
|
|
|
header = state.io.out->getHeader();
|
2019-05-28 18:30:10 +00:00
|
|
|
else if (state.need_receive_data_for_input)
|
|
|
|
header = state.input_header;
|
2018-12-06 11:24:07 +00:00
|
|
|
|
2019-09-03 09:36:16 +00:00
|
|
|
last_block_in.header = header;
|
|
|
|
last_block_in.compression = state.compression;
|
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
state.block_in = std::make_shared<NativeBlockInputStream>(
|
|
|
|
*state.maybe_compressed_in,
|
2018-12-06 11:24:07 +00:00
|
|
|
header,
|
2020-09-17 12:15:05 +00:00
|
|
|
client_tcp_protocol_version);
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
2013-09-05 20:22:43 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2018-02-19 00:45:32 +00:00
|
|
|
void TCPHandler::initBlockOutput(const Block & block)
|
2013-09-05 20:22:43 +00:00
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
if (!state.block_out)
|
|
|
|
{
|
2018-06-14 15:33:59 +00:00
|
|
|
if (!state.maybe_compressed_out)
|
|
|
|
{
|
2020-05-04 00:11:49 +00:00
|
|
|
const Settings & query_settings = query_context->getSettingsRef();
|
|
|
|
|
|
|
|
std::string method = Poco::toUpper(query_settings.network_compression_method.toString());
|
2018-12-21 12:17:30 +00:00
|
|
|
std::optional<int> level;
|
2018-12-21 13:25:39 +00:00
|
|
|
if (method == "ZSTD")
|
2020-05-04 00:11:49 +00:00
|
|
|
level = query_settings.network_zstd_compression_level;
|
2018-12-21 12:17:30 +00:00
|
|
|
|
2018-06-14 15:33:59 +00:00
|
|
|
if (state.compression == Protocol::Compression::Enable)
|
2020-08-28 17:40:45 +00:00
|
|
|
{
|
|
|
|
CompressionCodecFactory::instance().validateCodec(method, level, !query_settings.allow_suspicious_codecs);
|
|
|
|
|
2018-06-14 15:33:59 +00:00
|
|
|
state.maybe_compressed_out = std::make_shared<CompressedWriteBuffer>(
|
2020-08-28 17:40:45 +00:00
|
|
|
*out, CompressionCodecFactory::instance().get(method, level));
|
|
|
|
}
|
2018-06-14 15:33:59 +00:00
|
|
|
else
|
|
|
|
state.maybe_compressed_out = out;
|
|
|
|
}
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
state.block_out = std::make_shared<NativeBlockOutputStream>(
|
|
|
|
*state.maybe_compressed_out,
|
2020-09-17 12:15:05 +00:00
|
|
|
client_tcp_protocol_version,
|
2018-12-19 16:47:30 +00:00
|
|
|
block.cloneEmpty(),
|
|
|
|
!connection_context.getSettingsRef().low_cardinality_allow_in_native_format);
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
2012-03-19 12:57:56 +00:00
|
|
|
}
|
2012-03-11 08:52:56 +00:00
|
|
|
|
2018-06-06 20:57:07 +00:00
|
|
|
void TCPHandler::initLogsBlockOutput(const Block & block)
|
|
|
|
{
|
|
|
|
if (!state.logs_block_out)
|
|
|
|
{
|
2018-06-14 15:33:59 +00:00
|
|
|
/// Use uncompressed stream since log blocks usually contain only one row
|
2018-06-06 20:57:07 +00:00
|
|
|
state.logs_block_out = std::make_shared<NativeBlockOutputStream>(
|
2018-06-14 15:33:59 +00:00
|
|
|
*out,
|
2020-09-17 12:15:05 +00:00
|
|
|
client_tcp_protocol_version,
|
2018-12-19 16:47:30 +00:00
|
|
|
block.cloneEmpty(),
|
|
|
|
!connection_context.getSettingsRef().low_cardinality_allow_in_native_format);
|
2018-06-06 20:57:07 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2012-05-21 06:49:05 +00:00
|
|
|
bool TCPHandler::isQueryCancelled()
|
2012-05-09 08:16:09 +00:00
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
if (state.is_cancelled || state.sent_all_data)
|
|
|
|
return true;
|
|
|
|
|
2019-03-06 21:32:26 +00:00
|
|
|
if (after_check_cancelled.elapsed() / 1000 < query_context->getSettingsRef().interactive_delay)
|
2017-04-01 07:20:54 +00:00
|
|
|
return false;
|
|
|
|
|
|
|
|
after_check_cancelled.restart();
|
|
|
|
|
|
|
|
/// During request execution the only packet that can come from the client is stopping the query.
|
|
|
|
if (static_cast<ReadBufferFromPocoSocket &>(*in).poll(0))
|
|
|
|
{
|
2020-10-24 03:41:47 +00:00
|
|
|
if (in->eof())
|
|
|
|
{
|
|
|
|
LOG_INFO(log, "Client has dropped the connection, cancel the query.");
|
|
|
|
state.is_cancelled = true;
|
|
|
|
state.is_connection_closed = true;
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
UInt64 packet_type = 0;
|
|
|
|
readVarUInt(packet_type, *in);
|
|
|
|
|
|
|
|
switch (packet_type)
|
|
|
|
{
|
|
|
|
case Protocol::Client::Cancel:
|
|
|
|
if (state.empty())
|
|
|
|
throw NetException("Unexpected packet Cancel received from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
|
|
|
|
LOG_INFO(log, "Query was cancelled.");
|
|
|
|
state.is_cancelled = true;
|
|
|
|
return true;
|
|
|
|
|
|
|
|
default:
|
|
|
|
throw NetException("Unknown packet from client", ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return false;
|
2012-05-09 08:16:09 +00:00
|
|
|
}
|
|
|
|
|
2012-05-21 06:49:05 +00:00
|
|
|
|
2018-01-07 00:35:44 +00:00
|
|
|
void TCPHandler::sendData(const Block & block)
|
2012-03-19 12:57:56 +00:00
|
|
|
{
|
2018-02-19 00:45:32 +00:00
|
|
|
initBlockOutput(block);
|
2012-03-11 08:52:56 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
writeVarUInt(Protocol::Server::Data, *out);
|
2018-06-06 20:57:07 +00:00
|
|
|
/// Send external table name (empty name is the main table)
|
2017-08-16 20:27:35 +00:00
|
|
|
writeStringBinary("", *out);
|
2012-05-21 06:49:05 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
state.block_out->write(block);
|
|
|
|
state.maybe_compressed_out->next();
|
|
|
|
out->next();
|
2012-03-19 12:57:56 +00:00
|
|
|
}
|
|
|
|
|
2012-05-21 06:49:05 +00:00
|
|
|
|
2018-06-06 20:57:07 +00:00
|
|
|
void TCPHandler::sendLogData(const Block & block)
|
|
|
|
{
|
|
|
|
initLogsBlockOutput(block);
|
|
|
|
|
|
|
|
writeVarUInt(Protocol::Server::Log, *out);
|
|
|
|
/// Send log tag (empty tag is the default tag)
|
|
|
|
writeStringBinary("", *out);
|
|
|
|
|
|
|
|
state.logs_block_out->write(block);
|
|
|
|
out->next();
|
|
|
|
}
|
|
|
|
|
2018-12-04 20:03:04 +00:00
|
|
|
void TCPHandler::sendTableColumns(const ColumnsDescription & columns)
|
|
|
|
{
|
|
|
|
writeVarUInt(Protocol::Server::TableColumns, *out);
|
|
|
|
|
|
|
|
/// Send external table name (empty name is the main table)
|
|
|
|
writeStringBinary("", *out);
|
|
|
|
writeStringBinary(columns.toString(), *out);
|
|
|
|
|
|
|
|
out->next();
|
|
|
|
}
|
2018-06-06 20:57:07 +00:00
|
|
|
|
2018-08-24 07:30:53 +00:00
|
|
|
void TCPHandler::sendException(const Exception & e, bool with_stack_trace)
|
2012-03-19 12:57:56 +00:00
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
writeVarUInt(Protocol::Server::Exception, *out);
|
2018-08-24 07:30:53 +00:00
|
|
|
writeException(e, *out, with_stack_trace);
|
2017-04-01 07:20:54 +00:00
|
|
|
out->next();
|
2012-03-19 12:57:56 +00:00
|
|
|
}
|
|
|
|
|
2012-05-21 06:49:05 +00:00
|
|
|
|
|
|
|
void TCPHandler::sendEndOfStream()
|
2012-05-08 11:19:00 +00:00
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
state.sent_all_data = true;
|
|
|
|
writeVarUInt(Protocol::Server::EndOfStream, *out);
|
|
|
|
out->next();
|
2012-05-08 11:19:00 +00:00
|
|
|
}
|
|
|
|
|
2012-05-21 06:49:05 +00:00
|
|
|
|
2014-10-25 18:33:52 +00:00
|
|
|
void TCPHandler::updateProgress(const Progress & value)
|
2012-03-19 12:57:56 +00:00
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
state.progress.incrementPiecewiseAtomically(value);
|
2013-11-02 21:18:54 +00:00
|
|
|
}
|
2012-05-09 15:15:45 +00:00
|
|
|
|
|
|
|
|
2013-11-02 21:18:54 +00:00
|
|
|
void TCPHandler::sendProgress()
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
writeVarUInt(Protocol::Server::Progress, *out);
|
2018-03-09 23:04:26 +00:00
|
|
|
auto increment = state.progress.fetchAndResetPiecewiseAtomically();
|
2020-09-17 12:15:05 +00:00
|
|
|
increment.write(*out, client_tcp_protocol_version);
|
2017-04-01 07:20:54 +00:00
|
|
|
out->next();
|
2012-03-11 08:52:56 +00:00
|
|
|
}
|
|
|
|
|
2012-03-09 15:46:52 +00:00
|
|
|
|
2018-06-06 20:57:07 +00:00
|
|
|
void TCPHandler::sendLogs()
|
|
|
|
{
|
|
|
|
if (!state.logs_queue)
|
|
|
|
return;
|
|
|
|
|
|
|
|
MutableColumns logs_columns;
|
|
|
|
MutableColumns curr_logs_columns;
|
|
|
|
size_t rows = 0;
|
|
|
|
|
|
|
|
for (; state.logs_queue->tryPop(curr_logs_columns); ++rows)
|
|
|
|
{
|
|
|
|
if (rows == 0)
|
|
|
|
{
|
|
|
|
logs_columns = std::move(curr_logs_columns);
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
for (size_t j = 0; j < logs_columns.size(); ++j)
|
|
|
|
logs_columns[j]->insertRangeFrom(*curr_logs_columns[j], 0, curr_logs_columns[j]->size());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if (rows > 0)
|
|
|
|
{
|
2018-06-15 17:32:35 +00:00
|
|
|
Block block = InternalTextLogsQueue::getSampleBlock();
|
2018-06-06 20:57:07 +00:00
|
|
|
block.setColumns(std::move(logs_columns));
|
|
|
|
sendLogData(block);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2012-03-09 15:46:52 +00:00
|
|
|
void TCPHandler::run()
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
try
|
|
|
|
{
|
|
|
|
runImpl();
|
|
|
|
|
2020-10-10 17:47:34 +00:00
|
|
|
LOG_DEBUG(log, "Done processing connection.");
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
|
|
|
catch (Poco::Exception & e)
|
|
|
|
{
|
|
|
|
/// Timeout - not an error.
|
|
|
|
if (!strcmp(e.what(), "Timeout"))
|
|
|
|
{
|
2020-05-23 22:24:01 +00:00
|
|
|
LOG_DEBUG(log, "Poco::Exception. Code: {}, e.code() = {}, e.displayText() = {}, e.what() = {}", ErrorCodes::POCO_EXCEPTION, e.code(), e.displayText(), e.what());
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
|
|
|
else
|
|
|
|
throw;
|
|
|
|
}
|
2012-03-09 15:46:52 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
}
|