Add checksum for extra info/query in distributed sends

These extras include:
- server revision
- query settings
- query

Otherwise the code can try to interpret corrupted data and get, for instance,
a std::length_error exception, which is not caught (to mark the part as
broken). This will also protect against corruption on disk.

Also add a simple test, since dbms/tests/integration/test_insert_into_distributed is too complex.

Also simplify the code by using readStringBinary() instead of
readVarUInt()+b.readStrict() (this also gains an additional check that
the string is not bigger than 1GB).

Refs: #4852 (8ef7f3589a)
v2: avoid ABI breakage (suggested by @vitlibar)
v3: minor code fixes (suggested by @vitlibar) and as a consequence clang-8 build fix
v4: drop DBMS_MIN_REVISION_WITH_EXTRAS_CHECKSUM_IN_DIST_BATCH and also
    revert some renames to make the patch cleaner
This commit is contained in:
Azat Khuzhin 2019-11-10 14:11:57 +03:00
parent 2592e44eef
commit 3a72e1c12a
4 changed files with 62 additions and 4 deletions

View File

@ -7,6 +7,8 @@
#include <Common/ClickHouseRevision.h>
#include <Common/SipHash.h>
#include <Common/quoteString.h>
#include <Common/hex.h>
#include <common/StringRef.h>
#include <Interpreters/Context.h>
#include <Storages/Distributed/DirectoryMonitor.h>
#include <IO/ReadBufferFromFile.h>
@ -40,6 +42,7 @@ namespace ErrorCodes
extern const int CHECKSUM_DOESNT_MATCH;
extern const int TOO_LARGE_SIZE_COMPRESSED;
extern const int ATTEMPT_TO_READ_AFTER_EOF;
extern const int CORRUPTED_DATA;
}
@ -60,6 +63,19 @@ namespace
return pools;
}
/// Verifies that the checksum read from the data equals the one computed locally.
/// When they differ, throws an Exception with code CHECKSUM_DOESNT_MATCH whose
/// message carries both values formatted via getHexUIntLowercase().
void assertChecksum(CityHash_v1_0_2::uint128 expected, CityHash_v1_0_2::uint128 calculated)
{
    /// Nothing to report when the checksums agree.
    if (expected == calculated)
        return;

    /// Each 128-bit value is printed as its two 64-bit halves, first then second.
    const String reference_hex = getHexUIntLowercase(expected.first) + getHexUIntLowercase(expected.second);
    const String actual_hex = getHexUIntLowercase(calculated.first) + getHexUIntLowercase(calculated.second);

    throw Exception(
        "Checksum of extra info doesn't match: corrupted data. Reference: " + reference_hex
            + ". Actual: " + actual_hex + ".",
        ErrorCodes::CHECKSUM_DOESNT_MATCH);
}
}
@ -277,13 +293,21 @@ void StorageDistributedDirectoryMonitor::readQueryAndSettings(
if (query_size == DBMS_DISTRIBUTED_SIGNATURE_EXTRA_INFO)
{
UInt64 initiator_revision;
CityHash_v1_0_2::uint128 expected;
CityHash_v1_0_2::uint128 calculated;
/// Read extra information.
String extra_info_as_string;
readStringBinary(extra_info_as_string, in);
readVarUInt(query_size, in);
ReadBufferFromString extra_info(extra_info_as_string);
/// Guard against out-of-bounds access here; other cases are checked by the read*() helpers.
if (extra_info_as_string.size() < sizeof(expected))
throw Exception("Not enough data", ErrorCodes::CORRUPTED_DATA);
StringRef extra_info_ref(extra_info_as_string.data(), extra_info_as_string.size() - sizeof(expected));
ReadBufferFromMemory extra_info(extra_info_ref.data, extra_info_ref.size);
ReadBuffer checksum(extra_info_as_string.data(), sizeof(expected), extra_info_ref.size);
UInt64 initiator_revision;
readVarUInt(initiator_revision, extra_info);
if (ClickHouseRevision::get() < initiator_revision)
{
@ -293,13 +317,29 @@ void StorageDistributedDirectoryMonitor::readQueryAndSettings(
<< "It may lack support for new features.");
}
/// Checksum of the extra info (covers everything except the checksum itself)
readPODBinary(expected, checksum);
calculated = CityHash_v1_0_2::CityHash128(extra_info_ref.data, extra_info_ref.size);
assertChecksum(expected, calculated);
insert_settings.deserialize(extra_info);
/// Read query
readStringBinary(insert_query, in);
/// Query checksum
readPODBinary(expected, extra_info);
calculated = CityHash_v1_0_2::CityHash128(insert_query.data(), insert_query.size());
assertChecksum(expected, calculated);
/// Add handling new data here, for example:
/// if (initiator_revision >= DBMS_MIN_REVISION_WITH_MY_NEW_DATA)
/// readVarUInt(my_new_data, extra_info);
return;
}
else if (query_size == DBMS_DISTRIBUTED_SIGNATURE_SETTINGS_OLD_FORMAT)
if (query_size == DBMS_DISTRIBUTED_SIGNATURE_SETTINGS_OLD_FORMAT)
{
insert_settings.deserialize(in, SettingsBinaryFormat::OLD);
readVarUInt(query_size, in);

View File

@ -595,9 +595,14 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
writeVarUInt(ClickHouseRevision::get(), extra_info);
context.getSettingsRef().serialize(extra_info);
writePODBinary(CityHash_v1_0_2::CityHash128(query_string.data(), query_string.size()), extra_info);
/// Add new fields here, for example:
/// writeVarUInt(my_new_data, extra_info);
const auto &extra_info_ref = extra_info.stringRef();
writePODBinary(CityHash_v1_0_2::CityHash128(extra_info_ref.data, extra_info_ref.size), extra_info);
writeVarUInt(DBMS_DISTRIBUTED_SIGNATURE_EXTRA_INFO, out);
writeStringBinary(extra_info.str(), out);

View File

@ -0,0 +1,9 @@
-- Simple test for INSERT into a Distributed table with batched sends by the directory monitor.

-- Start from a clean state so that a rerun (or leftovers of a failed run) cannot break the CREATEs.
DROP TABLE IF EXISTS dist_test;
DROP TABLE IF EXISTS test;

-- Enable batched inserts and shorten the directory monitor sleep intervals.
SET distributed_directory_monitor_batch_inserts=1;
SET distributed_directory_monitor_sleep_time_ms=10;
SET distributed_directory_monitor_max_sleep_time_ms=100;

CREATE TABLE test (key UInt64) ENGINE=TinyLog();
CREATE TABLE dist_test AS test Engine=Distributed(test_cluster_two_shards, currentDatabase(), test, key);

INSERT INTO dist_test SELECT toUInt64(number) FROM numbers(2);
-- Flush the pending distributed data before reading it back.
SYSTEM FLUSH DISTRIBUTED dist_test;
SELECT * FROM dist_test;

-- Clean up; the Distributed table goes first since it refers to the local one.
DROP TABLE dist_test;
DROP TABLE test;