Fix protocol incompatibility introduced in https://github.com/ClickHouse/ClickHouse/pull/8598

This commit is contained in:
alesapin 2020-02-27 13:43:38 +03:00
parent f09131809f
commit e161cc1424

View File

@ -35,9 +35,9 @@ namespace DataPartsExchange
namespace
{
static constexpr auto REPLICATION_PROTOCOL_VERSION_WITHOUT_PARTS_SIZE = "0";
static constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE = "1";
static constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS = "2";
static constexpr auto REPLICATION_PROTOCOL_VERSION_WITHOUT_PARTS_SIZE = 0;
static constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE = 1;
static constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS = 2;
std::string getEndpointId(const std::string & node_id)
@ -54,7 +54,7 @@ std::string Service::getId(const std::string & node_id) const
void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*body*/, WriteBuffer & out, Poco::Net::HTTPServerResponse & response)
{
String client_protocol_version = params.get("client_protocol_version", REPLICATION_PROTOCOL_VERSION_WITHOUT_PARTS_SIZE);
int client_protocol_version = parse<int>(params.get("client_protocol_version", "0"));
String part_name = params.get("part");
@ -79,7 +79,9 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo
response.setChunkedTransferEncoding(false);
return;
}
response.addCookie({"server_protocol_version", REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS});
/// We pretend to work as older server version, to be sure that client will correctly process our version
response.addCookie({"server_protocol_version", toString(std::min(client_protocol_version, REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS))});
++total_sends;
SCOPE_EXIT({--total_sends;});
@ -107,10 +109,10 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo
MergeTreeData::DataPart::Checksums data_checksums;
if (client_protocol_version == REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE || client_protocol_version == REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS)
if (client_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE)
writeBinary(checksums.getTotalSizeOnDisk(), out);
if (client_protocol_version == REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS)
if (client_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS)
{
WriteBufferFromOwnString ttl_infos_buffer;
part->ttl_infos.write(ttl_infos_buffer);
@ -202,7 +204,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
{
{"endpoint", getEndpointId(replica_path)},
{"part", part_name},
{"client_protocol_version", REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS},
{"client_protocol_version", toString(REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS)},
{"compress", "false"}
});
@ -224,15 +226,14 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
data_settings->replicated_max_parallel_fetches_for_host
};
auto server_protocol_version = in.getResponseCookie("server_protocol_version", REPLICATION_PROTOCOL_VERSION_WITHOUT_PARTS_SIZE);
int server_protocol_version = parse<int>(in.getResponseCookie("server_protocol_version", "0"));
ReservationPtr reservation;
if (server_protocol_version == REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE || server_protocol_version == REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS)
if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE)
{
size_t sum_files_size;
readBinary(sum_files_size, in);
if (server_protocol_version == REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS)
if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS)
{
IMergeTreeDataPart::TTLInfos ttl_infos;
String ttl_infos_string;