Build proper pipeline for CHECK TABLE

This commit is contained in:
vdimir 2023-08-01 10:57:59 +00:00
parent a882ef295f
commit 1aedc4e892
No known key found for this signature in database
GPG Key ID: 6EE4CE2BEDC51862
15 changed files with 373 additions and 181 deletions

View File

@ -82,41 +82,35 @@ size_t FileChecker::getTotalSize() const
}
CheckResults FileChecker::check() const
FileChecker::DataValidationTasksPtr FileChecker::getDataValidationTasks()
{
CheckResults results;
auto callback = [&results](const CheckResult & result, size_t) -> bool
{
results.push_back(result);
return true;
};
check(callback);
return results;
return std::make_unique<DataValidationTasks>(map);
}
void FileChecker::check(CheckDataCallback check_callback) const
CheckResult FileChecker::checkNextEntry(DataValidationTasksPtr & check_data_tasks, bool & has_nothing_to_do) const
{
if (map.empty())
return;
for (const auto & name_size : map)
String name;
size_t expected_size;
bool is_finished = check_data_tasks->next(name, expected_size);
if (is_finished)
{
const String & name = name_size.first;
String path = parentPath(files_info_path) + name;
bool exists = fileReallyExists(path);
auto real_size = exists ? getRealFileSize(path) : 0; /// No race condition assuming no one else is working with these files.
if (real_size != name_size.second)
{
String failure_message = exists
? ("Size of " + path + " is wrong. Size is " + toString(real_size) + " but should be " + toString(name_size.second))
: ("File " + path + " doesn't exist");
check_callback(CheckResult(name, false, failure_message), map.size());
break;
}
check_callback(CheckResult(name, true, ""), map.size());
has_nothing_to_do = true;
return {};
}
String path = parentPath(files_info_path) + name;
bool exists = fileReallyExists(path);
auto real_size = exists ? getRealFileSize(path) : 0; /// No race condition assuming no one else is working with these files.
if (real_size != expected_size)
{
String failure_message = exists
? ("Size of " + path + " is wrong. Size is " + toString(real_size) + " but should be " + toString(expected_size))
: ("File " + path + " doesn't exist");
return CheckResult(name, false, failure_message);
}
return CheckResult(name, true, "");
}
void FileChecker::repair()

View File

@ -3,6 +3,7 @@
#include <Storages/CheckResults.h>
#include <map>
#include <base/types.h>
#include <mutex>
namespace Poco { class Logger; }
@ -28,8 +29,11 @@ public:
bool empty() const { return map.empty(); }
/// Check the files whose parameters are specified in sizes.json
CheckResults check() const;
void check(CheckDataCallback check_callback) const;
/// See comment in IStorage::checkDataNext
struct DataValidationTasks;
using DataValidationTasksPtr = std::unique_ptr<DataValidationTasks>;
DataValidationTasksPtr getDataValidationTasks();
CheckResult checkNextEntry(DataValidationTasksPtr & check_data_tasks, bool & has_nothing_to_do) const;
/// Truncate files that have excessive size to the expected size.
/// Throw exception if the file size is less than expected.
@ -42,6 +46,36 @@ public:
/// Returns total size of all files.
size_t getTotalSize() const;
struct DataValidationTasks
{
DataValidationTasks(const std::map<String, size_t> & map_)
: map(map_), it(map.begin())
{}
bool next(String & out_name, size_t & out_size)
{
std::lock_guard lock(mutex);
if (it == map.end())
return true;
out_name = it->first;
out_size = it->second;
++it;
return false;
}
size_t size() const
{
std::lock_guard lock(mutex);
return std::distance(it, map.end());
}
const std::map<String, size_t> & map;
mutable std::mutex mutex;
using Iterator = std::map<String, size_t>::const_iterator;
Iterator it;
};
private:
void load();

View File

