straighten the protocol version

This commit is contained in:
Alexander Kuzmenkov 2020-09-17 15:15:05 +03:00
parent 7b8ad02a12
commit a374541214
24 changed files with 58 additions and 77 deletions

View File

@ -781,7 +781,7 @@ void BaseDaemon::initializeTerminationAndSignalProcessing()
void BaseDaemon::logRevision() const void BaseDaemon::logRevision() const
{ {
Poco::Logger::root().information("Starting " + std::string{VERSION_FULL} Poco::Logger::root().information("Starting " + std::string{VERSION_FULL}
+ " with revision " + std::to_string(ClickHouseRevision::get()) + " with revision " + std::to_string(ClickHouseRevision::getVersionRevision())
+ ", " + build_id_info + ", " + build_id_info
+ ", PID " + std::to_string(getpid())); + ", PID " + std::to_string(getpid()));
} }

View File

@ -105,7 +105,7 @@ void ClusterCopierApp::mainImpl()
ThreadStatus thread_status; ThreadStatus thread_status;
auto * log = &logger(); auto * log = &logger();
LOG_INFO(log, "Starting clickhouse-copier (id {}, host_id {}, path {}, revision {})", process_id, host_id, process_path, ClickHouseRevision::get()); LOG_INFO(log, "Starting clickhouse-copier (id {}, host_id {}, path {}, revision {})", process_id, host_id, process_path, ClickHouseRevision::getVersionRevision());
SharedContextHolder shared_context = Context::createShared(); SharedContextHolder shared_context = Context::createShared();
auto context = std::make_unique<Context>(Context::createGlobal(shared_context.get())); auto context = std::make_unique<Context>(Context::createGlobal(shared_context.get()));

View File

@ -256,7 +256,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
#endif #endif
#endif #endif
CurrentMetrics::set(CurrentMetrics::Revision, ClickHouseRevision::get()); CurrentMetrics::set(CurrentMetrics::Revision, ClickHouseRevision::getVersionRevision());
CurrentMetrics::set(CurrentMetrics::VersionInteger, ClickHouseRevision::getVersionInteger()); CurrentMetrics::set(CurrentMetrics::VersionInteger, ClickHouseRevision::getVersionInteger());
if (ThreadFuzzer::instance().isEffective()) if (ThreadFuzzer::instance().isEffective())

View File

@ -162,14 +162,12 @@ void Connection::sendHello()
|| has_control_character(password)) || has_control_character(password))
throw Exception("Parameters 'default_database', 'user' and 'password' must not contain ASCII control characters", ErrorCodes::BAD_ARGUMENTS); throw Exception("Parameters 'default_database', 'user' and 'password' must not contain ASCII control characters", ErrorCodes::BAD_ARGUMENTS);
auto client_revision = ClickHouseRevision::get();
writeVarUInt(Protocol::Client::Hello, *out); writeVarUInt(Protocol::Client::Hello, *out);
writeStringBinary((DBMS_NAME " ") + client_name, *out); writeStringBinary((DBMS_NAME " ") + client_name, *out);
writeVarUInt(DBMS_VERSION_MAJOR, *out); writeVarUInt(DBMS_VERSION_MAJOR, *out);
writeVarUInt(DBMS_VERSION_MINOR, *out); writeVarUInt(DBMS_VERSION_MINOR, *out);
// NOTE For backward compatibility of the protocol, client cannot send its version_patch. // NOTE For backward compatibility of the protocol, client cannot send its version_patch.
writeVarUInt(client_revision, *out); writeVarUInt(DBMS_TCP_PROTOCOL_VERSION, *out);
writeStringBinary(default_database, *out); writeStringBinary(default_database, *out);
writeStringBinary(user, *out); writeStringBinary(user, *out);
writeStringBinary(password, *out); writeStringBinary(password, *out);

View File

@ -6,6 +6,6 @@
namespace ClickHouseRevision namespace ClickHouseRevision
{ {
unsigned get() { return VERSION_REVISION; } unsigned getVersionRevision() { return VERSION_REVISION; }
unsigned getVersionInteger() { return VERSION_INTEGER; } unsigned getVersionInteger() { return VERSION_INTEGER; }
} }

View File

@ -2,6 +2,6 @@
namespace ClickHouseRevision namespace ClickHouseRevision
{ {
unsigned get(); unsigned getVersionRevision();
unsigned getVersionInteger(); unsigned getVersionInteger();
} }

