Merge pull request #13156 from azat/cluster-secure

Secure inter-cluster query execution (with initial_user as current query user) [v3]
This commit is contained in:
Vitaly Baranov 2020-09-17 17:11:00 +03:00 committed by GitHub
commit 3356d75b23
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 671 additions and 61 deletions

View File

@ -1,5 +1,5 @@
# This strings autochanged from release_lib.sh:
SET(VERSION_REVISION 54440)
SET(VERSION_REVISION 54441)
SET(VERSION_MAJOR 20)
SET(VERSION_MINOR 10)
SET(VERSION_PATCH 1)

View File

@ -45,6 +45,18 @@ Clusters are set like this:
<remote_servers>
<logs>
<shard>
<!-- Inter-server per-cluster secret for Distributed queries
default: no secret (no authentication will be performed)
If set, then Distributed queries will be validated on shards, so at least:
- such cluster should exist on the shard,
- such cluster should have the same secret.
And also (and which is more important), the initial_user will
be used as current user for the query.
-->
<!-- <secret></secret> -->
<!-- Optional. Shard weight when writing data. Default: 1. -->
<weight>1</weight>
<!-- Optional. Whether to write data to just one of the replicas. Default: false (write data to all replicas). -->

View File

@ -85,7 +85,12 @@ public:
std::string cur_host = i >= hosts_.size() ? "localhost" : hosts_[i];
connections.emplace_back(std::make_unique<ConnectionPool>(
concurrency, cur_host, cur_port, default_database_, user_, password_, "benchmark", Protocol::Compression::Enable, secure));
concurrency,
cur_host, cur_port,
default_database_, user_, password_,
"", /* cluster */
"", /* cluster_secret */
"benchmark", Protocol::Compression::Enable, secure));
comparison_info_per_interval.emplace_back(std::make_shared<Stats>());
comparison_info_total.emplace_back(std::make_shared<Stats>());
}

View File

@ -701,6 +701,8 @@ private:
connection_parameters.default_database,
connection_parameters.user,
connection_parameters.password,
"", /* cluster */
"", /* cluster_secret */
"client",
connection_parameters.compression,
connection_parameters.security);

View File

@ -26,6 +26,8 @@ void Suggest::load(const ConnectionParameters & connection_parameters, size_t su
connection_parameters.default_database,
connection_parameters.user,
connection_parameters.password,
"" /* cluster */,
"" /* cluster_secret */,
"client",
connection_parameters.compression,
connection_parameters.security);

View File

@ -311,6 +311,28 @@
<remote_servers incl="clickhouse_remote_servers" >
<!-- Test only shard config for testing distributed storage -->
<test_shard_localhost>
<!-- Inter-server per-cluster secret for Distributed queries
default: no secret (no authentication will be performed)
If set, then Distributed queries will be validated on shards, so at least:
- such cluster should exist on the shard,
- such cluster should have the same secret.
And also (and which is more important), the initial_user will
be used as current user for the query.
Right now the protocol is pretty simple and it only takes into account:
- cluster name
- query
Also it will be nice if the following will be implemented:
- source hostname (see interserver_http_host), but then it will depends from DNS,
it can use IP address instead, but then the you need to get correct on the initiator node.
- target hostname / ip address (same notes as for source hostname)
- time-based security tokens
-->
<!-- <secret></secret> -->
<shard>
<!-- Optional. Whether to write data to just one of the replicas. Default: false (write data to all replicas). -->
<!-- <internal_replication>false</internal_replication> -->

View File

