remove function checkPartMetadataCache

This commit is contained in:
taiyang-li 2022-01-07 18:37:08 +08:00
parent 3803cc3d5e
commit cf413f16a8
11 changed files with 101 additions and 220 deletions

View File

@ -1,156 +0,0 @@
#include "config_core.h"
#if USE_ROCKSDB
#include <Functions/FunctionFactory.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnsNumber.h>
#include <Interpreters/Context.h>
#include <Common/hex.h>
#include <Core/Field.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int ILLEGAL_COLUMN;
}
/// SQL function `checkPartMetadataCache(database, table)`.
///
/// Both arguments must be constant Strings. The named table must belong to the MergeTree
/// family and have the `use_metadata_cache` setting enabled.
///
/// The result is a constant Array(Tuple(String, String, FixedString(32), FixedString(32), UInt8)).
/// The tuple elements are, in order: key, part state, checksum from cache, checksum from disk,
/// and a 0/1 flag telling whether the two checksums are equal.
class FunctionCheckPartMetadataCache : public IFunction, WithContext
{
public:
    using uint128 = IMergeTreeDataPart::uint128;
    using DataPartPtr = MergeTreeData::DataPartPtr;
    using DataPartState = MergeTreeData::DataPartState;
    using DataPartStates = MergeTreeData::DataPartStates;

    static constexpr auto name = "checkPartMetadataCache";

    static FunctionPtr create(ContextPtr context_) { return std::make_shared<FunctionCheckPartMetadataCache>(context_); }

    /// Parts in any of these states are inspected.
    static constexpr DataPartStates part_states
        = {DataPartState::Active,
           DataPartState::Temporary,
           DataPartState::PreActive,
           DataPartState::Outdated,
           DataPartState::Deleting,
           DataPartState::DeleteOnDestroy};

    explicit FunctionCheckPartMetadataCache(ContextPtr context_) : WithContext(context_) { }

    String getName() const override { return name; }

    bool isDeterministic() const override { return false; }

    bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }

    bool isDeterministicInScopeOfQuery() const override { return false; }

    size_t getNumberOfArguments() const override { return 2; }

    /// Validates that every argument is a String and builds the Array(Tuple(...)) result type.
    /// Throws ILLEGAL_TYPE_OF_ARGUMENT when an argument has another type.
    DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
    {
        for (const auto & argument : arguments)
        {
            if (!isString(argument))
                throw Exception("The argument of function " + getName() + " must have String type", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
        }

        /// DataTypePtr is a shared pointer, so allocate with make_shared directly.
        DataTypePtr key_type = std::make_shared<DataTypeString>();
        DataTypePtr state_type = std::make_shared<DataTypeString>();
        DataTypePtr cache_checksum_type = std::make_shared<DataTypeFixedString>(32);
        DataTypePtr disk_checksum_type = std::make_shared<DataTypeFixedString>(32);
        DataTypePtr match_type = std::make_shared<DataTypeUInt8>();
        DataTypePtr tuple_type
            = std::make_shared<DataTypeTuple>(DataTypes{key_type, state_type, cache_checksum_type, disk_checksum_type, match_type});
        return std::make_shared<DataTypeArray>(tuple_type);
    }

    /// Resolves the table from the two constant arguments, collects one tuple per cached key of
    /// every part in `part_states`, and returns the resulting array as a constant column of
    /// `input_rows_count` rows.
    /// Throws ILLEGAL_COLUMN when an argument is not a constant String, when the table is not a
    /// MergeTree-family table, or when the table does not use the metadata cache.
    ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const override
    {
        /// Get database name
        const auto * arg_database = arguments[0].column.get();
        const ColumnString * column_database = checkAndGetColumnConstData<ColumnString>(arg_database);
        if (!column_database)
            throw Exception("The first argument of function " + getName() + " must be constant String", ErrorCodes::ILLEGAL_COLUMN);
        String database_name = column_database->getDataAt(0).toString();

        /// Get table name
        const auto * arg_table = arguments[1].column.get();
        const ColumnString * column_table = checkAndGetColumnConstData<ColumnString>(arg_table);
        if (!column_table)
            throw Exception("The second argument of function " + getName() + " must be constant String", ErrorCodes::ILLEGAL_COLUMN);
        String table_name = column_table->getDataAt(0).toString();

        /// Get storage
        StorageID storage_id(database_name, table_name);
        auto storage = DatabaseCatalog::instance().getTable(storage_id, getContext());
        auto data = std::dynamic_pointer_cast<MergeTreeData>(storage);

        /// Report the two failure reasons separately: a MergeTree table with the cache disabled
        /// must not be told that it is not a MergeTree table.
        if (!data)
            throw Exception("The table in function " + getName() + " must be in MergeTree Family", ErrorCodes::ILLEGAL_COLUMN);
        if (!data->getSettings()->use_metadata_cache)
            throw Exception(
                "The table in function " + getName() + " must have setting use_metadata_cache enabled", ErrorCodes::ILLEGAL_COLUMN);

        /// Fill in checking results.
        auto col_result = result_type->createColumn();
        auto & col_arr = assert_cast<ColumnArray &>(*col_result);
        auto & col_tuple = assert_cast<ColumnTuple &>(col_arr.getData());
        col_tuple.reserve(data->fileNumberOfDataParts(part_states));

        auto & col_key = assert_cast<ColumnString &>(col_tuple.getColumn(0));
        auto & col_state = assert_cast<ColumnString &>(col_tuple.getColumn(1));
        auto & col_cache_checksum = assert_cast<ColumnFixedString &>(col_tuple.getColumn(2));
        auto & col_disk_checksum = assert_cast<ColumnFixedString &>(col_tuple.getColumn(3));
        auto & col_match = assert_cast<ColumnUInt8 &>(col_tuple.getColumn(4));

        auto parts = data->getDataParts(part_states);
        for (const auto & part : parts)
            executePart(part, col_key, col_state, col_cache_checksum, col_disk_checksum, col_match);

        /// All tuples form a single array row, which is then returned as a constant column.
        col_arr.getOffsets().push_back(col_tuple.size());
        return result_type->createColumnConst(input_rows_count, col_arr[0]);
    }

    /// Appends one row per cached key of `part` to the five output columns.
    static void executePart(
        const DataPartPtr & part,
        ColumnString & col_key,
        ColumnString & col_state,
        ColumnFixedString & col_cache_checksum,
        ColumnFixedString & col_disk_checksum,
        ColumnUInt8 & col_match)
    {
        Strings keys;
        auto state_view = part->stateString();
        String state(state_view.data(), state_view.size());
        std::vector<uint128> cache_checksums;
        std::vector<uint128> disk_checksums;

        size_t file_number = part->fileNumberOfColumnsChecksumsIndexes();
        keys.reserve(file_number);
        cache_checksums.reserve(file_number);
        disk_checksums.reserve(file_number);

        /// NOTE: while the call below stays commented out, `keys` remains empty and
        /// the loop appends nothing.
        // part->checkMetadataCache(keys, cache_checksums, disk_checksums);

        for (size_t i = 0; i < keys.size(); ++i)
        {
            col_key.insert(keys[i]);
            col_state.insert(state);
            col_cache_checksum.insert(getHexUIntUppercase(cache_checksums[i].first) + getHexUIntUppercase(cache_checksums[i].second));
            col_disk_checksum.insert(getHexUIntUppercase(disk_checksums[i].first) + getHexUIntUppercase(disk_checksums[i].second));

            const uint8_t match = cache_checksums[i] == disk_checksums[i] ? 1 : 0;
            col_match.insertValue(match);
        }
    }
};
/// Registers FunctionCheckPartMetadataCache in the given function factory.
void registerFunctionCheckPartMetadataCache(FunctionFactory & factory)
{
factory.registerFunction<FunctionCheckPartMetadataCache>();
}
}
#endif

View File

@ -81,10 +81,6 @@ void registerFunctionServerUUID(FunctionFactory &);
void registerFunctionZooKeeperSessionUptime(FunctionFactory &);
void registerFunctionGetOSKernelVersion(FunctionFactory &);
#if USE_ROCKSDB
void registerFunctionCheckPartMetadataCache(FunctionFactory &);
#endif
#if USE_ICU
void registerFunctionConvertCharset(FunctionFactory &);
#endif
@ -171,10 +167,6 @@ void registerFunctionsMiscellaneous(FunctionFactory & factory)
registerFunctionZooKeeperSessionUptime(factory);
registerFunctionGetOSKernelVersion(factory);
#if USE_ROCKSDB
registerFunctionCheckPartMetadataCache(factory);
#endif
#if USE_ICU
registerFunctionConvertCharset(factory);
#endif

View File

