Merge pull request #63781 from ClickHouse/ft-chunked-protocol

Add chunked wrapper to native protocol
This commit is contained in:
Yakov Olkhovskiy 2024-08-14 10:35:20 +00:00 committed by GitHub
commit f4280203a0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
32 changed files with 1041 additions and 66 deletions

View File

@ -75,6 +75,8 @@ public:
const String & default_database_,
const String & user_,
const String & password_,
const String & proto_send_chunked_,
const String & proto_recv_chunked_,
const String & quota_key_,
const String & stage,
bool randomize_,
@ -128,7 +130,9 @@ public:
connections.emplace_back(std::make_unique<ConnectionPool>(
concurrency,
cur_host, cur_port,
default_database_, user_, password_, quota_key_,
default_database_, user_, password_,
proto_send_chunked_, proto_recv_chunked_,
quota_key_,
/* cluster_= */ "",
/* cluster_secret_= */ "",
/* client_name_= */ std::string(DEFAULT_CLIENT_NAME),
@ -662,6 +666,50 @@ int mainEntryClickHouseBenchmark(int argc, char ** argv)
Strings hosts = options.count("host") ? options["host"].as<Strings>() : Strings({"localhost"});
String proto_send_chunked {"notchunked"};
String proto_recv_chunked {"notchunked"};
if (options.count("proto_caps"))
{
std::string proto_caps_str = options["proto_caps"].as<std::string>();
std::vector<std::string_view> proto_caps;
splitInto<','>(proto_caps, proto_caps_str);
for (auto cap_str : proto_caps)
{
std::string direction;
if (cap_str.starts_with("send_"))
{
direction = "send";
cap_str = cap_str.substr(std::string_view("send_").size());
}
else if (cap_str.starts_with("recv_"))
{
direction = "recv";
cap_str = cap_str.substr(std::string_view("recv_").size());
}
if (cap_str != "chunked" && cap_str != "notchunked" && cap_str != "chunked_optional" && cap_str != "notchunked_optional")
throw Exception(ErrorCodes::BAD_ARGUMENTS, "proto_caps option is incorrect ({})", proto_caps_str);
if (direction.empty())
{
proto_send_chunked = cap_str;
proto_recv_chunked = cap_str;
}
else
{
if (direction == "send")
proto_send_chunked = cap_str;
else
proto_recv_chunked = cap_str;
}
}
}
Benchmark benchmark(
options["concurrency"].as<unsigned>(),
options["delay"].as<double>(),
@ -673,6 +721,8 @@ int mainEntryClickHouseBenchmark(int argc, char ** argv)
options["database"].as<std::string>(),
options["user"].as<std::string>(),
options["password"].as<std::string>(),
proto_send_chunked,
proto_recv_chunked,
options["quota_key"].as<std::string>(),
options["stage"].as<std::string>(),
options.count("randomize"),

View File

@ -38,6 +38,21 @@
<production>{display_name} \e[1;31m:)\e[0m </production> <!-- if it matched to the substring "production" in the server display name -->
</prompt_by_server_display_name>
<!-- Chunked capabilities for native protocol by client.
Can be enabled separately for send and receive channels.
Supported modes:
- chunked - client will only work with server supporting chunked protocol;
- chunked_optional - client prefer server to enable chunked protocol, but can switch to notchunked if server does not support this;
- notchunked - client will only work with server supporting notchunked protocol (current default);
- notchunked_optional - client prefer server notchunked protocol, but can switch to chunked if server does not support this.
-->
<!--
<proto_caps>
<send>chunked_optional</send>
<recv>chunked_optional</recv>
</proto_caps>
-->
<!--
Settings adjustable via command-line parameters
can take their defaults from that config file, see examples:

View File

@ -150,6 +150,21 @@
-->
<tcp_port>9000</tcp_port>
<!-- Chunked capabilities for native protocol by server.
Can be enabled separately for send and receive channels.
Supported modes:
- chunked - server requires from client to have chunked enabled;
- chunked_optional - server supports both chunked and notchunked protocol;
- notchunked - server requires from client notchunked protocol (current default);
- notchunked_optional - server supports both chunked and notchunked protocol.
-->
<!--
<proto_caps>
<send>notchunked_optional</send>
<recv>notchunked_optional</recv>
</proto_caps>
-->
<!-- Compatibility with MySQL protocol.
ClickHouse will pretend to be MySQL for applications connecting to this port.
-->

View File

@ -158,6 +158,8 @@ void ClientApplicationBase::init(int argc, char ** argv)
("config-file,C", po::value<std::string>(), "config-file path")
("proto_caps", po::value<std::string>(), "enable/disable chunked protocol: chunked_optional, notchunked, notchunked_optional, send_chunked, send_chunked_optional, send_notchunked, send_notchunked_optional, recv_chunked, recv_chunked_optional, recv_notchunked, recv_notchunked_optional")
("query,q", po::value<std::vector<std::string>>()->multitoken(), R"(Query. Can be specified multiple times (--query "SELECT 1" --query "SELECT 2") or once with multiple comma-separated queries (--query "SELECT 1; SELECT 2;"). In the latter case, INSERT queries with non-VALUE format must be separated by empty lines.)")
("queries-file", po::value<std::vector<std::string>>()->multitoken(), "file path with queries to execute; multiple files can be specified (--queries-file file1 file2...)")
("multiquery,n", "Obsolete, does nothing")
@ -337,6 +339,41 @@ void ClientApplicationBase::init(int argc, char ** argv)
if (options.count("server_logs_file"))
server_logs_file = options["server_logs_file"].as<std::string>();
if (options.count("proto_caps"))
{
std::string proto_caps_str = options["proto_caps"].as<std::string>();
std::vector<std::string_view> proto_caps;
splitInto<','>(proto_caps, proto_caps_str);
for (auto cap_str : proto_caps)
{
std::string direction;
if (cap_str.starts_with("send_"))
{
direction = "send";
cap_str = cap_str.substr(std::string_view("send_").size());
}
else if (cap_str.starts_with("recv_"))
{
direction = "recv";
cap_str = cap_str.substr(std::string_view("recv_").size());
}
if (cap_str != "chunked" && cap_str != "notchunked" && cap_str != "chunked_optional" && cap_str != "notchunked_optional")
throw Exception(ErrorCodes::BAD_ARGUMENTS, "proto_caps option is incorrect ({})", proto_caps_str);
if (direction.empty())
{
config().setString("proto_caps.send", std::string(cap_str));
config().setString("proto_caps.recv", std::string(cap_str));
}
else
config().setString("proto_caps." + direction, std::string(cap_str));
}
}
query_processing_stage = QueryProcessingStage::fromString(options["stage"].as<std::string>());
query_kind = parseQueryKind(options["query_kind"].as<std::string>());
profile_events.print = options.count("print-profile-events");

View File

@ -73,9 +73,11 @@
#include <limits>
#include <map>
#include <memory>
#include <string_view>
#include <unordered_map>
#include <Common/config_version.h>
#include <base/find_symbols.h>
#include "config.h"
#include <IO/ReadHelpers.h>
#include <Processors/Formats/Impl/ValuesBlockInputFormat.h>
@ -914,6 +916,8 @@ void ClientBase::processTextAsSingleQuery(const String & full_query)
}
catch (Exception & e)
{
if (server_exception)
server_exception->rethrow();
if (!is_interactive)
e.addMessage("(in query: {})", full_query);
throw;
@ -1032,19 +1036,28 @@ void ClientBase::processOrdinaryQuery(const String & query_to_execute, ASTPtr pa
query_interrupt_handler.start(signals_before_stop);
SCOPE_EXIT({ query_interrupt_handler.stop(); });
connection->sendQuery(
connection_parameters.timeouts,
query,
query_parameters,
client_context->getCurrentQueryId(),
query_processing_stage,
&client_context->getSettingsRef(),
&client_context->getClientInfo(),
true,
[&](const Progress & progress) { onProgress(progress); });
try {
connection->sendQuery(
connection_parameters.timeouts,
query,
query_parameters,
client_context->getCurrentQueryId(),
query_processing_stage,
&client_context->getSettingsRef(),
&client_context->getClientInfo(),
true,
[&](const Progress & progress) { onProgress(progress); });
if (send_external_tables)
sendExternalTables(parsed_query);
}
catch (const NetException &)
{
// We still want to attempt to process whatever we already received or can receive (socket receive buffer can be not empty)
receiveResult(parsed_query, signals_before_stop, settings.partial_result_on_first_cancel);
throw;
}
if (send_external_tables)
sendExternalTables(parsed_query);
receiveResult(parsed_query, signals_before_stop, settings.partial_result_on_first_cancel);
break;

View File

@ -5,8 +5,6 @@
#include <Core/Settings.h>
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
#include <IO/ReadBufferFromPocoSocket.h>
#include <IO/WriteBufferFromPocoSocket.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/copyData.h>
@ -85,6 +83,7 @@ Connection::~Connection()
Connection::Connection(const String & host_, UInt16 port_,
const String & default_database_,
const String & user_, const String & password_,
const String & proto_send_chunked_, const String & proto_recv_chunked_,
[[maybe_unused]] const SSHKey & ssh_private_key_,
const String & jwt_,
const String & quota_key_,
@ -95,6 +94,7 @@ Connection::Connection(const String & host_, UInt16 port_,
Protocol::Secure secure_)
: host(host_), port(port_), default_database(default_database_)
, user(user_), password(password_)
, proto_send_chunked(proto_send_chunked_), proto_recv_chunked(proto_recv_chunked_)
#if USE_SSH
, ssh_private_key(ssh_private_key_)
#endif
@ -211,10 +211,10 @@ void Connection::connect(const ConnectionTimeouts & timeouts)
, tcp_keep_alive_timeout_in_sec);
}
in = std::make_shared<ReadBufferFromPocoSocket>(*socket);
in = std::make_shared<ReadBufferFromPocoSocketChunked>(*socket);
in->setAsyncCallback(async_callback);
out = std::make_shared<WriteBufferFromPocoSocket>(*socket);
out = std::make_shared<WriteBufferFromPocoSocketChunked>(*socket);
out->setAsyncCallback(async_callback);
connected = true;
setDescription();
@ -222,9 +222,61 @@ void Connection::connect(const ConnectionTimeouts & timeouts)
sendHello();
receiveHello(timeouts.handshake_timeout);
if (server_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_CHUNKED_PACKETS)
{
/// Client side of chunked protocol negotiation.
/// Server advertises its protocol capabilities (separate for send and receive channels) by sending
/// in its 'Hello' response one of four types - chunked, notchunked, chunked_optional, notchunked_optional.
/// Not optional types are strict meaning that server only supports this type, optional means that
/// server prefer this type but capable to work in opposite.
/// Client selects which type it is going to communicate based on the settings from config or arguments,
/// and sends either "chunked" or "notchunked" protocol request in addendum section of handshake.
/// Client can detect if server's protocol capabilities are not compatible with client's settings (for example
/// server strictly requires chunked protocol but client's settings only allows notchunked protocol) - in such case
/// client should interrupt this connection. However if client continues with incompatible protocol type request, server
/// will send appropriate exception and disconnect client.
auto is_chunked = [](const String & chunked_srv_str, const String & chunked_cl_str, const String & direction)
{
bool chunked_srv = chunked_srv_str.starts_with("chunked");
bool optional_srv = chunked_srv_str.ends_with("_optional");
bool chunked_cl = chunked_cl_str.starts_with("chunked");
bool optional_cl = chunked_cl_str.ends_with("_optional");
if (optional_srv)
return chunked_cl;
if (optional_cl)
return chunked_srv;
if (chunked_cl != chunked_srv)
throw NetException(
ErrorCodes::NETWORK_ERROR,
"Incompatible protocol: {} set to {}, server requires {}",
direction,
chunked_cl ? "chunked" : "notchunked",
chunked_srv ? "chunked" : "notchunked");
return chunked_srv;
};
proto_send_chunked = is_chunked(proto_recv_chunked_srv, proto_send_chunked, "send") ? "chunked" : "notchunked";
proto_recv_chunked = is_chunked(proto_send_chunked_srv, proto_recv_chunked, "recv") ? "chunked" : "notchunked";
}
else
{
if (proto_send_chunked == "chunked" || proto_recv_chunked == "chunked")
throw NetException(
ErrorCodes::NETWORK_ERROR,
"Incompatible protocol: server's version is too old and doesn't support chunked protocol while client settings require it.");
}
if (server_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_ADDENDUM)
sendAddendum();
if (proto_send_chunked == "chunked")
out->enableChunked();
if (proto_recv_chunked == "chunked")
in->enableChunked();
LOG_TRACE(log_wrapper.get(), "Connected to {} server version {}.{}.{}.",
server_name, server_version_major, server_version_minor, server_version_patch);
}
@ -393,6 +445,13 @@ void Connection::sendAddendum()
{
if (server_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_QUOTA_KEY)
writeStringBinary(quota_key, *out);
if (server_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_CHUNKED_PACKETS)
{
writeStringBinary(proto_send_chunked, *out);
writeStringBinary(proto_recv_chunked, *out);
}
out->next();
}
@ -472,6 +531,12 @@ void Connection::receiveHello(const Poco::Timespan & handshake_timeout)
else
server_version_patch = server_revision;
if (server_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_CHUNKED_PACKETS)
{
readStringBinary(proto_send_chunked_srv, *in);
readStringBinary(proto_recv_chunked_srv, *in);
}
if (server_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_PASSWORD_COMPLEXITY_RULES)
{
UInt64 rules_size;
@ -611,6 +676,7 @@ bool Connection::ping(const ConnectionTimeouts & timeouts)
UInt64 pong = 0;
writeVarUInt(Protocol::Client::Ping, *out);
out->finishChunk();
out->next();
if (in->eof())
@ -660,6 +726,7 @@ TablesStatusResponse Connection::getTablesStatus(const ConnectionTimeouts & time
writeVarUInt(Protocol::Client::TablesStatusRequest, *out);
request.write(*out, server_revision);
out->finishChunk();
out->next();
UInt64 response_type = 0;
@ -813,6 +880,8 @@ void Connection::sendQuery(
block_profile_events_in.reset();
block_out.reset();
out->finishChunk();
/// Send empty block which means end of data.
if (!with_pending_data)
{
@ -829,6 +898,7 @@ void Connection::sendCancel()
return;
writeVarUInt(Protocol::Client::Cancel, *out);
out->finishChunk();
out->next();
}
@ -854,7 +924,10 @@ void Connection::sendData(const Block & block, const String & name, bool scalar)
size_t prev_bytes = out->count();
block_out->write(block);
maybe_compressed_out->next();
if (maybe_compressed_out != out)
maybe_compressed_out->next();
if (!block)
out->finishChunk();
out->next();
if (throttler)
@ -865,6 +938,7 @@ void Connection::sendIgnoredPartUUIDs(const std::vector<UUID> & uuids)
{
writeVarUInt(Protocol::Client::IgnoredPartUUIDs, *out);
writeVectorBinary(uuids, *out);
out->finishChunk();
out->next();
}
@ -874,6 +948,7 @@ void Connection::sendReadTaskResponse(const String & response)
writeVarUInt(Protocol::Client::ReadTaskResponse, *out);
writeVarUInt(DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION, *out);
writeStringBinary(response, *out);
out->finishChunk();
out->next();
}
@ -882,6 +957,7 @@ void Connection::sendMergeTreeReadTaskResponse(const ParallelReadResponse & resp
{
writeVarUInt(Protocol::Client::MergeTreeReadTaskResponse, *out);
response.serialize(*out);
out->finishChunk();
out->next();
}
@ -899,6 +975,8 @@ void Connection::sendPreparedData(ReadBuffer & input, size_t size, const String
copyData(input, *out);
else
copyData(input, *out, size);
out->finishChunk();
out->next();
}
@ -927,6 +1005,8 @@ void Connection::sendScalarsData(Scalars & data)
sendData(elem.second, elem.first, true /* scalar */);
}
out->finishChunk();
out_bytes = out->count() - out_bytes;
maybe_compressed_out_bytes = maybe_compressed_out->count() - maybe_compressed_out_bytes;
double elapsed = watch.elapsedSeconds();
@ -1069,13 +1149,13 @@ std::optional<Poco::Net::SocketAddress> Connection::getResolvedAddress() const
bool Connection::poll(size_t timeout_microseconds)
{
return static_cast<ReadBufferFromPocoSocket &>(*in).poll(timeout_microseconds);
return in->poll(timeout_microseconds);
}
bool Connection::hasReadPendingData() const
{
return last_input_packet_type.has_value() || static_cast<const ReadBufferFromPocoSocket &>(*in).hasPendingData();
return last_input_packet_type.has_value() || in->hasBufferedData();
}
@ -1349,6 +1429,8 @@ ServerConnectionPtr Connection::createConnection(const ConnectionParameters & pa
parameters.default_database,
parameters.user,
parameters.password,
parameters.proto_send_chunked,
parameters.proto_recv_chunked,
parameters.ssh_private_key,
parameters.jwt,
parameters.quota_key,

View File

@ -8,8 +8,8 @@
#include <Core/Defines.h>
#include <IO/ReadBufferFromPocoSocket.h>
#include <IO/WriteBufferFromPocoSocket.h>
#include <IO/ReadBufferFromPocoSocketChunked.h>
#include <IO/WriteBufferFromPocoSocketChunked.h>
#include <Interpreters/TablesStatus.h>
#include <Interpreters/Context_fwd.h>
@ -52,6 +52,7 @@ public:
Connection(const String & host_, UInt16 port_,
const String & default_database_,
const String & user_, const String & password_,
const String & proto_send_chunked_, const String & proto_recv_chunked_,
const SSHKey & ssh_private_key_,
const String & jwt_,
const String & quota_key_,
@ -170,6 +171,10 @@ private:
String default_database;
String user;
String password;
String proto_send_chunked;
String proto_recv_chunked;
String proto_send_chunked_srv;
String proto_recv_chunked_srv;
#if USE_SSH
SSHKey ssh_private_key;
#endif
@ -209,8 +214,8 @@ private:
String server_display_name;
std::unique_ptr<Poco::Net::StreamSocket> socket;
std::shared_ptr<ReadBufferFromPocoSocket> in;
std::shared_ptr<WriteBufferFromPocoSocket> out;
std::shared_ptr<ReadBufferFromPocoSocketChunked> in;
std::shared_ptr<WriteBufferFromPocoSocketChunked> out;
std::optional<UInt64> last_input_packet_type;
String query_id;

View File

@ -107,6 +107,9 @@ ConnectionParameters::ConnectionParameters(const Poco::Util::AbstractConfigurati
}
}
proto_send_chunked = config.getString("proto_caps.send", "notchunked");
proto_recv_chunked = config.getString("proto_caps.recv", "notchunked");
quota_key = config.getString("quota_key", "");
/// By default compression is disabled if address looks like localhost.

View File

@ -20,6 +20,8 @@ struct ConnectionParameters
std::string default_database;
std::string user;
std::string password;
std::string proto_send_chunked = "notchunked";
std::string proto_recv_chunked = "notchunked";
std::string quota_key;
SSHKey ssh_private_key;
std::string jwt;

View File

@ -13,6 +13,8 @@ ConnectionPoolPtr ConnectionPoolFactory::get(
String default_database,
String user,
String password,
String proto_send_chunked,
String proto_recv_chunked,
String quota_key,
String cluster,
String cluster_secret,
@ -22,7 +24,7 @@ ConnectionPoolPtr ConnectionPoolFactory::get(
Priority priority)
{
Key key{
max_connections, host, port, default_database, user, password, quota_key, cluster, cluster_secret, client_name, compression, secure, priority};
max_connections, host, port, default_database, user, password, proto_send_chunked, proto_recv_chunked, quota_key, cluster, cluster_secret, client_name, compression, secure, priority};
std::lock_guard lock(mutex);
auto [it, inserted] = pools.emplace(key, ConnectionPoolPtr{});
@ -39,6 +41,8 @@ ConnectionPoolPtr ConnectionPoolFactory::get(
default_database,
user,
password,
proto_send_chunked,
proto_recv_chunked,
quota_key,
cluster,
cluster_secret,

View File

@ -73,6 +73,8 @@ public:
const String & default_database_,
const String & user_,
const String & password_,
const String & proto_send_chunked_,
const String & proto_recv_chunked_,
const String & quota_key_,
const String & cluster_,
const String & cluster_secret_,
@ -85,6 +87,8 @@ public:
, default_database(default_database_)
, user(user_)
, password(password_)
, proto_send_chunked(proto_send_chunked_)
, proto_recv_chunked(proto_recv_chunked_)
, quota_key(quota_key_)
, cluster(cluster_)
, cluster_secret(cluster_secret_)
@ -116,7 +120,9 @@ protected:
{
return std::make_shared<Connection>(
host, port,
default_database, user, password, SSHKey(), /*jwt*/ "", quota_key,
default_database, user, password,
proto_send_chunked, proto_recv_chunked,
SSHKey(), /*jwt*/ "", quota_key,
cluster, cluster_secret,
client_name, compression, secure);
}
@ -125,6 +131,8 @@ private:
String default_database;
String user;
String password;
String proto_send_chunked;
String proto_recv_chunked;
String quota_key;
/// For inter-server authorization
@ -150,6 +158,8 @@ public:
String default_database;
String user;
String password;
String proto_send_chunked;
String proto_recv_chunked;
String quota_key;
String cluster;
String cluster_secret;
@ -173,6 +183,8 @@ public:
String default_database,
String user,
String password,
String proto_send_chunked,
String proto_recv_chunked,
String quota_key,
String cluster,
String cluster_secret,
@ -190,6 +202,7 @@ inline bool operator==(const ConnectionPoolFactory::Key & lhs, const ConnectionP
{
return lhs.max_connections == rhs.max_connections && lhs.host == rhs.host && lhs.port == rhs.port
&& lhs.default_database == rhs.default_database && lhs.user == rhs.user && lhs.password == rhs.password
&& lhs.proto_send_chunked == rhs.proto_send_chunked && lhs.proto_recv_chunked == rhs.proto_recv_chunked
&& lhs.quota_key == rhs.quota_key
&& lhs.cluster == rhs.cluster && lhs.cluster_secret == rhs.cluster_secret && lhs.client_name == rhs.client_name
&& lhs.compression == rhs.compression && lhs.secure == rhs.secure && lhs.priority == rhs.priority;

View File

@ -83,6 +83,9 @@ static constexpr auto DBMS_MIN_REVISION_WITH_SYSTEM_KEYWORDS_TABLE = 54468;
static constexpr auto DBMS_MIN_REVISION_WITH_ROWS_BEFORE_AGGREGATION = 54469;
/// Packets size header
static constexpr auto DBMS_MIN_PROTOCOL_VERSION_WITH_CHUNKED_PACKETS = 54470;
/// Version of ClickHouse TCP protocol.
///
/// Should be incremented manually on protocol changes.
@ -90,6 +93,6 @@ static constexpr auto DBMS_MIN_REVISION_WITH_ROWS_BEFORE_AGGREGATION = 54469;
/// NOTE: DBMS_TCP_PROTOCOL_VERSION has nothing common with VERSION_REVISION,
/// later is just a number for server version (one number instead of commit SHA)
/// for simplicity (sometimes it may be more convenient in some use cases).
static constexpr auto DBMS_TCP_PROTOCOL_VERSION = 54469;
static constexpr auto DBMS_TCP_PROTOCOL_VERSION = 54470;
}

View File

@ -51,6 +51,8 @@ namespace
configuration.db,
configuration.user,
configuration.password,
configuration.proto_send_chunked,
configuration.proto_recv_chunked,
configuration.quota_key,
"", /* cluster */
"", /* cluster_secret */
@ -222,7 +224,7 @@ void registerDictionarySourceClickHouse(DictionarySourceFactory & factory)
{
validateNamedCollection(
*named_collection, {}, ValidateKeysMultiset<ExternalDatabaseEqualKeysSet>{
"secure", "host", "hostname", "port", "user", "username", "password", "quota_key", "name",
"secure", "host", "hostname", "port", "user", "username", "password", "proto_send_chunked", "proto_recv_chunked", "quota_key", "name",
"db", "database", "table","query", "where", "invalidate_query", "update_field", "update_lag"});
const auto secure = named_collection->getOrDefault("secure", false);
@ -234,6 +236,8 @@ void registerDictionarySourceClickHouse(DictionarySourceFactory & factory)
.host = host,
.user = named_collection->getAnyOrDefault<String>({"user", "username"}, "default"),
.password = named_collection->getOrDefault<String>("password", ""),
.proto_send_chunked = named_collection->getOrDefault<String>("proto_send_chunked", "notchunked"),
.proto_recv_chunked = named_collection->getOrDefault<String>("proto_recv_chunked", "notchunked"),
.quota_key = named_collection->getOrDefault<String>("quota_key", ""),
.db = named_collection->getAnyOrDefault<String>({"db", "database"}, default_database),
.table = named_collection->getOrDefault<String>("table", ""),
@ -258,6 +262,8 @@ void registerDictionarySourceClickHouse(DictionarySourceFactory & factory)
.host = host,
.user = config.getString(settings_config_prefix + ".user", "default"),
.password = config.getString(settings_config_prefix + ".password", ""),
.proto_send_chunked = config.getString(settings_config_prefix + ".proto_caps.send", "notchunked"),
.proto_recv_chunked = config.getString(settings_config_prefix + ".proto_caps.recv", "notchunked"),
.quota_key = config.getString(settings_config_prefix + ".quota_key", ""),
.db = config.getString(settings_config_prefix + ".db", default_database),
.table = config.getString(settings_config_prefix + ".table", ""),

View File

@ -23,6 +23,8 @@ public:
const std::string host;
const std::string user;
const std::string password;
const std::string proto_send_chunked;
const std::string proto_recv_chunked;
const std::string quota_key;
const std::string db;
const std::string table;

58
src/IO/NetUtils.h Normal file
View File

@ -0,0 +1,58 @@
#pragma once
#include <concepts>
#include <bit>
namespace DB
{
template<std::integral T>
constexpr T netToHost(T value) noexcept
{
if constexpr (std::endian::native != std::endian::big)
return std::byteswap(value);
return value;
}
template<std::integral T>
constexpr T hostToNet(T value) noexcept
{
if constexpr (std::endian::native != std::endian::big)
return std::byteswap(value);
return value;
}
template<std::integral T>
constexpr T toLittleEndian(T value) noexcept
{
if constexpr (std::endian::native == std::endian::big)
return std::byteswap(value);
return value;
}
template<std::integral T>
constexpr T toBigEndian(T value) noexcept
{
if constexpr (std::endian::native != std::endian::big)
return std::byteswap(value);
return value;
}
template<std::integral T>
constexpr T fromLittleEndian(T value) noexcept
{
if constexpr (std::endian::native == std::endian::big)
return std::byteswap(value);
return value;
}
template<std::integral T>
constexpr T fromBigEndian(T value) noexcept
{
if constexpr (std::endian::native != std::endian::big)
return std::byteswap(value);
return value;
}
}

View File

@ -32,7 +32,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
bool ReadBufferFromPocoSocket::nextImpl()
ssize_t ReadBufferFromPocoSocketBase::socketReceiveBytesImpl(char * ptr, size_t size)
{
ssize_t bytes_read = 0;
Stopwatch watch;
@ -43,14 +43,11 @@ bool ReadBufferFromPocoSocket::nextImpl()
ProfileEvents::increment(ProfileEvents::NetworkReceiveBytes, bytes_read);
});
CurrentMetrics::Increment metric_increment(CurrentMetrics::NetworkReceive);
/// Add more details to exceptions.
try
{
CurrentMetrics::Increment metric_increment(CurrentMetrics::NetworkReceive);
if (internal_buffer.size() > INT_MAX)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Buffer overflow");
/// If async_callback is specified, set socket to non-blocking mode
/// and try to read data from it, if socket is not ready for reading,
/// run async_callback and try again later.
@ -61,7 +58,7 @@ bool ReadBufferFromPocoSocket::nextImpl()
socket.setBlocking(false);
SCOPE_EXIT(socket.setBlocking(true));
bool secure = socket.secure();
bytes_read = socket.impl()->receiveBytes(internal_buffer.begin(), static_cast<int>(internal_buffer.size()));
bytes_read = socket.impl()->receiveBytes(ptr, static_cast<int>(size));
/// Check EAGAIN and ERR_SSL_WANT_READ/ERR_SSL_WANT_WRITE for secure socket (reading from secure socket can write too).
while (bytes_read < 0 && (errno == EAGAIN || (secure && (checkSSLWantRead(bytes_read) || checkSSLWantWrite(bytes_read)))))
@ -73,12 +70,12 @@ bool ReadBufferFromPocoSocket::nextImpl()
async_callback(socket.impl()->sockfd(), socket.getReceiveTimeout(), AsyncEventTimeoutType::RECEIVE, socket_description, AsyncTaskExecutor::Event::READ | AsyncTaskExecutor::Event::ERROR);
/// Try to read again.
bytes_read = socket.impl()->receiveBytes(internal_buffer.begin(), static_cast<int>(internal_buffer.size()));
bytes_read = socket.impl()->receiveBytes(ptr, static_cast<int>(size));
}
}
else
{
bytes_read = socket.impl()->receiveBytes(internal_buffer.begin(), static_cast<int>(internal_buffer.size()));
bytes_read = socket.impl()->receiveBytes(ptr, static_cast<int>(size));
}
}
catch (const Poco::Net::NetException & e)
@ -99,6 +96,16 @@ bool ReadBufferFromPocoSocket::nextImpl()
if (bytes_read < 0)
throw NetException(ErrorCodes::CANNOT_READ_FROM_SOCKET, "Cannot read from socket (peer: {}, local: {})", peer_address.toString(), socket.address().toString());
return bytes_read;
}
bool ReadBufferFromPocoSocketBase::nextImpl()
{
if (internal_buffer.size() > INT_MAX)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Buffer overflow");
ssize_t bytes_read = socketReceiveBytesImpl(internal_buffer.begin(), internal_buffer.size());
if (read_event != ProfileEvents::end())
ProfileEvents::increment(read_event, bytes_read);
@ -110,7 +117,7 @@ bool ReadBufferFromPocoSocket::nextImpl()
return true;
}
ReadBufferFromPocoSocket::ReadBufferFromPocoSocket(Poco::Net::Socket & socket_, size_t buf_size)
ReadBufferFromPocoSocketBase::ReadBufferFromPocoSocketBase(Poco::Net::Socket & socket_, size_t buf_size)
: BufferWithOwnMemory<ReadBuffer>(buf_size)
, socket(socket_)
, peer_address(socket.peerAddress())
@ -119,19 +126,22 @@ ReadBufferFromPocoSocket::ReadBufferFromPocoSocket(Poco::Net::Socket & socket_,
{
}
ReadBufferFromPocoSocket::ReadBufferFromPocoSocket(Poco::Net::Socket & socket_, const ProfileEvents::Event & read_event_, size_t buf_size)
: ReadBufferFromPocoSocket(socket_, buf_size)
ReadBufferFromPocoSocketBase::ReadBufferFromPocoSocketBase(Poco::Net::Socket & socket_, const ProfileEvents::Event & read_event_, size_t buf_size)
: ReadBufferFromPocoSocketBase(socket_, buf_size)
{
read_event = read_event_;
}
bool ReadBufferFromPocoSocket::poll(size_t timeout_microseconds) const
bool ReadBufferFromPocoSocketBase::poll(size_t timeout_microseconds) const
{
if (available())
/// For secure socket it is important to check if any remaining data available in underlying decryption buffer -
/// read always retrieves the whole encrypted frame from the wire and puts it into underlying buffer while returning only requested size -
/// further poll() can block though there is still data to read in the underlying decryption buffer.
if (available() || socket.impl()->available())
return true;
Stopwatch watch;
bool res = socket.poll(timeout_microseconds, Poco::Net::Socket::SELECT_READ | Poco::Net::Socket::SELECT_ERROR);
bool res = socket.impl()->poll(timeout_microseconds, Poco::Net::Socket::SELECT_READ | Poco::Net::Socket::SELECT_ERROR);
ProfileEvents::increment(ProfileEvents::NetworkReceiveElapsedMicroseconds, watch.elapsedMicroseconds());
return res;
}

View File

@ -9,7 +9,7 @@ namespace DB
{
/// Works with the ready Poco::Net::Socket. Blocking operations.
class ReadBufferFromPocoSocket : public BufferWithOwnMemory<ReadBuffer>
class ReadBufferFromPocoSocketBase : public BufferWithOwnMemory<ReadBuffer>
{
protected:
Poco::Net::Socket & socket;
@ -25,16 +25,29 @@ protected:
bool nextImpl() override;
public:
explicit ReadBufferFromPocoSocket(Poco::Net::Socket & socket_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
explicit ReadBufferFromPocoSocket(Poco::Net::Socket & socket_, const ProfileEvents::Event & read_event_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
explicit ReadBufferFromPocoSocketBase(Poco::Net::Socket & socket_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
explicit ReadBufferFromPocoSocketBase(Poco::Net::Socket & socket_, const ProfileEvents::Event & read_event_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
bool poll(size_t timeout_microseconds) const;
void setAsyncCallback(AsyncCallback async_callback_) { async_callback = std::move(async_callback_); }
ssize_t socketReceiveBytesImpl(char * ptr, size_t size);
private:
AsyncCallback async_callback;
std::string socket_description;
};
class ReadBufferFromPocoSocket : public ReadBufferFromPocoSocketBase
{
public:
explicit ReadBufferFromPocoSocket(Poco::Net::Socket & socket_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE)
: ReadBufferFromPocoSocketBase(socket_, buf_size)
{}
explicit ReadBufferFromPocoSocket(Poco::Net::Socket & socket_, const ProfileEvents::Event & read_event_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE)
: ReadBufferFromPocoSocketBase(socket_, read_event_, buf_size)
{}
};
}

View File

@ -0,0 +1,166 @@
#include <IO/ReadBufferFromPocoSocketChunked.h>
#include <Common/logger_useful.h>
#include <IO/NetUtils.h>
namespace DB::ErrorCodes
{
extern const int LOGICAL_ERROR;
}
namespace DB
{
ReadBufferFromPocoSocketChunked::ReadBufferFromPocoSocketChunked(Poco::Net::Socket & socket_, size_t buf_size)
: ReadBufferFromPocoSocketChunked(socket_, ProfileEvents::end(), buf_size)
{}
ReadBufferFromPocoSocketChunked::ReadBufferFromPocoSocketChunked(Poco::Net::Socket & socket_, const ProfileEvents::Event & read_event_, size_t buf_size)
: ReadBufferFromPocoSocketBase(
socket_, read_event_,
std::min(buf_size, static_cast<size_t>(std::numeric_limits<decltype(chunk_left)>::max()))),
our_address(socket_.address()), log(getLogger("Protocol"))
{}
void ReadBufferFromPocoSocketChunked::enableChunked()
{
if (chunked)
return;
chunked = 1;
data_end = buffer().end();
/// Resize working buffer so any next read will call nextImpl
working_buffer.resize(offset());
chunk_left = 0;
next_chunk = 0;
}
bool ReadBufferFromPocoSocketChunked::hasBufferedData() const
{
if (available())
return true;
return chunked && (static_cast<size_t>(data_end - working_buffer.end()) > sizeof(next_chunk));
}
bool ReadBufferFromPocoSocketChunked::poll(size_t timeout_microseconds) const
{
if (chunked)
if (available() || static_cast<size_t>(data_end - working_buffer.end()) > sizeof(next_chunk))
return true;
return ReadBufferFromPocoSocketBase::poll(timeout_microseconds);
}
bool ReadBufferFromPocoSocketChunked::loadNextChunk(Position c_pos, bool cont)
{
auto buffered = std::min(static_cast<size_t>(data_end - c_pos), sizeof(next_chunk));
if (buffered)
std::memcpy(&next_chunk, c_pos, buffered);
if (buffered < sizeof(next_chunk))
if (socketReceiveBytesImpl(reinterpret_cast<char *>(&next_chunk) + buffered, sizeof(next_chunk) - buffered) < static_cast<ssize_t>(sizeof(next_chunk) - buffered))
return false;
next_chunk = fromLittleEndian(next_chunk);
if (next_chunk)
{
if (cont)
LOG_TEST(log, "{} <- {} Chunk receive continued. Size {}", ourAddress().toString(), peerAddress().toString(), next_chunk);
}
else
LOG_TEST(log, "{} <- {} Chunk receive ended.", ourAddress().toString(), peerAddress().toString());
return true;
}
bool ReadBufferFromPocoSocketChunked::processChunkLeft(Position c_pos)
{
if (data_end - c_pos < chunk_left)
{
working_buffer.resize(data_end - buffer().begin());
nextimpl_working_buffer_offset = c_pos - buffer().begin();
chunk_left -= (data_end - c_pos);
return true;
}
nextimpl_working_buffer_offset = c_pos - buffer().begin();
working_buffer.resize(nextimpl_working_buffer_offset + chunk_left);
c_pos += chunk_left;
if (!loadNextChunk(c_pos, true))
return false;
chunk_left = 0;
return true;
}
bool ReadBufferFromPocoSocketChunked::nextImpl()
{
if (!chunked)
return ReadBufferFromPocoSocketBase::nextImpl();
auto * c_pos = pos;
if (chunk_left == 0)
{
if (next_chunk == 0)
{
if (chunked == 1)
chunked = 2; // first chunked block - no end marker
else
c_pos = pos + sizeof(next_chunk); // bypass chunk end marker
if (c_pos > data_end)
c_pos = data_end;
if (!loadNextChunk(c_pos))
return false;
chunk_left = next_chunk;
next_chunk = 0;
if (chunk_left == 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Native protocol: empty chunk received");
c_pos += sizeof(next_chunk);
if (c_pos >= data_end)
{
if (!ReadBufferFromPocoSocketBase::nextImpl())
return false;
data_end = buffer().end();
c_pos = buffer().begin();
}
LOG_TEST(log, "{} <- {} Chunk receive started. Message {}, size {}", ourAddress().toString(), peerAddress().toString(), static_cast<unsigned int>(*c_pos), chunk_left);
}
else
{
c_pos += sizeof(next_chunk);
if (c_pos >= data_end)
{
if (!ReadBufferFromPocoSocketBase::nextImpl())
return false;
data_end = buffer().end();
c_pos = buffer().begin();
}
chunk_left = next_chunk;
next_chunk = 0;
}
}
else
{
if (!ReadBufferFromPocoSocketBase::nextImpl())
return false;
data_end = buffer().end();
c_pos = buffer().begin();
}
return processChunkLeft(c_pos);
}
}

View File

@ -0,0 +1,109 @@
#pragma once
#include <IO/ReadBuffer.h>
#include <IO/ReadBufferFromPocoSocket.h>
/*
Handshake +=============
| 'Hello' type
| handshake exchange
| chunked protocol negotiation
+=============
Basic chunk:
+=============
Chunk begins | 0x12345678 chunk size, 4 bytes little endian
+-------------
| Packet type always follows beginning of the chunk
| packet data
+-------------
Chunk ends | 0x00000000 4 zero bytes
+=============
Datastream chunk:
+=============
Chunk begins | 0x12345678
+-------------
| Packet type
| packet data
+-------------
| Packet type
| packet data
+-------------
...arbitrary number .....
of packets... .....
+-------------
| Packet type
| packet data
+-------------
Chunk ends | 0x00000000
+=============
Multipart chunk:
+=============
Chunk begins | 0x12345678 chunk part size, 4 bytes little endian
+-------------
| Packet type
| packet data
+-------------
| Packet type
| (partial) packet data
+=============
Chunk continues | 0x12345678 chunk next part size, 4 bytes little endian
+=============
| possibly previous packet's data
+-------------
| Packet type
| packet data
+-------------
...arbitrary number .....
of chunk parts... .....
+-------------
| Packet type
| packet data
+-------------
Chunk ends | 0x00000000
+=============
*/
namespace DB
{
class ReadBufferFromPocoSocketChunked: public ReadBufferFromPocoSocketBase
{
public:
using ReadBufferFromPocoSocketBase::setAsyncCallback;
explicit ReadBufferFromPocoSocketChunked(Poco::Net::Socket & socket_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
explicit ReadBufferFromPocoSocketChunked(Poco::Net::Socket & socket_, const ProfileEvents::Event & read_event_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
void enableChunked();
bool hasBufferedData() const;
bool poll(size_t timeout_microseconds) const;
Poco::Net::SocketAddress peerAddress() { return peer_address; }
Poco::Net::SocketAddress ourAddress() { return our_address; }
protected:
bool loadNextChunk(Position c_pos, bool cont = false);
bool processChunkLeft(Position c_pos);
bool nextImpl() override;
Poco::Net::SocketAddress our_address;
private:
LoggerPtr log;
Position data_end = nullptr; // end position of data in the internal_buffer
UInt32 chunk_left = 0; // chunk left to read from socket
UInt32 next_chunk = 0; // size of the next cnunk
UInt8 chunked = 0; // 0 - disabled; 1 - started; 2 - enabled;
};
}

View File

@ -64,7 +64,8 @@ public:
}
bytes += bytes_in_buffer;
pos = working_buffer.begin();
pos = working_buffer.begin() + nextimpl_working_buffer_offset;
nextimpl_working_buffer_offset = 0;
}
/// Calling finalize() in the destructor of derived classes is a bad practice.
@ -164,6 +165,11 @@ protected:
bool finalized = false;
bool canceled = false;
/// The number of bytes to preserve from the initial position of `working_buffer`
/// buffer. Apparently this is an additional out-parameter for nextImpl(),
/// not a real field.
size_t nextimpl_working_buffer_offset = 0;
private:
/** Write the data in the buffer (from the beginning of the buffer to the current position).
* Throw an exception if something is wrong.

View File

@ -183,6 +183,7 @@ WriteBufferFromPocoSocket::WriteBufferFromPocoSocket(Poco::Net::Socket & socket_
, socket(socket_)
, peer_address(socket.peerAddress())
, our_address(socket.address())
, write_event(ProfileEvents::end())
, socket_description("socket (" + peer_address.toString() + ")")
{
}

View File

@ -0,0 +1,210 @@
#include <IO/WriteBufferFromPocoSocketChunked.h>
#include <Common/logger_useful.h>
#include <IO/NetUtils.h>
namespace
{
template <typename T>
void setValue(T * typed_ptr, std::type_identity_t<T> val)
{
memcpy(static_cast<void*>(typed_ptr), &val, sizeof(T));
}
}
namespace DB
{
WriteBufferFromPocoSocketChunked::WriteBufferFromPocoSocketChunked(Poco::Net::Socket & socket_, size_t buf_size)
: WriteBufferFromPocoSocketChunked(socket_, ProfileEvents::end(), buf_size)
{}
WriteBufferFromPocoSocketChunked::WriteBufferFromPocoSocketChunked(Poco::Net::Socket & socket_, const ProfileEvents::Event & write_event_, size_t buf_size)
: WriteBufferFromPocoSocket(
socket_, write_event_,
std::clamp(buf_size, sizeof(*chunk_size_ptr) + 1, static_cast<size_t>(std::numeric_limits<std::remove_reference_t<decltype(*chunk_size_ptr)>>::max()))),
log(getLogger("Protocol"))
{}
void WriteBufferFromPocoSocketChunked::enableChunked()
{
chunked = true;
/// Initialize next chunk
chunk_size_ptr = reinterpret_cast<decltype(chunk_size_ptr)>(pos);
pos += std::min(available(), sizeof(*chunk_size_ptr));
/// Pretend finishChunk() was just called to prevent sending empty chunk if finishChunk() called immediately
last_finish_chunk = chunk_size_ptr;
}
void WriteBufferFromPocoSocketChunked::finishChunk()
{
if (!chunked)
return;
if (pos <= reinterpret_cast<Position>(chunk_size_ptr) + sizeof(*chunk_size_ptr))
{
/// Prevent duplicate finish chunk (and finish chunk right after enableChunked())
if (chunk_size_ptr == last_finish_chunk)
return;
/// If current chunk is empty it means we are finishing a chunk previously sent by next(),
/// we want to convert current chunk header into end-of-chunk marker and initialize next chunk.
/// We don't need to worry about if it's the end of the buffer because next() always sends the whole buffer
/// so it should be a beginning of the buffer.
chassert(reinterpret_cast<Position>(chunk_size_ptr) == working_buffer.begin());
setValue(chunk_size_ptr, 0);
/// Initialize next chunk
chunk_size_ptr = reinterpret_cast<decltype(chunk_size_ptr)>(pos);
pos += std::min(available(), sizeof(*chunk_size_ptr));
last_finish_chunk = chunk_size_ptr;
return;
}
/// Previously finished chunk wasn't sent yet
if (last_finish_chunk == chunk_size_ptr)
{
chunk_started = false;
LOG_TEST(log, "{} -> {} Chunk send ended.", ourAddress().toString(), peerAddress().toString());
}
/// Fill up current chunk size
setValue(chunk_size_ptr, toLittleEndian(static_cast<UInt32>(pos - reinterpret_cast<Position>(chunk_size_ptr) - sizeof(*chunk_size_ptr))));
if (!chunk_started)
LOG_TEST(log, "{} -> {} Chunk send started. Message {}, size {}",
ourAddress().toString(), peerAddress().toString(),
static_cast<unsigned int>(*(reinterpret_cast<char *>(chunk_size_ptr) + sizeof(*chunk_size_ptr))),
*chunk_size_ptr);
else
{
chunk_started = false;
LOG_TEST(log, "{} -> {} Chunk send continued. Size {}", ourAddress().toString(), peerAddress().toString(), *chunk_size_ptr);
}
LOG_TEST(log, "{} -> {} Chunk send ended.", ourAddress().toString(), peerAddress().toString());
if (available() < sizeof(*chunk_size_ptr))
{
finishing = available();
pos += available();
chunk_size_ptr = reinterpret_cast<decltype(chunk_size_ptr)>(pos);
last_finish_chunk = chunk_size_ptr;
return;
}
/// Buffer end-of-chunk
setValue(reinterpret_cast<decltype(chunk_size_ptr)>(pos), 0);
pos += sizeof(*chunk_size_ptr);
/// Initialize next chunk
chunk_size_ptr = reinterpret_cast<decltype(chunk_size_ptr)>(pos);
pos += std::min(available(), sizeof(*chunk_size_ptr));
last_finish_chunk = chunk_size_ptr;
}
WriteBufferFromPocoSocketChunked::~WriteBufferFromPocoSocketChunked()
{
try
{
finalize();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void WriteBufferFromPocoSocketChunked::nextImpl()
{
if (!chunked)
{
WriteBufferFromPocoSocket::nextImpl();
return;
}
/// next() after finishChunk at the end of the buffer
if (finishing < sizeof(*chunk_size_ptr))
{
pos -= finishing;
/// Send current chunk
WriteBufferFromPocoSocket::nextImpl();
/// Send end-of-chunk directly
UInt32 s = 0;
socketSendBytes(reinterpret_cast<const char *>(&s), sizeof(s));
finishing = sizeof(*chunk_size_ptr);
/// Initialize next chunk
chunk_size_ptr = reinterpret_cast<decltype(chunk_size_ptr)>(working_buffer.begin());
nextimpl_working_buffer_offset = sizeof(*chunk_size_ptr);
last_finish_chunk = chunk_size_ptr;
return;
}
/// Prevent sending empty chunk
if (offset() == sizeof(*chunk_size_ptr))
{
nextimpl_working_buffer_offset = sizeof(*chunk_size_ptr);
return;
}
/// Finish chunk at the end of the buffer
if (working_buffer.end() - reinterpret_cast<Position>(chunk_size_ptr) <= static_cast<std::ptrdiff_t>(sizeof(*chunk_size_ptr)))
{
pos = reinterpret_cast<Position>(chunk_size_ptr);
/// Send current chunk
WriteBufferFromPocoSocket::nextImpl();
/// Initialize next chunk
chunk_size_ptr = reinterpret_cast<decltype(chunk_size_ptr)>(working_buffer.begin());
nextimpl_working_buffer_offset = sizeof(*chunk_size_ptr);
last_finish_chunk = nullptr;
return;
}
bool initialize_last_finish_chunk = false;
if (pos - reinterpret_cast<Position>(chunk_size_ptr) == sizeof(*chunk_size_ptr)) // next() after finishChunk
{
pos -= sizeof(*chunk_size_ptr);
initialize_last_finish_chunk = true;
}
else // fill up current chunk size
{
setValue(chunk_size_ptr, toLittleEndian(static_cast<UInt32>(pos - reinterpret_cast<Position>(chunk_size_ptr) - sizeof(*chunk_size_ptr))));
if (!chunk_started)
{
chunk_started = true;
LOG_TEST(log, "{} -> {} Chunk send started. Message {}, size {}",
ourAddress().toString(), peerAddress().toString(),
static_cast<unsigned int>(*(reinterpret_cast<char *>(chunk_size_ptr) + sizeof(*chunk_size_ptr))),
*chunk_size_ptr);
}
else
LOG_TEST(log, "{} -> {} Chunk send continued. Size {}", ourAddress().toString(), peerAddress().toString(), *chunk_size_ptr);
}
/// Send current chunk
WriteBufferFromPocoSocket::nextImpl();
/// Initialize next chunk
chunk_size_ptr = reinterpret_cast<decltype(chunk_size_ptr)>(working_buffer.begin());
nextimpl_working_buffer_offset = sizeof(*chunk_size_ptr);
last_finish_chunk = initialize_last_finish_chunk ? chunk_size_ptr : nullptr;
}
void WriteBufferFromPocoSocketChunked::finalizeImpl()
{
if (chunked && offset() == sizeof(*chunk_size_ptr))
pos -= sizeof(*chunk_size_ptr);
WriteBufferFromPocoSocket::finalizeImpl();
}
}

View File

@ -0,0 +1,36 @@
#pragma once
#include <Common/logger_useful.h>
#include <IO/WriteBufferFromPocoSocket.h>
#include <algorithm>
namespace DB
{
class WriteBufferFromPocoSocketChunked: public WriteBufferFromPocoSocket
{
public:
explicit WriteBufferFromPocoSocketChunked(Poco::Net::Socket & socket_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
explicit WriteBufferFromPocoSocketChunked(Poco::Net::Socket & socket_, const ProfileEvents::Event & write_event_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
void enableChunked();
void finishChunk();
~WriteBufferFromPocoSocketChunked() override;
protected:
void nextImpl() override;
void finalizeImpl() override;
Poco::Net::SocketAddress peerAddress() const { return peer_address; }
Poco::Net::SocketAddress ourAddress() const { return our_address; }
private:
LoggerPtr log;
bool chunked = false;
UInt32 * last_finish_chunk = nullptr; // pointer to the last chunk header created by finishChunk
bool chunk_started = false; // chunk started flag
UInt32 * chunk_size_ptr = nullptr; // pointer to the chunk size holder in the buffer
size_t finishing = sizeof(*chunk_size_ptr); // indicates not enough buffer for end-of-chunk marker
};
}

View File

@ -113,6 +113,9 @@ Cluster::Address::Address(
secure = ConfigHelper::getBool(config, config_prefix + ".secure", false, /* empty_as */true) ? Protocol::Secure::Enable : Protocol::Secure::Disable;
priority = Priority{config.getInt(config_prefix + ".priority", 1)};
proto_send_chunked = config.getString(config_prefix + ".proto_caps.send", "notchunked");
proto_recv_chunked = config.getString(config_prefix + ".proto_caps.recv", "notchunked");
const char * port_type = secure == Protocol::Secure::Enable ? "tcp_port_secure" : "tcp_port";
auto default_port = config.getInt(port_type, 0);
@ -425,7 +428,9 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config,
auto pool = ConnectionPoolFactory::instance().get(
static_cast<unsigned>(settings.distributed_connections_pool_size),
address.host_name, address.port,
address.default_database, address.user, address.password, address.quota_key,
address.default_database, address.user, address.password,
address.proto_send_chunked, address.proto_recv_chunked,
address.quota_key,
address.cluster, address.cluster_secret,
"server", address.compression,
address.secure, address.priority);
@ -589,6 +594,8 @@ void Cluster::addShard(
replica.default_database,
replica.user,
replica.password,
replica.proto_send_chunked,
replica.proto_recv_chunked,
replica.quota_key,
replica.cluster,
replica.cluster_secret,
@ -744,6 +751,8 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
address.default_database,
address.user,
address.password,
address.proto_send_chunked,
address.proto_recv_chunked,
address.quota_key,
address.cluster,
address.cluster_secret,

View File

@ -114,6 +114,8 @@ public:
UInt16 port{0};
String user;
String password;
String proto_send_chunked = "notchunked";
String proto_recv_chunked = "notchunked";
String quota_key;
/// For inter-server authorization

View File

@ -103,6 +103,7 @@ namespace DB::ErrorCodes
extern const int SUPPORT_IS_DISABLED;
extern const int UNSUPPORTED_METHOD;
extern const int USER_EXPIRED;
extern const int NETWORK_ERROR;
}
namespace
@ -254,8 +255,8 @@ void TCPHandler::runImpl()
socket().setSendTimeout(send_timeout);
socket().setNoDelay(true);
in = std::make_shared<ReadBufferFromPocoSocket>(socket(), read_event);
out = std::make_shared<WriteBufferFromPocoSocket>(socket(), write_event);
in = std::make_shared<ReadBufferFromPocoSocketChunked>(socket(), read_event);
out = std::make_shared<WriteBufferFromPocoSocketChunked>(socket(), write_event);
/// Support for PROXY protocol
if (parse_proxy_protocol && !receiveProxyHeader())
@ -280,6 +281,48 @@ void TCPHandler::runImpl()
if (client_tcp_protocol_version >= DBMS_MIN_PROTOCOL_VERSION_WITH_ADDENDUM)
receiveAddendum();
{
/// Server side of chunked protocol negotiation.
/// Server advertises its protocol capabilities (separate for send and receive channels) by sending
/// in its 'Hello' response one of four types - chunked, notchunked, chunked_optional, notchunked_optional.
/// Not optional types are strict meaning that server only supports this type, optional means that
/// server prefer this type but capable to work in opposite.
/// Client selects which type it is going to communicate based on the settings from config or arguments,
/// and sends either "chunked" or "notchunked" protocol request in addendum section of handshake.
/// Client can detect if server's protocol capabilities are not compatible with client's settings (for example
/// server strictly requires chunked protocol but client's settings only allows notchunked protocol) - in such case
/// client should interrupt this connection. However if client continues with incompatible protocol type request, server
/// will send appropriate exception and disconnect client.
auto is_chunked = [](const String & chunked_srv_str, const String & chunked_cl_str, const String & direction)
{
bool chunked_srv = chunked_srv_str.starts_with("chunked");
bool optional_srv = chunked_srv_str.ends_with("_optional");
bool chunked_cl = chunked_cl_str.starts_with("chunked");
if (optional_srv)
return chunked_cl;
if (chunked_cl != chunked_srv)
throw NetException(
ErrorCodes::NETWORK_ERROR,
"Incompatible protocol: {} is {}, client requested {}",
direction,
chunked_srv ? "chunked" : "notchunked",
chunked_cl ? "chunked" : "notchunked");
return chunked_srv;
};
bool out_chunked = is_chunked(server.config().getString("proto_caps.send", "notchunked"), proto_recv_chunked_cl, "send");
bool in_chunked = is_chunked(server.config().getString("proto_caps.recv", "notchunked"), proto_send_chunked_cl, "recv");
if (out_chunked)
out->enableChunked();
if (in_chunked)
in->enableChunked();
}
if (!is_interserver_mode)
{
/// If session created, then settings in session context has been updated.
@ -321,7 +364,7 @@ void TCPHandler::runImpl()
{
Stopwatch idle_time;
UInt64 timeout_ms = std::min(poll_interval, idle_connection_timeout) * 1000000;
while (tcp_server.isOpen() && !server.isCancelled() && !static_cast<ReadBufferFromPocoSocket &>(*in).poll(timeout_ms))
while (tcp_server.isOpen() && !server.isCancelled() && !in->poll(timeout_ms))
{
if (idle_time.elapsedSeconds() > idle_connection_timeout)
{
@ -796,7 +839,7 @@ bool TCPHandler::readDataNext()
/// We are waiting for a packet from the client. Thus, every `POLL_INTERVAL` seconds check whether we need to shut down.
while (true)
{
if (static_cast<ReadBufferFromPocoSocket &>(*in).poll(timeout_us))
if (in->poll(timeout_us))
{
/// If client disconnected.
if (in->eof())
@ -1186,6 +1229,8 @@ void TCPHandler::processTablesStatusRequest()
}
response.write(*out, client_tcp_protocol_version);
out->finishChunk();
}
void TCPHandler::receiveUnexpectedTablesStatusRequest()
@ -1206,6 +1251,8 @@ void TCPHandler::sendPartUUIDs()
writeVarUInt(Protocol::Server::PartUUIDs, *out);
writeVectorBinary(uuids, *out);
out->finishChunk();
out->next();
}
}
@ -1214,6 +1261,8 @@ void TCPHandler::sendPartUUIDs()
void TCPHandler::sendReadTaskRequestAssumeLocked()
{
writeVarUInt(Protocol::Server::ReadTaskRequest, *out);
out->finishChunk();
out->next();
}
@ -1222,6 +1271,8 @@ void TCPHandler::sendMergeTreeAllRangesAnnouncementAssumeLocked(InitialAllRanges
{
writeVarUInt(Protocol::Server::MergeTreeAllRangesAnnouncement, *out);
announcement.serialize(*out);
out->finishChunk();
out->next();
}
@ -1230,6 +1281,8 @@ void TCPHandler::sendMergeTreeReadTaskRequestAssumeLocked(ParallelReadRequest re
{
writeVarUInt(Protocol::Server::MergeTreeReadTaskRequest, *out);
request.serialize(*out);
out->finishChunk();
out->next();
}
@ -1238,6 +1291,8 @@ void TCPHandler::sendProfileInfo(const ProfileInfo & info)
{
writeVarUInt(Protocol::Server::ProfileInfo, *out);
info.write(*out, client_tcp_protocol_version);
out->finishChunk();
out->next();
}
@ -1253,6 +1308,8 @@ void TCPHandler::sendTotals(const Block & totals)
state.block_out->write(totals);
state.maybe_compressed_out->next();
out->finishChunk();
out->next();
}
}
@ -1269,6 +1326,8 @@ void TCPHandler::sendExtremes(const Block & extremes)
state.block_out->write(extremes);
state.maybe_compressed_out->next();
out->finishChunk();
out->next();
}
}
@ -1286,6 +1345,8 @@ void TCPHandler::sendProfileEvents()
writeStringBinary("", *out);
state.profile_events_block_out->write(block);
out->finishChunk();
out->next();
auto elapsed_milliseconds = stopwatch.elapsedMilliseconds();
@ -1323,6 +1384,8 @@ void TCPHandler::sendTimezone()
LOG_DEBUG(log, "TCPHandler::sendTimezone(): {}", tz);
writeVarUInt(Protocol::Server::TimezoneUpdate, *out);
writeStringBinary(tz, *out);
out->finishChunk();
out->next();
}
@ -1583,6 +1646,12 @@ void TCPHandler::receiveAddendum()
if (!is_interserver_mode)
session->setQuotaClientKey(quota_key);
if (client_tcp_protocol_version >= DBMS_MIN_PROTOCOL_VERSION_WITH_CHUNKED_PACKETS)
{
readStringBinary(proto_send_chunked_cl, *in);
readStringBinary(proto_recv_chunked_cl, *in);
}
}
@ -1616,6 +1685,11 @@ void TCPHandler::sendHello()
writeStringBinary(server_display_name, *out);
if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_VERSION_PATCH)
writeVarUInt(VERSION_PATCH, *out);
if (client_tcp_protocol_version >= DBMS_MIN_PROTOCOL_VERSION_WITH_CHUNKED_PACKETS)
{
writeStringBinary(server.config().getString("proto_caps.send", "notchunked"), *out);
writeStringBinary(server.config().getString("proto_caps.recv", "notchunked"), *out);
}
if (client_tcp_protocol_version >= DBMS_MIN_PROTOCOL_VERSION_WITH_PASSWORD_COMPLEXITY_RULES)
{
auto rules = server.context()->getAccessControl().getPasswordComplexityRules();
@ -1668,6 +1742,7 @@ bool TCPHandler::receivePacket()
case Protocol::Client::Ping:
writeVarUInt(Protocol::Server::Pong, *out);
out->finishChunk();
out->next();
return false;
@ -2197,7 +2272,7 @@ QueryState::CancellationStatus TCPHandler::getQueryCancellationStatus()
after_check_cancelled.restart();
/// During request execution the only packet that can come from the client is stopping the query.
if (static_cast<ReadBufferFromPocoSocket &>(*in).poll(0))
if (in->poll(0))
{
if (in->eof())
{
@ -2248,19 +2323,33 @@ void TCPHandler::sendData(const Block & block)
}
writeVarUInt(Protocol::Server::Data, *out);
/// Send external table name (empty name is the main table)
writeStringBinary("", *out);
/// For testing hedged requests
if (block.rows() > 0 && query_context->getSettingsRef().sleep_in_send_data_ms.totalMilliseconds())
{
/// This strange sequence is needed in case of chunked protocol is enabled, in order for client not to
/// hang on receiving of at least packet type - chunk will not be processed unless either chunk footer
/// or chunk continuation header is received - first 'next' is sending starting chunk containing packet type
/// and second 'next' is sending chunk continuation header.
out->next();
/// Send external table name (empty name is the main table)
writeStringBinary("", *out);
out->next();
std::chrono::milliseconds ms(query_context->getSettingsRef().sleep_in_send_data_ms.totalMilliseconds());
std::this_thread::sleep_for(ms);
}
else
{
/// Send external table name (empty name is the main table)
writeStringBinary("", *out);
}
state.block_out->write(block);
state.maybe_compressed_out->next();
if (state.maybe_compressed_out != out)
state.maybe_compressed_out->next();
out->finishChunk();
out->next();
}
catch (...)
@ -2296,6 +2385,8 @@ void TCPHandler::sendLogData(const Block & block)
writeStringBinary("", *out);
state.logs_block_out->write(block);
out->finishChunk();
out->next();
}
@ -2307,6 +2398,7 @@ void TCPHandler::sendTableColumns(const ColumnsDescription & columns)
writeStringBinary("", *out);
writeStringBinary(columns.toString(), *out);
out->finishChunk();
out->next();
}
@ -2316,6 +2408,8 @@ void TCPHandler::sendException(const Exception & e, bool with_stack_trace)
writeVarUInt(Protocol::Server::Exception, *out);
writeException(e, *out, with_stack_trace);
out->finishChunk();
out->next();
}
@ -2326,6 +2420,8 @@ void TCPHandler::sendEndOfStream()
state.io.setAllDataSent();
writeVarUInt(Protocol::Server::EndOfStream, *out);
out->finishChunk();
out->next();
}
@ -2344,6 +2440,8 @@ void TCPHandler::sendProgress()
increment.elapsed_ns = current_elapsed_ns - state.prev_elapsed_ns;
state.prev_elapsed_ns = current_elapsed_ns;
increment.write(*out, client_tcp_protocol_version);
out->finishChunk();
out->next();
}

View File

@ -18,6 +18,8 @@
#include <Interpreters/ProfileEventsExt.h>
#include <Formats/NativeReader.h>
#include <Formats/NativeWriter.h>
#include <IO/ReadBufferFromPocoSocketChunked.h>
#include <IO/WriteBufferFromPocoSocketChunked.h>
#include "Core/Types.h"
#include "IServer.h"
@ -186,6 +188,8 @@ private:
UInt64 client_version_minor = 0;
UInt64 client_version_patch = 0;
UInt32 client_tcp_protocol_version = 0;
String proto_send_chunked_cl = "notchunked";
String proto_recv_chunked_cl = "notchunked";
String quota_key;
/// Connection settings, which are extracted from a context.
@ -204,8 +208,8 @@ private:
ClientInfo::QueryKind query_kind = ClientInfo::QueryKind::NO_QUERY;
/// Streams for reading/writing from/to client connection socket.
std::shared_ptr<ReadBuffer> in;
std::shared_ptr<WriteBuffer> out;
std::shared_ptr<ReadBufferFromPocoSocketChunked> in;
std::shared_ptr<WriteBufferFromPocoSocketChunked> out;
ProfileEvents::Event read_event;
ProfileEvents::Event write_event;

View File

@ -273,6 +273,8 @@ ConnectionPoolWithFailoverPtr DistributedAsyncInsertDirectoryQueue::createPool(c
address.default_database,
address.user,
address.password,
address.proto_send_chunked,
address.proto_recv_chunked,
address.quota_key,
address.cluster,
address.cluster_secret,

View File

@ -5704,7 +5704,8 @@ std::optional<QueryPipeline> StorageReplicatedMergeTree::distributedWriteFromClu
{
auto connection = std::make_shared<Connection>(
node.host_name, node.port, query_context->getGlobalContext()->getCurrentDatabase(),
node.user, node.password, SSHKey(), /*jwt*/"", node.quota_key, node.cluster, node.cluster_secret,
node.user, node.password, node.proto_send_chunked, node.proto_recv_chunked,
SSHKey(), /*jwt*/"", node.quota_key, node.cluster, node.cluster_secret,
"ParallelInsertSelectInititiator",
node.compression,
node.secure

View File

@ -333,7 +333,7 @@ def test_receive_timeout2(started_cluster):
# in packet receiving but there are replicas in process of
# connection establishing.
update_configs(
node_1_sleep_in_send_data=4000,
node_1_sleep_in_send_data=5000,
node_2_sleep_in_send_tables_status=2000,
node_3_sleep_in_send_tables_status=2000,
)

View File

@ -1,4 +0,0 @@
<Test> MergeTreeReadPoolBase: Will use min_marks_per_task=24
<Test> MergeTreeMarksLoader: Loading marks from path data.cmrk3
<Test> MergeTreeRangeReader: First reader returned: num_rows: 1, columns: 1, total_rows_per_granule: 1, no filter, column[0]: Int32(size = 1), requested columns: key
<Test> MergeTreeRangeReader: read() returned num_rows: 1, columns: 1, total_rows_per_granule: 1, no filter, column[0]: Int32(size = 1), sample block key

View File

@ -18,6 +18,10 @@ $CLICKHOUSE_CLIENT -m -q "
# instead of "last" value, hence you cannot simply append another
# --send_logs_level here.
CLICKHOUSE_CLIENT_CLEAN=$(echo ${CLICKHOUSE_CLIENT} | sed 's/'"--send_logs_level=${CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL}"'/--send_logs_level=test/g')
$CLICKHOUSE_CLIENT_CLEAN -q "select * from data SETTINGS merge_tree_read_split_ranges_into_intersecting_and_non_intersecting_injection_probability = 0.0;" |& grep -o -e '<Unknown>.*' -e '<Test>.*'
$CLICKHOUSE_CLIENT -q "drop table data"
set -e
trap '$CLICKHOUSE_CLIENT -q "drop table data"' EXIT
$CLICKHOUSE_CLIENT_CLEAN -q "select * from data SETTINGS merge_tree_read_split_ranges_into_intersecting_and_non_intersecting_injection_probability = 0.0;" |& (! grep -q -o -e '<Unknown>.*')
$CLICKHOUSE_CLIENT_CLEAN -q "select * from data SETTINGS merge_tree_read_split_ranges_into_intersecting_and_non_intersecting_injection_probability = 0.0;" |& grep -q -o -e '<Test>.*'