IStorage::checkDataNext returns optional

This commit is contained in:
vdimir 2023-10-23 10:12:30 +00:00
parent 9d840c6532
commit 8f0d7954ff
No known key found for this signature in database
GPG Key ID: 6EE4CE2BEDC51862
14 changed files with 29 additions and 41 deletions

View File

@ -87,16 +87,13 @@ FileChecker::DataValidationTasksPtr FileChecker::getDataValidationTasks()
return std::make_unique<DataValidationTasks>(map);
}
CheckResult FileChecker::checkNextEntry(DataValidationTasksPtr & check_data_tasks, bool & has_nothing_to_do) const
std::optional<CheckResult> FileChecker::checkNextEntry(DataValidationTasksPtr & check_data_tasks) const
{
String name;
size_t expected_size;
bool is_finished = check_data_tasks->next(name, expected_size);
if (is_finished)
{
has_nothing_to_do = true;
return {};
}
String path = parentPath(files_info_path) + name;
bool exists = fileReallyExists(path);

View File

@ -33,7 +33,7 @@ public:
struct DataValidationTasks;
using DataValidationTasksPtr = std::unique_ptr<DataValidationTasks>;
DataValidationTasksPtr getDataValidationTasks();
CheckResult checkNextEntry(DataValidationTasksPtr & check_data_tasks, bool & has_nothing_to_do) const;
std::optional<CheckResult> checkNextEntry(DataValidationTasksPtr & check_data_tasks) const;
/// Truncate files that have excessive size to the expected size.
/// Throw exception if the file size is less than expected.

View File

@ -75,25 +75,24 @@ protected:
std::optional<Chunk> tryGenerate() override
{
bool has_nothing_to_do = false;
auto check_result = table->checkDataNext(check_data_tasks, has_nothing_to_do);
if (has_nothing_to_do)
auto check_result = table->checkDataNext(check_data_tasks);
if (!check_result)
return {};
/// We could omit the manual `progress` call, since ISource may count progress automatically from the returned chunk.
/// However, we want to report only rows in progress, so we call it explicitly.
progress(1, 0);
if (!check_result.success)
if (!check_result->success)
{
LOG_WARNING(&Poco::Logger::get("InterpreterCheckQuery"),
"Check query for table {} failed, path {}, reason: {}",
table->getStorageID().getNameForLogs(),
check_result.fs_path,
check_result.failure_message);
check_result->fs_path,
check_result->failure_message);
}
return getChunkFromCheckResult(check_result);
return getChunkFromCheckResult(*check_result);
}
private:

View File

@ -281,9 +281,8 @@ IStorage::DataValidationTasksPtr IStorage::getCheckTaskList(const ASTPtr & /* qu
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Check query is not supported for {} storage", getName());
}
CheckResult IStorage::checkDataNext(DataValidationTasksPtr & /* check_task_list */, bool & has_nothing_to_do)
std::optional<CheckResult> IStorage::checkDataNext(DataValidationTasksPtr & /* check_task_list */)
{
has_nothing_to_do = true;
return {};
}

View File

@ -619,7 +619,7 @@ public:
virtual DataValidationTasksPtr getCheckTaskList(const ASTPtr & /* query */, ContextPtr /* context */);
/** Executes one task from the list.
* If no tasks left, sets has_nothing_to_do to true.
* If no tasks left - returns nullopt.
* Note: Function `checkDataNext` is accessing `check_task_list` thread-safely,
* and can be called simultaneously for the same `getCheckTaskList` result
* to process different tasks in parallel.
@ -631,14 +631,13 @@ public:
* {
* size_t tasks_left = check_task_list->size();
* std::cout << "Checking data: " << (total_tasks - tasks_left) << " / " << total_tasks << " tasks done." << std::endl;
* bool has_nothing_to_do = false;
* auto result = storage.checkDataNext(check_task_list, has_nothing_to_do);
* if (has_nothing_to_do)
* auto result = storage.checkDataNext(check_task_list);
* if (!result)
* break;
* doSomething(result);
* doSomething(*result);
* }
*/
virtual CheckResult checkDataNext(DataValidationTasksPtr & check_task_list, bool & has_nothing_to_do);
virtual std::optional<CheckResult> checkDataNext(DataValidationTasksPtr & check_task_list);
/// Checks that table could be dropped right now
/// Otherwise - throws an exception with detailed information.

View File