@ -61,8 +61,7 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED;
}
void IMergeTreeDataPart::MinMaxIndex::load(
const MergeTreeData & data, const PartMetadataManagerPtr & manager)
void IMergeTreeDataPart::MinMaxIndex::load(const MergeTreeData & data, const PartMetadataManagerPtr & manager)
{
auto metadata_snapshot = data.getInMemoryMetadataPtr();
const auto & partition_key = metadata_snapshot->getPartitionKey();
@ -1782,8 +1781,6 @@ String IMergeTreeDataPart::getZeroLevelPartBlockID() const
return info.partition_id + "_" + toString(hash_value.words[0]) + "_" + toString(hash_value.words[1]);
}
/*
#if USE_ROCKSDB
IMergeTreeDataPart::uint128 IMergeTreeDataPart::getActualChecksumByFile(const String & file_path) const
{
assert(use_metadata_cache);
@ -1808,52 +1805,10 @@ IMergeTreeDataPart::uint128 IMergeTreeDataPart::getActualChecksumByFile(const St
return in_hash.getHash();
}
void IMergeTreeDataPart::checkMetadataCache(Strings & files, std::vector<uint128> & cache_checksums, std::vector<uint128> & disk_checksums) const
std::unordered_map<String, IMergeTreeDataPart::uint128> IMergeTreeDataPart::checkMetadata() const
{
assert(use_metadata_cache);
/// Only applies for normal part
if (isProjectionPart())
return;
/// the directory of projection part is under the directory of its parent part
const auto filenames_without_checksums = getFileNamesWithoutChecksums();
metadata_cache->getFilesAndCheckSums(files, cache_checksums);
for (const auto & file : files)
{
// std::cout << "check key:" << file << std::endl;
String file_name = fs::path(file).filename();
/// file belongs to normal part
if (fs::path(getFullRelativePath()) / file_name == file)
{
auto disk_checksum = getActualChecksumByFile(file);
disk_checksums.push_back(disk_checksum);
continue;
}
/// file belongs to projection part
String proj_dir_name = fs::path(file).parent_path().filename();
auto pos = proj_dir_name.find_last_of('.');
if (pos == String::npos)
{
disk_checksums.push_back({});
continue;
}
String proj_name = proj_dir_name.substr(0, pos);
auto it = projection_parts.find(proj_name);
if (it == projection_parts.end())
{
disk_checksums.push_back({});
continue;
}
auto disk_checksum = it->second->getActualChecksumByFile(file);
disk_checksums.push_back(disk_checksum);
}
return metadata_manager->check();
}
#endif
*/
bool isCompactPart(const MergeTreeDataPartPtr & data_part)
{

View File

@ -371,8 +371,6 @@ public:
String getRelativePathForPrefix(const String & prefix, bool detached = false) const;
// virtual void checkMetadataCache(Strings & files, std::vector<uint128> & cache_checksums, std::vector<uint128> & disk_checksums) const;
bool isProjectionPart() const { return parent_part != nullptr; }
const IMergeTreeDataPart * getParentPart() const { return parent_part; }
@ -418,6 +416,12 @@ public:
/// Required for distinguish different copies of the same part on S3
String getUniqueId() const;
/// Get checksums of metadata file in part directory
IMergeTreeDataPart::uint128 getActualChecksumByFile(const String & file_path) const;
/// Check that metadata in cache is consistent with actual metadata on disk (if use_metadata_cache is true)
std::unordered_map<String, uint128> checkMetadata() const;
protected:
/// Total size of all columns, calculated once in calcuateColumnSizesOnDisk
@ -464,6 +468,7 @@ protected:
void initializePartMetadataManager();
private:
/// In compact parts order of columns is necessary
NameToNumber column_name_to_position;
@ -526,9 +531,6 @@ private:
/// for this column with default parameters.
CompressionCodecPtr detectDefaultCompressionCodec() const;
// void modifyAllMetadataCaches(ModifyCacheType type, bool include_projection = false) const;
// IMergeTreeDataPart::uint128 getActualChecksumByFile(const String & file_path) const;
mutable State state{State::Temporary};
};

View File

@ -1,5 +1,6 @@
#pragma once
#include <unordered_map>
#include <city.h>
#include <base/types.h>
@ -33,6 +34,8 @@ public:
virtual void updateAll(bool include_projection) = 0;
virtual std::unordered_map<String, uint128> check() const = 0;
protected:
const IMergeTreeDataPart * part;
const DiskPtr disk;

View File

@ -22,6 +22,7 @@ public:
/// No-op in this implementation; the argument is ignored.
void updateAll(bool /*include_projection*/) override {}
/// This implementation performs no checks and always returns an empty map.
std::unordered_map<String, uint128> check() const override { return {}; }
};

View File