@ -17,12 +17,15 @@
#include <Common/CurrentMetrics.h>
#include <Common/DNSResolver.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/OpenSSLHelpers.h>
#include <Common/randomSeed.h>
#include <Interpreters/ClientInfo.h>
#include <Compression/CompressionFactory.h>
#include <Processors/Pipe.h>
#include <Processors/ISink.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/ConcatProcessor.h>
#include <pcg_random.hpp>
#if !defined(ARCADIA_BUILD)
# include <Common/config_version.h>
@ -171,8 +174,26 @@ void Connection::sendHello()
// NOTE For backward compatibility of the protocol, client cannot send its version_patch.
writeVarUInt(client_revision, *out);
writeStringBinary(default_database, *out);
writeStringBinary(user, *out);
writeStringBinary(password, *out);
/// If interserver-secret is used, one do not need password
/// (NOTE we do not check for DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET, since we cannot ignore inter-server secret if it was requested)
if (!cluster_secret.empty())
{
writeStringBinary(USER_INTERSERVER_MARKER, *out);
writeStringBinary("" /* password */, *out);
#if USE_SSL
sendClusterNameAndSalt();
#else
throw Exception(
"Inter-server secret support is disabled, because ClickHouse was built without SSL library",
ErrorCodes::SUPPORT_IS_DISABLED);
#endif
}
else
{
writeStringBinary(user, *out);
writeStringBinary(password, *out);
}
out->next();
}
@ -288,6 +309,19 @@ void Connection::forceConnected(const ConnectionTimeouts & timeouts)
}
}
#if USE_SSL
void Connection::sendClusterNameAndSalt()
{
pcg64_fast rng(randomSeed());
UInt64 rand = rng();
salt = encodeSHA256(&rand, sizeof(rand));
writeStringBinary(cluster, *out);
writeStringBinary(salt, *out);
}
#endif
bool Connection::ping()
{
// LOG_TRACE(log_wrapper.get(), "Ping");
@ -406,6 +440,37 @@ void Connection::sendQuery(
else
writeStringBinary("" /* empty string is a marker of the end of settings */, *out);
/// Interserver secret
if (server_revision >= DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET)
{
/// Hash
///
/// Send correct hash only for !INITIAL_QUERY, due to:
/// - this will avoid extra protocol complexity for simplest cases
/// - there is no need in hash for the INITIAL_QUERY anyway
/// (since there is no secure/unsecure changes)
if (client_info && !cluster_secret.empty() && client_info->query_kind != ClientInfo::QueryKind::INITIAL_QUERY)
{
#if USE_SSL
std::string data(salt);
data += cluster_secret;
data += query;
data += query_id;
data += client_info->initial_user;
/// TODO: add source/target host/ip-address
std::string hash = encodeSHA256(data);
writeStringBinary(hash, *out);
#else
throw Exception(
"Inter-server secret support is disabled, because ClickHouse was built without SSL library",
ErrorCodes::SUPPORT_IS_DISABLED);
#endif
}
else
writeStringBinary("", *out);
}
writeVarUInt(stage, *out);
writeVarUInt(static_cast<bool>(compression), *out);

View File

@ -83,6 +83,8 @@ public:
Connection(const String & host_, UInt16 port_,
const String & default_database_,
const String & user_, const String & password_,
const String & cluster_,
const String & cluster_secret_,
const String & client_name_ = "client",
Protocol::Compression compression_ = Protocol::Compression::Enable,
Protocol::Secure secure_ = Protocol::Secure::Disable,
@ -90,6 +92,8 @@ public:
:
host(host_), port(port_), default_database(default_database_),
user(user_), password(password_),
cluster(cluster_),
cluster_secret(cluster_secret_),
client_name(client_name_),
compression(compression_),
secure(secure_),
@ -191,6 +195,11 @@ private:
String user;
String password;
/// For inter-server authorization
String cluster;
String cluster_secret;
String salt;
/// Address is resolved during the first connection (or the following reconnects)
/// Use it only for logging purposes
std::optional<Poco::Net::SocketAddress> current_resolved_address;
@ -269,6 +278,10 @@ private:
void connect(const ConnectionTimeouts & timeouts);
void sendHello();
void receiveHello();
#if USE_SSL
void sendClusterNameAndSalt();
#endif
bool ping();
Block receiveData();

View File

@ -54,6 +54,8 @@ public:
const String & default_database_,
const String & user_,
const String & password_,
const String & cluster_,
const String & cluster_secret_,
const String & client_name_ = "client",
Protocol::Compression compression_ = Protocol::Compression::Enable,
Protocol::Secure secure_ = Protocol::Secure::Disable,
@ -65,6 +67,8 @@ public:
default_database(default_database_),
user(user_),
password(password_),
cluster(cluster_),
cluster_secret(cluster_secret_),
client_name(client_name_),
compression(compression_),
secure(secure_),
@ -109,6 +113,7 @@ protected:
return std::make_shared<Connection>(
host, port,
default_database, user, password,
cluster, cluster_secret,
client_name, compression, secure);
}
@ -119,6 +124,10 @@ private:
String user;
String password;
/// For inter-server authorization
String cluster;
String cluster_secret;
String client_name;
Protocol::Compression compression; /// Whether to compress data when interacting with the server.
Protocol::Secure secure; /// Whether to encrypt data when interacting with the server.

View File

@ -12,11 +12,26 @@ namespace DB
{
#pragma GCC diagnostic warning "-Wold-style-cast"
std::string encodeSHA256(const std::string_view & text)
{
return encodeSHA256(text.data(), text.size());
}
std::string encodeSHA256(const void * text, size_t size)
{
std::string out;
out.resize(32);
encodeSHA256(text, size, reinterpret_cast<unsigned char *>(out.data()));
return out;
}
void encodeSHA256(const std::string_view & text, unsigned char * out)
{
encodeSHA256(text.data(), text.size(), out);
}
void encodeSHA256(const void * text, size_t size, unsigned char * out)
{
SHA256_CTX ctx;
SHA256_Init(&ctx);
SHA256_Update(&ctx, reinterpret_cast<const UInt8 *>(text.data()), text.size());
SHA256_Update(&ctx, reinterpret_cast<const UInt8 *>(text), size);
SHA256_Final(out, &ctx);
}

View File

@ -10,8 +10,13 @@
namespace DB
{
/// Encodes `text` and puts the result to `out` which must be at least 32 bytes long.
/// Encodes `text` and returns it.
std::string encodeSHA256(const std::string_view & text);
std::string encodeSHA256(const void * text, size_t size);
/// `out` must be at least 32 bytes long.
void encodeSHA256(const std::string_view & text, unsigned char * out);
void encodeSHA256(const void * text, size_t size, unsigned char * out);
/// Returns concatenation of error strings for all errors that OpenSSL has recorded, emptying the error queue.
String getOpenSSLErrors();

View File

@ -67,8 +67,11 @@
/// Minimum revision supporting SettingsBinaryFormat::STRINGS.
#define DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS 54429
/// Mininum revision supporting interserver secret.
#define DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET 54441
/// Version of ClickHouse TCP protocol. Set to git tag with latest protocol change.
#define DBMS_TCP_PROTOCOL_VERSION 54226
#define DBMS_TCP_PROTOCOL_VERSION 54441
/// The boundary on which the blocks for asynchronous file operations should be aligned.
#define DEFAULT_AIO_FILE_BLOCK_SIZE 4096

View File

@ -52,6 +52,10 @@ namespace DB
/// Using this block the client can initialize the output formatter and display the prefix of resulting table
/// beforehand.
/// Marker of the inter-server secret (passed in the user name)
/// (anyway user cannot be started with a whitespace)
const char USER_INTERSERVER_MARKER[] = " INTERSERVER SECRET ";
namespace Protocol
{
/// Packet types that server transmits.
@ -71,6 +75,8 @@ namespace Protocol
TablesStatusResponse = 9, /// A response to TablesStatus request.
Log = 10, /// System logs of the query execution
TableColumns = 11, /// Columns' description for default values calculation
MAX = TableColumns,
};
/// NOTE: If the type of packet argument would be Enum, the comparison packet >= 0 && packet < 10
@ -79,9 +85,21 @@ namespace Protocol
/// See https://www.securecoding.cert.org/confluence/display/cplusplus/INT36-CPP.+Do+not+use+out-of-range+enumeration+values
inline const char * toString(UInt64 packet)
{
static const char * data[] = { "Hello", "Data", "Exception", "Progress", "Pong", "EndOfStream", "ProfileInfo", "Totals",
"Extremes", "TablesStatusResponse", "Log", "TableColumns" };
return packet < 12
static const char * data[] = {
"Hello",
"Data",
"Exception",
"Progress",
"Pong",
"EndOfStream",
"ProfileInfo",
"Totals",
"Extremes",
"TablesStatusResponse",
"Log",
"TableColumns",
};
return packet <= MAX
? data[packet]
: "Unknown packet";
}
@ -113,13 +131,23 @@ namespace Protocol
Ping = 4, /// Check that connection to the server is alive.
TablesStatusRequest = 5, /// Check status of tables on the server.
KeepAlive = 6, /// Keep the connection alive
Scalar = 7 /// A block of data (compressed or not).
Scalar = 7, /// A block of data (compressed or not).
MAX = Scalar,
};
inline const char * toString(UInt64 packet)
{
static const char * data[] = { "Hello", "Query", "Data", "Cancel", "Ping", "TablesStatusRequest", "KeepAlive" };
return packet < 7
static const char * data[] = {
"Hello",
"Query",
"Data",
"Cancel",
"Ping",
"TablesStatusRequest",
"KeepAlive",
};
return packet <= MAX
? data[packet]
: "Unknown packet";
}

View File

@ -40,6 +40,8 @@ static ConnectionPoolWithFailoverPtr createPool(
db,
user,
password,
"", /* cluster */
"", /* cluster_secret */
"ClickHouseDictionarySource",
Protocol::Compression::Enable,
secure ? Protocol::Secure::Enable : Protocol::Secure::Disable));

View File

@ -11,6 +11,7 @@
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Util/Application.h>
#include <ext/range.h>
#include <boost/range/algorithm_ext/erase.hpp>
namespace DB
{
@ -73,8 +74,16 @@ bool Cluster::Address::isLocal(UInt16 clickhouse_port) const
Cluster::Address::Address(
const Poco::Util::AbstractConfiguration & config, const String & config_prefix, UInt32 shard_index_, UInt32 replica_index_)
: shard_index(shard_index_), replica_index(replica_index_)
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
const String & cluster_,
const String & cluster_secret_,
UInt32 shard_index_,
UInt32 replica_index_)
: cluster(cluster_)
, cluster_secret(cluster_secret_)
, shard_index(shard_index_)
, replica_index(replica_index_)
{
host_name = config.getString(config_prefix + ".host");
port = static_cast<UInt16>(config.getInt(config_prefix + ".port"));
@ -92,8 +101,15 @@ Cluster::Address::Address(
}
Cluster::Address::Address(const String & host_port_, const String & user_, const String & password_, UInt16 clickhouse_port, bool secure_, Int64 priority_)
: user(user_), password(password_)
Cluster::Address::Address(
const String & host_port_,
const String & user_,
const String & password_,
UInt16 clickhouse_port,
bool secure_,
Int64 priority_)
: user(user_)
, password(password_)
{
auto parsed_host_port = parseAddress(host_port_, clickhouse_port);
host_name = parsed_host_port.first;
@ -219,9 +235,9 @@ Cluster::Address Cluster::Address::fromFullString(const String & full_string)
/// Implementation of Clusters class
Clusters::Clusters(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name)
Clusters::Clusters(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_prefix)
{
updateClusters(config, settings, config_name);
updateClusters(config, settings, config_prefix);
}
@ -241,10 +257,10 @@ void Clusters::setCluster(const String & cluster_name, const std::shared_ptr<Clu
}
void Clusters::updateClusters(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name)
void Clusters::updateClusters(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_prefix)
{
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(config_name, config_keys);
config.keys(config_prefix, config_keys);
std::lock_guard lock(mutex);
@ -254,7 +270,7 @@ void Clusters::updateClusters(const Poco::Util::AbstractConfiguration & config,
if (key.find('.') != String::npos)
throw Exception("Cluster names with dots are not supported: '" + key + "'", ErrorCodes::SYNTAX_ERROR);
impl.emplace(key, std::make_shared<Cluster>(config, settings, config_name + "." + key));
impl.emplace(key, std::make_shared<Cluster>(config, settings, config_prefix, key));
}
}
@ -268,18 +284,25 @@ Clusters::Impl Clusters::getContainer() const
/// Implementation of `Cluster` class
Cluster::Cluster(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & cluster_name)
Cluster::Cluster(const Poco::Util::AbstractConfiguration & config,
const Settings & settings,
const String & config_prefix_,
const String & cluster_name)
{
auto config_prefix = config_prefix_ + "." + cluster_name;
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(cluster_name, config_keys);
config.keys(config_prefix, config_keys);
config_prefix += ".";
secret = config.getString(config_prefix + "secret", "");
boost::range::remove_erase(config_keys, "secret");
if (config_keys.empty())
throw Exception("No cluster elements (shard, node) specified in config at path " + cluster_name, ErrorCodes::SHARD_HAS_NO_CONNECTIONS);
const auto & config_prefix = cluster_name + ".";
throw Exception("No cluster elements (shard, node) specified in config at path " + config_prefix, ErrorCodes::SHARD_HAS_NO_CONNECTIONS);
UInt32 current_shard_num = 1;
for (const auto & key : config_keys)
{
if (startsWith(key, "node"))
@ -291,7 +314,7 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config, const Setting
const auto & prefix = config_prefix + key;
const auto weight = config.getInt(prefix + ".weight", default_weight);
addresses.emplace_back(config, prefix, current_shard_num, 1);
addresses.emplace_back(config, prefix, cluster_name, secret, current_shard_num, 1);
const auto & address = addresses.back();
ShardInfo info;
@ -305,6 +328,7 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config, const Setting
settings.distributed_connections_pool_size,
address.host_name, address.port,
address.default_database, address.user, address.password,
address.cluster, address.cluster_secret,
"server", address.compression,
address.secure, address.priority);
@ -345,7 +369,12 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config, const Setting
if (startsWith(replica_key, "replica"))
{
replica_addresses.emplace_back(config, partial_prefix + replica_key, current_shard_num, current_replica_num);
replica_addresses.emplace_back(config,
partial_prefix + replica_key,
cluster_name,
secret,
current_shard_num,
current_replica_num);
++current_replica_num;
if (internal_replication)
@ -379,6 +408,7 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config, const Setting
settings.distributed_connections_pool_size,
replica.host_name, replica.port,
replica.default_database, replica.user, replica.password,
replica.cluster, replica.cluster_secret,
"server", replica.compression,
replica.secure, replica.priority);
@ -442,6 +472,7 @@ Cluster::Cluster(const Settings & settings, const std::vector<std::vector<String
settings.distributed_connections_pool_size,
replica.host_name, replica.port,
replica.default_database, replica.user, replica.password,
replica.cluster, replica.cluster_secret,
"server", replica.compression, replica.secure, replica.priority);
all_replicas.emplace_back(replica_pool);
if (replica.is_local && !treat_local_as_remote)
@ -546,6 +577,8 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
address.default_database,
address.user,
address.password,
address.cluster,
address.cluster_secret,
"server",
address.compression,
address.secure,

View File

@ -20,12 +20,17 @@ namespace ErrorCodes
class Cluster
{
public:
Cluster(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & cluster_name);
Cluster(const Poco::Util::AbstractConfiguration & config,
const Settings & settings,
const String & config_prefix_,
const String & cluster_name);
/// Construct a cluster by the names of shards and replicas.
/// Local are treated as well as remote ones if treat_local_as_remote is true.
/// 'clickhouse_port' - port that this server instance listen for queries.
/// This parameter is needed only to check that some address is local (points to ourself).
///
/// Used for remote() function.
Cluster(const Settings & settings, const std::vector<std::vector<String>> & names,
const String & username, const String & password,
UInt16 clickhouse_port, bool treat_local_as_remote,
@ -62,6 +67,11 @@ public:
UInt16 port;
String user;
String password;
/// For inter-server authorization
String cluster;
String cluster_secret;
UInt32 shard_index{}; /// shard serial number in configuration file, starting from 1.
UInt32 replica_index{}; /// replica serial number in this shard, starting from 1; zero means no replicas.
@ -80,6 +90,8 @@ public:
Address(
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
const String & cluster_,
const String & cluster_secret_,
UInt32 shard_index_ = 0,
UInt32 replica_index_ = 0);
Address(
@ -170,6 +182,8 @@ public:
/// The number of all shards.
size_t getShardCount() const { return shards_info.size(); }
const String & getSecret() const { return secret; }
/// Get a subcluster consisting of one shard - index by count (from 0) of the shard of this cluster.
std::unique_ptr<Cluster> getClusterWithSingleShard(size_t index) const;
@ -197,6 +211,9 @@ private:
struct ReplicasAsShardsTag {};
Cluster(ReplicasAsShardsTag, const Cluster & from, const Settings & settings);
/// Inter-server secret
String secret;
String hash_of_addresses;
/// Description of the cluster shards.
ShardsInfo shards_info;
@ -219,7 +236,7 @@ using ClusterPtr = std::shared_ptr<Cluster>;
class Clusters
{
public:
Clusters(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name = "remote_servers");
Clusters(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_prefix = "remote_servers");
Clusters(const Clusters &) = delete;
Clusters & operator=(const Clusters &) = delete;
@ -227,7 +244,7 @@ public:
ClusterPtr getCluster(const std::string & cluster_name) const;
void setCluster(const String & cluster_name, const ClusterPtr & cluster);
void updateClusters(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name);
void updateClusters(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_prefix);
public:
using Impl = std::map<String, ClusterPtr>;
@ -239,6 +256,4 @@ protected:
mutable std::mutex mutex;
};
using ClustersPtr = std::shared_ptr<Clusters>;
}

View File

@ -677,7 +677,7 @@ ConfigurationPtr Context::getUsersConfig()
}
void Context::setUser(const String & name, const String & password, const Poco::Net::SocketAddress & address)
void Context::setUserImpl(const String & name, const std::optional<String> & password, const Poco::Net::SocketAddress & address)
{
auto lock = getLock();
@ -686,7 +686,7 @@ void Context::setUser(const String & name, const String & password, const Poco::
#if defined(ARCADIA_BUILD)
/// This is harmful field that is used only in foreign "Arcadia" build.
client_info.current_password = password;
client_info.current_password = password.value_or("");
#endif
auto new_user_id = getAccessControlManager().find<User>(name);
@ -694,7 +694,9 @@ void Context::setUser(const String & name, const String & password, const Poco::
if (new_user_id)
{
new_access = getAccessControlManager().getContextAccess(*new_user_id, {}, true, settings, current_database, client_info);
if (!new_access->isClientHostAllowed() || !new_access->isCorrectPassword(password))
/// Access w/o password is done under interserver-secret (remote_servers.secret)
/// So it is okay not to check client's host (since there is trust).
if (password && (!new_access->isClientHostAllowed() || !new_access->isCorrectPassword(*password)))
{
new_user_id = {};
new_access = nullptr;
@ -712,6 +714,16 @@ void Context::setUser(const String & name, const String & password, const Poco::
setSettings(*access->getDefaultSettings());
}
void Context::setUser(const String & name, const String & password, const Poco::Net::SocketAddress & address)
{
setUserImpl(name, std::make_optional(password), address);
}
void Context::setUserWithoutCheckingPassword(const String & name, const Poco::Net::SocketAddress & address)
{
setUserImpl(name, {} /* no password */, address);
}
std::shared_ptr<const User> Context::getUser() const
{
return getAccess()->getUser();

View File

@ -258,6 +258,11 @@ public:
/// Sets the current user, checks the password and that the specified host is allowed.
/// Must be called before getClientInfo.
void setUser(const String & name, const String & password, const Poco::Net::SocketAddress & address);
/// Sets the current user, *do not checks the password* but check that the specified host is allowed.
/// Must be called before getClientInfo.
///
/// (Used only internally in cluster, if the secret matches)
void setUserWithoutCheckingPassword(const String & name, const Poco::Net::SocketAddress & address);
void setQuotaKey(String quota_key_);
UserPtr getUser() const;
@ -640,6 +645,9 @@ private:
StoragePolicySelectorPtr getStoragePolicySelector(std::lock_guard<std::mutex> & lock) const;
DiskSelectorPtr getDiskSelector(std::lock_guard<std::mutex> & /* lock */) const;
/// If the password is not set, the password will not be checked
void setUserImpl(const String & name, const std::optional<String> & password, const Poco::Net::SocketAddress & address);
};

View File

@ -6,6 +6,7 @@
#include <Common/Stopwatch.h>
#include <Common/NetException.h>
#include <Common/setThreadName.h>
#include <Common/OpenSSLHelpers.h>
#include <IO/Progress.h>
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
@ -51,6 +52,7 @@ namespace ErrorCodes
extern const int POCO_EXCEPTION;
extern const int SOCKET_TIMEOUT;
extern const int UNEXPECTED_PACKET_FROM_CLIENT;
extern const int SUPPORT_IS_DISABLED;
}
@ -293,6 +295,12 @@ void TCPHandler::runImpl()
if (e.code() == ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT)
throw;
/// If there is UNEXPECTED_PACKET_FROM_CLIENT emulate network_error
/// to break the loop, but do not throw to send the exception to
/// the client.
if (e.code() == ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT)
network_error = true;
/// If a timeout occurred, try to inform client about it and close the session
if (e.code() == ErrorCodes::SOCKET_TIMEOUT)
network_error = true;
@ -351,6 +359,8 @@ void TCPHandler::runImpl()
tryLogCurrentException(log, "Can't send logs to client");
}
const auto & e = *exception;
LOG_ERROR(log, "Code: {}, e.displayText() = {}, Stack trace:\n\n{}", e.code(), e.displayText(), e.getStackTraceString());
sendException(*exception, send_exception_with_stack_trace);
}
}
@ -716,7 +726,7 @@ void TCPHandler::receiveHello()
{
/// Receive `hello` packet.
UInt64 packet_type = 0;
String user = "default";
String user;
String password;
readVarUInt(packet_type, *in);
@ -747,14 +757,25 @@ void TCPHandler::receiveHello()
readStringBinary(user, *in);
readStringBinary(password, *in);
if (user.empty())
throw NetException("Unexpected packet from client (no user in Hello package)", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
LOG_DEBUG(log, "Connected {} version {}.{}.{}, revision: {}{}{}.",
client_name,
client_version_major, client_version_minor, client_version_patch,
client_revision,
(!default_database.empty() ? ", database: " + default_database : ""),
(!user.empty() ? ", user: " + user : ""));
(!user.empty() ? ", user: " + user : "")
);
connection_context.setUser(user, password, socket().peerAddress());
if (user != USER_INTERSERVER_MARKER)
{
connection_context.setUser(user, password, socket().peerAddress());
}
else
{
receiveClusterNameAndSalt();
}
}
@ -836,6 +857,30 @@ bool TCPHandler::receivePacket()
}
}
void TCPHandler::receiveClusterNameAndSalt()
{
readStringBinary(cluster, *in);
readStringBinary(salt, *in, 32);
try
{
if (salt.empty())
throw NetException("Empty salt is not allowed", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
cluster_secret = query_context->getCluster(cluster)->getSecret();
}
catch (const Exception & e)
{
try
{
/// We try to send error information to the client.
sendException(e, connection_context.getSettingsRef().calculate_text_stack_trace);
}
catch (...) {}
throw;
}
}
void TCPHandler::receiveQuery()
{
@ -873,10 +918,6 @@ void TCPHandler::receiveQuery()
client_info.initial_query_id = client_info.current_query_id;
client_info.initial_address = client_info.current_address;
}
else
{
query_context->setInitialRowPolicy();
}
/// Per query settings are also passed via TCP.
/// We need to check them before applying due to they can violate the settings constraints.
@ -884,6 +925,64 @@ void TCPHandler::receiveQuery()
: SettingsWriteFormat::BINARY;
Settings passed_settings;
passed_settings.read(*in, settings_format);
/// Interserver secret.
std::string received_hash;
if (client_revision >= DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET)
{
readStringBinary(received_hash, *in, 32);
}
readVarUInt(stage, *in);
state.stage = QueryProcessingStage::Enum(stage);
readVarUInt(compression, *in);
state.compression = static_cast<Protocol::Compression>(compression);
readStringBinary(state.query, *in);
/// It is OK to check only when query != INITIAL_QUERY,
/// since only in that case the actions will be done.
if (!cluster.empty() && client_info.query_kind != ClientInfo::QueryKind::INITIAL_QUERY)
{
#if USE_SSL
std::string data(salt);
data += cluster_secret;
data += state.query;
data += state.query_id;
data += client_info.initial_user;
if (received_hash.size() != 32)
throw NetException("Unexpected hash received from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
std::string calculated_hash = encodeSHA256(data);
if (calculated_hash != received_hash)
throw NetException("Hash mismatch", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
/// TODO: change error code?
/// initial_user can be empty in case of Distributed INSERT via Buffer/Kafka,
/// i.e. when the INSERT is done with the global context (w/o user).
if (!client_info.initial_user.empty())
{
query_context->setUserWithoutCheckingPassword(client_info.initial_user, socket().peerAddress());
LOG_DEBUG(log, "User (initial): {}", query_context->getUserName());
}
/// No need to update connection_context, since it does not requires user (it will not be used for query execution)
#else
throw Exception(
"Inter-server secret support is disabled, because ClickHouse was built without SSL library",
ErrorCodes::SUPPORT_IS_DISABLED);
#endif
}
else
{
query_context->setInitialRowPolicy();
}
///
/// Settings
///
auto settings_changes = passed_settings.changes();
if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY)
{
@ -897,20 +996,11 @@ void TCPHandler::receiveQuery()
}
query_context->applySettingsChanges(settings_changes);
const Settings & settings = query_context->getSettingsRef();
/// Sync timeouts on client and server during current query to avoid dangling queries on server
/// NOTE: We use settings.send_timeout for the receive timeout and vice versa (change arguments ordering in TimeoutSetter),
/// because settings.send_timeout is client-side setting which has opposite meaning on the server side.
/// NOTE: these settings are applied only for current connection (not for distributed tables' connections)
state.timeout_setter = std::make_unique<TimeoutSetter>(socket(), settings.receive_timeout, settings.send_timeout);
readVarUInt(stage, *in);
state.stage = QueryProcessingStage::Enum(stage);
readVarUInt(compression, *in);
state.compression = static_cast<Protocol::Compression>(compression);
readStringBinary(state.query, *in);
}
void TCPHandler::receiveUnexpectedQuery()
@ -929,6 +1019,11 @@ void TCPHandler::receiveUnexpectedQuery()
: SettingsWriteFormat::BINARY;
skip_settings.read(*in, settings_format);
std::string skip_hash;
bool interserver_secret = client_revision >= DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET;
if (interserver_secret)
readStringBinary(skip_hash, *in, 32);
readVarUInt(skip_uint_64, *in);
readVarUInt(skip_uint_64, *in);
readStringBinary(skip_string, *in);

View File

@ -97,7 +97,6 @@ struct LastBlockInputParameters
Block header;
};
class TCPHandler : public Poco::Net::TCPServerConnection
{
public:
@ -139,6 +138,12 @@ private:
String default_database;
/// For inter-server secret (remote_server.*.secret)
String salt;
String cluster;
String cluster_secret;
/// At the moment, only one ongoing query in the connection is supported at a time.
QueryState state;
@ -187,6 +192,8 @@ private:
void sendTotals(const Block & totals);
void sendExtremes(const Block & extremes);
void receiveClusterNameAndSalt();
/// Creates state.block_in/block_out for blocks read/write, depending on whether compression is enabled.
void initBlockInput();
void initBlockOutput(const Block & block);

View File

@ -236,8 +236,17 @@ ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::stri
}
return std::make_shared<ConnectionPool>(
1, address.host_name, address.port, address.default_database, address.user, address.password,
storage.getName() + '_' + address.user, Protocol::Compression::Enable, address.secure);
1, /* max_connections */
address.host_name,
address.port,
address.default_database,
address.user,
address.password,
address.cluster,
address.cluster_secret,
storage.getName() + '_' + address.user, /* client */
Protocol::Compression::Enable,
address.secure);
};
auto pools = createPoolsForAddresses(name, pool_factory);

View File

@ -60,7 +60,5 @@ def test_allowed_host():
assert query_from_one_node_to_another(client_node, server, "SELECT * FROM test_table") == "5\n"
for client_node in expected_to_fail:
with pytest.raises(Exception) as e:
result = query_from_one_node_to_another(client_node, server, "SELECT * FROM test_table")
print("Client node: {} Server node: {} Result: {}".format(client_node, server_node, result))
assert "default: Authentication failed" in str(e)
with pytest.raises(Exception, match=r'default: Authentication failed'):
query_from_one_node_to_another(client_node, server, "SELECT * FROM test_table")

View File

@ -0,0 +1,26 @@
<yandex>
<remote_servers>
<insecure>
<node>
<host>n1</host>
<port>9000</port>
</node>
<node>
<host>n2</host>
<port>9000</port>
</node>
</insecure>
<secure>
<secret>foo</secret>
<node>
<host>n1</host>
<port>9000</port>
</node>
<node>
<host>n2</host>
<port>9000</port>
</node>
</secure>
</remote_servers>
</yandex>

View File

@ -0,0 +1,15 @@
<yandex>
<remote_servers>
<secure_disagree>
<secret>bar_n1</secret>
<node>
<host>n1</host>
<port>9000</port>
</node>
<node>
<host>n2</host>
<port>9000</port>
</node>
</secure_disagree>
</remote_servers>
</yandex>

View File

@ -0,0 +1,15 @@
<yandex>
<remote_servers>
<secure_disagree>
<secret>bar_n2</secret>
<node>
<host>n1</host>
<port>9000</port>
</node>
<node>
<host>n2</host>
<port>9000</port>
</node>
</secure_disagree>
</remote_servers>
</yandex>

View File

@ -0,0 +1,41 @@
<?xml version="1.0"?>
<yandex>
<profiles>
<default>
</default>
</profiles>
<users>
<default>
<password></password>
<networks>
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
</default>
<nopass>
<password></password>
<networks>
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
</nopass>
<pass>
<password>foo</password>
<networks>
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
</pass>
</users>
<quotas>
<default>
</default>
</quotas>
</yandex>

View File

@ -0,0 +1,152 @@
# pylint: disable=unused-argument
# pylint: disable=redefined-outer-name
# pylint: disable=line-too-long
import pytest
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
def make_instance(name, cfg):
return cluster.add_instance(name,
with_zookeeper=True,
main_configs=['configs/remote_servers.xml', cfg],
user_configs=['configs/users.xml'])
# _n1/_n2 contains cluster with different <secret> -- should fail
n1 = make_instance('n1', 'configs/remote_servers_n1.xml')
n2 = make_instance('n2', 'configs/remote_servers_n2.xml')
users = pytest.mark.parametrize('user,password', [
('default', '' ),
('nopass', '' ),
('pass', 'foo'),
])
def bootstrap():
for n in cluster.instances.values():
n.query('DROP TABLE IF EXISTS data')
n.query('DROP TABLE IF EXISTS dist')
n.query('CREATE TABLE data (key Int) Engine=Memory()')
n.query("""
CREATE TABLE dist_insecure AS data
Engine=Distributed(insecure, currentDatabase(), data, key)
""")
n.query("""
CREATE TABLE dist_secure AS data
Engine=Distributed(secure, currentDatabase(), data, key)
""")
n.query("""
CREATE TABLE dist_secure_disagree AS data
Engine=Distributed(secure_disagree, currentDatabase(), data, key)
""")
n.query("""
CREATE TABLE dist_secure_buffer AS dist_secure
Engine=Buffer(currentDatabase(), dist_secure,
/* settings for manual flush only */
1, /* num_layers */
10e6, /* min_time, placeholder */
10e6, /* max_time, placeholder */
0, /* min_rows */
10e6, /* max_rows */
0, /* min_bytes */
80e6 /* max_bytes */
)
""")
@pytest.fixture(scope='module', autouse=True)
def start_cluster():
try:
cluster.start()
bootstrap()
yield cluster
finally:
cluster.shutdown()
def query_with_id(node, id_, query, **kwargs):
return node.query("WITH '{}' AS __id {}".format(id_, query), **kwargs)
# @return -- [user, initial_user]
def get_query_user_info(node, query_pattern):
node.query("SYSTEM FLUSH LOGS")
return node.query("""
SELECT user, initial_user
FROM system.query_log
WHERE
query LIKE '%{}%' AND
query NOT LIKE '%system.query_log%' AND
type = 'QueryFinish'
""".format(query_pattern)).strip().split('\t')
def test_insecure():
n1.query('SELECT * FROM dist_insecure')
def test_insecure_insert_async():
n1.query('INSERT INTO dist_insecure SELECT * FROM numbers(2)')
n1.query('SYSTEM FLUSH DISTRIBUTED ON CLUSTER insecure dist_insecure')
assert int(n1.query('SELECT count() FROM dist_insecure')) == 2
n1.query('TRUNCATE TABLE data ON CLUSTER insecure')
def test_insecure_insert_sync():
n1.query('INSERT INTO dist_insecure SELECT * FROM numbers(2)', settings={'insert_distributed_sync': 1})
assert int(n1.query('SELECT count() FROM dist_insecure')) == 2
n1.query('TRUNCATE TABLE data ON CLUSTER secure')
def test_secure():
n1.query('SELECT * FROM dist_secure')
def test_secure_insert_async():
n1.query('INSERT INTO dist_secure SELECT * FROM numbers(2)')
n1.query('SYSTEM FLUSH DISTRIBUTED ON CLUSTER secure dist_secure')
assert int(n1.query('SELECT count() FROM dist_secure')) == 2
n1.query('TRUNCATE TABLE data ON CLUSTER secure')
def test_secure_insert_sync():
n1.query('INSERT INTO dist_secure SELECT * FROM numbers(2)', settings={'insert_distributed_sync': 1})
assert int(n1.query('SELECT count() FROM dist_secure')) == 2
n1.query('TRUNCATE TABLE data ON CLUSTER secure')
# INSERT w/o initial_user
#
# Buffer() flush happens with global context, that does not have user
# And so Context::user/ClientInfo::current_user/ClientInfo::initial_user will be empty
def test_secure_insert_buffer_async():
n1.query('INSERT INTO dist_secure_buffer SELECT * FROM numbers(2)')
n1.query('SYSTEM FLUSH DISTRIBUTED ON CLUSTER secure dist_secure')
# no Buffer flush happened
assert int(n1.query('SELECT count() FROM dist_secure')) == 0
n1.query('OPTIMIZE TABLE dist_secure_buffer')
# manual flush
n1.query('SYSTEM FLUSH DISTRIBUTED ON CLUSTER secure dist_secure')
assert int(n1.query('SELECT count() FROM dist_secure')) == 2
n1.query('TRUNCATE TABLE data ON CLUSTER secure')
def test_secure_disagree():
with pytest.raises(QueryRuntimeException, match='.*Hash mismatch.*'):
n1.query('SELECT * FROM dist_secure_disagree')
def test_secure_disagree_insert():
n1.query('INSERT INTO dist_secure_disagree SELECT * FROM numbers(2)')
with pytest.raises(QueryRuntimeException, match='.*Hash mismatch.*'):
n1.query('SYSTEM FLUSH DISTRIBUTED ON CLUSTER secure_disagree dist_secure_disagree')
# check the the connection will be re-established
# IOW that we will not get "Unknown BlockInfo field"
with pytest.raises(QueryRuntimeException, match='.*Hash mismatch.*'):
assert int(n1.query('SELECT count() FROM dist_secure_disagree')) == 0
@users
def test_user_insecure_cluster(user, password):
id_ = 'query-dist_insecure-' + user
query_with_id(n1, id_, 'SELECT * FROM dist_insecure', user=user, password=password)
assert get_query_user_info(n1, id_) == [user, user] # due to prefer_localhost_replica
assert get_query_user_info(n2, id_) == ['default', user]
@users
def test_user_secure_cluster(user, password):
id_ = 'query-dist_secure-' + user
query_with_id(n1, id_, 'SELECT * FROM dist_secure', user=user, password=password)
assert get_query_user_info(n1, id_) == [user, user]
assert get_query_user_info(n2, id_) == [user, user]
# TODO: check user for INSERT

View File

@ -4,6 +4,7 @@ CREATE TABLE t (x UInt8) ENGINE = MergeTree ORDER BY x;
INSERT INTO t VALUES (1), (2), (3);
SELECT count() FROM t;
DROP ROW POLICY IF EXISTS filter ON t;
CREATE ROW POLICY filter ON t USING (x % 2 = 1) TO ALL;
SELECT count() FROM t;
DROP ROW POLICY filter ON t;