diff --git a/dbms/src/Storages/MergeTree/ActiveDataPartSet.cpp b/dbms/src/Storages/MergeTree/ActiveDataPartSet.cpp index e4d7465b360..e6253285539 100644 --- a/dbms/src/Storages/MergeTree/ActiveDataPartSet.cpp +++ b/dbms/src/Storages/MergeTree/ActiveDataPartSet.cpp @@ -5,15 +5,15 @@ namespace DB { -ActiveDataPartSet::ActiveDataPartSet(MergeTreeDataFormatVersion format_version_, const Strings & names) +ActiveDataPartSet::ActiveDataPartSet(MergeTreeDataFormatVersion format_version_, const ActiveDataPartSet::PartPathNames & names) : format_version(format_version_) { - for (const auto & name : names) - add(name); + for (const auto & path_name : names) + add(path_name.path, path_name.name); } -bool ActiveDataPartSet::add(const String & name, Strings * out_replaced_parts) +bool ActiveDataPartSet::add(const String & path, const String & name, PartPathNames * out_replaced_parts) { auto part_info = MergeTreePartInfo::fromPartName(name, format_version); @@ -52,12 +52,12 @@ bool ActiveDataPartSet::add(const String & name, Strings * out_replaced_parts) part_info_to_name.erase(it++); } - part_info_to_name.emplace(part_info, name); + part_info_to_name.emplace(part_info, PartPathName{path, name}); return true; } -String ActiveDataPartSet::getContainingPart(const MergeTreePartInfo & part_info) const +ActiveDataPartSet::PartPathName ActiveDataPartSet::getContainingPart(const MergeTreePartInfo & part_info) const { auto it = getContainingPartImpl(part_info); if (it != part_info_to_name.end()) @@ -66,7 +66,7 @@ String ActiveDataPartSet::getContainingPart(const MergeTreePartInfo & part_info) } -String ActiveDataPartSet::getContainingPart(const String & name) const +ActiveDataPartSet::PartPathName ActiveDataPartSet::getContainingPart(const String & name) const { auto it = getContainingPartImpl(MergeTreePartInfo::fromPartName(name, format_version)); if (it != part_info_to_name.end()) @@ -75,7 +75,7 @@ String ActiveDataPartSet::getContainingPart(const String & name) const } -std::map::const_iterator +std::map::const_iterator ActiveDataPartSet::getContainingPartImpl(const MergeTreePartInfo & part_info) const { /// A part can only be covered/overlapped by the previous or next one in `part_info_to_name`. @@ -97,7 +97,8 @@ ActiveDataPartSet::getContainingPartImpl(const MergeTreePartInfo & part_info) co return part_info_to_name.end(); } -Strings ActiveDataPartSet::getPartsCoveredBy(const MergeTreePartInfo & part_info) const +ActiveDataPartSet::PartPathNames +ActiveDataPartSet::getPartsCoveredBy(const MergeTreePartInfo & part_info) const { auto it_middle = part_info_to_name.lower_bound(part_info); auto begin = it_middle; @@ -128,16 +129,16 @@ Strings ActiveDataPartSet::getPartsCoveredBy(const MergeTreePartInfo & part_info ++end; } - Strings covered; + PartPathNames covered; for (auto it = begin; it != end; ++it) covered.push_back(it->second); return covered; } -Strings ActiveDataPartSet::getParts() const +ActiveDataPartSet::PartPathNames ActiveDataPartSet::getParts() const { - Strings res; + PartPathNames res; res.reserve(part_info_to_name.size()); for (const auto & kv : part_info_to_name) res.push_back(kv.second); diff --git a/dbms/src/Storages/MergeTree/ActiveDataPartSet.h b/dbms/src/Storages/MergeTree/ActiveDataPartSet.h index 17b30205bd5..f64f551dbd9 100644 --- a/dbms/src/Storages/MergeTree/ActiveDataPartSet.h +++ b/dbms/src/Storages/MergeTree/ActiveDataPartSet.h @@ -15,8 +15,15 @@ namespace DB class ActiveDataPartSet { public: + struct PartPathName { + String path; + String name; + }; + using PartPathNames = std::vector; + + ActiveDataPartSet(MergeTreeDataFormatVersion format_version_) : format_version(format_version_) {} - ActiveDataPartSet(MergeTreeDataFormatVersion format_version_, const Strings & names); + ActiveDataPartSet(MergeTreeDataFormatVersion format_version_, const PartPathNames & names); ActiveDataPartSet(const ActiveDataPartSet & other) : format_version(other.format_version) @@ -43,7 +50,7 @@ public: /// Returns true if the part was actually added. If out_replaced_parts != nullptr, it will contain /// parts that were replaced from the set by the newly added part. - bool add(const String & name, Strings * out_replaced_parts = nullptr); + bool add(const String & path, const String & name, PartPathNames * out_replaced_parts = nullptr); bool remove(const MergeTreePartInfo & part_info) { @@ -56,13 +63,13 @@ public: } /// If not found, return an empty string. - String getContainingPart(const MergeTreePartInfo & part_info) const; - String getContainingPart(const String & name) const; + PartPathName getContainingPart(const MergeTreePartInfo & part_info) const; + PartPathName getContainingPart(const String & name) const; - Strings getPartsCoveredBy(const MergeTreePartInfo & part_info) const; + PartPathNames getPartsCoveredBy(const MergeTreePartInfo & part_info) const; /// Returns parts in ascending order of the partition_id and block number. - Strings getParts() const; + PartPathNames getParts() const; size_t size() const; @@ -70,9 +77,9 @@ public: private: MergeTreeDataFormatVersion format_version; - std::map part_info_to_name; + std::map part_info_to_name; - std::map::const_iterator getContainingPartImpl(const MergeTreePartInfo & part_info) const; + std::map::const_iterator getContainingPartImpl(const MergeTreePartInfo & part_info) const; }; } diff --git a/dbms/src/Storages/MergeTree/DataPartsExchange.cpp b/dbms/src/Storages/MergeTree/DataPartsExchange.cpp index 42c668c9fcb..cb97937cd3b 100644 --- a/dbms/src/Storages/MergeTree/DataPartsExchange.cpp +++ b/dbms/src/Storages/MergeTree/DataPartsExchange.cpp @@ -100,7 +100,7 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo { String file_name = it.first; - String path = data.getFullPath() + part_name + "/" + file_name; + String path = part->getFullPath() + part_name + "/" + file_name; UInt64 size = Poco::File(path).getSize(); @@ -200,7 +200,8 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart( String tmp_prefix = tmp_prefix_.empty() ? TMP_PREFIX : tmp_prefix_; String relative_part_path = String(to_detached ? "detached/" : "") + tmp_prefix + part_name; - String absolute_part_path = data.getFullPath() + relative_part_path + "/"; + String part_path = data.getFullPathForPart(0); + String absolute_part_path = part_path + relative_part_path + "/"; ///@TODO_IGR ASK path for file Poco::File part_file(absolute_part_path); if (part_file.exists()) @@ -210,7 +211,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart( part_file.createDirectory(); - MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared(data, part_name); + MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared(data, part_path, part_name); new_data_part->relative_path = relative_part_path; new_data_part->is_temp = true; diff --git a/dbms/src/Storages/MergeTree/DiskSpaceMonitor.cpp b/dbms/src/Storages/MergeTree/DiskSpaceMonitor.cpp index 507ac8b7459..d6fe0cb053c 100644 --- a/dbms/src/Storages/MergeTree/DiskSpaceMonitor.cpp +++ b/dbms/src/Storages/MergeTree/DiskSpaceMonitor.cpp @@ -3,8 +3,7 @@ namespace DB { -UInt64 DiskSpaceMonitor::reserved_bytes; -UInt64 DiskSpaceMonitor::reservation_count; +std::map DiskSpaceMonitor::reserved; std::mutex DiskSpaceMonitor::mutex; } diff --git a/dbms/src/Storages/MergeTree/DiskSpaceMonitor.h b/dbms/src/Storages/MergeTree/DiskSpaceMonitor.h index d518fea5490..d9e229c23a8 100644 --- a/dbms/src/Storages/MergeTree/DiskSpaceMonitor.h +++ b/dbms/src/Storages/MergeTree/DiskSpaceMonitor.h @@ -34,6 +34,11 @@ namespace ErrorCodes class DiskSpaceMonitor { public: + struct DiskReserve { + UInt64 reserved_bytes; + UInt64 reservation_count; + }; + class Reservation : private boost::noncopyable { public: @@ -42,23 +47,23 @@ public: try { std::lock_guard lock(DiskSpaceMonitor::mutex); - if (DiskSpaceMonitor::reserved_bytes < size) + if (reserves->reserved_bytes < size) { - DiskSpaceMonitor::reserved_bytes = 0; + reserves->reserved_bytes = 0; LOG_ERROR(&Logger::get("DiskSpaceMonitor"), "Unbalanced reservations size; it's a bug"); } else { - DiskSpaceMonitor::reserved_bytes -= size; + reserves->reserved_bytes -= size; } - if (DiskSpaceMonitor::reservation_count == 0) + if (reserves->reservation_count == 0) { LOG_ERROR(&Logger::get("DiskSpaceMonitor"), "Unbalanced reservation count; it's a bug"); } else { - --DiskSpaceMonitor::reservation_count; + --reserves->reservation_count; } } catch (...) @@ -71,9 +76,9 @@ public: void update(UInt64 new_size) { std::lock_guard lock(DiskSpaceMonitor::mutex); - DiskSpaceMonitor::reserved_bytes -= size; + reserves->reserved_bytes -= size; size = new_size; - DiskSpaceMonitor::reserved_bytes += size; + reserves->reserved_bytes += size; } UInt64 getSize() const @@ -81,17 +86,18 @@ public: return size; } - Reservation(UInt64 size_) - : size(size_), metric_increment(CurrentMetrics::DiskSpaceReservedForMerge, size) + Reservation(UInt64 size_, DiskReserve * reserves_) + : size(size_), metric_increment(CurrentMetrics::DiskSpaceReservedForMerge, size), reserves(reserves_) { std::lock_guard lock(DiskSpaceMonitor::mutex); - DiskSpaceMonitor::reserved_bytes += size; - ++DiskSpaceMonitor::reservation_count; + reserves->reserved_bytes += size; + ++reserves->reservation_count; } private: UInt64 size; CurrentMetrics::Increment metric_increment; + DiskReserve * reserves; }; using ReservationPtr = std::unique_ptr; @@ -110,6 +116,8 @@ public: std::lock_guard lock(mutex); + auto & reserved_bytes = reserved[path].reserved_bytes; + if (reserved_bytes > res) res = 0; else @@ -118,16 +126,16 @@ public: return res; } - static UInt64 getReservedSpace() + static UInt64 getReservedSpace(const std::string & path) { std::lock_guard lock(mutex); - return reserved_bytes; + return reserved[path].reserved_bytes; } - static UInt64 getReservationCount() + static UInt64 getReservationCount(const std::string & path) { std::lock_guard lock(mutex); - return reservation_count; + return reserved[path].reservation_count; } /// If not enough (approximately) space, throw an exception. @@ -137,12 +145,11 @@ public: if (free_bytes < size) throw Exception("Not enough free disk space to reserve: " + formatReadableSizeWithBinarySuffix(free_bytes) + " available, " + formatReadableSizeWithBinarySuffix(size) + " requested", ErrorCodes::NOT_ENOUGH_SPACE); - return std::make_unique(size); + return std::make_unique(size, &reserved[path]); } private: - static UInt64 reserved_bytes; - static UInt64 reservation_count; + static std::map reserved; static std::mutex mutex; }; diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index 16b1e300c32..b883fec5c97 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -89,7 +89,7 @@ namespace ErrorCodes MergeTreeData::MergeTreeData( const String & database_, const String & table_, - const String & full_path_, const ColumnsDescription & columns_, + const Strings & full_paths_, const ColumnsDescription & columns_, const IndicesDescription & indices_, Context & context_, const String & date_column_name, @@ -110,7 +110,7 @@ MergeTreeData::MergeTreeData( sample_by_ast(sample_by_ast_), require_part_metadata(require_part_metadata_), database_name(database_), table_name(table_), - full_path(full_path_), + full_paths(full_paths_), broken_part_callback(broken_part_callback_), log_name(database_name + "." + table_name), log(&Logger::get(log_name + " (Data)")), data_parts_by_info(data_parts_indexes.get()), @@ -159,13 +159,17 @@ MergeTreeData::MergeTreeData( min_format_version = MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING; } - auto path_exists = Poco::File(full_path).exists(); - /// Creating directories, if not exist. - Poco::File(full_path).createDirectories(); + auto format_path = full_paths[0]; ///@TODO_IGR ASK What path should we use for format file? + auto path_exists = Poco::File(format_path).exists(); - Poco::File(full_path + "detached").createDirectory(); + for (const String & path : full_paths) { + /// Creating directories, if not exist. + Poco::File(path).createDirectories(); - String version_file_path = full_path + "format_version.txt"; + Poco::File(path + "detached").createDirectory(); + } + + String version_file_path = format_path + "format_version.txt"; auto version_file_exists = Poco::File(version_file_path).exists(); // When data path or file not exists, ignore the format_version check if (!attach || !path_exists || !version_file_exists) @@ -625,15 +629,19 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) { LOG_DEBUG(log, "Loading data parts"); - Strings part_file_names; + std::vector> part_file_names; Poco::DirectoryIterator end; - for (Poco::DirectoryIterator it(full_path); it != end; ++it) + for (size_t i = 0; i != full_paths.size(); ++i) { - /// Skip temporary directories. - if (startsWith(it.name(), "tmp")) - continue; + auto&& full_path = full_paths[i]; + for (Poco::DirectoryIterator it(full_path); it != end; ++it) + { + /// Skip temporary directories. + if (startsWith(it.name(), "tmp")) + continue; - part_file_names.push_back(it.name()); + part_file_names.emplace_back(it.name(), i); + } } DataPartsVector broken_parts_to_remove; @@ -643,13 +651,15 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) std::lock_guard lock(data_parts_mutex); data_parts_indexes.clear(); - for (const String & file_name : part_file_names) + for (const auto & part_file_name : part_file_names) { + const String & file_name = part_file_name.first; + const size_t path_index = part_file_name.second; MergeTreePartInfo part_info; if (!MergeTreePartInfo::tryParsePartName(file_name, &part_info, format_version)) continue; - MutableDataPartPtr part = std::make_shared(*this, file_name, part_info); + MutableDataPartPtr part = std::make_shared(*this, full_paths[path_index], file_name, part_info); part->relative_path = file_name; bool broken = false; @@ -683,7 +693,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) if (part->info.level == 0) { /// It is impossible to restore level 0 parts. - LOG_ERROR(log, "Considering to remove broken part " << full_path + file_name << " because it's impossible to repair."); + LOG_ERROR(log, "Considering to remove broken part " << full_paths[path_index] + file_name << " because it's impossible to repair."); broken_parts_to_remove.push_back(part); } else @@ -693,10 +703,12 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) /// delete it. size_t contained_parts = 0; - LOG_ERROR(log, "Part " << full_path + file_name << " is broken. Looking for parts to replace it."); + LOG_ERROR(log, "Part " << full_paths[path_index] + file_name << " is broken. Looking for parts to replace it."); - for (const String & contained_name : part_file_names) + for (auto part_file_name : part_file_names) { + const String & contained_name = part_file_name.first; + const size_t contained_path_index = part_file_name.second; if (contained_name == file_name) continue; @@ -706,19 +718,19 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) if (part->info.contains(contained_part_info)) { - LOG_ERROR(log, "Found part " << full_path + contained_name); + LOG_ERROR(log, "Found part " << full_paths[contained_path_index] + contained_name); ++contained_parts; } } if (contained_parts >= 2) { - LOG_ERROR(log, "Considering to remove broken part " << full_path + file_name << " because it covers at least 2 other parts"); + LOG_ERROR(log, "Considering to remove broken part " << full_paths[path_index] + file_name << " because it covers at least 2 other parts"); broken_parts_to_remove.push_back(part); } else { - LOG_ERROR(log, "Detaching broken part " << full_path + file_name + LOG_ERROR(log, "Detaching broken part " << full_paths[path_index] + file_name << " because it covers less than 2 parts. You need to resolve this manually"); broken_parts_to_detach.push_back(part); ++suspicious_broken_parts; @@ -728,7 +740,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) continue; } - part->modification_time = Poco::File(full_path + file_name).getLastModified().epochTime(); + part->modification_time = Poco::File(full_paths[path_index] + file_name).getLastModified().epochTime(); /// Assume that all parts are Committed, covered parts will be detected and marked as Outdated later part->state = DataPartState::Committed; @@ -830,23 +842,23 @@ void MergeTreeData::clearOldTemporaryDirectories(ssize_t custom_directories_life /// Delete temporary directories older than a day. Poco::DirectoryIterator end; - for (Poco::DirectoryIterator it{full_path}; it != end; ++it) + for (auto && full_path : full_paths) { - if (startsWith(it.name(), "tmp_")) + for (Poco::DirectoryIterator it{full_path}; it != end; ++it) { - Poco::File tmp_dir(full_path + it.name()); + if (startsWith(it.name(), "tmp_")) + { + Poco::File tmp_dir(full_path + it.name()); - try - { - if (tmp_dir.isDirectory() && isOldPartDirectory(tmp_dir, deadline)) - { - LOG_WARNING(log, "Removing temporary directory " << full_path << it.name()); - Poco::File(full_path + it.name()).remove(true); + try { + if (tmp_dir.isDirectory() && isOldPartDirectory(tmp_dir, deadline)) { + LOG_WARNING(log, "Removing temporary directory " << full_path << it.name()); + Poco::File(full_path + it.name()).remove(true); + } + } + catch (const Poco::FileNotFoundException &) { + /// If the file is already deleted, do nothing. } - } - catch (const Poco::FileNotFoundException &) - { - /// If the file is already deleted, do nothing. } } } @@ -965,15 +977,18 @@ void MergeTreeData::clearOldPartsFromFilesystem() removePartsFinally(parts_to_remove); } -void MergeTreeData::setPath(const String & new_full_path) +void MergeTreeData::setPath([[maybe_unused]] const String & new_full_path) { - if (Poco::File{new_full_path}.exists()) - throw Exception{"Target path already exists: " + new_full_path, ErrorCodes::DIRECTORY_ALREADY_EXISTS}; + ///@TODO_IGR ASK We can not implement this function. Remove it? + throw Exception{"this funcion does not implemeted yes", ErrorCodes::BAD_ARGUMENTS}; - Poco::File(full_path).renameTo(new_full_path); - - global_context.dropCaches(); - full_path = new_full_path; +// if (Poco::File{new_full_path}.exists()) +// throw Exception{"Target path already exists: " + new_full_path, ErrorCodes::DIRECTORY_ALREADY_EXISTS}; +// +// Poco::File(full_path).renameTo(new_full_path); +// +// global_context.dropCaches(); +// full_path = new_full_path; } void MergeTreeData::dropAllData() @@ -991,7 +1006,9 @@ void MergeTreeData::dropAllData() LOG_TRACE(log, "dropAllData: removing data from filesystem."); - Poco::File(full_path).remove(true); + for (auto && full_path : full_paths) { + Poco::File(full_path).remove(true); + } LOG_TRACE(log, "dropAllData: done."); } @@ -1366,7 +1383,7 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart( exception_message << ") need to be " << (forbidden_because_of_modify ? "modified" : "removed") - << " in part " << part->name << " of table at " << full_path << ". Aborting just in case." + << " in part " << part->name << " of table at " << part->path << ". Aborting just in case." << " If it is not an error, you could increase merge_tree/" << (forbidden_because_of_modify ? "max_files_to_modify_in_alter_columns" : "max_files_to_remove_in_alter_columns") << " parameter in configuration file (current value: " @@ -1403,8 +1420,10 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart( * will have old name of shared offsets for arrays. */ IMergedBlockOutputStream::WrittenOffsetColumns unused_written_offsets; + + ///@TODO_IGR ASK Why dont we use part->relative_path? MergedColumnOnlyOutputStream out( - *this, in.getHeader(), full_path + part->name + '/', true /* sync */, compression_codec, true /* skip_offsets */, unused_written_offsets); + *this, in.getHeader(), part->path + part->name + '/', true /* sync */, compression_codec, true /* skip_offsets */, unused_written_offsets); in.readPrefix(); out.writePrefix(); @@ -1430,7 +1449,7 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart( if (!part->checksums.empty()) { transaction->new_checksums = new_checksums; - WriteBufferFromFile checksums_file(full_path + part->name + "/checksums.txt.tmp", 4096); + WriteBufferFromFile checksums_file(part->path + part->name + "/checksums.txt.tmp", 4096); new_checksums.write(checksums_file); transaction->rename_map["checksums.txt.tmp"] = "checksums.txt"; } @@ -1438,7 +1457,7 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart( /// Write the new column list to the temporary file. { transaction->new_columns = new_columns.filter(part->columns.getNames()); - WriteBufferFromFile columns_file(full_path + part->name + "/columns.txt.tmp", 4096); + WriteBufferFromFile columns_file(part->path + part->name + "/columns.txt.tmp", 4096); transaction->new_columns.writeText(columns_file); transaction->rename_map["columns.txt.tmp"] = "columns.txt"; } @@ -1459,7 +1478,7 @@ void MergeTreeData::AlterDataPartTransaction::commit() { std::unique_lock lock(data_part->columns_lock); - String path = data_part->storage.full_path + data_part->name + "/"; + String path = data_part->path + data_part->name + "/"; /// NOTE: checking that a file exists before renaming or deleting it /// is justified by the fact that, when converting an ordinary column @@ -2133,9 +2152,9 @@ MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const String & part_na } -MergeTreeData::MutableDataPartPtr MergeTreeData::loadPartAndFixMetadata(const String & relative_path) +MergeTreeData::MutableDataPartPtr MergeTreeData::loadPartAndFixMetadata(const String & path, const String & relative_path) { - MutableDataPartPtr part = std::make_shared(*this, Poco::Path(relative_path).getFileName()); + MutableDataPartPtr part = std::make_shared(*this, path, Poco::Path(relative_path).getFileName()); part->relative_path = relative_path; String full_part_path = part->getFullPath(); @@ -2254,19 +2273,21 @@ size_t MergeTreeData::getPartitionSize(const std::string & partition_id) const Poco::DirectoryIterator end; - for (Poco::DirectoryIterator it(full_path); it != end; ++it) + for (const String & full_path : full_paths) { - MergeTreePartInfo part_info; - if (!MergeTreePartInfo::tryParsePartName(it.name(), &part_info, format_version)) - continue; - if (part_info.partition_id != partition_id) - continue; - - const auto part_path = it.path().absolute().toString(); - for (Poco::DirectoryIterator it2(part_path); it2 != end; ++it2) + for (Poco::DirectoryIterator it(full_path); it != end; ++it) { - const auto part_file_path = it2.path().absolute().toString(); - size += Poco::File(part_file_path).getSize(); + MergeTreePartInfo part_info; + if (!MergeTreePartInfo::tryParsePartName(it.name(), &part_info, format_version)) + continue; + if (part_info.partition_id != partition_id) + continue; + + const auto part_path = it.path().absolute().toString(); + for (Poco::DirectoryIterator it2(part_path); it2 != end; ++it2) { + const auto part_file_path = it2.path().absolute().toString(); + size += Poco::File(part_file_path).getSize(); + } } } @@ -2392,6 +2413,27 @@ MergeTreeData::DataPartsVector MergeTreeData::getAllDataPartsVector(MergeTreeDat return res; } +String MergeTreeData::getFullPathForPart(UInt64 expected_size) const +{ + std::cerr << "Exp size " << expected_size << std::endl; + constexpr UInt64 SIZE_100MB = 100ull << 20; + constexpr UInt64 MAGIC_CONST = 1; + + if (expected_size < SIZE_100MB) { + expected_size = SIZE_100MB; + } + for (const String & path : full_paths) { + UInt64 free_space = DiskSpaceMonitor::getUnreservedFreeSpace(path); ///@TODO_IGR ASK reserve? YES, we are + + if (free_space > expected_size * MAGIC_CONST) { + std::cerr << "Choosed " << free_space << " " << path << std::endl; + return path; + } + } + std::cerr << "Choosed last " << full_paths[full_paths.size() - 1] << std::endl; + return full_paths[full_paths.size() - 1]; +} + MergeTreeData::DataParts MergeTreeData::getDataParts(const DataPartStates & affordable_states) const { DataParts res; @@ -2577,7 +2619,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPart(const Merg String dst_part_name = src_part->getNewName(dst_part_info); String tmp_dst_part_name = tmp_part_prefix + dst_part_name; - Poco::Path dst_part_absolute_path = Poco::Path(full_path + tmp_dst_part_name).absolute(); + ///@TODO_IGR ASK Maybe flag that it is not recent part? Or choose same dir if it is possible + Poco::Path dst_part_absolute_path = Poco::Path(getFullPathForPart(src_part->bytes_on_disk) + tmp_dst_part_name).absolute(); Poco::Path src_part_absolute_path = Poco::Path(src_part->getFullPath()).absolute(); if (Poco::File(dst_part_absolute_path).exists()) @@ -2586,7 +2629,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPart(const Merg LOG_DEBUG(log, "Cloning part " << src_part_absolute_path.toString() << " to " << dst_part_absolute_path.toString()); localBackup(src_part_absolute_path, dst_part_absolute_path); - MergeTreeData::MutableDataPartPtr dst_data_part = std::make_shared(*this, dst_part_name, dst_part_info); + MergeTreeData::MutableDataPartPtr dst_data_part = std::make_shared(*this, dst_part_storage_path, dst_part_name, dst_part_info); dst_data_part->relative_path = tmp_dst_part_name; dst_data_part->is_temp = true; diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.h b/dbms/src/Storages/MergeTree/MergeTreeData.h index f252e43b562..a24fe5c0aeb 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.h +++ b/dbms/src/Storages/MergeTree/MergeTreeData.h @@ -303,7 +303,7 @@ public: /// require_part_metadata - should checksums.txt and columns.txt exist in the part directory. /// attach - whether the existing table is attached or the new table is created. MergeTreeData(const String & database_, const String & table_, - const String & full_path_, + const Strings & full_paths_, const ColumnsDescription & columns_, const IndicesDescription & indices_, Context & context_, @@ -363,7 +363,7 @@ public: String getTableName() const { return table_name; } - String getFullPath() const { return full_path; } + String getFullPathForPart(UInt64 expected_size) const; String getLogName() const { return log_name; } @@ -525,7 +525,7 @@ public: Names getColumnsRequiredForSampling() const { return columns_required_for_sampling; } /// Check that the part is not broken and calculate the checksums for it if they are not present. - MutableDataPartPtr loadPartAndFixMetadata(const String & relative_path); + MutableDataPartPtr loadPartAndFixMetadata(const String & path, const String & relative_path); /** Create local backup (snapshot) for parts with specified prefix. * Backup is created in directory clickhouse_dir/shadow/i/, where i - incremental number, @@ -633,7 +633,7 @@ private: String database_name; String table_name; - String full_path; + Strings full_paths; /// Current column sizes in compressed and uncompressed form. ColumnSizeByName column_sizes; diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index f5ee7fe1ee7..877742b34b8 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -119,6 +119,14 @@ void FutureMergedMutatedPart::assign(MergeTreeData::DataPartsVector parts_) name = part_info.getPartName(); } + UInt64 FutureMergedMutatedPart::expectedSize() const { ///TODO_IGR ASK sum size? + size_t size = 0; + for (auto &part : parts) { + size += part->getFileSizeOrZero(""); ///@TODO_IGR ASK file name? + } + return size; +} + MergeTreeDataMergerMutator::MergeTreeDataMergerMutator(MergeTreeData & data_, const BackgroundProcessingPool & pool_) : data(data_), pool(pool_), log(&Logger::get(data.getLogName() + " (MergerMutator)")) { @@ -150,7 +158,8 @@ UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSize(size_t pool_size, size_ data.settings.max_bytes_to_merge_at_max_space_in_pool, static_cast(free_entries) / data.settings.number_of_free_entries_in_pool_to_lower_max_size_of_merge); - return std::min(max_size, static_cast(DiskSpaceMonitor::getUnreservedFreeSpace(data.full_path) / DISK_USAGE_COEFFICIENT_TO_SELECT)); + ///@TODO_IGR ASK what path? + return std::min(max_size, static_cast(DiskSpaceMonitor::getUnreservedFreeSpace(data.full_paths[0]) / DISK_USAGE_COEFFICIENT_TO_SELECT)); } @@ -290,8 +299,8 @@ bool MergeTreeDataMergerMutator::selectAllPartsToMergeWithinPartition( LOG_WARNING(log, "Won't merge parts from " << parts.front()->name << " to " << (*prev_it)->name << " because not enough free space: " << formatReadableSizeWithBinarySuffix(available_disk_space) << " free and unreserved " - << "(" << formatReadableSizeWithBinarySuffix(DiskSpaceMonitor::getReservedSpace()) << " reserved in " - << DiskSpaceMonitor::getReservationCount() << " chunks), " + << "(" << formatReadableSizeWithBinarySuffix(DiskSpaceMonitor::getReservedSpace("")) << " reserved in " ///@TODO_IGR ASK RESERVED SPACE ON ALL DISKS? + << DiskSpaceMonitor::getReservationCount("") << " chunks), " << formatReadableSizeWithBinarySuffix(sum_bytes) << " required now (+" << static_cast((DISK_USAGE_COEFFICIENT_TO_SELECT - 1.0) * 100) << "% on overhead); suppressing similar warnings for the next hour"); @@ -513,7 +522,10 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor << parts.front()->name << " to " << parts.back()->name << " into " << TMP_PREFIX + future_part.name); - String new_part_tmp_path = data.getFullPath() + TMP_PREFIX + future_part.name + "/"; + size_t expected_size = future_part.expectedSize(); + String part_path = data.getFullPathForPart(expected_size); ///@TODO_IGR ASK EXPECTED SIZE + + String new_part_tmp_path = part_path + TMP_PREFIX + future_part.name + "/"; if (Poco::File(new_part_tmp_path).exists()) throw Exception("Directory " + new_part_tmp_path + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS); @@ -531,7 +543,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor data.merging_params, gathering_columns, gathering_column_names, merging_columns, merging_column_names); MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared( - data, future_part.name, future_part.part_info); + data, part_path, future_part.name, future_part.part_info); new_data_part->partition.assign(future_part.getPartition()); new_data_part->relative_path = TMP_PREFIX + future_part.name; new_data_part->is_temp = true; @@ -853,8 +865,11 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor else LOG_TRACE(log, "Mutating part " << source_part->name << " to mutation version " << future_part.part_info.mutation); + size_t expected_size = future_part.expectedSize(); + String part_path = data.getFullPathForPart(expected_size); ///@TODO_IGR ASK EXPECTED_SIZE + MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared( - data, future_part.name, future_part.part_info); + data, part_path, future_part.name, future_part.part_info); new_data_part->relative_path = "tmp_mut_" + future_part.name; new_data_part->is_temp = true; diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h index 4f657847648..b109e236064 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h @@ -30,6 +30,8 @@ struct FutureMergedMutatedPart } void assign(MergeTreeData::DataPartsVector parts_); + + UInt64 expectedSize() const; }; /** Can select the parts to merge and merge them. diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp index 01ff4c4cdac..3c89ba95226 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp @@ -136,8 +136,9 @@ void MergeTreeDataPart::MinMaxIndex::merge(const MinMaxIndex & other) } -MergeTreeDataPart::MergeTreeDataPart(MergeTreeData & storage_, const String & name_) - : storage(storage_), name(name_), info(MergeTreePartInfo::fromPartName(name_, storage.format_version)) +MergeTreeDataPart::MergeTreeDataPart(MergeTreeData & storage_, const String& path_, const String & name_) + ///@TODO_IGR DO check is fromPartName need to use path + : storage(storage_), path(path_), name(name_), info(MergeTreePartInfo::fromPartName(name_, storage.format_version)) { } @@ -233,7 +234,7 @@ String MergeTreeDataPart::getFullPath() const if (relative_path.empty()) throw Exception("Part relative_path cannot be empty. This is bug.", ErrorCodes::LOGICAL_ERROR); - return storage.full_path + relative_path + "/"; + return path + relative_path + "/"; } String MergeTreeDataPart::getNameWithPrefix() const @@ -355,8 +356,8 @@ void MergeTreeDataPart::remove() const * And a race condition can happen that will lead to "File not found" error here. */ - String from = storage.full_path + relative_path; - String to = storage.full_path + "delete_tmp_" + name; + String from = path + relative_path; + String to = path + "delete_tmp_" + name; Poco::File from_dir{from}; Poco::File to_dir{to}; @@ -396,7 +397,7 @@ void MergeTreeDataPart::remove() const void MergeTreeDataPart::renameTo(const String & new_relative_path, bool remove_new_dir_if_exists) const { String from = getFullPath(); - String to = storage.full_path + new_relative_path + "/"; + String to = path + new_relative_path + "/"; Poco::File from_file(from); if (!from_file.exists()) @@ -442,7 +443,7 @@ String MergeTreeDataPart::getRelativePathForDetachedPart(const String & prefix) { res = dst_name(); - if (!Poco::File(storage.full_path + res).exists()) + if (!Poco::File(path + res).exists()) return res; LOG_WARNING(storage.log, "Directory " << dst_name() << " (to detach to) is already exist." @@ -462,7 +463,8 @@ void MergeTreeDataPart::renameToDetached(const String & prefix) const void MergeTreeDataPart::makeCloneInDetached(const String & prefix) const { Poco::Path src(getFullPath()); - Poco::Path dst(storage.full_path + getRelativePathForDetachedPart(prefix)); + Poco::Path dst(path + getRelativePathForDetachedPart(prefix)); + ///@TODO_IGR ASK What about another path? /// Backup is not recursive (max_level is 0), so do not copy inner directories localBackup(src, dst, 0); } diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataPart.h b/dbms/src/Storages/MergeTree/MergeTreeDataPart.h index 6c6db319916..a3a68fa59ad 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataPart.h +++ b/dbms/src/Storages/MergeTree/MergeTreeDataPart.h @@ -28,12 +28,12 @@ struct MergeTreeDataPart using Checksums = MergeTreeDataPartChecksums; using Checksum = MergeTreeDataPartChecksums::Checksum; - MergeTreeDataPart(const MergeTreeData & storage_, const String & name_, const MergeTreePartInfo & info_) - : storage(storage_), name(name_), info(info_) + MergeTreeDataPart(const MergeTreeData & storage_, const String & path_, const String & name_, const MergeTreePartInfo & info_) + : storage(storage_), path(path_), name(name_), info(info_) { } - MergeTreeDataPart(MergeTreeData & storage_, const String & name_); + MergeTreeDataPart(MergeTreeData & storage_, const String & path_, const String & name_); /// Returns the name of a column with minimum compressed size (as returned by getColumnSize()). /// If no checksums are present returns the name of the first physically existing column. @@ -86,6 +86,7 @@ struct MergeTreeDataPart const MergeTreeData & storage; + String path; String name; MergeTreePartInfo info; diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp index bd293c224a0..67f89dd36e2 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp @@ -161,7 +161,10 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa else part_name = new_part_info.getPartName(); - MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared(data, part_name, new_part_info); + size_t expected_size = block.bytes(); + String part_absolute_path = data.getFullPathForPart(expected_size); ///@TODO_IGR ASK expected size + + MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared(data, part_absolute_path, part_name, new_part_info); new_data_part->partition = std::move(partition); new_data_part->minmax_idx = std::move(minmax_idx); new_data_part->relative_path = TMP_PREFIX + part_name; diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index fd356e75e8f..60e9feafdc9 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -62,10 +62,10 @@ StorageMergeTree::StorageMergeTree( const MergeTreeData::MergingParams & merging_params_, const MergeTreeSettings & settings_, bool has_force_restore_data_flag) - : path(path_), database_name(database_name_), table_name(table_name_), full_path(path + escapeForFileName(table_name) + '/'), + : path(path_), database_name(database_name_), table_name(table_name_), full_paths{path + escapeForFileName(table_name) + '/', "/mnt/data/Data2/" + escapeForFileName(table_name) + '/'}, global_context(context_), background_pool(context_.getBackgroundPool()), data(database_name, table_name, - full_path, columns_, indices_, + full_paths, columns_, indices_, context_, date_column_name, partition_by_ast_, order_by_ast_, primary_key_ast_, sample_by_ast_, merging_params_, settings_, false, attach), reader(data), writer(data), merger_mutator(data, global_context.getBackgroundPool()), @@ -185,7 +185,7 @@ void StorageMergeTree::rename(const String & new_path_to_db, const String & /*ne path = new_path_to_db; table_name = new_table_name; - full_path = new_full_path; + full_paths = {new_full_path}; ///TODO_IGR ASK rename? /// NOTE: Logger names are not updated. } @@ -273,7 +273,7 @@ public: : future_part(future_part_), storage(storage_) { /// Assume mutex is already locked, because this method is called from mergeTask. - reserved_space = DiskSpaceMonitor::reserve(storage.full_path, total_size); /// May throw. + reserved_space = DiskSpaceMonitor::reserve(storage.full_paths[0], total_size); /// May throw. @TODO_IGR ASK WHERE TO RESERVE for (const auto & part : future_part.parts) { @@ -333,7 +333,7 @@ public: void StorageMergeTree::mutate(const MutationCommands & commands, const Context &) { - MergeTreeMutationEntry entry(commands, full_path, data.insert_increment.get()); + MergeTreeMutationEntry entry(commands, full_paths[0], data.insert_increment.get()); ///@TODO_IGR ASK PATH TO ENTRY String file_name; { std::lock_guard lock(currently_merging_mutex); @@ -426,11 +426,11 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id) void StorageMergeTree::loadMutations() { Poco::DirectoryIterator end; - for (auto it = Poco::DirectoryIterator(full_path); it != end; ++it) + for (auto it = Poco::DirectoryIterator(full_paths[0]); it != end; ++it) ///@TODO_IGR ASK MUTATIONS FROM ALL DISKS? { if (startsWith(it.name(), "mutation_")) { - MergeTreeMutationEntry entry(full_path, it.name()); + MergeTreeMutationEntry entry(full_paths[0], it.name()); Int64 block_number = entry.block_number; auto insertion = current_mutations_by_id.emplace(it.name(), std::move(entry)); current_mutations_by_version.emplace(block_number, insertion.first->second); @@ -481,7 +481,7 @@ bool StorageMergeTree::merge( } else { - UInt64 disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path); + UInt64 disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_paths[0]); ///@TODO_IGR ASK DISK OR DISKS selected = merger_mutator.selectAllPartsToMergeWithinPartition(future_part, disk_space, can_merge, partition_id, final, out_disable_reason); } @@ -570,7 +570,7 @@ bool StorageMergeTree::tryMutatePart() /// You must call destructor with unlocked `currently_merging_mutex`. std::optional tagger; { - auto disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path); + auto disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_paths[0]); ///@TODO_IGR ASK DISK OR DISKS std::lock_guard lock(currently_merging_mutex); @@ -948,39 +948,43 @@ void StorageMergeTree::attachPartition(const ASTPtr & partition, bool attach_par String source_dir = "detached/"; /// Let's make a list of parts to add. - Strings parts; + ActiveDataPartSet::PartPathNames parts; if (attach_part) { - parts.push_back(partition_id); + for (const String & full_path : full_paths) { + parts.push_back(ActiveDataPartSet::PartPathName{full_path, partition_id}); ///@TODO_IGR ASK + } } else { LOG_DEBUG(log, "Looking for parts for partition " << partition_id << " in " << source_dir); ActiveDataPartSet active_parts(data.format_version); - for (Poco::DirectoryIterator it = Poco::DirectoryIterator(full_path + source_dir); it != Poco::DirectoryIterator(); ++it) - { - const String & name = it.name(); - MergeTreePartInfo part_info; - if (!MergeTreePartInfo::tryParsePartName(name, &part_info, data.format_version) - || part_info.partition_id != partition_id) + for (const String & full_path : full_paths) { + for (Poco::DirectoryIterator it = Poco::DirectoryIterator(full_path + source_dir); it != Poco::DirectoryIterator(); ++it) ///@TODO_IGR { - continue; + const String & name = it.name(); + MergeTreePartInfo part_info; + if (!MergeTreePartInfo::tryParsePartName(name, &part_info, data.format_version) + || part_info.partition_id != partition_id) + { + continue; + } + LOG_DEBUG(log, "Found part " << name); + active_parts.add(full_path, name); } - LOG_DEBUG(log, "Found part " << name); - active_parts.add(name); } LOG_DEBUG(log, active_parts.size() << " of them are active"); parts = active_parts.getParts(); } - for (const auto & source_part_name : parts) + for (const auto & source_part : parts) { - String source_path = source_dir + source_part_name; + String source_path = source_dir + source_part.name; LOG_DEBUG(log, "Checking data"); - MergeTreeData::MutableDataPartPtr part = data.loadPartAndFixMetadata(source_path); + MergeTreeData::MutableDataPartPtr part = data.loadPartAndFixMetadata(source_part.path, source_part.name); - LOG_INFO(log, "Attaching part " << source_part_name << " from " << source_path); + LOG_INFO(log, "Attaching part " << source_part.name << " from " << source_path); data.renameTempPartAndAdd(part, &increment); LOG_INFO(log, "Finished attaching part"); diff --git a/dbms/src/Storages/StorageMergeTree.h b/dbms/src/Storages/StorageMergeTree.h index f7e4e10fc4e..a9d4b08098c 100644 --- a/dbms/src/Storages/StorageMergeTree.h +++ b/dbms/src/Storages/StorageMergeTree.h @@ -90,7 +90,7 @@ public: MergeTreeData & getData() { return data; } const MergeTreeData & getData() const { return data; } - String getDataPath() const override { return full_path; } + String getDataPath() const override { return full_paths[0]; } ///@TODO_IGR ASK WHAT PATH ASTPtr getPartitionKeyAST() const override { return data.partition_by_ast; } ASTPtr getSortingKeyAST() const override { return data.getSortingKeyAST(); } @@ -107,7 +107,7 @@ private: String path; String database_name; String table_name; - String full_path; + Strings full_paths; Context global_context; BackgroundProcessingPool & background_pool;