diff --git a/cmake/autogenerated_versions.txt b/cmake/autogenerated_versions.txt index 6ca3999ff7f..407e391b445 100644 --- a/cmake/autogenerated_versions.txt +++ b/cmake/autogenerated_versions.txt @@ -1,5 +1,5 @@ # This strings autochanged from release_lib.sh: -SET(VERSION_REVISION 54440) +SET(VERSION_REVISION 54441) SET(VERSION_MAJOR 20) SET(VERSION_MINOR 10) SET(VERSION_PATCH 1) diff --git a/docs/en/engines/table-engines/special/distributed.md b/docs/en/engines/table-engines/special/distributed.md index f03ee25f3b3..b1d741e9e13 100644 --- a/docs/en/engines/table-engines/special/distributed.md +++ b/docs/en/engines/table-engines/special/distributed.md @@ -45,6 +45,18 @@ Clusters are set like this: + + + 1 diff --git a/programs/benchmark/Benchmark.cpp b/programs/benchmark/Benchmark.cpp index c8fdde3d3a6..08ded9ed870 100644 --- a/programs/benchmark/Benchmark.cpp +++ b/programs/benchmark/Benchmark.cpp @@ -85,7 +85,12 @@ public: std::string cur_host = i >= hosts_.size() ? "localhost" : hosts_[i]; connections.emplace_back(std::make_unique( - concurrency, cur_host, cur_port, default_database_, user_, password_, "benchmark", Protocol::Compression::Enable, secure)); + concurrency, + cur_host, cur_port, + default_database_, user_, password_, + "", /* cluster */ + "", /* cluster_secret */ + "benchmark", Protocol::Compression::Enable, secure)); comparison_info_per_interval.emplace_back(std::make_shared()); comparison_info_total.emplace_back(std::make_shared()); } diff --git a/programs/client/Client.cpp b/programs/client/Client.cpp index 0c2aca2b3c8..60e29a5306e 100644 --- a/programs/client/Client.cpp +++ b/programs/client/Client.cpp @@ -701,6 +701,8 @@ private: connection_parameters.default_database, connection_parameters.user, connection_parameters.password, + "", /* cluster */ + "", /* cluster_secret */ "client", connection_parameters.compression, connection_parameters.security); diff --git a/programs/client/Suggest.cpp b/programs/client/Suggest.cpp index 738697817c3..ac18a131c3a 100644 --- a/programs/client/Suggest.cpp +++ b/programs/client/Suggest.cpp @@ -26,6 +26,8 @@ void Suggest::load(const ConnectionParameters & connection_parameters, size_t su connection_parameters.default_database, connection_parameters.user, connection_parameters.password, + "" /* cluster */, + "" /* cluster_secret */, "client", connection_parameters.compression, connection_parameters.security); diff --git a/programs/server/config.xml b/programs/server/config.xml index 77b59abd891..f23d29deff0 100644 --- a/programs/server/config.xml +++ b/programs/server/config.xml @@ -311,6 +311,28 @@ + + + diff --git a/src/Client/Connection.cpp b/src/Client/Connection.cpp index ed27a878b5a..d8fe865136f 100644 --- a/src/Client/Connection.cpp +++ b/src/Client/Connection.cpp @@ -17,12 +17,15 @@ #include #include #include +#include +#include #include #include #include #include #include #include +#include #if !defined(ARCADIA_BUILD) # include @@ -171,8 +174,26 @@ void Connection::sendHello() // NOTE For backward compatibility of the protocol, client cannot send its version_patch. writeVarUInt(client_revision, *out); writeStringBinary(default_database, *out); - writeStringBinary(user, *out); - writeStringBinary(password, *out); + /// If interserver-secret is used, one do not need password + /// (NOTE we do not check for DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET, since we cannot ignore inter-server secret if it was requested) + if (!cluster_secret.empty()) + { + writeStringBinary(USER_INTERSERVER_MARKER, *out); + writeStringBinary("" /* password */, *out); + +#if USE_SSL + sendClusterNameAndSalt(); +#else + throw Exception( + "Inter-server secret support is disabled, because ClickHouse was built without SSL library", + ErrorCodes::SUPPORT_IS_DISABLED); +#endif + } + else + { + writeStringBinary(user, *out); + writeStringBinary(password, *out); + } out->next(); } @@ -288,6 +309,19 @@ void Connection::forceConnected(const ConnectionTimeouts & timeouts) } } +#if USE_SSL +void Connection::sendClusterNameAndSalt() +{ + pcg64_fast rng(randomSeed()); + UInt64 rand = rng(); + + salt = encodeSHA256(&rand, sizeof(rand)); + + writeStringBinary(cluster, *out); + writeStringBinary(salt, *out); +} +#endif + bool Connection::ping() { // LOG_TRACE(log_wrapper.get(), "Ping"); @@ -406,6 +440,37 @@ void Connection::sendQuery( else writeStringBinary("" /* empty string is a marker of the end of settings */, *out); + /// Interserver secret + if (server_revision >= DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET) + { + /// Hash + /// + /// Send correct hash only for !INITIAL_QUERY, due to: + /// - this will avoid extra protocol complexity for simplest cases + /// - there is no need in hash for the INITIAL_QUERY anyway + /// (since there is no secure/unsecure changes) + if (client_info && !cluster_secret.empty() && client_info->query_kind != ClientInfo::QueryKind::INITIAL_QUERY) + { +#if USE_SSL + std::string data(salt); + data += cluster_secret; + data += query; + data += query_id; + data += client_info->initial_user; + /// TODO: add source/target host/ip-address + + std::string hash = encodeSHA256(data); + writeStringBinary(hash, *out); +#else + throw Exception( + "Inter-server secret support is disabled, because ClickHouse was built without SSL library", + ErrorCodes::SUPPORT_IS_DISABLED); +#endif + } + else + writeStringBinary("", *out); + } + writeVarUInt(stage, *out); writeVarUInt(static_cast(compression), *out); diff --git a/src/Client/Connection.h b/src/Client/Connection.h index 7019778a2c9..f4c25001f3e 100644 --- a/src/Client/Connection.h +++ b/src/Client/Connection.h @@ -83,6 +83,8 @@ public: Connection(const String & host_, UInt16 port_, const String & default_database_, const String & user_, const String & password_, + const String & cluster_, + const String & cluster_secret_, const String & client_name_ = "client", Protocol::Compression compression_ = Protocol::Compression::Enable, Protocol::Secure secure_ = Protocol::Secure::Disable, @@ -90,6 +92,8 @@ public: : host(host_), port(port_), default_database(default_database_), user(user_), password(password_), + cluster(cluster_), + cluster_secret(cluster_secret_), client_name(client_name_), compression(compression_), secure(secure_), @@ -191,6 +195,11 @@ private: String user; String password; + /// For inter-server authorization + String cluster; + String cluster_secret; + String salt; + /// Address is resolved during the first connection (or the following reconnects) /// Use it only for logging purposes std::optional current_resolved_address; @@ -269,6 +278,10 @@ private: void connect(const ConnectionTimeouts & timeouts); void sendHello(); void receiveHello(); + +#if USE_SSL + void sendClusterNameAndSalt(); +#endif bool ping(); Block receiveData(); diff --git a/src/Client/ConnectionPool.h b/src/Client/ConnectionPool.h index 95cb81c8052..736075a4cc1 100644 --- a/src/Client/ConnectionPool.h +++ b/src/Client/ConnectionPool.h @@ -54,6 +54,8 @@ public: const String & default_database_, const String & user_, const String & password_, + const String & cluster_, + const String & cluster_secret_, const String & client_name_ = "client", Protocol::Compression compression_ = Protocol::Compression::Enable, Protocol::Secure secure_ = Protocol::Secure::Disable, @@ -65,6 +67,8 @@ public: default_database(default_database_), user(user_), password(password_), + cluster(cluster_), + cluster_secret(cluster_secret_), client_name(client_name_), compression(compression_), secure(secure_), @@ -109,6 +113,7 @@ protected: return std::make_shared( host, port, default_database, user, password, + cluster, cluster_secret, client_name, compression, secure); } @@ -119,6 +124,10 @@ private: String user; String password; + /// For inter-server authorization + String cluster; + String cluster_secret; + String client_name; Protocol::Compression compression; /// Whether to compress data when interacting with the server. Protocol::Secure secure; /// Whether to encrypt data when interacting with the server. diff --git a/src/Common/OpenSSLHelpers.cpp b/src/Common/OpenSSLHelpers.cpp index cfd47c684f3..77abbf99a90 100644 --- a/src/Common/OpenSSLHelpers.cpp +++ b/src/Common/OpenSSLHelpers.cpp @@ -12,11 +12,26 @@ namespace DB { #pragma GCC diagnostic warning "-Wold-style-cast" +std::string encodeSHA256(const std::string_view & text) +{ + return encodeSHA256(text.data(), text.size()); +} +std::string encodeSHA256(const void * text, size_t size) +{ + std::string out; + out.resize(32); + encodeSHA256(text, size, reinterpret_cast(out.data())); + return out; +} void encodeSHA256(const std::string_view & text, unsigned char * out) +{ + encodeSHA256(text.data(), text.size(), out); +} +void encodeSHA256(const void * text, size_t size, unsigned char * out) { SHA256_CTX ctx; SHA256_Init(&ctx); - SHA256_Update(&ctx, reinterpret_cast(text.data()), text.size()); + SHA256_Update(&ctx, reinterpret_cast(text), size); SHA256_Final(out, &ctx); } diff --git a/src/Common/OpenSSLHelpers.h b/src/Common/OpenSSLHelpers.h index 2560664de9e..1058a611ac6 100644 --- a/src/Common/OpenSSLHelpers.h +++ b/src/Common/OpenSSLHelpers.h @@ -10,8 +10,13 @@ namespace DB { -/// Encodes `text` and puts the result to `out` which must be at least 32 bytes long. + +/// Encodes `text` and returns it. +std::string encodeSHA256(const std::string_view & text); +std::string encodeSHA256(const void * text, size_t size); +/// `out` must be at least 32 bytes long. void encodeSHA256(const std::string_view & text, unsigned char * out); +void encodeSHA256(const void * text, size_t size, unsigned char * out); /// Returns concatenation of error strings for all errors that OpenSSL has recorded, emptying the error queue. String getOpenSSLErrors(); diff --git a/src/Core/Defines.h b/src/Core/Defines.h index e244581c339..3a7d29e92b1 100644 --- a/src/Core/Defines.h +++ b/src/Core/Defines.h @@ -67,8 +67,11 @@ /// Minimum revision supporting SettingsBinaryFormat::STRINGS. #define DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS 54429 +/// Mininum revision supporting interserver secret. +#define DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET 54441 + /// Version of ClickHouse TCP protocol. Set to git tag with latest protocol change. -#define DBMS_TCP_PROTOCOL_VERSION 54226 +#define DBMS_TCP_PROTOCOL_VERSION 54441 /// The boundary on which the blocks for asynchronous file operations should be aligned. #define DEFAULT_AIO_FILE_BLOCK_SIZE 4096 diff --git a/src/Core/Protocol.h b/src/Core/Protocol.h index 15630d0a6f8..a370a29dac8 100644 --- a/src/Core/Protocol.h +++ b/src/Core/Protocol.h @@ -52,6 +52,10 @@ namespace DB /// Using this block the client can initialize the output formatter and display the prefix of resulting table /// beforehand. +/// Marker of the inter-server secret (passed in the user name) +/// (anyway user cannot be started with a whitespace) +const char USER_INTERSERVER_MARKER[] = " INTERSERVER SECRET "; + namespace Protocol { /// Packet types that server transmits. @@ -71,6 +75,8 @@ namespace Protocol TablesStatusResponse = 9, /// A response to TablesStatus request. Log = 10, /// System logs of the query execution TableColumns = 11, /// Columns' description for default values calculation + + MAX = TableColumns, }; /// NOTE: If the type of packet argument would be Enum, the comparison packet >= 0 && packet < 10 @@ -79,9 +85,21 @@ namespace Protocol /// See https://www.securecoding.cert.org/confluence/display/cplusplus/INT36-CPP.+Do+not+use+out-of-range+enumeration+values inline const char * toString(UInt64 packet) { - static const char * data[] = { "Hello", "Data", "Exception", "Progress", "Pong", "EndOfStream", "ProfileInfo", "Totals", - "Extremes", "TablesStatusResponse", "Log", "TableColumns" }; - return packet < 12 + static const char * data[] = { + "Hello", + "Data", + "Exception", + "Progress", + "Pong", + "EndOfStream", + "ProfileInfo", + "Totals", + "Extremes", + "TablesStatusResponse", + "Log", + "TableColumns", + }; + return packet <= MAX ? data[packet] : "Unknown packet"; } @@ -113,13 +131,23 @@ namespace Protocol Ping = 4, /// Check that connection to the server is alive. TablesStatusRequest = 5, /// Check status of tables on the server. KeepAlive = 6, /// Keep the connection alive - Scalar = 7 /// A block of data (compressed or not). + Scalar = 7, /// A block of data (compressed or not). + + MAX = Scalar, }; inline const char * toString(UInt64 packet) { - static const char * data[] = { "Hello", "Query", "Data", "Cancel", "Ping", "TablesStatusRequest", "KeepAlive" }; - return packet < 7 + static const char * data[] = { + "Hello", + "Query", + "Data", + "Cancel", + "Ping", + "TablesStatusRequest", + "KeepAlive", + }; + return packet <= MAX ? data[packet] : "Unknown packet"; } diff --git a/src/Dictionaries/ClickHouseDictionarySource.cpp b/src/Dictionaries/ClickHouseDictionarySource.cpp index 4c119e13def..8199b16a94b 100644 --- a/src/Dictionaries/ClickHouseDictionarySource.cpp +++ b/src/Dictionaries/ClickHouseDictionarySource.cpp @@ -40,6 +40,8 @@ static ConnectionPoolWithFailoverPtr createPool( db, user, password, + "", /* cluster */ + "", /* cluster_secret */ "ClickHouseDictionarySource", Protocol::Compression::Enable, secure ? Protocol::Secure::Enable : Protocol::Secure::Disable)); diff --git a/src/Interpreters/Cluster.cpp b/src/Interpreters/Cluster.cpp index 6558ebf63d5..b385e74adc5 100644 --- a/src/Interpreters/Cluster.cpp +++ b/src/Interpreters/Cluster.cpp @@ -11,6 +11,7 @@ #include #include #include +#include namespace DB { @@ -73,8 +74,16 @@ bool Cluster::Address::isLocal(UInt16 clickhouse_port) const Cluster::Address::Address( - const Poco::Util::AbstractConfiguration & config, const String & config_prefix, UInt32 shard_index_, UInt32 replica_index_) - : shard_index(shard_index_), replica_index(replica_index_) + const Poco::Util::AbstractConfiguration & config, + const String & config_prefix, + const String & cluster_, + const String & cluster_secret_, + UInt32 shard_index_, + UInt32 replica_index_) + : cluster(cluster_) + , cluster_secret(cluster_secret_) + , shard_index(shard_index_) + , replica_index(replica_index_) { host_name = config.getString(config_prefix + ".host"); port = static_cast(config.getInt(config_prefix + ".port")); @@ -92,8 +101,15 @@ Cluster::Address::Address( } -Cluster::Address::Address(const String & host_port_, const String & user_, const String & password_, UInt16 clickhouse_port, bool secure_, Int64 priority_) - : user(user_), password(password_) +Cluster::Address::Address( + const String & host_port_, + const String & user_, + const String & password_, + UInt16 clickhouse_port, + bool secure_, + Int64 priority_) + : user(user_) + , password(password_) { auto parsed_host_port = parseAddress(host_port_, clickhouse_port); host_name = parsed_host_port.first; @@ -219,9 +235,9 @@ Cluster::Address Cluster::Address::fromFullString(const String & full_string) /// Implementation of Clusters class -Clusters::Clusters(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name) +Clusters::Clusters(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_prefix) { - updateClusters(config, settings, config_name); + updateClusters(config, settings, config_prefix); } @@ -241,10 +257,10 @@ void Clusters::setCluster(const String & cluster_name, const std::shared_ptr(config, settings, config_name + "." + key)); + impl.emplace(key, std::make_shared(config, settings, config_prefix, key)); } } @@ -268,18 +284,25 @@ Clusters::Impl Clusters::getContainer() const /// Implementation of `Cluster` class -Cluster::Cluster(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & cluster_name) +Cluster::Cluster(const Poco::Util::AbstractConfiguration & config, + const Settings & settings, + const String & config_prefix_, + const String & cluster_name) { + auto config_prefix = config_prefix_ + "." + cluster_name; + Poco::Util::AbstractConfiguration::Keys config_keys; - config.keys(cluster_name, config_keys); + config.keys(config_prefix, config_keys); + + config_prefix += "."; + + secret = config.getString(config_prefix + "secret", ""); + boost::range::remove_erase(config_keys, "secret"); if (config_keys.empty()) - throw Exception("No cluster elements (shard, node) specified in config at path " + cluster_name, ErrorCodes::SHARD_HAS_NO_CONNECTIONS); - - const auto & config_prefix = cluster_name + "."; + throw Exception("No cluster elements (shard, node) specified in config at path " + config_prefix, ErrorCodes::SHARD_HAS_NO_CONNECTIONS); UInt32 current_shard_num = 1; - for (const auto & key : config_keys) { if (startsWith(key, "node")) @@ -291,7 +314,7 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config, const Setting const auto & prefix = config_prefix + key; const auto weight = config.getInt(prefix + ".weight", default_weight); - addresses.emplace_back(config, prefix, current_shard_num, 1); + addresses.emplace_back(config, prefix, cluster_name, secret, current_shard_num, 1); const auto & address = addresses.back(); ShardInfo info; @@ -305,6 +328,7 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config, const Setting settings.distributed_connections_pool_size, address.host_name, address.port, address.default_database, address.user, address.password, + address.cluster, address.cluster_secret, "server", address.compression, address.secure, address.priority); @@ -345,7 +369,12 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config, const Setting if (startsWith(replica_key, "replica")) { - replica_addresses.emplace_back(config, partial_prefix + replica_key, current_shard_num, current_replica_num); + replica_addresses.emplace_back(config, + partial_prefix + replica_key, + cluster_name, + secret, + current_shard_num, + current_replica_num); ++current_replica_num; if (internal_replication) @@ -379,6 +408,7 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config, const Setting settings.distributed_connections_pool_size, replica.host_name, replica.port, replica.default_database, replica.user, replica.password, + replica.cluster, replica.cluster_secret, "server", replica.compression, replica.secure, replica.priority); @@ -442,6 +472,7 @@ Cluster::Cluster(const Settings & settings, const std::vector> & names, const String & username, const String & password, UInt16 clickhouse_port, bool treat_local_as_remote, @@ -62,6 +67,11 @@ public: UInt16 port; String user; String password; + + /// For inter-server authorization + String cluster; + String cluster_secret; + UInt32 shard_index{}; /// shard serial number in configuration file, starting from 1. UInt32 replica_index{}; /// replica serial number in this shard, starting from 1; zero means no replicas. @@ -80,6 +90,8 @@ public: Address( const Poco::Util::AbstractConfiguration & config, const String & config_prefix, + const String & cluster_, + const String & cluster_secret_, UInt32 shard_index_ = 0, UInt32 replica_index_ = 0); Address( @@ -170,6 +182,8 @@ public: /// The number of all shards. size_t getShardCount() const { return shards_info.size(); } + const String & getSecret() const { return secret; } + /// Get a subcluster consisting of one shard - index by count (from 0) of the shard of this cluster. std::unique_ptr getClusterWithSingleShard(size_t index) const; @@ -197,6 +211,9 @@ private: struct ReplicasAsShardsTag {}; Cluster(ReplicasAsShardsTag, const Cluster & from, const Settings & settings); + /// Inter-server secret + String secret; + String hash_of_addresses; /// Description of the cluster shards. ShardsInfo shards_info; @@ -219,7 +236,7 @@ using ClusterPtr = std::shared_ptr; class Clusters { public: - Clusters(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name = "remote_servers"); + Clusters(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_prefix = "remote_servers"); Clusters(const Clusters &) = delete; Clusters & operator=(const Clusters &) = delete; @@ -227,7 +244,7 @@ public: ClusterPtr getCluster(const std::string & cluster_name) const; void setCluster(const String & cluster_name, const ClusterPtr & cluster); - void updateClusters(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name); + void updateClusters(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_prefix); public: using Impl = std::map; @@ -239,6 +256,4 @@ protected: mutable std::mutex mutex; }; -using ClustersPtr = std::shared_ptr; - } diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index ab511acda76..ed5b7bb6a48 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -677,7 +677,7 @@ ConfigurationPtr Context::getUsersConfig() } -void Context::setUser(const String & name, const String & password, const Poco::Net::SocketAddress & address) +void Context::setUserImpl(const String & name, const std::optional & password, const Poco::Net::SocketAddress & address) { auto lock = getLock(); @@ -686,7 +686,7 @@ void Context::setUser(const String & name, const String & password, const Poco:: #if defined(ARCADIA_BUILD) /// This is harmful field that is used only in foreign "Arcadia" build. - client_info.current_password = password; + client_info.current_password = password.value_or(""); #endif auto new_user_id = getAccessControlManager().find(name); @@ -694,7 +694,9 @@ void Context::setUser(const String & name, const String & password, const Poco:: if (new_user_id) { new_access = getAccessControlManager().getContextAccess(*new_user_id, {}, true, settings, current_database, client_info); - if (!new_access->isClientHostAllowed() || !new_access->isCorrectPassword(password)) + /// Access w/o password is done under interserver-secret (remote_servers.secret) + /// So it is okay not to check client's host (since there is trust). + if (password && (!new_access->isClientHostAllowed() || !new_access->isCorrectPassword(*password))) { new_user_id = {}; new_access = nullptr; @@ -712,6 +714,16 @@ void Context::setUser(const String & name, const String & password, const Poco:: setSettings(*access->getDefaultSettings()); } +void Context::setUser(const String & name, const String & password, const Poco::Net::SocketAddress & address) +{ + setUserImpl(name, std::make_optional(password), address); +} + +void Context::setUserWithoutCheckingPassword(const String & name, const Poco::Net::SocketAddress & address) +{ + setUserImpl(name, {} /* no password */, address); +} + std::shared_ptr Context::getUser() const { return getAccess()->getUser(); diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 7200bf57e6e..bd4289828b1 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -258,6 +258,11 @@ public: /// Sets the current user, checks the password and that the specified host is allowed. /// Must be called before getClientInfo. void setUser(const String & name, const String & password, const Poco::Net::SocketAddress & address); + /// Sets the current user, *do not checks the password* but check that the specified host is allowed. + /// Must be called before getClientInfo. + /// + /// (Used only internally in cluster, if the secret matches) + void setUserWithoutCheckingPassword(const String & name, const Poco::Net::SocketAddress & address); void setQuotaKey(String quota_key_); UserPtr getUser() const; @@ -640,6 +645,9 @@ private: StoragePolicySelectorPtr getStoragePolicySelector(std::lock_guard & lock) const; DiskSelectorPtr getDiskSelector(std::lock_guard & /* lock */) const; + + /// If the password is not set, the password will not be checked + void setUserImpl(const String & name, const std::optional & password, const Poco::Net::SocketAddress & address); }; diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp index e4dcfce9102..d3a5ea38f3f 100644 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -51,6 +52,7 @@ namespace ErrorCodes extern const int POCO_EXCEPTION; extern const int SOCKET_TIMEOUT; extern const int UNEXPECTED_PACKET_FROM_CLIENT; + extern const int SUPPORT_IS_DISABLED; } @@ -293,6 +295,12 @@ void TCPHandler::runImpl() if (e.code() == ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT) throw; + /// If there is UNEXPECTED_PACKET_FROM_CLIENT emulate network_error + /// to break the loop, but do not throw to send the exception to + /// the client. + if (e.code() == ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT) + network_error = true; + /// If a timeout occurred, try to inform client about it and close the session if (e.code() == ErrorCodes::SOCKET_TIMEOUT) network_error = true; @@ -351,6 +359,8 @@ void TCPHandler::runImpl() tryLogCurrentException(log, "Can't send logs to client"); } + const auto & e = *exception; + LOG_ERROR(log, "Code: {}, e.displayText() = {}, Stack trace:\n\n{}", e.code(), e.displayText(), e.getStackTraceString()); sendException(*exception, send_exception_with_stack_trace); } } @@ -716,7 +726,7 @@ void TCPHandler::receiveHello() { /// Receive `hello` packet. UInt64 packet_type = 0; - String user = "default"; + String user; String password; readVarUInt(packet_type, *in); @@ -747,14 +757,25 @@ void TCPHandler::receiveHello() readStringBinary(user, *in); readStringBinary(password, *in); + if (user.empty()) + throw NetException("Unexpected packet from client (no user in Hello package)", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT); + LOG_DEBUG(log, "Connected {} version {}.{}.{}, revision: {}{}{}.", client_name, client_version_major, client_version_minor, client_version_patch, client_revision, (!default_database.empty() ? ", database: " + default_database : ""), - (!user.empty() ? ", user: " + user : "")); + (!user.empty() ? ", user: " + user : "") + ); - connection_context.setUser(user, password, socket().peerAddress()); + if (user != USER_INTERSERVER_MARKER) + { + connection_context.setUser(user, password, socket().peerAddress()); + } + else + { + receiveClusterNameAndSalt(); + } } @@ -836,6 +857,30 @@ bool TCPHandler::receivePacket() } } +void TCPHandler::receiveClusterNameAndSalt() +{ + readStringBinary(cluster, *in); + readStringBinary(salt, *in, 32); + + try + { + if (salt.empty()) + throw NetException("Empty salt is not allowed", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT); + + cluster_secret = query_context->getCluster(cluster)->getSecret(); + } + catch (const Exception & e) + { + try + { + /// We try to send error information to the client. + sendException(e, connection_context.getSettingsRef().calculate_text_stack_trace); + } + catch (...) {} + + throw; + } +} void TCPHandler::receiveQuery() { @@ -873,10 +918,6 @@ void TCPHandler::receiveQuery() client_info.initial_query_id = client_info.current_query_id; client_info.initial_address = client_info.current_address; } - else - { - query_context->setInitialRowPolicy(); - } /// Per query settings are also passed via TCP. /// We need to check them before applying due to they can violate the settings constraints. @@ -884,6 +925,64 @@ void TCPHandler::receiveQuery() : SettingsWriteFormat::BINARY; Settings passed_settings; passed_settings.read(*in, settings_format); + + /// Interserver secret. + std::string received_hash; + if (client_revision >= DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET) + { + readStringBinary(received_hash, *in, 32); + } + + readVarUInt(stage, *in); + state.stage = QueryProcessingStage::Enum(stage); + + readVarUInt(compression, *in); + state.compression = static_cast(compression); + + readStringBinary(state.query, *in); + + /// It is OK to check only when query != INITIAL_QUERY, + /// since only in that case the actions will be done. + if (!cluster.empty() && client_info.query_kind != ClientInfo::QueryKind::INITIAL_QUERY) + { +#if USE_SSL + std::string data(salt); + data += cluster_secret; + data += state.query; + data += state.query_id; + data += client_info.initial_user; + + if (received_hash.size() != 32) + throw NetException("Unexpected hash received from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT); + + std::string calculated_hash = encodeSHA256(data); + + if (calculated_hash != received_hash) + throw NetException("Hash mismatch", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT); + /// TODO: change error code? + + /// initial_user can be empty in case of Distributed INSERT via Buffer/Kafka, + /// i.e. when the INSERT is done with the global context (w/o user). + if (!client_info.initial_user.empty()) + { + query_context->setUserWithoutCheckingPassword(client_info.initial_user, socket().peerAddress()); + LOG_DEBUG(log, "User (initial): {}", query_context->getUserName()); + } + /// No need to update connection_context, since it does not requires user (it will not be used for query execution) +#else + throw Exception( + "Inter-server secret support is disabled, because ClickHouse was built without SSL library", + ErrorCodes::SUPPORT_IS_DISABLED); +#endif + } + else + { + query_context->setInitialRowPolicy(); + } + + /// + /// Settings + /// auto settings_changes = passed_settings.changes(); if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY) { @@ -897,20 +996,11 @@ void TCPHandler::receiveQuery() } query_context->applySettingsChanges(settings_changes); const Settings & settings = query_context->getSettingsRef(); - /// Sync timeouts on client and server during current query to avoid dangling queries on server /// NOTE: We use settings.send_timeout for the receive timeout and vice versa (change arguments ordering in TimeoutSetter), /// because settings.send_timeout is client-side setting which has opposite meaning on the server side. /// NOTE: these settings are applied only for current connection (not for distributed tables' connections) state.timeout_setter = std::make_unique(socket(), settings.receive_timeout, settings.send_timeout); - - readVarUInt(stage, *in); - state.stage = QueryProcessingStage::Enum(stage); - - readVarUInt(compression, *in); - state.compression = static_cast(compression); - - readStringBinary(state.query, *in); } void TCPHandler::receiveUnexpectedQuery() @@ -929,6 +1019,11 @@ void TCPHandler::receiveUnexpectedQuery() : SettingsWriteFormat::BINARY; skip_settings.read(*in, settings_format); + std::string skip_hash; + bool interserver_secret = client_revision >= DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET; + if (interserver_secret) + readStringBinary(skip_hash, *in, 32); + readVarUInt(skip_uint_64, *in); readVarUInt(skip_uint_64, *in); readStringBinary(skip_string, *in); diff --git a/src/Server/TCPHandler.h b/src/Server/TCPHandler.h index 3fec89264be..3771755892f 100644 --- a/src/Server/TCPHandler.h +++ b/src/Server/TCPHandler.h @@ -97,7 +97,6 @@ struct LastBlockInputParameters Block header; }; - class TCPHandler : public Poco::Net::TCPServerConnection { public: @@ -139,6 +138,12 @@ private: String default_database; + /// For inter-server secret (remote_server.*.secret) + String salt; + String cluster; + String cluster_secret; + + /// At the moment, only one ongoing query in the connection is supported at a time. QueryState state; @@ -187,6 +192,8 @@ private: void sendTotals(const Block & totals); void sendExtremes(const Block & extremes); + void receiveClusterNameAndSalt(); + /// Creates state.block_in/block_out for blocks read/write, depending on whether compression is enabled. void initBlockInput(); void initBlockOutput(const Block & block); diff --git a/src/Storages/Distributed/DirectoryMonitor.cpp b/src/Storages/Distributed/DirectoryMonitor.cpp index b67d3283ac9..dfb35f62bc4 100644 --- a/src/Storages/Distributed/DirectoryMonitor.cpp +++ b/src/Storages/Distributed/DirectoryMonitor.cpp @@ -236,8 +236,17 @@ ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::stri } return std::make_shared( - 1, address.host_name, address.port, address.default_database, address.user, address.password, - storage.getName() + '_' + address.user, Protocol::Compression::Enable, address.secure); + 1, /* max_connections */ + address.host_name, + address.port, + address.default_database, + address.user, + address.password, + address.cluster, + address.cluster_secret, + storage.getName() + '_' + address.user, /* client */ + Protocol::Compression::Enable, + address.secure); }; auto pools = createPoolsForAddresses(name, pool_factory); diff --git a/tests/integration/test_allowed_client_hosts/test.py b/tests/integration/test_allowed_client_hosts/test.py index e60e488b3ae..cf83718e25c 100644 --- a/tests/integration/test_allowed_client_hosts/test.py +++ b/tests/integration/test_allowed_client_hosts/test.py @@ -60,7 +60,5 @@ def test_allowed_host(): assert query_from_one_node_to_another(client_node, server, "SELECT * FROM test_table") == "5\n" for client_node in expected_to_fail: - with pytest.raises(Exception) as e: - result = query_from_one_node_to_another(client_node, server, "SELECT * FROM test_table") - print("Client node: {} Server node: {} Result: {}".format(client_node, server_node, result)) - assert "default: Authentication failed" in str(e) + with pytest.raises(Exception, match=r'default: Authentication failed'): + query_from_one_node_to_another(client_node, server, "SELECT * FROM test_table") diff --git a/tests/integration/test_distributed_inter_server_secret/__init__.py b/tests/integration/test_distributed_inter_server_secret/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_distributed_inter_server_secret/configs/remote_servers.xml b/tests/integration/test_distributed_inter_server_secret/configs/remote_servers.xml new file mode 100644 index 00000000000..0ff521ac800 --- /dev/null +++ b/tests/integration/test_distributed_inter_server_secret/configs/remote_servers.xml @@ -0,0 +1,26 @@ + + + + + n1 + 9000 + + + n2 + 9000 + + + + + foo + + n1 + 9000 + + + n2 + 9000 + + + + diff --git a/tests/integration/test_distributed_inter_server_secret/configs/remote_servers_n1.xml b/tests/integration/test_distributed_inter_server_secret/configs/remote_servers_n1.xml new file mode 100644 index 00000000000..70f8cee679a --- /dev/null +++ b/tests/integration/test_distributed_inter_server_secret/configs/remote_servers_n1.xml @@ -0,0 +1,15 @@ + + + + bar_n1 + + n1 + 9000 + + + n2 + 9000 + + + + diff --git a/tests/integration/test_distributed_inter_server_secret/configs/remote_servers_n2.xml b/tests/integration/test_distributed_inter_server_secret/configs/remote_servers_n2.xml new file mode 100644 index 00000000000..316e8a49afa --- /dev/null +++ b/tests/integration/test_distributed_inter_server_secret/configs/remote_servers_n2.xml @@ -0,0 +1,15 @@ + + + + bar_n2 + + n1 + 9000 + + + n2 + 9000 + + + + diff --git a/tests/integration/test_distributed_inter_server_secret/configs/users.xml b/tests/integration/test_distributed_inter_server_secret/configs/users.xml new file mode 100644 index 00000000000..1b012bfea9c --- /dev/null +++ b/tests/integration/test_distributed_inter_server_secret/configs/users.xml @@ -0,0 +1,41 @@ + + + + + + + + + + + + ::/0 + + default + default + + + + + + ::/0 + + default + default + + + + foo + + ::/0 + + default + default + + + + + + + + diff --git a/tests/integration/test_distributed_inter_server_secret/test.py b/tests/integration/test_distributed_inter_server_secret/test.py new file mode 100644 index 00000000000..b39f9dec861 --- /dev/null +++ b/tests/integration/test_distributed_inter_server_secret/test.py @@ -0,0 +1,152 @@ +# pylint: disable=unused-argument +# pylint: disable=redefined-outer-name +# pylint: disable=line-too-long + +import pytest + +from helpers.client import QueryRuntimeException +from helpers.cluster import ClickHouseCluster + +cluster = ClickHouseCluster(__file__) + +def make_instance(name, cfg): + return cluster.add_instance(name, + with_zookeeper=True, + main_configs=['configs/remote_servers.xml', cfg], + user_configs=['configs/users.xml']) +# _n1/_n2 contains cluster with different -- should fail +n1 = make_instance('n1', 'configs/remote_servers_n1.xml') +n2 = make_instance('n2', 'configs/remote_servers_n2.xml') + +users = pytest.mark.parametrize('user,password', [ + ('default', '' ), + ('nopass', '' ), + ('pass', 'foo'), +]) + +def bootstrap(): + for n in cluster.instances.values(): + n.query('DROP TABLE IF EXISTS data') + n.query('DROP TABLE IF EXISTS dist') + n.query('CREATE TABLE data (key Int) Engine=Memory()') + n.query(""" + CREATE TABLE dist_insecure AS data + Engine=Distributed(insecure, currentDatabase(), data, key) + """) + n.query(""" + CREATE TABLE dist_secure AS data + Engine=Distributed(secure, currentDatabase(), data, key) + """) + n.query(""" + CREATE TABLE dist_secure_disagree AS data + Engine=Distributed(secure_disagree, currentDatabase(), data, key) + """) + n.query(""" + CREATE TABLE dist_secure_buffer AS dist_secure + Engine=Buffer(currentDatabase(), dist_secure, + /* settings for manual flush only */ + 1, /* num_layers */ + 10e6, /* min_time, placeholder */ + 10e6, /* max_time, placeholder */ + 0, /* min_rows */ + 10e6, /* max_rows */ + 0, /* min_bytes */ + 80e6 /* max_bytes */ + ) + """) + +@pytest.fixture(scope='module', autouse=True) +def start_cluster(): + try: + cluster.start() + bootstrap() + yield cluster + finally: + cluster.shutdown() + +def query_with_id(node, id_, query, **kwargs): + return node.query("WITH '{}' AS __id {}".format(id_, query), **kwargs) + +# @return -- [user, initial_user] +def get_query_user_info(node, query_pattern): + node.query("SYSTEM FLUSH LOGS") + return node.query(""" + SELECT user, initial_user + FROM system.query_log + WHERE + query LIKE '%{}%' AND + query NOT LIKE '%system.query_log%' AND + type = 'QueryFinish' + """.format(query_pattern)).strip().split('\t') + +def test_insecure(): + n1.query('SELECT * FROM dist_insecure') + +def test_insecure_insert_async(): + n1.query('INSERT INTO dist_insecure SELECT * FROM numbers(2)') + n1.query('SYSTEM FLUSH DISTRIBUTED ON CLUSTER insecure dist_insecure') + assert int(n1.query('SELECT count() FROM dist_insecure')) == 2 + n1.query('TRUNCATE TABLE data ON CLUSTER insecure') + +def test_insecure_insert_sync(): + n1.query('INSERT INTO dist_insecure SELECT * FROM numbers(2)', settings={'insert_distributed_sync': 1}) + assert int(n1.query('SELECT count() FROM dist_insecure')) == 2 + n1.query('TRUNCATE TABLE data ON CLUSTER secure') + +def test_secure(): + n1.query('SELECT * FROM dist_secure') + +def test_secure_insert_async(): + n1.query('INSERT INTO dist_secure SELECT * FROM numbers(2)') + n1.query('SYSTEM FLUSH DISTRIBUTED ON CLUSTER secure dist_secure') + assert int(n1.query('SELECT count() FROM dist_secure')) == 2 + n1.query('TRUNCATE TABLE data ON CLUSTER secure') + +def test_secure_insert_sync(): + n1.query('INSERT INTO dist_secure SELECT * FROM numbers(2)', settings={'insert_distributed_sync': 1}) + assert int(n1.query('SELECT count() FROM dist_secure')) == 2 + n1.query('TRUNCATE TABLE data ON CLUSTER secure') + +# INSERT w/o initial_user +# +# Buffer() flush happens with global context, that does not have user +# And so Context::user/ClientInfo::current_user/ClientInfo::initial_user will be empty +def test_secure_insert_buffer_async(): + n1.query('INSERT INTO dist_secure_buffer SELECT * FROM numbers(2)') + n1.query('SYSTEM FLUSH DISTRIBUTED ON CLUSTER secure dist_secure') + # no Buffer flush happened + assert int(n1.query('SELECT count() FROM dist_secure')) == 0 + n1.query('OPTIMIZE TABLE dist_secure_buffer') + # manual flush + n1.query('SYSTEM FLUSH DISTRIBUTED ON CLUSTER secure dist_secure') + assert int(n1.query('SELECT count() FROM dist_secure')) == 2 + n1.query('TRUNCATE TABLE data ON CLUSTER secure') + +def test_secure_disagree(): + with pytest.raises(QueryRuntimeException, match='.*Hash mismatch.*'): + n1.query('SELECT * FROM dist_secure_disagree') + +def test_secure_disagree_insert(): + n1.query('INSERT INTO dist_secure_disagree SELECT * FROM numbers(2)') + with pytest.raises(QueryRuntimeException, match='.*Hash mismatch.*'): + n1.query('SYSTEM FLUSH DISTRIBUTED ON CLUSTER secure_disagree dist_secure_disagree') + # check the the connection will be re-established + # IOW that we will not get "Unknown BlockInfo field" + with pytest.raises(QueryRuntimeException, match='.*Hash mismatch.*'): + assert int(n1.query('SELECT count() FROM dist_secure_disagree')) == 0 + +@users +def test_user_insecure_cluster(user, password): + id_ = 'query-dist_insecure-' + user + query_with_id(n1, id_, 'SELECT * FROM dist_insecure', user=user, password=password) + assert get_query_user_info(n1, id_) == [user, user] # due to prefer_localhost_replica + assert get_query_user_info(n2, id_) == ['default', user] + +@users +def test_user_secure_cluster(user, password): + id_ = 'query-dist_secure-' + user + query_with_id(n1, id_, 'SELECT * FROM dist_secure', user=user, password=password) + assert get_query_user_info(n1, id_) == [user, user] + assert get_query_user_info(n2, id_) == [user, user] + +# TODO: check user for INSERT diff --git a/tests/queries/0_stateless/01308_row_policy_and_trivial_count_query.sql b/tests/queries/0_stateless/01308_row_policy_and_trivial_count_query.sql index c105885cb60..cd41bb227eb 100644 --- a/tests/queries/0_stateless/01308_row_policy_and_trivial_count_query.sql +++ b/tests/queries/0_stateless/01308_row_policy_and_trivial_count_query.sql @@ -4,6 +4,7 @@ CREATE TABLE t (x UInt8) ENGINE = MergeTree ORDER BY x; INSERT INTO t VALUES (1), (2), (3); SELECT count() FROM t; +DROP ROW POLICY IF EXISTS filter ON t; CREATE ROW POLICY filter ON t USING (x % 2 = 1) TO ALL; SELECT count() FROM t; DROP ROW POLICY filter ON t;