View File

@ -37,7 +37,7 @@ StatusFile::FillFunction StatusFile::write_full_info = [](WriteBuffer & out)
{ {
out << "PID: " << getpid() << "\n" out << "PID: " << getpid() << "\n"
<< "Started at: " << LocalDateTime(time(nullptr)) << "\n" << "Started at: " << LocalDateTime(time(nullptr)) << "\n"
<< "Revision: " << ClickHouseRevision::get() << "\n"; << "Revision: " << ClickHouseRevision::getVersionRevision() << "\n";
}; };

View File

@ -2,18 +2,7 @@
// .h autogenerated by cmake! // .h autogenerated by cmake!
#cmakedefine01 USE_DBMS_TCP_PROTOCOL_VERSION #cmakedefine VERSION_REVISION @VERSION_REVISION@
#if USE_DBMS_TCP_PROTOCOL_VERSION
#include "Core/Defines.h"
#ifndef VERSION_REVISION
#define VERSION_REVISION DBMS_TCP_PROTOCOL_VERSION
#endif
#else
#cmakedefine VERSION_REVISION @VERSION_REVISION@
#endif
#cmakedefine VERSION_NAME "@VERSION_NAME@" #cmakedefine VERSION_NAME "@VERSION_NAME@"
#define DBMS_NAME VERSION_NAME #define DBMS_NAME VERSION_NAME
#cmakedefine VERSION_MAJOR @VERSION_MAJOR@ #cmakedefine VERSION_MAJOR @VERSION_MAJOR@

View File

@ -68,10 +68,10 @@
#define DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS 54429 #define DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS 54429
/// Minimum revision supporting OpenTelemetry /// Minimum revision supporting OpenTelemetry
#define DBMS_MIN_REVISION_WITH_OPENTELEMETRY 54227 #define DBMS_MIN_REVISION_WITH_OPENTELEMETRY 54441
/// Version of ClickHouse TCP protocol. Set to git tag with latest protocol change. /// Version of ClickHouse TCP protocol. Set to git tag with latest protocol change.
#define DBMS_TCP_PROTOCOL_VERSION 54227 #define DBMS_TCP_PROTOCOL_VERSION 54441
/// The boundary on which the blocks for asynchronous file operations should be aligned. /// The boundary on which the blocks for asynchronous file operations should be aligned.
#define DEFAULT_AIO_FILE_BLOCK_SIZE 4096 #define DEFAULT_AIO_FILE_BLOCK_SIZE 4096

View File

@ -1,6 +1,5 @@
#pragma once #pragma once
#include <Common/ClickHouseRevision.h>
#include <DataStreams/IBlockInputStream.h> #include <DataStreams/IBlockInputStream.h>
#include <DataStreams/NativeBlockInputStream.h> #include <DataStreams/NativeBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h> #include <DataStreams/NativeBlockOutputStream.h>
@ -23,7 +22,7 @@ struct TemporaryFileStream
TemporaryFileStream(const std::string & path) TemporaryFileStream(const std::string & path)
: file_in(path) : file_in(path)
, compressed_in(file_in) , compressed_in(file_in)
, block_in(std::make_shared<NativeBlockInputStream>(compressed_in, ClickHouseRevision::get())) , block_in(std::make_shared<NativeBlockInputStream>(compressed_in, DBMS_TCP_PROTOCOL_VERSION))
{} {}
TemporaryFileStream(const std::string & path, const Block & header_) TemporaryFileStream(const std::string & path, const Block & header_)

View File

@ -844,7 +844,7 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, co
const std::string & path = file->path(); const std::string & path = file->path();
WriteBufferFromFile file_buf(path); WriteBufferFromFile file_buf(path);
CompressedWriteBuffer compressed_buf(file_buf); CompressedWriteBuffer compressed_buf(file_buf);
NativeBlockOutputStream block_out(compressed_buf, ClickHouseRevision::get(), getHeader(false)); NativeBlockOutputStream block_out(compressed_buf, DBMS_TCP_PROTOCOL_VERSION, getHeader(false));
LOG_DEBUG(log, "Writing part of aggregation data into temporary file {}.", path); LOG_DEBUG(log, "Writing part of aggregation data into temporary file {}.", path);
ProfileEvents::increment(ProfileEvents::ExternalAggregationWritePart); ProfileEvents::increment(ProfileEvents::ExternalAggregationWritePart);

View File

