diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAddress.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAddress.cpp index 16a84b4b2f6..d0f4b50fa34 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAddress.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAddress.cpp @@ -14,7 +14,9 @@ void ReplicatedMergeTreeAddress::writeText(WriteBuffer & out) const << "port: " << replication_port << '\n' << "tcp_port: " << queries_port << '\n' << "database: " << escape << database << '\n' - << "table: " << escape << table << '\n'; + << "table: " << escape << table << '\n' + << "scheme: " << escape << scheme << '\n'; + } void ReplicatedMergeTreeAddress::readText(ReadBuffer & in) @@ -25,6 +27,11 @@ void ReplicatedMergeTreeAddress::readText(ReadBuffer & in) >> "tcp_port: " >> queries_port >> "\n" >> "database: " >> escape >> database >> "\n" >> "table: " >> escape >> table >> "\n"; + + if (!in.eof()) + in >> "scheme: " >> escape >> scheme >> "\n"; + else + scheme = "http"; } String ReplicatedMergeTreeAddress::toString() const diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAddress.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAddress.h index b50ec72f3a5..2a620515278 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAddress.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAddress.h @@ -16,6 +16,7 @@ struct ReplicatedMergeTreeAddress UInt16 queries_port; String database; String table; + String scheme; ReplicatedMergeTreeAddress() = default; explicit ReplicatedMergeTreeAddress(const String & str) diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index dd9af0e7639..0ef02f813f7 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -1974,6 +1974,9 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry) auto [user, password] = context.getInterserverCredentials(); String interserver_scheme = context.getInterserverScheme(); + if (interserver_scheme != address.scheme) + throw Exception("Interserver schemes are different '" + interserver_scheme + "' != '" + address.scheme + "', can't fetch part from " + address.host, ErrorCodes::LOGICAL_ERROR); + part_desc->res_part = fetcher.fetchPart(part_desc->found_new_part_name, replica_path, address.host, address.replication_port, timeouts, user, password, interserver_scheme, false, TMP_PREFIX + "fetch_"); @@ -2713,6 +2716,9 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin try { + if (interserver_scheme != address.scheme) + throw Exception("Interserver schemes are different '" + interserver_scheme + "' != '" + address.scheme + "', can't fetch part from " + address.host, ErrorCodes::LOGICAL_ERROR); + part = fetcher.fetchPart(part_name, replica_path, address.host, address.replication_port, timeouts, user, password, interserver_scheme, to_detached); if (!to_detached) @@ -4596,6 +4602,7 @@ ReplicatedMergeTreeAddress StorageReplicatedMergeTree::getReplicatedMergeTreeAdd res.queries_port = context.getTCPPort(); res.database = database_name; res.table = table_name; + res.scheme = context.getInterserverScheme(); return res; }