diff --git a/dbms/src/Storages/IStorage.h b/dbms/src/Storages/IStorage.h index 6d52afe5314..693f128d8dd 100644 --- a/dbms/src/Storages/IStorage.h +++ b/dbms/src/Storages/IStorage.h @@ -305,8 +305,8 @@ public: /** Notify engine about updated dependencies for this storage. */ virtual void updateDependencies() {} - /// Returns data path if storage supports it, empty string otherwise. - virtual String getDataPath() const { return {}; } + /// Returns data path if storage supports it, empty vector otherwise. + virtual Strings getDataPaths() const { return {}; } /// Returns ASTExpressionList of partition key expression for storage or nullptr if there is none. virtual ASTPtr getPartitionKeyAST() const { return nullptr; } diff --git a/dbms/src/Storages/MergeTree/DataPartsExchange.cpp b/dbms/src/Storages/MergeTree/DataPartsExchange.cpp index cb97937cd3b..5dfb709dcb4 100644 --- a/dbms/src/Storages/MergeTree/DataPartsExchange.cpp +++ b/dbms/src/Storages/MergeTree/DataPartsExchange.cpp @@ -200,8 +200,9 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart( String tmp_prefix = tmp_prefix_.empty() ? TMP_PREFIX : tmp_prefix_; String relative_part_path = String(to_detached ? "detached/" : "") + tmp_prefix + part_name; - String part_path = data.getFullPathForPart(0); - String absolute_part_path = part_path + relative_part_path + "/"; ///@TODO_IGR ASK path for file + auto reservation = data.reserveSpaceForPart(0); ///@TODO_IGR ASK What size should be there? + String part_path = reservation->getPath(); + String absolute_part_path = part_path + relative_part_path + "/"; Poco::File part_file(absolute_part_path); if (part_file.exists()) diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 7ad13ad6338..f1e8ca7a104 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -150,8 +150,7 @@ UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSize(size_t pool_size, size_ data.settings.max_bytes_to_merge_at_max_space_in_pool, static_cast(free_entries) / data.settings.number_of_free_entries_in_pool_to_lower_max_size_of_merge); - ///@TODO_IGR ASK what path? - return std::min(max_size, static_cast(DiskSpaceMonitor::getUnreservedFreeSpace(data.full_paths[0]) / DISK_USAGE_COEFFICIENT_TO_SELECT)); + return std::min(max_size, static_cast(DiskSpaceMonitor::getMaxUnreservedFreeSpace() / DISK_USAGE_COEFFICIENT_TO_SELECT)); } diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp index 67f89dd36e2..7fe20c8bdb5 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp @@ -162,7 +162,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa part_name = new_part_info.getPartName(); size_t expected_size = block.bytes(); - String part_absolute_path = data.getFullPathForPart(expected_size); ///@TODO_IGR ASK expected size + auto reservation = data.reserveSpaceForPart(expected_size); ///@TODO_IGR ASK expected size + String part_absolute_path = reservation->getPath(); MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared(data, part_absolute_path, part_name, new_part_info); new_data_part->partition = std::move(partition); diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp index a745af1e414..6c66e9b57aa 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp @@ -233,7 +233,7 @@ void ReplicatedMergeTreePartCheckThread::checkPart(const String & part_name) zk_part_header.getChecksums().checkEqual(local_part_header.getChecksums(), true); checkDataPart( - storage.data.getFullPath() + part_name, + storage.data.getFullPaths()[0] + part_name, ///@TODO_IGR ASK what should we do there? Should we check all paths in checkDataPart? storage.data.index_granularity, true, storage.data.primary_key_data_types, diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 1caae87aac9..92cff35e15d 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -32,8 +32,8 @@ void ReplicatedMergeTreeQueue::addVirtualParts(const MergeTreeData::DataParts & for (const auto & part : parts) { - current_parts.add(part->name); - virtual_parts.add(part->name); + current_parts.add("/", part->name); + virtual_parts.add("/", part->name); } } @@ -122,7 +122,7 @@ void ReplicatedMergeTreeQueue::insertUnlocked( { for (const String & virtual_part_name : entry->getVirtualPartNames()) { - virtual_parts.add(virtual_part_name); + virtual_parts.add("/", virtual_part_name); updateMutationsPartsToDo(virtual_part_name, /* add = */ true); } @@ -192,13 +192,13 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval( { for (const String & virtual_part_name : entry->getVirtualPartNames()) { - Strings replaced_parts; - current_parts.add(virtual_part_name, &replaced_parts); + ActiveDataPartSet::PartPathNames replaced_parts; + current_parts.add(String("/"), virtual_part_name, &replaced_parts); /// Each part from `replaced_parts` should become Obsolete as a result of executing the entry. /// So it is one less part to mutate for each mutation with block number greater than part_info.getDataVersion() - for (const String & replaced_part_name : replaced_parts) - updateMutationsPartsToDo(replaced_part_name, /* add = */ false); + for (const auto & replaced_part : replaced_parts) + updateMutationsPartsToDo(replaced_part.name, /* add = */ false); } String drop_range_part_name; @@ -539,9 +539,9 @@ static size_t countPartsToMutate( /// because they are not consecutive in `parts`. MergeTreePartInfo covering_part_info( partition_id, 0, block_num, MergeTreePartInfo::MAX_LEVEL, MergeTreePartInfo::MAX_BLOCK_NUMBER); - for (const String & covered_part_name : parts.getPartsCoveredBy(covering_part_info)) + for (const auto & covered_part : parts.getPartsCoveredBy(covering_part_info)) { - auto part_info = MergeTreePartInfo::fromPartName(covered_part_name, parts.getFormatVersion()); + auto part_info = MergeTreePartInfo::fromPartName(covered_part.name, parts.getFormatVersion()); if (part_info.getDataVersion() < block_num) ++count; } @@ -1306,7 +1306,7 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep void ReplicatedMergeTreeQueue::disableMergesInRange(const String & part_name) { std::lock_guard lock(state_mutex); - virtual_parts.add(part_name); + virtual_parts.add("/", part_name); } @@ -1572,7 +1572,7 @@ bool ReplicatedMergeTreeMergePredicate::operator()( return false; } - if (prev_virtual_parts.getContainingPart(part->info).empty()) + if (prev_virtual_parts.getContainingPart(part->info).name.empty()) { if (out_reason) *out_reason = "Entry for part " + part->name + " hasn't been read from the replication log yet"; @@ -1610,7 +1610,7 @@ bool ReplicatedMergeTreeMergePredicate::operator()( { /// We look for containing parts in queue.virtual_parts (and not in prev_virtual_parts) because queue.virtual_parts is newer /// and it is guaranteed that it will contain all merges assigned before this object is constructed. - String containing_part = queue.virtual_parts.getContainingPart(part->info); + String containing_part = queue.virtual_parts.getContainingPart(part->info).name; if (containing_part != part->name) { if (out_reason) @@ -1625,7 +1625,11 @@ bool ReplicatedMergeTreeMergePredicate::operator()( left->info.partition_id, left_max_block + 1, right_min_block - 1, MergeTreePartInfo::MAX_LEVEL, MergeTreePartInfo::MAX_BLOCK_NUMBER); - Strings covered = queue.virtual_parts.getPartsCoveredBy(gap_part_info); + auto tmp = queue.virtual_parts.getPartsCoveredBy(gap_part_info); + Strings covered; + for (auto & elem : tmp) { + covered.push_back(elem.name); + } if (!covered.empty()) { if (out_reason) @@ -1667,7 +1671,7 @@ std::optional ReplicatedMergeTreeMergePredicate::getDesiredMutationVersio std::lock_guard lock(queue.state_mutex); - if (queue.virtual_parts.getContainingPart(part->info) != part->name) + if (queue.virtual_parts.getContainingPart(part->info).name != part->name) return {}; auto in_partition = queue.mutations_by_partition.find(part->info.partition_id); diff --git a/dbms/src/Storages/StorageDistributed.h b/dbms/src/Storages/StorageDistributed.h index 404a9a7265e..8bcaf04f7af 100644 --- a/dbms/src/Storages/StorageDistributed.h +++ b/dbms/src/Storages/StorageDistributed.h @@ -88,7 +88,7 @@ public: void startup() override; void shutdown() override; - String getDataPath() const override { return path; } + Strings getDataPaths() const override { return {path}; } const ExpressionActionsPtr & getShardingKeyExpr() const { return sharding_key_expr; } const String & getShardingKeyColumnName() const { return sharding_key_column_name; } diff --git a/dbms/src/Storages/StorageFile.h b/dbms/src/Storages/StorageFile.h index eb74ad615a7..7d9742b5f4a 100644 --- a/dbms/src/Storages/StorageFile.h +++ b/dbms/src/Storages/StorageFile.h @@ -47,7 +47,7 @@ public: void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override; - String getDataPath() const override { return path; } + Strings getDataPaths() const override { return {path}; } protected: friend class StorageFileBlockInputStream; diff --git a/dbms/src/Storages/StorageLog.h b/dbms/src/Storages/StorageLog.h index cf0d07a3bfe..970e1ce8641 100644 --- a/dbms/src/Storages/StorageLog.h +++ b/dbms/src/Storages/StorageLog.h @@ -44,7 +44,7 @@ public: std::string full_path() const { return path + escapeForFileName(name) + '/';} - String getDataPath() const override { return full_path(); } + Strings getDataPaths() const override { return {full_path()}; } protected: /** Attach the table with the appropriate name, along the appropriate path (with / at the end), diff --git a/dbms/src/Storages/StorageMaterializedView.cpp b/dbms/src/Storages/StorageMaterializedView.cpp index fd63053c78a..beab5428466 100644 --- a/dbms/src/Storages/StorageMaterializedView.cpp +++ b/dbms/src/Storages/StorageMaterializedView.cpp @@ -283,10 +283,10 @@ StoragePtr StorageMaterializedView::tryGetTargetTable() const return global_context.tryGetTable(target_database_name, target_table_name); } -String StorageMaterializedView::getDataPath() const +Strings StorageMaterializedView::getDataPaths() const { if (auto table = tryGetTargetTable()) - return table->getDataPath(); + return table->getDataPaths(); return {}; } diff --git a/dbms/src/Storages/StorageMaterializedView.h b/dbms/src/Storages/StorageMaterializedView.h index 665f6243a32..39b8d21f0c8 100644 --- a/dbms/src/Storages/StorageMaterializedView.h +++ b/dbms/src/Storages/StorageMaterializedView.h @@ -57,7 +57,7 @@ public: size_t max_block_size, unsigned num_streams) override; - String getDataPath() const override; + Strings getDataPaths() const override; private: String select_database_name; diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index 84c85553d79..a873b650db8 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -62,10 +62,10 @@ StorageMergeTree::StorageMergeTree( const MergeTreeData::MergingParams & merging_params_, const MergeTreeSettings & settings_, bool has_force_restore_data_flag) - : path(path_), database_name(database_name_), table_name(table_name_), full_paths{path + escapeForFileName(table_name) + '/', "/mnt/data/Data2/" + escapeForFileName(table_name) + '/'}, + : path(path_), database_name(database_name_), table_name(table_name_), global_context(context_), background_pool(context_.getBackgroundPool()), data(database_name, table_name, - Schema(std::vector{full_paths}), columns_, indices_, ///@TODO_IGR generate Schema from config + Schema(std::vector{{path, "/mnt/data/Data2/"}}), columns_, indices_, ///@TODO_IGR generate Schema from config context_, date_column_name, partition_by_ast_, order_by_ast_, primary_key_ast_, sample_by_ast_, merging_params_, settings_, false, attach), reader(data), writer(data), merger_mutator(data, global_context.getBackgroundPool()), @@ -183,9 +183,9 @@ void StorageMergeTree::rename(const String & new_path_to_db, const String & /*ne data.setPath(new_full_path); - path = new_path_to_db; + path = new_path_to_db; ///@TODO_IGR ASK path? table_name? table_name = new_table_name; - full_paths = {new_full_path}; ///TODO_IGR ASK rename? + //full_paths = {new_full_path}; ///@TODO_IGR ASK rename? /// NOTE: Logger names are not updated. } @@ -336,6 +336,7 @@ public: void StorageMergeTree::mutate(const MutationCommands & commands, const Context &) { + const auto full_paths = data.getFullPaths(); ///@TODO_IGR ASK What expected size of mutated part? what size should we reserve? MergeTreeMutationEntry entry(commands, full_paths[0], data.insert_increment.get()); ///@TODO_IGR ASK PATH TO ENTRY String file_name; { @@ -429,18 +430,22 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id) void StorageMergeTree::loadMutations() { Poco::DirectoryIterator end; - for (auto it = Poco::DirectoryIterator(full_paths[0]); it != end; ++it) ///@TODO_IGR ASK MUTATIONS FROM ALL DISKS? + const auto full_paths = data.getFullPaths(); + for (const String & path : full_paths) { - if (startsWith(it.name(), "mutation_")) + for (auto it = Poco::DirectoryIterator(path); it != end; ++it) { - MergeTreeMutationEntry entry(full_paths[0], it.name()); - Int64 block_number = entry.block_number; - auto insertion = current_mutations_by_id.emplace(it.name(), std::move(entry)); - current_mutations_by_version.emplace(block_number, insertion.first->second); - } - else if (startsWith(it.name(), "tmp_mutation_")) - { - it->remove(); + if (startsWith(it.name(), "mutation_")) + { + MergeTreeMutationEntry entry(path, it.name()); + Int64 block_number = entry.block_number; + auto insertion = current_mutations_by_id.emplace(it.name(), std::move(entry)); + current_mutations_by_version.emplace(block_number, insertion.first->second); + } + else if (startsWith(it.name(), "tmp_mutation_")) + { + it->remove(); + } } } @@ -953,20 +958,23 @@ void StorageMergeTree::attachPartition(const ASTPtr & partition, bool attach_par String source_dir = "detached/"; + const auto full_paths = data.getFullPaths(); + /// Let's make a list of parts to add. ActiveDataPartSet::PartPathNames parts; if (attach_part) { for (const String & full_path : full_paths) { - parts.push_back(ActiveDataPartSet::PartPathName{full_path, partition_id}); ///@TODO_IGR ASK + parts.push_back(ActiveDataPartSet::PartPathName{full_path, partition_id}); } } else { LOG_DEBUG(log, "Looking for parts for partition " << partition_id << " in " << source_dir); + ///@TODO_IGR ASK ActiveDataPartSet without path? Is it possible here? ActiveDataPartSet active_parts(data.format_version); for (const String & full_path : full_paths) { - for (Poco::DirectoryIterator it = Poco::DirectoryIterator(full_path + source_dir); it != Poco::DirectoryIterator(); ++it) ///@TODO_IGR + for (Poco::DirectoryIterator it = Poco::DirectoryIterator(full_path + source_dir); it != Poco::DirectoryIterator(); ++it) { const String & name = it.name(); MergeTreePartInfo part_info; @@ -976,7 +984,7 @@ void StorageMergeTree::attachPartition(const ASTPtr & partition, bool attach_par continue; } LOG_DEBUG(log, "Found part " << name); - active_parts.add(full_path, name); + active_parts.add(full_path, name); ///@TODO_IGR ASK full_path? full_path + detached? } } LOG_DEBUG(log, active_parts.size() << " of them are active"); diff --git a/dbms/src/Storages/StorageMergeTree.h b/dbms/src/Storages/StorageMergeTree.h index a9d4b08098c..88fd90780e5 100644 --- a/dbms/src/Storages/StorageMergeTree.h +++ b/dbms/src/Storages/StorageMergeTree.h @@ -90,7 +90,7 @@ public: MergeTreeData & getData() { return data; } const MergeTreeData & getData() const { return data; } - String getDataPath() const override { return full_paths[0]; } ///@TODO_IGR ASK WHAT PATH + Strings getDataPaths() const override { return data.getFullPaths(); } ASTPtr getPartitionKeyAST() const override { return data.partition_by_ast; } ASTPtr getSortingKeyAST() const override { return data.getSortingKeyAST(); } @@ -107,7 +107,7 @@ private: String path; String database_name; String table_name; - Strings full_paths; +// Strings full_paths; Context global_context; BackgroundProcessingPool & background_pool; diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index c51070c4b44..e237f6fe71e 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -215,7 +215,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( zookeeper_path(global_context.getMacros()->expand(zookeeper_path_, database_name, table_name)), replica_name(global_context.getMacros()->expand(replica_name_, database_name, table_name)), data(database_name, table_name, - full_path, columns_, indices_, + Schema(std::vector{{full_path}}), columns_, indices_, context_, date_column_name, partition_by_ast_, order_by_ast_, primary_key_ast_, sample_by_ast_, merging_params_, settings_, true, attach, [this] (const std::string & name) { enqueuePartForCheck(name); }), @@ -1046,7 +1046,10 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry) size_t estimated_space_for_merge = MergeTreeDataMergerMutator::estimateNeededDiskSpace(parts); /// Can throw an exception. - DiskSpaceMonitor::ReservationPtr reserved_space = DiskSpaceMonitor::reserve(full_path, estimated_space_for_merge); + DiskSpaceMonitor::ReservationPtr reserved_space = DiskSpaceMonitor::tryToReserve(full_path, estimated_space_for_merge); + if (!reserved_space) { + throw Exception("TMP MSG", ErrorCodes::NOT_ENOUGH_SPACE); ///@TODO_IGR FIX + } auto table_lock = lockStructureForShare(false, RWLockImpl::NO_QUERY); @@ -1176,7 +1179,10 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM MutationCommands commands = queue.getMutationCommands(source_part, new_part_info.mutation); /// Can throw an exception. - DiskSpaceMonitor::ReservationPtr reserved_space = DiskSpaceMonitor::reserve(full_path, estimated_space_for_result); + DiskSpaceMonitor::ReservationPtr reserved_space = DiskSpaceMonitor::tryToReserve(full_path, estimated_space_for_result); + if (!reserved_space) { + throw Exception("TMP MSG", ErrorCodes::NOT_ENOUGH_SPACE); ///@TODO_IGR FIX + } auto table_lock = lockStructureForShare(false, RWLockImpl::NO_QUERY); @@ -1202,7 +1208,7 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM try { - new_part = merger_mutator.mutatePartToTemporaryPart(future_mutated_part, commands, *merge_entry, global_context); + new_part = merger_mutator.mutatePartToTemporaryPart(future_mutated_part, commands, *merge_entry, global_context, reserved_space.get()); data.renameTempPartAndReplace(new_part, nullptr, &transaction); try @@ -1694,7 +1700,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry) if (part_desc->src_table_part) { /// It is clonable part - adding_parts_active_set.add(part_desc->new_part_name); + adding_parts_active_set.add(full_path, part_desc->new_part_name); part_name_to_desc.emplace(part_desc->new_part_name, part_desc); continue; } @@ -1727,14 +1733,14 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry) part_desc->found_new_part_info = MergeTreePartInfo::fromPartName(found_part_name, data.format_version); part_desc->replica = replica; - adding_parts_active_set.add(part_desc->found_new_part_name); + adding_parts_active_set.add(full_path, part_desc->found_new_part_name); part_name_to_desc.emplace(part_desc->found_new_part_name, part_desc); } /// Check that we could cover whole range for (PartDescriptionPtr & part_desc : parts_to_add) { - if (adding_parts_active_set.getContainingPart(part_desc->new_part_info).empty()) + if (adding_parts_active_set.getContainingPart(part_desc->new_part_info).name.empty()) { throw Exception("Not found part " + part_desc->new_part_name + " (or part covering it) neither source table neither remote replicas" , ErrorCodes::NO_REPLICA_HAS_PART); @@ -1744,10 +1750,11 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry) /// Filter covered parts PartDescriptions final_parts; { - Strings final_part_names = adding_parts_active_set.getParts(); + auto final_part_names = adding_parts_active_set.getParts(); - for (const String & final_part_name : final_part_names) + for (const auto & final_part : final_part_names) { + const auto & final_part_name = final_part.name; auto part_desc = part_name_to_desc[final_part_name]; if (!part_desc) throw Exception("There is no final part " + final_part_name + ". This is a bug", ErrorCodes::LOGICAL_ERROR); @@ -1925,12 +1932,18 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo } /// Add to the queue jobs to receive all the active parts that the reference/master replica has. - Strings parts = zookeeper->getChildren(source_path + "/parts"); + Strings parts_tmp = zookeeper->getChildren(source_path + "/parts"); + ActiveDataPartSet::PartPathNames parts; + for (const auto & elem : parts_tmp) { + parts.push_back(ActiveDataPartSet::PartPathName{"/", elem}); + } + ActiveDataPartSet active_parts_set(data.format_version, parts); - Strings active_parts = active_parts_set.getParts(); - for (const String & name : active_parts) + auto active_parts = active_parts_set.getParts(); + for (const auto & path_name : active_parts) { + const auto & name = path_name.name; LogEntry log_entry; log_entry.type = LogEntry::GET_PART; log_entry.source_replica = ""; @@ -3553,16 +3566,19 @@ void StorageReplicatedMergeTree::attachPartition(const ASTPtr & partition, bool if (part_info.partition_id != partition_id) continue; LOG_DEBUG(log, "Found part " << name); - active_parts.add(name); + active_parts.add(full_path, name); part_names.insert(name); } LOG_DEBUG(log, active_parts.size() << " of them are active"); - parts = active_parts.getParts(); + auto tmp_parts = active_parts.getParts(); + for (auto & elem : tmp_parts) { + parts.push_back(elem.name); + } /// Inactive parts rename so they can not be attached in case of repeated ATTACH. for (const auto & name : part_names) { - String containing_part = active_parts.getContainingPart(name); + String containing_part = active_parts.getContainingPart(name).name; if (!containing_part.empty() && containing_part != name) Poco::File(full_path + source_dir + name).renameTo(full_path + source_dir + "inactive_" + name); } @@ -3574,7 +3590,7 @@ void StorageReplicatedMergeTree::attachPartition(const ASTPtr & partition, bool for (const String & part : parts) { LOG_DEBUG(log, "Checking part " << part); - loaded_parts.push_back(data.loadPartAndFixMetadata(source_dir + part)); + loaded_parts.push_back(data.loadPartAndFixMetadata(source_dir, source_dir + part)); } ReplicatedMergeTreeBlockOutputStream output(*this, 0, 0, false); /// TODO Allow to use quorum here. @@ -4138,7 +4154,7 @@ void StorageReplicatedMergeTree::fetchPartition(const ASTPtr & partition, const * Unreliable (there is a race condition) - such a partition may appear a little later. */ Poco::DirectoryIterator dir_end; - for (Poco::DirectoryIterator dir_it{data.getFullPath() + "detached/"}; dir_it != dir_end; ++dir_it) + for (Poco::DirectoryIterator dir_it{data.getFullPaths()[0] + "detached/"}; dir_it != dir_end; ++dir_it) { MergeTreePartInfo part_info; if (MergeTreePartInfo::tryParsePartName(dir_it.name(), &part_info, data.format_version) @@ -4221,13 +4237,20 @@ void StorageReplicatedMergeTree::fetchPartition(const ASTPtr & partition, const if (try_no >= query_context.getSettings().max_fetch_partition_retries_count) throw Exception("Too many retries to fetch parts from " + best_replica_path, ErrorCodes::TOO_MANY_RETRIES_TO_FETCH_PARTS); - Strings parts = getZooKeeper()->getChildren(best_replica_path + "/parts"); + Strings parts_tmp = getZooKeeper()->getChildren(best_replica_path + "/parts"); + ActiveDataPartSet::PartPathNames parts; + for (const auto & elem : parts_tmp) { + parts.push_back(ActiveDataPartSet::PartPathName{"/", elem}); + } ActiveDataPartSet active_parts_set(data.format_version, parts); Strings parts_to_fetch; if (missing_parts.empty()) { - parts_to_fetch = active_parts_set.getParts(); + auto tmp = active_parts_set.getParts(); + for (auto elem : tmp) { + parts_to_fetch.push_back(elem.name); + } /// Leaving only the parts of the desired partition. Strings parts_to_fetch_partition; @@ -4246,7 +4269,7 @@ void StorageReplicatedMergeTree::fetchPartition(const ASTPtr & partition, const { for (const String & missing_part : missing_parts) { - String containing_part = active_parts_set.getContainingPart(missing_part); + String containing_part = active_parts_set.getContainingPart(missing_part).name; if (!containing_part.empty()) parts_to_fetch.push_back(containing_part); else diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.h b/dbms/src/Storages/StorageReplicatedMergeTree.h index caa75000d70..3d2620df320 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.h +++ b/dbms/src/Storages/StorageReplicatedMergeTree.h @@ -194,7 +194,7 @@ public: part_check_thread.enqueuePart(part_name, delay_to_check_seconds); } - String getDataPath() const override { return full_path; } + Strings getDataPaths() const override { return {full_path}; } ASTPtr getPartitionKeyAST() const override { return data.partition_by_ast; } ASTPtr getSortingKeyAST() const override { return data.getSortingKeyAST(); } diff --git a/dbms/src/Storages/StorageSet.h b/dbms/src/Storages/StorageSet.h index 0585dc271c6..aae37b8a52a 100644 --- a/dbms/src/Storages/StorageSet.h +++ b/dbms/src/Storages/StorageSet.h @@ -25,7 +25,7 @@ public: BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override; - String getDataPath() const override { return path; } + Strings getDataPaths() const override { return {path}; } protected: StorageSetOrJoinBase( diff --git a/dbms/src/Storages/StorageStripeLog.h b/dbms/src/Storages/StorageStripeLog.h index 6489c82873e..e6438497e52 100644 --- a/dbms/src/Storages/StorageStripeLog.h +++ b/dbms/src/Storages/StorageStripeLog.h @@ -51,7 +51,7 @@ public: std::string full_path() const { return path + escapeForFileName(name) + '/';} - String getDataPath() const override { return full_path(); } + Strings getDataPaths() const override { return {full_path()}; } void truncate(const ASTPtr &, const Context &) override; diff --git a/dbms/src/Storages/StorageTinyLog.h b/dbms/src/Storages/StorageTinyLog.h index 5b8e4bc90ac..1fc3a96c178 100644 --- a/dbms/src/Storages/StorageTinyLog.h +++ b/dbms/src/Storages/StorageTinyLog.h @@ -50,7 +50,7 @@ public: std::string full_path() const { return path + escapeForFileName(name) + '/';} - String getDataPath() const override { return full_path(); } + Strings getDataPaths() const override { return {full_path()}; } void truncate(const ASTPtr &, const Context &) override; diff --git a/dbms/src/Storages/System/StorageSystemTables.cpp b/dbms/src/Storages/System/StorageSystemTables.cpp index 3413e8609f4..b17d1355e5c 100644 --- a/dbms/src/Storages/System/StorageSystemTables.cpp +++ b/dbms/src/Storages/System/StorageSystemTables.cpp @@ -195,8 +195,11 @@ protected: if (columns_mask[src_index++]) res_columns[res_index++]->insert(0u); // is_temporary - if (columns_mask[src_index++]) - res_columns[res_index++]->insert(tables_it->table()->getDataPath()); + if (columns_mask[src_index++]) { + for (const String & path : tables_it->table()->getDataPaths() ) { + res_columns[res_index++]->insert(path); ///@TODO_IGR ASK Is it fine? + } + } if (columns_mask[src_index++]) res_columns[res_index++]->insert(database->getTableMetadataPath(table_name));