@ -11,139 +11,133 @@
#include <Interpreters/ProcessList.h>
#include <algorithm>
#include <Columns/IColumn.h>
#include <Processors/ResizeProcessor.h>
#include <Processors/IAccumulatingTransform.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
namespace
{
Block getBlockFromCheckResult(const CheckResults & check_results, bool check_query_single_value_result)
Block getSingleValueBlock(UInt8 value)
{
if (check_query_single_value_result)
{
bool result = std::all_of(check_results.begin(), check_results.end(), [] (const CheckResult & res) { return res.success; });
return Block{{ColumnUInt8::create(1, static_cast<UInt8>(result)), std::make_shared<DataTypeUInt8>(), "result"}};
}
return Block{{ColumnUInt8::create(1, value), std::make_shared<DataTypeUInt8>(), "result"}};
}
NamesAndTypes block_structure = NamesAndTypes{
Block getHeaderForCheckResult()
{
auto names_and_types = NamesAndTypes{
{"part_path", std::make_shared<DataTypeString>()},
{"is_passed", std::make_shared<DataTypeUInt8>()},
{"message", std::make_shared<DataTypeString>()},
};
auto path_column = block_structure[0].type->createColumn();
auto is_passed_column = block_structure[1].type->createColumn();
auto message_column = block_structure[2].type->createColumn();
for (const auto & check_result : check_results)
{
path_column->insert(check_result.fs_path);
is_passed_column->insert(static_cast<UInt8>(check_result.success));
message_column->insert(check_result.failure_message);
}
return Block({
{std::move(path_column), block_structure[0].type, block_structure[0].name},
{std::move(is_passed_column), block_structure[1].type, block_structure[1].name},
{std::move(message_column), block_structure[2].type, block_structure[2].name},
{names_and_types[0].type->createColumn(), names_and_types[0].type, names_and_types[0].name},
{names_and_types[1].type->createColumn(), names_and_types[1].type, names_and_types[1].name},
{names_and_types[2].type->createColumn(), names_and_types[2].type, names_and_types[2].name},
});
}
class TableCheckResultSource : public ISource
Chunk getChunkFromCheckResult(const CheckResult & check_result)
{
MutableColumns columns = getHeaderForCheckResult().cloneEmptyColumns();
columns[0]->insert(check_result.fs_path);
columns[1]->insert(static_cast<UInt8>(check_result.success));
columns[2]->insert(check_result.failure_message);
return Chunk(std::move(columns), 1);
}
class TableCheckWorkerProcessor : public ISource
{
public:
explicit TableCheckResultSource(const ASTPtr & query_ptr_, StoragePtr table_, bool check_query_single_value_result_, ContextPtr context_)
: ISource(getBlockFromCheckResult({}, check_query_single_value_result_).cloneEmpty())
, query_ptr(query_ptr_)
TableCheckWorkerProcessor(IStorage::DataValidationTasksPtr check_data_tasks_, StoragePtr table_)
: ISource(getHeaderForCheckResult())
, table(table_)
, context(context_)
, check_query_single_value_result(check_query_single_value_result_)
, check_data_tasks(check_data_tasks_)
{
worker_result = std::async(std::launch::async, [this]{ worker(); });
}
String getName() const override { return "TableCheckResultSource"; }
String getName() const override { return "TableCheckWorkerProcessor"; }
protected:
std::optional<Chunk> tryGenerate() override
{
if (is_check_completed)
bool has_nothing_to_do = false;
auto check_result = table->checkDataNext(check_data_tasks, has_nothing_to_do);
if (has_nothing_to_do)
return {};
auto status = worker_result.wait_for(std::chrono::milliseconds(100));
is_check_completed = (status == std::future_status::ready);
/// We can omit manual `progess` call, ISource will may count it automatically by returned chunk
/// However, we want to report only rows in progress
progress(1, 0);
if (is_check_completed)
if (!check_result.success)
{
worker_result.get();
auto result_block = getBlockFromCheckResult(check_results, check_query_single_value_result);
check_results.clear();
return Chunk(result_block.getColumns(), result_block.rows());
LOG_WARNING(&Poco::Logger::get("InterpreterCheckQuery"),
"Check query for table {} failed, path {}, reason: {}",
table->getStorageID().getNameForLogs(),
check_result.fs_path,
check_result.failure_message);
}
std::lock_guard lock(mutex);
progress(progress_rows, 0);
progress_rows = 0;
if (check_query_single_value_result || check_results.empty())
{
return Chunk();
}
auto result_block = getBlockFromCheckResult(check_results, check_query_single_value_result);
check_results.clear();
return Chunk(result_block.getColumns(), result_block.rows());
return getChunkFromCheckResult(check_result);
}
private:
void worker()
StoragePtr table;
IStorage::DataValidationTasksPtr check_data_tasks;
};
class TableCheckResultEmitter : public IAccumulatingTransform
{
public:
TableCheckResultEmitter() : IAccumulatingTransform(getHeaderForCheckResult(), getSingleValueBlock(1).cloneEmpty()) {}
String getName() const override { return "TableCheckResultEmitter"; }
void consume(Chunk chunk) override
{
table->checkData(query_ptr, context,
[this](const CheckResult & check_result, size_t new_total_rows)
if (result_value == 0)
return;
auto columns = chunk.getColumns();
if (columns.size() != 3)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong number of columns: {}", columns.size());
const auto * col = checkAndGetColumn<ColumnUInt8>(columns[1].get());
for (size_t i = 0; i < col->size(); ++i)
{
if (col->getElement(i) == 0)
{
if (isCancelled())
return false;
std::lock_guard lock(mutex);
if (new_total_rows > total_rows)
{
addTotalRowsApprox(new_total_rows - total_rows);
total_rows = new_total_rows;
}
progress_rows++;
if (!check_result.success)
{
LOG_WARNING(&Poco::Logger::get("InterpreterCheckQuery"),
"Check query for table {} failed, path {}, reason: {}",
table->getStorageID().getNameForLogs(),
check_result.fs_path,
check_result.failure_message);
}
check_results.push_back(check_result);
bool should_continue = check_result.success || !check_query_single_value_result;
return should_continue;
});
result_value = 0;
return;
}
}
}
ASTPtr query_ptr;
StoragePtr table;
ContextPtr context;
bool check_query_single_value_result;
Chunk generate() override
{
if (is_valuer_emitted.exchange(true))
return {};
auto block = getSingleValueBlock(result_value);
return Chunk(block.getColumns(), block.rows());
}
std::future<void> worker_result;
std::mutex mutex;
CheckResults check_results;
size_t progress_rows = 0;
size_t total_rows = 0;
bool is_check_completed = false;
private:
std::atomic<UInt8> result_value{1};
std::atomic_bool is_valuer_emitted{false};
};
}
@ -154,7 +148,6 @@ InterpreterCheckQuery::InterpreterCheckQuery(const ASTPtr & query_ptr_, ContextP
{
}
BlockIO InterpreterCheckQuery::execute()
{
const auto & check = query_ptr->as<ASTCheckQuery &>();
@ -164,11 +157,51 @@ BlockIO InterpreterCheckQuery::execute()
context->checkAccess(AccessType::SHOW_TABLES, table_id);
StoragePtr table = DatabaseCatalog::instance().getTable(table_id, context);
auto check_data_tasks = table->getCheckTaskList(query_ptr, context);
const auto & settings = context->getSettingsRef();
BlockIO res;
{
bool check_query_single_value_result = context->getSettingsRef().check_query_single_value_result;
auto result_source = std::make_shared<TableCheckResultSource>(query_ptr, table, check_query_single_value_result, context);
res.pipeline = QueryPipeline(result_source);
auto processors = std::make_shared<Processors>();
std::vector<OutputPort *> worker_ports;
size_t num_streams = std::max<size_t>(1, settings.max_threads);
for (size_t i = 0; i < num_streams; ++i)
{
auto worker_processor = std::make_shared<TableCheckWorkerProcessor>(check_data_tasks, table);
if (i == 0)
worker_processor->addTotalRowsApprox(check_data_tasks->size());
worker_ports.emplace_back(&worker_processor->getPort());
processors->emplace_back(worker_processor);
}
OutputPort * resize_outport;
{
auto resize_processor = std::make_shared<ResizeProcessor>(getHeaderForCheckResult(), worker_ports.size(), 1);
auto & resize_inputs = resize_processor->getInputs();
auto resize_inport_it = resize_inputs.begin();
for (size_t i = 0; i < worker_ports.size(); ++i, ++resize_inport_it)
connect(*worker_ports[i], *resize_inport_it);
resize_outport = &resize_processor->getOutputs().front();
processors->emplace_back(resize_processor);
}
if (settings.check_query_single_value_result)
{
auto emitter_processor = std::make_shared<TableCheckResultEmitter>();
auto input_port = &emitter_processor->getInputPort();
processors->emplace_back(emitter_processor);
connect(*resize_outport, *input_port);
}
res.pipeline = QueryPipeline(Pipe(std::move(processors)));
res.pipeline.setNumThreads(num_streams);
}
return res;
}

View File

@ -22,11 +22,4 @@ struct CheckResult
{}
};
/// Process single result of checkData
/// Second argument is an estimated number of check results
/// Return true to continue checking, false to stop
using CheckDataCallback = std::function<bool(const CheckResult &, size_t)>;
using CheckResults = std::vector<CheckResult>;
}

