diff --git a/dbms/src/Storages/MergeTree/DataPartsExchange.cpp b/dbms/src/Storages/MergeTree/DataPartsExchange.cpp index a91cc8823c9..33da1ab420f 100644 --- a/dbms/src/Storages/MergeTree/DataPartsExchange.cpp +++ b/dbms/src/Storages/MergeTree/DataPartsExchange.cpp @@ -4,7 +4,6 @@ #include #include #include -#include #include #include #include @@ -27,6 +26,7 @@ namespace ErrorCodes extern const int CANNOT_WRITE_TO_OSTREAM; extern const int CHECKSUM_DOESNT_MATCH; extern const int UNKNOWN_TABLE; + extern const int UNKNOWN_ACTION; } namespace DataPartsExchange @@ -52,7 +52,17 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo if (blocker.isCancelled()) throw Exception("Transferring part to replica was cancelled", ErrorCodes::ABORTED); - String part_name = params.get("part"); + /// "0" for backward compatibility + String protocol_version = params.get("protocol_version", "0"); + + String part_name; + + if (protocol_version == "0") + part_name = params.get("part"); + else if (protocol_version == "1") + part_name = params.get("part_name"); + else + throw Exception("Unsupported protocol version", ErrorCodes::UNKNOWN_ACTION); ///@TODO_IGR ASK Is it true error code? static std::atomic_uint total_sends {0}; @@ -95,7 +105,21 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo MergeTreeData::DataPart::Checksums data_checksums; + if (protocol_version == "1") + { + /// Get size of all files + UInt64 all_part_files_size = 0; + for (const auto &it : checksums.files) + { + String file_name = it.first; + String path = part->getFullPath() + part_name + "/" + file_name; + all_part_files_size += Poco::File(path).getSize(); + } + writeBinary(all_part_files_size, out); + } + writeBinary(checksums.files.size(), out); + for (const auto & it : checksums.files) { String file_name = it.first; @@ -174,9 +198,10 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart( uri.setPort(port); uri.setQueryParameters( { - {"endpoint", getEndpointId(replica_path)}, - {"part", part_name}, - {"compress", "false"} + {"endpoint", getEndpointId(replica_path)}, + {"part_name", part_name}, + {"protocol_version", "1"}, + {"compress", "false"} }); Poco::Net::HTTPBasicCredentials creds{}; @@ -186,22 +211,81 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart( creds.setPassword(password); } + bool protocol_error = true; + try + { + PooledReadWriteBufferFromHTTP in{ + uri, + Poco::Net::HTTPRequest::HTTP_POST, + {}, + timeouts, + creds, + DBMS_DEFAULT_BUFFER_SIZE, + data.settings.replicated_max_parallel_fetches_for_host + }; + + UInt64 sum_files_size; + readBinary(sum_files_size, in); + + protocol_error = false; + + auto reservation = data.reserveSpaceForPart(sum_files_size); + return downloadPart(part_name, replica_path, to_detached, tmp_prefix_, reservation, in); + } + catch (...) ///@TODO_IGR catch exception + { + if (!protocol_error) + throw; + } + + /// Protocol error + /// Seems to be replica without protocol_version "1" supporting + /// Try to use old one + Poco::URI uri_v0; + uri_v0.setScheme(interserver_scheme); + uri_v0.setHost(host); + uri_v0.setPort(port); + uri_v0.setQueryParameters( + { + {"endpoint", getEndpointId(replica_path)}, + {"part", part_name}, + {"compress", "false"} + }); + PooledReadWriteBufferFromHTTP in{ - uri, - Poco::Net::HTTPRequest::HTTP_POST, - {}, - timeouts, - creds, - DBMS_DEFAULT_BUFFER_SIZE, - data.settings.replicated_max_parallel_fetches_for_host + uri_v0, + Poco::Net::HTTPRequest::HTTP_POST, + {}, + timeouts, + creds, + DBMS_DEFAULT_BUFFER_SIZE, + data.settings.replicated_max_parallel_fetches_for_host }; + /// We don't know real size of part + auto reservation = data.reserveOnMaxDiskWithoutReservation(); + return downloadPart(part_name, replica_path, to_detached, tmp_prefix_, reservation, in); +} + +MergeTreeData::MutableDataPartPtr Fetcher::downloadPart( + const String & part_name, + const String & replica_path, + bool to_detached, + const String & tmp_prefix_, + const DiskSpaceMonitor::ReservationPtr & reservation, + PooledReadWriteBufferFromHTTP & in) +{ + + size_t files; + readBinary(files, in); + static const String TMP_PREFIX = "tmp_fetch_"; String tmp_prefix = tmp_prefix_.empty() ? TMP_PREFIX : tmp_prefix_; String relative_part_path = String(to_detached ? "detached/" : "") + tmp_prefix + part_name; - auto reservation = data.reserveSpaceForPart(0); ///@TODO_IGR ASK What size should be there? + String part_path = data.getFullPathOnDisk(reservation->getDisk()); + String absolute_part_path = part_path + relative_part_path + "/"; Poco::File part_file(absolute_part_path); @@ -217,8 +301,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart( new_data_part->relative_path = relative_part_path; new_data_part->is_temp = true; - size_t files; - readBinary(files, in); + MergeTreeData::DataPart::Checksums checksums; for (size_t i = 0; i < files; ++i) { diff --git a/dbms/src/Storages/MergeTree/DataPartsExchange.h b/dbms/src/Storages/MergeTree/DataPartsExchange.h index d97687da886..c0c36cf8c37 100644 --- a/dbms/src/Storages/MergeTree/DataPartsExchange.h +++ b/dbms/src/Storages/MergeTree/DataPartsExchange.h @@ -6,6 +6,7 @@ #include #include #include +#include namespace DB @@ -64,6 +65,14 @@ public: ActionBlocker blocker; private: + MergeTreeData::MutableDataPartPtr downloadPart( + const String & part_name, + const String & replica_path, + bool to_detached, + const String & tmp_prefix_, + const DiskSpaceMonitor::ReservationPtr & reservation, + PooledReadWriteBufferFromHTTP & in); + MergeTreeData & data; Logger * log; }; diff --git a/dbms/src/Storages/MergeTree/DiskSpaceMonitor.cpp b/dbms/src/Storages/MergeTree/DiskSpaceMonitor.cpp index 055da025d50..0f2158b957d 100644 --- a/dbms/src/Storages/MergeTree/DiskSpaceMonitor.cpp +++ b/dbms/src/Storages/MergeTree/DiskSpaceMonitor.cpp @@ -183,6 +183,25 @@ DiskSpaceMonitor::ReservationPtr Schema::reserve(UInt64 expected_size) const return {}; } +DiskSpaceMonitor::ReservationPtr Schema::reserveOnMaxDiskWithoutReservation() const +{ + UInt64 max_space = 0; + DiskPtr max_disk; + for (const auto & volume : volumes) + { + for (const auto &disk : volume.disks) + { + auto avail_space = disk->getAvailableSpace(); + if (avail_space > max_space) + { + max_space = avail_space; + max_disk = disk; + } + } + } + return DiskSpaceMonitor::tryToReserve(max_disk, 0); +} + SchemaSelector::SchemaSelector(const Poco::Util::AbstractConfiguration & config, const String& config_prefix, const DiskSelector & disks) { Poco::Util::AbstractConfiguration::Keys keys; diff --git a/dbms/src/Storages/MergeTree/DiskSpaceMonitor.h b/dbms/src/Storages/MergeTree/DiskSpaceMonitor.h index 03c031d318c..b16280b8a3f 100644 --- a/dbms/src/Storages/MergeTree/DiskSpaceMonitor.h +++ b/dbms/src/Storages/MergeTree/DiskSpaceMonitor.h @@ -324,6 +324,10 @@ public: DiskSpaceMonitor::ReservationPtr reserve(UInt64 expected_size) const; + /// Reserves 0 bytes on disk with max available space + /// Do not use this function when it is possible to predict size!!! + DiskSpaceMonitor::ReservationPtr reserveOnMaxDiskWithoutReservation() const; + private: Volumes volumes; }; diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.h b/dbms/src/Storages/MergeTree/MergeTreeData.h index ece0452e362..90784230b78 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.h +++ b/dbms/src/Storages/MergeTree/MergeTreeData.h @@ -605,6 +605,10 @@ public: DiskSpaceMonitor::ReservationPtr reserveSpaceForPart(UInt64 expected_size); + /// Choose disk with max available free space + /// Reserves 0 bytes + DiskSpaceMonitor::ReservationPtr reserveOnMaxDiskWithoutReservation() { return schema.reserveOnMaxDiskWithoutReservation(); } + MergeTreeDataFormatVersion format_version; Context global_context; diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index 915d91da2ad..1f11ef6fd01 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -334,8 +334,10 @@ public: void StorageMergeTree::mutate(const MutationCommands & commands, const Context &) { - auto reservation = reserveSpaceForPart(0); ///@TODO_IGR ASK What expected size of mutated part? what size should we reserve? - MergeTreeMutationEntry entry(commands, getFullPathOnDisk(reservation->getDisk()), insert_increment.get()); + ///@TODO_IGR ASK What should i do here? + /// Choose any disk. + auto disk = schema.getDisks()[0]; + MergeTreeMutationEntry entry(commands, getFullPathOnDisk(disk), insert_increment.get()); String file_name; { std::lock_guard lock(currently_merging_mutex); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 36cb14c17e9..8d30de784e6 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -4172,12 +4172,16 @@ void StorageReplicatedMergeTree::fetchPartition(const ASTPtr & partition, const * Unreliable (there is a race condition) - such a partition may appear a little later. */ Poco::DirectoryIterator dir_end; - for (Poco::DirectoryIterator dir_it{getDataPaths()[0] + "detached/"}; dir_it != dir_end; ++dir_it) ///@TODO IGR + for (const std::string & path : getDataPaths()) { - MergeTreePartInfo part_info; - if (MergeTreePartInfo::tryParsePartName(dir_it.name(), &part_info, format_version) - && part_info.partition_id == partition_id) - throw Exception("Detached partition " + partition_id + " already exists.", ErrorCodes::PARTITION_ALREADY_EXISTS); + for (Poco::DirectoryIterator dir_it{path + "detached/"}; dir_it != dir_end; ++dir_it) + { + MergeTreePartInfo part_info; + if (MergeTreePartInfo::tryParsePartName(dir_it.name(), &part_info, format_version) + && part_info.partition_id == partition_id) + throw Exception("Detached partition " + partition_id + " already exists.", ErrorCodes::PARTITION_ALREADY_EXISTS); + } + } zkutil::Strings replicas;