From 53d57ffb52542583aac6c772797aa611d83890c8 Mon Sep 17 00:00:00 2001 From: Nicolae Vartolomei Date: Tue, 24 Nov 2020 14:24:48 +0000 Subject: [PATCH] Part movement between shards Integrate query deduplication from #17348 --- src/Core/Settings.h | 6 +- src/Parsers/ParserAlterQuery.cpp | 5 + src/Storages/DataDestinationType.h | 1 + src/Storages/MergeTree/MergeTreeData.cpp | 18 + src/Storages/MergeTree/MergeTreeData.h | 10 + .../MergeTree/MergeTreeDataSelectExecutor.cpp | 24 +- .../MergeTree/MergeTreeDataSelectExecutor.h | 1 + src/Storages/MergeTree/MergeTreeSettings.h | 1 + .../PartMovesBetweenShardsOrchestrator.cpp | 411 ++++++++++++++++++ .../PartMovesBetweenShardsOrchestrator.h | 158 +++++++ src/Storages/MergeTree/PinnedPartUUIDs.cpp | 36 ++ src/Storages/MergeTree/PinnedPartUUIDs.h | 27 ++ .../ReplicatedMergeTreeBlockOutputStream.cpp | 4 +- .../ReplicatedMergeTreeBlockOutputStream.h | 2 +- .../MergeTree/ReplicatedMergeTreeLogEntry.cpp | 20 + .../MergeTree/ReplicatedMergeTreeLogEntry.h | 13 + .../MergeTree/ReplicatedMergeTreeQueue.cpp | 25 ++ .../MergeTree/ReplicatedMergeTreeQueue.h | 4 + src/Storages/PartitionCommands.cpp | 10 +- src/Storages/PartitionCommands.h | 1 + src/Storages/StorageReplicatedMergeTree.cpp | 178 ++++++++ src/Storages/StorageReplicatedMergeTree.h | 13 +- .../StorageSystemPartMovesBetweenShards.cpp | 135 ++++++ .../StorageSystemPartMovesBetweenShards.h | 27 ++ src/Storages/System/attachSystemTables.cpp | 2 + .../test_move_part_to_shard/__init__.py | 0 .../configs/merge_tree.xml | 6 + .../configs/remote_servers.xml | 26 ++ .../test_move_part_to_shard/test.py | 162 +++++++ .../configs/profiles.xml | 7 + .../test_query_deduplication/test.py | 9 +- 31 files changed, 1328 insertions(+), 14 deletions(-) create mode 100644 src/Storages/MergeTree/PartMovesBetweenShardsOrchestrator.cpp create mode 100644 src/Storages/MergeTree/PartMovesBetweenShardsOrchestrator.h create mode 100644 src/Storages/MergeTree/PinnedPartUUIDs.cpp create mode 100644 src/Storages/MergeTree/PinnedPartUUIDs.h create mode 100644 src/Storages/System/StorageSystemPartMovesBetweenShards.cpp create mode 100644 src/Storages/System/StorageSystemPartMovesBetweenShards.h create mode 100644 tests/integration/test_move_part_to_shard/__init__.py create mode 100644 tests/integration/test_move_part_to_shard/configs/merge_tree.xml create mode 100644 tests/integration/test_move_part_to_shard/configs/remote_servers.xml create mode 100644 tests/integration/test_move_part_to_shard/test.py create mode 100644 tests/integration/test_query_deduplication/configs/profiles.xml diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 51ea501b949..3489331acd4 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -440,7 +440,11 @@ class IColumn; \ M(Bool, optimize_rewrite_sum_if_to_count_if, true, "Rewrite sumIf() and sum(if()) function countIf() function when logically equivalent", 0) \ M(UInt64, insert_shard_id, 0, "If non zero, when insert into a distributed table, the data will be inserted into the shard `insert_shard_id` synchronously. Possible values range from 1 to `shards_number` of corresponding distributed table", 0) \ - M(Bool, allow_experimental_query_deduplication, false, "Allow sending parts' UUIDs for a query in order to deduplicate data parts if any", 0) \ + \ + /** Experimental feature for moving data between shards. 
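+     *  Parts that are being moved between shards keep their UUIDs pinned on both the source     \
+     *  and the destination shard. With deduplication enabled, the initiator of a distributed    \
+     *  SELECT collects those UUIDs and counts a part seen on more than one shard only once.     \
+     *  A minimal illustration (the table name below is assumed, not part of this patch):        \
+     *                                                                                           \
+     *      SET allow_experimental_query_deduplication = 1;                                      \
+     *      SELECT count() FROM distributed_table;                                               \
+     *                                                                                           \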
*/ \ + \ + M(Bool, allow_experimental_query_deduplication, false, "Experimental data deduplication for SELECT queries based on part UUIDs", 0) \ + M(Bool, experimental_query_deduplication_send_all_part_uuids, false, "If false only part UUIDs for currently moving parts are sent. If true all read part UUIDs are sent (useful only for testing).", 0) \ M(Bool, engine_file_empty_if_not_exists, false, "Allows to select data from a file engine table without file", 0) \ M(Bool, engine_file_truncate_on_insert, false, "Enables or disables truncate before insert in file engine tables", 0) \ M(Bool, allow_experimental_database_replicated, false, "Allow to create databases with Replicated engine", 0) \ diff --git a/src/Parsers/ParserAlterQuery.cpp b/src/Parsers/ParserAlterQuery.cpp index de524342fb4..36dd81e129c 100644 --- a/src/Parsers/ParserAlterQuery.cpp +++ b/src/Parsers/ParserAlterQuery.cpp @@ -79,6 +79,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected ParserKeyword s_to_disk("TO DISK"); ParserKeyword s_to_volume("TO VOLUME"); ParserKeyword s_to_table("TO TABLE"); + ParserKeyword s_to_shard("TO SHARD"); ParserKeyword s_delete("DELETE"); ParserKeyword s_update("UPDATE"); @@ -295,6 +296,10 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected return false; command->move_destination_type = DataDestinationType::TABLE; } + else if (s_to_shard.ignore(pos)) + { + command->move_destination_type = DataDestinationType::SHARD; + } else return false; diff --git a/src/Storages/DataDestinationType.h b/src/Storages/DataDestinationType.h index 05d1d89c2b5..4729019b5cb 100644 --- a/src/Storages/DataDestinationType.h +++ b/src/Storages/DataDestinationType.h @@ -10,6 +10,7 @@ enum class DataDestinationType VOLUME, TABLE, DELETE, + SHARD, }; } diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 5d1400138b4..572df6486be 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -151,6 +151,7 @@ MergeTreeData::MergeTreeData( , log_name(table_id_.getNameForLogs()) , log(&Poco::Logger::get(log_name)) , storage_settings(std::move(storage_settings_)) + , pinned_part_uuids(std::make_shared()) , data_parts_by_info(data_parts_indexes.get()) , data_parts_by_state_and_info(data_parts_indexes.get()) , parts_mover(this) @@ -2920,6 +2921,11 @@ void MergeTreeData::movePartitionToVolume(const ASTPtr & partition, const String throw Exception("Cannot move parts because moves are manually disabled", ErrorCodes::ABORTED); } +void MergeTreeData::movePartitionToShard(const ASTPtr & /*partition*/, bool /*move_part*/, const String & /*to*/, const Context & /*query_context*/) +{ + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "MOVE PARTITION TO SHARD is not supported by storage {}", getName()); +} + void MergeTreeData::fetchPartition( const ASTPtr & /*partition*/, const StorageMetadataPtr & /*metadata_snapshot*/, @@ -2969,10 +2975,16 @@ Pipe MergeTreeData::alterPartition( break; case PartitionCommand::MoveDestinationType::TABLE: + { checkPartitionCanBeDropped(command.partition); String dest_database = query_context->resolveDatabase(command.to_database); auto dest_storage = DatabaseCatalog::instance().getTable({dest_database, command.to_table}, query_context); movePartitionToTable(dest_storage, command.partition, query_context); + } + break; + + case PartitionCommand::MoveDestinationType::SHARD: + movePartitionToShard(command.partition, command.part, command.move_destination_name, 
query_context); break; } } @@ -4054,6 +4066,12 @@ catch (...) tryLogCurrentException(log, __PRETTY_FUNCTION__); } +StorageMergeTree::PinnedPartUUIDsPtr MergeTreeData::getPinnedPartUUIDs() const +{ + std::lock_guard lock(pinned_part_uuids_mutex); + return pinned_part_uuids; +} + MergeTreeData::CurrentlyMovingPartsTagger::CurrentlyMovingPartsTagger(MergeTreeMovingParts && moving_parts_, MergeTreeData & data_) : parts_to_move(std::move(moving_parts_)), data(data_) { diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index 46c0014d9f7..eb7f479bf9b 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -127,6 +128,8 @@ public: using DataPartStates = std::initializer_list; using DataPartStateVector = std::vector; + using PinnedPartUUIDsPtr = std::shared_ptr; + constexpr static auto FORMAT_VERSION_FILE_NAME = "format_version.txt"; constexpr static auto DETACHED_DIR_NAME = "detached"; @@ -781,6 +784,8 @@ public: /// Mutex for currently_moving_parts mutable std::mutex moving_parts_mutex; + PinnedPartUUIDsPtr getPinnedPartUUIDs() const; + /// Return main processing background job, like merge/mutate/fetch and so on virtual std::optional getDataProcessingJob() = 0; /// Return job to move parts between disks/volumes and so on. @@ -835,6 +840,10 @@ protected: /// Use get and set to receive readonly versions. MultiVersion storage_settings; + /// Used to determine which UUIDs to send to root query executor for deduplication. + mutable std::shared_mutex pinned_part_uuids_mutex; + PinnedPartUUIDsPtr pinned_part_uuids; + /// Work with data parts struct TagByInfo{}; @@ -970,6 +979,7 @@ protected: virtual void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, ContextPtr context) = 0; /// Makes sense only for replicated tables + virtual void movePartitionToShard(const ASTPtr & partition, bool move_part, const String & to, const Context & query_context); virtual void fetchPartition( const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 8245364d87a..d94460f6d1c 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -291,7 +291,14 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts( auto index_stats = std::make_unique(); if (query_context->getSettingsRef().allow_experimental_query_deduplication) - selectPartsToReadWithUUIDFilter(parts, part_values, minmax_idx_condition, minmax_columns_types, partition_pruner, max_block_numbers_to_read, query_context, part_filter_counters); + selectPartsToReadWithUUIDFilter( + parts, + part_values, + data.getPinnedPartUUIDs(), + minmax_idx_condition, + minmax_columns_types, partition_pruner, + max_block_numbers_to_read, + query_context, part_filter_counters); else selectPartsToRead(parts, part_values, minmax_idx_condition, minmax_columns_types, partition_pruner, max_block_numbers_to_read, part_filter_counters); @@ -1922,6 +1929,7 @@ void MergeTreeDataSelectExecutor::selectPartsToRead( void MergeTreeDataSelectExecutor::selectPartsToReadWithUUIDFilter( MergeTreeData::DataPartsVector & parts, const std::unordered_set & part_values, + MergeTreeData::PinnedPartUUIDsPtr pinned_part_uuids, const std::optional & minmax_idx_condition, const DataTypes & 
minmax_columns_types, std::optional & partition_pruner, @@ -1929,6 +1937,8 @@ void MergeTreeDataSelectExecutor::selectPartsToReadWithUUIDFilter( ContextPtr query_context, PartFilterCounters & counters) const { + const Settings & settings = query_context.getSettings(); + /// process_parts prepare parts that have to be read for the query, /// returns false if duplicated parts' UUID have been met auto select_parts = [&] (MergeTreeData::DataPartsVector & selected_parts) -> bool @@ -1985,9 +1995,12 @@ void MergeTreeDataSelectExecutor::selectPartsToReadWithUUIDFilter( /// populate UUIDs and exclude ignored parts if enabled if (part->uuid != UUIDHelpers::Nil) { - auto result = temp_part_uuids.insert(part->uuid); - if (!result.second) - throw Exception("Found a part with the same UUID on the same replica.", ErrorCodes::LOGICAL_ERROR); + if (settings.experimental_query_deduplication_send_all_part_uuids || pinned_part_uuids->contains(part->uuid)) + { + auto result = temp_part_uuids.insert(part->uuid); + if (!result.second) + throw Exception("Found a part with the same UUID on the same replica.", ErrorCodes::LOGICAL_ERROR); + } } selected_parts.push_back(part); @@ -2011,7 +2024,8 @@ void MergeTreeDataSelectExecutor::selectPartsToReadWithUUIDFilter( /// Process parts that have to be read for a query. auto needs_retry = !select_parts(parts); - /// If any duplicated part UUIDs met during the first step, try to ignore them in second pass + /// If any duplicated part UUIDs met during the first step, try to ignore them in second pass. + /// This may happen when `prefer_localhost_replica` is set and "distributed" stage runs in the same process with "remote" stage. if (needs_retry) { LOG_DEBUG(log, "Found duplicate uuids locally, will retry part selection without them"); diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h index d7193fbfbfa..6cb1295271b 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h @@ -150,6 +150,7 @@ private: void selectPartsToReadWithUUIDFilter( MergeTreeData::DataPartsVector & parts, const std::unordered_set & part_values, + MergeTreeData::PinnedPartUUIDsPtr pinned_part_uuids, const std::optional & minmax_idx_condition, const DataTypes & minmax_columns_types, std::optional & partition_pruner, diff --git a/src/Storages/MergeTree/MergeTreeSettings.h b/src/Storages/MergeTree/MergeTreeSettings.h index f422f00f4dc..ced28974849 100644 --- a/src/Storages/MergeTree/MergeTreeSettings.h +++ b/src/Storages/MergeTree/MergeTreeSettings.h @@ -127,6 +127,7 @@ struct Settings; M(UInt64, max_concurrent_queries, 0, "Max number of concurrently executed queries related to the MergeTree table (0 - disabled). Queries will still be limited by other max_concurrent_queries settings.", 0) \ M(UInt64, min_marks_to_honor_max_concurrent_queries, 0, "Minimal number of marks to honor the MergeTree-level's max_concurrent_queries (0 - disabled). Queries will still be limited by other max_concurrent_queries settings.", 0) \ M(UInt64, min_bytes_to_rebalance_partition_over_jbod, 0, "Minimal amount of bytes to enable part rebalance over JBOD array (0 - disabled).", 0) \ + M(UInt64, part_moves_between_shards_delay_seconds, 30, "Time to wait before/after moving parts between shards.", 0) \ \ /** Obsolete settings. Kept for backward compatibility only. 
*/ \ M(UInt64, min_relative_delay_to_yield_leadership, 120, "Obsolete setting, does nothing.", 0) \ diff --git a/src/Storages/MergeTree/PartMovesBetweenShardsOrchestrator.cpp b/src/Storages/MergeTree/PartMovesBetweenShardsOrchestrator.cpp new file mode 100644 index 00000000000..9247f2e2fde --- /dev/null +++ b/src/Storages/MergeTree/PartMovesBetweenShardsOrchestrator.cpp @@ -0,0 +1,411 @@ +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ +PartMovesBetweenShardsOrchestrator::PartMovesBetweenShardsOrchestrator(StorageReplicatedMergeTree & storage_) + : storage(storage_) + , zookeeper_path(storage.zookeeper_path) + , logger_name(storage.getStorageID().getFullTableName() + " (PartMovesBetweenShardsOrchestrator)") + , log(&Poco::Logger::get(logger_name)) + , entries_znode_path(zookeeper_path + "/part_moves_shard") +{ + task = storage.global_context.getSchedulePool().createTask(logger_name, [this]{ run(); }); +} + +void PartMovesBetweenShardsOrchestrator::run() +{ + if (need_stop) + return; + + auto sleep_ms = 10; + + try + { + sync(); + + if (step()) + sync(); + else + sleep_ms = 3 * 1000; + } + catch (...) + { + tryLogCurrentException(log, __PRETTY_FUNCTION__); + } + + + task->scheduleAfter(sleep_ms); +} + +void PartMovesBetweenShardsOrchestrator::shutdown() +{ + need_stop = true; + task->deactivate(); + LOG_TRACE(log, "PartMovesBetweenShardsOrchestrator thread finished"); +} + +void PartMovesBetweenShardsOrchestrator::sync() +{ + std::lock_guard lock(state_mutex); + + entries.clear(); + + auto zk = storage.getZooKeeper(); + + Strings task_names = zk->getChildren(entries_znode_path); + for (auto const & task_name : task_names) + { + PartMovesBetweenShardsOrchestrator::Entry e; + Coordination::Stat stat; + + e.znode_path = entries_znode_path + "/" + task_name; + + auto entry_str = zk->get(e.znode_path, &stat); + e.fromString(entry_str); + + e.version = stat.version; + e.znode_name = task_name; + + entries[task_name] = std::move(e); + } +} + +bool PartMovesBetweenShardsOrchestrator::step() +{ + if (!storage.is_leader) + return false; + + auto zk = storage.getZooKeeper(); + + std::optional entry_to_process; + + /// Try find an entry to process and copy it. + { + std::lock_guard lock(state_mutex); + + for (auto const & entry : entries | boost::adaptors::map_values) + { + if (entry.state.value == EntryState::DONE || entry.state.value == EntryState::CANCELLED) + continue; + + entry_to_process.emplace(entry); + break; + } + } + + if (!entry_to_process.has_value()) + return false; + + try + { + + /// Since some state transitions are long running (waiting on replicas acknowledgement we create this lock to avoid + /// other replicas trying to do the same work. All state transitions should be idempotent so is is safe to lose the + /// lock and have another replica retry. + /// + /// Note: This blocks all other entries from being executed. Technical debt. + auto entry_node_holder = zkutil::EphemeralNodeHolder::create(entry_to_process->znode_path + "/lock_holder", *zk, storage.replica_name); + } + catch (const Coordination::Exception & e) + { + if (e.code == Coordination::Error::ZNODEEXISTS) + { + LOG_DEBUG(log, "Task {} is being processed by another replica", entry_to_process->znode_name); + return false; + } + + throw; + } + + try + { + stepEntry(entry_to_process.value()); + } + catch (...) 
+ { + tryLogCurrentException(log, __PRETTY_FUNCTION__); + + Entry entry_copy = entry_to_process.value(); + entry_copy.last_exception_msg = getCurrentExceptionMessage(false); + entry_copy.update_time = std::time(nullptr); + zk->set(entry_copy.znode_path, entry_copy.toString(), entry_copy.version); + + return false; + } + + return true; +} + +void PartMovesBetweenShardsOrchestrator::stepEntry(const Entry & entry) +{ + auto zk = storage.getZooKeeper(); + + switch (entry.state.value) + { + case EntryState::DONE: + break; + + case EntryState::CANCELLED: + break; + + case EntryState::TODO: + { + /// State transition. + Entry entry_copy = entry; + entry_copy.state = EntryState::SYNC_SOURCE; + entry_copy.update_time = std::time(nullptr); + zk->set(entry_copy.znode_path, entry_copy.toString(), entry_copy.version); + } + break; + + case EntryState::SYNC_SOURCE: + { + { + /// Log entry. + Coordination::Requests ops; + ReplicatedMergeTreeLogEntryData log_entry; + log_entry.type = ReplicatedMergeTreeLogEntryData::SYNC_PINNED_PART_UUIDS; + log_entry.create_time = std::time(nullptr); + log_entry.source_replica = storage.replica_name; + ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/log", "", -1)); + ops.emplace_back(zkutil::makeCreateRequest( + zookeeper_path + "/log/log-", log_entry.toString(), zkutil::CreateMode::PersistentSequential)); + + Coordination::Responses responses; + Coordination::Error rc = zk->tryMulti(ops, responses); + zkutil::KeeperMultiException::check(rc, ops, responses); + + String log_znode_path = dynamic_cast(*responses.back()).path_created; + log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); + + storage.waitForAllReplicasToProcessLogEntry(log_entry, true); + } + + { + /// State transition. + Entry entry_copy = entry; + entry_copy.state = EntryState::SYNC_DESTINATION; + entry_copy.update_time = std::time(nullptr); + zk->set(entry_copy.znode_path, entry_copy.toString(), entry_copy.version); + } + } + break; + + case EntryState::SYNC_DESTINATION: + { + { + /// Log entry. + Coordination::Requests ops; + ReplicatedMergeTreeLogEntryData log_entry; + log_entry.type = ReplicatedMergeTreeLogEntryData::SYNC_PINNED_PART_UUIDS; + log_entry.create_time = std::time(nullptr); + log_entry.source_replica = storage.replica_name; + log_entry.source_shard = zookeeper_path; + + ops.emplace_back(zkutil::makeSetRequest(entry.to_shard + "/log", "", -1)); + ops.emplace_back(zkutil::makeCreateRequest( + entry.to_shard + "/log/log-", log_entry.toString(), zkutil::CreateMode::PersistentSequential)); + + Coordination::Responses responses; + Coordination::Error rc = zk->tryMulti(ops, responses); + zkutil::KeeperMultiException::check(rc, ops, responses); + + String log_znode_path = dynamic_cast(*responses.back()).path_created; + log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); + + storage.waitForAllTableReplicasToProcessLogEntry(entry.to_shard, log_entry, true); + } + + { + /// State transition. + Entry entry_copy = entry; + entry_copy.state = EntryState::DESTINATION_FETCH; + entry_copy.update_time = std::time(nullptr); + zk->set(entry_copy.znode_path, entry_copy.toString(), entry_copy.version); + } + } + break; + + case EntryState::DESTINATION_FETCH: + { + /// There is a chance that attach on destination will fail and this task will be left in the queue forever. + /// Make sure table structure doesn't change when there are part movements in progress. + /// + /// DESTINATION_FETCH is tricky from idempotency standpoint. 
We must ensure that no orchestrator
            /// issues a CLONE_PART_FROM_SHARD entry after the source part is dropped.
            ///
            /// `makeSetRequest` on the entry is a sloppy mechanism for ensuring that the thread that makes the state
            /// transition is the last one to issue a CLONE_PART_FROM_SHARD event. We assume here that if the last event
            /// was processed then all the ones prior to it succeeded as well.
            ///
            /// If we could somehow create just one entry in the log and record its name in the same
            /// transaction (so that we could wait for it later), the problem would be solved.
            {
                Coordination::Requests ops;
                ops.emplace_back(zkutil::makeSetRequest(entry.znode_path, entry.toString(), entry.version));

                /// Log entry.
                ReplicatedMergeTreeLogEntryData log_entry;
                log_entry.type = ReplicatedMergeTreeLogEntryData::CLONE_PART_FROM_SHARD;
                log_entry.create_time = std::time(nullptr);
                log_entry.new_part_name = entry.part_name;
                log_entry.source_replica = storage.replica_name;
                log_entry.source_shard = zookeeper_path;
                log_entry.block_id = toString(entry.task_uuid);
                ops.emplace_back(zkutil::makeSetRequest(entry.to_shard + "/log", "", -1));
                ops.emplace_back(zkutil::makeCreateRequest(
                    entry.to_shard + "/log/log-", log_entry.toString(), zkutil::CreateMode::PersistentSequential));

                /// Submit.
                Coordination::Responses responses;
                Coordination::Error rc = zk->tryMulti(ops, responses);
                zkutil::KeeperMultiException::check(rc, ops, responses);

                String log_znode_path = dynamic_cast(*responses.back()).path_created;
                log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);

                storage.waitForAllTableReplicasToProcessLogEntry(entry.to_shard, log_entry, true);
            }

            {
                /// State transition.
                Entry entry_copy = entry;
                entry_copy.state = EntryState::SOURCE_DROP_PRE_DELAY;
                entry_copy.update_time = std::time(nullptr);
                /// The expected version is bumped by one because the makeSetRequest above already
                /// incremented the entry's znode version within the same transaction.
                zk->set(entry_copy.znode_path, entry_copy.toString(), entry_copy.version + 1);
            }
        }
        break;

        case EntryState::SOURCE_DROP_PRE_DELAY:
        {
            std::this_thread::sleep_for(std::chrono::seconds(storage.getSettings()->part_moves_between_shards_delay_seconds));

            /// State transition.
            Entry entry_copy = entry;
            entry_copy.state = EntryState::SOURCE_DROP;
            entry_copy.update_time = std::time(nullptr);
            zk->set(entry_copy.znode_path, entry_copy.toString(), entry_copy.version);
        }
        break;

        case EntryState::SOURCE_DROP:
        {
            {
                ReplicatedMergeTreeLogEntry log_entry;
                if (storage.dropPart(zk, entry.part_name, log_entry, false, false))
                    storage.waitForAllReplicasToProcessLogEntry(log_entry, true);
            }

            {
                /// State transition.
                Entry entry_copy = entry;
                entry_copy.state = EntryState::SOURCE_DROP_POST_DELAY;
                entry_copy.update_time = std::time(nullptr);
                zk->set(entry_copy.znode_path, entry_copy.toString(), entry_copy.version);
            }
        }
        break;

        case EntryState::SOURCE_DROP_POST_DELAY:
        {
            std::this_thread::sleep_for(std::chrono::seconds(storage.getSettings()->part_moves_between_shards_delay_seconds));

            /// State transition.
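            /// All transitions use the same optimistic-concurrency pattern: the entry is rewritten
            /// with the znode version that was read together with its payload in sync(), so a replica
            /// racing on the same entry fails with ZBADVERSION and simply retries after the next
            /// sync(). A schematic of the pattern (not literal code from this patch):
            ///
            ///     Entry entry_copy = entry;                 // entry.version read during sync()
            ///     entry_copy.state = <next state>;
            ///     zk->set(entry_copy.znode_path, entry_copy.toString(), entry_copy.version);  // CAS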
+ Entry entry_copy = entry; + entry_copy.state = EntryState::REMOVE_UUID_PIN; + entry_copy.update_time = std::time(nullptr); + zk->set(entry_copy.znode_path, entry_copy.toString(), entry_copy.version); + } + break; + + case EntryState::REMOVE_UUID_PIN: + { + { + PinnedPartUUIDs src_pins; + PinnedPartUUIDs dst_pins; + + { + String s = zk->get(zookeeper_path + "/pinned_part_uuids", &src_pins.stat); + src_pins.fromString(s); + } + + { + String s = zk->get(entry.to_shard + "/pinned_part_uuids", &dst_pins.stat); + dst_pins.fromString(s); + } + + src_pins.part_uuids.erase(entry.part_uuid); + dst_pins.part_uuids.erase(entry.part_uuid); + + Coordination::Requests ops; + ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/pinned_part_uuids", src_pins.toString(), src_pins.stat.version)); + ops.emplace_back(zkutil::makeSetRequest(entry.to_shard + "/pinned_part_uuids", dst_pins.toString(), dst_pins.stat.version)); + + zk->multi(ops); + } + + /// State transition. + Entry entry_copy = entry; + entry_copy.state = EntryState::DONE; + entry_copy.update_time = std::time(nullptr); + zk->set(entry_copy.znode_path, entry_copy.toString(), entry_copy.version); + } + break; + } +} + +std::vector PartMovesBetweenShardsOrchestrator::getEntries() const +{ + std::lock_guard lock(state_mutex); + + auto res = std::vector(); + + for (const auto& e : entries) + res.push_back(e.second); + + return res; +} + +String PartMovesBetweenShardsOrchestrator::Entry::toString() const +{ + Poco::JSON::Object json; + + json.set(JSON_KEY_CREATE_TIME, DB::toString(create_time)); + json.set(JSON_KEY_UPDATE_TIME, DB::toString(update_time)); + json.set(JSON_KEY_TASK_UUID, DB::toString(task_uuid)); + json.set(JSON_KEY_PART_NAME, part_name); + json.set(JSON_KEY_PART_UUID, DB::toString(part_uuid)); + json.set(JSON_KEY_TO_SHARD, to_shard); + json.set(JSON_KEY_STATE, state.toString()); + json.set(JSON_KEY_LAST_EX_MSG, last_exception_msg); + + std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM + oss.exceptions(std::ios::failbit); + json.stringify(oss); + + return oss.str(); +} + +void PartMovesBetweenShardsOrchestrator::Entry::fromString(const String & buf) +{ + Poco::JSON::Parser parser; + auto json = parser.parse(buf).extract(); + + create_time = parseFromString(json->getValue(JSON_KEY_CREATE_TIME)); + update_time = parseFromString(json->getValue(JSON_KEY_UPDATE_TIME)); + task_uuid = parseFromString(json->getValue(JSON_KEY_TASK_UUID)); + part_name = json->getValue(JSON_KEY_PART_NAME); + part_uuid = parseFromString(json->getValue(JSON_KEY_PART_UUID)); + to_shard = json->getValue(JSON_KEY_TO_SHARD); + state.value = EntryState::fromString(json->getValue(JSON_KEY_STATE)); + last_exception_msg = json->getValue(JSON_KEY_LAST_EX_MSG); +} + +} diff --git a/src/Storages/MergeTree/PartMovesBetweenShardsOrchestrator.h b/src/Storages/MergeTree/PartMovesBetweenShardsOrchestrator.h new file mode 100644 index 00000000000..efbb140c56a --- /dev/null +++ b/src/Storages/MergeTree/PartMovesBetweenShardsOrchestrator.h @@ -0,0 +1,158 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + +class StorageReplicatedMergeTree; + +/// Cross shard part movement workflow orchestration. 
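///
/// A task is created by `ALTER TABLE t MOVE PART 'part_name' TO SHARD '/zk/path/of/destination/table'`
/// and is advanced one state transition at a time:
///
///     TODO -> SYNC_SOURCE -> SYNC_DESTINATION -> DESTINATION_FETCH
///          -> SOURCE_DROP_PRE_DELAY -> SOURCE_DROP -> SOURCE_DROP_POST_DELAY
///          -> REMOVE_UUID_PIN -> DONE
///
/// Each task lives as a JSON payload under `<zookeeper_path>/part_moves_shard/task-NNNNNNNNNN`;
/// an illustrative (not literal) payload looks like:
///
///     {"create_time":"...","update_time":"...","task_uuid":"...","part_name":"all_0_0_0",
///      "part_uuid":"...","to_shard":"/clickhouse/tables/01/dst","state":"TODO","last_exception":""}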
+class PartMovesBetweenShardsOrchestrator +{ +public: + struct EntryState + { + enum Value + { + TODO, + SYNC_SOURCE, + SYNC_DESTINATION, + DESTINATION_FETCH, + SOURCE_DROP_PRE_DELAY, + SOURCE_DROP, + SOURCE_DROP_POST_DELAY, + REMOVE_UUID_PIN, + DONE, + CANCELLED, + }; + + EntryState(): value(TODO) {} + EntryState(Value value_): value(value_) {} + + Value value; + + String toString() const + { + switch (value) + { + case TODO: return "TODO"; + case SYNC_SOURCE: return "SYNC_SOURCE"; + case SYNC_DESTINATION: return "SYNC_DESTINATION"; + case DESTINATION_FETCH: return "DESTINATION_FETCH"; + case SOURCE_DROP_PRE_DELAY: return "SOURCE_DROP_PRE_DELAY"; + case SOURCE_DROP: return "SOURCE_DROP"; + case SOURCE_DROP_POST_DELAY: return "SOURCE_DROP_POST_DELAY"; + case REMOVE_UUID_PIN: return "REMOVE_UUID_PIN"; + case DONE: return "DONE"; + case CANCELLED: return "CANCELLED"; + } + + throw Exception("Unknown EntryState: " + DB::toString(value), ErrorCodes::LOGICAL_ERROR); + } + + static EntryState::Value fromString(String in) + { + if (in == "TODO") return TODO; + else if (in == "SYNC_SOURCE") return SYNC_SOURCE; + else if (in == "SYNC_DESTINATION") return SYNC_DESTINATION; + else if (in == "DESTINATION_FETCH") return DESTINATION_FETCH; + else if (in == "SOURCE_DROP_PRE_DELAY") return SOURCE_DROP_PRE_DELAY; + else if (in == "SOURCE_DROP") return SOURCE_DROP; + else if (in == "SOURCE_DROP_POST_DELAY") return SOURCE_DROP_POST_DELAY; + else if (in == "REMOVE_UUID_PIN") return REMOVE_UUID_PIN; + else if (in == "DONE") return DONE; + else if (in == "CANCELLED") return CANCELLED; + else throw Exception("Unknown state: " + in, ErrorCodes::LOGICAL_ERROR); + } + }; + + struct Entry + { + friend class PartMovesBetweenShardsOrchestrator; + + time_t create_time = 0; + time_t update_time = 0; + + /// Globally unique identifier used for attaching parts on destination. + /// Using `part_uuid` results in part names being reused when moving parts back and forth. + UUID task_uuid; + + String part_name; + UUID part_uuid; + String to_shard; + + EntryState state; + + String last_exception_msg; + + String znode_name; + + private: + /// Transient value for CAS. + uint32_t version; + + String znode_path; + + public: + String toString() const; + void fromString(const String & buf); + }; + +private: + static constexpr auto JSON_KEY_CREATE_TIME = "create_time"; + static constexpr auto JSON_KEY_UPDATE_TIME = "update_time"; + static constexpr auto JSON_KEY_TASK_UUID = "task_uuid"; + static constexpr auto JSON_KEY_PART_NAME = "part_name"; + static constexpr auto JSON_KEY_PART_UUID = "part_uuid"; + static constexpr auto JSON_KEY_TO_SHARD = "to_shard"; + static constexpr auto JSON_KEY_STATE = "state"; + static constexpr auto JSON_KEY_LAST_EX_MSG = "last_exception"; + +public: + PartMovesBetweenShardsOrchestrator(StorageReplicatedMergeTree & storage_); + + void start() { task->activateAndSchedule(); } + void wakeup() { task->schedule(); } + void shutdown(); + + void sync(); + + /// We could have one thread per Entry and worry about concurrency issues. + /// Or we could have a single thread trying to run one step at a time. 
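    /// The latter is what is done here: step() picks the first entry that is neither DONE nor
    /// CANCELLED, takes an ephemeral lock_holder znode for it, performs exactly one state
    /// transition and returns true if it made progress; run() then re-syncs and reschedules.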
+ bool step(); + + std::vector getEntries() const; + +private: + void run(); + void stepEntry(const Entry & entry); + +private: + StorageReplicatedMergeTree & storage; + + String zookeeper_path; + String logger_name; + Poco::Logger * log = nullptr; + std::atomic need_stop{false}; + + BackgroundSchedulePool::TaskHolder task; + + mutable std::mutex state_mutex; + std::map entries; + +public: + String entries_znode_path; +}; + +} diff --git a/src/Storages/MergeTree/PinnedPartUUIDs.cpp b/src/Storages/MergeTree/PinnedPartUUIDs.cpp new file mode 100644 index 00000000000..1302d492c8c --- /dev/null +++ b/src/Storages/MergeTree/PinnedPartUUIDs.cpp @@ -0,0 +1,36 @@ +#include "PinnedPartUUIDs.h" +#include +#include +#include +#include +#include + +namespace DB +{ + +String PinnedPartUUIDs::toString() const +{ + std::vector vec(part_uuids.begin(), part_uuids.end()); + + Poco::JSON::Object json; + json.set(JSON_KEY_UUIDS, DB::toString(vec)); + + std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM + oss.exceptions(std::ios::failbit); + json.stringify(oss); + + return oss.str(); +} + +void PinnedPartUUIDs::fromString(const String & buf) +{ + Poco::JSON::Parser parser; + auto json = parser.parse(buf).extract(); + + std::vector vec = parseFromString>(json->getValue(PinnedPartUUIDs::JSON_KEY_UUIDS)); + + part_uuids.clear(); + std::copy(vec.begin(), vec.end(), std::inserter(part_uuids, part_uuids.begin())); +} + +} diff --git a/src/Storages/MergeTree/PinnedPartUUIDs.h b/src/Storages/MergeTree/PinnedPartUUIDs.h new file mode 100644 index 00000000000..a8f6c1fceda --- /dev/null +++ b/src/Storages/MergeTree/PinnedPartUUIDs.h @@ -0,0 +1,27 @@ +#pragma once + +#include +#include +#include + +namespace DB +{ + +struct PinnedPartUUIDs +{ + std::set part_uuids; + Coordination::Stat stat{}; + + bool contains(const UUID & part_uuid) const + { + return part_uuids.contains(part_uuid); + } + + String toString() const; + void fromString(const String & buf); + +private: + static constexpr auto JSON_KEY_UUIDS = "part_uuids"; +}; + +} diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp index df4f9124980..29342cf4d9b 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp @@ -183,7 +183,7 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block) } -void ReplicatedMergeTreeBlockOutputStream::writeExistingPart(MergeTreeData::MutableDataPartPtr & part) +void ReplicatedMergeTreeBlockOutputStream::writeExistingPart(MergeTreeData::MutableDataPartPtr & part, String block_id) { last_block_is_duplicate = false; @@ -199,7 +199,7 @@ void ReplicatedMergeTreeBlockOutputStream::writeExistingPart(MergeTreeData::Muta try { - commitPart(zookeeper, part, ""); + commitPart(zookeeper, part, block_id); PartLog::addNewPart(storage.getContext(), part, watch.elapsed()); } catch (...) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h b/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h index 6ea16491d64..314e6b92d87 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h @@ -40,7 +40,7 @@ public: void write(const Block & block) override; /// For ATTACHing existing data on filesystem. 
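+    /// The optional block_id makes the attach idempotent: CLONE_PART_FROM_SHARD passes
+    /// "clone_part_from_shard_<task_uuid>" so that replaying the log entry deduplicates on the
+    /// block id instead of committing the same part twice.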
- void writeExistingPart(MergeTreeData::MutableDataPartPtr & part); + void writeExistingPart(MergeTreeData::MutableDataPartPtr & part, String block_id = ""); /// For proper deduplication in MaterializedViews bool lastBlockIsDuplicate() const diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp index 7d8ba0e4a30..88570ac33ef 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp @@ -53,6 +53,12 @@ void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const out << "get\n" << new_part_name; break; + case CLONE_PART_FROM_SHARD: + out << "clone_part_from_shard\n" + << new_part_name << "\n" + << "source_shard: " << source_shard; + break; + case ATTACH_PART: out << "attach\n" << new_part_name << "\n" << "part_checksum: " << part_checksum; @@ -141,6 +147,10 @@ void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const out << metadata_str; break; + case SYNC_PINNED_PART_UUIDS: + out << "sync_pinned_part_uuids\n"; + break; + default: throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown log entry type: {}", static_cast(type)); } @@ -305,6 +315,16 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in) metadata_str.resize(metadata_size); in.readStrict(&metadata_str[0], metadata_size); } + else if (type_str == "sync_pinned_part_uuids") + { + type = SYNC_PINNED_PART_UUIDS; + } + else if (type_str == "clone_part_from_shard") + { + type = CLONE_PART_FROM_SHARD; + in >> new_part_name; + in >> "\nsource_shard: " >> source_shard; + } if (!trailing_newline_found) in >> "\n"; diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h b/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h index 309120560e7..82748dcd4b0 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h @@ -41,6 +41,8 @@ struct ReplicatedMergeTreeLogEntryData REPLACE_RANGE, /// Drop certain range of partitions and replace them by new ones MUTATE_PART, /// Apply one or several mutations to the part. ALTER_METADATA, /// Apply alter modification according to global /metadata and /columns paths + SYNC_PINNED_PART_UUIDS, /// Synchronization point for ensuring that all replicas have up to date in-memory state. + CLONE_PART_FROM_SHARD, /// Clone part from another shard. }; static String typeToString(Type type) @@ -56,6 +58,8 @@ struct ReplicatedMergeTreeLogEntryData case ReplicatedMergeTreeLogEntryData::REPLACE_RANGE: return "REPLACE_RANGE"; case ReplicatedMergeTreeLogEntryData::MUTATE_PART: return "MUTATE_PART"; case ReplicatedMergeTreeLogEntryData::ALTER_METADATA: return "ALTER_METADATA"; + case ReplicatedMergeTreeLogEntryData::SYNC_PINNED_PART_UUIDS: return "SYNC_PINNED_PART_UUIDS"; + case ReplicatedMergeTreeLogEntryData::CLONE_PART_FROM_SHARD: return "CLONE_PART_FROM_SHARD"; default: throw Exception("Unknown log entry type: " + DB::toString(type), ErrorCodes::LOGICAL_ERROR); } @@ -74,6 +78,7 @@ struct ReplicatedMergeTreeLogEntryData Type type = EMPTY; String source_replica; /// Empty string means that this entry was added to the queue immediately, and not copied from the log. + String source_shard; String part_checksum; /// Part checksum for ATTACH_PART, empty otherwise. @@ -150,6 +155,14 @@ struct ReplicatedMergeTreeLogEntryData return res; } + /// Doesn't produce any part. + if (type == SYNC_PINNED_PART_UUIDS) + return {}; + + /// Doesn't produce any part by itself. 
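    /// The fetched part is committed through ReplicatedMergeTreeBlockOutputStream, which writes
    /// its own log entry for the new part, so nothing has to be recorded as virtual here.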
+ if (type == CLONE_PART_FROM_SHARD) + return {}; + return {new_part_name}; } diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index ad41bbe1a08..fc848eb6be2 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -1849,6 +1849,17 @@ ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate( merges_version = queue_.pullLogsToQueue(zookeeper); + { + /// We avoid returning here a version to be used in a lightweight transaction. + /// + /// When pinned parts set is changed a log entry is added to the queue in the same transaction. + /// The log entry serves as a synchronization point, and it also increments `merges_version`. + /// + /// If pinned parts are fetched after logs are pulled then we can safely say that it contains all locks up to `merges_version`. + String s = zookeeper->get(queue.zookeeper_path + "/pinned_part_uuids"); + pinned_part_uuids.fromString(s); + } + Coordination::GetResponse quorum_status_response = quorum_status_future.get(); if (quorum_status_response.error == Coordination::Error::ZOK) { @@ -1919,6 +1930,13 @@ bool ReplicatedMergeTreeMergePredicate::canMergeTwoParts( for (const MergeTreeData::DataPartPtr & part : {left, right}) { + if (pinned_part_uuids.part_uuids.contains(part->uuid)) + { + if (out_reason) + *out_reason = "Part " + part->name + " has uuid " + toString(part->uuid) + " which is currently pinned"; + return false; + } + if (part->name == inprogress_quorum_part) { if (out_reason) @@ -2014,6 +2032,13 @@ bool ReplicatedMergeTreeMergePredicate::canMergeSinglePart( const MergeTreeData::DataPartPtr & part, String * out_reason) const { + if (pinned_part_uuids.part_uuids.contains(part->uuid)) + { + if (out_reason) + *out_reason = "Part " + part->name + " has uuid " + toString(part->uuid) + " which is currently pinned"; + return false; + } + if (part->name == inprogress_quorum_part) { if (out_reason) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 00ef3ee7292..551ed9cb55c 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -8,6 +8,7 @@ #include #include #include +#include #include #include @@ -501,6 +502,9 @@ private: /// (loaded at some later time than prev_virtual_parts). std::unordered_map> committing_blocks; + /// List of UUIDs for parts that have their identity "pinned". + PinnedPartUUIDs pinned_part_uuids; + /// Quorum state taken at some later time than prev_virtual_parts. String inprogress_quorum_part; diff --git a/src/Storages/PartitionCommands.cpp b/src/Storages/PartitionCommands.cpp index f09f60887e8..45eeecd2e0c 100644 --- a/src/Storages/PartitionCommands.cpp +++ b/src/Storages/PartitionCommands.cpp @@ -13,6 +13,11 @@ namespace DB { +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + std::optional PartitionCommand::parse(const ASTAlterCommand * command_ast) { if (command_ast->type == ASTAlterCommand::DROP_PARTITION) @@ -59,8 +64,11 @@ std::optional PartitionCommand::parse(const ASTAlterCommand * res.to_database = command_ast->to_database; res.to_table = command_ast->to_table; break; - default: + case DataDestinationType::SHARD: + res.move_destination_type = PartitionCommand::MoveDestinationType::SHARD; break; + case DataDestinationType::DELETE: + throw Exception("ALTER with this destination type is not handled. 
This is a bug.", ErrorCodes::LOGICAL_ERROR); } if (res.move_destination_type != PartitionCommand::MoveDestinationType::TABLE) res.move_destination_name = command_ast->move_destination_name; diff --git a/src/Storages/PartitionCommands.h b/src/Storages/PartitionCommands.h index 9f89d44bd4e..1875f0f98ef 100644 --- a/src/Storages/PartitionCommands.h +++ b/src/Storages/PartitionCommands.h @@ -62,6 +62,7 @@ struct PartitionCommand DISK, VOLUME, TABLE, + SHARD, }; std::optional move_destination_type; diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 3b4a1ec4e16..31dee005e42 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -131,6 +132,7 @@ namespace ErrorCodes extern const int NO_SUCH_DATA_PART; extern const int INTERSERVER_SCHEME_DOESNT_MATCH; extern const int DUPLICATE_DATA_PART; + extern const int BAD_ARGUMENTS; } namespace ActionLocks @@ -263,6 +265,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( , cleanup_thread(*this) , part_check_thread(*this) , restarting_thread(*this) + , part_moves_between_shards_orchestrator(*this) , allow_renaming(allow_renaming_) , replicated_fetches_pool_size(getContext()->getSettingsRef().background_fetches_pool_size) { @@ -440,6 +443,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( } createNewZooKeeperNodes(); + syncPinnedPartUUIDs(); } @@ -560,6 +564,10 @@ void StorageReplicatedMergeTree::createNewZooKeeperNodes() zookeeper->createIfNotExists(zookeeper_path + "/zero_copy_s3", String()); zookeeper->createIfNotExists(zookeeper_path + "/zero_copy_s3/shared", String()); } + + /// Part movement. + zookeeper->createIfNotExists(zookeeper_path + "/part_moves_shard", String()); + zookeeper->createIfNotExists(zookeeper_path + "/pinned_part_uuids", getPinnedPartUUIDs()->toString()); } @@ -1199,6 +1207,27 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks) } +void StorageReplicatedMergeTree::syncPinnedPartUUIDs() +{ + auto zookeeper = getZooKeeper(); + + Coordination::Stat stat; + String s = zookeeper->get(zookeeper_path + "/pinned_part_uuids", &stat); + + std::lock_guard lock(pinned_part_uuids_mutex); + + /// Unsure whether or not this can be called concurrently. 
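    /// The version check below at least makes the update monotonic: a stale read can never
    /// overwrite a newer set. Readers take the whole object via getPinnedPartUUIDs(), which copies
    /// the shared_ptr under the same mutex, so the set is swapped atomically, never mutated in place.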
+ if (pinned_part_uuids->stat.version < stat.version) + { + auto new_pinned_part_uuids = std::make_shared(); + new_pinned_part_uuids->fromString(s); + new_pinned_part_uuids->stat = stat; + + pinned_part_uuids = new_pinned_part_uuids; + } +} + + void StorageReplicatedMergeTree::checkPartChecksumsAndAddCommitOps(const zkutil::ZooKeeperPtr & zookeeper, const DataPartPtr & part, Coordination::Requests & ops, String part_name, NameSet * absent_replicas_paths) { @@ -1487,6 +1516,12 @@ bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry) break; case LogEntry::ALTER_METADATA: return executeMetadataAlter(entry); + case LogEntry::SYNC_PINNED_PART_UUIDS: + syncPinnedPartUUIDs(); + return true; + case LogEntry::CLONE_PART_FROM_SHARD: + executeClonePartFromShard(entry); + return true; default: throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected log entry type: {}", static_cast(entry.type)); } @@ -2479,6 +2514,47 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry) } +void StorageReplicatedMergeTree::executeClonePartFromShard(const LogEntry & entry) +{ + auto zookeeper = getZooKeeper(); + + Strings replicas = zookeeper->getChildren(entry.source_shard + "/replicas"); + std::shuffle(replicas.begin(), replicas.end(), thread_local_rng); + String replica = replicas.front(); + + LOG_INFO(log, "Will clone part from shard " + entry.source_shard + " and replica " + replica); + + MutableDataPartPtr part; + + { + auto metadata_snapshot = getInMemoryMetadataPtr(); + String source_replica_path = entry.source_shard + "/replicas/" + replica; + ReplicatedMergeTreeAddress address(getZooKeeper()->get(source_replica_path + "/host")); + auto timeouts = ConnectionTimeouts::getHTTPTimeouts(global_context); + auto user_password = global_context.getInterserverCredentials(); + String interserver_scheme = global_context.getInterserverScheme(); + + auto get_part = [&, address, timeouts, user_password, interserver_scheme]() + { + if (interserver_scheme != address.scheme) + throw Exception("Interserver schemes are different: '" + interserver_scheme + + "' != '" + address.scheme + "', can't fetch part from " + address.host, + ErrorCodes::LOGICAL_ERROR); + + return fetcher.fetchPart( + metadata_snapshot, entry.new_part_name, source_replica_path, + address.host, address.replication_port, + timeouts, user_password.first, user_password.second, interserver_scheme, true); + }; + + part = get_part(); + + ReplicatedMergeTreeBlockOutputStream output(*this, metadata_snapshot, 0, 0, 0, false, false, false); + output.writeExistingPart(part, "clone_part_from_shard_" + entry.block_id); + } +} + + void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coordination::Stat source_is_lost_stat, zkutil::ZooKeeperPtr & zookeeper) { String source_path = zookeeper_path + "/replicas/" + source_replica; @@ -4005,6 +4081,8 @@ void StorageReplicatedMergeTree::startup() /// between the assignment of queue_task_handle and queueTask that use the queue_task_handle. background_executor.start(); startBackgroundMovesIfNeeded(); + + part_moves_between_shards_orchestrator.start(); } catch (...) 
{ @@ -4035,6 +4113,7 @@ void StorageReplicatedMergeTree::shutdown() restarting_thread.shutdown(); background_executor.finish(); + part_moves_between_shards_orchestrator.shutdown(); { auto lock = queue.lockQueue(); @@ -5245,6 +5324,11 @@ void StorageReplicatedMergeTree::getQueue(LogEntriesData & res, String & replica queue.getEntries(res); } +std::vector StorageReplicatedMergeTree::getPartMovesBetweenShardsEntries() +{ + return part_moves_between_shards_orchestrator.getEntries(); +} + time_t StorageReplicatedMergeTree::getAbsoluteDelay() const { time_t min_unprocessed_insert_time = 0; @@ -6432,6 +6516,100 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta cleanLastPartNode(partition_id); } +void StorageReplicatedMergeTree::movePartitionToShard( + const ASTPtr & partition, bool move_part, const String & to, const Context & /*query_context*/) +{ + /// This is a lightweight operation that only optimistically checks if it could succeed and queues tasks. + + if (!move_part) + throw Exception("MOVE PARTITION TO SHARD is not supported, use MOVE PART instead", ErrorCodes::NOT_IMPLEMENTED); + + if (normalizeZooKeeperPath(zookeeper_path) == normalizeZooKeeperPath(to)) + throw Exception("Source and destination are the same", ErrorCodes::BAD_ARGUMENTS); + + auto zookeeper = getZooKeeper(); + + String part_name = partition->as().value.safeGet(); + auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version); + + auto part = getPartIfExists(part_info, {MergeTreeDataPartState::Committed}); + if (!part) + throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "Part {} not found locally", part_name); + + if (part->uuid == UUIDHelpers::Nil) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Part {} does not have an uuid assigned and it can't be moved between shards", part_name); + + + ReplicatedMergeTreeMergePredicate merge_pred = queue.getMergePredicate(zookeeper); + + /// The following block is pretty much copy & paste from StorageReplicatedMergeTree::dropPart to avoid conflicts while this is WIP. + /// Extract it to a common method and re-use it before merging. + { + if (partIsLastQuorumPart(part->info)) + { + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Part {} is last inserted part with quorum in partition. Would not be able to drop", part_name); + } + + /// canMergeSinglePart is overlapping with dropPart, let's try to use the same code. + String out_reason; + if (!merge_pred.canMergeSinglePart(part, &out_reason)) + throw Exception(ErrorCodes::PART_IS_TEMPORARILY_LOCKED, "Part is busy, reason: " + out_reason); + } + + { + /// Optimistic check that for compatible destination table structure. 
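        /// ("Optimistic" because nothing stops the structure from changing again between this check
        /// and the fetch; in that case the attach on the destination fails and the task gets stuck,
        /// see the DESTINATION_FETCH note in PartMovesBetweenShardsOrchestrator::stepEntry.)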
+ checkTableStructure(to, getInMemoryMetadataPtr()); + } + + PinnedPartUUIDs src_pins; + PinnedPartUUIDs dst_pins; + + { + String s = zookeeper->get(zookeeper_path + "/pinned_part_uuids", &src_pins.stat); + src_pins.fromString(s); + } + + { + String s = zookeeper->get(to + "/pinned_part_uuids", &dst_pins.stat); + dst_pins.fromString(s); + } + + if (src_pins.part_uuids.contains(part->uuid) || dst_pins.part_uuids.contains(part->uuid)) + throw Exception(ErrorCodes::PART_IS_TEMPORARILY_LOCKED, "Part {} has it's uuid ({}) already pinned.", part_name, toString(part->uuid)); + + src_pins.part_uuids.insert(part->uuid); + dst_pins.part_uuids.insert(part->uuid); + + PartMovesBetweenShardsOrchestrator::Entry part_move_entry; + part_move_entry.create_time = std::time(nullptr); + part_move_entry.update_time = part_move_entry.create_time; + part_move_entry.task_uuid = UUIDHelpers::generateV4(); + part_move_entry.part_name = part->name; + part_move_entry.part_uuid = part->uuid; + part_move_entry.to_shard = to; + + Coordination::Requests ops; + ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/log", merge_pred.getVersion())); /// Make sure no new events were added to the log. + ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/pinned_part_uuids", src_pins.toString(), src_pins.stat.version)); + ops.emplace_back(zkutil::makeSetRequest(to + "/pinned_part_uuids", dst_pins.toString(), dst_pins.stat.version)); + ops.emplace_back(zkutil::makeCreateRequest( + part_moves_between_shards_orchestrator.entries_znode_path + "/task-", + part_move_entry.toString(), + zkutil::CreateMode::PersistentSequential)); + + Coordination::Responses responses; + Coordination::Error rc = zookeeper->tryMulti(ops, responses); + zkutil::KeeperMultiException::check(rc, ops, responses); + + String task_znode_path = dynamic_cast(*responses.back()).path_created; + LOG_DEBUG(log, "Created task for part movement between shards at " + task_znode_path); + + /// Force refresh local state for making system table up to date after this operation succeeds. + part_moves_between_shards_orchestrator.sync(); + + // TODO: Add support for `replication_alter_partitions_sync`. +} + void StorageReplicatedMergeTree::getCommitPartOps( Coordination::Requests & ops, MutableDataPartPtr & part, diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index c70556f40df..fa5b725588c 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -183,6 +184,8 @@ public: using LogEntriesData = std::vector; void getQueue(LogEntriesData & res, String & replica_name); + std::vector getPartMovesBetweenShardsEntries(); + /// Get replica delay relative to current time. time_t getAbsoluteDelay() const; @@ -252,6 +255,7 @@ private: friend struct ReplicatedMergeTreeLogEntry; friend class ScopedPartitionMergeLock; friend class ReplicatedMergeTreeQueue; + friend class PartMovesBetweenShardsOrchestrator; friend class MergeTreeData; using MergeStrategyPicker = ReplicatedMergeTreeMergeStrategyPicker; @@ -305,7 +309,6 @@ private: DataPartsExchange::Fetcher fetcher; - /// When activated, replica is initialized and startup() method could exit Poco::Event startup_event; @@ -350,6 +353,8 @@ private: /// A thread that processes reconnection to ZooKeeper when the session expires. 
ReplicatedMergeTreeRestartingThread restarting_thread; + PartMovesBetweenShardsOrchestrator part_moves_between_shards_orchestrator; + /// True if replica was created for existing table with fixed granularity bool other_replicas_fixed_granularity = false; @@ -387,6 +392,10 @@ private: */ void checkParts(bool skip_sanity_checks); + /// Synchronize the list of part uuids which are currently pinned. These should be sent to root query executor + /// to be used for deduplication. + void syncPinnedPartUUIDs(); + /** Check that the part's checksum is the same as the checksum of the same part on some other replica. * If no one has such a part, nothing checks. * Not very reliable: if two replicas add a part almost at the same time, no checks will occur. @@ -457,6 +466,7 @@ private: bool executeFetch(LogEntry & entry); bool executeReplaceRange(const LogEntry & entry); + void executeClonePartFromShard(const LogEntry & entry); /** Updates the queue. */ @@ -633,6 +643,7 @@ private: PartitionCommandsResultInfo attachPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, bool part, ContextPtr query_context) override; void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, ContextPtr query_context) override; void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, ContextPtr query_context) override; + void movePartitionToShard(const ASTPtr & partition, bool move_part, const String & to, const Context & query_context) override; void fetchPartition( const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, diff --git a/src/Storages/System/StorageSystemPartMovesBetweenShards.cpp b/src/Storages/System/StorageSystemPartMovesBetweenShards.cpp new file mode 100644 index 00000000000..57946468bf3 --- /dev/null +++ b/src/Storages/System/StorageSystemPartMovesBetweenShards.cpp @@ -0,0 +1,135 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + + +NamesAndTypesList StorageSystemPartMovesBetweenShards::getNamesAndTypes() +{ + return { + /// Table properties. + { "database", std::make_shared() }, + { "table", std::make_shared() }, + + /// Constant element properties. + { "task_name", std::make_shared() }, + { "task_uuid", std::make_shared() }, + { "create_time", std::make_shared() }, + { "part_name", std::make_shared() }, + { "part_uuid", std::make_shared() }, + { "to_shard", std::make_shared() }, + + /// Processing status of item. 
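        /// These columns track progress. The table itself is attached as `system.part_moves_between_shards`;
        /// an illustrative probe (not literal code from this patch):
        ///
        ///     SELECT part_name, to_shard, state, last_exception
        ///     FROM system.part_moves_between_shards
        ///     WHERE state NOT IN ('DONE', 'CANCELLED');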
+ { "update_time", std::make_shared() }, + { "state", std::make_shared() }, + { "num_tries", std::make_shared() }, + { "last_exception", std::make_shared() }, + }; +} + + +void StorageSystemPartMovesBetweenShards::fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const +{ + const auto access = context.getAccess(); + const bool check_access_for_databases = !access->isGranted(AccessType::SHOW_TABLES); + + std::map> replicated_tables; + for (const auto & db : DatabaseCatalog::instance().getDatabases()) + { + /// Check if database can contain replicated tables + if (!db.second->canContainMergeTreeTables()) + continue; + + const bool check_access_for_tables = check_access_for_databases && !access->isGranted(AccessType::SHOW_TABLES, db.first); + + for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next()) + { + const auto & table = iterator->table(); + if (!table) + continue; + if (!dynamic_cast(table.get())) + continue; + if (check_access_for_tables && !access->isGranted(AccessType::SHOW_TABLES, db.first, iterator->name())) + continue; + replicated_tables[db.first][iterator->name()] = table; + } + } + + + MutableColumnPtr col_database_mut = ColumnString::create(); + MutableColumnPtr col_table_mut = ColumnString::create(); + + for (auto & db : replicated_tables) + { + for (auto & table : db.second) + { + col_database_mut->insert(db.first); + col_table_mut->insert(table.first); + } + } + + ColumnPtr col_database_to_filter = std::move(col_database_mut); + ColumnPtr col_table_to_filter = std::move(col_table_mut); + + /// Determine what tables are needed by the conditions in the query. + { + Block filtered_block + { + { col_database_to_filter, std::make_shared(), "database" }, + { col_table_to_filter, std::make_shared(), "table" }, + }; + + VirtualColumnUtils::filterBlockWithQuery(query_info.query, filtered_block, context); + + if (!filtered_block.rows()) + return; + + col_database_to_filter = filtered_block.getByName("database").column; + col_table_to_filter = filtered_block.getByName("table").column; + } + + for (size_t i = 0, tables_size = col_database_to_filter->size(); i < tables_size; ++i) + { + String database = (*col_database_to_filter)[i].safeGet(); + String table = (*col_table_to_filter)[i].safeGet(); + + auto moves = dynamic_cast(*replicated_tables[database][table]).getPartMovesBetweenShardsEntries(); + + for (auto & entry : moves) + { + size_t col_num = 0; + + /// Table properties. + res_columns[col_num++]->insert(database); + res_columns[col_num++]->insert(table); + + /// Constant element properties. + res_columns[col_num++]->insert(entry.znode_name); + res_columns[col_num++]->insert(entry.task_uuid); + res_columns[col_num++]->insert(entry.create_time); + res_columns[col_num++]->insert(entry.part_name); + res_columns[col_num++]->insert(entry.part_uuid); + res_columns[col_num++]->insert(entry.to_shard); + + /// Processing status of item. 
+            res_columns[col_num++]->insert(entry.update_time);
+            res_columns[col_num++]->insert(entry.state.toString());
+            res_columns[col_num++]->insert(0); /// num_tries is not tracked yet; always report 0.
+            res_columns[col_num++]->insert(entry.last_exception_msg);
+        }
+    }
+}
+
+}
diff --git a/src/Storages/System/StorageSystemPartMovesBetweenShards.h b/src/Storages/System/StorageSystemPartMovesBetweenShards.h
new file mode 100644
index 00000000000..1d0e7e40037
--- /dev/null
+++ b/src/Storages/System/StorageSystemPartMovesBetweenShards.h
@@ -0,0 +1,27 @@
+#pragma once
+
+#include <ext/shared_ptr_helper.h>
+#include <Storages/System/IStorageSystemOneBlock.h>
+
+
+namespace DB
+{
+
+class Context;
+
+
+class StorageSystemPartMovesBetweenShards final : public ext::shared_ptr_helper<StorageSystemPartMovesBetweenShards>, public IStorageSystemOneBlock<StorageSystemPartMovesBetweenShards>
+{
+    friend struct ext::shared_ptr_helper<StorageSystemPartMovesBetweenShards>;
+public:
+    std::string getName() const override { return "SystemShardMoves"; }
+
+    static NamesAndTypesList getNamesAndTypes();
+
+protected:
+    using IStorageSystemOneBlock::IStorageSystemOneBlock;
+
+    void fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const override;
+};
+
+}
diff --git a/src/Storages/System/attachSystemTables.cpp b/src/Storages/System/attachSystemTables.cpp
index 673cf671548..3f3c19bb663 100644
--- a/src/Storages/System/attachSystemTables.cpp
+++ b/src/Storages/System/attachSystemTables.cpp
@@ -25,6 +25,7 @@
 #include
 #include
 #include
+#include <Storages/System/StorageSystemPartMovesBetweenShards.h>
 #include
 #include
 #include
@@ -144,6 +145,7 @@ void attachSystemTablesServer(IDatabase & system_database, bool has_zookeeper)
     attach<StorageSystemGraphiteRetentions>(system_database, "graphite_retentions");
     attach<StorageSystemMacros>(system_database, "macros");
     attach<StorageSystemReplicatedFetches>(system_database, "replicated_fetches");
+    attach<StorageSystemPartMovesBetweenShards>(system_database, "part_moves_between_shards");
 
     if (has_zookeeper)
         attach<StorageSystemZooKeeper>(system_database, "zookeeper");
diff --git a/tests/integration/test_move_part_to_shard/__init__.py b/tests/integration/test_move_part_to_shard/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/tests/integration/test_move_part_to_shard/configs/merge_tree.xml b/tests/integration/test_move_part_to_shard/configs/merge_tree.xml
new file mode 100644
index 00000000000..d4b61fa08a7
--- /dev/null
+++ b/tests/integration/test_move_part_to_shard/configs/merge_tree.xml
@@ -0,0 +1,6 @@
+<yandex>
+    <merge_tree>
+        <assign_part_uuids>1</assign_part_uuids>
+        <part_moves_between_shards_delay_seconds>3</part_moves_between_shards_delay_seconds>
+    </merge_tree>
+</yandex>
diff --git a/tests/integration/test_move_part_to_shard/configs/remote_servers.xml b/tests/integration/test_move_part_to_shard/configs/remote_servers.xml
new file mode 100644
index 00000000000..4812d84a918
--- /dev/null
+++ b/tests/integration/test_move_part_to_shard/configs/remote_servers.xml
@@ -0,0 +1,26 @@
+<yandex>
+    <remote_servers>
+        <test_cluster>
+            <shard>
+                <replica>
+                    <host>s0r0</host>
+                    <port>9000</port>
+                </replica>
+                <replica>
+                    <host>s0r1</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+            <shard>
+                <replica>
+                    <host>s1r0</host>
+                    <port>9000</port>
+                </replica>
+                <replica>
+                    <host>s1r1</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+        </test_cluster>
+    </remote_servers>
+</yandex>
diff --git a/tests/integration/test_move_part_to_shard/test.py b/tests/integration/test_move_part_to_shard/test.py
new file mode 100644
index 00000000000..eda0cba95e2
--- /dev/null
+++ b/tests/integration/test_move_part_to_shard/test.py
@@ -0,0 +1,162 @@
+import random
+import time
+
+import pytest
+
+from helpers.client import QueryRuntimeException
+from helpers.cluster import ClickHouseCluster
+from helpers.test_tools import TSV
+
+cluster = ClickHouseCluster(__file__)
+
+s0r0 = cluster.add_instance(
+    's0r0',
+    main_configs=['configs/remote_servers.xml', 'configs/merge_tree.xml'],
+    with_zookeeper=True)
+
+s0r1 = cluster.add_instance(
+    's0r1',
+    main_configs=['configs/remote_servers.xml', 'configs/merge_tree.xml'],
+    with_zookeeper=True)
+
+s1r0 = cluster.add_instance(
+    's1r0',
+    main_configs=['configs/remote_servers.xml', 'configs/merge_tree.xml'],
+    with_zookeeper=True)
+
+s1r1 = cluster.add_instance(
+    's1r1',
+    main_configs=['configs/remote_servers.xml', 'configs/merge_tree.xml'],
+    with_zookeeper=True)
+
+
+@pytest.fixture(scope="module")
+def started_cluster():
+    try:
+        cluster.start()
+        yield cluster
+    finally:
+        cluster.shutdown()
+
+
+def test_move(started_cluster):
+    for shard_ix, rs in enumerate([[s0r0, s0r1], [s1r0, s1r1]]):
+        for replica_ix, r in enumerate(rs):
+            r.query("""
+            CREATE TABLE t(v UInt64)
+            ENGINE ReplicatedMergeTree('/clickhouse/shard_{}/tables/t', '{}')
+            ORDER BY tuple()
+            """.format(shard_ix, replica_ix))
+
+    s0r0.query("SYSTEM STOP MERGES t")
+
+    s0r0.query("INSERT INTO t VALUES (1)")
+    s0r0.query("INSERT INTO t VALUES (2)")
+
+    assert "2" == s0r0.query("SELECT count() FROM t").strip()
+    assert "0" == s1r0.query("SELECT count() FROM t").strip()
+
+    s0r0.query("ALTER TABLE t MOVE PART 'all_0_0_0' TO SHARD '/clickhouse/shard_1/tables/t'")
+
+    print(s0r0.query("SELECT * FROM system.part_moves_between_shards"))
+
+    s0r0.query("SYSTEM START MERGES t")
+    s0r0.query("OPTIMIZE TABLE t FINAL")
+
+    while True:
+        time.sleep(3)
+
+        print(s0r0.query("SELECT * FROM system.part_moves_between_shards"))
+
+        # The move task eventually reaches the DONE state.
+        if "DONE" == s0r0.query("SELECT state FROM system.part_moves_between_shards").strip():
+            break
+
+    for n in [s0r0, s0r1]:
+        assert "1" == n.query("SELECT count() FROM t").strip()
+
+    for n in [s1r0, s1r1]:
+        assert "1" == n.query("SELECT count() FROM t").strip()
+
+    # Move the part back.
+    s1r0.query("ALTER TABLE t MOVE PART 'all_0_0_0' TO SHARD '/clickhouse/shard_0/tables/t'")
+
+    while True:
+        time.sleep(3)
+
+        print(s1r0.query("SELECT * FROM system.part_moves_between_shards"))
+
+        # The move task eventually reaches the DONE state.
+        if "DONE" == s1r0.query("SELECT state FROM system.part_moves_between_shards").strip():
+            break
+
+    for n in [s0r0, s0r1]:
+        assert "2" == n.query("SELECT count() FROM t").strip()
+
+    for n in [s1r0, s1r1]:
+        assert "0" == n.query("SELECT count() FROM t").strip()
+
+    # Cleanup.
+    for n in started_cluster.instances.values():
+        n.query("DROP TABLE t SYNC")
+
+
+def test_deduplication_while_move(started_cluster):
+    for shard_ix, rs in enumerate([[s0r0, s0r1], [s1r0, s1r1]]):
+        for replica_ix, r in enumerate(rs):
+            r.query("""
+            CREATE TABLE t(v UInt64)
+            ENGINE ReplicatedMergeTree('/clickhouse/shard_{}/tables/t', '{}')
+            ORDER BY tuple()
+            """.format(shard_ix, replica_ix))
+
+            r.query("""
+            CREATE TABLE t_d AS t
+            ENGINE Distributed('test_cluster', '', t)
+            """)
+
+    s0r0.query("SYSTEM STOP MERGES t")
+
+    s0r0.query("INSERT INTO t VALUES (1)")
+    s0r0.query("INSERT INTO t VALUES (2)")
+    s0r1.query("SYSTEM SYNC REPLICA t")
+
+    assert "2" == s0r0.query("SELECT count() FROM t").strip()
+    assert "0" == s1r0.query("SELECT count() FROM t").strip()
+
+    s0r0.query("ALTER TABLE t MOVE PART 'all_0_0_0' TO SHARD '/clickhouse/shard_1/tables/t'")
+    s0r0.query("SYSTEM START MERGES t")
+
+    expected = """
+1
+2
+"""
+
+    # Verify that we get a consistent result at all times while the part is moving from one shard to another.
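+    # While the moving part briefly exists on both shards, each replica sends the
+    # UUIDs of pinned (currently moving) parts to the initiator, which uses them
+    # to make sure every part is read only once (allow_experimental_query_deduplication).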
+ while "DONE" != s0r0.query("SELECT state FROM system.part_moves_between_shards ORDER BY create_time DESC LIMIT 1").strip(): + n = random.choice(list(started_cluster.instances.values())) + + assert TSV(n.query("SELECT * FROM t_d ORDER BY v", settings={ + "allow_experimental_query_deduplication": 1 + })) == TSV(expected) + + +def test_move_not_permitted(started_cluster): + for ix, n in enumerate([s0r0, s1r0]): + n.query(""" + CREATE TABLE not_permitted(v_{} UInt64) + ENGINE ReplicatedMergeTree('/clickhouse/shard_{}/tables/not_permitted', 'r') + ORDER BY tuple() + """.format(ix, ix)) + + s0r0.query("INSERT INTO not_permitted VALUES (1)") + + with pytest.raises(QueryRuntimeException) as exc: + s0r0.query("ALTER TABLE not_permitted MOVE PART 'all_0_0_0' TO SHARD '/clickhouse/shard_1/tables/not_permitted'") + + assert "DB::Exception: Table columns structure in ZooKeeper is different from local table structure." in str(exc.value) + + with pytest.raises(QueryRuntimeException) as exc: + s0r0.query("ALTER TABLE not_permitted MOVE PART 'all_0_0_0' TO SHARD '/clickhouse/shard_0/tables/not_permitted'") + + assert "DB::Exception: Source and destination are the same" in str(exc.value) diff --git a/tests/integration/test_query_deduplication/configs/profiles.xml b/tests/integration/test_query_deduplication/configs/profiles.xml new file mode 100644 index 00000000000..4c15556ab99 --- /dev/null +++ b/tests/integration/test_query_deduplication/configs/profiles.xml @@ -0,0 +1,7 @@ + + + + 1 + + + \ No newline at end of file diff --git a/tests/integration/test_query_deduplication/test.py b/tests/integration/test_query_deduplication/test.py index 8d935b98579..1088b539414 100644 --- a/tests/integration/test_query_deduplication/test.py +++ b/tests/integration/test_query_deduplication/test.py @@ -11,15 +11,18 @@ cluster = ClickHouseCluster(__file__) node1 = cluster.add_instance( 'node1', - main_configs=['configs/remote_servers.xml', 'configs/deduplication_settings.xml']) + main_configs=['configs/remote_servers.xml', 'configs/deduplication_settings.xml'], + user_configs=['configs/profiles.xml']) node2 = cluster.add_instance( 'node2', - main_configs=['configs/remote_servers.xml', 'configs/deduplication_settings.xml']) + main_configs=['configs/remote_servers.xml', 'configs/deduplication_settings.xml'], + user_configs=['configs/profiles.xml']) node3 = cluster.add_instance( 'node3', - main_configs=['configs/remote_servers.xml', 'configs/deduplication_settings.xml']) + main_configs=['configs/remote_servers.xml', 'configs/deduplication_settings.xml'], + user_configs=['configs/profiles.xml']) @pytest.fixture(scope="module")