Merge pull request #8044 from vitlibar/simplify-format-of-header-of-distributed-send

Simplify format of the header of data sent to a shard in a distributed query.
This commit is contained in:
Vitaly Baranov 2019-12-06 15:46:00 +03:00 committed by GitHub
commit 8bd9709d1d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 40 additions and 56 deletions

View File

@ -151,8 +151,8 @@
#endif
/// Marks that extra information is sent to a shard. These can be arbitrary magic numbers.
#define DBMS_DISTRIBUTED_SIGNATURE_EXTRA_INFO 0xCAFEDACEull
#define DBMS_DISTRIBUTED_SIGNATURE_SETTINGS_OLD_FORMAT 0xCAFECABEull
#define DBMS_DISTRIBUTED_SIGNATURE_HEADER 0xCAFEDACEull
#define DBMS_DISTRIBUTED_SIGNATURE_HEADER_OLD_FORMAT 0xCAFECABEull
#if !__has_include(<sanitizer/asan_interface.h>)
# define ASAN_UNPOISON_MEMORY_REGION(a, b)

View File

@ -266,7 +266,7 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
Settings insert_settings;
std::string insert_query;
readQueryAndSettings(in, insert_settings, insert_query);
readHeader(in, insert_settings, insert_query);
RemoteBlockOutputStream remote{*connection, timeouts, insert_query, &insert_settings};
@ -285,30 +285,28 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
LOG_TRACE(log, "Finished processing `" << file_path << '`');
}
void StorageDistributedDirectoryMonitor::readQueryAndSettings(
void StorageDistributedDirectoryMonitor::readHeader(
ReadBuffer & in, Settings & insert_settings, std::string & insert_query) const
{
UInt64 query_size;
readVarUInt(query_size, in);
if (query_size == DBMS_DISTRIBUTED_SIGNATURE_EXTRA_INFO)
if (query_size == DBMS_DISTRIBUTED_SIGNATURE_HEADER)
{
/// Read the header as a string.
String header;
readStringBinary(header, in);
/// Check the checksum of the header.
CityHash_v1_0_2::uint128 checksum;
readPODBinary(checksum, in);
assertChecksum(checksum, CityHash_v1_0_2::CityHash128(header.data(), header.size()));
/// Read the parts of the header.
ReadBufferFromString header_buf(header);
UInt64 initiator_revision;
CityHash_v1_0_2::uint128 expected;
CityHash_v1_0_2::uint128 calculated;
/// Read extra information.
String extra_info_as_string;
readStringBinary(extra_info_as_string, in);
/// To avoid an out-of-bounds read here; the other cases are checked in the read*() helpers.
if (extra_info_as_string.size() < sizeof(expected))
throw Exception("Not enough data", ErrorCodes::CORRUPTED_DATA);
StringRef extra_info_ref(extra_info_as_string.data(), extra_info_as_string.size() - sizeof(expected));
ReadBufferFromMemory extra_info(extra_info_ref.data, extra_info_ref.size);
ReadBuffer checksum(extra_info_as_string.data(), sizeof(expected), extra_info_ref.size);
readVarUInt(initiator_revision, extra_info);
readVarUInt(initiator_revision, header_buf);
if (ClickHouseRevision::get() < initiator_revision)
{
LOG_WARNING(
@ -317,32 +315,21 @@ void StorageDistributedDirectoryMonitor::readQueryAndSettings(
<< "It may lack support for new features.");
}
/// Extra checksum (all data except itself -- this checksum)
readPODBinary(expected, checksum);
calculated = CityHash_v1_0_2::CityHash128(extra_info_ref.data, extra_info_ref.size);
assertChecksum(expected, calculated);
insert_settings.deserialize(extra_info);
/// Read query
readStringBinary(insert_query, in);
/// Query checksum
readPODBinary(expected, extra_info);
calculated = CityHash_v1_0_2::CityHash128(insert_query.data(), insert_query.size());
assertChecksum(expected, calculated);
readStringBinary(insert_query, header_buf);
insert_settings.deserialize(header_buf);
/// Add handling new data here, for example:
/// if (initiator_revision >= DBMS_MIN_REVISION_WITH_MY_NEW_DATA)
/// readVarUInt(my_new_data, extra_info);
/// readVarUInt(my_new_data, header_buf);
return;
}
if (query_size == DBMS_DISTRIBUTED_SIGNATURE_SETTINGS_OLD_FORMAT)
if (query_size == DBMS_DISTRIBUTED_SIGNATURE_HEADER_OLD_FORMAT)
{
insert_settings.deserialize(in, SettingsBinaryFormat::OLD);
readVarUInt(query_size, in);
readStringBinary(insert_query, in);
return;
}
insert_query.resize(query_size);
@ -459,7 +446,7 @@ struct StorageDistributedDirectoryMonitor::Batch
}
ReadBufferFromFile in(file_path->second);
parent.readQueryAndSettings(in, insert_settings, insert_query);
parent.readHeader(in, insert_settings, insert_query);
if (first)
{
@ -567,7 +554,7 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map
{
/// Determine metadata of the current file and check if it is not broken.
ReadBufferFromFile in{file_path};
readQueryAndSettings(in, insert_settings, insert_query);
readHeader(in, insert_settings, insert_query);
CompressedReadBuffer decompressing_in(in);
NativeBlockInputStream block_in(decompressing_in, ClickHouseRevision::get());

View File

@ -66,7 +66,7 @@ private:
ThreadFromGlobalPool thread{&StorageDistributedDirectoryMonitor::run, this};
/// Read the insert query and insert settings, for backward compatibility.
void readQueryAndSettings(ReadBuffer & in, Settings & insert_settings, std::string & insert_query) const;
void readHeader(ReadBuffer & in, Settings & insert_settings, std::string & insert_query) const;
};
}

View File

@ -588,25 +588,22 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
CompressedWriteBuffer compress{out};
NativeBlockOutputStream stream{compress, ClickHouseRevision::get(), block.cloneEmpty()};
/// We wrap the extra information into a string for compatibility with older versions:
/// a shard will be able to read this information partly and ignore other parts
/// based on its version.
WriteBufferFromOwnString extra_info;
writeVarUInt(ClickHouseRevision::get(), extra_info);
context.getSettingsRef().serialize(extra_info);
writePODBinary(CityHash_v1_0_2::CityHash128(query_string.data(), query_string.size()), extra_info);
/// Prepare the header.
/// We wrap the header into a string for compatibility with older versions:
/// a shard will be able to read the header partly and ignore other parts based on its version.
WriteBufferFromOwnString header_buf;
writeVarUInt(ClickHouseRevision::get(), header_buf);
writeStringBinary(query_string, header_buf);
context.getSettingsRef().serialize(header_buf);
/// Add new fields here, for example:
/// writeVarUInt(my_new_data, extra_info);
/// writeVarUInt(my_new_data, header_buf);
const auto &extra_info_ref = extra_info.stringRef();
writePODBinary(CityHash_v1_0_2::CityHash128(extra_info_ref.data, extra_info_ref.size), extra_info);
writeVarUInt(DBMS_DISTRIBUTED_SIGNATURE_EXTRA_INFO, out);
writeStringBinary(extra_info.str(), out);
writeStringBinary(query_string, out);
/// Write the header.
const StringRef header = header_buf.stringRef();
writeVarUInt(DBMS_DISTRIBUTED_SIGNATURE_HEADER, out);
writeStringBinary(header, out);
writePODBinary(CityHash_v1_0_2::CityHash128(header.data, header.size), out);
stream.writePrefix();
stream.write(block);