@ -882,9 +882,9 @@ IStorage::DataValidationTasksPtr StorageLog::getCheckTaskList(const ASTPtr & /*
return std::make_unique<DataValidationTasks>(file_checker.getDataValidationTasks(), std::move(lock));
}
CheckResult StorageLog::checkDataNext(DataValidationTasksPtr & check_task_list, bool & has_nothing_to_do)
std::optional<CheckResult> StorageLog::checkDataNext(DataValidationTasksPtr & check_task_list)
{
return file_checker.checkNextEntry(assert_cast<DataValidationTasks *>(check_task_list.get())->file_checker_tasks, has_nothing_to_do);
return file_checker.checkNextEntry(assert_cast<DataValidationTasks *>(check_task_list.get())->file_checker_tasks);
}
IStorage::ColumnSizeByName StorageLog::getColumnSizes() const

View File

@ -60,7 +60,7 @@ public:
void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override;
DataValidationTasksPtr getCheckTaskList(const ASTPtr & query, ContextPtr context) override;
CheckResult checkDataNext(DataValidationTasksPtr & check_task_list, bool & has_nothing_to_do) override;
std::optional<CheckResult> checkDataNext(DataValidationTasksPtr & check_task_list) override;
void truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &) override;

View File

@ -2214,7 +2214,7 @@ IStorage::DataValidationTasksPtr StorageMergeTree::getCheckTaskList(const ASTPtr
return std::make_unique<DataValidationTasks>(std::move(data_parts), local_context);
}
CheckResult StorageMergeTree::checkDataNext(DataValidationTasksPtr & check_task_list, bool & has_nothing_to_do)
std::optional<CheckResult> StorageMergeTree::checkDataNext(DataValidationTasksPtr & check_task_list)
{
auto * data_validation_tasks = assert_cast<DataValidationTasks *>(check_task_list.get());
auto local_context = data_validation_tasks->context;
@ -2259,11 +2259,8 @@ CheckResult StorageMergeTree::checkDataNext(DataValidationTasksPtr & check_task_
}
}
}
else
{
has_nothing_to_do = true;
return {};
}
return {};
}

View File

@ -109,7 +109,7 @@ public:
void onActionLockRemove(StorageActionBlockType action_type) override;
DataValidationTasksPtr getCheckTaskList(const ASTPtr & query, ContextPtr context) override;
CheckResult checkDataNext(DataValidationTasksPtr & check_task_list, bool & has_nothing_to_do) override;
std::optional<CheckResult> checkDataNext(DataValidationTasksPtr & check_task_list) override;
bool scheduleDataProcessingJob(BackgroundJobsAssignee & assignee) override;

View File

@ -150,7 +150,7 @@ public:
}
DataValidationTasksPtr getCheckTaskList(const ASTPtr & query, ContextPtr context) override { return getNested()->getCheckTaskList(query, context); }
CheckResult checkDataNext(DataValidationTasksPtr & check_task_list, bool & has_nothing_to_do) override { return getNested()->checkDataNext(check_task_list, has_nothing_to_do); }
std::optional<CheckResult> checkDataNext(DataValidationTasksPtr & check_task_list) override { return getNested()->checkDataNext(check_task_list); }
void checkTableCanBeDropped([[ maybe_unused ]] ContextPtr query_context) const override { getNested()->checkTableCanBeDropped(query_context); }

View File

@ -8577,7 +8577,7 @@ IStorage::DataValidationTasksPtr StorageReplicatedMergeTree::getCheckTaskList(co
return std::make_unique<DataValidationTasks>(std::move(data_parts), std::move(part_check_lock));
}
CheckResult StorageReplicatedMergeTree::checkDataNext(DataValidationTasksPtr & check_task_list, bool & has_nothing_to_do)
std::optional<CheckResult> StorageReplicatedMergeTree::checkDataNext(DataValidationTasksPtr & check_task_list)
{
if (auto part = assert_cast<DataValidationTasks *>(check_task_list.get())->next())
@ -8592,11 +8592,8 @@ CheckResult StorageReplicatedMergeTree::checkDataNext(DataValidationTasksPtr & c
return CheckResult(part->name, false, "Check of part finished with error: '" + ex.message() + "'");
}
}
else
{
has_nothing_to_do = true;
return {};
}
return {};
}

View File

@ -231,7 +231,7 @@ public:
void enqueuePartForCheck(const String & part_name, time_t delay_to_check_seconds = 0);
DataValidationTasksPtr getCheckTaskList(const ASTPtr & query, ContextPtr context) override;
CheckResult checkDataNext(DataValidationTasksPtr & check_task_list, bool & has_nothing_to_do) override;
std::optional<CheckResult> checkDataNext(DataValidationTasksPtr & check_task_list) override;
/// Checks ability to use granularity
bool canUseAdaptiveGranularity() const override;

View File

@ -411,9 +411,9 @@ IStorage::DataValidationTasksPtr StorageStripeLog::getCheckTaskList(const ASTPtr
return std::make_unique<DataValidationTasks>(file_checker.getDataValidationTasks(), std::move(lock));
}
CheckResult StorageStripeLog::checkDataNext(DataValidationTasksPtr & check_task_list, bool & has_nothing_to_do)
std::optional<CheckResult> StorageStripeLog::checkDataNext(DataValidationTasksPtr & check_task_list)
{
return file_checker.checkNextEntry(assert_cast<DataValidationTasks *>(check_task_list.get())->file_checker_tasks, has_nothing_to_do);
return file_checker.checkNextEntry(assert_cast<DataValidationTasks *>(check_task_list.get())->file_checker_tasks);
}
void StorageStripeLog::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &)

View File

@ -54,7 +54,7 @@ public:
void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override;
DataValidationTasksPtr getCheckTaskList(const ASTPtr & query, ContextPtr context) override;
CheckResult checkDataNext(DataValidationTasksPtr & check_task_list, bool & has_nothing_to_do) override;
std::optional<CheckResult> checkDataNext(DataValidationTasksPtr & check_task_list) override;
bool storesDataOnDisk() const override { return true; }
Strings getDataPaths() const override { return {DB::fullPath(disk, table_path)}; }