View File

@ -273,12 +273,15 @@ bool IStorage::isStaticStorage() const
return false;
}
CheckResults IStorage::checkData(const ASTPtr & query, ContextPtr context)
IStorage::DataValidationTasksPtr IStorage::getCheckTaskList(const ASTPtr & /* query */, ContextPtr /* context */)
{
CheckResults results;
auto callback = [&](const CheckResult & result, size_t) { results.push_back(result); return true;};
checkData(query, context, callback);
return results;
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Check query is not supported for {} storage", getName());
}
CheckResult IStorage::checkDataNext(DataValidationTasksPtr & /* check_task_list */, bool & has_nothing_to_do)
{
has_nothing_to_do = true;
return {};
}
void IStorage::adjustCreateQueryForBackup(ASTPtr &) const

View File

@ -595,10 +595,45 @@ public:
/// Provides a hint that the storage engine may evaluate the IN-condition by using an index.
virtual bool mayBenefitFromIndexForIn(const ASTPtr & /* left_in_operand */, ContextPtr /* query_context */, const StorageMetadataPtr & /* metadata_snapshot */) const { return false; }
/// Checks validity of the data
virtual CheckResults checkData(const ASTPtr & /* query */, ContextPtr /* context */);
virtual void checkData(const ASTPtr & /* query */, ContextPtr /* context */, CheckDataCallback /* callback */) { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Check query is not supported for {} storage", getName()); }
/** A list of tasks to check a validity of data.
* Each IStorage implementation may interpret this task in its own way.
* E.g. for some storages it to check data it need to check a list of files in filesystem, for others it can be a list of parts.
* Also it may hold resources (e.g. locks) required during check.
*/
struct DataValidationTasksBase
{
/// Number of entries left to check.
/// It decreases after each call to checkDataNext().
virtual size_t size() const = 0;
virtual ~DataValidationTasksBase() = default;
};
using DataValidationTasksPtr = std::shared_ptr<DataValidationTasksBase>;
virtual DataValidationTasksPtr getCheckTaskList(const ASTPtr & /* query */, ContextPtr /* context */);
/** Executes one task from the list.
* If no tasks left, sets has_nothing_to_do to true.
* Note: Function `checkDataNext` is accessing `check_task_list` thread-safely,
* and can be called simultaneously for the same `getCheckTaskList` result
* to process different tasks in parallel.
* Usage:
*
* auto check_task_list = storage.getCheckTaskList(query, context);
* size_t total_tasks = check_task_list->size();
* while (true)
* {
* size_t tasks_left = check_task_list->size();
* std::cout << "Checking data: " << (total_tasks - tasks_left) << " / " << total_tasks << " tasks done." << std::endl;
* bool has_nothing_to_do = false;
* auto result = storage.checkDataNext(check_task_list, has_nothing_to_do);
* if (has_nothing_to_do)
* break;
* doSomething(result);
* }
*/
virtual CheckResult checkDataNext(DataValidationTasksPtr & check_task_list, bool & has_nothing_to_do);
/// Checks that table could be dropped right now
/// Otherwise - throws an exception with detailed information.

View File

@ -866,15 +866,18 @@ SinkToStoragePtr StorageLog::write(const ASTPtr & /*query*/, const StorageMetada
return std::make_shared<LogSink>(*this, metadata_snapshot, std::move(lock));
}
void StorageLog::checkData(const ASTPtr & /* query */, ContextPtr local_context, CheckDataCallback check_callback)
IStorage::DataValidationTasksPtr StorageLog::getCheckTaskList(const ASTPtr & /* query */, ContextPtr local_context)
{
ReadLock lock{rwlock, getLockTimeout(local_context)};
if (!lock)
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Lock timeout exceeded");
file_checker.check(check_callback);
return std::make_unique<DataValidationTasks>(file_checker.getDataValidationTasks(), std::move(lock));
}
CheckResult StorageLog::checkDataNext(DataValidationTasksPtr & check_task_list, bool & has_nothing_to_do)
{
return file_checker.checkNextEntry(static_cast<DataValidationTasks *>(check_task_list.get())->file_checker_tasks, has_nothing_to_do);
}
IStorage::ColumnSizeByName StorageLog::getColumnSizes() const
{

View File

@ -59,7 +59,8 @@ public:
void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override;
void checkData(const ASTPtr & query, ContextPtr local_context, CheckDataCallback check_callback) override;
DataValidationTasksPtr getCheckTaskList(const ASTPtr & query, ContextPtr context) override;
CheckResult checkDataNext(DataValidationTasksPtr & check_task_list, bool & has_nothing_to_do) override;
void truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &) override;
@ -142,6 +143,19 @@ private:
std::atomic<UInt64> total_rows = 0;
std::atomic<UInt64> total_bytes = 0;
struct DataValidationTasks : public IStorage::DataValidationTasksBase
{
DataValidationTasks(FileChecker::DataValidationTasksPtr file_checker_tasks_, ReadLock && lock_)
: file_checker_tasks(std::move(file_checker_tasks_)), lock(std::move(lock_))
{}
size_t size() const override { return file_checker_tasks->size(); }
FileChecker::DataValidationTasksPtr file_checker_tasks;
/// Lock to prevent table modification while checking
ReadLock lock;
};
FileChecker file_checker;
const size_t max_compress_block_size;

View File

@ -2197,7 +2197,7 @@ void StorageMergeTree::onActionLockRemove(StorageActionBlockType action_type)
background_moves_assignee.trigger();
}
void StorageMergeTree::checkData(const ASTPtr & query, ContextPtr local_context, CheckDataCallback check_callback)
IStorage::DataValidationTasksPtr StorageMergeTree::getCheckTaskList(const ASTPtr & query, ContextPtr local_context)
{
DataPartsVector data_parts;
if (const auto & check_query = query->as<ASTCheckQuery &>(); check_query.partition)
@ -2208,7 +2208,14 @@ void StorageMergeTree::checkData(const ASTPtr & query, ContextPtr local_context,
else
data_parts = getVisibleDataPartsVector(local_context);
for (auto & part : data_parts)
return std::make_unique<DataValidationTasks>(std::move(data_parts), local_context);
}
CheckResult StorageMergeTree::checkDataNext(DataValidationTasksPtr & check_task_list, bool & has_nothing_to_do)
{
auto local_context = static_cast<DataValidationTasks *>(check_task_list.get())->context;
if (auto part = static_cast<DataValidationTasks *>(check_task_list.get())->next())
{
/// If the checksums file is not present, calculate the checksums and write them to disk.
static constexpr auto checksums_path = "checksums.txt";
@ -2223,16 +2230,12 @@ void StorageMergeTree::checkData(const ASTPtr & query, ContextPtr local_context,
part_mutable.writeChecksums(part->checksums, local_context->getWriteSettings());
part->checkMetadata();
bool should_continue = check_callback(CheckResult(part->name, true, "Checksums recounted and written to disk."), data_parts.size());
if (!should_continue)
break;
return CheckResult(part->name, true, "Checksums recounted and written to disk.");
}
catch (const Exception & ex)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
bool should_continue = check_callback(CheckResult(part->name, false, "Check of part finished with error: '" + ex.message() + "'"), data_parts.size());
if (!should_continue)
break;
return CheckResult(part->name, false, "Check of part finished with error: '" + ex.message() + "'");
}
}
else
@ -2241,18 +2244,19 @@ void StorageMergeTree::checkData(const ASTPtr & query, ContextPtr local_context,
{
checkDataPart(part, true);
part->checkMetadata();
bool should_continue = check_callback(CheckResult(part->name, true, ""), data_parts.size());
if (!should_continue)
break;
return CheckResult(part->name, true, "");
}
catch (const Exception & ex)
{
bool should_continue = check_callback(CheckResult(part->name, false, ex.message()), data_parts.size());
if (!should_continue)
break;
return CheckResult(part->name, false, ex.message());
}
}
}
else
{
has_nothing_to_do = true;
return {};
}
}

