Merge pull request #5832 from yandex/fix_low_cardinality_check

Fix low cardinality check
alexey-milovidov 2019-07-02 17:06:06 +03:00 committed by GitHub
commit 1bda6994ad

@@ -76,7 +76,7 @@ public:
         , mrk_hashing_buf(mrk_file_buf)
     {}
 
-    void assertMark()
+    void assertMark(bool only_read = false)
     {
         MarkInCompressedFile mrk_mark;
         readIntBinary(mrk_mark.offset_in_compressed_file, mrk_hashing_buf);
@@ -120,7 +120,7 @@ public:
             data_mark.offset_in_compressed_file = compressed_hashing_buf.count() - uncompressing_buf.getSizeCompressed();
             data_mark.offset_in_decompressed_block = uncompressed_hashing_buf.offset();
 
-            if (mrk_mark != data_mark || mrk_rows != index_granularity.getMarkRows(mark_position))
+            if (!only_read && (mrk_mark != data_mark || mrk_rows != index_granularity.getMarkRows(mark_position)))
                 throw Exception("Incorrect mark: " + data_mark.toStringWithRows(index_granularity.getMarkRows(mark_position)) +
                     (has_alternative_mark ? " or " + alternative_data_mark.toString() : "") + " in data, " +
                     mrk_mark.toStringWithRows(mrk_rows) + " in " + mrk_file_path + " file", ErrorCodes::INCORRECT_MARK);
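
These two hunks work together: assertMark() still consumes one mark from the .mrk hashing buffer on every call, so the checksum and offset bookkeeping keeps advancing, while the comparison against the mark computed from the data files is skipped when only_read is true. Below is a minimal standalone sketch of this read-but-don't-verify pattern; the types and stream handling are simplified stand-ins, not the real ClickHouse classes.

    // Sketch only: simplified stand-ins for MarkInCompressedFile and the hashing buffer.
    #include <cstdint>
    #include <iostream>
    #include <sstream>
    #include <stdexcept>
    #include <string>

    struct Mark
    {
        uint64_t offset_in_compressed_file = 0;
        uint64_t offset_in_decompressed_block = 0;

        bool operator!=(const Mark & rhs) const
        {
            return offset_in_compressed_file != rhs.offset_in_compressed_file
                || offset_in_decompressed_block != rhs.offset_in_decompressed_block;
        }
    };

    /// Always reads (consumes) one mark from the marks stream; verifies it against
    /// the expected mark only when only_read == false, mirroring assertMark(bool).
    void assertMark(std::istream & marks, const Mark & expected, bool only_read = false)
    {
        Mark mrk_mark;
        marks.read(reinterpret_cast<char *>(&mrk_mark.offset_in_compressed_file), sizeof(uint64_t));
        marks.read(reinterpret_cast<char *>(&mrk_mark.offset_in_decompressed_block), sizeof(uint64_t));
        if (!marks)
            throw std::runtime_error("Unexpected end of marks stream");

        if (!only_read && mrk_mark != expected)
            throw std::runtime_error("Incorrect mark");
    }

    int main()
    {
        /// One fake mark: two 8-byte offsets (42, 0), assuming a little-endian host.
        std::string raw(16, '\0');
        raw[0] = 42;
        std::istringstream marks(raw, std::ios::binary);

        Mark expected;
        expected.offset_in_compressed_file = 42;
        assertMark(marks, expected);   /// consumed and verified
        std::cout << "mark ok\n";      /// with only_read = true it would only be consumed
    }

Because the mark bytes are read either way, the hashing buffers stay in sync and the part's checksums are still computed correctly even for streams whose marks cannot be verified.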
@@ -319,20 +319,35 @@ MergeTreeData::DataPart::Checksums checkDataPart(
         size_t column_size = 0;
         size_t mark_num = 0;
 
+        IDataType::DeserializeBinaryBulkStatePtr state;
+        IDataType::DeserializeBinaryBulkSettings settings;
+        settings.getter = [&](const IDataType::SubstreamPath & substream_path)
+        {
+            String file_name = IDataType::getFileNameForStream(name_type.name, substream_path);
+            auto & stream = streams.try_emplace(file_name, path, file_name, ".bin", mrk_file_extension, adaptive_index_granularity).first->second;
+            return &stream.uncompressed_hashing_buf;
+        };
+
+        /// Prefixes have to be read before the data because the first mark points past the prefix.
+        name_type.type->deserializeBinaryBulkStatePrefix(settings, state);
+
         while (true)
         {
-            IDataType::DeserializeBinaryBulkSettings settings;
             /// Check that the mark points to the current position in the file.
             bool marks_eof = false;
             name_type.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
             {
                 String file_name = IDataType::getFileNameForStream(name_type.name, substream_path);
                 auto & stream = streams.try_emplace(file_name, path, file_name, ".bin", mrk_file_extension, adaptive_index_granularity).first->second;
 
                 try
                 {
+                    /// The LowCardinality dictionary column is not read monotonically, so its marks
+                    /// may be inconsistent with the offset position in the file. In that case we still
+                    /// read the data and the marks file, but don't check that the marks match.
+                    bool only_read = !substream_path.empty() && substream_path.back().type == IDataType::Substream::DictionaryKeys;
                     if (!stream.mrk_hashing_buf.eof())
-                        stream.assertMark();
+                        stream.assertMark(only_read);
                     else
                         marks_eof = true;
                 }
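
A note on the condition above (an explanation, not part of the diff): a LowCardinality column is stored as indexes into a shared dictionary, and the dictionary-keys stream is re-read non-monotonically during deserialization, so its file offsets cannot line up with the marks. The last element of the substream path identifies which stream a file belongs to. A hedged sketch of the same test, with an illustrative enum in place of the real IDataType::Substream API:

    #include <cassert>
    #include <vector>

    /// Illustrative substream kinds; the real set lives in IDataType::Substream.
    enum class SubstreamType { Regular, ArraySizes, DictionaryKeys, DictionaryIndexes };

    struct Substream { SubstreamType type; };
    using SubstreamPath = std::vector<Substream>;

    /// Mirrors the hunk's condition: only the shared dictionary-keys stream,
    /// identified by the last path element, is exempt from the mark check.
    bool shouldSkipMarkCheck(const SubstreamPath & substream_path)
    {
        return !substream_path.empty()
            && substream_path.back().type == SubstreamType::DictionaryKeys;
    }

    int main()
    {
        assert(shouldSkipMarkCheck({{SubstreamType::DictionaryKeys}}));
        assert(!shouldSkipMarkCheck({{SubstreamType::DictionaryIndexes}}));
        assert(!shouldSkipMarkCheck({}));
    }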
@@ -352,17 +367,6 @@ MergeTreeData::DataPart::Checksums checkDataPart(
             /// NOTE Shared array sizes of Nested columns are read more than once. That's Ok.
             MutableColumnPtr tmp_column = name_type.type->createColumn();
 
-            settings.getter = [&](const IDataType::SubstreamPath & substream_path)
-            {
-                String file_name = IDataType::getFileNameForStream(name_type.name, substream_path);
-                auto stream_it = streams.find(file_name);
-                if (stream_it == streams.end())
-                    throw Exception("Logical error: cannot find stream " + file_name, ErrorCodes::LOGICAL_ERROR);
-                return &stream_it->second.uncompressed_hashing_buf;
-            };
-
-            IDataType::DeserializeBinaryBulkStatePtr state;
-            name_type.type->deserializeBinaryBulkStatePrefix(settings, state);
-
             name_type.type->deserializeBinaryBulkWithMultipleStreams(*tmp_column, rows_after_mark, settings, state);
             size_t read_size = tmp_column->size();
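
This last hunk is the other half of the restructuring in the hunk at @@ -319,20 +319,35 @@: settings, the stream getter, and the deserialization state used to be rebuilt on every loop iteration, with deserializeBinaryBulkStatePrefix called inside the loop; they now live before the loop and the prefix is read exactly once, because the first mark points past the serialized prefix. A simplified sketch of that hoisting, with hypothetical names standing in for the real API:

    #include <cstddef>
    #include <iostream>
    #include <stdexcept>

    /// Hypothetical stand-in for IDataType::DeserializeBinaryBulkStatePtr.
    struct State { bool prefix_read = false; };

    /// Must run once, before any granule data is read.
    void deserializePrefix(State & state) { state.prefix_read = true; }

    /// Pretend to read one granule; returns false on end of data.
    bool deserializeGranule(State & state, std::size_t mark)
    {
        if (!state.prefix_read)
            throw std::logic_error("prefix must be read before data");
        return mark < 3;    /// fake part with three granules
    }

    int main()
    {
        State state;                /// created once, outside the loop
        deserializePrefix(state);   /// the first mark points past the prefix

        for (std::size_t mark = 0;; ++mark)
        {
            if (!deserializeGranule(state, mark))
                break;
            std::cout << "checked granule " << mark << '\n';
        }
    }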