Refactoring distributed header parsing

This commit is contained in:
Azat Khuzhin 2021-01-10 14:39:46 +03:00
parent 676bc83c6d
commit fce8b6b5ef
2 changed files with 76 additions and 81 deletions

View File

@ -78,6 +78,70 @@ namespace
}
}
/// Values parsed from the header at the beginning of a distributed file
/// (this is what readDistributedHeader() returns).
struct DistributedHeader
{
/// Settings deserialized from the header.
Settings insert_settings;
/// Text of the INSERT query stored in the header.
std::string insert_query;
/// Client info; only read for the checksummed header format, and only when
/// the header still has data after the settings. Otherwise left default-constructed.
ClientInfo client_info;
};
/// Parse the header located at the current position of `in` and return its parts.
///
/// The first varint decides how the rest is laid out:
///  - DBMS_DISTRIBUTED_SIGNATURE_HEADER: a length-prefixed header string followed by its
///    CityHash128 checksum; the string holds revision, query, settings and (optionally) client info.
///  - DBMS_DISTRIBUTED_SIGNATURE_HEADER_OLD_FORMAT: binary-format settings followed by the query.
///  - any other value: the value itself is the byte length of the query that follows.
///
/// `log` is only used to warn when the initiator revision is newer than ours.
static DistributedHeader readDistributedHeader(ReadBuffer & in, Poco::Logger * log)
{
    DistributedHeader result;

    UInt64 signature_or_size;
    readVarUInt(signature_or_size, in);

    if (signature_or_size == DBMS_DISTRIBUTED_SIGNATURE_HEADER)
    {
        /// The whole header is stored as one string ...
        String serialized;
        readStringBinary(serialized, in);

        /// ... followed by its checksum, which is verified before anything is parsed.
        CityHash_v1_0_2::uint128 stored_checksum;
        readPODBinary(stored_checksum, in);
        assertChecksum(stored_checksum, CityHash_v1_0_2::CityHash128(serialized.data(), serialized.size()));

        /// From here on, parse the individual fields out of the verified string.
        ReadBufferFromString fields(serialized);

        UInt64 revision;
        readVarUInt(revision, fields);
        if (revision > DBMS_TCP_PROTOCOL_VERSION)
        {
            LOG_WARNING(log, "ClickHouse shard version is older than ClickHouse initiator version. It may lack support for new features.");
        }

        readStringBinary(result.insert_query, fields);
        result.insert_settings.read(fields);

        /// Client info is optional: it is present only if something is left in the header.
        if (fields.hasPendingData())
            result.client_info.read(fields, revision);

        /// New fields should be appended and read here, e.g.:
        ///
        ///     if (fields.hasPendingData())
        ///         readVarUInt(my_new_data, fields);
        ///
        /// This is safe because the header carries its own size and checksum.
    }
    else if (signature_or_size == DBMS_DISTRIBUTED_SIGNATURE_HEADER_OLD_FORMAT)
    {
        /// Old layout: settings come first (binary write format), then the query.
        result.insert_settings.read(in, SettingsWriteFormat::BINARY);
        readStringBinary(result.insert_query, in);
    }
    else
    {
        /// No signature at all: the varint is the query length, the query follows as-is.
        result.insert_query.resize(signature_or_size);
        in.readStrict(result.insert_query.data(), signature_or_size);
    }

    return result;
}
}
@ -330,13 +394,10 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
try
{
Block sample_block;
Settings insert_settings;
String insert_query;
ClientInfo client_info;
/// Determine metadata of the current file and check if it is not broken.
ReadBufferFromFile in{file_path};
readHeader(in, insert_settings, insert_query, client_info, log);
readDistributedHeader(in, log);
CompressedReadBuffer decompressing_in(in);
NativeBlockInputStream block_in(decompressing_in, DBMS_TCP_PROTOCOL_VERSION);
@ -359,16 +420,11 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};
ReadBufferFromFile in{file_path};
const auto & header = readDistributedHeader(in, log);
Settings insert_settings;
std::string insert_query;
ClientInfo client_info;
readHeader(in, insert_settings, insert_query, client_info, log);
auto connection = pool->get(timeouts, &insert_settings);
RemoteBlockOutputStream remote{*connection, timeouts, insert_query, insert_settings, client_info};
auto connection = pool->get(timeouts, &header.insert_settings);
RemoteBlockOutputStream remote{*connection, timeouts,
header.insert_query, header.insert_settings, header.client_info};
remote.writePrefix();
remote.writePrepared(in);
@ -390,57 +446,6 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
LOG_TRACE(log, "Finished processing `{}`", file_path);
}
/// Parse the header at the current position of `in` into the given output arguments.
///
/// The leading varint is either one of the two known signatures or, for files without
/// a signature, the byte length of the query that immediately follows.
/// `client_info` is touched only for the checksummed format, and only if the header
/// has data left after the settings. `log` is used for the version-mismatch warning.
void StorageDistributedDirectoryMonitor::readHeader(
    ReadBuffer & in, Settings & insert_settings, std::string & insert_query, ClientInfo & client_info, Poco::Logger * log)
{
    UInt64 leading_value;
    readVarUInt(leading_value, in);

    /// The checksummed signature is tested first, exactly as before; the old one only if it did not match.
    const bool has_checksummed_header = (leading_value == DBMS_DISTRIBUTED_SIGNATURE_HEADER);
    const bool has_old_header = !has_checksummed_header && (leading_value == DBMS_DISTRIBUTED_SIGNATURE_HEADER_OLD_FORMAT);

    if (!has_checksummed_header && !has_old_header)
    {
        /// No signature: the value is the length of the raw query text.
        insert_query.resize(leading_value);
        in.readStrict(insert_query.data(), leading_value);
        return;
    }

    if (has_old_header)
    {
        /// Old layout: binary-format settings, then the query string.
        insert_settings.read(in, SettingsWriteFormat::BINARY);
        readStringBinary(insert_query, in);
        return;
    }

    /// Checksummed layout: the header is one string, followed by its CityHash128.
    String header_blob;
    readStringBinary(header_blob, in);

    CityHash_v1_0_2::uint128 stored_checksum;
    readPODBinary(stored_checksum, in);
    assertChecksum(stored_checksum, CityHash_v1_0_2::CityHash128(header_blob.data(), header_blob.size()));

    /// Decode the fields from the verified string.
    ReadBufferFromString blob_in(header_blob);

    UInt64 revision;
    readVarUInt(revision, blob_in);
    if (revision > DBMS_TCP_PROTOCOL_VERSION)
    {
        LOG_WARNING(log, "ClickHouse shard version is older than ClickHouse initiator version. It may lack support for new features.");
    }

    readStringBinary(insert_query, blob_in);
    insert_settings.read(blob_in);

    /// Client info is optional and present only when data remains in the header.
    if (blob_in.hasPendingData())
        client_info.read(blob_in, revision);

    /// Further fields are to be read here in the same manner, e.g.:
    ///     if (blob_in.hasPendingData())
    ///         readVarUInt(my_new_data, blob_in);
}
struct StorageDistributedDirectoryMonitor::BatchHeader
{
Settings settings;
@ -551,9 +556,6 @@ struct StorageDistributedDirectoryMonitor::Batch
bool batch_broken = false;
try
{
Settings insert_settings;
String insert_query;
ClientInfo client_info;
std::unique_ptr<RemoteBlockOutputStream> remote;
bool first = true;
@ -568,12 +570,13 @@ struct StorageDistributedDirectoryMonitor::Batch
}
ReadBufferFromFile in(file_path->second);
parent.readHeader(in, insert_settings, insert_query, client_info, parent.log);
const auto & header = readDistributedHeader(in, parent.log);
if (first)
{
first = false;
remote = std::make_unique<RemoteBlockOutputStream>(*connection, timeouts, insert_query, insert_settings, client_info);
remote = std::make_unique<RemoteBlockOutputStream>(*connection, timeouts,
header.insert_query, header.insert_settings, header.client_info);
remote->writePrefix();
}
@ -652,10 +655,7 @@ public:
, block_in(decompressing_in, DBMS_TCP_PROTOCOL_VERSION)
, log{&Poco::Logger::get("DirectoryMonitorBlockInputStream")}
{
Settings insert_settings;
String insert_query;
ClientInfo client_info;
StorageDistributedDirectoryMonitor::readHeader(in, insert_settings, insert_query, client_info, log);
readDistributedHeader(in, log);
block_in.readPrefix();
first_block = block_in.read();
@ -744,14 +744,12 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map
size_t total_rows = 0;
size_t total_bytes = 0;
Block sample_block;
Settings insert_settings;
String insert_query;
ClientInfo client_info;
DistributedHeader header;
try
{
/// Determine metadata of the current file and check if it is not broken.
ReadBufferFromFile in{file_path};
readHeader(in, insert_settings, insert_query, client_info, log);
header = readDistributedHeader(in, log);
CompressedReadBuffer decompressing_in(in);
NativeBlockInputStream block_in(decompressing_in, DBMS_TCP_PROTOCOL_VERSION);
@ -778,7 +776,7 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map
throw;
}
BatchHeader batch_header(std::move(insert_settings), std::move(insert_query), std::move(client_info), std::move(sample_block));
BatchHeader batch_header(std::move(header.insert_settings), std::move(header.insert_query), std::move(header.client_info), std::move(sample_block));
Batch & batch = header_to_batch.try_emplace(batch_header, *this, files).first->second;
batch.file_indices.push_back(file_idx);

View File

@ -111,9 +111,6 @@ private:
CurrentMetrics::Increment metric_pending_files;
/// Read insert query and insert settings for backward compatibility.
static void readHeader(ReadBuffer & in, Settings & insert_settings, std::string & insert_query, ClientInfo & client_info, Poco::Logger * log);
friend class DirectoryMonitorBlockInputStream;
};