@ -5,7 +5,6 @@
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <Core/Defines.h> #include <Core/Defines.h>
#include <common/getFQDNOrHostName.h> #include <common/getFQDNOrHostName.h>
#include <Common/ClickHouseRevision.h>
#include <unistd.h> #include <unistd.h>
#if !defined(ARCADIA_BUILD) #if !defined(ARCADIA_BUILD)
@ -44,7 +43,7 @@ void ClientInfo::write(WriteBuffer & out, const UInt64 server_protocol_revision)
writeBinary(client_name, out); writeBinary(client_name, out);
writeVarUInt(client_version_major, out); writeVarUInt(client_version_major, out);
writeVarUInt(client_version_minor, out); writeVarUInt(client_version_minor, out);
writeVarUInt(client_revision, out); writeVarUInt(client_tcp_protocol_version, out);
} }
else if (interface == Interface::HTTP) else if (interface == Interface::HTTP)
{ {
@ -111,7 +110,7 @@ void ClientInfo::read(ReadBuffer & in, const UInt64 client_protocol_revision)
readBinary(client_name, in); readBinary(client_name, in);
readVarUInt(client_version_major, in); readVarUInt(client_version_major, in);
readVarUInt(client_version_minor, in); readVarUInt(client_version_minor, in);
readVarUInt(client_revision, in); readVarUInt(client_tcp_protocol_version, in);
} }
else if (interface == Interface::HTTP) else if (interface == Interface::HTTP)
{ {
@ -130,7 +129,7 @@ void ClientInfo::read(ReadBuffer & in, const UInt64 client_protocol_revision)
if (client_protocol_revision >= DBMS_MIN_REVISION_WITH_VERSION_PATCH) if (client_protocol_revision >= DBMS_MIN_REVISION_WITH_VERSION_PATCH)
readVarUInt(client_version_patch, in); readVarUInt(client_version_patch, in);
else else
client_version_patch = client_revision; client_version_patch = client_tcp_protocol_version;
} }
// TODO what does it even mean to read this structure over HTTP? I thought // TODO what does it even mean to read this structure over HTTP? I thought
@ -244,7 +243,7 @@ void ClientInfo::fillOSUserHostNameAndVersionInfo()
client_version_major = DBMS_VERSION_MAJOR; client_version_major = DBMS_VERSION_MAJOR;
client_version_minor = DBMS_VERSION_MINOR; client_version_minor = DBMS_VERSION_MINOR;
client_version_patch = DBMS_VERSION_PATCH; client_version_patch = DBMS_VERSION_PATCH;
client_revision = ClickHouseRevision::get(); client_tcp_protocol_version = DBMS_TCP_PROTOCOL_VERSION;
} }

View File

@ -82,7 +82,7 @@ public:
UInt64 client_version_major = 0; UInt64 client_version_major = 0;
UInt64 client_version_minor = 0; UInt64 client_version_minor = 0;
UInt64 client_version_patch = 0; UInt64 client_version_patch = 0;
unsigned client_revision = 0; unsigned client_tcp_protocol_version = 0;
/// For http /// For http
HTTPMethod http_method = HTTPMethod::UNKNOWN; HTTPMethod http_method = HTTPMethod::UNKNOWN;

View File

@ -49,7 +49,7 @@ void CrashLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insert(trace); columns[i++]->insert(trace);
columns[i++]->insert(trace_full); columns[i++]->insert(trace_full);
columns[i++]->insert(VERSION_FULL); columns[i++]->insert(VERSION_FULL);
columns[i++]->insert(ClickHouseRevision::get()); columns[i++]->insert(ClickHouseRevision::getVersionRevision());
String build_id_hex; String build_id_hex;
#if defined(__ELF__) && !defined(__FreeBSD__) #if defined(__ELF__) && !defined(__FreeBSD__)

View File