View File

@ -108,7 +108,8 @@ public:
void onActionLockRemove(StorageActionBlockType action_type) override;
void checkData(const ASTPtr & query, ContextPtr context, CheckDataCallback check_callback) override;
DataValidationTasksPtr getCheckTaskList(const ASTPtr & query, ContextPtr context) override;
CheckResult checkDataNext(DataValidationTasksPtr & check_task_list, bool & has_nothing_to_do) override;
bool scheduleDataProcessingJob(BackgroundJobsAssignee & assignee) override;
@ -278,6 +279,32 @@ private:
friend class MergePlainMergeTreeTask;
friend class MutatePlainMergeTreeTask;
struct DataValidationTasks : public IStorage::DataValidationTasksBase
{
DataValidationTasks(DataPartsVector && parts_, ContextPtr context_)
: parts(std::move(parts_)), it(parts.begin()), context(std::move(context_))
{}
DataPartPtr next()
{
std::lock_guard lock(mutex);
if (it == parts.end())
return nullptr;
return *(it++);
}
size_t size() const override
{
std::lock_guard lock(mutex);
return std::distance(it, parts.end());
}
mutable std::mutex mutex;
DataPartsVector parts;
DataPartsVector::const_iterator it;
ContextPtr context;
};
protected:
std::map<int64_t, MutationCommands> getAlterMutationCommandsForPart(const DataPartPtr & part) const override;