@ -1,6 +1,8 @@
#include "PartMetadataManagerWithCache.h"
#if USE_ROCKSDB
#include <Common/hex.h>
#include <Common/ErrorCodes.h>
#include <IO/HashingReadBuffer.h>
#include <IO/ReadBufferFromString.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
@ -11,13 +13,16 @@ namespace ProfileEvents
extern const Event MergeTreeMetadataCacheMiss;
}
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int CORRUPTED_DATA;
extern const int NO_SUCH_PROJECTION_IN_TABLE;
}
namespace DB
{
PartMetadataManagerWithCache::PartMetadataManagerWithCache(const IMergeTreeDataPart * part_, const MergeTreeMetadataCachePtr & cache_)
: IPartMetadataManager(part_), cache(cache_)
{
@ -190,5 +195,79 @@ void PartMetadataManagerWithCache::getKeysAndCheckSums(Strings & keys, std::vect
}
}
std::unordered_map<String, IPartMetadataManager::uint128> PartMetadataManagerWithCache::check() const
{
/// Only applies for normal part stored on disk
if (part->isProjectionPart() || !part->isStoredOnDisk())
return {};
/// the directory of projection part is under the directory of its parent part
const auto filenames_without_checksums = part->getFileNamesWithoutChecksums();
std::unordered_map<String, uint128> results;
Strings keys;
std::vector<uint128> cache_checksums;
std::vector<uint128> disk_checksums;
getKeysAndCheckSums(keys, cache_checksums);
for (size_t i = 0; i < keys.size(); ++i)
{
const auto & key = keys[i];
String file_path = getFilePathFromKey(key);
String file_name = fs::path(file_path).filename();
results.emplace(file_name, cache_checksums[i]);
/// File belongs to normal part
if (fs::path(part->getFullRelativePath()) / file_name == file_path)
{
auto disk_checksum = part->getActualChecksumByFile(file_path);
if (disk_checksum != cache_checksums[i])
throw Exception(
ErrorCodes::CORRUPTED_DATA,
"Checksums doesn't match in part {}. Expected: {}. Found {}.",
part->name,
getHexUIntUppercase(disk_checksum.first) + getHexUIntUppercase(disk_checksum.second),
getHexUIntUppercase(cache_checksums[i].first) + getHexUIntUppercase(cache_checksums[i].second));
disk_checksums.push_back(disk_checksum);
continue;
}
/// File belongs to projection part
String proj_dir_name = fs::path(file_path).parent_path().filename();
auto pos = proj_dir_name.find_last_of('.');
if (pos == String::npos)
{
throw Exception(
ErrorCodes::NO_SUCH_PROJECTION_IN_TABLE,
"There is no projection in part: {} contains file: {} with directory name: {}",
part->name,
file_path,
proj_dir_name);
}
String proj_name = proj_dir_name.substr(0, pos);
const auto & projection_parts = part->getProjectionParts();
auto it = projection_parts.find(proj_name);
if (it == projection_parts.end())
{
throw Exception(
ErrorCodes::NO_SUCH_PROJECTION_IN_TABLE,
"There is no projection {} in part: {} contains file: {}",
proj_name, part->name, file_path);
}
auto disk_checksum = it->second->getActualChecksumByFile(file_path);
if (disk_checksum != cache_checksums[i])
throw Exception(
ErrorCodes::CORRUPTED_DATA,
"Checksums doesn't match in projection part {} {}. Expected: {}. Found {}.",
part->name, proj_name,
getHexUIntUppercase(disk_checksum.first) + getHexUIntUppercase(disk_checksum.second),
getHexUIntUppercase(cache_checksums[i].first) + getHexUIntUppercase(cache_checksums[i].second));
disk_checksums.push_back(disk_checksum);
}
return results;
}
}
#endif

View File

@ -26,6 +26,8 @@ public:
void updateAll(bool include_projection) override;
std::unordered_map<String, uint128> check() const override;
private:
String getKeyFromFilePath(const String & file_path) const;
String getFilePathFromKey(const String & key) const;

View File

@ -399,6 +399,7 @@ CheckResult ReplicatedMergeTreePartCheckThread::checkPart(const String & part_na
LOG_WARNING(log, "We have part {} covering part {}", part->name, part_name);
}
part->checkMetadata();
return {part_name, true, ""};
}

View File

@ -270,7 +270,6 @@ IMergeTreeDataPart::Checksums checkDataPart(
if (require_checksums || !checksums_txt.files.empty())
checksums_txt.checkEqual(checksums_data, check_uncompressed);
return checksums_data;
}

View File

@ -1612,6 +1612,8 @@ CheckResults StorageMergeTree::checkData(const ASTPtr & query, ContextPtr local_
auto out = disk->writeFile(tmp_checksums_path, 4096);
part->checksums.write(*out);
disk->moveFile(tmp_checksums_path, checksums_path);
part->checkMetadata();
results.emplace_back(part->name, true, "Checksums recounted and written to disk.");
}
catch (const Exception & ex)
@ -1628,6 +1630,7 @@ CheckResults StorageMergeTree::checkData(const ASTPtr & query, ContextPtr local_
try
{
checkDataPart(part, true);
part->checkMetadata();
results.emplace_back(part->name, true, "");
}
catch (const Exception & ex)