Merge pull request #5865 from yandex/fix_fetch_of_existing_part

Fix fetch of existing part and add CHECK QUERY support
commit 62d7c03f8d by alexey-milovidov, 2019-07-09 15:18:48 +03:00, committed via GitHub
26 changed files with 432 additions and 51 deletions

View File

@@ -42,35 +42,40 @@ void FileChecker::update(const Files::const_iterator & begin, const Files::const_iterator & end)
     save();
 }

-bool FileChecker::check() const
+CheckResults FileChecker::check() const
 {
     /** Read the files again every time you call `check` - so as not to violate the constancy.
       * `check` method is rarely called.
       */
+    CheckResults results;
     Map local_map;
     load(local_map, files_info_path);

     if (local_map.empty())
-        return true;
+        return {};

     for (const auto & name_size : local_map)
     {
-        Poco::File file(Poco::Path(files_info_path).parent().toString() + "/" + name_size.first);
+        Poco::Path path = Poco::Path(files_info_path).parent().toString() + "/" + name_size.first;
+        Poco::File file(path);
         if (!file.exists())
         {
-            LOG_ERROR(log, "File " << file.path() << " doesn't exist");
-            return false;
+            results.emplace_back(path.getFileName(), false, "File " + file.path() + " doesn't exist");
+            break;
         }

         size_t real_size = file.getSize();
         if (real_size != name_size.second)
         {
-            LOG_ERROR(log, "Size of " << file.path() << " is wrong. Size is " << real_size << " but should be " << name_size.second);
-            return false;
+            results.emplace_back(path.getFileName(), false, "Size of " + file.path() + " is wrong. Size is " + toString(real_size) + " but should be " + toString(name_size.second));
+            break;
         }
+        results.emplace_back(path.getFileName(), true, "");
     }

-    return true;
+    return results;
 }

 void FileChecker::initialize()
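
The signature change above is the heart of the patch: FileChecker::check no longer collapses everything into one bool but reports each file separately and stops at the first failure. A minimal standalone sketch of the new contract, using std::filesystem in place of Poco (the names here are illustrative, not ClickHouse's API):

#include <cstddef>
#include <filesystem>
#include <string>
#include <system_error>
#include <utility>
#include <vector>

struct CheckResult
{
    std::string fs_path;            /// file (or part) the result refers to
    bool success = false;           /// whether the check passed
    std::string failure_message;    /// populated only on failure
};

using CheckResults = std::vector<CheckResult>;

/// Compare each file's on-disk size with its expected size and collect one
/// result per file; like the patched FileChecker::check, stop at the first
/// failure instead of scanning the remaining files.
CheckResults checkFileSizes(const std::vector<std::pair<std::string, std::size_t>> & expected_sizes)
{
    CheckResults results;
    for (const auto & [path, expected_size] : expected_sizes)
    {
        std::error_code ec;
        auto real_size = std::filesystem::file_size(path, ec);
        if (ec)
        {
            results.push_back({path, false, "File " + path + " doesn't exist"});
            break;
        }
        if (real_size != expected_size)
        {
            results.push_back({path, false, "Size of " + path + " is wrong"});
            break;
        }
        results.push_back({path, true, ""});
    }
    return results;
}

Note the break mirrors the patched loop: once one file is broken, the rest of the table's files are not reported.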

View File

@@ -3,6 +3,7 @@
 #include <string>
 #include <common/logger_useful.h>
 #include <Poco/File.h>
+#include <Storages/CheckResults.h>

 namespace DB

@@ -24,7 +25,7 @@ public:
     void update(const Files::const_iterator & begin, const Files::const_iterator & end);

     /// Check the files whose parameters are specified in sizes.json
-    bool check() const;
+    CheckResults check() const;

 private:
     void initialize();

View File

@@ -4,6 +4,7 @@
 #include <Parsers/ASTCheckQuery.h>
 #include <DataStreams/OneBlockInputStream.h>
 #include <DataTypes/DataTypesNumber.h>
+#include <DataTypes/DataTypeString.h>
 #include <Columns/ColumnsNumber.h>
 #include <Common/typeid_cast.h>

@@ -11,6 +12,21 @@
 namespace DB
 {

+namespace
+{
+
+NamesAndTypes getBlockStructure()
+{
+    return {
+        {"part_path", std::make_shared<DataTypeString>()},
+        {"is_passed", std::make_shared<DataTypeUInt8>()},
+        {"message", std::make_shared<DataTypeString>()},
+    };
+}
+
+}
+
 InterpreterCheckQuery::InterpreterCheckQuery(const ASTPtr & query_ptr_, const Context & context_)
     : query_ptr(query_ptr_), context(context_)
 {

@@ -19,18 +35,32 @@ InterpreterCheckQuery::InterpreterCheckQuery(const ASTPtr & query_ptr_, const Context & context_)
 BlockIO InterpreterCheckQuery::execute()
 {
-    const auto & alter = query_ptr->as<ASTCheckQuery &>();
-    const String & table_name = alter.table;
-    String database_name = alter.database.empty() ? context.getCurrentDatabase() : alter.database;
+    const auto & check = query_ptr->as<ASTCheckQuery &>();
+    const String & table_name = check.table;
+    String database_name = check.database.empty() ? context.getCurrentDatabase() : check.database;

     StoragePtr table = context.getTable(database_name, table_name);
+    auto check_results = table->checkData(query_ptr, context);

-    auto column = ColumnUInt8::create();
-    column->insertValue(UInt64(table->checkData()));
-    result = Block{{ std::move(column), std::make_shared<DataTypeUInt8>(), "result" }};
+    auto block_structure = getBlockStructure();
+    auto path_column = block_structure[0].type->createColumn();
+    auto is_passed_column = block_structure[1].type->createColumn();
+    auto message_column = block_structure[2].type->createColumn();
+
+    for (const auto & check_result : check_results)
+    {
+        path_column->insert(check_result.fs_path);
+        is_passed_column->insert(static_cast<UInt8>(check_result.success));
+        message_column->insert(check_result.failure_message);
+    }
+
+    Block block({
+        {std::move(path_column), block_structure[0].type, block_structure[0].name},
+        {std::move(is_passed_column), block_structure[1].type, block_structure[1].name},
+        {std::move(message_column), block_structure[2].type, block_structure[2].name}});

     BlockIO res;
-    res.in = std::make_shared<OneBlockInputStream>(result);
+    res.in = std::make_shared<OneBlockInputStream>(block);

     return res;
 }
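
With this change CHECK TABLE returns one row per part (or per file, for the simple Log engines) instead of a single 0/1. A minimal sketch of the new three-column result shape, part_path String / is_passed UInt8 / message String, printed in the tab-separated form the tests below assert against (plain iostream, not the real Block/IColumn machinery):

#include <iostream>
#include <string>
#include <tuple>
#include <vector>

/// (part_path, is_passed, message): the three columns CHECK TABLE now returns.
using CheckRow = std::tuple<std::string, bool, std::string>;

void printCheckRows(const std::vector<CheckRow> & rows)
{
    for (const auto & [part_path, is_passed, message] : rows)
        std::cout << part_path << '\t' << int(is_passed) << '\t' << message << '\n';
}

int main()
{
    printCheckRows({
        {"201902_1_1_0", true, ""},
        {"201901_2_2_0", false, "Cannot read all data. Bytes read: 2. Bytes expected: 16."},
    });
}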

View File

@@ -1,12 +1,17 @@
 #pragma once

 #include <Parsers/ASTQueryWithTableAndOutput.h>
+#include <Parsers/ASTPartition.h>

 namespace DB
 {

 struct ASTCheckQuery : public ASTQueryWithTableAndOutput
 {
+    ASTPtr partition;
+
     /** Get the text that identifies this element. */
     String getID(char delim) const override { return "CheckQuery" + (delim + database) + delim + table; }

@@ -19,7 +24,7 @@ struct ASTCheckQuery : public ASTQueryWithTableAndOutput
     }

 protected:
-    void formatQueryImpl(const FormatSettings & settings, FormatState &, FormatStateStacked frame) const override
+    void formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override
     {
         std::string nl_or_nothing = settings.one_line ? "" : "\n";

@@ -37,6 +42,12 @@ protected:
             }
             settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << backQuoteIfNeed(table) << (settings.hilite ? hilite_none : "");
         }
+
+        if (partition)
+        {
+            settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << " PARTITION " << (settings.hilite ? hilite_none : "");
+            partition->formatImpl(settings, state, frame);
+        }
     }
 };

View File

@@ -3,6 +3,7 @@
 #include <Parsers/ASTIdentifier.h>
 #include <Parsers/ExpressionElementParsers.h>
 #include <Parsers/ASTCheckQuery.h>
+#include <Parsers/ParserPartition.h>

 namespace DB

@@ -11,9 +12,11 @@ namespace DB
 bool ParserCheckQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
 {
     ParserKeyword s_check_table("CHECK TABLE");
+    ParserKeyword s_partition("PARTITION");
     ParserToken s_dot(TokenType::Dot);

     ParserIdentifier table_parser;
+    ParserPartition partition_parser;

     ASTPtr table;
     ASTPtr database;

@@ -23,24 +26,28 @@ bool ParserCheckQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
     if (!table_parser.parse(pos, database, expected))
         return false;

+    auto query = std::make_shared<ASTCheckQuery>();
     if (s_dot.ignore(pos))
     {
         if (!table_parser.parse(pos, table, expected))
             return false;

-        auto query = std::make_shared<ASTCheckQuery>();
         getIdentifierName(database, query->database);
         getIdentifierName(table, query->table);
-
-        node = query;
     }
     else
     {
         table = database;
-        auto query = std::make_shared<ASTCheckQuery>();
         getIdentifierName(table, query->table);
-        node = query;
     }

+    if (s_partition.ignore(pos, expected))
+    {
+        if (!partition_parser.parse(pos, query->partition, expected))
+            return false;
+    }
+
+    node = query;
     return true;
 }
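
The parser change hoists the ASTCheckQuery allocation above the db.table / bare-table branch so the optional PARTITION clause can be attached on either path. A toy sketch of the same grammar, CHECK TABLE [db.]table [PARTITION id], over whitespace tokens (this is not ClickHouse's IParser framework, only the control flow):

#include <cassert>
#include <optional>
#include <sstream>
#include <string>

/// Parsed form of "CHECK TABLE [db.]table [PARTITION id]".
struct CheckQuery
{
    std::string database;
    std::string table;
    std::string partition;  /// empty when no PARTITION clause was given
};

std::optional<CheckQuery> parseCheckQuery(const std::string & text)
{
    std::istringstream in(text);
    std::string kw1, kw2, name;
    if (!(in >> kw1 >> kw2 >> name) || kw1 != "CHECK" || kw2 != "TABLE")
        return std::nullopt;

    /// The query object is created once, before branching on db.table vs table,
    /// exactly so the PARTITION clause below can be attached either way.
    CheckQuery query;
    auto dot = name.find('.');
    if (dot != std::string::npos)
    {
        query.database = name.substr(0, dot);
        query.table = name.substr(dot + 1);
    }
    else
        query.table = name;

    std::string maybe_partition;
    if (in >> maybe_partition)
    {
        if (maybe_partition != "PARTITION" || !(in >> query.partition))
            return std::nullopt;
    }
    return query;
}

int main()
{
    auto q = parseCheckQuery("CHECK TABLE db.t PARTITION 201901");
    assert(q && q->database == "db" && q->table == "t" && q->partition == "201901");
}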

View File

@@ -0,0 +1,27 @@
+#pragma once
+
+#include <Core/Types.h>
+
+#include <vector>
+
+namespace DB
+{
+
+/// Result of CHECK TABLE query for single part of table
+struct CheckResult
+{
+    /// Part name for merge tree or file name for simplier tables
+    String fs_path;
+    /// Does check passed
+    bool success = false;
+    /// Failure message if any
+    String failure_message;
+
+    CheckResult() = default;
+    CheckResult(const String & fs_path_, bool success_, String failure_message_)
+        : fs_path(fs_path_), success(success_), failure_message(failure_message_)
+    {}
+};
+
+using CheckResults = std::vector<CheckResult>;
+
+}

View File

@@ -8,6 +8,7 @@
 #include <Storages/IStorage_fwd.h>
 #include <Storages/SelectQueryInfo.h>
 #include <Storages/TableStructureLockHolder.h>
+#include <Storages/CheckResults.h>
 #include <Common/ActionLock.h>
 #include <Common/Exception.h>
 #include <Common/RWLock.h>

@@ -285,7 +286,7 @@ public:
     virtual bool mayBenefitFromIndexForIn(const ASTPtr & /* left_in_operand */, const Context & /* query_context */) const { return false; }

     /// Checks validity of the data
-    virtual bool checkData() const { throw Exception("Check query is not supported for " + getName() + " storage", ErrorCodes::NOT_IMPLEMENTED); }
+    virtual CheckResults checkData(const ASTPtr & /* query */, const Context & /* context */) { throw Exception("Check query is not supported for " + getName() + " storage", ErrorCodes::NOT_IMPLEMENTED); }

     /// Checks that table could be dropped right now
     /// Otherwise - throws an exception with detailed information.
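
checkData follows the usual IStorage pattern: a virtual with a throwing default, overridden only by engines that actually support CHECK TABLE. A compressed sketch of that pattern with the new return type (the ASTPtr/Context parameters are dropped here for brevity, and std::runtime_error stands in for DB::Exception):

#include <stdexcept>
#include <string>
#include <vector>

struct CheckResult { std::string fs_path; bool success; std::string failure_message; };
using CheckResults = std::vector<CheckResult>;

class IStorageSketch
{
public:
    virtual std::string getName() const = 0;
    /// Default implementation throws, so engines without CHECK TABLE support
    /// need no code at all; overriders return one result per part/file.
    virtual CheckResults checkData()
    {
        throw std::runtime_error("Check query is not supported for " + getName() + " storage");
    }
    virtual ~IStorageSketch() = default;
};

class StorageWithCheck : public IStorageSketch
{
public:
    std::string getName() const override { return "WithCheck"; }
    CheckResults checkData() override { return {{"part_0_0_0", true, ""}}; }
};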

View File

@@ -181,7 +181,7 @@ void ReplicatedMergeTreePartCheckThread::searchForMissingPart(const String & part_name)
 }

-void ReplicatedMergeTreePartCheckThread::checkPart(const String & part_name)
+CheckResult ReplicatedMergeTreePartCheckThread::checkPart(const String & part_name)
 {
     LOG_WARNING(log, "Checking part " << part_name);
     ProfileEvents::increment(ProfileEvents::ReplicatedPartChecks);

@@ -197,6 +197,7 @@ void ReplicatedMergeTreePartCheckThread::checkPart(const String & part_name)
     if (!part)
     {
         searchForMissingPart(part_name);
+        return {part_name, false, "Part is missing, will search for it"};
     }
     /// We have this part, and it's active. We will check whether we need this part and whether it has the right data.
     else if (part->name == part_name)

@@ -242,7 +243,7 @@ void ReplicatedMergeTreePartCheckThread::checkPart(const String & part_name)
                 if (need_stop)
                 {
                     LOG_INFO(log, "Checking part was cancelled.");
-                    return;
+                    return {part_name, false, "Checking part was cancelled"};
                 }

                 LOG_INFO(log, "Part " << part_name << " looks good.");

@@ -253,13 +254,15 @@ void ReplicatedMergeTreePartCheckThread::checkPart(const String & part_name)
                 tryLogCurrentException(log, __PRETTY_FUNCTION__);

-                LOG_ERROR(log, "Part " << part_name << " looks broken. Removing it and queueing a fetch.");
+                String message = "Part " + part_name + " looks broken. Removing it and queueing a fetch.";
+                LOG_ERROR(log, message);
                 ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);

                 storage.removePartAndEnqueueFetch(part_name);

                 /// Delete part locally.
                 storage.forgetPartAndMoveToDetached(part, "broken");
+                return {part_name, false, message};
             }
         }
         else if (part->modification_time + MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER < time(nullptr))

@@ -269,8 +272,10 @@ void ReplicatedMergeTreePartCheckThread::checkPart(const String & part_name)
             /// Therefore, delete only if the part is old (not very reliable).
             ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);

-            LOG_ERROR(log, "Unexpected part " << part_name << " in filesystem. Removing.");
+            String message = "Unexpected part " + part_name + " in filesystem. Removing.";
+            LOG_ERROR(log, message);
             storage.forgetPartAndMoveToDetached(part, "unexpected");
+            return {part_name, false, message};
         }
         else
         {

@@ -290,6 +295,8 @@ void ReplicatedMergeTreePartCheckThread::checkPart(const String & part_name)
             /// In the worst case, errors will still appear `old_parts_lifetime` seconds in error log until the part is removed as the old one.
             LOG_WARNING(log, "We have part " << part->name << " covering part " << part_name);
         }
     }
+
+    return {part_name, true, ""};
 }
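
Every exit path of checkPart now produces a CheckResult instead of just logging and returning void, which is what lets CHECK TABLE on a replicated table report per-part outcomes. A stub sketch of that branch structure (the helper functions are hypothetical stand-ins for the storage calls the real thread makes):

#include <string>

struct CheckResult { std::string fs_path; bool success; std::string failure_message; };

/// Hypothetical stand-ins for the real storage callbacks.
bool partExistsLocally(const std::string &) { return true; }
bool partDataLooksGood(const std::string &) { return true; }
void removePartAndEnqueueFetch(const std::string &) {}
void detachPart(const std::string &, const std::string & /* reason */) {}

/// Shape of the patched checkPart: every branch that used to log and
/// `return;` now reports its outcome as a CheckResult.
CheckResult checkPartSketch(const std::string & part_name)
{
    if (!partExistsLocally(part_name))
        return {part_name, false, "Part is missing, will search for it"};

    if (!partDataLooksGood(part_name))
    {
        std::string message = "Part " + part_name + " looks broken. Removing it and queueing a fetch.";
        removePartAndEnqueueFetch(part_name);  /// replica will re-download a good copy
        detachPart(part_name, "broken");       /// keep the broken data aside, don't delete outright
        return {part_name, false, message};
    }

    return {part_name, true, ""};
}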

View File

@@ -11,6 +11,7 @@
 #include <Core/Types.h>
 #include <common/logger_useful.h>
 #include <Core/BackgroundSchedulePool.h>
+#include <Storages/CheckResults.h>

 namespace DB
 {

@@ -66,12 +67,12 @@ public:
     /// Get the number of parts in the queue for check.
     size_t size() const;

+    /// Check part by name
+    CheckResult checkPart(const String & part_name);
+
 private:
     void run();

-    void checkPart(const String & part_name);
     void searchForMissingPart(const String & part_name);

     StorageReplicatedMergeTree & storage;

View File

@@ -53,6 +53,20 @@ public:
 private:
     ReadBufferFromFile mrk_file_buf;

+    std::pair<MarkInCompressedFile, size_t> readMarkFromFile()
+    {
+        size_t mrk_rows;
+        MarkInCompressedFile mrk_mark;
+        readIntBinary(mrk_mark.offset_in_compressed_file, mrk_hashing_buf);
+        readIntBinary(mrk_mark.offset_in_decompressed_block, mrk_hashing_buf);
+        if (mrk_file_extension == ".mrk2")
+            readIntBinary(mrk_rows, mrk_hashing_buf);
+        else
+            mrk_rows = index_granularity.getMarkRows(mark_position);
+
+        return {mrk_mark, mrk_rows};
+    }
+
 public:
     HashingReadBuffer mrk_hashing_buf;

@@ -78,15 +92,8 @@ public:
     void assertMark(bool only_read=false)
     {
-        MarkInCompressedFile mrk_mark;
-        readIntBinary(mrk_mark.offset_in_compressed_file, mrk_hashing_buf);
-        readIntBinary(mrk_mark.offset_in_decompressed_block, mrk_hashing_buf);
-
-        size_t mrk_rows;
-        if (mrk_file_extension == ".mrk2")
-            readIntBinary(mrk_rows, mrk_hashing_buf);
-        else
-            mrk_rows = index_granularity.getMarkRows(mark_position);
+        auto [mrk_mark, mrk_rows] = readMarkFromFile();

         bool has_alternative_mark = false;
         MarkInCompressedFile alternative_data_mark = {};
         MarkInCompressedFile data_mark = {};

@@ -136,6 +143,12 @@ public:
                 + toString(compressed_hashing_buf.count()) + " (compressed), "
                 + toString(uncompressed_hashing_buf.count()) + " (uncompressed)", ErrorCodes::CORRUPTED_DATA);

+        if (index_granularity.hasFinalMark())
+        {
+            auto [final_mark, final_mark_rows] = readMarkFromFile();
+            if (final_mark_rows != 0)
+                throw Exception("Incorrect final mark at the end of " + mrk_file_path + " expected 0 rows, got " + toString(final_mark_rows), ErrorCodes::CORRUPTED_DATA);
+        }
+
         if (!mrk_hashing_buf.eof())
             throw Exception("EOF expected in " + mrk_file_path + " file"
                 + " at position "

View File

@@ -625,7 +625,7 @@ BlockOutputStreamPtr StorageLog::write(
     return std::make_shared<LogBlockOutputStream>(*this);
 }

-bool StorageLog::checkData() const
+CheckResults StorageLog::checkData(const ASTPtr & /* query */, const Context & /* context */)
 {
     std::shared_lock<std::shared_mutex> lock(rwlock);
     return file_checker.check();

View File

@@ -38,7 +38,7 @@ public:
     void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override;

-    bool checkData() const override;
+    CheckResults checkData(const ASTPtr & /* query */, const Context & /* context */) override;

     void truncate(const ASTPtr &, const Context &) override;

View File

@@ -7,8 +7,10 @@
 #include <Common/ThreadPool.h>
 #include <Interpreters/InterpreterAlterQuery.h>
 #include <Interpreters/PartLog.h>
+#include <Parsers/ASTCheckQuery.h>
 #include <Parsers/ASTFunction.h>
 #include <Parsers/ASTLiteral.h>
+#include <Parsers/ASTPartition.h>
 #include <Parsers/queryToString.h>
 #include <Storages/MergeTree/MergeTreeData.h>
 #include <Storages/MergeTree/ActiveDataPartSet.h>

@@ -17,6 +19,7 @@
 #include <Storages/MergeTree/MergeTreeBlockOutputStream.h>
 #include <Storages/MergeTree/DiskSpaceMonitor.h>
 #include <Storages/MergeTree/MergeList.h>
+#include <Storages/MergeTree/checkDataPart.h>
 #include <Poco/DirectoryIterator.h>
 #include <Poco/File.h>
 #include <optional>

@@ -1121,4 +1124,59 @@ ActionLock StorageMergeTree::getActionLock(StorageActionBlockType action_type)
     return {};
 }

+CheckResults StorageMergeTree::checkData(const ASTPtr & query, const Context & context)
+{
+    CheckResults results;
+    DataPartsVector data_parts;
+    if (const auto & check_query = query->as<ASTCheckQuery &>(); check_query.partition)
+    {
+        String partition_id = getPartitionIDFromQuery(check_query.partition, context);
+        data_parts = getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
+    }
+    else
+        data_parts = getDataPartsVector();
+
+    for (auto & part : data_parts)
+    {
+        String full_part_path = part->getFullPath();
+        /// If the checksums file is not present, calculate the checksums and write them to disk.
+        String checksums_path = full_part_path + "checksums.txt";
+        String tmp_checksums_path = full_part_path + "checksums.txt.tmp";
+        if (!Poco::File(checksums_path).exists())
+        {
+            try
+            {
+                auto calculated_checksums = checkDataPart(part, false, primary_key_data_types, skip_indices);
+                calculated_checksums.checkEqual(part->checksums, true);
+                WriteBufferFromFile out(tmp_checksums_path, 4096);
+                part->checksums.write(out);
+                Poco::File(tmp_checksums_path).renameTo(checksums_path);
+                results.emplace_back(part->name, true, "Checksums recounted and written to disk.");
+            }
+            catch (const Exception & ex)
+            {
+                Poco::File tmp_file(tmp_checksums_path);
+                if (tmp_file.exists())
+                    tmp_file.remove();
+
+                results.emplace_back(part->name, false,
+                    "Check of part finished with error: '" + ex.message() + "'");
+            }
+        }
+        else
+        {
+            try
+            {
+                checkDataPart(part, true, primary_key_data_types, skip_indices);
+                results.emplace_back(part->name, true, "");
+            }
+            catch (const Exception & ex)
+            {
+                results.emplace_back(part->name, false, ex.message());
+            }
+        }
+    }
+    return results;
+}
+
 }
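
When checksums.txt is missing, StorageMergeTree::checkData recomputes the checksums, writes them to a .tmp file, renames it into place, and removes the .tmp on failure, so a crash mid-write can never leave a truncated checksums file behind. The write-then-rename pattern in isolation (std::ofstream and std::rename standing in for WriteBufferFromFile and Poco::File):

#include <cstdio>
#include <fstream>
#include <string>

/// Write `contents` to `path` atomically: write the whole payload to a
/// sibling .tmp file first, then rename over the target. Readers either see
/// the old file or the complete new one, never a partial write.
bool writeFileAtomically(const std::string & path, const std::string & contents)
{
    const std::string tmp_path = path + ".tmp";
    {
        std::ofstream out(tmp_path, std::ios::binary | std::ios::trunc);
        if (!out.write(contents.data(), contents.size()))
        {
            std::remove(tmp_path.c_str());  /// clean up the partial file, as the catch block above does
            return false;
        }
    }  /// stream closed (and flushed) before the rename
    return std::rename(tmp_path.c_str(), path.c_str()) == 0;
}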

View File

@@ -70,6 +70,8 @@ public:

     String getDataPath() const override { return full_path; }

+    CheckResults checkData(const ASTPtr & query, const Context & context) override;
+
 private:
     String path;

View File

@@ -30,6 +30,7 @@
 #include <Parsers/ASTOptimizeQuery.h>
 #include <Parsers/ASTLiteral.h>
 #include <Parsers/queryToString.h>
+#include <Parsers/ASTCheckQuery.h>
 #include <IO/ReadBufferFromString.h>
 #include <IO/Operators.h>

@@ -2380,7 +2381,7 @@ void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_name)
     auto results = zookeeper->multi(ops);

-    String path_created = dynamic_cast<const Coordination::CreateResponse &>(*results[0]).path_created;
+    String path_created = dynamic_cast<const Coordination::CreateResponse &>(*results.back()).path_created;
     log_entry->znode_name = path_created.substr(path_created.find_last_of('/') + 1);
     queue.insert(zookeeper, log_entry);
 }

@@ -5107,4 +5108,31 @@ bool StorageReplicatedMergeTree::dropPartsInPartition(
     return true;
 }

+CheckResults StorageReplicatedMergeTree::checkData(const ASTPtr & query, const Context & context)
+{
+    CheckResults results;
+    DataPartsVector data_parts;
+    if (const auto & check_query = query->as<ASTCheckQuery &>(); check_query.partition)
+    {
+        String partition_id = getPartitionIDFromQuery(check_query.partition, context);
+        data_parts = getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
+    }
+    else
+        data_parts = getDataPartsVector();
+
+    for (auto & part : data_parts)
+    {
+        try
+        {
+            results.push_back(part_check_thread.checkPart(part->name));
+        }
+        catch (const Exception & ex)
+        {
+            results.emplace_back(part->name, false, "Check of part finished with error: '" + ex.message() + "'");
+        }
+    }
+    return results;
+}
+
 }
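
Two things land in this file: the per-part checkData that delegates to part_check_thread, and the actual "fetch of existing part" fix. removePartAndEnqueueFetch now reads the created log entry's path from results.back() instead of results[0]; the log-entry create is evidently the last op in the multi-request, so indexing the first response picked up an unrelated op. A toy model of why the index matters (multiSketch is a hypothetical stand-in for zookeeper->multi, which returns one response per op in submission order):

#include <cassert>
#include <string>
#include <vector>

struct OpResponse { std::string path_created; };

/// Pretend every op is a create that returns its own path.
std::vector<OpResponse> multiSketch(const std::vector<std::string> & ops)
{
    std::vector<OpResponse> responses;
    for (const auto & op : ops)
        responses.push_back({op});
    return responses;
}

int main()
{
    /// Several ops precede the queue-entry create, so its response is last
    /// (illustrative paths, not the real znode layout).
    std::vector<std::string> ops = {"/quorum/failed_parts/part", "/replicas/r1/queue/queue-0000000042"};
    auto results = multiSketch(ops);
    assert(results.back().path_created == "/replicas/r1/queue/queue-0000000042");  /// results[0] would be wrong here
}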

View File

@@ -170,6 +170,8 @@ public:

     String getDataPath() const override { return full_path; }

+    CheckResults checkData(const ASTPtr & query, const Context & context) override;
+
 private:
     /// Delete old parts from disk and from ZooKeeper.
     void clearOldPartsAndRemoveFromZK();

View File

@@ -282,7 +282,7 @@ BlockOutputStreamPtr StorageStripeLog::write(
 }

-bool StorageStripeLog::checkData() const
+CheckResults StorageStripeLog::checkData(const ASTPtr & /* query */, const Context & /* context */)
 {
     std::shared_lock<std::shared_mutex> lock(rwlock);
     return file_checker.check();

View File

@@ -40,7 +40,7 @@ public:
     void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override;

-    bool checkData() const override;
+    CheckResults checkData(const ASTPtr & /* query */, const Context & /* context */) override;

     /// Data of the file.
     struct ColumnData

View File

@@ -32,6 +32,7 @@
 #include <Storages/StorageTinyLog.h>
 #include <Storages/StorageFactory.h>
+#include <Storages/CheckResults.h>

 #include <Poco/DirectoryIterator.h>

@@ -404,7 +405,7 @@ BlockOutputStreamPtr StorageTinyLog::write(
 }

-bool StorageTinyLog::checkData() const
+CheckResults StorageTinyLog::checkData(const ASTPtr & /* query */, const Context & /* context */)
 {
     return file_checker.check();
 }

View File

@@ -39,7 +39,7 @@ public:
     void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override;

-    bool checkData() const override;
+    CheckResults checkData(const ASTPtr & /* query */, const Context & /* context */) override;

     /// Column data
     struct ColumnData

View File

@@ -0,0 +1,129 @@
+import time
+
+import pytest
+from helpers.cluster import ClickHouseCluster
+from helpers.test_tools import assert_eq_with_retry
+
+cluster = ClickHouseCluster(__file__)
+node1 = cluster.add_instance('node1', with_zookeeper=True)
+node2 = cluster.add_instance('node2', with_zookeeper=True)
+
+
+@pytest.fixture(scope="module")
+def started_cluster():
+    try:
+        cluster.start()
+
+        for node in [node1, node2]:
+            node.query('''
+            CREATE TABLE replicated_mt(date Date, id UInt32, value Int32)
+            ENGINE = ReplicatedMergeTree('/clickhouse/tables/replicated_mt', '{replica}') PARTITION BY toYYYYMM(date) ORDER BY id;
+            '''.format(replica=node.name))
+
+        node1.query('''
+        CREATE TABLE non_replicated_mt(date Date, id UInt32, value Int32)
+        ENGINE = MergeTree() PARTITION BY toYYYYMM(date) ORDER BY id;
+        ''')
+
+        yield cluster
+
+    finally:
+        cluster.shutdown()
+
+
+def corrupt_data_part_on_disk(node, table, part_name):
+    part_path = node.query("SELECT path FROM system.parts WHERE table = '{}' and name = '{}'".format(table, part_name)).strip()
+    node.exec_in_container(['bash', '-c', 'cd {p} && ls *.bin | head -n 1 | xargs -I{{}} sh -c \'echo "1" >> $1\' -- {{}}'.format(p=part_path)], privileged=True)
+
+
+def remove_checksums_on_disk(node, table, part_name):
+    part_path = node.query("SELECT path FROM system.parts WHERE table = '{}' and name = '{}'".format(table, part_name)).strip()
+    node.exec_in_container(['bash', '-c', 'rm -r {p}/checksums.txt'.format(p=part_path)], privileged=True)
+
+
+def remove_part_from_disk(node, table, part_name):
+    part_path = node.query("SELECT path FROM system.parts WHERE table = '{}' and name = '{}'".format(table, part_name)).strip()
+    if not part_path:
+        raise Exception("Part " + part_name + "doesn't exist")
+    node.exec_in_container(['bash', '-c', 'rm -r {p}/*'.format(p=part_path)], privileged=True)
+
+
+def test_check_normal_table_corruption(started_cluster):
+    node1.query("INSERT INTO non_replicated_mt VALUES (toDate('2019-02-01'), 1, 10), (toDate('2019-02-01'), 2, 12)")
+    assert node1.query("CHECK TABLE non_replicated_mt PARTITION 201902") == "201902_1_1_0\t1\t\n"
+
+    remove_checksums_on_disk(node1, "non_replicated_mt", "201902_1_1_0")
+
+    assert node1.query("CHECK TABLE non_replicated_mt").strip() == "201902_1_1_0\t1\tChecksums recounted and written to disk."
+    assert node1.query("SELECT COUNT() FROM non_replicated_mt") == "2\n"
+
+    remove_checksums_on_disk(node1, "non_replicated_mt", "201902_1_1_0")
+
+    assert node1.query("CHECK TABLE non_replicated_mt PARTITION 201902").strip() == "201902_1_1_0\t1\tChecksums recounted and written to disk."
+    assert node1.query("SELECT COUNT() FROM non_replicated_mt") == "2\n"
+
+    corrupt_data_part_on_disk(node1, "non_replicated_mt", "201902_1_1_0")
+
+    assert node1.query("CHECK TABLE non_replicated_mt").strip() == "201902_1_1_0\t0\tCannot read all data. Bytes read: 2. Bytes expected: 16."
+    assert node1.query("CHECK TABLE non_replicated_mt").strip() == "201902_1_1_0\t0\tCannot read all data. Bytes read: 2. Bytes expected: 16."
+
+    node1.query("INSERT INTO non_replicated_mt VALUES (toDate('2019-01-01'), 1, 10), (toDate('2019-01-01'), 2, 12)")
+
+    assert node1.query("CHECK TABLE non_replicated_mt PARTITION 201901") == "201901_2_2_0\t1\t\n"
+
+    corrupt_data_part_on_disk(node1, "non_replicated_mt", "201901_2_2_0")
+    remove_checksums_on_disk(node1, "non_replicated_mt", "201901_2_2_0")
+
+    assert node1.query("CHECK TABLE non_replicated_mt PARTITION 201901") == "201901_2_2_0\t0\tCheck of part finished with error: \\'Cannot read all data. Bytes read: 2. Bytes expected: 16.\\'\n"
+
+
+def test_check_replicated_table_simple(started_cluster):
+    node1.query("TRUNCATE TABLE replicated_mt")
+    node2.query("SYSTEM SYNC REPLICA replicated_mt")
+
+    node1.query("INSERT INTO replicated_mt VALUES (toDate('2019-02-01'), 1, 10), (toDate('2019-02-01'), 2, 12)")
+    node2.query("SYSTEM SYNC REPLICA replicated_mt")
+
+    assert node1.query("SELECT count() from replicated_mt") == "2\n"
+    assert node2.query("SELECT count() from replicated_mt") == "2\n"
+
+    assert node1.query("CHECK TABLE replicated_mt") == "201902_0_0_0\t1\t\n"
+    assert node2.query("CHECK TABLE replicated_mt") == "201902_0_0_0\t1\t\n"
+
+    node2.query("INSERT INTO replicated_mt VALUES (toDate('2019-01-02'), 3, 10), (toDate('2019-01-02'), 4, 12)")
+    node1.query("SYSTEM SYNC REPLICA replicated_mt")
+    assert node1.query("SELECT count() from replicated_mt") == "4\n"
+    assert node2.query("SELECT count() from replicated_mt") == "4\n"
+
+    assert node1.query("CHECK TABLE replicated_mt PARTITION 201901") == "201901_0_0_0\t1\t\n"
+    assert node2.query("CHECK TABLE replicated_mt PARTITION 201901") == "201901_0_0_0\t1\t\n"
+
+
+def test_check_replicated_table_corruption(started_cluster):
+    node1.query("TRUNCATE TABLE replicated_mt")
+    node2.query("SYSTEM SYNC REPLICA replicated_mt")
+    node1.query("INSERT INTO replicated_mt VALUES (toDate('2019-02-01'), 1, 10), (toDate('2019-02-01'), 2, 12)")
+    node1.query("INSERT INTO replicated_mt VALUES (toDate('2019-01-02'), 3, 10), (toDate('2019-01-02'), 4, 12)")
+    node2.query("SYSTEM SYNC REPLICA replicated_mt")
+
+    assert node1.query("SELECT count() from replicated_mt") == "4\n"
+    assert node2.query("SELECT count() from replicated_mt") == "4\n"
+
+    part_name = node1.query("SELECT name from system.parts where table = 'replicated_mt' and partition_id = '201901' and active = 1").strip()
+
+    corrupt_data_part_on_disk(node1, "replicated_mt", part_name)
+    assert node1.query("CHECK TABLE replicated_mt PARTITION 201901") == "{p}\t0\tPart {p} looks broken. Removing it and queueing a fetch.\n".format(p=part_name)
+
+    node1.query("SYSTEM SYNC REPLICA replicated_mt")
+    assert node1.query("CHECK TABLE replicated_mt PARTITION 201901") == "{}\t1\t\n".format(part_name)
+    assert node1.query("SELECT count() from replicated_mt") == "4\n"
+
+    remove_part_from_disk(node2, "replicated_mt", part_name)
+    assert node2.query("CHECK TABLE replicated_mt PARTITION 201901") == "{p}\t0\tPart {p} looks broken. Removing it and queueing a fetch.\n".format(p=part_name)
+
+    node1.query("SYSTEM SYNC REPLICA replicated_mt")
+    assert node1.query("CHECK TABLE replicated_mt PARTITION 201901") == "{}\t1\t\n".format(part_name)
+    assert node1.query("SELECT count() from replicated_mt") == "4\n"

View File

@@ -1,2 +1,5 @@
-1
-1
+N.bin	1	
+S.bin	1	
+N.bin	1	
+S.bin	1	
+__marks.mrk	1	

View File

@@ -0,0 +1,11 @@
+201901_1_1_0	1	
+========
+201901_1_1_0	1	
+201901_2_2_0	1	
+========
+201901_1_2_1	1	
+========
+201901_1_2_1	1	
+201902_3_3_0	1	
+========
+201902_3_4_1	1	

View File

@@ -0,0 +1,37 @@
+DROP TABLE IF EXISTS mt_table;
+
+CREATE TABLE mt_table (d Date, key UInt64, data String) ENGINE = MergeTree() PARTITION BY toYYYYMM(d) ORDER BY key;
+
+CHECK TABLE mt_table;
+
+INSERT INTO mt_table VALUES (toDate('2019-01-02'), 1, 'Hello'), (toDate('2019-01-02'), 2, 'World');
+
+CHECK TABLE mt_table;
+
+INSERT INTO mt_table VALUES (toDate('2019-01-02'), 3, 'quick'), (toDate('2019-01-02'), 4, 'brown');
+
+SELECT '========';
+
+CHECK TABLE mt_table;
+
+OPTIMIZE TABLE mt_table FINAL;
+
+SELECT '========';
+
+CHECK TABLE mt_table;
+
+SELECT '========';
+
+INSERT INTO mt_table VALUES (toDate('2019-02-03'), 5, '!'), (toDate('2019-02-03'), 6, '?');
+
+CHECK TABLE mt_table;
+
+SELECT '========';
+
+INSERT INTO mt_table VALUES (toDate('2019-02-03'), 7, 'jump'), (toDate('2019-02-03'), 8, 'around');
+
+OPTIMIZE TABLE mt_table FINAL;
+
+CHECK TABLE mt_table PARTITION 201902;
+
+DROP TABLE IF EXISTS mt_table;

View File

@@ -1,10 +1,17 @@
-1
-1
-1
 8873898	12457120258355519194
 8873898	12457120258355519194
 8873898	12457120258355519194
 8873898	12457120258355519194
-1
-1
-1
+AdvEngineID.bin	1	
+CounterID.bin	1	
+RegionID.bin	1	
+SearchPhrase.bin	1	
+UserID.bin	1	
+__marks.mrk	1	
+AdvEngineID.bin	1	
+CounterID.bin	1	
+RegionID.bin	1	
+SearchPhrase.bin	1	
+UserID.bin	1	
+data.bin	1	
+index.mrk	1	