ClickHouse/src/Storages/MergeTree/checkDataPart.cpp

#include <algorithm>
#include <optional>
#include <Poco/File.h>
#include <Poco/DirectoryIterator.h>
#include <Storages/MergeTree/MergeTreeIndexGranularity.h>
#include <Storages/MergeTree/checkDataPart.h>
#include <Storages/MergeTree/MergeTreeDataPartCompact.h>
#include <Compression/CompressedReadBuffer.h>
#include <IO/HashingReadBuffer.h>
#include <Common/CurrentMetrics.h>

namespace CurrentMetrics
{
    extern const Metric ReplicatedChecks;
}

namespace DB
{

namespace ErrorCodes
{
    extern const int CORRUPTED_DATA;
    extern const int UNKNOWN_PART_TYPE;
}

IMergeTreeDataPart::Checksums checkDataPart(
    const DiskPtr & disk,
    const String & full_relative_path,
    const NamesAndTypesList & columns_list,
    const MergeTreeDataPartType & part_type,
    bool require_checksums,
    std::function<bool()> is_cancelled)
{
    /** Responsibility:
      * - read the list of columns from columns.txt;
      * - read the checksums if they exist;
      * - validate the list of columns against the checksums.
      */

    CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedChecks};

    String path = full_relative_path;
    if (!path.empty() && path.back() != '/')
        path += "/";

    NamesAndTypesList columns_txt;

    {
        auto buf = disk->readFile(path + "columns.txt");
        columns_txt.readText(*buf);
        assertEOF(*buf);
    }

    if (columns_txt != columns_list)
        throw Exception("Columns don't match in part " + path
            + ". Expected: " + columns_list.toString()
            + ". Found: " + columns_txt.toString(), ErrorCodes::CORRUPTED_DATA);

    /// Real checksums based on the contents of the data. They must correspond to checksums.txt; if they don't, the data is broken.
    IMergeTreeDataPart::Checksums checksums_data;

    /// This function calculates checksums for both the compressed and the decompressed contents of a compressed file.
    auto checksum_compressed_file = [](const DiskPtr & disk_, const String & file_path)
    {
        auto file_buf = disk_->readFile(file_path);
        HashingReadBuffer compressed_hashing_buf(*file_buf);
        CompressedReadBuffer uncompressing_buf(compressed_hashing_buf);
        HashingReadBuffer uncompressed_hashing_buf(uncompressing_buf);

        uncompressed_hashing_buf.tryIgnore(std::numeric_limits<size_t>::max());
        return IMergeTreeDataPart::Checksums::Checksum
        {
            compressed_hashing_buf.count(), compressed_hashing_buf.getHash(),
            uncompressed_hashing_buf.count(), uncompressed_hashing_buf.getHash()
        };
    };
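
    /// The buffer chain above is: file -> HashingReadBuffer (hashes the raw compressed
    /// bytes) -> CompressedReadBuffer (decompresses) -> HashingReadBuffer (hashes the
    /// decompressed bytes). Draining the outermost buffer with tryIgnore() therefore
    /// streams the file once and yields sizes and hashes for both representations.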

    /// First, calculate checksums for the columns' data.
    if (part_type == MergeTreeDataPartType::COMPACT)
    {
        const auto & file_name = MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION;
        checksums_data.files[file_name] = checksum_compressed_file(disk, path + file_name);
    }
    else if (part_type == MergeTreeDataPartType::WIDE)
    {
        for (const auto & column : columns_list)
        {
            column.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
            {
                String file_name = IDataType::getFileNameForStream(column.name, substream_path) + ".bin";
                checksums_data.files[file_name] = checksum_compressed_file(disk, path + file_name);
            }, {});
        }
    }
    else
    {
        throw Exception("Unknown type in part " + path, ErrorCodes::UNKNOWN_PART_TYPE);
    }
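
    /// A compact part stores all columns in a single data file, so one checksum covers
    /// them all; a wide part stores each column substream (e.g. the null map of a
    /// Nullable column) in its own .bin file, and each is checksummed separately.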

    /// Checksums of the rest of the files listed in checksums.txt. They may be absent; if present, they are later compared with the actual data checksums.
    IMergeTreeDataPart::Checksums checksums_txt;

    if (require_checksums || disk->exists(path + "checksums.txt"))
    {
        auto buf = disk->readFile(path + "checksums.txt");
        checksums_txt.read(*buf);
        assertEOF(*buf);
    }

    const auto & checksum_files_txt = checksums_txt.files;
    for (auto it = disk->iterateDirectory(path); it->isValid(); it->next())
    {
        const String & file_name = it->name();
        auto checksum_it = checksums_data.files.find(file_name);

        /// Skip files whose checksums we have already calculated. Also skip metadata files that are not checksummed.
        if (checksum_it == checksums_data.files.end() && file_name != "checksums.txt" && file_name != "columns.txt")
        {
            auto txt_checksum_it = checksum_files_txt.find(file_name);
            if (txt_checksum_it == checksum_files_txt.end() || txt_checksum_it->second.uncompressed_size == 0)
            {
                /// The file is not compressed.
                auto file_buf = disk->readFile(it->path());
                HashingReadBuffer hashing_buf(*file_buf);
                hashing_buf.tryIgnore(std::numeric_limits<size_t>::max());
                checksums_data.files[file_name] = IMergeTreeDataPart::Checksums::Checksum(hashing_buf.count(), hashing_buf.getHash());
            }
            else /// If checksums.txt has both the compressed and the uncompressed size, then calculate both checksums.
            {
                checksums_data.files[file_name] = checksum_compressed_file(disk, it->path());
            }
        }
    }
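
    /// At this point checksums_data covers every file of the part except checksums.txt
    /// and columns.txt; files that checksums.txt does not mention (or lists with an
    /// uncompressed size of 0) were hashed as plain, uncompressed files.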

    if (is_cancelled())
        return {};

    if (require_checksums || !checksums_txt.files.empty())
        checksums_txt.checkEqual(checksums_data, true);

    return checksums_data;
}

IMergeTreeDataPart::Checksums checkDataPart(
    MergeTreeData::DataPartPtr data_part,
    bool require_checksums,
    std::function<bool()> is_cancelled)
{
    return checkDataPart(
        data_part->disk,
        data_part->getFullRelativePath(),
        data_part->getColumns(),
        data_part->getType(),
        require_checksums,
        is_cancelled);
}
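
/// A minimal usage sketch (hypothetical caller, not part of this file): verify a part
/// with a no-op cancellation callback. A checksum mismatch makes checkEqual() above
/// throw, and cancellation returns an empty Checksums object:
///
///     auto checksums = checkDataPart(part, /* require_checksums = */ true,
///                                    [] { return false; });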
}