Add protocol version to cookies

This commit is contained in:
alesapin 2019-09-06 15:18:56 +03:00
parent 0c9a9dee1c
commit ea8e543b1a
4 changed files with 47 additions and 70 deletions

View File

@ -172,15 +172,8 @@ public:
/// availability of free space is not checked.
void update(UInt64 new_size);
UInt64 getSize() const
{
return size;
}
const DiskPtr & getDisk() const
{
return disk_ptr;
}
UInt64 getSize() const { return size; }
const DiskPtr & getDisk() const { return disk_ptr; }
private:
UInt64 size;
@ -313,8 +306,6 @@ private:
};
using StoragePolicyPtr = std::shared_ptr<const StoragePolicy>;
/// Parse .xml configuration and store information about policies

View File

@ -38,20 +38,23 @@ namespace detail
SessionPtr session;
std::istream * istr; /// owned by session
std::unique_ptr<ReadBuffer> impl;
std::vector<Poco::Net::HTTPCookie> cookies;
public:
using OutStreamCallback = std::function<void(std::ostream &)>;
explicit ReadWriteBufferFromHTTPBase(SessionPtr session_,
explicit ReadWriteBufferFromHTTPBase(
SessionPtr session_,
Poco::URI uri_,
const std::string & method_ = {},
OutStreamCallback out_stream_callback = {},
const Poco::Net::HTTPBasicCredentials & credentials = {},
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE)
: ReadBuffer(nullptr, 0)
, uri {uri_}
, method {!method_.empty() ? method_ : out_stream_callback ? Poco::Net::HTTPRequest::HTTP_POST : Poco::Net::HTTPRequest::HTTP_GET}
, session {session_}
, uri{uri_}
, method{!method_.empty() ? method_
: out_stream_callback ? Poco::Net::HTTPRequest::HTTP_POST : Poco::Net::HTTPRequest::HTTP_GET}
, session{session_}
{
// With empty path poco will send "POST HTTP/1.1" its bug.
if (uri.getPath().empty())
@ -78,6 +81,7 @@ namespace detail
out_stream_callback(stream_out);
istr = receiveResponse(*session, request, response);
response.getCookies(cookies);
impl = std::make_unique<ReadBufferFromIStream>(*istr, buffer_size_);
}
@ -90,7 +94,6 @@ namespace detail
}
}
bool nextImpl() override
{
if (!impl->next())
@ -99,6 +102,14 @@ namespace detail
working_buffer = internal_buffer;
return true;
}
std::string getResponseCookie(const std::string & name, const std::string & def) const
{
for (const auto & cookie : cookies)
if (cookie.getName() == name)
return cookie.getValue();
return def;
}
};
}

View File

@ -33,7 +33,7 @@ Block PartLogElement::createBlock()
return
{
{ColumnInt8::create(), std::move(event_type_datatype), "event_type"},
{ColumnInt8::create(), std::move(event_type_datatype), "event_type"},
{ColumnUInt16::create(), std::make_shared<DataTypeDate>(), "event_date"},
{ColumnUInt32::create(), std::make_shared<DataTypeDateTime>(), "event_time"},
{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "duration_ms"},

View File

@ -56,10 +56,12 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo
if (blocker.isCancelled())
throw Exception("Transferring part to replica was cancelled", ErrorCodes::ABORTED);
String protocol_version = params.get("protocol_version", REPLICATION_PROTOCOL_VERSION_WITHOUT_PARTS_SIZE);
String client_protocol_version = params.get("client_protocol_version", REPLICATION_PROTOCOL_VERSION_WITHOUT_PARTS_SIZE);
String part_name = params.get("part");
if (protocol_version != REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE && protocol_version != REPLICATION_PROTOCOL_VERSION_WITHOUT_PARTS_SIZE)
if (client_protocol_version != REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE && client_protocol_version != REPLICATION_PROTOCOL_VERSION_WITHOUT_PARTS_SIZE)
throw Exception("Unsupported fetch protocol version", ErrorCodes::UNKNOWN_PROTOCOL);
const auto data_settings = data.getSettings();
@ -78,6 +80,8 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo
response.setChunkedTransferEncoding(false);
return;
}
response.addCookie({"server_protocol_version", REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE});
++total_sends;
SCOPE_EXIT({--total_sends;});
@ -108,11 +112,11 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo
MergeTreeData::DataPart::Checksums data_checksums;
if (protocol_version == REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE)
if (client_protocol_version == REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE)
writeBinary(checksums.getTotalSizeOnDisk(), out);
writeBinary(checksums.files.size(), out);
for (const auto & it : checksums.files)
{
String file_name = it.first;
@ -195,10 +199,10 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
uri.setPort(port);
uri.setQueryParameters(
{
{"endpoint", getEndpointId(replica_path)},
{"part_name", part_name},
{"protocol_version", REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE},
{"compress", "false"}
{"endpoint", getEndpointId(replica_path)},
{"part", part_name},
{"client_protocol_version", REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE},
{"compress", "false"}
});
Poco::Net::HTTPBasicCredentials creds{};
@ -208,50 +212,8 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
creds.setPassword(password);
}
bool protocol_error = true;
try
{
PooledReadWriteBufferFromHTTP in{
uri,
Poco::Net::HTTPRequest::HTTP_POST,
{},
timeouts,
creds,
DBMS_DEFAULT_BUFFER_SIZE,
data_settings->replicated_max_parallel_fetches_for_host
};
UInt64 sum_files_size;
readBinary(sum_files_size, in);
protocol_error = false;
auto reservation = data.reserveSpace(sum_files_size);
return downloadPart(part_name, replica_path, to_detached, tmp_prefix_, std::move(reservation), in);
}
catch (const Exception & e) ///@TODO_IGR ASK maybe catch connection and others error here
{
if (!protocol_error)
throw;
LOG_WARNING(log, "Looks like old ClickHouse version node. Trying to use fetch protocol version 0 (" + String(e.what()) + ")"); ///@TODO_IGR ASK new msg
}
/// Protocol error
/// Seems to be replica without protocol_version "1" supporting
/// Try to use old one
Poco::URI uri_v0;
uri_v0.setScheme(interserver_scheme);
uri_v0.setHost(host);
uri_v0.setPort(port);
uri_v0.setQueryParameters(
{
{"endpoint", getEndpointId(replica_path)},
{"part", part_name},
{"compress", "false"}
});
PooledReadWriteBufferFromHTTP in{
uri_v0,
uri,
Poco::Net::HTTPRequest::HTTP_POST,
{},
timeouts,
@ -260,8 +222,21 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
data_settings->replicated_max_parallel_fetches_for_host
};
/// We don't know real size of part
auto reservation = data.reserveOnMaxDiskWithoutReservation();
auto server_protocol_version = in.getResponseCookie("server_protocol_version", REPLICATION_PROTOCOL_VERSION_WITHOUT_PARTS_SIZE);
DiskSpace::ReservationPtr reservation;
if (server_protocol_version == REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE)
{
size_t sum_files_size;
readBinary(sum_files_size, in);
reservation = data.reserveSpace(sum_files_size);
}
else
{
/// We don't know real size of part because sender server version is old
reservation = data.reserveOnMaxDiskWithoutReservation();
}
return downloadPart(part_name, replica_path, to_detached, tmp_prefix_, std::move(reservation), in);
}