Merge branch 'master' of github.com:ClickHouse/ClickHouse into fp16

This commit is contained in:
Alexey Milovidov 2024-11-13 22:11:31 +01:00
commit 78083130f1
27 changed files with 950 additions and 655 deletions

View File

@ -42,7 +42,6 @@ Keep an eye out for upcoming meetups and events around the world. Somewhere else
Upcoming meetups
* [Barcelona Meetup](https://www.meetup.com/clickhouse-spain-user-group/events/303096876/) - November 12
* [Ghent Meetup](https://www.meetup.com/clickhouse-belgium-user-group/events/303049405/) - November 19
* [Dubai Meetup](https://www.meetup.com/clickhouse-dubai-meetup-group/events/303096989/) - November 21
* [Paris Meetup](https://www.meetup.com/clickhouse-france-user-group/events/303096434) - November 26
@ -53,6 +52,7 @@ Upcoming meetups
Recently completed meetups
* [Barcelona Meetup](https://www.meetup.com/clickhouse-spain-user-group/events/303096876/) - November 12
* [Madrid Meetup](https://www.meetup.com/clickhouse-spain-user-group/events/303096564/) - October 22
* [Singapore Meetup](https://www.meetup.com/clickhouse-singapore-meetup-group/events/303212064/) - October 3
* [Jakarta Meetup](https://www.meetup.com/clickhouse-indonesia-user-group/events/303191359/) - October 1

View File

@ -14,12 +14,12 @@ namespace ErrorCodes
BackupConcurrencyCheck::BackupConcurrencyCheck(
const UUID & backup_or_restore_uuid_,
bool is_restore_,
bool on_cluster_,
const String & zookeeper_path_,
bool allow_concurrency_,
BackupConcurrencyCounters & counters_)
: is_restore(is_restore_), backup_or_restore_uuid(backup_or_restore_uuid_), on_cluster(on_cluster_), counters(counters_)
: is_restore(is_restore_), on_cluster(on_cluster_), zookeeper_path(zookeeper_path_), counters(counters_)
{
std::lock_guard lock{counters.mutex};
@ -32,7 +32,7 @@ BackupConcurrencyCheck::BackupConcurrencyCheck(
size_t num_on_cluster_restores = counters.on_cluster_restores.size();
if (on_cluster)
{
if (!counters.on_cluster_restores.contains(backup_or_restore_uuid))
if (!counters.on_cluster_restores.contains(zookeeper_path))
++num_on_cluster_restores;
}
else
@ -47,7 +47,7 @@ BackupConcurrencyCheck::BackupConcurrencyCheck(
size_t num_on_cluster_backups = counters.on_cluster_backups.size();
if (on_cluster)
{
if (!counters.on_cluster_backups.contains(backup_or_restore_uuid))
if (!counters.on_cluster_backups.contains(zookeeper_path))
++num_on_cluster_backups;
}
else
@ -64,9 +64,9 @@ BackupConcurrencyCheck::BackupConcurrencyCheck(
if (on_cluster)
{
if (is_restore)
++counters.on_cluster_restores[backup_or_restore_uuid];
++counters.on_cluster_restores[zookeeper_path];
else
++counters.on_cluster_backups[backup_or_restore_uuid];
++counters.on_cluster_backups[zookeeper_path];
}
else
{
@ -86,7 +86,7 @@ BackupConcurrencyCheck::~BackupConcurrencyCheck()
{
if (is_restore)
{
auto it = counters.on_cluster_restores.find(backup_or_restore_uuid);
auto it = counters.on_cluster_restores.find(zookeeper_path);
if (it != counters.on_cluster_restores.end())
{
if (!--it->second)
@ -95,7 +95,7 @@ BackupConcurrencyCheck::~BackupConcurrencyCheck()
}
else
{
auto it = counters.on_cluster_backups.find(backup_or_restore_uuid);
auto it = counters.on_cluster_backups.find(zookeeper_path);
if (it != counters.on_cluster_backups.end())
{
if (!--it->second)

View File

@ -1,7 +1,8 @@
#pragma once
#include <Core/UUID.h>
#include <base/defines.h>
#include <base/scope_guard.h>
#include <base/types.h>
#include <mutex>
#include <unordered_map>
@ -19,9 +20,9 @@ public:
/// Checks concurrency of a BACKUP operation or a RESTORE operation.
/// Keep a constructed instance of BackupConcurrencyCheck until the operation is done.
BackupConcurrencyCheck(
const UUID & backup_or_restore_uuid_,
bool is_restore_,
bool on_cluster_,
const String & zookeeper_path_,
bool allow_concurrency_,
BackupConcurrencyCounters & counters_);
@ -31,8 +32,8 @@ public:
private:
const bool is_restore;
const UUID backup_or_restore_uuid;
const bool on_cluster;
const String zookeeper_path;
BackupConcurrencyCounters & counters;
};
@ -47,8 +48,8 @@ private:
friend class BackupConcurrencyCheck;
size_t local_backups TSA_GUARDED_BY(mutex) = 0;
size_t local_restores TSA_GUARDED_BY(mutex) = 0;
std::unordered_map<UUID /* backup_uuid */, size_t /* num_refs */> on_cluster_backups TSA_GUARDED_BY(mutex);
std::unordered_map<UUID /* restore_uuid */, size_t /* num_refs */> on_cluster_restores TSA_GUARDED_BY(mutex);
std::unordered_map<String /* zookeeper_path */, size_t /* num_refs */> on_cluster_backups TSA_GUARDED_BY(mutex);
std::unordered_map<String /* zookeeper_path */, size_t /* num_refs */> on_cluster_restores TSA_GUARDED_BY(mutex);
std::mutex mutex;
};

View File

@ -4,31 +4,29 @@
namespace DB
{
BackupCoordinationCleaner::BackupCoordinationCleaner(const String & zookeeper_path_, const WithRetries & with_retries_, LoggerPtr log_)
: zookeeper_path(zookeeper_path_), with_retries(with_retries_), log(log_)
BackupCoordinationCleaner::BackupCoordinationCleaner(bool is_restore_, const String & zookeeper_path_, const WithRetries & with_retries_, LoggerPtr log_)
: is_restore(is_restore_), zookeeper_path(zookeeper_path_), with_retries(with_retries_), log(log_)
{
}
void BackupCoordinationCleaner::cleanup()
bool BackupCoordinationCleaner::cleanup(bool throw_if_error)
{
tryRemoveAllNodes(/* throw_if_error = */ true, /* retries_kind = */ WithRetries::kNormal);
WithRetries::Kind retries_kind = throw_if_error ? WithRetries::kNormal : WithRetries::kErrorHandling;
return cleanupImpl(throw_if_error, retries_kind);
}
bool BackupCoordinationCleaner::tryCleanupAfterError() noexcept
{
return tryRemoveAllNodes(/* throw_if_error = */ false, /* retries_kind = */ WithRetries::kNormal);
}
bool BackupCoordinationCleaner::tryRemoveAllNodes(bool throw_if_error, WithRetries::Kind retries_kind)
bool BackupCoordinationCleaner::cleanupImpl(bool throw_if_error, WithRetries::Kind retries_kind)
{
{
std::lock_guard lock{mutex};
if (cleanup_result.succeeded)
return true;
if (cleanup_result.exception)
if (succeeded)
{
if (throw_if_error)
std::rethrow_exception(cleanup_result.exception);
LOG_TRACE(log, "Nodes from ZooKeeper are already removed");
return true;
}
if (tried)
{
LOG_INFO(log, "Skipped removing nodes from ZooKeeper because because earlier we failed to do that");
return false;
}
}
@ -44,16 +42,18 @@ bool BackupCoordinationCleaner::tryRemoveAllNodes(bool throw_if_error, WithRetri
});
std::lock_guard lock{mutex};
cleanup_result.succeeded = true;
tried = true;
succeeded = true;
return true;
}
catch (...)
{
LOG_TRACE(log, "Caught exception while removing nodes from ZooKeeper for this restore: {}",
LOG_TRACE(log, "Caught exception while removing nodes from ZooKeeper for this {}: {}",
is_restore ? "restore" : "backup",
getCurrentExceptionMessage(/* with_stacktrace= */ false, /* check_embedded_stacktrace= */ true));
std::lock_guard lock{mutex};
cleanup_result.exception = std::current_exception();
tried = true;
if (throw_if_error)
throw;

View File

@ -12,14 +12,14 @@ namespace DB
class BackupCoordinationCleaner
{
public:
BackupCoordinationCleaner(const String & zookeeper_path_, const WithRetries & with_retries_, LoggerPtr log_);
BackupCoordinationCleaner(bool is_restore_, const String & zookeeper_path_, const WithRetries & with_retries_, LoggerPtr log_);
void cleanup();
bool tryCleanupAfterError() noexcept;
bool cleanup(bool throw_if_error);
private:
bool tryRemoveAllNodes(bool throw_if_error, WithRetries::Kind retries_kind);
bool cleanupImpl(bool throw_if_error, WithRetries::Kind retries_kind);
const bool is_restore;
const String zookeeper_path;
/// A reference to a field of the parent object which is either BackupCoordinationOnCluster or RestoreCoordinationOnCluster.
@ -27,13 +27,8 @@ private:
const LoggerPtr log;
struct CleanupResult
{
bool succeeded = false;
std::exception_ptr exception;
};
CleanupResult cleanup_result TSA_GUARDED_BY(mutex);
bool tried TSA_GUARDED_BY(mutex) = false;
bool succeeded TSA_GUARDED_BY(mutex) = false;
std::mutex mutex;
};

View File

@ -11,12 +11,11 @@ namespace DB
{
BackupCoordinationLocal::BackupCoordinationLocal(
const UUID & backup_uuid_,
bool is_plain_backup_,
bool allow_concurrent_backup_,
BackupConcurrencyCounters & concurrency_counters_)
: log(getLogger("BackupCoordinationLocal"))
, concurrency_check(backup_uuid_, /* is_restore = */ false, /* on_cluster = */ false, allow_concurrent_backup_, concurrency_counters_)
, concurrency_check(/* is_restore = */ false, /* on_cluster = */ false, /* zookeeper_path = */ "", allow_concurrent_backup_, concurrency_counters_)
, file_infos(is_plain_backup_)
{
}

View File

@ -23,20 +23,19 @@ class BackupCoordinationLocal : public IBackupCoordination
{
public:
explicit BackupCoordinationLocal(
const UUID & backup_uuid_,
bool is_plain_backup_,
bool allow_concurrent_backup_,
BackupConcurrencyCounters & concurrency_counters_);
~BackupCoordinationLocal() override;
void setBackupQueryIsSentToOtherHosts() override {}
bool isBackupQuerySentToOtherHosts() const override { return false; }
Strings setStage(const String &, const String &, bool) override { return {}; }
void setBackupQueryWasSentToOtherHosts() override {}
bool trySetError(std::exception_ptr) override { return true; }
void finish() override {}
bool tryFinishAfterError() noexcept override { return true; }
void waitForOtherHostsToFinish() override {}
bool tryWaitForOtherHostsToFinishAfterError() noexcept override { return true; }
bool setError(std::exception_ptr, bool) override { return true; }
bool waitOtherHostsFinish(bool) const override { return true; }
bool finish(bool) override { return true; }
bool cleanup(bool) override { return true; }
void addReplicatedPartNames(const String & table_zk_path, const String & table_name_for_logs, const String & replica_name,
const std::vector<PartNameAndChecksum> & part_names_and_checksums) override;

View File

@ -184,17 +184,21 @@ BackupCoordinationOnCluster::BackupCoordinationOnCluster(
, plain_backup(is_plain_backup_)
, log(getLogger("BackupCoordinationOnCluster"))
, with_retries(log, get_zookeeper_, keeper_settings, process_list_element_, [root_zookeeper_path_](Coordination::ZooKeeperWithFaultInjection::Ptr zk) { zk->sync(root_zookeeper_path_); })
, concurrency_check(backup_uuid_, /* is_restore = */ false, /* on_cluster = */ true, allow_concurrent_backup_, concurrency_counters_)
, stage_sync(/* is_restore = */ false, fs::path{zookeeper_path} / "stage", current_host, all_hosts, allow_concurrent_backup_, with_retries, schedule_, process_list_element_, log)
, cleaner(zookeeper_path, with_retries, log)
, cleaner(/* is_restore = */ false, zookeeper_path, with_retries, log)
, stage_sync(/* is_restore = */ false, fs::path{zookeeper_path} / "stage", current_host, all_hosts, allow_concurrent_backup_, concurrency_counters_, with_retries, schedule_, process_list_element_, log)
{
createRootNodes();
try
{
createRootNodes();
}
catch (...)
{
stage_sync.setError(std::current_exception(), /* throw_if_error = */ false);
throw;
}
}
BackupCoordinationOnCluster::~BackupCoordinationOnCluster()
{
tryFinishImpl();
}
BackupCoordinationOnCluster::~BackupCoordinationOnCluster() = default;
void BackupCoordinationOnCluster::createRootNodes()
{
@ -217,69 +221,52 @@ void BackupCoordinationOnCluster::createRootNodes()
});
}
void BackupCoordinationOnCluster::setBackupQueryIsSentToOtherHosts()
{
stage_sync.setQueryIsSentToOtherHosts();
}
bool BackupCoordinationOnCluster::isBackupQuerySentToOtherHosts() const
{
return stage_sync.isQuerySentToOtherHosts();
}
Strings BackupCoordinationOnCluster::setStage(const String & new_stage, const String & message, bool sync)
{
stage_sync.setStage(new_stage, message);
if (!sync)
return {};
return stage_sync.waitForHostsToReachStage(new_stage, all_hosts_without_initiator);
if (sync)
return stage_sync.waitHostsReachStage(all_hosts_without_initiator, new_stage);
return {};
}
void BackupCoordinationOnCluster::setBackupQueryWasSentToOtherHosts()
bool BackupCoordinationOnCluster::setError(std::exception_ptr exception, bool throw_if_error)
{
backup_query_was_sent_to_other_hosts = true;
return stage_sync.setError(exception, throw_if_error);
}
bool BackupCoordinationOnCluster::trySetError(std::exception_ptr exception)
bool BackupCoordinationOnCluster::waitOtherHostsFinish(bool throw_if_error) const
{
return stage_sync.trySetError(exception);
return stage_sync.waitOtherHostsFinish(throw_if_error);
}
void BackupCoordinationOnCluster::finish()
bool BackupCoordinationOnCluster::finish(bool throw_if_error)
{
bool other_hosts_also_finished = false;
stage_sync.finish(other_hosts_also_finished);
if ((current_host == kInitiator) && (other_hosts_also_finished || !backup_query_was_sent_to_other_hosts))
cleaner.cleanup();
return stage_sync.finish(throw_if_error);
}
bool BackupCoordinationOnCluster::tryFinishAfterError() noexcept
bool BackupCoordinationOnCluster::cleanup(bool throw_if_error)
{
return tryFinishImpl();
}
bool BackupCoordinationOnCluster::tryFinishImpl() noexcept
{
bool other_hosts_also_finished = false;
if (!stage_sync.tryFinishAfterError(other_hosts_also_finished))
return false;
if ((current_host == kInitiator) && (other_hosts_also_finished || !backup_query_was_sent_to_other_hosts))
/// All the hosts must finish before we remove the coordination nodes.
bool expect_other_hosts_finished = stage_sync.isQuerySentToOtherHosts() || !stage_sync.isErrorSet();
bool all_hosts_finished = stage_sync.finished() && (stage_sync.otherHostsFinished() || !expect_other_hosts_finished);
if (!all_hosts_finished)
{
if (!cleaner.tryCleanupAfterError())
return false;
}
return true;
}
void BackupCoordinationOnCluster::waitForOtherHostsToFinish()
{
if ((current_host != kInitiator) || !backup_query_was_sent_to_other_hosts)
return;
stage_sync.waitForOtherHostsToFinish();
}
bool BackupCoordinationOnCluster::tryWaitForOtherHostsToFinishAfterError() noexcept
{
if (current_host != kInitiator)
auto unfinished_hosts = expect_other_hosts_finished ? stage_sync.getUnfinishedHosts() : Strings{current_host};
LOG_INFO(log, "Skipping removing nodes from ZooKeeper because hosts {} didn't finish",
BackupCoordinationStageSync::getHostsDesc(unfinished_hosts));
return false;
if (!backup_query_was_sent_to_other_hosts)
return true;
return stage_sync.tryWaitForOtherHostsToFinishAfterError();
}
return cleaner.cleanup(throw_if_error);
}
ZooKeeperRetriesInfo BackupCoordinationOnCluster::getOnClusterInitializationKeeperRetriesInfo() const

View File

@ -1,7 +1,6 @@
#pragma once
#include <Backups/IBackupCoordination.h>
#include <Backups/BackupConcurrencyCheck.h>
#include <Backups/BackupCoordinationCleaner.h>
#include <Backups/BackupCoordinationFileInfos.h>
#include <Backups/BackupCoordinationReplicatedAccess.h>
@ -20,7 +19,7 @@ class BackupCoordinationOnCluster : public IBackupCoordination
{
public:
/// Empty string as the current host is used to mark the initiator of a BACKUP ON CLUSTER query.
static const constexpr std::string_view kInitiator;
static const constexpr std::string_view kInitiator = BackupCoordinationStageSync::kInitiator;
BackupCoordinationOnCluster(
const UUID & backup_uuid_,
@ -37,13 +36,13 @@ public:
~BackupCoordinationOnCluster() override;
void setBackupQueryIsSentToOtherHosts() override;
bool isBackupQuerySentToOtherHosts() const override;
Strings setStage(const String & new_stage, const String & message, bool sync) override;
void setBackupQueryWasSentToOtherHosts() override;
bool trySetError(std::exception_ptr exception) override;
void finish() override;
bool tryFinishAfterError() noexcept override;
void waitForOtherHostsToFinish() override;
bool tryWaitForOtherHostsToFinishAfterError() noexcept override;
bool setError(std::exception_ptr exception, bool throw_if_error) override;
bool waitOtherHostsFinish(bool throw_if_error) const override;
bool finish(bool throw_if_error) override;
bool cleanup(bool throw_if_error) override;
void addReplicatedPartNames(
const String & table_zk_path,
@ -110,11 +109,10 @@ private:
const bool plain_backup;
LoggerPtr const log;
/// The order is important: `stage_sync` must be initialized after `with_retries` and `cleaner`.
const WithRetries with_retries;
BackupConcurrencyCheck concurrency_check;
BackupCoordinationStageSync stage_sync;
BackupCoordinationCleaner cleaner;
std::atomic<bool> backup_query_was_sent_to_other_hosts = false;
BackupCoordinationStageSync stage_sync;
mutable std::optional<BackupCoordinationReplicatedTables> replicated_tables TSA_GUARDED_BY(replicated_tables_mutex);
mutable std::optional<BackupCoordinationReplicatedAccess> replicated_access TSA_GUARDED_BY(replicated_access_mutex);

File diff suppressed because it is too large Load Diff

View File

@ -1,7 +1,9 @@
#pragma once
#include <Backups/BackupConcurrencyCheck.h>
#include <Backups/WithRetries.h>
namespace DB
{
@ -9,12 +11,16 @@ namespace DB
class BackupCoordinationStageSync
{
public:
/// Empty string as the current host is used to mark the initiator of a BACKUP ON CLUSTER or RESTORE ON CLUSTER query.
static const constexpr std::string_view kInitiator;
BackupCoordinationStageSync(
bool is_restore_, /// true if this is a RESTORE ON CLUSTER command, false if this is a BACKUP ON CLUSTER command
const String & zookeeper_path_, /// path to the "stage" folder in ZooKeeper
const String & current_host_, /// the current host, or an empty string if it's the initiator of the BACKUP/RESTORE ON CLUSTER command
const Strings & all_hosts_, /// all the hosts (including the initiator and the current host) performing the BACKUP/RESTORE ON CLUSTER command
bool allow_concurrency_, /// whether it's allowed to have concurrent backups or restores.
BackupConcurrencyCounters & concurrency_counters_,
const WithRetries & with_retries_,
ThreadPoolCallbackRunnerUnsafe<void> schedule_,
QueryStatusPtr process_list_element_,
@ -22,30 +28,37 @@ public:
~BackupCoordinationStageSync();
/// Sets that the BACKUP or RESTORE query was sent to other hosts.
void setQueryIsSentToOtherHosts();
bool isQuerySentToOtherHosts() const;
/// Sets the stage of the current host and signal other hosts if there were other hosts waiting for that.
void setStage(const String & stage, const String & stage_result = {});
/// Waits until all the specified hosts come to the specified stage.
/// The function returns the results which specified hosts set when they came to the required stage.
/// If it doesn't happen before the timeout then the function will stop waiting and throw an exception.
Strings waitForHostsToReachStage(const String & stage_to_wait, const Strings & hosts, std::optional<std::chrono::milliseconds> timeout = {}) const;
/// Waits until all the other hosts finish their work.
/// Stops waiting and throws an exception if another host encounters an error or if some host gets cancelled.
void waitForOtherHostsToFinish() const;
/// Lets other host know that the current host has finished its work.
void finish(bool & other_hosts_also_finished);
/// Waits until specified hosts come to the specified stage.
/// The function returns the results which the specified hosts set when they came to the required stage.
Strings waitHostsReachStage(const Strings & hosts, const String & stage_to_wait) const;
/// Lets other hosts know that the current host has encountered an error.
bool trySetError(std::exception_ptr exception) noexcept;
/// The function returns true if it successfully created the error node or if the error node was found already exist.
bool setError(std::exception_ptr exception, bool throw_if_error);
bool isErrorSet() const;
/// Waits until all the other hosts finish their work (as a part of error-handling process).
/// Doesn't stops waiting if some host encounters an error or gets cancelled.
bool tryWaitForOtherHostsToFinishAfterError() const noexcept;
/// Waits until the hosts other than the current host finish their work. Must be called before finish().
/// Stops waiting and throws an exception if another host encounters an error or if some host gets cancelled.
bool waitOtherHostsFinish(bool throw_if_error) const;
bool otherHostsFinished() const;
/// Lets other host know that the current host has finished its work (as a part of error-handling process).
bool tryFinishAfterError(bool & other_hosts_also_finished) noexcept;
/// Lets other hosts know that the current host has finished its work.
bool finish(bool throw_if_error);
bool finished() const;
/// Returns true if all the hosts have finished.
bool allHostsFinished() const { return finished() && otherHostsFinished(); }
/// Returns a list of the hosts which haven't finished yet.
Strings getUnfinishedHosts() const;
Strings getUnfinishedOtherHosts() const;
/// Returns a printable name of a specific host. For empty host the function returns "initiator".
static String getHostDesc(const String & host);
@ -78,14 +91,17 @@ private:
/// Reads the current state from ZooKeeper without throwing exceptions.
void readCurrentState(Coordination::ZooKeeperWithFaultInjection::Ptr zookeeper);
/// Creates a stage node to let other hosts know we've reached the specified stage.
void createStageNode(const String & stage, const String & stage_result, Coordination::ZooKeeperWithFaultInjection::Ptr zookeeper);
String getStageNodePath(const String & stage) const;
/// Lets other hosts know that the current host has encountered an error.
bool trySetError(const Exception & exception);
void setError(const Exception & exception);
bool setError(const Exception & exception, bool throw_if_error);
void createErrorNode(const Exception & exception, Coordination::ZooKeeperWithFaultInjection::Ptr zookeeper);
/// Deserializes an error stored in the error node.
static std::pair<std::exception_ptr, String> parseErrorNode(const String & error_node_contents);
std::pair<std::exception_ptr, String> parseErrorNode(const String & error_node_contents) const;
/// Reset the `connected` flag for each host.
void resetConnectedFlag();
@ -102,19 +118,27 @@ private:
void cancelQueryIfDisconnectedTooLong();
/// Used by waitForHostsToReachStage() to check if everything is ready to return.
bool checkIfHostsReachStage(const Strings & hosts, const String & stage_to_wait, bool time_is_out, std::optional<std::chrono::milliseconds> timeout, Strings & results) const TSA_REQUIRES(mutex);
bool checkIfHostsReachStage(const Strings & hosts, const String & stage_to_wait, Strings & results) const TSA_REQUIRES(mutex);
/// Creates the 'finish' node.
bool tryFinishImpl();
bool tryFinishImpl(bool & other_hosts_also_finished, bool throw_if_error, WithRetries::Kind retries_kind);
void createFinishNodeAndRemoveAliveNode(Coordination::ZooKeeperWithFaultInjection::Ptr zookeeper);
bool finishImpl(bool throw_if_error, WithRetries::Kind retries_kind);
void createFinishNodeAndRemoveAliveNode(Coordination::ZooKeeperWithFaultInjection::Ptr zookeeper, bool throw_if_error);
/// Returns the version used by the initiator.
int getInitiatorVersion() const;
/// Waits until all the other hosts finish their work.
bool tryWaitForOtherHostsToFinishImpl(const String & reason, bool throw_if_error, std::optional<std::chrono::seconds> timeout) const;
bool checkIfOtherHostsFinish(const String & reason, bool throw_if_error, bool time_is_out, std::optional<std::chrono::milliseconds> timeout) const TSA_REQUIRES(mutex);
bool waitOtherHostsFinishImpl(const String & reason, std::optional<std::chrono::seconds> timeout, bool throw_if_error) const;
bool checkIfOtherHostsFinish(const String & reason, std::optional<std::chrono::milliseconds> timeout, bool time_is_out, bool & result, bool throw_if_error) const TSA_REQUIRES(mutex);
/// Returns true if all the hosts have finished.
bool allHostsFinishedNoLock() const TSA_REQUIRES(mutex);
bool finishedNoLock() const TSA_REQUIRES(mutex);
bool otherHostsFinishedNoLock() const TSA_REQUIRES(mutex);
/// Returns a list of the hosts which haven't finished yet.
Strings getUnfinishedHostsNoLock() const TSA_REQUIRES(mutex);
Strings getUnfinishedOtherHostsNoLock() const TSA_REQUIRES(mutex);
const bool is_restore;
const String operation_name;
@ -138,15 +162,16 @@ private:
/// Paths in ZooKeeper.
const std::filesystem::path zookeeper_path;
const String root_zookeeper_path;
const String operation_node_path;
const String operation_zookeeper_path;
const String operation_node_name;
const String stage_node_path;
const String start_node_path;
const String finish_node_path;
const String num_hosts_node_path;
const String error_node_path;
const String alive_node_path;
const String alive_tracker_node_path;
const String error_node_path;
std::optional<BackupConcurrencyCheck> concurrency_check;
std::shared_ptr<Poco::Event> zk_nodes_changed;
@ -176,25 +201,21 @@ private:
{
std::map<String /* host */, HostInfo> hosts; /// std::map because we need to compare states
std::optional<String> host_with_error;
bool cancelled = false;
bool operator ==(const State & other) const;
bool operator !=(const State & other) const;
void merge(const State & other);
};
State state TSA_GUARDED_BY(mutex);
mutable std::condition_variable state_changed;
std::future<void> watching_thread_future;
std::atomic<bool> should_stop_watching_thread = false;
bool should_stop_watching_thread TSA_GUARDED_BY(mutex) = false;
struct FinishResult
{
bool succeeded = false;
std::exception_ptr exception;
bool other_hosts_also_finished = false;
};
FinishResult finish_result TSA_GUARDED_BY(mutex);
bool query_is_sent_to_other_hosts TSA_GUARDED_BY(mutex) = false;
bool tried_to_finish TSA_GUARDED_BY(mutex) = false;
bool tried_to_set_error TSA_GUARDED_BY(mutex) = false;
mutable std::mutex mutex;
};

View File

@ -329,6 +329,7 @@ std::pair<OperationID, BackupStatus> BackupsWorker::start(const ASTPtr & backup_
struct BackupsWorker::BackupStarter
{
BackupsWorker & backups_worker;
LoggerPtr log;
std::shared_ptr<ASTBackupQuery> backup_query;
ContextPtr query_context; /// We have to keep `query_context` until the end of the operation because a pointer to it is stored inside the ThreadGroup we're using.
ContextMutablePtr backup_context;
@ -345,6 +346,7 @@ struct BackupsWorker::BackupStarter
BackupStarter(BackupsWorker & backups_worker_, const ASTPtr & query_, const ContextPtr & context_)
: backups_worker(backups_worker_)
, log(backups_worker.log)
, backup_query(std::static_pointer_cast<ASTBackupQuery>(query_->clone()))
, query_context(context_)
, backup_context(Context::createCopy(query_context))
@ -399,9 +401,20 @@ struct BackupsWorker::BackupStarter
chassert(!backup);
backup = backups_worker.openBackupForWriting(backup_info, backup_settings, backup_coordination, backup_context);
backups_worker.doBackup(
backup, backup_query, backup_id, backup_name_for_logging, backup_settings, backup_coordination, backup_context,
on_cluster, cluster);
backups_worker.doBackup(backup, backup_query, backup_id, backup_settings, backup_coordination, backup_context,
on_cluster, cluster);
backup_coordination->finish(/* throw_if_error = */ true);
backup.reset();
/// The backup coordination is not needed anymore.
if (!is_internal_backup)
backup_coordination->cleanup(/* throw_if_error = */ true);
backup_coordination.reset();
/// NOTE: setStatus is called after setNumFilesAndSize in order to have actual information in a backup log record
LOG_INFO(log, "{} {} was created successfully", (is_internal_backup ? "Internal backup" : "Backup"), backup_name_for_logging);
backups_worker.setStatus(backup_id, BackupStatus::BACKUP_CREATED);
}
void onException()
@ -416,16 +429,29 @@ struct BackupsWorker::BackupStarter
if (backup && !backup->setIsCorrupted())
should_remove_files_in_backup = false;
if (backup_coordination && backup_coordination->trySetError(std::current_exception()))
bool all_hosts_finished = false;
if (backup_coordination && backup_coordination->setError(std::current_exception(), /* throw_if_error = */ false))
{
bool other_hosts_finished = backup_coordination->tryWaitForOtherHostsToFinishAfterError();
bool other_hosts_finished = !is_internal_backup
&& (!backup_coordination->isBackupQuerySentToOtherHosts() || backup_coordination->waitOtherHostsFinish(/* throw_if_error = */ false));
if (should_remove_files_in_backup && other_hosts_finished)
backup->tryRemoveAllFiles();
backup_coordination->tryFinishAfterError();
all_hosts_finished = backup_coordination->finish(/* throw_if_error = */ false) && other_hosts_finished;
}
if (!all_hosts_finished)
should_remove_files_in_backup = false;
if (backup && should_remove_files_in_backup)
backup->tryRemoveAllFiles();
backup.reset();
if (backup_coordination && all_hosts_finished)
backup_coordination->cleanup(/* throw_if_error = */ false);
backup_coordination.reset();
backups_worker.setStatusSafe(backup_id, getBackupStatusFromCurrentException());
}
};
@ -497,7 +523,6 @@ void BackupsWorker::doBackup(
BackupMutablePtr backup,
const std::shared_ptr<ASTBackupQuery> & backup_query,
const OperationID & backup_id,
const String & backup_name_for_logging,
const BackupSettings & backup_settings,
std::shared_ptr<IBackupCoordination> backup_coordination,
ContextMutablePtr context,
@ -521,10 +546,10 @@ void BackupsWorker::doBackup(
backup_settings.copySettingsToQuery(*backup_query);
sendQueryToOtherHosts(*backup_query, cluster, backup_settings.shard_num, backup_settings.replica_num,
context, required_access, backup_coordination->getOnClusterInitializationKeeperRetriesInfo());
backup_coordination->setBackupQueryWasSentToOtherHosts();
backup_coordination->setBackupQueryIsSentToOtherHosts();
/// Wait until all the hosts have written their backup entries.
backup_coordination->waitForOtherHostsToFinish();
backup_coordination->waitOtherHostsFinish(/* throw_if_error = */ true);
}
else
{
@ -569,18 +594,8 @@ void BackupsWorker::doBackup(
compressed_size = backup->getCompressedSize();
}
/// Close the backup.
backup.reset();
/// The backup coordination is not needed anymore.
backup_coordination->finish();
/// NOTE: we need to update metadata again after backup->finalizeWriting(), because backup metadata is written there.
setNumFilesAndSize(backup_id, num_files, total_size, num_entries, uncompressed_size, compressed_size, 0, 0);
/// NOTE: setStatus is called after setNumFilesAndSize in order to have actual information in a backup log record
LOG_INFO(log, "{} {} was created successfully", (is_internal_backup ? "Internal backup" : "Backup"), backup_name_for_logging);
setStatus(backup_id, BackupStatus::BACKUP_CREATED);
}
@ -687,6 +702,7 @@ void BackupsWorker::writeBackupEntries(
struct BackupsWorker::RestoreStarter
{
BackupsWorker & backups_worker;
LoggerPtr log;
std::shared_ptr<ASTBackupQuery> restore_query;
ContextPtr query_context; /// We have to keep `query_context` until the end of the operation because a pointer to it is stored inside the ThreadGroup we're using.
ContextMutablePtr restore_context;
@ -702,6 +718,7 @@ struct BackupsWorker::RestoreStarter
RestoreStarter(BackupsWorker & backups_worker_, const ASTPtr & query_, const ContextPtr & context_)
: backups_worker(backups_worker_)
, log(backups_worker.log)
, restore_query(std::static_pointer_cast<ASTBackupQuery>(query_->clone()))
, query_context(context_)
, restore_context(Context::createCopy(query_context))
@ -753,16 +770,17 @@ struct BackupsWorker::RestoreStarter
}
restore_coordination = backups_worker.makeRestoreCoordination(on_cluster, restore_settings, restore_context);
backups_worker.doRestore(
restore_query,
restore_id,
backup_name_for_logging,
backup_info,
restore_settings,
restore_coordination,
restore_context,
on_cluster,
cluster);
backups_worker.doRestore(restore_query, restore_id, backup_info, restore_settings, restore_coordination, restore_context,
on_cluster, cluster);
/// The restore coordination is not needed anymore.
restore_coordination->finish(/* throw_if_error = */ true);
if (!is_internal_restore)
restore_coordination->cleanup(/* throw_if_error = */ true);
restore_coordination.reset();
LOG_INFO(log, "Restored from {} {} successfully", (is_internal_restore ? "internal backup" : "backup"), backup_name_for_logging);
backups_worker.setStatus(restore_id, BackupStatus::RESTORED);
}
void onException()
@ -770,12 +788,16 @@ struct BackupsWorker::RestoreStarter
/// Something bad happened, some data were not restored.
tryLogCurrentException(backups_worker.log, fmt::format("Failed to restore from {} {}", (is_internal_restore ? "internal backup" : "backup"), backup_name_for_logging));
if (restore_coordination && restore_coordination->trySetError(std::current_exception()))
if (restore_coordination && restore_coordination->setError(std::current_exception(), /* throw_if_error = */ false))
{
restore_coordination->tryWaitForOtherHostsToFinishAfterError();
restore_coordination->tryFinishAfterError();
bool other_hosts_finished = !is_internal_restore
&& (!restore_coordination->isRestoreQuerySentToOtherHosts() || restore_coordination->waitOtherHostsFinish(/* throw_if_error = */ false));
if (restore_coordination->finish(/* throw_if_error = */ false) && other_hosts_finished)
restore_coordination->cleanup(/* throw_if_error = */ false);
}
restore_coordination.reset();
backups_worker.setStatusSafe(restore_id, getRestoreStatusFromCurrentException());
}
};
@ -838,7 +860,6 @@ BackupPtr BackupsWorker::openBackupForReading(const BackupInfo & backup_info, co
void BackupsWorker::doRestore(
const std::shared_ptr<ASTBackupQuery> & restore_query,
const OperationID & restore_id,
const String & backup_name_for_logging,
const BackupInfo & backup_info,
RestoreSettings restore_settings,
std::shared_ptr<IRestoreCoordination> restore_coordination,
@ -882,10 +903,10 @@ void BackupsWorker::doRestore(
restore_settings.copySettingsToQuery(*restore_query);
sendQueryToOtherHosts(*restore_query, cluster, restore_settings.shard_num, restore_settings.replica_num,
context, {}, restore_coordination->getOnClusterInitializationKeeperRetriesInfo());
restore_coordination->setRestoreQueryWasSentToOtherHosts();
restore_coordination->setRestoreQueryIsSentToOtherHosts();
/// Wait until all the hosts have done with their restoring work.
restore_coordination->waitForOtherHostsToFinish();
restore_coordination->waitOtherHostsFinish(/* throw_if_error = */ true);
}
else
{
@ -905,12 +926,6 @@ void BackupsWorker::doRestore(
backup, context, getThreadPool(ThreadPoolId::RESTORE), after_task_callback};
restorer.run(RestorerFromBackup::RESTORE);
}
/// The restore coordination is not needed anymore.
restore_coordination->finish();
LOG_INFO(log, "Restored from {} {} successfully", (is_internal_restore ? "internal backup" : "backup"), backup_name_for_logging);
setStatus(restore_id, BackupStatus::RESTORED);
}
@ -943,7 +958,7 @@ BackupsWorker::makeBackupCoordination(bool on_cluster, const BackupSettings & ba
if (!on_cluster)
{
return std::make_shared<BackupCoordinationLocal>(
*backup_settings.backup_uuid, !backup_settings.deduplicate_files, allow_concurrent_backups, *concurrency_counters);
!backup_settings.deduplicate_files, allow_concurrent_backups, *concurrency_counters);
}
bool is_internal_backup = backup_settings.internal;
@ -981,8 +996,7 @@ BackupsWorker::makeRestoreCoordination(bool on_cluster, const RestoreSettings &
{
if (!on_cluster)
{
return std::make_shared<RestoreCoordinationLocal>(
*restore_settings.restore_uuid, allow_concurrent_restores, *concurrency_counters);
return std::make_shared<RestoreCoordinationLocal>(allow_concurrent_restores, *concurrency_counters);
}
bool is_internal_restore = restore_settings.internal;

View File

@ -81,7 +81,6 @@ private:
BackupMutablePtr backup,
const std::shared_ptr<ASTBackupQuery> & backup_query,
const BackupOperationID & backup_id,
const String & backup_name_for_logging,
const BackupSettings & backup_settings,
std::shared_ptr<IBackupCoordination> backup_coordination,
ContextMutablePtr context,
@ -102,7 +101,6 @@ private:
void doRestore(
const std::shared_ptr<ASTBackupQuery> & restore_query,
const BackupOperationID & restore_id,
const String & backup_name_for_logging,
const BackupInfo & backup_info,
RestoreSettings restore_settings,
std::shared_ptr<IRestoreCoordination> restore_coordination,

View File

@ -20,29 +20,27 @@ class IBackupCoordination
public:
virtual ~IBackupCoordination() = default;
/// Sets that the backup query was sent to other hosts.
/// Function waitOtherHostsFinish() will check that to find out if it should really wait or not.
virtual void setBackupQueryIsSentToOtherHosts() = 0;
virtual bool isBackupQuerySentToOtherHosts() const = 0;
/// Sets the current stage and waits for other hosts to come to this stage too.
virtual Strings setStage(const String & new_stage, const String & message, bool sync) = 0;
/// Sets that the backup query was sent to other hosts.
/// Function waitForOtherHostsToFinish() will check that to find out if it should really wait or not.
virtual void setBackupQueryWasSentToOtherHosts() = 0;
/// Lets other hosts know that the current host has encountered an error.
virtual bool trySetError(std::exception_ptr exception) = 0;
/// Lets other hosts know that the current host has finished its work.
virtual void finish() = 0;
/// Lets other hosts know that the current host has finished its work (as a part of error-handling process).
virtual bool tryFinishAfterError() noexcept = 0;
/// Returns true if the information is successfully passed so other hosts can read it.
virtual bool setError(std::exception_ptr exception, bool throw_if_error) = 0;
/// Waits until all the other hosts finish their work.
/// Stops waiting and throws an exception if another host encounters an error or if some host gets cancelled.
virtual void waitForOtherHostsToFinish() = 0;
virtual bool waitOtherHostsFinish(bool throw_if_error) const = 0;
/// Waits until all the other hosts finish their work (as a part of error-handling process).
/// Doesn't stops waiting if some host encounters an error or gets cancelled.
virtual bool tryWaitForOtherHostsToFinishAfterError() noexcept = 0;
/// Lets other hosts know that the current host has finished its work.
virtual bool finish(bool throw_if_error) = 0;
/// Removes temporary nodes in ZooKeeper.
virtual bool cleanup(bool throw_if_error) = 0;
struct PartNameAndChecksum
{

View File

@ -18,29 +18,27 @@ class IRestoreCoordination
public:
virtual ~IRestoreCoordination() = default;
/// Sets that the restore query was sent to other hosts.
/// Function waitOtherHostsFinish() will check that to find out if it should really wait or not.
virtual void setRestoreQueryIsSentToOtherHosts() = 0;
virtual bool isRestoreQuerySentToOtherHosts() const = 0;
/// Sets the current stage and waits for other hosts to come to this stage too.
virtual Strings setStage(const String & new_stage, const String & message, bool sync) = 0;
/// Sets that the restore query was sent to other hosts.
/// Function waitForOtherHostsToFinish() will check that to find out if it should really wait or not.
virtual void setRestoreQueryWasSentToOtherHosts() = 0;
/// Lets other hosts know that the current host has encountered an error.
virtual bool trySetError(std::exception_ptr exception) = 0;
/// Lets other hosts know that the current host has finished its work.
virtual void finish() = 0;
/// Lets other hosts know that the current host has finished its work (as a part of error-handling process).
virtual bool tryFinishAfterError() noexcept = 0;
/// Returns true if the information is successfully passed so other hosts can read it.
virtual bool setError(std::exception_ptr exception, bool throw_if_error) = 0;
/// Waits until all the other hosts finish their work.
/// Stops waiting and throws an exception if another host encounters an error or if some host gets cancelled.
virtual void waitForOtherHostsToFinish() = 0;
virtual bool waitOtherHostsFinish(bool throw_if_error) const = 0;
/// Waits until all the other hosts finish their work (as a part of error-handling process).
/// Doesn't stops waiting if some host encounters an error or gets cancelled.
virtual bool tryWaitForOtherHostsToFinishAfterError() noexcept = 0;
/// Lets other hosts know that the current host has finished its work.
virtual bool finish(bool throw_if_error) = 0;
/// Removes temporary nodes in ZooKeeper.
virtual bool cleanup(bool throw_if_error) = 0;
/// Starts creating a table in a replicated database. Returns false if there is another host which is already creating this table.
virtual bool acquireCreatingTableInReplicatedDatabase(const String & database_zk_path, const String & table_name) = 0;

View File

@ -10,9 +10,9 @@ namespace DB
{
RestoreCoordinationLocal::RestoreCoordinationLocal(
const UUID & restore_uuid, bool allow_concurrent_restore_, BackupConcurrencyCounters & concurrency_counters_)
bool allow_concurrent_restore_, BackupConcurrencyCounters & concurrency_counters_)
: log(getLogger("RestoreCoordinationLocal"))
, concurrency_check(restore_uuid, /* is_restore = */ true, /* on_cluster = */ false, allow_concurrent_restore_, concurrency_counters_)
, concurrency_check(/* is_restore = */ true, /* on_cluster = */ false, /* zookeeper_path = */ "", allow_concurrent_restore_, concurrency_counters_)
{
}

View File

@ -17,16 +17,16 @@ class ASTCreateQuery;
class RestoreCoordinationLocal : public IRestoreCoordination
{
public:
RestoreCoordinationLocal(const UUID & restore_uuid_, bool allow_concurrent_restore_, BackupConcurrencyCounters & concurrency_counters_);
RestoreCoordinationLocal(bool allow_concurrent_restore_, BackupConcurrencyCounters & concurrency_counters_);
~RestoreCoordinationLocal() override;
void setRestoreQueryIsSentToOtherHosts() override {}
bool isRestoreQuerySentToOtherHosts() const override { return false; }
Strings setStage(const String &, const String &, bool) override { return {}; }
void setRestoreQueryWasSentToOtherHosts() override {}
bool trySetError(std::exception_ptr) override { return true; }
void finish() override {}
bool tryFinishAfterError() noexcept override { return true; }
void waitForOtherHostsToFinish() override {}
bool tryWaitForOtherHostsToFinishAfterError() noexcept override { return true; }
bool setError(std::exception_ptr, bool) override { return true; }
bool waitOtherHostsFinish(bool) const override { return true; }
bool finish(bool) override { return true; }
bool cleanup(bool) override { return true; }
/// Starts creating a table in a replicated database. Returns false if there is another host which is already creating this table.
bool acquireCreatingTableInReplicatedDatabase(const String & database_zk_path, const String & table_name) override;

View File

@ -35,17 +35,21 @@ RestoreCoordinationOnCluster::RestoreCoordinationOnCluster(
, current_host_index(BackupCoordinationOnCluster::findCurrentHostIndex(current_host, all_hosts))
, log(getLogger("RestoreCoordinationOnCluster"))
, with_retries(log, get_zookeeper_, keeper_settings, process_list_element_, [root_zookeeper_path_](Coordination::ZooKeeperWithFaultInjection::Ptr zk) { zk->sync(root_zookeeper_path_); })
, concurrency_check(restore_uuid_, /* is_restore = */ true, /* on_cluster = */ true, allow_concurrent_restore_, concurrency_counters_)
, stage_sync(/* is_restore = */ true, fs::path{zookeeper_path} / "stage", current_host, all_hosts, allow_concurrent_restore_, with_retries, schedule_, process_list_element_, log)
, cleaner(zookeeper_path, with_retries, log)
, cleaner(/* is_restore = */ true, zookeeper_path, with_retries, log)
, stage_sync(/* is_restore = */ true, fs::path{zookeeper_path} / "stage", current_host, all_hosts, allow_concurrent_restore_, concurrency_counters_, with_retries, schedule_, process_list_element_, log)
{
createRootNodes();
try
{
createRootNodes();
}
catch (...)
{
stage_sync.setError(std::current_exception(), /* throw_if_error = */ false);
throw;
}
}
RestoreCoordinationOnCluster::~RestoreCoordinationOnCluster()
{
tryFinishImpl();
}
RestoreCoordinationOnCluster::~RestoreCoordinationOnCluster() = default;
void RestoreCoordinationOnCluster::createRootNodes()
{
@ -66,69 +70,52 @@ void RestoreCoordinationOnCluster::createRootNodes()
});
}
void RestoreCoordinationOnCluster::setRestoreQueryIsSentToOtherHosts()
{
stage_sync.setQueryIsSentToOtherHosts();
}
bool RestoreCoordinationOnCluster::isRestoreQuerySentToOtherHosts() const
{
return stage_sync.isQuerySentToOtherHosts();
}
Strings RestoreCoordinationOnCluster::setStage(const String & new_stage, const String & message, bool sync)
{
stage_sync.setStage(new_stage, message);
if (!sync)
return {};
return stage_sync.waitForHostsToReachStage(new_stage, all_hosts_without_initiator);
if (sync)
return stage_sync.waitHostsReachStage(all_hosts_without_initiator, new_stage);
return {};
}
void RestoreCoordinationOnCluster::setRestoreQueryWasSentToOtherHosts()
bool RestoreCoordinationOnCluster::setError(std::exception_ptr exception, bool throw_if_error)
{
restore_query_was_sent_to_other_hosts = true;
return stage_sync.setError(exception, throw_if_error);
}
bool RestoreCoordinationOnCluster::trySetError(std::exception_ptr exception)
bool RestoreCoordinationOnCluster::waitOtherHostsFinish(bool throw_if_error) const
{
return stage_sync.trySetError(exception);
return stage_sync.waitOtherHostsFinish(throw_if_error);
}
void RestoreCoordinationOnCluster::finish()
bool RestoreCoordinationOnCluster::finish(bool throw_if_error)
{
bool other_hosts_also_finished = false;
stage_sync.finish(other_hosts_also_finished);
if ((current_host == kInitiator) && (other_hosts_also_finished || !restore_query_was_sent_to_other_hosts))
cleaner.cleanup();
return stage_sync.finish(throw_if_error);
}
bool RestoreCoordinationOnCluster::tryFinishAfterError() noexcept
bool RestoreCoordinationOnCluster::cleanup(bool throw_if_error)
{
return tryFinishImpl();
}
bool RestoreCoordinationOnCluster::tryFinishImpl() noexcept
{
bool other_hosts_also_finished = false;
if (!stage_sync.tryFinishAfterError(other_hosts_also_finished))
return false;
if ((current_host == kInitiator) && (other_hosts_also_finished || !restore_query_was_sent_to_other_hosts))
/// All the hosts must finish before we remove the coordination nodes.
bool expect_other_hosts_finished = stage_sync.isQuerySentToOtherHosts() || !stage_sync.isErrorSet();
bool all_hosts_finished = stage_sync.finished() && (stage_sync.otherHostsFinished() || !expect_other_hosts_finished);
if (!all_hosts_finished)
{
if (!cleaner.tryCleanupAfterError())
return false;
}
return true;
}
void RestoreCoordinationOnCluster::waitForOtherHostsToFinish()
{
if ((current_host != kInitiator) || !restore_query_was_sent_to_other_hosts)
return;
stage_sync.waitForOtherHostsToFinish();
}
bool RestoreCoordinationOnCluster::tryWaitForOtherHostsToFinishAfterError() noexcept
{
if (current_host != kInitiator)
auto unfinished_hosts = expect_other_hosts_finished ? stage_sync.getUnfinishedHosts() : Strings{current_host};
LOG_INFO(log, "Skipping removing nodes from ZooKeeper because hosts {} didn't finish",
BackupCoordinationStageSync::getHostsDesc(unfinished_hosts));
return false;
if (!restore_query_was_sent_to_other_hosts)
return true;
return stage_sync.tryWaitForOtherHostsToFinishAfterError();
}
return cleaner.cleanup(throw_if_error);
}
ZooKeeperRetriesInfo RestoreCoordinationOnCluster::getOnClusterInitializationKeeperRetriesInfo() const

View File

@ -1,7 +1,6 @@
#pragma once
#include <Backups/IRestoreCoordination.h>
#include <Backups/BackupConcurrencyCheck.h>
#include <Backups/BackupCoordinationCleaner.h>
#include <Backups/BackupCoordinationStageSync.h>
#include <Backups/WithRetries.h>
@ -15,7 +14,7 @@ class RestoreCoordinationOnCluster : public IRestoreCoordination
{
public:
/// Empty string as the current host is used to mark the initiator of a RESTORE ON CLUSTER query.
static const constexpr std::string_view kInitiator;
static const constexpr std::string_view kInitiator = BackupCoordinationStageSync::kInitiator;
RestoreCoordinationOnCluster(
const UUID & restore_uuid_,
@ -31,13 +30,13 @@ public:
~RestoreCoordinationOnCluster() override;
void setRestoreQueryIsSentToOtherHosts() override;
bool isRestoreQuerySentToOtherHosts() const override;
Strings setStage(const String & new_stage, const String & message, bool sync) override;
void setRestoreQueryWasSentToOtherHosts() override;
bool trySetError(std::exception_ptr exception) override;
void finish() override;
bool tryFinishAfterError() noexcept override;
void waitForOtherHostsToFinish() override;
bool tryWaitForOtherHostsToFinishAfterError() noexcept override;
bool setError(std::exception_ptr exception, bool throw_if_error) override;
bool waitOtherHostsFinish(bool throw_if_error) const override;
bool finish(bool throw_if_error) override;
bool cleanup(bool throw_if_error) override;
/// Starts creating a table in a replicated database. Returns false if there is another host which is already creating this table.
bool acquireCreatingTableInReplicatedDatabase(const String & database_zk_path, const String & table_name) override;
@ -78,11 +77,10 @@ private:
const size_t current_host_index;
LoggerPtr const log;
/// The order is important: `stage_sync` must be initialized after `with_retries` and `cleaner`.
const WithRetries with_retries;
BackupConcurrencyCheck concurrency_check;
BackupCoordinationStageSync stage_sync;
BackupCoordinationCleaner cleaner;
std::atomic<bool> restore_query_was_sent_to_other_hosts = false;
BackupCoordinationStageSync stage_sync;
};
}

View File

@ -3669,6 +3669,11 @@ Given that, for example, dictionaries, can be out of sync across nodes, mutation
</profiles>
```
)", 0) \
DECLARE(Bool, validate_mutation_query, true, R"(
Validate mutation queries before accepting them. Mutations are executed in the background, and running an invalid query will cause mutations to get stuck, requiring manual intervention.
Only change this setting if you encounter a backward-incompatible bug.
)", 0) \
DECLARE(Seconds, lock_acquire_timeout, DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC, R"(
Defines how many seconds a locking request waits before failing.

View File

@ -64,6 +64,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
},
{"24.11",
{
{"validate_mutation_query", false, true, "New setting to validate mutation queries by default."},
{"enable_job_stack_trace", false, true, "Enable by default collecting stack traces from job's scheduling."},
{"allow_suspicious_types_in_group_by", true, false, "Don't allow Variant/Dynamic types in GROUP BY by default"},
{"allow_suspicious_types_in_order_by", true, false, "Don't allow Variant/Dynamic types in ORDER BY by default"},

View File

@ -53,6 +53,7 @@ namespace Setting
extern const SettingsBool allow_nondeterministic_mutations;
extern const SettingsUInt64 max_block_size;
extern const SettingsBool use_concurrency_control;
extern const SettingsBool validate_mutation_query;
}
namespace MergeTreeSetting
@ -1386,6 +1387,18 @@ void MutationsInterpreter::validate()
}
}
// Make sure the mutation query is valid
if (context->getSettingsRef()[Setting::validate_mutation_query])
{
if (context->getSettingsRef()[Setting::allow_experimental_analyzer])
prepareQueryAffectedQueryTree(commands, source.getStorage(), context);
else
{
ASTPtr select_query = prepareQueryAffectedAST(commands, source.getStorage(), context);
InterpreterSelectQuery(select_query, context, source.getStorage(), metadata_snapshot);
}
}
QueryPlan plan;
initQueryPlan(stages.front(), plan);

View File

@ -251,23 +251,16 @@ def kill_query(
if is_initial_query is not None
else ""
)
old_time = time.monotonic()
node.query(
f"KILL QUERY WHERE (query_kind='{query_kind}') AND (query LIKE '%{id}%'){filter_for_is_initial_query} SYNC"
)
node.query("SYSTEM FLUSH LOGS")
duration = (
int(
node.query(
f"SELECT query_duration_ms FROM system.query_log WHERE query_kind='KillQuery' AND query LIKE '%{id}%' AND type='QueryFinish'"
)
)
/ 1000
)
waited = time.monotonic() - old_time
print(
f"{get_node_name(node)}: Cancelled {operation_name} {id} after {duration} seconds"
f"{get_node_name(node)}: Cancelled {operation_name} {id} after {waited} seconds"
)
if timeout is not None:
assert duration < timeout
assert waited < timeout
# Stops all ZooKeeper servers.
@ -305,7 +298,7 @@ def sleep(seconds):
class NoTrashChecker:
def __init__(self):
self.expect_backups = []
self.expect_unfinished_backups = []
self.allow_unfinished_backups = []
self.expect_errors = []
self.allow_errors = []
self.check_zookeeper = True
@ -373,7 +366,7 @@ class NoTrashChecker:
if unfinished_backups:
print(f"Found unfinished backups: {unfinished_backups}")
assert new_backups == set(self.expect_backups)
assert unfinished_backups == set(self.expect_unfinished_backups)
assert unfinished_backups.difference(self.allow_unfinished_backups) == set()
all_errors = set()
start_time = time.strftime(
@ -641,7 +634,7 @@ def test_long_disconnection_stops_backup():
assert get_status(initiator, backup_id=backup_id) == "CREATING_BACKUP"
assert get_num_system_processes(initiator, backup_id=backup_id) >= 1
no_trash_checker.expect_unfinished_backups = [backup_id]
no_trash_checker.allow_unfinished_backups = [backup_id]
no_trash_checker.allow_errors = [
"FAILED_TO_SYNC_BACKUP_OR_RESTORE",
"KEEPER_EXCEPTION",
@ -674,7 +667,7 @@ def test_long_disconnection_stops_backup():
# A backup is expected to fail, but it isn't expected to fail too soon.
print(f"Backup failed after {time_to_fail} seconds disconnection")
assert time_to_fail > 3
assert time_to_fail < 30
assert time_to_fail < 35
# A backup must NOT be stopped if Zookeeper is disconnected shorter than `failure_after_host_disconnected_for_seconds`.
@ -695,7 +688,7 @@ def test_short_disconnection_doesnt_stop_backup():
backup_id = random_id()
initiator.query(
f"BACKUP TABLE tbl ON CLUSTER 'cluster' TO {get_backup_name(backup_id)} SETTINGS id='{backup_id}' ASYNC",
settings={"backup_restore_failure_after_host_disconnected_for_seconds": 6},
settings={"backup_restore_failure_after_host_disconnected_for_seconds": 10},
)
assert get_status(initiator, backup_id=backup_id) == "CREATING_BACKUP"
@ -703,13 +696,13 @@ def test_short_disconnection_doesnt_stop_backup():
# Dropping connection for less than `failure_after_host_disconnected_for_seconds`
with PartitionManager() as pm:
random_sleep(3)
random_sleep(4)
node_to_drop_zk_connection = random_node()
print(
f"Dropping connection between {get_node_name(node_to_drop_zk_connection)} and ZooKeeper"
)
pm.drop_instance_zk_connections(node_to_drop_zk_connection)
random_sleep(3)
random_sleep(4)
print(
f"Restoring connection between {get_node_name(node_to_drop_zk_connection)} and ZooKeeper"
)

View File

@ -27,6 +27,7 @@ REPLICATED_POSTPONE_MUTATION_LOG = (
POSTPONE_MUTATION_LOG = (
"According to exponential backoff policy, do not perform mutations for the part"
)
FAILING_MUTATION_QUERY = "ALTER TABLE test_mutations DELETE WHERE x IN (SELECT throwIf(1)) SETTINGS allow_nondeterministic_mutations = 1"
all_nodes = [node_with_backoff, node_no_backoff]
@ -83,17 +84,13 @@ def test_exponential_backoff_with_merge_tree(started_cluster, node, found_in_log
assert not node.contains_in_log(POSTPONE_MUTATION_LOG)
# Executing incorrect mutation.
node.query(
"ALTER TABLE test_mutations DELETE WHERE x IN (SELECT x FROM notexist_table) SETTINGS allow_nondeterministic_mutations=1"
)
node.query(FAILING_MUTATION_QUERY)
check_logs()
node.query("KILL MUTATION WHERE table='test_mutations'")
# Check that after kill new parts mutations are postponing.
node.query(
"ALTER TABLE test_mutations DELETE WHERE x IN (SELECT x FROM notexist_table) SETTINGS allow_nondeterministic_mutations=1"
)
node.query(FAILING_MUTATION_QUERY)
check_logs()
@ -101,9 +98,7 @@ def test_exponential_backoff_with_merge_tree(started_cluster, node, found_in_log
def test_exponential_backoff_with_replicated_tree(started_cluster):
prepare_cluster(True)
node_with_backoff.query(
"ALTER TABLE test_mutations DELETE WHERE x IN (SELECT x FROM notexist_table) SETTINGS allow_nondeterministic_mutations=1"
)
node_with_backoff.query(FAILING_MUTATION_QUERY)
assert node_with_backoff.wait_for_log_line(REPLICATED_POSTPONE_MUTATION_LOG)
assert not node_no_backoff.contains_in_log(REPLICATED_POSTPONE_MUTATION_LOG)
@ -114,7 +109,7 @@ def test_exponential_backoff_create_dependent_table(started_cluster):
# Executing incorrect mutation.
node_with_backoff.query(
"ALTER TABLE test_mutations DELETE WHERE x IN (SELECT x FROM dep_table) SETTINGS allow_nondeterministic_mutations=1"
"ALTER TABLE test_mutations DELETE WHERE x IN (SELECT x FROM dep_table) SETTINGS allow_nondeterministic_mutations = 1, validate_mutation_query = 0"
)
# Creating dependent table for mutation.
@ -148,9 +143,7 @@ def test_exponential_backoff_setting_override(started_cluster):
node.query("INSERT INTO test_mutations SELECT * FROM system.numbers LIMIT 10")
# Executing incorrect mutation.
node.query(
"ALTER TABLE test_mutations DELETE WHERE x IN (SELECT x FROM dep_table) SETTINGS allow_nondeterministic_mutations=1"
)
node.query(FAILING_MUTATION_QUERY)
assert not node.contains_in_log(POSTPONE_MUTATION_LOG)
@ -166,9 +159,7 @@ def test_backoff_clickhouse_restart(started_cluster, replicated_table):
node = node_with_backoff
# Executing incorrect mutation.
node.query(
"ALTER TABLE test_mutations DELETE WHERE x IN (SELECT x FROM dep_table) SETTINGS allow_nondeterministic_mutations=1"
)
node.query(FAILING_MUTATION_QUERY)
assert node.wait_for_log_line(
REPLICATED_POSTPONE_MUTATION_LOG if replicated_table else POSTPONE_MUTATION_LOG
)
@ -193,11 +184,10 @@ def test_no_backoff_after_killing_mutation(started_cluster, replicated_table):
node = node_with_backoff
# Executing incorrect mutation.
node.query(
"ALTER TABLE test_mutations DELETE WHERE x IN (SELECT x FROM dep_table) SETTINGS allow_nondeterministic_mutations=1"
)
node.query(FAILING_MUTATION_QUERY)
# Executing correct mutation.
node.query("ALTER TABLE test_mutations DELETE WHERE x=1")
node.query("ALTER TABLE test_mutations DELETE WHERE x=1")
assert node.wait_for_log_line(
REPLICATED_POSTPONE_MUTATION_LOG if replicated_table else POSTPONE_MUTATION_LOG
)

View File

@ -7,5 +7,5 @@ select count() from test_qualify; -- 100
select * from test_qualify qualify row_number() over (order by number) = 50 SETTINGS enable_analyzer = 1; -- 49
select * from test_qualify qualify row_number() over (order by number) = 50 SETTINGS enable_analyzer = 0; -- { serverError NOT_IMPLEMENTED }
delete from test_qualify where number in (select number from test_qualify qualify row_number() over (order by number) = 50); -- { serverError UNFINISHED }
delete from test_qualify where number in (select number from test_qualify qualify row_number() over (order by number) = 50) SETTINGS validate_mutation_query = 0; -- { serverError UNFINISHED }
select count() from test_qualify; -- 100

View File

@ -0,0 +1,21 @@
DROP TABLE IF EXISTS t;
DROP TABLE IF EXISTS t2;
CREATE TABLE t (x int) ENGINE = MergeTree() ORDER BY ();
DELETE FROM t WHERE y in (SELECT x FROM t); -- { serverError UNKNOWN_IDENTIFIER }
DELETE FROM t WHERE x in (SELECT y FROM t); -- { serverError UNKNOWN_IDENTIFIER }
DELETE FROM t WHERE x IN (SELECT * FROM t2); -- { serverError UNKNOWN_TABLE }
ALTER TABLE t DELETE WHERE x in (SELECT y FROM t); -- { serverError UNKNOWN_IDENTIFIER }
ALTER TABLE t UPDATE x = 1 WHERE x IN (SELECT y FROM t); -- { serverError UNKNOWN_IDENTIFIER }
DELETE FROM t WHERE x IN (SELECT foo FROM bar) SETTINGS validate_mutation_query = 0;
ALTER TABLE t ADD COLUMN y int;
DELETE FROM t WHERE y in (SELECT y FROM t);
CREATE TABLE t2 (x int) ENGINE = MergeTree() ORDER BY ();
DELETE FROM t WHERE x IN (SELECT * FROM t2);
DROP TABLE t;
DROP TABLE t2;