View File

@ -149,7 +149,8 @@ public:
return getNested()->mayBenefitFromIndexForIn(left_in_operand, query_context, metadata_snapshot);
}
void checkData(const ASTPtr & query, ContextPtr context, CheckDataCallback check_callback) override { getNested()->checkData(query, context, check_callback); }
DataValidationTasksPtr getCheckTaskList(const ASTPtr & query, ContextPtr context) override { return getNested()->getCheckTaskList(query, context); }
CheckResult checkDataNext(DataValidationTasksPtr & check_task_list, bool & has_nothing_to_do) override { return getNested()->checkDataNext(check_task_list, has_nothing_to_do); }
void checkTableCanBeDropped() const override { getNested()->checkTableCanBeDropped(); }
bool storesDataOnDisk() const override { return getNested()->storesDataOnDisk(); }

View File

@ -8481,7 +8481,7 @@ void StorageReplicatedMergeTree::enqueuePartForCheck(const String & part_name, t
part_check_thread.enqueuePart(part_name, delay_to_check_seconds);
}
void StorageReplicatedMergeTree::checkData(const ASTPtr & query, ContextPtr local_context, CheckDataCallback check_callback)
IStorage::DataValidationTasksPtr StorageReplicatedMergeTree::getCheckTaskList(const ASTPtr & query, ContextPtr local_context)
{
DataPartsVector data_parts;
if (const auto & check_query = query->as<ASTCheckQuery &>(); check_query.partition)
@ -8492,25 +8492,29 @@ void StorageReplicatedMergeTree::checkData(const ASTPtr & query, ContextPtr loca
else
data_parts = getVisibleDataPartsVector(local_context);
{
auto part_check_lock = part_check_thread.pausePartsCheck();
auto part_check_lock = part_check_thread.pausePartsCheck();
return std::make_unique<DataValidationTasks>(std::move(data_parts), std::move(part_check_lock));
}
for (auto & part : data_parts)
CheckResult StorageReplicatedMergeTree::checkDataNext(DataValidationTasksPtr & check_task_list, bool & has_nothing_to_do)
{
if (auto part = static_cast<DataValidationTasks *>(check_task_list.get())->next())
{
try
{
try
{
bool should_continue = check_callback(part_check_thread.checkPartAndFix(part->name), data_parts.size());
if (!should_continue)
break;
}
catch (const Exception & ex)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
bool should_continue = check_callback(CheckResult(part->name, false, "Check of part finished with error: '" + ex.message() + "'"), data_parts.size());
if (!should_continue)
break;
}
return CheckResult(part_check_thread.checkPartAndFix(part->name));
}
catch (const Exception & ex)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
return CheckResult(part->name, false, "Check of part finished with error: '" + ex.message() + "'");
}
}
else
{
has_nothing_to_do = true;
return {};
}
}

