From 341e3f1c5b63561857942633977eac4820069a47 Mon Sep 17 00:00:00 2001 From: Igor Mineev Date: Wed, 3 Apr 2019 15:52:09 +0300 Subject: [PATCH] Add space reservation for each new MargeTreeDataPart. --- .../Storages/MergeTree/ActiveDataPartSet.h | 1 + .../src/Storages/MergeTree/DiskSpaceMonitor.h | 135 +++++++++++++++--- dbms/src/Storages/MergeTree/MergeTreeData.cpp | 49 ++++--- dbms/src/Storages/MergeTree/MergeTreeData.h | 21 ++- .../MergeTree/MergeTreeDataMergerMutator.cpp | 17 +-- .../MergeTree/MergeTreeDataMergerMutator.h | 4 +- dbms/src/Storages/StorageMergeTree.cpp | 18 ++- 7 files changed, 187 insertions(+), 58 deletions(-) diff --git a/dbms/src/Storages/MergeTree/ActiveDataPartSet.h b/dbms/src/Storages/MergeTree/ActiveDataPartSet.h index f64f551dbd9..f88f53cb6b6 100644 --- a/dbms/src/Storages/MergeTree/ActiveDataPartSet.h +++ b/dbms/src/Storages/MergeTree/ActiveDataPartSet.h @@ -16,6 +16,7 @@ class ActiveDataPartSet { public: struct PartPathName { + /// path + name is absolute path to DataPart String path; String name; }; diff --git a/dbms/src/Storages/MergeTree/DiskSpaceMonitor.h b/dbms/src/Storages/MergeTree/DiskSpaceMonitor.h index d9e229c23a8..b79aa4734dc 100644 --- a/dbms/src/Storages/MergeTree/DiskSpaceMonitor.h +++ b/dbms/src/Storages/MergeTree/DiskSpaceMonitor.h @@ -30,6 +30,8 @@ namespace ErrorCodes * Could "reserve" space, for different operations to plan disk space usage. * Reservations are not separated for different filesystems, * instead it is assumed, that all reservations are done within same filesystem. + * + * It is necessary to set all paths in map before MergeTreeData starts */ class DiskSpaceMonitor { @@ -86,8 +88,17 @@ public: return size; } - Reservation(UInt64 size_, DiskReserve * reserves_) - : size(size_), metric_increment(CurrentMetrics::DiskSpaceReservedForMerge, size), reserves(reserves_) + const String & getPath() const { + return path; + } + + void addEnclosedDirToPath(const String & dir) { + path += dir + '/'; + } + + Reservation(UInt64 size_, DiskReserve * reserves_, const String & path_) + : size(size_), metric_increment(CurrentMetrics::DiskSpaceReservedForMerge, size), reserves(reserves_), + path(path_) { std::lock_guard lock(DiskSpaceMonitor::mutex); reserves->reserved_bytes += size; @@ -98,15 +109,16 @@ public: UInt64 size; CurrentMetrics::Increment metric_increment; DiskReserve * reserves; + String path; }; using ReservationPtr = std::unique_ptr; - static UInt64 getUnreservedFreeSpace(const std::string & path) + static UInt64 getUnreservedFreeSpace(const String & disk_path) { struct statvfs fs; - if (statvfs(path.c_str(), &fs) != 0) + if (statvfs(disk_path.c_str(), &fs) != 0) throwFromErrno("Could not calculate available disk space (statvfs)", ErrorCodes::CANNOT_STATVFS); UInt64 res = fs.f_bfree * fs.f_bsize; @@ -116,7 +128,7 @@ public: std::lock_guard lock(mutex); - auto & reserved_bytes = reserved[path].reserved_bytes; + auto & reserved_bytes = reserved[disk_path].reserved_bytes; if (reserved_bytes > res) res = 0; @@ -126,26 +138,60 @@ public: return res; } - static UInt64 getReservedSpace(const std::string & path) + /** Returns max of unreserved free space on all disks + * It is necessary to have guarantee that all paths are set + */ + static UInt64 getMaxUnreservedFreeSpace() { - std::lock_guard lock(mutex); - return reserved[path].reserved_bytes; + UInt64 max_unreserved = 0; + for (auto& [disk_path, reserve] : reserved) { + struct statvfs fs; + + if (statvfs(disk_path.c_str(), &fs) != 0) + throwFromErrno("Could not calculate available disk space (statvfs)", ErrorCodes::CANNOT_STATVFS); + + UInt64 res = fs.f_bfree * fs.f_bsize; + + /// Heuristic by Michael Kolupaev: reserve 30 MB more, because statvfs shows few megabytes more space than df. + res -= std::min(res, static_cast(30 * (1ul << 20))); + + ///@TODO_IGR ASK Maybe mutex out of for + std::lock_guard lock(mutex); + + auto &reserved_bytes = reserved[disk_path].reserved_bytes; + + if (reserved_bytes > res) + res = 0; + else + res -= reserved_bytes; + + max_unreserved = std::max(max_unreserved, res); + } + return max_unreserved; } - static UInt64 getReservationCount(const std::string & path) + static UInt64 getReservedSpace(const String & disk_path) { std::lock_guard lock(mutex); - return reserved[path].reservation_count; + return reserved[disk_path].reserved_bytes; } - /// If not enough (approximately) space, throw an exception. - static ReservationPtr reserve(const std::string & path, UInt64 size) + static UInt64 getReservationCount(const String & disk_path) { - UInt64 free_bytes = getUnreservedFreeSpace(path); + std::lock_guard lock(mutex); + return reserved[disk_path].reservation_count; + } + + /// If not enough (approximately) space, do not reserve. + static ReservationPtr tryToReserve(const String & disk_path, UInt64 size) + { + UInt64 free_bytes = getUnreservedFreeSpace(disk_path); + ///@TODO_IGR ASK twice reservation? if (free_bytes < size) - throw Exception("Not enough free disk space to reserve: " + formatReadableSizeWithBinarySuffix(free_bytes) + " available, " - + formatReadableSizeWithBinarySuffix(size) + " requested", ErrorCodes::NOT_ENOUGH_SPACE); - return std::make_unique(size, &reserved[path]); + { + return {}; + } + return std::make_unique(size, &reserved[disk_path], disk_path); } private: @@ -153,4 +199,61 @@ private: static std::mutex mutex; }; +class Schema +{ + class Volume { + friend class Schema; + + public: + Volume(std::vector paths_) : paths(std::move(paths_)) + { + } + + DiskSpaceMonitor::ReservationPtr reserve(UInt64 expected_size) const { + for (size_t i = 0; i != paths.size(); ++i) { + last_used = (last_used + 1) % paths.size(); + auto reservation = DiskSpaceMonitor::tryToReserve(paths[last_used], expected_size); + if (reservation) { + return reservation; + } + } + return {}; + } + + private: + const Strings paths; + mutable size_t last_used = 0; ///@TODO_IGR ASK It is thread safe, but it is not consistent. :( + /// P.S. I do not want to use mutex here + }; + +public: + Schema(const std::vector & disks) { + for (const Strings & volume : disks) { + volumes.emplace_back(volume); + } + } + + ///@TODO_IGR ASK maybe iterator without copy? + Strings getFullPaths() const { + Strings res; + for (const auto & volume : volumes) { + std::copy(volume.paths.begin(), volume.paths.end(), std::back_inserter(res)); + } + return res; + } + + DiskSpaceMonitor::ReservationPtr reserve(UInt64 expected_size) const { + for (auto & volume : volumes) { + auto reservation = volume.reserve(expected_size); + if (reservation) { + return reservation; + } + } + return {}; + } + +private: + std::vector volumes; +}; + } diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index b883fec5c97..84b5abdd6e6 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -89,7 +89,7 @@ namespace ErrorCodes MergeTreeData::MergeTreeData( const String & database_, const String & table_, - const Strings & full_paths_, const ColumnsDescription & columns_, + const Schema & schema_, const ColumnsDescription & columns_, const IndicesDescription & indices_, Context & context_, const String & date_column_name, @@ -110,7 +110,7 @@ MergeTreeData::MergeTreeData( sample_by_ast(sample_by_ast_), require_part_metadata(require_part_metadata_), database_name(database_), table_name(table_), - full_paths(full_paths_), + schema(schema_), broken_part_callback(broken_part_callback_), log_name(database_name + "." + table_name), log(&Logger::get(log_name + " (Data)")), data_parts_by_info(data_parts_indexes.get()), @@ -159,7 +159,10 @@ MergeTreeData::MergeTreeData( min_format_version = MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING; } + auto full_paths = getFullPaths(); + auto format_path = full_paths[0]; ///@TODO_IGR ASK What path should we use for format file? + /// Use first disk. If format file not there move it. auto path_exists = Poco::File(format_path).exists(); for (const String & path : full_paths) { @@ -629,6 +632,8 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) { LOG_DEBUG(log, "Loading data parts"); + const auto full_paths = getFullPaths(); + std::vector> part_file_names; Poco::DirectoryIterator end; for (size_t i = 0; i != full_paths.size(); ++i) @@ -840,6 +845,8 @@ void MergeTreeData::clearOldTemporaryDirectories(ssize_t custom_directories_life ? current_time - custom_directories_lifetime_seconds : current_time - settings.temporary_directories_lifetime.totalSeconds(); + const auto full_paths = getFullPaths(); + /// Delete temporary directories older than a day. Poco::DirectoryIterator end; for (auto && full_path : full_paths) @@ -1006,7 +1013,7 @@ void MergeTreeData::dropAllData() LOG_TRACE(log, "dropAllData: removing data from filesystem."); - for (auto && full_path : full_paths) { + for (auto && full_path : getFullPaths()) { Poco::File(full_path).remove(true); } @@ -2273,6 +2280,8 @@ size_t MergeTreeData::getPartitionSize(const std::string & partition_id) const Poco::DirectoryIterator end; + const auto full_paths = getFullPaths(); + for (const String & full_path : full_paths) { for (Poco::DirectoryIterator it(full_path); it != end; ++it) @@ -2413,7 +2422,7 @@ MergeTreeData::DataPartsVector MergeTreeData::getAllDataPartsVector(MergeTreeDat return res; } -String MergeTreeData::getFullPathForPart(UInt64 expected_size) const +DiskSpaceMonitor::ReservationPtr MergeTreeData::reserveSpaceForPart(UInt64 expected_size) const { std::cerr << "Exp size " << expected_size << std::endl; constexpr UInt64 SIZE_100MB = 100ull << 20; @@ -2422,16 +2431,14 @@ String MergeTreeData::getFullPathForPart(UInt64 expected_size) const if (expected_size < SIZE_100MB) { expected_size = SIZE_100MB; } - for (const String & path : full_paths) { - UInt64 free_space = DiskSpaceMonitor::getUnreservedFreeSpace(path); ///@TODO_IGR ASK reserve? YES, we are - - if (free_space > expected_size * MAGIC_CONST) { - std::cerr << "Choosed " << free_space << " " << path << std::endl; - return path; - } + auto reservation = reserveSpaceAtDisk(expected_size * MAGIC_CONST); + if (reservation) { + return reservation; } - std::cerr << "Choosed last " << full_paths[full_paths.size() - 1] << std::endl; - return full_paths[full_paths.size() - 1]; + + throw Exception("Not enough free disk space to reserve: " + formatReadableSizeWithBinarySuffix(expected_size) + " requested, " + + formatReadableSizeWithBinarySuffix(DiskSpaceMonitor::getMaxUnreservedFreeSpace()) + " available", + ErrorCodes::NOT_ENOUGH_SPACE); } MergeTreeData::DataParts MergeTreeData::getDataParts(const DataPartStates & affordable_states) const @@ -2619,8 +2626,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPart(const Merg String dst_part_name = src_part->getNewName(dst_part_info); String tmp_dst_part_name = tmp_part_prefix + dst_part_name; - ///@TODO_IGR ASK Maybe flag that it is not recent part? Or choose same dir if it is possible - Poco::Path dst_part_absolute_path = Poco::Path(getFullPathForPart(src_part->bytes_on_disk) + tmp_dst_part_name).absolute(); + auto reservation = reserveSpaceForPart(src_part->bytes_on_disk); + String dst_part_path = reservation->getPath(); + Poco::Path dst_part_absolute_path = Poco::Path(dst_part_path + tmp_dst_part_name).absolute(); Poco::Path src_part_absolute_path = Poco::Path(src_part->getFullPath()).absolute(); if (Poco::File(dst_part_absolute_path).exists()) @@ -2629,7 +2637,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPart(const Merg LOG_DEBUG(log, "Cloning part " << src_part_absolute_path.toString() << " to " << dst_part_absolute_path.toString()); localBackup(src_part_absolute_path, dst_part_absolute_path); - MergeTreeData::MutableDataPartPtr dst_data_part = std::make_shared(*this, dst_part_storage_path, dst_part_name, dst_part_info); + MergeTreeData::MutableDataPartPtr dst_data_part = std::make_shared(*this, dst_part_path, dst_part_name, dst_part_info); dst_data_part->relative_path = tmp_dst_part_name; dst_data_part->is_temp = true; @@ -2638,6 +2646,15 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPart(const Merg return dst_data_part; } +DiskSpaceMonitor::ReservationPtr MergeTreeData::reserveSpaceAtDisk(UInt64 expected_size) const { + auto reservation = schema.reserve(expected_size); + if (reservation) { + /// Add path to table at disk + reservation->addEnclosedDirToPath(table_name); ///@TODO_IGR ASK can we use table_name here? Could path be different? + } + return reservation; +} + void MergeTreeData::freezePartitionsByMatcher(MatcherFn matcher, const String & with_name, const Context & context) { String clickhouse_path = Poco::Path(context.getPath()).makeAbsolute().toString(); diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.h b/dbms/src/Storages/MergeTree/MergeTreeData.h index a24fe5c0aeb..f6726fb8539 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.h +++ b/dbms/src/Storages/MergeTree/MergeTreeData.h @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -286,7 +287,7 @@ public: }; - /// Attach the table corresponding to the directory in full_path (must end with /), with the given columns. + /// Attach the table corresponding to the directory in full_path inside schema (must end with /), with the given columns. /// Correctness of names and paths is not checked. /// /// date_column_name - if not empty, the name of the Date column used for partitioning by month. @@ -303,7 +304,7 @@ public: /// require_part_metadata - should checksums.txt and columns.txt exist in the part directory. /// attach - whether the existing table is attached or the new table is created. MergeTreeData(const String & database_, const String & table_, - const Strings & full_paths_, + const Schema & schema_, const ColumnsDescription & columns_, const IndicesDescription & indices_, Context & context_, @@ -363,7 +364,7 @@ public: String getTableName() const { return table_name; } - String getFullPathForPart(UInt64 expected_size) const; + DiskSpaceMonitor::ReservationPtr reserveSpaceForPart(UInt64 expected_size) const; ///@TODO_IGR ASK Is it realy const? String getLogName() const { return log_name; } @@ -569,6 +570,17 @@ public: MergeTreeData::MutableDataPartPtr cloneAndLoadDataPart(const MergeTreeData::DataPartPtr & src_part, const String & tmp_part_prefix, const MergeTreePartInfo & dst_part_info); + DiskSpaceMonitor::ReservationPtr reserveSpaceAtDisk(UInt64 expected_size) const; ///@TODO_IGR ASK Maybe set this method as private? + + Strings getFullPaths() const { + auto paths = schema.getFullPaths(); + for (auto && path : paths) { + path += table_name + '/'; ///@TODO_IGR ASK It is too slow( + /// Maybe store in full_paths variable? + } + return paths; + } + MergeTreeDataFormatVersion format_version; Context global_context; @@ -633,7 +645,8 @@ private: String database_name; String table_name; - Strings full_paths; + + Schema schema; /// Current column sizes in compressed and uncompressed form. ColumnSizeByName column_sizes; diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 877742b34b8..7ad13ad6338 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -119,14 +119,6 @@ void FutureMergedMutatedPart::assign(MergeTreeData::DataPartsVector parts_) name = part_info.getPartName(); } - UInt64 FutureMergedMutatedPart::expectedSize() const { ///TODO_IGR ASK sum size? - size_t size = 0; - for (auto &part : parts) { - size += part->getFileSizeOrZero(""); ///@TODO_IGR ASK file name? - } - return size; -} - MergeTreeDataMergerMutator::MergeTreeDataMergerMutator(MergeTreeData & data_, const BackgroundProcessingPool & pool_) : data(data_), pool(pool_), log(&Logger::get(data.getLogName() + " (MergerMutator)")) { @@ -522,8 +514,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor << parts.front()->name << " to " << parts.back()->name << " into " << TMP_PREFIX + future_part.name); - size_t expected_size = future_part.expectedSize(); - String part_path = data.getFullPathForPart(expected_size); ///@TODO_IGR ASK EXPECTED SIZE + String part_path = disk_reservation->getPath(); String new_part_tmp_path = part_path + TMP_PREFIX + future_part.name + "/"; if (Poco::File(new_part_tmp_path).exists()) @@ -830,7 +821,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor const FutureMergedMutatedPart & future_part, const std::vector & commands, MergeListEntry & merge_entry, - const Context & context) + const Context & context, + DiskSpaceMonitor::Reservation * disk_reservation) { auto check_not_cancelled = [&]() { @@ -865,8 +857,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor else LOG_TRACE(log, "Mutating part " << source_part->name << " to mutation version " << future_part.part_info.mutation); - size_t expected_size = future_part.expectedSize(); - String part_path = data.getFullPathForPart(expected_size); ///@TODO_IGR ASK EXPECTED_SIZE + String part_path = disk_reservation->getPath(); MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared( data, part_path, future_part.name, future_part.part_info); diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h index b109e236064..67c816ffb21 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h @@ -30,8 +30,6 @@ struct FutureMergedMutatedPart } void assign(MergeTreeData::DataPartsVector parts_); - - UInt64 expectedSize() const; }; /** Can select the parts to merge and merge them. @@ -98,7 +96,7 @@ public: MergeTreeData::MutableDataPartPtr mutatePartToTemporaryPart( const FutureMergedMutatedPart & future_part, const std::vector & commands, - MergeListEntry & merge_entry, const Context & context); + MergeListEntry & merge_entry, const Context & context, DiskSpaceMonitor::Reservation * disk_reservation); MergeTreeData::DataPartPtr renameMergedTemporaryPart( MergeTreeData::MutableDataPartPtr & new_data_part, diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index 60e9feafdc9..84c85553d79 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -65,7 +65,7 @@ StorageMergeTree::StorageMergeTree( : path(path_), database_name(database_name_), table_name(table_name_), full_paths{path + escapeForFileName(table_name) + '/', "/mnt/data/Data2/" + escapeForFileName(table_name) + '/'}, global_context(context_), background_pool(context_.getBackgroundPool()), data(database_name, table_name, - full_paths, columns_, indices_, + Schema(std::vector{full_paths}), columns_, indices_, ///@TODO_IGR generate Schema from config context_, date_column_name, partition_by_ast_, order_by_ast_, primary_key_ast_, sample_by_ast_, merging_params_, settings_, false, attach), reader(data), writer(data), merger_mutator(data, global_context.getBackgroundPool()), @@ -273,12 +273,15 @@ public: : future_part(future_part_), storage(storage_) { /// Assume mutex is already locked, because this method is called from mergeTask. - reserved_space = DiskSpaceMonitor::reserve(storage.full_paths[0], total_size); /// May throw. @TODO_IGR ASK WHERE TO RESERVE + reserved_space = storage.data.reserveSpaceForPart(total_size); + if (!reserved_space) { + throw Exception("Not enought space", ErrorCodes::NOT_ENOUGH_SPACE); ///@TODO_IGR Edit exception msg + } for (const auto & part : future_part.parts) { if (storage.currently_merging.count(part)) - throw Exception("Tagging alreagy tagged part " + part->name + ". This is a bug.", ErrorCodes::LOGICAL_ERROR); + throw Exception("Tagging already tagged part " + part->name + ". This is a bug.", ErrorCodes::LOGICAL_ERROR); } storage.currently_merging.insert(future_part.parts.begin(), future_part.parts.end()); } @@ -481,7 +484,8 @@ bool StorageMergeTree::merge( } else { - UInt64 disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_paths[0]); ///@TODO_IGR ASK DISK OR DISKS + /// DataPArt can be store only at one disk. Get Max of free space at all disks + UInt64 disk_space = DiskSpaceMonitor::getMaxUnreservedFreeSpace(); ///@TODO_IGR ASK Maybe reserve max space at this disk there and then change to exactly space selected = merger_mutator.selectAllPartsToMergeWithinPartition(future_part, disk_space, can_merge, partition_id, final, out_disable_reason); } @@ -570,7 +574,8 @@ bool StorageMergeTree::tryMutatePart() /// You must call destructor with unlocked `currently_merging_mutex`. std::optional tagger; { - auto disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_paths[0]); ///@TODO_IGR ASK DISK OR DISKS + /// DataPArt can be store only at one disk. Get Max of free space at all disks + UInt64 disk_space = DiskSpaceMonitor::getMaxUnreservedFreeSpace(); ///@TODO_IGR ASK Maybe reserve max space at this disk there and then change to exactly space std::lock_guard lock(currently_merging_mutex); @@ -660,7 +665,8 @@ bool StorageMergeTree::tryMutatePart() try { - new_part = merger_mutator.mutatePartToTemporaryPart(future_part, commands, *merge_entry, global_context); + new_part = merger_mutator.mutatePartToTemporaryPart(future_part, commands, *merge_entry, global_context, + tagger->reserved_space.get()); data.renameTempPartAndReplace(new_part); tagger->is_successful = true; write_part_log({});