@ -118,7 +118,7 @@ void QueryLogElement::appendToBlock(MutableColumns & columns) const
appendClientInfo(client_info, columns, i); appendClientInfo(client_info, columns, i);
columns[i++]->insert(ClickHouseRevision::get()); columns[i++]->insert(ClickHouseRevision::getVersionRevision());
{ {
Array threads_array; Array threads_array;
@ -172,7 +172,7 @@ void QueryLogElement::appendClientInfo(const ClientInfo & client_info, MutableCo
columns[i++]->insert(client_info.os_user); columns[i++]->insert(client_info.os_user);
columns[i++]->insert(client_info.client_hostname); columns[i++]->insert(client_info.client_hostname);
columns[i++]->insert(client_info.client_name); columns[i++]->insert(client_info.client_name);
columns[i++]->insert(client_info.client_revision); columns[i++]->insert(client_info.client_tcp_protocol_version);
columns[i++]->insert(client_info.client_version_major); columns[i++]->insert(client_info.client_version_major);
columns[i++]->insert(client_info.client_version_minor); columns[i++]->insert(client_info.client_version_minor);
columns[i++]->insert(client_info.client_version_patch); columns[i++]->insert(client_info.client_version_patch);

View File

@ -93,7 +93,7 @@ void QueryThreadLogElement::appendToBlock(MutableColumns & columns) const
QueryLogElement::appendClientInfo(client_info, columns, i); QueryLogElement::appendClientInfo(client_info, columns, i);
columns[i++]->insert(ClickHouseRevision::get()); columns[i++]->insert(ClickHouseRevision::getVersionRevision());
if (profile_counters) if (profile_counters)
{ {

View File

@ -62,7 +62,7 @@ void TextLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insert(logger_name); columns[i++]->insert(logger_name);
columns[i++]->insert(message); columns[i++]->insert(message);
columns[i++]->insert(ClickHouseRevision::get()); columns[i++]->insert(ClickHouseRevision::getVersionRevision());
columns[i++]->insert(source_file); columns[i++]->insert(source_file);
columns[i++]->insert(source_line); columns[i++]->insert(source_line);

View File

@ -43,7 +43,7 @@ void TraceLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insert(DateLUT::instance().toDayNum(event_time)); columns[i++]->insert(DateLUT::instance().toDayNum(event_time));
columns[i++]->insert(event_time); columns[i++]->insert(event_time);
columns[i++]->insert(timestamp_ns); columns[i++]->insert(timestamp_ns);
columns[i++]->insert(ClickHouseRevision::get()); columns[i++]->insert(ClickHouseRevision::getVersionRevision());
columns[i++]->insert(static_cast<UInt8>(trace_type)); columns[i++]->insert(static_cast<UInt8>(trace_type));
columns[i++]->insert(thread_id); columns[i++]->insert(thread_id);
columns[i++]->insertData(query_id.data(), query_id.size()); columns[i++]->insertData(query_id.data(), query_id.size());

View File

@ -1,6 +1,5 @@
#include <Processors/Transforms/AggregatingTransform.h> #include <Processors/Transforms/AggregatingTransform.h>
#include <Common/ClickHouseRevision.h>
#include <DataStreams/NativeBlockInputStream.h> #include <DataStreams/NativeBlockInputStream.h>
#include <Processors/ISource.h> #include <Processors/ISource.h>
#include <Processors/Pipe.h> #include <Processors/Pipe.h>
@ -56,7 +55,7 @@ namespace
public: public:
SourceFromNativeStream(const Block & header, const std::string & path) SourceFromNativeStream(const Block & header, const std::string & path)
: ISource(header), file_in(path), compressed_in(file_in), : ISource(header), file_in(path), compressed_in(file_in),
block_in(std::make_shared<NativeBlockInputStream>(compressed_in, ClickHouseRevision::get())) block_in(std::make_shared<NativeBlockInputStream>(compressed_in, DBMS_TCP_PROTOCOL_VERSION))
{ {
block_in->readPrefix(); block_in->readPrefix();
} }

View File

@ -1,7 +1,6 @@
#include <iomanip> #include <iomanip>
#include <ext/scope_guard.h> #include <ext/scope_guard.h>
#include <Poco/Net/NetException.h> #include <Poco/Net/NetException.h>
#include <Common/ClickHouseRevision.h>
#include <Common/CurrentThread.h> #include <Common/CurrentThread.h>
#include <Common/Stopwatch.h> #include <Common/Stopwatch.h>
#include <Common/NetException.h> #include <Common/NetException.h>
@ -183,7 +182,7 @@ void TCPHandler::runImpl()
/// Should we send internal logs to client? /// Should we send internal logs to client?
const auto client_logs_level = query_context->getSettingsRef().send_logs_level; const auto client_logs_level = query_context->getSettingsRef().send_logs_level;
if (client_revision >= DBMS_MIN_REVISION_WITH_SERVER_LOGS if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_SERVER_LOGS
&& client_logs_level != LogsLevel::none) && client_logs_level != LogsLevel::none)
{ {
state.logs_queue = std::make_shared<InternalTextLogsQueue>(); state.logs_queue = std::make_shared<InternalTextLogsQueue>();
@ -218,7 +217,7 @@ void TCPHandler::runImpl()
state.need_receive_data_for_input = true; state.need_receive_data_for_input = true;
/// Send ColumnsDescription for input storage. /// Send ColumnsDescription for input storage.
if (client_revision >= DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA
&& query_context->getSettingsRef().input_format_defaults_for_omitted_fields) && query_context->getSettingsRef().input_format_defaults_for_omitted_fields)
{ {
sendTableColumns(metadata_snapshot->getColumns()); sendTableColumns(metadata_snapshot->getColumns());
@ -248,7 +247,7 @@ void TCPHandler::runImpl()
customizeContext(*query_context); customizeContext(*query_context);
bool may_have_embedded_data = client_revision >= DBMS_MIN_REVISION_WITH_CLIENT_SUPPORT_EMBEDDED_DATA; bool may_have_embedded_data = client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_CLIENT_SUPPORT_EMBEDDED_DATA;
/// Processing Query /// Processing Query
state.io = executeQuery(state.query, *query_context, false, state.stage, may_have_embedded_data); state.io = executeQuery(state.query, *query_context, false, state.stage, may_have_embedded_data);
@ -482,7 +481,7 @@ void TCPHandler::processInsertQuery(const Settings & connection_settings)
state.io.out->writePrefix(); state.io.out->writePrefix();
/// Send ColumnsDescription for insertion table /// Send ColumnsDescription for insertion table
if (client_revision >= DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA) if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA)
{ {
const auto & table_id = query_context->getInsertionTable(); const auto & table_id = query_context->getInsertionTable();
if (query_context->getSettingsRef().input_format_defaults_for_omitted_fields) if (query_context->getSettingsRef().input_format_defaults_for_omitted_fields)
@ -638,7 +637,7 @@ void TCPHandler::processOrdinaryQueryWithProcessors()
void TCPHandler::processTablesStatusRequest() void TCPHandler::processTablesStatusRequest()
{ {
TablesStatusRequest request; TablesStatusRequest request;
request.read(*in, client_revision); request.read(*in, client_tcp_protocol_version);
TablesStatusResponse response; TablesStatusResponse response;
for (const QualifiedTableName & table_name: request.tables) for (const QualifiedTableName & table_name: request.tables)
@ -661,13 +660,13 @@ void TCPHandler::processTablesStatusRequest()
} }
writeVarUInt(Protocol::Server::TablesStatusResponse, *out); writeVarUInt(Protocol::Server::TablesStatusResponse, *out);
response.write(*out, client_revision); response.write(*out, client_tcp_protocol_version);
} }
void TCPHandler::receiveUnexpectedTablesStatusRequest() void TCPHandler::receiveUnexpectedTablesStatusRequest()
{ {
TablesStatusRequest skip_request; TablesStatusRequest skip_request;
skip_request.read(*in, client_revision); skip_request.read(*in, client_tcp_protocol_version);
throw NetException("Unexpected packet TablesStatusRequest received from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT); throw NetException("Unexpected packet TablesStatusRequest received from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
} }
@ -742,7 +741,7 @@ void TCPHandler::receiveHello()
readVarUInt(client_version_major, *in); readVarUInt(client_version_major, *in);
readVarUInt(client_version_minor, *in); readVarUInt(client_version_minor, *in);
// NOTE For backward compatibility of the protocol, client cannot send its version_patch. // NOTE For backward compatibility of the protocol, client cannot send its version_patch.
readVarUInt(client_revision, *in); readVarUInt(client_tcp_protocol_version, *in);
readStringBinary(default_database, *in); readStringBinary(default_database, *in);
readStringBinary(user, *in); readStringBinary(user, *in);
readStringBinary(password, *in); readStringBinary(password, *in);
@ -750,7 +749,7 @@ void TCPHandler::receiveHello()
LOG_DEBUG(log, "Connected {} version {}.{}.{}, revision: {}{}{}.", LOG_DEBUG(log, "Connected {} version {}.{}.{}, revision: {}{}{}.",
client_name, client_name,
client_version_major, client_version_minor, client_version_patch, client_version_major, client_version_minor, client_version_patch,
client_revision, client_tcp_protocol_version,
(!default_database.empty() ? ", database: " + default_database : ""), (!default_database.empty() ? ", database: " + default_database : ""),
(!user.empty() ? ", user: " + user : "")); (!user.empty() ? ", user: " + user : ""));
@ -781,12 +780,12 @@ void TCPHandler::sendHello()
writeStringBinary(DBMS_NAME, *out); writeStringBinary(DBMS_NAME, *out);
writeVarUInt(DBMS_VERSION_MAJOR, *out); writeVarUInt(DBMS_VERSION_MAJOR, *out);
writeVarUInt(DBMS_VERSION_MINOR, *out); writeVarUInt(DBMS_VERSION_MINOR, *out);
writeVarUInt(ClickHouseRevision::get(), *out); writeVarUInt(DBMS_TCP_PROTOCOL_VERSION, *out);
if (client_revision >= DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE) if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE)
writeStringBinary(DateLUT::instance().getTimeZone(), *out); writeStringBinary(DateLUT::instance().getTimeZone(), *out);
if (client_revision >= DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME) if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME)
writeStringBinary(server_display_name, *out); writeStringBinary(server_display_name, *out);
if (client_revision >= DBMS_MIN_REVISION_WITH_VERSION_PATCH) if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_VERSION_PATCH)
writeVarUInt(DBMS_VERSION_PATCH, *out); writeVarUInt(DBMS_VERSION_PATCH, *out);
out->next(); out->next();
} }
@ -847,8 +846,8 @@ void TCPHandler::receiveQuery()
/// Client info /// Client info
ClientInfo & client_info = query_context->getClientInfo(); ClientInfo & client_info = query_context->getClientInfo();
if (client_revision >= DBMS_MIN_REVISION_WITH_CLIENT_INFO) if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_CLIENT_INFO)
client_info.read(*in, client_revision); client_info.read(*in, client_tcp_protocol_version);
/// For better support of old clients, that does not send ClientInfo. /// For better support of old clients, that does not send ClientInfo.
if (client_info.query_kind == ClientInfo::QueryKind::NO_QUERY) if (client_info.query_kind == ClientInfo::QueryKind::NO_QUERY)
@ -858,7 +857,7 @@ void TCPHandler::receiveQuery()
client_info.client_version_major = client_version_major; client_info.client_version_major = client_version_major;
client_info.client_version_minor = client_version_minor; client_info.client_version_minor = client_version_minor;
client_info.client_version_patch = client_version_patch; client_info.client_version_patch = client_version_patch;
client_info.client_revision = client_revision; client_info.client_tcp_protocol_version = client_tcp_protocol_version;
} }
/// Set fields, that are known apriori. /// Set fields, that are known apriori.
@ -879,7 +878,7 @@ void TCPHandler::receiveQuery()
/// Per query settings are also passed via TCP. /// Per query settings are also passed via TCP.
/// We need to check them before applying due to they can violate the settings constraints. /// We need to check them before applying due to they can violate the settings constraints.
auto settings_format = auto settings_format =
(client_revision >= DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS) (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS)
? SettingsWriteFormat::STRINGS_WITH_FLAGS ? SettingsWriteFormat::STRINGS_WITH_FLAGS
: SettingsWriteFormat::BINARY; : SettingsWriteFormat::BINARY;
@ -900,7 +899,7 @@ void TCPHandler::receiveQuery()
// Use the received query id, or generate a random default. It is convenient // Use the received query id, or generate a random default. It is convenient
// to also generate the default OpenTelemetry trace id at the same time, and // to also generate the default OpenTelemetry trace id at the same time, and
// and and set the trace parent. // set the trace parent.
// Why is this done here and not earlier: // Why is this done here and not earlier:
// 1) ClientInfo might contain upstream trace id, so we decide whether to use // 1) ClientInfo might contain upstream trace id, so we decide whether to use
// the default ids after we have received the ClientInfo. // the default ids after we have received the ClientInfo.
@ -933,11 +932,11 @@ void TCPHandler::receiveUnexpectedQuery()
readStringBinary(skip_string, *in); readStringBinary(skip_string, *in);
ClientInfo skip_client_info; ClientInfo skip_client_info;
if (client_revision >= DBMS_MIN_REVISION_WITH_CLIENT_INFO) if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_CLIENT_INFO)
skip_client_info.read(*in, client_revision); skip_client_info.read(*in, client_tcp_protocol_version);
Settings skip_settings; Settings skip_settings;
auto settings_format = (client_revision >= DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS) ? SettingsWriteFormat::STRINGS_WITH_FLAGS auto settings_format = (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS) ? SettingsWriteFormat::STRINGS_WITH_FLAGS
: SettingsWriteFormat::BINARY; : SettingsWriteFormat::BINARY;
skip_settings.read(*in, settings_format); skip_settings.read(*in, settings_format);
@ -1011,7 +1010,7 @@ void TCPHandler::receiveUnexpectedData()
auto skip_block_in = std::make_shared<NativeBlockInputStream>( auto skip_block_in = std::make_shared<NativeBlockInputStream>(
*maybe_compressed_in, *maybe_compressed_in,
last_block_in.header, last_block_in.header,
client_revision); client_tcp_protocol_version);
skip_block_in->read(); skip_block_in->read();
throw NetException("Unexpected packet Data received from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT); throw NetException("Unexpected packet Data received from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
@ -1038,7 +1037,7 @@ void TCPHandler::initBlockInput()
state.block_in = std::make_shared<NativeBlockInputStream>( state.block_in = std::make_shared<NativeBlockInputStream>(
*state.maybe_compressed_in, *state.maybe_compressed_in,
header, header,
client_revision); client_tcp_protocol_version);
} }
} }
@ -1069,7 +1068,7 @@ void TCPHandler::initBlockOutput(const Block & block)
state.block_out = std::make_shared<NativeBlockOutputStream>( state.block_out = std::make_shared<NativeBlockOutputStream>(
*state.maybe_compressed_out, *state.maybe_compressed_out,
client_revision, client_tcp_protocol_version,
block.cloneEmpty(), block.cloneEmpty(),
!connection_context.getSettingsRef().low_cardinality_allow_in_native_format); !connection_context.getSettingsRef().low_cardinality_allow_in_native_format);
} }
@ -1082,7 +1081,7 @@ void TCPHandler::initLogsBlockOutput(const Block & block)
/// Use uncompressed stream since log blocks usually contain only one row /// Use uncompressed stream since log blocks usually contain only one row
state.logs_block_out = std::make_shared<NativeBlockOutputStream>( state.logs_block_out = std::make_shared<NativeBlockOutputStream>(
*out, *out,
client_revision, client_tcp_protocol_version,
block.cloneEmpty(), block.cloneEmpty(),
!connection_context.getSettingsRef().low_cardinality_allow_in_native_format); !connection_context.getSettingsRef().low_cardinality_allow_in_native_format);
} }
@ -1186,7 +1185,7 @@ void TCPHandler::sendProgress()
{ {
writeVarUInt(Protocol::Server::Progress, *out); writeVarUInt(Protocol::Server::Progress, *out);
auto increment = state.progress.fetchAndResetPiecewiseAtomically(); auto increment = state.progress.fetchAndResetPiecewiseAtomically();
increment.write(*out, client_revision); increment.write(*out, client_tcp_protocol_version);
out->next(); out->next();
} }