View File

@ -230,7 +230,8 @@ public:
/// Add a part to the queue of parts whose data you want to check in the background thread.
void enqueuePartForCheck(const String & part_name, time_t delay_to_check_seconds = 0);
void checkData(const ASTPtr & query, ContextPtr context, CheckDataCallback check_callback) override;
DataValidationTasksPtr getCheckTaskList(const ASTPtr & query, ContextPtr context) override;
CheckResult checkDataNext(DataValidationTasksPtr & check_task_list, bool & has_nothing_to_do) override;
/// Checks ability to use granularity
bool canUseAdaptiveGranularity() const override;
@ -990,6 +991,34 @@ private:
bool waitZeroCopyLockToDisappear(const ZeroCopyLock & lock, size_t milliseconds_to_wait) override;
void startupImpl(bool from_attach_thread);
struct DataValidationTasks : public IStorage::DataValidationTasksBase
{
explicit DataValidationTasks(DataPartsVector && parts_, std::unique_lock<std::mutex> && parts_check_lock_)
: parts_check_lock(std::move(parts_check_lock_)), parts(std::move(parts_)), it(parts.begin())
{}
DataPartPtr next()
{
std::lock_guard lock(mutex);
if (it == parts.end())
return nullptr;
return *(it++);
}
size_t size() const override
{
std::lock_guard lock(mutex);
return std::distance(it, parts.end());
}
std::unique_lock<std::mutex> parts_check_lock;
mutable std::mutex mutex;
DataPartsVector parts;
DataPartsVector::const_iterator it;
};
};
String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version, const MergeTreePartInfo & part_info);

