added indexes to checkDataPart

This commit is contained in:
Nikita Vasilev 2019-01-09 22:20:50 +03:00
parent 0ba6f1421a
commit d9b7f30245
4 changed files with 47 additions and 1 deletions

View File

@ -2097,7 +2097,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::loadPartAndFixMetadata(const St
/// Check the data while we are at it.
if (part->checksums.empty())
{
part->checksums = checkDataPart(full_part_path, index_granularity, false, primary_key_data_types);
part->checksums = checkDataPart(full_part_path, index_granularity, false, primary_key_data_types, indexes);
{
WriteBufferFromFile out(full_part_path + "checksums.txt.tmp", 4096);

View File

@ -225,6 +225,7 @@ void ReplicatedMergeTreePartCheckThread::checkPart(const String & part_name)
storage.data.index_granularity,
true,
storage.data.primary_key_data_types,
storage.data.indexes,
[this] { return need_stop.load(); });
if (need_stop)

View File

@ -137,6 +137,7 @@ MergeTreeData::DataPart::Checksums checkDataPart(
size_t index_granularity,
bool require_checksums,
const DataTypes & primary_key_data_types,
const MergeTreeIndexes & indexes,
std::function<bool()> is_cancelled)
{
Logger * log = &Logger::get("checkDataPart");
@ -241,6 +242,49 @@ MergeTreeData::DataPart::Checksums checkDataPart(
rows = count;
}
/// Read and check skip indexes
for (const auto index : indexes)
{
LOG_DEBUG(log, "Checking index " << index->name << " in " << path);
Stream stream(path, index->getFileName(), ".idx");
size_t mark_num = 0;
while (!stream.uncompressed_hashing_buf.eof())
{
if (stream.mrk_hashing_buf.eof())
throw Exception("Unexpected end of mrk file while reading index " + index->name,
ErrorCodes::CORRUPTED_DATA);
try
{
stream.assertMark();
}
catch (Exception &e)
{
e.addMessage("Cannot read mark " + toString(mark_num)
+ " in file " + stream.mrk_file_path
+ ", mrk file offset: " + toString(stream.mrk_hashing_buf.count()));
throw;
}
try
{
index->createIndexGranule()->deserializeBinary(stream.uncompressed_hashing_buf);
}
catch (Exception &e)
{
e.addMessage("Cannot read granule " + toString(mark_num)
+ " in file " + stream.bin_file_path
+ ", mrk file offset: " + toString(stream.mrk_hashing_buf.count()));
throw;
}
++mark_num;
if (is_cancelled())
return {};
}
stream.assertEnd();
stream.saveChecksums(checksums_data);
}
/// Read all columns, calculate checksums and validate marks.
for (const NameAndTypePair & name_type : columns)
{

View File

@ -17,6 +17,7 @@ MergeTreeData::DataPart::Checksums checkDataPart(
size_t index_granularity,
bool require_checksums,
const DataTypes & primary_key_data_types, /// Check the primary key. If it is not necessary, pass an empty array.
const MergeTreeIndexes & indexes = {}, /// Check skip indexes
std::function<bool()> is_cancelled = []{ return false; });
}