View File

@ -124,7 +124,7 @@ private:
UInt64 client_version_major = 0; UInt64 client_version_major = 0;
UInt64 client_version_minor = 0; UInt64 client_version_minor = 0;
UInt64 client_version_patch = 0; UInt64 client_version_patch = 0;
UInt64 client_revision = 0; UInt64 client_tcp_protocol_version = 0;
Context connection_context; Context connection_context;
std::optional<Context> query_context; std::optional<Context> query_context;

View File

@ -3,7 +3,6 @@
#include <Common/escapeForFileName.h> #include <Common/escapeForFileName.h>
#include <Common/CurrentMetrics.h> #include <Common/CurrentMetrics.h>
#include <Common/StringUtils/StringUtils.h> #include <Common/StringUtils/StringUtils.h>
#include <Common/ClickHouseRevision.h>
#include <Common/SipHash.h> #include <Common/SipHash.h>
#include <Common/quoteString.h> #include <Common/quoteString.h>
#include <Common/hex.h> #include <Common/hex.h>
@ -357,7 +356,7 @@ void StorageDistributedDirectoryMonitor::readHeader(
UInt64 initiator_revision; UInt64 initiator_revision;
readVarUInt(initiator_revision, header_buf); readVarUInt(initiator_revision, header_buf);
if (ClickHouseRevision::get() < initiator_revision) if (DBMS_TCP_PROTOCOL_VERSION < initiator_revision)
{ {
LOG_WARNING(log, "ClickHouse shard version is older than ClickHouse initiator version. It may lack support for new features."); LOG_WARNING(log, "ClickHouse shard version is older than ClickHouse initiator version. It may lack support for new features.");
} }
@ -576,7 +575,7 @@ public:
explicit DirectoryMonitorBlockInputStream(const String & file_name) explicit DirectoryMonitorBlockInputStream(const String & file_name)
: in(file_name) : in(file_name)
, decompressing_in(in) , decompressing_in(in)
, block_in(decompressing_in, ClickHouseRevision::get()) , block_in(decompressing_in, DBMS_TCP_PROTOCOL_VERSION)
, log{&Poco::Logger::get("DirectoryMonitorBlockInputStream")} , log{&Poco::Logger::get("DirectoryMonitorBlockInputStream")}
{ {
Settings insert_settings; Settings insert_settings;
@ -681,7 +680,7 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map
readHeader(in, insert_settings, insert_query, client_info, log); readHeader(in, insert_settings, insert_query, client_info, log);
CompressedReadBuffer decompressing_in(in); CompressedReadBuffer decompressing_in(in);
NativeBlockInputStream block_in(decompressing_in, ClickHouseRevision::get()); NativeBlockInputStream block_in(decompressing_in, DBMS_TCP_PROTOCOL_VERSION);
block_in.readPrefix(); block_in.readPrefix();
while (Block block = block_in.read()) while (Block block = block_in.read())

View File

@ -21,7 +21,6 @@
#include <DataTypes/DataTypesNumber.h> #include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeLowCardinality.h> #include <DataTypes/DataTypeLowCardinality.h>
#include <Common/setThreadName.h> #include <Common/setThreadName.h>
#include <Common/ClickHouseRevision.h>
#include <Common/CurrentMetrics.h> #include <Common/CurrentMetrics.h>
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
#include <Common/Exception.h> #include <Common/Exception.h>
@ -583,16 +582,16 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
{ {
WriteBufferFromFile out{first_file_tmp_path}; WriteBufferFromFile out{first_file_tmp_path};
CompressedWriteBuffer compress{out}; CompressedWriteBuffer compress{out};
NativeBlockOutputStream stream{compress, ClickHouseRevision::get(), block.cloneEmpty()}; NativeBlockOutputStream stream{compress, DBMS_TCP_PROTOCOL_VERSION, block.cloneEmpty()};
/// Prepare the header. /// Prepare the header.
/// We wrap the header into a string for compatibility with older versions: /// We wrap the header into a string for compatibility with older versions:
/// a shard will able to read the header partly and ignore other parts based on its version. /// a shard will able to read the header partly and ignore other parts based on its version.
WriteBufferFromOwnString header_buf; WriteBufferFromOwnString header_buf;
writeVarUInt(ClickHouseRevision::get(), header_buf); writeVarUInt(DBMS_TCP_PROTOCOL_VERSION, header_buf);
writeStringBinary(query_string, header_buf); writeStringBinary(query_string, header_buf);
context.getSettingsRef().write(header_buf); context.getSettingsRef().write(header_buf);
context.getClientInfo().write(header_buf, ClickHouseRevision::get()); context.getClientInfo().write(header_buf, DBMS_TCP_PROTOCOL_VERSION);
/// Add new fields here, for example: /// Add new fields here, for example:
/// writeVarUInt(my_new_data, header_buf); /// writeVarUInt(my_new_data, header_buf);

View File

@ -91,7 +91,7 @@ void StorageSystemProcesses::fillData(MutableColumns & res_columns, const Contex
res_columns[i++]->insert(process.client_info.os_user); res_columns[i++]->insert(process.client_info.os_user);
res_columns[i++]->insert(process.client_info.client_hostname); res_columns[i++]->insert(process.client_info.client_hostname);
res_columns[i++]->insert(process.client_info.client_name); res_columns[i++]->insert(process.client_info.client_name);
res_columns[i++]->insert(process.client_info.client_revision); res_columns[i++]->insert(process.client_info.client_tcp_protocol_version);
res_columns[i++]->insert(process.client_info.client_version_major); res_columns[i++]->insert(process.client_info.client_version_major);
res_columns[i++]->insert(process.client_info.client_version_minor); res_columns[i++]->insert(process.client_info.client_version_minor);
res_columns[i++]->insert(process.client_info.client_version_patch); res_columns[i++]->insert(process.client_info.client_version_patch);