View File

@ -403,15 +403,18 @@ SinkToStoragePtr StorageStripeLog::write(const ASTPtr & /*query*/, const Storage
return std::make_shared<StripeLogSink>(*this, metadata_snapshot, std::move(lock));
}
void StorageStripeLog::checkData(const ASTPtr & /* query */, ContextPtr local_context, CheckDataCallback check_callback)
IStorage::DataValidationTasksPtr StorageStripeLog::getCheckTaskList(const ASTPtr & /* query */, ContextPtr local_context)
{
ReadLock lock{rwlock, getLockTimeout(local_context)};
if (!lock)
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Lock timeout exceeded");
file_checker.check(check_callback);
return std::make_unique<DataValidationTasks>(file_checker.getDataValidationTasks(), std::move(lock));
}
CheckResult StorageStripeLog::checkDataNext(DataValidationTasksPtr & check_task_list, bool & has_nothing_to_do)
{
return file_checker.checkNextEntry(static_cast<DataValidationTasks *>(check_task_list.get())->file_checker_tasks, has_nothing_to_do);
}
void StorageStripeLog::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &)
{

View File

@ -53,7 +53,8 @@ public:
void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override;
void checkData(const ASTPtr & query, ContextPtr ocal_context, CheckDataCallback check_callback) override;
DataValidationTasksPtr getCheckTaskList(const ASTPtr & query, ContextPtr context) override;
CheckResult checkDataNext(DataValidationTasksPtr & check_task_list, bool & has_nothing_to_do) override;
bool storesDataOnDisk() const override { return true; }
Strings getDataPaths() const override { return {DB::fullPath(disk, table_path)}; }
@ -93,6 +94,20 @@ private:
const DiskPtr disk;
String table_path;
struct DataValidationTasks : public IStorage::DataValidationTasksBase
{
DataValidationTasks(FileChecker::DataValidationTasksPtr file_checker_tasks_, ReadLock && lock_)
: file_checker_tasks(std::move(file_checker_tasks_)), lock(std::move(lock_))
{}
size_t size() const override { return file_checker_tasks->size(); }
FileChecker::DataValidationTasksPtr file_checker_tasks;
/// Lock to prevent table modification while checking
ReadLock lock;
};
String data_file_path;
String index_file_path;
FileChecker file_checker;