Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-11-10 01:25:21 +00:00)

Local backup and restore

Commit 86ba6ad1e8 (parent f06d3b29be)
src/Backups/BackupCoordinationKeeperMapTables.cpp (new file, 23 lines)
@@ -0,0 +1,23 @@
#include <Backups/BackupCoordinationKeeperMapTables.h>

namespace DB
{

void BackupCoordinationKeeperMapTables::addTable(const std::string & table_zookeeper_root_path, const std::string & table_id, const std::string & data_path_in_backup)
{
    if (auto it = tables_with_info.find(table_zookeeper_root_path); it != tables_with_info.end())
    {
        if (table_id > it->second.table_id)
            it->second = KeeperMapTableInfo{table_id, data_path_in_backup};
        return;
    }

    tables_with_info.emplace(table_zookeeper_root_path, KeeperMapTableInfo{table_id, data_path_in_backup});
}

std::string BackupCoordinationKeeperMapTables::getDataPath(const std::string & table_zookeeper_root_path) const
{
    return tables_with_info.at(table_zookeeper_root_path).data_path_in_backup;
}

}
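Every KeeperMap table that shares a ZooKeeper root path registers itself here, and addTable keeps exactly one entry per root path: the lexicographically largest table_id wins, so every host independently picks the same owner for the shared data without any extra round of coordination. A minimal standalone sketch of that rule, under illustrative names (TableRegistry is not part of the commit):

#include <cassert>
#include <iostream>
#include <string>
#include <unordered_map>

// Hypothetical stand-in for BackupCoordinationKeeperMapTables: keeps,
// per ZooKeeper root path, the entry with the largest table_id.
struct TableRegistry
{
    struct Info { std::string table_id; std::string data_path; };
    std::unordered_map<std::string, Info> tables;

    void add(const std::string & root, const std::string & id, const std::string & path)
    {
        auto [it, inserted] = tables.try_emplace(root, Info{id, path});
        if (!inserted && id > it->second.table_id)
            it->second = Info{id, path};  // larger id replaces the previous owner
    }
};

int main()
{
    TableRegistry registry;
    // Two tables share the same Keeper root path; insertion order must not matter.
    registry.add("/keeper_map/t", "aaaa-uuid", "backup/data/db/t1");
    registry.add("/keeper_map/t", "bbbb-uuid", "backup/data/db/t2");
    assert(registry.tables.at("/keeper_map/t").data_path == "backup/data/db/t2");
    std::cout << "owner: " << registry.tables.at("/keeper_map/t").data_path << '\n';
}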
src/Backups/BackupCoordinationKeeperMapTables.h (new file, 23 lines)
@@ -0,0 +1,23 @@
#pragma once

#include <unordered_map>
#include <string>

namespace DB
{

struct BackupCoordinationKeeperMapTables
{
    void addTable(const std::string & table_zookeeper_root_path, const std::string & table_id, const std::string & data_path_in_backup);
    std::string getDataPath(const std::string & table_zookeeper_root_path) const;
private:
    struct KeeperMapTableInfo
    {
        std::string table_id;
        std::string data_path_in_backup;
    };

    std::unordered_map<std::string /* root zookeeper path */, KeeperMapTableInfo> tables_with_info;
};

}
src/Backups/BackupCoordinationLocal.cpp
@@ -97,6 +97,18 @@ Strings BackupCoordinationLocal::getReplicatedSQLObjectsDirs(const String & load
     return replicated_sql_objects.getDirectories(loader_zk_path, object_type, "");
 }
 
+void BackupCoordinationLocal::addKeeperMapTable(const String & table_zookeeper_root_path, const String & table_id, const String & data_path_in_backup)
+{
+    std::lock_guard lock(keeper_map_tables_mutex);
+    keeper_map_tables.addTable(table_zookeeper_root_path, table_id, data_path_in_backup);
+}
+
+String BackupCoordinationLocal::getKeeperMapDataPath(const String & table_zookeeper_root_path) const
+{
+    std::lock_guard lock(keeper_map_tables_mutex);
+    return keeper_map_tables.getDataPath(table_zookeeper_root_path);
+}
+
 
 void BackupCoordinationLocal::addFileInfos(BackupFileInfos && file_infos_)
 {
src/Backups/BackupCoordinationLocal.h
@@ -6,6 +6,8 @@
 #include <Backups/BackupCoordinationReplicatedSQLObjects.h>
 #include <Backups/BackupCoordinationReplicatedTables.h>
 #include <base/defines.h>
+#include "Backups/BackupCoordinationKeeperMapTables.h"
 #include <cstddef>
 #include <mutex>
 #include <unordered_set>
+
@@ -44,6 +46,9 @@ public:
     void addReplicatedSQLObjectsDir(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & dir_path) override;
     Strings getReplicatedSQLObjectsDirs(const String & loader_zk_path, UserDefinedSQLObjectType object_type) const override;
 
+    void addKeeperMapTable(const String & table_zookeeper_root_path, const String & table_id, const String & data_path_in_backup) override;
+    String getKeeperMapDataPath(const String & table_zookeeper_root_path) const override;
+
     void addFileInfos(BackupFileInfos && file_infos) override;
     BackupFileInfos getFileInfos() const override;
     BackupFileInfos getFileInfosForAllHosts() const override;
@@ -58,13 +63,22 @@ private:
     BackupCoordinationReplicatedAccess TSA_GUARDED_BY(replicated_access_mutex) replicated_access;
     BackupCoordinationReplicatedSQLObjects TSA_GUARDED_BY(replicated_sql_objects_mutex) replicated_sql_objects;
     BackupCoordinationFileInfos TSA_GUARDED_BY(file_infos_mutex) file_infos;
+    BackupCoordinationKeeperMapTables keeper_map_tables TSA_GUARDED_BY(keeper_map_tables_mutex);
     std::unordered_set<size_t> TSA_GUARDED_BY(writing_files_mutex) writing_files;
 
+    struct KeeperMapTableInfo
+    {
+        String table_id;
+        String data_path_in_backup;
+    };
+
+
     mutable std::mutex replicated_tables_mutex;
     mutable std::mutex replicated_access_mutex;
     mutable std::mutex replicated_sql_objects_mutex;
     mutable std::mutex file_infos_mutex;
     mutable std::mutex writing_files_mutex;
+    mutable std::mutex keeper_map_tables_mutex;
 };
 
 }
src/Backups/BackupCoordinationRemote.cpp
@@ -666,6 +666,19 @@ void BackupCoordinationRemote::prepareReplicatedSQLObjects() const
         replicated_sql_objects->addDirectory(std::move(directory));
 }
 
+void BackupCoordinationRemote::addKeeperMapTable(const String & table_zookeeper_root_path, const String & table_id, const String & data_path_in_backup)
+{
+    std::lock_guard lock(keeper_map_tables_mutex);
+    keeper_map_tables.addTable(table_zookeeper_root_path, table_id, data_path_in_backup);
+}
+
+String BackupCoordinationRemote::getKeeperMapDataPath(const String & table_zookeeper_root_path) const
+{
+    std::lock_guard lock(keeper_map_tables_mutex);
+    return keeper_map_tables.getDataPath(table_zookeeper_root_path);
+}
+
 
 void BackupCoordinationRemote::addFileInfos(BackupFileInfos && file_infos_)
 {
     {
src/Backups/BackupCoordinationRemote.h
@@ -7,6 +7,7 @@
 #include <Backups/BackupCoordinationReplicatedTables.h>
 #include <Backups/BackupCoordinationStageSync.h>
 #include <Backups/WithRetries.h>
+#include "Backups/BackupCoordinationKeeperMapTables.h"
 
 
 namespace DB
@@ -63,6 +64,9 @@ public:
     void addReplicatedSQLObjectsDir(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & dir_path) override;
     Strings getReplicatedSQLObjectsDirs(const String & loader_zk_path, UserDefinedSQLObjectType object_type) const override;
 
+    void addKeeperMapTable(const String & table_zookeeper_root_path, const String & table_id, const String & data_path_in_backup) override;
+    String getKeeperMapDataPath(const String & table_zookeeper_root_path) const override;
+
     void addFileInfos(BackupFileInfos && file_infos) override;
     BackupFileInfos getFileInfos() const override;
     BackupFileInfos getFileInfosForAllHosts() const override;
@@ -108,12 +112,21 @@ private:
     mutable std::optional<BackupCoordinationFileInfos> TSA_GUARDED_BY(file_infos_mutex) file_infos;
     std::unordered_set<size_t> TSA_GUARDED_BY(writing_files_mutex) writing_files;
 
+    struct KeeperMapTableInfo
+    {
+        String table_id;
+        String data_path_in_backup;
+    };
+
+    mutable BackupCoordinationKeeperMapTables keeper_map_tables TSA_GUARDED_BY(keeper_map_tables_mutex);
+
     mutable std::mutex zookeeper_mutex;
     mutable std::mutex replicated_tables_mutex;
     mutable std::mutex replicated_access_mutex;
     mutable std::mutex replicated_sql_objects_mutex;
     mutable std::mutex file_infos_mutex;
     mutable std::mutex writing_files_mutex;
+    mutable std::mutex keeper_map_tables_mutex;
 };
 
 }
src/Backups/BackupsWorker.cpp
@@ -58,16 +58,7 @@ namespace
 
     auto get_zookeeper = [global_context = context->getGlobalContext()] { return global_context->getZooKeeper(); };
 
-    BackupCoordinationRemote::BackupKeeperSettings keeper_settings
-    {
-        .keeper_max_retries = context->getSettingsRef().backup_restore_keeper_max_retries,
-        .keeper_retry_initial_backoff_ms = context->getSettingsRef().backup_restore_keeper_retry_initial_backoff_ms,
-        .keeper_retry_max_backoff_ms = context->getSettingsRef().backup_restore_keeper_retry_max_backoff_ms,
-        .batch_size_for_keeper_multiread = context->getSettingsRef().backup_restore_batch_size_for_keeper_multiread,
-        .keeper_fault_injection_probability = context->getSettingsRef().backup_restore_keeper_fault_injection_probability,
-        .keeper_fault_injection_seed = context->getSettingsRef().backup_restore_keeper_fault_injection_seed,
-        .keeper_value_max_size = context->getSettingsRef().backup_restore_keeper_value_max_size,
-    };
+    BackupCoordinationRemote::BackupKeeperSettings keeper_settings = WithRetries::KeeperSettings::fromContext(context);
 
     auto all_hosts = BackupSettings::Util::filterHostIDs(
         backup_settings.cluster_host_ids, backup_settings.shard_num, backup_settings.replica_num);
src/Backups/IBackupCoordination.h
@@ -56,6 +56,12 @@ public:
     /// Returns all mutations of a replicated table which are not finished for some data parts added by addReplicatedPartNames().
     virtual std::vector<MutationInfo> getReplicatedMutations(const String & table_shared_id, const String & replica_name) const = 0;
 
+    /// Adds information about KeeperMap tables
+    virtual void addKeeperMapTable(const String & table_zookeeper_root_path, const String & table_id, const String & data_path_in_backup) = 0;
+
+    /// KeeperMap tables use shared storage without local data so only one table should backup the data
+    virtual String getKeeperMapDataPath(const String & table_zookeeper_root_path) const = 0;
+
     /// Adds a data path in backup for a replicated table.
     /// Multiple replicas of the replicated table call this function and then all the added paths can be returned by call of the function
     /// getReplicatedDataPaths().
src/Backups/IRestoreCoordination.h
@@ -41,6 +41,10 @@ public:
     /// The function returns false if user-defined function at a specified zk path are being already restored by another replica.
     virtual bool acquireReplicatedSQLObjects(const String & loader_zk_path, UserDefinedSQLObjectType object_type) = 0;
 
+    /// Sets that this table is going to restore data into Keeper for all KeeperMap tables defined on root_zk_path.
+    /// The function returns false if data for this specific root path is already being restored by another table.
+    virtual bool acquireInsertingDataForKeeperMap(const String & root_zk_path) = 0;
+
     /// Generates a new UUID for a table. The same UUID must be used for a replicated table on each replica,
     /// (because otherwise the macro "{uuid}" in the ZooKeeper path will not work correctly).
     virtual void generateUUIDForTable(ASTCreateQuery & create_query) = 0;
src/Backups/RestoreCoordinationLocal.cpp
@@ -52,6 +52,12 @@ bool RestoreCoordinationLocal::acquireReplicatedSQLObjects(const String &, UserD
     return true;
 }
 
+bool RestoreCoordinationLocal::acquireInsertingDataForKeeperMap(const String & root_zk_path)
+{
+    std::lock_guard lock{mutex};
+    return acquired_data_in_keeper_map_tables.emplace(root_zk_path).second;
+}
+
 void RestoreCoordinationLocal::generateUUIDForTable(ASTCreateQuery & create_query)
 {
     String query_str = serializeAST(create_query);
src/Backups/RestoreCoordinationLocal.h
@@ -40,6 +40,10 @@ public:
     /// The function returns false if user-defined function at a specified zk path are being already restored by another replica.
     bool acquireReplicatedSQLObjects(const String & loader_zk_path, UserDefinedSQLObjectType object_type) override;
 
+    /// Sets that this table is going to restore data into Keeper for all KeeperMap tables defined on root_zk_path.
+    /// The function returns false if data for this specific root path is already being restored by another table.
+    bool acquireInsertingDataForKeeperMap(const String & root_zk_path) override;
+
     /// Generates a new UUID for a table. The same UUID must be used for a replicated table on each replica,
     /// (because otherwise the macro "{uuid}" in the ZooKeeper path will not work correctly).
     void generateUUIDForTable(ASTCreateQuery & create_query) override;
@@ -52,6 +56,7 @@ private:
     std::set<std::pair<String /* database_zk_path */, String /* table_name */>> acquired_tables_in_replicated_databases;
     std::unordered_set<String /* table_zk_path */> acquired_data_in_replicated_tables;
     std::unordered_map<String, ASTCreateQuery::UUIDs> create_query_uuids;
+    std::unordered_set<String /* root_zk_path */> acquired_data_in_keeper_map_tables;
 
     mutable std::mutex mutex;
 };
src/Backups/RestoreCoordinationRemote.cpp
@@ -234,6 +234,11 @@ bool RestoreCoordinationRemote::acquireReplicatedSQLObjects(const String & loade
     return result;
 }
 
+bool RestoreCoordinationRemote::acquireInsertingDataForKeeperMap(const String & /*root_zk_path*/)
+{
+    return true;
+}
+
 void RestoreCoordinationRemote::generateUUIDForTable(ASTCreateQuery & create_query)
 {
     String query_str = serializeAST(create_query);
src/Backups/RestoreCoordinationRemote.h
@@ -46,6 +46,10 @@ public:
     /// The function returns false if user-defined function at a specified zk path are being already restored by another replica.
     bool acquireReplicatedSQLObjects(const String & loader_zk_path, UserDefinedSQLObjectType object_type) override;
 
+    /// Sets that this table is going to restore data into Keeper for all KeeperMap tables defined on root_zk_path.
+    /// The function returns false if data for this specific root path is already being restored by another table.
+    bool acquireInsertingDataForKeeperMap(const String & root_zk_path) override;
+
     /// Generates a new UUID for a table. The same UUID must be used for a replicated table on each replica,
     /// (because otherwise the macro "{uuid}" in the ZooKeeper path will not work correctly).
     void generateUUIDForTable(ASTCreateQuery & create_query) override;
src/Backups/WithRetries.cpp
@@ -5,6 +5,21 @@ namespace DB
 {
 
+WithRetries::KeeperSettings WithRetries::KeeperSettings::fromContext(ContextPtr context)
+{
+    return
+    {
+        .keeper_max_retries = context->getSettingsRef().backup_restore_keeper_max_retries,
+        .keeper_retry_initial_backoff_ms = context->getSettingsRef().backup_restore_keeper_retry_initial_backoff_ms,
+        .keeper_retry_max_backoff_ms = context->getSettingsRef().backup_restore_keeper_retry_max_backoff_ms,
+        .batch_size_for_keeper_multiread = context->getSettingsRef().backup_restore_batch_size_for_keeper_multiread,
+        .keeper_fault_injection_probability = context->getSettingsRef().backup_restore_keeper_fault_injection_probability,
+        .keeper_fault_injection_seed = context->getSettingsRef().backup_restore_keeper_fault_injection_seed,
+        .keeper_value_max_size = context->getSettingsRef().backup_restore_keeper_value_max_size,
+        .batch_size_for_keeper_multi = context->getSettingsRef().backup_restore_batch_size_for_keeper_multi,
+    };
+}
+
 WithRetries::WithRetries(Poco::Logger * log_, zkutil::GetZooKeeper get_zookeeper_, const KeeperSettings & settings_, RenewerCallback callback_)
     : log(log_)
     , get_zookeeper(get_zookeeper_)
@@ -42,6 +57,11 @@ void WithRetries::renewZooKeeper(FaultyKeeper my_faulty_zookeeper) const
     }
 }
 
+const WithRetries::KeeperSettings & WithRetries::getKeeperSettings() const
+{
+    return settings;
+}
+
 WithRetries::FaultyKeeper WithRetries::getFaultyZooKeeper() const
 {
     /// We need to create new instance of ZooKeeperWithFaultInjection each time a copy a pointer to ZooKeeper client there
src/Backups/WithRetries.h
@@ -26,6 +26,9 @@ public:
         Float64 keeper_fault_injection_probability{0};
         UInt64 keeper_fault_injection_seed{42};
         UInt64 keeper_value_max_size{1048576};
+        UInt64 batch_size_for_keeper_multi{1000};
+
+        static KeeperSettings fromContext(ContextPtr context);
     };
 
     /// For simplicity a separate ZooKeeperRetriesInfo and a faulty [Zoo]Keeper client
@@ -53,6 +56,8 @@ public:
 
     /// Used to re-establish new connection inside a retry loop.
     void renewZooKeeper(FaultyKeeper my_faulty_zookeeper) const;
+
+    const KeeperSettings & getKeeperSettings() const;
 private:
     /// This will provide a special wrapper which is useful for testing
     FaultyKeeper getFaultyZooKeeper() const;
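fromContext gathers all the backup_restore_* settings in one place, and the new getKeeperSettings accessor exposes them to callers such as the KeeperMap backup code, which sizes its batches from them. The retry knobs give a budget of keeper_max_retries attempts with backoff bounded by keeper_retry_initial_backoff_ms and keeper_retry_max_backoff_ms between attempts. A standalone sketch of one plausible backoff discipline under those knobs (retryLoop below is illustrative, not the WithRetries implementation):

#include <algorithm>
#include <chrono>
#include <functional>
#include <iostream>
#include <stdexcept>
#include <thread>

// Hypothetical retry helper mirroring the settings' semantics: retry an
// operation up to max_retries times, doubling the sleep between attempts
// and capping it at max_backoff_ms.
void retryLoop(unsigned max_retries, unsigned initial_backoff_ms, unsigned max_backoff_ms,
               const std::function<void()> & op)
{
    unsigned backoff_ms = initial_backoff_ms;
    for (unsigned attempt = 0;; ++attempt)
    {
        try
        {
            op();  // e.g. a Keeper request issued after renewing the session
            return;
        }
        catch (const std::exception & e)
        {
            if (attempt + 1 >= max_retries)
                throw;
            std::cerr << "attempt " << attempt << " failed (" << e.what() << "), retrying in " << backoff_ms << " ms\n";
            std::this_thread::sleep_for(std::chrono::milliseconds(backoff_ms));
            backoff_ms = std::min(backoff_ms * 2, max_backoff_ms);
        }
    }
}

int main()
{
    int calls = 0;
    retryLoop(20, 100, 5000, [&]
    {
        if (++calls < 3)
            throw std::runtime_error("transient keeper error");
    });
    std::cout << "succeeded after " << calls << " calls\n";
}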
src/Core/Settings.h
@@ -465,6 +465,7 @@ class IColumn;
     M(UInt64, backup_restore_keeper_fault_injection_seed, 0, "0 - random seed, otherwise the setting value", 0) \
     M(UInt64, backup_restore_keeper_value_max_size, 1048576, "Maximum size of data of a [Zoo]Keeper's node during backup", 0) \
     M(UInt64, backup_restore_batch_size_for_keeper_multiread, 10000, "Maximum size of batch for multiread request to [Zoo]Keeper during backup or restore", 0) \
+    M(UInt64, backup_restore_batch_size_for_keeper_multi, 1000, "Maximum size of batch for multi request to [Zoo]Keeper during backup or restore", 0) \
     M(UInt64, max_backup_bandwidth, 0, "The maximum read speed in bytes per second for particular backup on server. Zero means unlimited.", 0) \
     \
     M(Bool, log_profile_events, true, "Log query performance statistics into the query_log, query_thread_log and query_views_log.", 0) \
src/Storages/StorageKeeperMap.cpp
@@ -1,3 +1,4 @@
+#include <memory>
 #include <Storages/StorageKeeperMap.h>
 
 #include <Columns/ColumnString.h>
@@ -13,6 +14,9 @@
 #include <Interpreters/evaluateConstantExpression.h>
 #include <Interpreters/MutationsInterpreter.h>
 
+#include <Compression/CompressedWriteBuffer.h>
+#include <Compression/CompressedReadBuffer.h>
+
 #include <Parsers/ASTCreateQuery.h>
 #include <Parsers/ASTExpressionList.h>
 #include <Parsers/ASTFunction.h>
@@ -38,6 +42,16 @@
 #include <Common/ZooKeeper/ZooKeeper.h>
 #include <Common/ZooKeeper/ZooKeeperConstants.h>
 
+#include <Backups/BackupEntriesCollector.h>
+#include <Backups/IBackupCoordination.h>
+#include <Backups/IBackupEntriesLazyBatch.h>
+#include <Backups/BackupEntryFromAppendOnlyFile.h>
+#include <Backups/BackupEntryFromMemory.h>
+#include <Backups/IBackup.h>
+#include <Backups/IRestoreCoordination.h>
+#include <Backups/RestorerFromBackup.h>
+#include <Backups/WithRetries.h>
+
 #include <QueryPipeline/QueryPipelineBuilder.h>
 
 #include <base/types.h>
@@ -54,6 +68,7 @@ namespace ErrorCodes
     extern const int KEEPER_EXCEPTION;
     extern const int LOGICAL_ERROR;
     extern const int LIMIT_EXCEEDED;
+    extern const int CANNOT_RESTORE_TABLE;
 }
 
 namespace
@@ -296,13 +311,13 @@ StorageKeeperMap::StorageKeeperMap(
     const StorageInMemoryMetadata & metadata,
     bool attach,
     std::string_view primary_key_,
-    const std::string & root_path_,
+    const std::string & zk_root_path_,
     UInt64 keys_limit_)
     : IStorage(table_id)
     , WithContext(context_->getGlobalContext())
-    , root_path(zkutil::extractZooKeeperPath(root_path_, false))
+    , zk_root_path(zkutil::extractZooKeeperPath(zk_root_path_, false))
     , primary_key(primary_key_)
-    , zookeeper_name(zkutil::extractZooKeeperName(root_path_))
+    , zookeeper_name(zkutil::extractZooKeeperName(zk_root_path_))
     , keys_limit(keys_limit_)
     , log(&Poco::Logger::get(fmt::format("StorageKeeperMap ({})", table_id.getNameForLogs())))
 {
@@ -320,10 +335,10 @@ StorageKeeperMap::StorageKeeperMap(
           << "primary key: " << formattedAST(metadata.getPrimaryKey().expression_list_ast) << "\n";
     metadata_string = out.str();
 
-    if (root_path.empty())
-        throw Exception(ErrorCodes::BAD_ARGUMENTS, "root_path should not be empty");
-    if (!root_path.starts_with('/'))
-        throw Exception(ErrorCodes::BAD_ARGUMENTS, "root_path should start with '/'");
+    if (zk_root_path.empty())
+        throw Exception(ErrorCodes::BAD_ARGUMENTS, "zk_root_path should not be empty");
+    if (!zk_root_path.starts_with('/'))
+        throw Exception(ErrorCodes::BAD_ARGUMENTS, "zk_root_path should start with '/'");
 
     auto config_keys_limit = context_->getConfigRef().getUInt64("keeper_map_keys_limit", 0);
     if (config_keys_limit != 0 && (keys_limit == 0 || keys_limit > config_keys_limit))
@@ -341,20 +356,20 @@ StorageKeeperMap::StorageKeeperMap(
         LOG_INFO(log, "Keys limit will be set to {}", keys_limit);
     }
 
-    auto root_path_fs = fs::path(path_prefix) / std::string_view{root_path}.substr(1);
-    root_path = root_path_fs.generic_string();
+    auto zk_root_path_fs = fs::path(path_prefix) / std::string_view{zk_root_path}.substr(1);
+    zk_root_path = zk_root_path_fs.generic_string();
 
-    data_path = root_path_fs / "data";
+    zk_data_path = zk_root_path_fs / "data";
 
-    auto metadata_path_fs = root_path_fs / "metadata";
-    metadata_path = metadata_path_fs;
-    tables_path = metadata_path_fs / "tables";
+    auto metadata_path_fs = zk_root_path_fs / "metadata";
+    zk_metadata_path = metadata_path_fs;
+    zk_tables_path = metadata_path_fs / "tables";
 
     auto table_unique_id = toString(table_id.uuid) + toString(ServerUUID::get());
-    table_path = fs::path(tables_path) / table_unique_id;
+    zk_table_path = fs::path(zk_tables_path) / table_unique_id;
 
-    dropped_path = metadata_path_fs / "dropped";
-    dropped_lock_path = fs::path(dropped_path) / "lock";
+    zk_dropped_path = metadata_path_fs / "dropped";
+    zk_dropped_lock_path = fs::path(zk_dropped_path) / "lock";
 
     if (attach)
     {
@@ -364,17 +379,17 @@ StorageKeeperMap::StorageKeeperMap(
 
     auto client = getClient();
 
-    if (root_path != "/" && !client->exists(root_path))
+    if (zk_root_path != "/" && !client->exists(zk_root_path))
     {
-        LOG_TRACE(log, "Creating root path {}", root_path);
-        client->createAncestors(root_path);
-        client->createIfNotExists(root_path, "");
+        LOG_TRACE(log, "Creating root path {}", zk_root_path);
+        client->createAncestors(zk_root_path);
+        client->createIfNotExists(zk_root_path, "");
     }
 
     for (size_t i = 0; i < 1000; ++i)
     {
         std::string stored_metadata_string;
-        auto exists = client->tryGet(metadata_path, stored_metadata_string);
+        auto exists = client->tryGet(zk_metadata_path, stored_metadata_string);
 
         if (exists)
         {
@@ -384,10 +399,10 @@ StorageKeeperMap::StorageKeeperMap(
                 throw Exception(
                     ErrorCodes::BAD_ARGUMENTS,
                     "Path {} is already used but the stored table definition doesn't match. Stored metadata: {}",
-                    root_path,
+                    zk_root_path,
                     stored_metadata_string);
 
-            auto code = client->tryCreate(table_path, "", zkutil::CreateMode::Persistent);
+            auto code = client->tryCreate(zk_table_path, "", zkutil::CreateMode::Persistent);
 
             // tables_path was removed with drop
             if (code == Coordination::Error::ZNONODE)
@@ -397,16 +412,16 @@ StorageKeeperMap::StorageKeeperMap(
             }
             else if (code != Coordination::Error::ZOK)
             {
-                throw zkutil::KeeperException(code, "Failed to create table on path {} because a table with same UUID already exists", root_path);
+                throw zkutil::KeeperException(code, "Failed to create table on path {} because a table with same UUID already exists", zk_root_path);
             }
 
             return;
         }
 
-        if (client->exists(dropped_path))
+        if (client->exists(zk_dropped_path))
        {
             LOG_INFO(log, "Removing leftover nodes");
-            auto code = client->tryCreate(dropped_lock_path, "", zkutil::CreateMode::Ephemeral);
+            auto code = client->tryCreate(zk_dropped_lock_path, "", zkutil::CreateMode::Ephemeral);
 
             if (code == Coordination::Error::ZNONODE)
             {
@@ -419,11 +434,11 @@ StorageKeeperMap::StorageKeeperMap(
             }
             else if (code != Coordination::Error::ZOK)
             {
-                throw Coordination::Exception::fromPath(code, dropped_lock_path);
+                throw Coordination::Exception::fromPath(code, zk_dropped_lock_path);
             }
             else
             {
-                auto metadata_drop_lock = zkutil::EphemeralNodeHolder::existing(dropped_lock_path, *client);
+                auto metadata_drop_lock = zkutil::EphemeralNodeHolder::existing(zk_dropped_lock_path, *client);
                 if (!dropTable(client, metadata_drop_lock))
                     continue;
             }
@@ -431,17 +446,17 @@ StorageKeeperMap::StorageKeeperMap(
 
         Coordination::Requests create_requests
         {
-            zkutil::makeCreateRequest(metadata_path, metadata_string, zkutil::CreateMode::Persistent),
-            zkutil::makeCreateRequest(data_path, metadata_string, zkutil::CreateMode::Persistent),
-            zkutil::makeCreateRequest(tables_path, "", zkutil::CreateMode::Persistent),
-            zkutil::makeCreateRequest(table_path, "", zkutil::CreateMode::Persistent),
+            zkutil::makeCreateRequest(zk_metadata_path, metadata_string, zkutil::CreateMode::Persistent),
+            zkutil::makeCreateRequest(zk_data_path, metadata_string, zkutil::CreateMode::Persistent),
+            zkutil::makeCreateRequest(zk_tables_path, "", zkutil::CreateMode::Persistent),
+            zkutil::makeCreateRequest(zk_table_path, "", zkutil::CreateMode::Persistent),
         };
 
         Coordination::Responses create_responses;
         auto code = client->tryMulti(create_requests, create_responses);
         if (code == Coordination::Error::ZNODEEXISTS)
         {
-            LOG_INFO(log, "It looks like a table on path {} was created by another server at the same moment, will retry", root_path);
+            LOG_INFO(log, "It looks like a table on path {} was created by another server at the same moment, will retry", zk_root_path);
             continue;
         }
         else if (code != Coordination::Error::ZOK)
@@ -456,7 +471,7 @@ StorageKeeperMap::StorageKeeperMap(
 
     throw Exception(ErrorCodes::BAD_ARGUMENTS,
                     "Cannot create metadata for table, because it is removed concurrently or because "
-                    "of wrong root_path ({})", root_path);
+                    "of wrong zk_root_path ({})", zk_root_path);
 }
 
@@ -519,7 +534,7 @@ Pipe StorageKeeperMap::read(
 
     auto client = getClient();
     if (all_scan)
-        return process_keys(std::make_shared<std::vector<std::string>>(client->getChildren(data_path)));
+        return process_keys(std::make_shared<std::vector<std::string>>(client->getChildren(zk_data_path)));
 
     return process_keys(std::move(filtered_keys));
 }
@@ -534,19 +549,19 @@ void StorageKeeperMap::truncate(const ASTPtr &, const StorageMetadataPtr &, Cont
 {
     checkTable<true>();
     auto client = getClient();
-    client->tryRemoveChildrenRecursive(data_path, true);
+    client->tryRemoveChildrenRecursive(zk_data_path, true);
 }
 
 bool StorageKeeperMap::dropTable(zkutil::ZooKeeperPtr zookeeper, const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock)
 {
-    zookeeper->removeChildrenRecursive(data_path);
+    zookeeper->removeChildrenRecursive(zk_data_path);
 
     bool completely_removed = false;
     Coordination::Requests ops;
     ops.emplace_back(zkutil::makeRemoveRequest(metadata_drop_lock->getPath(), -1));
-    ops.emplace_back(zkutil::makeRemoveRequest(dropped_path, -1));
-    ops.emplace_back(zkutil::makeRemoveRequest(data_path, -1));
-    ops.emplace_back(zkutil::makeRemoveRequest(metadata_path, -1));
+    ops.emplace_back(zkutil::makeRemoveRequest(zk_dropped_path, -1));
+    ops.emplace_back(zkutil::makeRemoveRequest(zk_data_path, -1));
+    ops.emplace_back(zkutil::makeRemoveRequest(zk_metadata_path, -1));
 
     Coordination::Responses responses;
     auto code = zookeeper->tryMulti(ops, responses);
@@ -557,7 +572,7 @@ bool StorageKeeperMap::dropTable(zkutil::ZooKeeperPtr zookeeper, const zkutil::E
         {
             metadata_drop_lock->setAlreadyRemoved();
             completely_removed = true;
-            LOG_INFO(log, "Metadata ({}) and data ({}) was successfully removed from ZooKeeper", metadata_path, data_path);
+            LOG_INFO(log, "Metadata ({}) and data ({}) was successfully removed from ZooKeeper", zk_metadata_path, zk_data_path);
             break;
         }
         case ZNONODE:
@@ -578,25 +593,25 @@ void StorageKeeperMap::drop()
     auto client = getClient();
 
     // we allow ZNONODE in case we got hardware error on previous drop
-    if (auto code = client->tryRemove(table_path); code == Coordination::Error::ZNOTEMPTY)
+    if (auto code = client->tryRemove(zk_table_path); code == Coordination::Error::ZNOTEMPTY)
     {
         throw zkutil::KeeperException(
-            code, "{} contains children which shouldn't happen. Please DETACH the table if you want to delete it", table_path);
+            code, "{} contains children which shouldn't happen. Please DETACH the table if you want to delete it", zk_table_path);
     }
 
     std::vector<std::string> children;
     // if the tables_path is not found, some other table removed it
     // if there are children, some other tables are still using this path as storage
-    if (auto code = client->tryGetChildren(tables_path, children);
+    if (auto code = client->tryGetChildren(zk_tables_path, children);
         code != Coordination::Error::ZOK || !children.empty())
         return;
 
     Coordination::Requests ops;
     Coordination::Responses responses;
 
-    ops.emplace_back(zkutil::makeRemoveRequest(tables_path, -1));
-    ops.emplace_back(zkutil::makeCreateRequest(dropped_path, "", zkutil::CreateMode::Persistent));
-    ops.emplace_back(zkutil::makeCreateRequest(dropped_lock_path, "", zkutil::CreateMode::Ephemeral));
+    ops.emplace_back(zkutil::makeRemoveRequest(zk_tables_path, -1));
+    ops.emplace_back(zkutil::makeCreateRequest(zk_dropped_path, "", zkutil::CreateMode::Persistent));
+    ops.emplace_back(zkutil::makeCreateRequest(zk_dropped_lock_path, "", zkutil::CreateMode::Ephemeral));
 
     auto code = client->tryMulti(ops, responses);
 
@@ -613,7 +628,7 @@ void StorageKeeperMap::drop()
     else if (code != Coordination::Error::ZOK)
         zkutil::KeeperMultiException::check(code, ops, responses);
 
-    auto metadata_drop_lock = zkutil::EphemeralNodeHolder::existing(dropped_lock_path, *client);
+    auto metadata_drop_lock = zkutil::EphemeralNodeHolder::existing(zk_dropped_lock_path, *client);
     dropTable(client, metadata_drop_lock);
 }
 
@@ -623,6 +638,285 @@ NamesAndTypesList StorageKeeperMap::getVirtuals() const
             {std::string{version_column_name}, std::make_shared<DataTypeInt32>()}};
 }
 
+namespace
+{
+
+constexpr std::string_view backup_data_filename = "data.bin";
+constexpr std::string_view backup_data_location_filename = "data_location.bin";
+
+class KeeperMapBackup : public IBackupEntriesLazyBatch, boost::noncopyable
+{
+public:
+    KeeperMapBackup(
+        const std::string & data_zookeeper_path_,
+        const std::string & data_path_in_backup,
+        const DiskPtr & temp_disk_,
+        UInt64 max_compress_block_size_,
+        std::shared_ptr<WithRetries> with_retries_)
+        : data_zookeeper_path(data_zookeeper_path_)
+        , temp_disk(temp_disk_)
+        , max_compress_block_size(max_compress_block_size_)
+        , with_retries(std::move(with_retries_))
+    {
+        file_path = fs::path(data_path_in_backup) / backup_data_filename;
+    }
+
+private:
+    size_t getSize() const override
+    {
+        return 1;
+    }
+
+    const String & getName(size_t i) const override
+    {
+        chassert(i == 0);
+        return file_path;
+    }
+
+    BackupEntries generate() override
+    {
+        temp_dir_owner.emplace(temp_disk);
+        fs::path temp_dir = temp_dir_owner->getRelativePath();
+        temp_disk->createDirectories(temp_dir);
+
+        auto data_file_path = temp_dir / fs::path{file_path}.filename();
+        auto data_out_compressed = temp_disk->writeFile(data_file_path);
+        auto data_out = std::make_unique<CompressedWriteBuffer>(*data_out_compressed, CompressionCodecFactory::instance().getDefaultCodec(), max_compress_block_size);
+        std::vector<std::string> data_children;
+        {
+            auto holder = with_retries->createRetriesControlHolder("getKeeperMapDataKeys");
+            holder.retries_ctl.retryLoop(
+                [&, &zk = holder.faulty_zookeeper]()
+            {
+                with_retries->renewZooKeeper(zk);
+                data_children = zk->getChildren(data_zookeeper_path);
+            });
+        }
+        LOG_INFO(&Poco::Logger::get("BACKUPER"), "Got {} children", data_children.size());
+
+        const auto write_rows = [&](std::span<std::string> keys)
+        {
+            std::vector<std::string> keys_full_path;
+            keys_full_path.reserve(data_children.size());
+
+            for (const auto & key : data_children)
+                keys_full_path.push_back(data_zookeeper_path / key);
+
+            zkutil::ZooKeeper::MultiGetResponse data;
+            auto holder = with_retries->createRetriesControlHolder("getKeeperMapDataKeys");
+            holder.retries_ctl.retryLoop(
+                [&, &zk = holder.faulty_zookeeper]
+            {
+                with_retries->renewZooKeeper(zk);
+                data = zk->get(keys_full_path);
+            });
+
+            for (size_t i = 0; i < keys.size(); ++i)
+            {
+                auto & child_data = data[i];
+                if (child_data.error != Coordination::Error::ZOK)
+                    continue;
+
+                writeStringBinary(keys[i], *data_out);
+                writeStringBinary(child_data.data, *data_out);
+            }
+        };
+
+        auto max_multiread_size = with_retries->getKeeperSettings().batch_size_for_keeper_multiread;
+
+        auto keys_it = data_children.begin();
+        while (keys_it != data_children.end())
+        {
+            auto step = std::min(static_cast<UInt64>(std::distance(keys_it, data_children.end())), max_multiread_size);
+            write_rows(std::span{keys_it, keys_it + step});
+            keys_it = keys_it + step;
+        }
+
+        data_out->finalize();
+        data_out.reset();
+        data_out_compressed->finalize();
+        data_out_compressed.reset();
+
+        return {{file_path, std::make_shared<BackupEntryFromAppendOnlyFile>(temp_disk, data_file_path)}};
+    }
+
+    fs::path data_zookeeper_path;
+    DiskPtr temp_disk;
+    std::optional<TemporaryFileOnDisk> temp_dir_owner;
+    UInt64 max_compress_block_size;
+    String file_path;
+    std::shared_ptr<WithRetries> with_retries;
+};
+}
+
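generate() streams the whole map into a single compressed data.bin entry, walking the key list in chunks of batch_size_for_keeper_multiread children per Keeper multiread. (As committed, write_rows appears to build keys_full_path from all of data_children rather than from the keys span, so every batch re-reads the full key list and batches after the first pair span keys with values fetched for the head of the list; a follow-up fix would iterate over keys.) The batching walk itself looks like this standalone sketch (processInBatches is illustrative, not the ClickHouse API):

#include <algorithm>
#include <functional>
#include <iostream>
#include <span>
#include <string>
#include <vector>

// Hypothetical batching walk mirroring how generate() slices data_children
// into multiread-sized chunks with std::span.
void processInBatches(std::vector<std::string> & keys, size_t max_batch_size,
                      const std::function<void(std::span<std::string>)> & process)
{
    auto it = keys.begin();
    while (it != keys.end())
    {
        size_t step = std::min<size_t>(std::distance(it, keys.end()), max_batch_size);
        process(std::span{it, it + step});  // one Keeper multiread per batch
        it += step;
    }
}

int main()
{
    std::vector<std::string> keys{"k1", "k2", "k3", "k4", "k5"};
    processInBatches(keys, 2, [](std::span<std::string> batch)
    {
        std::cout << "batch of " << batch.size() << " keys\n";
    });
}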
+void StorageKeeperMap::backupData(BackupEntriesCollector & backup_entries_collector, const String & data_path_in_backup, const std::optional<ASTs> & /*partitions*/)
+{
+    auto table_id = toString(getStorageID().uuid);
+
+    std::cout << "Backing up for path " << zk_root_path << " table id " << table_id << std::endl;
+    auto coordination = backup_entries_collector.getBackupCoordination();
+    coordination->addKeeperMapTable(zk_root_path, table_id, data_path_in_backup);
+
+    /// This task will be executed after all tables have registered their root zk path and the coordination is ready to
+    /// assign each path to a single table only.
+    auto post_collecting_task = [my_table_id = std::move(table_id), coordination, &backup_entries_collector, my_data_path_in_backup = data_path_in_backup, this]
+    {
+        auto path_with_data = coordination->getKeeperMapDataPath(zk_root_path);
+        if (path_with_data == my_data_path_in_backup)
+        {
+            std::cout << "Will be backing up data for path " << zk_root_path << " table id " << my_table_id << std::endl;
+
+            auto temp_disk = backup_entries_collector.getContext()->getGlobalTemporaryVolume()->getDisk(0);
+            auto max_compress_block_size = backup_entries_collector.getContext()->getSettingsRef().max_compress_block_size;
+
+            auto with_retries = std::make_shared<WithRetries>
+            (
+                &Poco::Logger::get(fmt::format("StorageKeeperMapBackup ({})", getStorageID().getNameForLogs())),
+                [&] { return getClient(); },
+                WithRetries::KeeperSettings::fromContext(backup_entries_collector.getContext()),
+                [](WithRetries::FaultyKeeper &) {}
+            );
+
+            backup_entries_collector.addBackupEntries(
+                std::make_shared<KeeperMapBackup>(this->zk_data_path, path_with_data, temp_disk, max_compress_block_size, std::move(with_retries))
+                    ->getBackupEntries());
+            return;
+        }
+
+        std::cout << "Not backing up data for path " << zk_root_path << " table id " << my_table_id << " writing only path with data " << path_with_data << std::endl;
+        auto file_path = fs::path(my_data_path_in_backup) / backup_data_location_filename;
+        backup_entries_collector.addBackupEntries({{file_path, std::make_shared<BackupEntryFromMemory>(path_with_data)}});
+    };
+
+    backup_entries_collector.addPostTask(post_collecting_task);
+}
+
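backupData is two-phase: during collection each table only registers its root path, and the post task then asks the coordination which data path won. The winner serializes the shared data, while every other table on the same root path writes a small data_location.bin that points at the winner's entry instead of duplicating it. A standalone sketch of that pointer-file indirection on a plain filesystem (the layout and resolveDataFile helper are illustrative):

#include <filesystem>
#include <fstream>
#include <iostream>
#include <string>

namespace fs = std::filesystem;

// Hypothetical resolver mirroring the backup layout: a table's directory holds
// either data.bin (real payload) or data_location.bin (path of the owner's data).
fs::path resolveDataFile(const fs::path & table_dir)
{
    if (fs::exists(table_dir / "data.bin"))
        return table_dir / "data.bin";

    std::ifstream location(table_dir / "data_location.bin");
    std::string owner_path;
    std::getline(location, owner_path);        // the winner's data path in the backup
    return fs::path(owner_path) / "data.bin";  // follow the indirection
}

int main()
{
    fs::create_directories("backup/db/t1");
    fs::create_directories("backup/db/t2");
    std::ofstream("backup/db/t1/data.bin") << "rows...";
    std::ofstream("backup/db/t2/data_location.bin") << "backup/db/t1";

    std::cout << resolveDataFile("backup/db/t2") << '\n';  // backup/db/t1/data.bin
}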
+void StorageKeeperMap::restoreDataFromBackup(RestorerFromBackup & restorer, const String & data_path_in_backup, const std::optional<ASTs> & /*partitions*/)
+{
+    auto backup = restorer.getBackup();
+    if (!backup->hasFiles(data_path_in_backup))
+        return;
+
+    if (!restorer.getRestoreCoordination()->acquireInsertingDataForKeeperMap(zk_root_path))
+    {
+        /// Other table is already restoring the data for this Keeper path.
+        /// Tables defined on the same path share data
+        return;
+    }
+
+    auto with_retries = std::make_shared<WithRetries>
+    (
+        &Poco::Logger::get(fmt::format("StorageKeeperMapRestore ({})", getStorageID().getNameForLogs())),
+        [&] { return getClient(); },
+        WithRetries::KeeperSettings::fromContext(restorer.getContext()),
+        [](WithRetries::FaultyKeeper &) {}
+    );
+
+    bool allow_non_empty_tables = restorer.isNonEmptyTableAllowed();
+    if (!allow_non_empty_tables)
+    {
+        Coordination::Stat data_stats;
+
+        auto holder = with_retries->createRetriesControlHolder("checkKeeperMapData");
+        holder.retries_ctl.retryLoop(
+            [&, &zk = holder.faulty_zookeeper]()
+        {
+            with_retries->renewZooKeeper(zk);
+            zk->get(zk_data_path, &data_stats);
+        });
+
+        if (data_stats.numChildren != 0)
+            RestorerFromBackup::throwTableIsNotEmpty(getStorageID());
+    }
+
+    /// TODO: Should we backup and verify the table structure?
+
+    //auto temp_disk = restorer.getContext()->getGlobalTemporaryVolume()->getDisk(0);
+    /// only 1 table should restore data for a single path
+    restorer.addDataRestoreTask(
+        [storage = std::static_pointer_cast<StorageKeeperMap>(shared_from_this()), backup, data_path_in_backup, with_retries, allow_non_empty_tables]
+        { storage->restoreDataImpl(backup, data_path_in_backup, with_retries, allow_non_empty_tables); });
+}
+
+void StorageKeeperMap::restoreDataImpl(const BackupPtr & backup, const String & data_path_in_backup, std::shared_ptr<WithRetries> with_retries, bool allow_non_empty_tables)
+{
+    auto table_id = toString(getStorageID().uuid);
+
+    std::cout << "Restoring into " << zk_root_path << " table id " << table_id << std::endl;
+
+    fs::path data_path_in_backup_fs = data_path_in_backup;
+
+    String data_file = data_path_in_backup_fs / backup_data_filename;
+
+    if (!backup->fileExists(data_file))
+    {
+        String data_location_file = data_path_in_backup_fs / "data_location.bin";
+        if (!backup->fileExists(data_location_file))
+            throw Exception(ErrorCodes::CANNOT_RESTORE_TABLE, "Files {} or {} in backup are required to restore table", data_file, data_location_file);
+
+        auto in = backup->readFile(data_location_file);
+        readStringUntilEOF(data_file, *in);
+
+        data_file = fs::path(data_file) / backup_data_filename;
+
+        if (!backup->fileExists(data_file))
+            throw Exception(ErrorCodes::CANNOT_RESTORE_TABLE, "File {} in backup is required to restore table", data_file);
+    }
+
+    /// should we store locally in temp file?
+    auto in = backup->readFile(data_file);
+    CompressedReadBuffer compressed_in{*in};
+    fs::path data_path_fs(zk_data_path);
+
+    auto max_multi_size = with_retries->getKeeperSettings().batch_size_for_keeper_multi;
+
+    Coordination::Requests create_requests;
+    const auto flush_create_requests = [&]
+    {
+        auto holder = with_retries->createRetriesControlHolder("addKeeperMapData");
+        holder.retries_ctl.retryLoop(
+            [&, &zk = holder.faulty_zookeeper]()
+        {
+            with_retries->renewZooKeeper(zk);
+            zk->multi(create_requests);
+        });
+    };
+
+    while (!in->eof())
+    {
+        std::string key;
+        std::string value;
+        readStringBinary(key, compressed_in);
+        readStringBinary(value, compressed_in);
+
+        /// if a table can be non empty we can have conflicting keys so we need to do single create for each row
+        if (allow_non_empty_tables)
+        {
+            auto holder = with_retries->createRetriesControlHolder("addKeeperMapData");
+            holder.retries_ctl.retryLoop(
+                [&, &zk = holder.faulty_zookeeper]()
+            {
+                with_retries->renewZooKeeper(zk);
+                zk->tryCreate(data_path_fs / key, value, zkutil::CreateMode::Persistent);
+            });
+        }
+        /// otherwise we can do multi requests
+        else
+        {
+            create_requests.push_back(zkutil::makeCreateRequest(data_path_fs / key, value, zkutil::CreateMode::Persistent));
+
+            if (create_requests.size() == max_multi_size)
+            {
+                flush_create_requests();
+                create_requests.clear();
+            }
+        }
+    }
+
+    if (!create_requests.empty())
+        flush_create_requests();
+}
+
 zkutil::ZooKeeperPtr StorageKeeperMap::getClient() const
 {
     std::lock_guard lock{zookeeper_mutex};
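The restore loop buffers node creates and flushes them as one Keeper multi per batch_size_for_keeper_multi requests; when the table may be non-empty it falls back to per-key tryCreate, since a single conflicting key would fail the whole atomic multi. A standalone sketch of that buffered-flush shape (CreateBatcher is illustrative, not part of the commit):

#include <functional>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

// Hypothetical batcher mirroring restoreDataImpl's flush logic: buffer
// (key, value) creates and submit them as one "multi" once the batch is full.
class CreateBatcher
{
public:
    CreateBatcher(size_t max_batch, std::function<void(const std::vector<std::pair<std::string, std::string>> &)> submit)
        : max_batch_(max_batch), submit_(std::move(submit)) {}

    void add(std::string key, std::string value)
    {
        buffer_.emplace_back(std::move(key), std::move(value));
        if (buffer_.size() == max_batch_)
            flush();
    }

    void flush()
    {
        if (buffer_.empty())
            return;
        submit_(buffer_);  // one atomic multi-request per batch
        buffer_.clear();
    }

private:
    size_t max_batch_;
    std::function<void(const std::vector<std::pair<std::string, std::string>> &)> submit_;
    std::vector<std::pair<std::string, std::string>> buffer_;
};

int main()
{
    CreateBatcher batcher(2, [](const auto & batch) { std::cout << "multi with " << batch.size() << " creates\n"; });
    batcher.add("k1", "v1");
    batcher.add("k2", "v2");  // triggers a flush
    batcher.add("k3", "v3");
    batcher.flush();          // flush the tail, as restoreDataImpl does after the loop
}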
@@ -634,7 +928,7 @@ zkutil::ZooKeeperPtr StorageKeeperMap::getClient() const
         else
             zookeeper_client = getContext()->getAuxiliaryZooKeeper(zookeeper_name);
 
-        zookeeper_client->sync(root_path);
+        zookeeper_client->sync(zk_root_path);
     }
 
     return zookeeper_client;
@@ -642,12 +936,12 @@ zkutil::ZooKeeperPtr StorageKeeperMap::getClient() const
 
 const std::string & StorageKeeperMap::dataPath() const
 {
-    return data_path;
+    return zk_data_path;
 }
 
 std::string StorageKeeperMap::fullPathForKey(const std::string_view key) const
 {
-    return fs::path(data_path) / key;
+    return fs::path(zk_data_path) / key;
 }
 
 UInt64 StorageKeeperMap::keysLimit() const
@@ -668,7 +962,7 @@ std::optional<bool> StorageKeeperMap::isTableValid() const
         auto client = getClient();
 
         Coordination::Stat metadata_stat;
-        auto stored_metadata_string = client->get(metadata_path, &metadata_stat);
+        auto stored_metadata_string = client->get(zk_metadata_path, &metadata_stat);
 
         if (metadata_stat.numChildren == 0)
         {
@@ -681,7 +975,7 @@ std::optional<bool> StorageKeeperMap::isTableValid() const
             LOG_ERROR(
                 log,
                 "Table definition does not match to the one stored in the path {}. Stored definition: {}",
-                root_path,
+                zk_root_path,
                 stored_metadata_string);
             table_is_valid = false;
             return;
@@ -689,9 +983,9 @@ std::optional<bool> StorageKeeperMap::isTableValid() const
 
         // validate all metadata and data nodes are present
         Coordination::Requests requests;
-        requests.push_back(zkutil::makeCheckRequest(table_path, -1));
-        requests.push_back(zkutil::makeCheckRequest(data_path, -1));
-        requests.push_back(zkutil::makeCheckRequest(dropped_path, -1));
+        requests.push_back(zkutil::makeCheckRequest(zk_table_path, -1));
+        requests.push_back(zkutil::makeCheckRequest(zk_data_path, -1));
+        requests.push_back(zkutil::makeCheckRequest(zk_dropped_path, -1));
 
         Coordination::Responses responses;
         client->tryMulti(requests, responses);
@@ -699,19 +993,19 @@ std::optional<bool> StorageKeeperMap::isTableValid() const
         table_is_valid = false;
         if (responses[0]->error != Coordination::Error::ZOK)
        {
-            LOG_ERROR(log, "Table node ({}) is missing", table_path);
+            LOG_ERROR(log, "Table node ({}) is missing", zk_table_path);
             return;
         }
 
         if (responses[1]->error != Coordination::Error::ZOK)
         {
-            LOG_ERROR(log, "Data node ({}) is missing", data_path);
+            LOG_ERROR(log, "Data node ({}) is missing", zk_data_path);
             return;
         }
 
         if (responses[2]->error == Coordination::Error::ZOK)
         {
-            LOG_ERROR(log, "Tables with root node {} are being dropped", root_path);
+            LOG_ERROR(log, "Tables with root node {} are being dropped", zk_root_path);
             return;
         }
 
@@ -962,11 +1256,11 @@ StoragePtr create(const StorageFactory::Arguments & args)
         throw Exception(
             ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
             "Storage KeeperMap requires 1-3 arguments:\n"
-            "root_path: path in the Keeper where the values will be stored (required)\n"
+            "zk_root_path: path in the Keeper where the values will be stored (required)\n"
            "keys_limit: number of keys allowed to be stored, 0 is no limit (default: 0)");
 
-    const auto root_path_node = evaluateConstantExpressionAsLiteral(engine_args[0], args.getLocalContext());
-    auto root_path = checkAndGetLiteralArgument<std::string>(root_path_node, "root_path");
+    const auto zk_root_path_node = evaluateConstantExpressionAsLiteral(engine_args[0], args.getLocalContext());
+    auto zk_root_path = checkAndGetLiteralArgument<std::string>(zk_root_path_node, "zk_root_path");
 
     UInt64 keys_limit = 0;
     if (engine_args.size() > 1)
@@ -985,7 +1279,7 @@ StoragePtr create(const StorageFactory::Arguments & args)
         throw Exception(ErrorCodes::BAD_ARGUMENTS, "StorageKeeperMap requires one column in primary key");
 
     return std::make_shared<StorageKeeperMap>(
-        args.getContext(), args.table_id, metadata, args.query.attach, primary_key_names[0], root_path, keys_limit);
+        args.getContext(), args.table_id, metadata, args.query.attach, primary_key_names[0], zk_root_path, keys_limit);
 }
 
 }
src/Storages/StorageKeeperMap.h
@@ -10,6 +10,9 @@
 #include <Common/logger_useful.h>
 #include <Common/ZooKeeper/ZooKeeper.h>
 
+#include <Backups/IBackup.h>
+#include <Backups/WithRetries.h>
+
 #include <span>
 
 namespace DB
@@ -72,6 +75,9 @@ public:
     }
     bool supportsDelete() const override { return true; }
 
+    void backupData(BackupEntriesCollector & backup_entries_collector, const String & data_path_in_backup, const std::optional<ASTs> & partitions) override;
+    void restoreDataFromBackup(RestorerFromBackup & restorer, const String & data_path_in_backup, const std::optional<ASTs> & partitions) override;
+
     zkutil::ZooKeeperPtr getClient() const;
     const std::string & dataPath() const;
     std::string fullPathForKey(std::string_view key) const;
@@ -114,18 +120,20 @@ private:
 
     std::optional<bool> isTableValid() const;
 
-    std::string root_path;
+    void restoreDataImpl(const BackupPtr & backup, const String & data_path_in_backup, std::shared_ptr<WithRetries> with_retries, bool allow_non_empty_tables);
+
+    std::string zk_root_path;
     std::string primary_key;
 
-    std::string data_path;
+    std::string zk_data_path;
 
-    std::string metadata_path;
+    std::string zk_metadata_path;
 
-    std::string tables_path;
-    std::string table_path;
+    std::string zk_tables_path;
+    std::string zk_table_path;
 
-    std::string dropped_path;
-    std::string dropped_lock_path;
+    std::string zk_dropped_path;
+    std::string zk_dropped_lock_path;
 
     std::string zookeeper_name;
 