From 7824aa528fb5f7ac4eedf9031f1d2c551b1e4c55 Mon Sep 17 00:00:00 2001 From: Igor Mineev Date: Thu, 4 Apr 2019 20:19:11 +0300 Subject: [PATCH] Add Schemes to config. Parse scheme as default on MergeTreeData init --- dbms/src/Interpreters/Context.cpp | 21 ++- dbms/src/Interpreters/Context.h | 3 + .../Storages/MergeTree/DiskSpaceMonitor.cpp | 177 ++++++++++++++++++ .../src/Storages/MergeTree/DiskSpaceMonitor.h | 170 +++++++++-------- dbms/src/Storages/MergeTree/MergeTreeData.cpp | 14 +- dbms/src/Storages/MergeTree/MergeTreeData.h | 1 - .../MergeTree/MergeTreeDataMergerMutator.cpp | 6 +- dbms/src/Storages/StorageMergeTree.cpp | 12 +- .../Storages/StorageReplicatedMergeTree.cpp | 14 +- 9 files changed, 317 insertions(+), 101 deletions(-) diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index 72e9af80f45..439d7ef823f 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -145,6 +145,8 @@ struct ContextShared std::unique_ptr ddl_worker; /// Process ddl commands from zk. /// Rules for selecting the compression settings, depending on the size of the part. mutable std::unique_ptr compression_codec_selector; + /// Storage schema chooser; + mutable std::unique_ptr merge_tree_storage_configuration; std::optional merge_tree_settings; /// Settings of MergeTree* engines. 
size_t max_table_size_to_drop = 50000000000lu; /// Protects MergeTree tables from accidental DROP (50GB by default) size_t max_partition_size_to_drop = 50000000000lu; /// Protects MergeTree partitions from accidental DROP (50GB by default) @@ -1612,7 +1614,7 @@ CompressionCodecPtr Context::chooseCompressionCodec(size_t part_size, double par auto & config = getConfigRef(); if (config.has(config_name)) - shared->compression_codec_selector = std::make_unique(config, "compression"); + shared->compression_codec_selector = std::make_unique(config, config_name); else shared->compression_codec_selector = std::make_unique(); } @@ -1621,6 +1623,23 @@ CompressionCodecPtr Context::chooseCompressionCodec(size_t part_size, double par } +///@TODO_IGR ASK maybe pointer to Schema? +Schema Context::chooseSchema(const String & name) const +{ + auto lock = getLock(); + + if (!shared->merge_tree_storage_configuration) + { + constexpr auto config_name = "storage_configuration"; + auto & config = getConfigRef(); + + shared->merge_tree_storage_configuration = std::make_unique(config, config_name); + } + + return (*shared->merge_tree_storage_configuration)[name]; +} + + const MergeTreeSettings & Context::getMergeTreeSettings() const { auto lock = getLock(); diff --git a/dbms/src/Interpreters/Context.h b/dbms/src/Interpreters/Context.h index 8cd000da280..5d2bc7ed368 100644 --- a/dbms/src/Interpreters/Context.h +++ b/dbms/src/Interpreters/Context.h @@ -10,6 +10,7 @@ #include #include #include +#include #include #include @@ -422,6 +423,8 @@ public: /// Lets you select the compression codec according to the conditions described in the configuration file. std::shared_ptr chooseCompressionCodec(size_t part_size, double part_size_ratio) const; + Schema chooseSchema(const String & name) const; + /// Get the server uptime in seconds. 
time_t getUptimeSeconds() const; diff --git a/dbms/src/Storages/MergeTree/DiskSpaceMonitor.cpp b/dbms/src/Storages/MergeTree/DiskSpaceMonitor.cpp index d6fe0cb053c..72af7160fe2 100644 --- a/dbms/src/Storages/MergeTree/DiskSpaceMonitor.cpp +++ b/dbms/src/Storages/MergeTree/DiskSpaceMonitor.cpp @@ -6,4 +6,181 @@ namespace DB std::map DiskSpaceMonitor::reserved; std::mutex DiskSpaceMonitor::mutex; +Disk::Disk(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix) + : path(config.getString(config_prefix + ".path")), + keep_free_space_bytes(config.getUInt64(config_prefix + ".keep_free_space_bytes", 0)) +{ +} + +DisksSelector::DisksSelector(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix) { + Poco::Util::AbstractConfiguration::Keys keys; + config.keys(config_prefix, keys); + + for (const auto & name : keys) + { + disks.emplace(name, Disk{config, config_prefix + "." + name}); + } +} + +const Disk & DisksSelector::operator[](const String & name) const { + auto it = disks.find(name); + if (it == disks.end()) { + throw Exception("Unknown disk " + name, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG); + } + return it->second; +} + +Schema::Volume::Volume(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, const DisksSelector & disk_selector) { + Poco::Util::AbstractConfiguration::Keys keys; + config.keys(config_prefix, keys); + + /// Disk's names + Strings disks_names; + + for (const auto & name : keys) + { + if (startsWith(name.data(), "disk")) { + disks_names.push_back(config.getString(config_prefix + "." + name)); + } else if (name == "part_size_threshold_bytes") { + max_data_part_size = config.getUInt64(config_prefix + "." 
+ name, 0); + } + ///@TODO_IGR part_size_threshold_ratio which set max_data_part_size by total disk size + } + + if (max_data_part_size == 0) { + --max_data_part_size; + } + + /// Get paths from disk's names + for (const auto & disk_name : disks_names) { + /// Disks operator [] may throw exception + disks.push_back(disk_selector[disk_name]); + } +} + +bool Schema::Volume::setDefaultPath(const String & path) { + bool set = false; + for (auto & disk : disks) { + if (disk.path == "default") { + if (set) { + throw Exception("It is not possible to have two default disks", ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG); ///@TODO_IGR ASK ErrorCode + } + set = true; + disk.path = path; + + } + } + return set; +} + +DiskSpaceMonitor::ReservationPtr Schema::Volume::reserve(UInt64 expected_size) const { + /// This volume can not store files which size greater than max_data_part_size + if (expected_size > max_data_part_size) { + return {}; + } + /// Real order is not necessary + size_t start_from = last_used.fetch_add(1u, std::memory_order_relaxed); + for (size_t i = 0; i != disks.size(); ++i) { + size_t index = (start_from + i) % disks.size(); + auto reservation = DiskSpaceMonitor::tryToReserve(disks[index], expected_size); + if (reservation) { + return reservation; + } + } + return {}; +} + +UInt64 Schema::Volume::getMaxUnreservedFreeSpace() const { + UInt64 res = 0; + for (const auto & disk : disks) { + ///@TODO_IGR ASK There is cycle with mutex locking inside((( + res = std::max(res, DiskSpaceMonitor::getUnreservedFreeSpace(disk)); + } + return res; +} + +Schema::Schema(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, const DisksSelector & disks) { + Poco::Util::AbstractConfiguration::Keys keys; + config.keys(config_prefix, keys); + + for (const auto & name : keys) + { + if (!startsWith(name.data(), "volume")) + throw Exception("Unknown element in config: " + config_prefix + "." 
+ name + ", must be 'volume'",\ + ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG); + volumes.emplace_back(config, config_prefix + "." + name, disks); + } +} + +///@TODO_IGR ASK Single use in MergeTreeData constructor +void Schema::setDefaultPath(const String & path) { + bool set = false; + for (auto & volume : volumes) { + if (volume.setDefaultPath(path)) { + if (set) { + throw Exception("It is not possible to have two default disks", + ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG); ///@TODO_IGR ASK ErrorCode + } + set = true; + } + } +} + +///@TODO_IGR ASK maybe iterator without copy? +Strings Schema::getFullPaths() const { + Strings res; + for (const auto & volume : volumes) { + for (const auto & disk : volume.disks) { + res.push_back(disk.path); + } + } + return res; +} + +UInt64 Schema::getMaxUnreservedFreeSpace() const { + UInt64 res = 0; + for (const auto & volume : volumes) { + res = std::max(res, volume.getMaxUnreservedFreeSpace()); + } + return res; +} + +DiskSpaceMonitor::ReservationPtr Schema::reserve(UInt64 expected_size) const { + for (auto & volume : volumes) { + auto reservation = volume.reserve(expected_size); + if (reservation) { + return reservation; + } + } + return {}; +} + +SchemaSelector::SchemaSelector(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, const DisksSelector & disks) { + Poco::Util::AbstractConfiguration::Keys keys; + config.keys(config_prefix, keys); + + for (const auto & name : keys) + { + ///@TODO_IGR ASK What if same names? + std::cerr << "Schema " + name << std::endl; + schemes.emplace(name, Schema{config, config_prefix + "." 
+ name, disks}); + } + + std::cerr << config_prefix << " " << schemes.size() << std::endl; +} + +const Schema & SchemaSelector::operator[](const String & name) const { + auto it = schemes.find(name); + if (it == schemes.end()) { + throw Exception("Unknown schema " + name, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG); ///@TODO_IGR Choose error code + } + return it->second; +} + +MergeTreeStorageConfiguration::MergeTreeStorageConfiguration(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix) + : disks(config, config_prefix + ".disks"), schema_selector(config, config_prefix + ".schemes", disks) +{ + std::cerr << config_prefix << " " << disks.size() << std::endl; +} + } diff --git a/dbms/src/Storages/MergeTree/DiskSpaceMonitor.h b/dbms/src/Storages/MergeTree/DiskSpaceMonitor.h index b79aa4734dc..15f5d1f5b1e 100644 --- a/dbms/src/Storages/MergeTree/DiskSpaceMonitor.h +++ b/dbms/src/Storages/MergeTree/DiskSpaceMonitor.h @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -23,8 +24,16 @@ namespace ErrorCodes { extern const int CANNOT_STATVFS; extern const int NOT_ENOUGH_SPACE; + extern const int UNKNOWN_ELEMENT_IN_CONFIG; } +struct Disk { + String path; + UInt64 keep_free_space_bytes; + + Disk(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix); +}; + /** Determines amount of free space in filesystem. * Could "reserve" space, for different operations to plan disk space usage. 
@@ -114,21 +123,23 @@ public: using ReservationPtr = std::unique_ptr; - static UInt64 getUnreservedFreeSpace(const String & disk_path) + static UInt64 getUnreservedFreeSpace(const Disk & disk) { struct statvfs fs; - if (statvfs(disk_path.c_str(), &fs) != 0) + if (statvfs(disk.path.c_str(), &fs) != 0) throwFromErrno("Could not calculate available disk space (statvfs)", ErrorCodes::CANNOT_STATVFS); UInt64 res = fs.f_bfree * fs.f_bsize; + res -= std::min(res, disk.keep_free_space_bytes); ///@TODO_IGR ASK Is Heuristic by Michael Kolupaev actual? + /// Heuristic by Michael Kolupaev: reserve 30 MB more, because statvfs shows few megabytes more space than df. res -= std::min(res, static_cast(30 * (1ul << 20))); std::lock_guard lock(mutex); - auto & reserved_bytes = reserved[disk_path].reserved_bytes; + auto & reserved_bytes = reserved[disk.path].reserved_bytes; if (reserved_bytes > res) res = 0; @@ -138,60 +149,36 @@ public: return res; } - /** Returns max of unreserved free space on all disks - * It is necessary to have guarantee that all paths are set - */ - static UInt64 getMaxUnreservedFreeSpace() + static UInt64 getAllReservedSpace() { - UInt64 max_unreserved = 0; - for (auto& [disk_path, reserve] : reserved) { - struct statvfs fs; - - if (statvfs(disk_path.c_str(), &fs) != 0) - throwFromErrno("Could not calculate available disk space (statvfs)", ErrorCodes::CANNOT_STATVFS); - - UInt64 res = fs.f_bfree * fs.f_bsize; - - /// Heuristic by Michael Kolupaev: reserve 30 MB more, because statvfs shows few megabytes more space than df. 
- res -= std::min(res, static_cast(30 * (1ul << 20))); - - ///@TODO_IGR ASK Maybe mutex out of for - std::lock_guard lock(mutex); - - auto &reserved_bytes = reserved[disk_path].reserved_bytes; - - if (reserved_bytes > res) - res = 0; - else - res -= reserved_bytes; - - max_unreserved = std::max(max_unreserved, res); + std::lock_guard lock(mutex); + UInt64 res = 0; /// Must start at zero: the loop below only accumulates into it + for (const auto & reserve : reserved) { + res += reserve.second.reserved_bytes; } - return max_unreserved; + return res; } - static UInt64 getReservedSpace(const String & disk_path) + static UInt64 getAllReservationCount() { std::lock_guard lock(mutex); - return reserved[disk_path].reserved_bytes; - } - - static UInt64 getReservationCount(const String & disk_path) - { - std::lock_guard lock(mutex); - return reserved[disk_path].reservation_count; + UInt64 res = 0; /// Must start at zero: the loop below only accumulates into it + for (const auto & reserve : reserved) { + res += reserve.second.reservation_count; + } + return res; } /// If not enough (approximately) space, do not reserve. - static ReservationPtr tryToReserve(const String & disk_path, UInt64 size) + static ReservationPtr tryToReserve(const Disk & disk, UInt64 size) { - UInt64 free_bytes = getUnreservedFreeSpace(disk_path); + UInt64 free_bytes = getUnreservedFreeSpace(disk); ///@TODO_IGR ASK twice reservation? 
if (free_bytes < size) { return {}; } - return std::make_unique(size, &reserved[disk_path], disk_path); + return std::make_unique(size, &reserved[disk.path], disk.path); } private: @@ -199,61 +186,88 @@ private: static std::mutex mutex; }; +class DisksSelector { +public: + DisksSelector() = default; + + DisksSelector(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix); + + const Disk & operator[](const String & name) const; + + ///@TODO_IGR REMOVE it + size_t size() const { + return disks.size(); + } + +private: + std::map disks; +}; + class Schema { class Volume { friend class Schema; public: - Volume(std::vector paths_) : paths(std::move(paths_)) + Volume(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, const DisksSelector & disk_selector); + + Volume(const Volume & other) + : max_data_part_size(other.max_data_part_size), + disks(other.disks), + last_used(0) { } - DiskSpaceMonitor::ReservationPtr reserve(UInt64 expected_size) const { - for (size_t i = 0; i != paths.size(); ++i) { - last_used = (last_used + 1) % paths.size(); - auto reservation = DiskSpaceMonitor::tryToReserve(paths[last_used], expected_size); - if (reservation) { - return reservation; - } - } - return {}; - } + bool setDefaultPath(const String & path); + + DiskSpaceMonitor::ReservationPtr reserve(UInt64 expected_size) const; + + UInt64 getMaxUnreservedFreeSpace() const; private: - const Strings paths; - mutable size_t last_used = 0; ///@TODO_IGR ASK It is thread safe, but it is not consistent. :( - /// P.S. I do not want to use mutex here + UInt64 max_data_part_size = 0; /// Zero means "no limit": the constructor wraps it to the largest UInt64 when no threshold is configured + + std::vector disks; + mutable std::atomic last_used = 0; ///@TODO_IGR ASK It is thread safe, but it is not consistent. :( + /// P.S. 
I do not want to use mutex here }; public: - Schema(const std::vector & disks) { - for (const Strings & volume : disks) { - volumes.emplace_back(volume); - } - } + Schema(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, const DisksSelector & disks); - ///@TODO_IGR ASK maybe iterator without copy? - Strings getFullPaths() const { - Strings res; - for (const auto & volume : volumes) { - std::copy(volume.paths.begin(), volume.paths.end(), std::back_inserter(res)); - } - return res; - } + void setDefaultPath(const String & path); - DiskSpaceMonitor::ReservationPtr reserve(UInt64 expected_size) const { - for (auto & volume : volumes) { - auto reservation = volume.reserve(expected_size); - if (reservation) { - return reservation; - } - } - return {}; - } + Strings getFullPaths() const; + + UInt64 getMaxUnreservedFreeSpace() const; + + DiskSpaceMonitor::ReservationPtr reserve(UInt64 expected_size) const; private: std::vector volumes; }; +class SchemaSelector { +public: + SchemaSelector(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, const DisksSelector & disks); + + const Schema & operator[](const String & name) const; + +private: + std::map schemes; +}; + +class MergeTreeStorageConfiguration { +public: + MergeTreeStorageConfiguration(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix); + + const Schema & operator[](const String & name) const { + return schema_selector[name]; + } + +private: + DisksSelector disks; + SchemaSelector schema_selector; +}; + } diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index 84b5abdd6e6..c689c74816d 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -89,7 +89,7 @@ namespace ErrorCodes MergeTreeData::MergeTreeData( const String & database_, const String & table_, - const Schema & schema_, const ColumnsDescription & 
columns_, + const ColumnsDescription & columns_, const IndicesDescription & indices_, Context & context_, const String & date_column_name, @@ -110,7 +110,7 @@ MergeTreeData::MergeTreeData( sample_by_ast(sample_by_ast_), require_part_metadata(require_part_metadata_), database_name(database_), table_name(table_), - schema(schema_), + schema(context_.chooseSchema("default")), ///@TODO_IGR Schema name broken_part_callback(broken_part_callback_), log_name(database_name + "." + table_name), log(&Logger::get(log_name + " (Data)")), data_parts_by_info(data_parts_indexes.get()), @@ -2425,11 +2425,11 @@ MergeTreeData::DataPartsVector MergeTreeData::getAllDataPartsVector(MergeTreeDat DiskSpaceMonitor::ReservationPtr MergeTreeData::reserveSpaceForPart(UInt64 expected_size) const { std::cerr << "Exp size " << expected_size << std::endl; - constexpr UInt64 SIZE_100MB = 100ull << 20; + constexpr UInt64 SIZE_1MB = 1ull << 20; ///@TODO_IGR ASK Is it OK? constexpr UInt64 MAGIC_CONST = 1; - if (expected_size < SIZE_100MB) { - expected_size = SIZE_100MB; + if (expected_size < SIZE_1MB) { + expected_size = SIZE_1MB; } auto reservation = reserveSpaceAtDisk(expected_size * MAGIC_CONST); if (reservation) { @@ -2437,7 +2437,7 @@ DiskSpaceMonitor::ReservationPtr MergeTreeData::reserveSpaceForPart(UInt64 expec } throw Exception("Not enough free disk space to reserve: " + formatReadableSizeWithBinarySuffix(expected_size) + " requested, " - + formatReadableSizeWithBinarySuffix(DiskSpaceMonitor::getMaxUnreservedFreeSpace()) + " available", + + formatReadableSizeWithBinarySuffix(schema.getMaxUnreservedFreeSpace()) + " available", ErrorCodes::NOT_ENOUGH_SPACE); } @@ -2650,7 +2650,7 @@ DiskSpaceMonitor::ReservationPtr MergeTreeData::reserveSpaceAtDisk(UInt64 expect auto reservation = schema.reserve(expected_size); if (reservation) { /// Add path to table at disk - reservation->addEnclosedDirToPath(table_name); ///@TODO_IGR ASK can we use table_name here? Could path be different? 
+ reservation->addEnclosedDirToPath(escapeForFileName(table_name)); ///@TODO_IGR ASK can we use table_name here? Could path be different? } return reservation; } diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.h b/dbms/src/Storages/MergeTree/MergeTreeData.h index f6726fb8539..dd19952db97 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.h +++ b/dbms/src/Storages/MergeTree/MergeTreeData.h @@ -304,7 +304,6 @@ public: /// require_part_metadata - should checksums.txt and columns.txt exist in the part directory. /// attach - whether the existing table is attached or the new table is created. MergeTreeData(const String & database_, const String & table_, - const Schema & schema_, const ColumnsDescription & columns_, const IndicesDescription & indices_, Context & context_, diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index f1e8ca7a104..e14df0681a8 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -150,7 +150,7 @@ UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSize(size_t pool_size, size_ data.settings.max_bytes_to_merge_at_max_space_in_pool, static_cast(free_entries) / data.settings.number_of_free_entries_in_pool_to_lower_max_size_of_merge); - return std::min(max_size, static_cast(DiskSpaceMonitor::getMaxUnreservedFreeSpace() / DISK_USAGE_COEFFICIENT_TO_SELECT)); + return std::min(max_size, static_cast(data.schema.getMaxUnreservedFreeSpace() / DISK_USAGE_COEFFICIENT_TO_SELECT)); } @@ -290,8 +290,8 @@ bool MergeTreeDataMergerMutator::selectAllPartsToMergeWithinPartition( LOG_WARNING(log, "Won't merge parts from " << parts.front()->name << " to " << (*prev_it)->name << " because not enough free space: " << formatReadableSizeWithBinarySuffix(available_disk_space) << " free and unreserved " - << "(" << formatReadableSizeWithBinarySuffix(DiskSpaceMonitor::getReservedSpace("")) << " 
reserved in " ///@TODO_IGR ASK RESERVED SPACE ON ALL DISKS? - << DiskSpaceMonitor::getReservationCount("") << " chunks), " + << "(" << formatReadableSizeWithBinarySuffix(DiskSpaceMonitor::getAllReservedSpace()) << " reserved in " ///@TODO_IGR ASK RESERVED SPACE ON ALL DISKS? + << DiskSpaceMonitor::getAllReservationCount() << " chunks at all disks), " << formatReadableSizeWithBinarySuffix(sum_bytes) << " required now (+" << static_cast((DISK_USAGE_COEFFICIENT_TO_SELECT - 1.0) * 100) << "% on overhead); suppressing similar warnings for the next hour"); diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index a873b650db8..70412366b13 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -64,8 +64,7 @@ StorageMergeTree::StorageMergeTree( bool has_force_restore_data_flag) : path(path_), database_name(database_name_), table_name(table_name_), global_context(context_), background_pool(context_.getBackgroundPool()), - data(database_name, table_name, - Schema(std::vector{{path, "/mnt/data/Data2/"}}), columns_, indices_, ///@TODO_IGR generate Schema from config + data(database_name, table_name, columns_, indices_, context_, date_column_name, partition_by_ast_, order_by_ast_, primary_key_ast_, sample_by_ast_, merging_params_, settings_, false, attach), reader(data), writer(data), merger_mutator(data, global_context.getBackgroundPool()), @@ -74,6 +73,10 @@ StorageMergeTree::StorageMergeTree( if (path_.empty()) throw Exception("MergeTree storages require data path", ErrorCodes::INCORRECT_FILE_NAME); + ///@TODO_IGR ASK Set inside MergeTreeData? + /// Is bad for default((( + data.schema.setDefaultPath(path); + data.loadDataParts(has_force_restore_data_flag); if (!attach && !data.getDataParts().empty()) @@ -489,8 +492,7 @@ bool StorageMergeTree::merge( } else { - /// DataPArt can be store only at one disk. 
Get Max of free space at all disks - UInt64 disk_space = DiskSpaceMonitor::getMaxUnreservedFreeSpace(); ///@TODO_IGR ASK Maybe reserve max space at this disk there and then change to exactly space + UInt64 disk_space = data.schema.getMaxUnreservedFreeSpace(); selected = merger_mutator.selectAllPartsToMergeWithinPartition(future_part, disk_space, can_merge, partition_id, final, out_disable_reason); } @@ -580,7 +582,7 @@ bool StorageMergeTree::tryMutatePart() std::optional tagger; { /// DataPArt can be store only at one disk. Get Max of free space at all disks - UInt64 disk_space = DiskSpaceMonitor::getMaxUnreservedFreeSpace(); ///@TODO_IGR ASK Maybe reserve max space at this disk there and then change to exactly space + UInt64 disk_space = data.schema.getMaxUnreservedFreeSpace(); std::lock_guard lock(currently_merging_mutex); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index e237f6fe71e..9e4aa5d8bf7 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -214,8 +214,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( table_name(name_), full_path(path_ + escapeForFileName(table_name) + '/'), zookeeper_path(global_context.getMacros()->expand(zookeeper_path_, database_name, table_name)), replica_name(global_context.getMacros()->expand(replica_name_, database_name, table_name)), - data(database_name, table_name, - Schema(std::vector{{full_path}}), columns_, indices_, + data(database_name, table_name, columns_, indices_, context_, date_column_name, partition_by_ast_, order_by_ast_, primary_key_ast_, sample_by_ast_, merging_params_, settings_, true, attach, [this] (const std::string & name) { enqueuePartForCheck(name); }), @@ -227,6 +226,9 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( if (path_.empty()) throw Exception("ReplicatedMergeTree storages require data path", ErrorCodes::INCORRECT_FILE_NAME); + ///@TODO_IGR ASK 
Set inside MergeTreeData? + data.schema.setDefaultPath(path_); + if (!zookeeper_path.empty() && zookeeper_path.back() == '/') zookeeper_path.resize(zookeeper_path.size() - 1); /// If zookeeper chroot prefix is used, path should start with '/', because chroot concatenates without it. @@ -1046,7 +1048,7 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry) size_t estimated_space_for_merge = MergeTreeDataMergerMutator::estimateNeededDiskSpace(parts); /// Can throw an exception. - DiskSpaceMonitor::ReservationPtr reserved_space = DiskSpaceMonitor::tryToReserve(full_path, estimated_space_for_merge); + DiskSpaceMonitor::ReservationPtr reserved_space = data.reserveSpaceForPart(estimated_space_for_merge); if (!reserved_space) { throw Exception("TMP MSG", ErrorCodes::NOT_ENOUGH_SPACE); ///@TODO_IGR FIX } @@ -1179,7 +1181,7 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM MutationCommands commands = queue.getMutationCommands(source_part, new_part_info.mutation); /// Can throw an exception. 
- DiskSpaceMonitor::ReservationPtr reserved_space = DiskSpaceMonitor::tryToReserve(full_path, estimated_space_for_result); + DiskSpaceMonitor::ReservationPtr reserved_space = data.reserveSpaceForPart(estimated_space_for_result); if (!reserved_space) { throw Exception("TMP MSG", ErrorCodes::NOT_ENOUGH_SPACE); ///@TODO_IGR FIX } @@ -3010,7 +3012,7 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p for (const MergeTreeData::DataPartPtr & part : data_parts) partition_ids.emplace(part->info.partition_id); - UInt64 disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path); + UInt64 disk_space = data.schema.getMaxUnreservedFreeSpace(); for (const String & partition_id : partition_ids) { @@ -3037,7 +3039,7 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p } else { - UInt64 disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path); + UInt64 disk_space = data.schema.getMaxUnreservedFreeSpace(); String partition_id = data.getPartitionIDFromQuery(partition, query_context); selected = merger_mutator.selectAllPartsToMergeWithinPartition( future_merged_part, disk_space, can_merge, partition_id, final, &disable_reason);