Merge pull request #14113 from johnskopis/dynamic-interserver-creds-v20

Support interserver credential rotation
This commit is contained in:
alesapin 2021-04-08 11:01:40 +03:00 committed by GitHub
commit 1533f9b9aa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 247 additions and 45 deletions

View File

@ -47,6 +47,7 @@
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/DNSCacheUpdater.h>
#include <Interpreters/ExternalLoaderXMLConfigRepository.h>
#include <Interpreters/InterserverCredentials.h>
#include <Interpreters/ExpressionJIT.h>
#include <Access/AccessControlManager.h>
#include <Storages/StorageReplicatedMergeTree.h>
@ -688,16 +689,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
}
}
if (config().has("interserver_http_credentials"))
{
String user = config().getString("interserver_http_credentials.user", "");
String password = config().getString("interserver_http_credentials.password", "");
if (user.empty())
throw Exception("Configuration parameter interserver_http_credentials user can't be empty", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
global_context->setInterserverCredentials(user, password);
}
LOG_DEBUG(log, "Initiailizing interserver credentials.");
global_context->updateInterserverCredentials(config());
if (config().has("macros"))
global_context->setMacros(std::make_unique<Macros>(config(), "macros", log));
@ -777,6 +770,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
}
global_context->updateStorageConfiguration(*config);
global_context->updateInterserverCredentials(*config);
},
/* already_loaded = */ false); /// Reload it right now (initial loading)

View File

@ -49,6 +49,7 @@
#include <Interpreters/ExternalModelsLoader.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/InterserverCredentials.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/InterserverIOHandler.h>
#include <Interpreters/SystemLog.h>
@ -322,9 +323,8 @@ struct ContextShared
String interserver_io_host; /// The host name by which this server is available for other servers.
UInt16 interserver_io_port = 0; /// and port.
String interserver_io_user;
String interserver_io_password;
String interserver_scheme; /// http or https
MultiVersion<InterserverCredentials> interserver_io_credentials;
String path; /// Path to the data directory, with a slash at the end.
String flags_path; /// Path to the directory with some control flags for server maintenance.
@ -1734,6 +1734,17 @@ bool Context::hasAuxiliaryZooKeeper(const String & name) const
return getConfigRef().has("auxiliary_zookeepers." + name);
}
InterserverCredentialsPtr Context::getInterserverCredentials()
{
return shared->interserver_io_credentials.get();
}
void Context::updateInterserverCredentials(const Poco::Util::AbstractConfiguration & config)
{
auto credentials = InterserverCredentials::make(config, "interserver_http_credentials");
shared->interserver_io_credentials.set(std::move(credentials));
}
void Context::setInterserverIOAddress(const String & host, UInt16 port)
{
shared->interserver_io_host = host;
@ -1749,17 +1760,6 @@ std::pair<String, UInt16> Context::getInterserverIOAddress() const
return { shared->interserver_io_host, shared->interserver_io_port };
}
void Context::setInterserverCredentials(const String & user_, const String & password)
{
shared->interserver_io_user = user_;
shared->interserver_io_password = password;
}
std::pair<String, String> Context::getInterserverCredentials() const
{
return { shared->interserver_io_user, shared->interserver_io_password };
}
void Context::setInterserverScheme(const String & scheme)
{
shared->interserver_scheme = scheme;

View File

@ -61,6 +61,8 @@ class AccessRightsElements;
class EmbeddedDictionaries;
class ExternalDictionariesLoader;
class ExternalModelsLoader;
class InterserverCredentials;
using InterserverCredentialsPtr = std::shared_ptr<const InterserverCredentials>;
class InterserverIOHandler;
class BackgroundSchedulePool;
class MergeList;
@ -522,8 +524,8 @@ public:
std::pair<String, UInt16> getInterserverIOAddress() const;
/// Credentials which server will use to communicate with others
void setInterserverCredentials(const String & user, const String & password);
std::pair<String, String> getInterserverCredentials() const;
void updateInterserverCredentials(const Poco::Util::AbstractConfiguration & config);
InterserverCredentialsPtr getInterserverCredentials();
/// Interserver requests scheme (http or https)
void setInterserverScheme(const String & scheme);
@ -788,6 +790,9 @@ private:
StoragePolicySelectorPtr getStoragePolicySelector(std::lock_guard<std::mutex> & lock) const;
DiskSelectorPtr getDiskSelector(std::lock_guard<std::mutex> & /* lock */) const;
/// If the password is not set, the password will not be checked
void setUserImpl(const String & name, const std::optional<String> & password, const Poco::Net::SocketAddress & address);
};

View File

@ -0,0 +1,87 @@
#include <Interpreters/InterserverCredentials.h>
#include <common/logger_useful.h>
#include <Common/StringUtils/StringUtils.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NO_ELEMENTS_IN_CONFIG;
}
std::unique_ptr<InterserverCredentials>
InterserverCredentials::make(const Poco::Util::AbstractConfiguration & config, const std::string & root_tag)
{
if (config.has("user") && !config.has("password"))
throw Exception("Configuration parameter interserver_http_credentials.password can't be empty", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
if (!config.has("user") && config.has("password"))
throw Exception("Configuration parameter interserver_http_credentials.user can't be empty if user specified", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
/// They both can be empty
auto user = config.getString(root_tag + ".user", "");
auto password = config.getString(root_tag + ".password", "");
auto store = parseCredentialsFromConfig(user, password, config, root_tag);
return std::make_unique<InterserverCredentials>(user, password, store);
}
InterserverCredentials::CurrentCredentials InterserverCredentials::parseCredentialsFromConfig(
const std::string & current_user_,
const std::string & current_password_,
const Poco::Util::AbstractConfiguration & config,
const std::string & root_tag)
{
auto * log = &Poco::Logger::get("InterserverCredentials");
CurrentCredentials store;
store.emplace_back(current_user_, current_password_);
if (config.getBool(root_tag + ".allow_empty", false))
{
LOG_DEBUG(log, "Allowing empty credentials");
/// Allow empty credential to support migrating from no auth
store.emplace_back("", "");
}
Poco::Util::AbstractConfiguration::Keys old_users;
config.keys(root_tag, old_users);
for (const auto & user_key : old_users)
{
if (startsWith(user_key, "old"))
{
std::string full_prefix = root_tag + "." + user_key;
std::string old_user_name = config.getString(full_prefix + ".user");
LOG_DEBUG(log, "Adding credentials for old user {}", old_user_name);
std::string old_user_password = config.getString(full_prefix + ".password");
store.emplace_back(old_user_name, old_user_password);
}
}
return store;
}
InterserverCredentials::CheckResult InterserverCredentials::isValidUser(const UserWithPassword & credentials) const
{
auto itr = std::find(all_users_store.begin(), all_users_store.end(), credentials);
if (itr == all_users_store.end())
{
if (credentials.first.empty())
return {"Server requires HTTP Basic authentication, but client doesn't provide it", false};
return {"Incorrect user or password in HTTP basic authentication: " + credentials.first, false};
}
return {"", true};
}
InterserverCredentials::CheckResult InterserverCredentials::isValidUser(const std::string & user, const std::string & password) const
{
return isValidUser(std::make_pair(user, password));
}
}

View File

@ -0,0 +1,70 @@
#pragma once
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/Exception.h>
#include <common/logger_useful.h>
#include <unordered_set>
namespace DB
{
/// InterserverCredentials implements authentication using a CurrentCredentials, which
/// is configured, e.g.
/// <interserver_http_credentials>
/// <user>admin</user>
/// <password>222</password>
/// <!-- To support mix of un/authenticated clients -->
/// <!-- <allow_empty>true</allow_empty> -->
/// <old>
/// <!-- Allow authentication using previous passwords during rotation -->
/// <user>admin</user>
/// <password>qqq</password>
/// </old>
/// <old>
/// <!-- Allow authentication using previous users during rotation -->
/// <user>johny</user>
/// <password>333</password>
/// </old>
/// </interserver_http_credentials>
class InterserverCredentials
{
public:
using UserWithPassword = std::pair<std::string, std::string>;
using CheckResult = std::pair<std::string, bool>;
using CurrentCredentials = std::vector<UserWithPassword>;
InterserverCredentials(const InterserverCredentials &) = delete;
static std::unique_ptr<InterserverCredentials> make(const Poco::Util::AbstractConfiguration & config, const std::string & root_tag);
InterserverCredentials(const std::string & current_user_, const std::string & current_password_, const CurrentCredentials & all_users_store_)
: current_user(current_user_)
, current_password(current_password_)
, all_users_store(all_users_store_)
{}
CheckResult isValidUser(const UserWithPassword & credentials) const;
CheckResult isValidUser(const std::string & user, const std::string & password) const;
std::string getUser() const { return current_user; }
std::string getPassword() const { return current_password; }
private:
std::string current_user;
std::string current_password;
/// In common situation this store contains one record
CurrentCredentials all_users_store;
static CurrentCredentials parseCredentialsFromConfig(
const std::string & current_user_,
const std::string & current_password_,
const Poco::Util::AbstractConfiguration & config,
const std::string & root_tag);
};
using InterserverCredentialsPtr = std::shared_ptr<const InterserverCredentials>;
}

View File

@ -103,6 +103,7 @@ SRCS(
InterpreterSystemQuery.cpp
InterpreterUseQuery.cpp
InterpreterWatchQuery.cpp
InterserverCredentials.cpp
JoinSwitcher.cpp
JoinToSubqueryTransformVisitor.cpp
JoinedTables.cpp

View File

@ -25,29 +25,26 @@ namespace ErrorCodes
std::pair<String, bool> InterserverIOHTTPHandler::checkAuthentication(HTTPServerRequest & request) const
{
const auto & config = server.config();
if (config.has("interserver_http_credentials.user"))
auto server_credentials = server.context().getInterserverCredentials();
if (server_credentials)
{
if (!request.hasCredentials())
return {"Server requires HTTP Basic authentication, but client doesn't provide it", false};
return server_credentials->isValidUser("", "");
String scheme, info;
request.getCredentials(scheme, info);
if (scheme != "Basic")
return {"Server requires HTTP Basic authentication but client provides another method", false};
String user = config.getString("interserver_http_credentials.user");
String password = config.getString("interserver_http_credentials.password", "");
Poco::Net::HTTPBasicCredentials credentials(info);
if (std::make_pair(user, password) != std::make_pair(credentials.getUsername(), credentials.getPassword()))
return {"Incorrect user or password in HTTP Basic authentication", false};
return server_credentials->isValidUser(credentials.getUsername(), credentials.getPassword());
}
else if (request.hasCredentials())
{
return {"Client requires HTTP Basic authentication, but server doesn't provide it", false};
}
return {"", true};
}

View File

@ -2,10 +2,12 @@
#include <Server/HTTP/HTTPRequestHandler.h>
#include <Common/CurrentMetrics.h>
#include <Interpreters/InterserverCredentials.h>
#include <Poco/Logger.h>
#include <memory>
#include <string>
namespace CurrentMetrics

View File

@ -52,6 +52,7 @@
#include <Interpreters/PartLog.h>
#include <Interpreters/Context.h>
#include <Interpreters/DDLTask.h>
#include <Interpreters/InterserverCredentials.h>
#include <DataStreams/RemoteBlockInputStream.h>
#include <DataStreams/copyData.h>
@ -2401,7 +2402,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
ReplicatedMergeTreeAddress address(getZooKeeper()->get(source_replica_path + "/host"));
auto timeouts = getFetchPartHTTPTimeouts(global_context);
auto [user, password] = global_context.getInterserverCredentials();
auto credentials = global_context.getInterserverCredentials();
String interserver_scheme = global_context.getInterserverScheme();
if (interserver_scheme != address.scheme)
@ -2409,7 +2410,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
part_desc->res_part = fetcher.fetchPart(
metadata_snapshot, part_desc->found_new_part_name, source_replica_path,
address.host, address.replication_port, timeouts, user, password, interserver_scheme, false, TMP_PREFIX + "fetch_");
address.host, address.replication_port, timeouts, credentials->getUser(), credentials->getPassword(), interserver_scheme, false, TMP_PREFIX + "fetch_");
/// TODO: check columns_version of fetched part
@ -3755,8 +3756,8 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
ReplicatedMergeTreeAddress address;
ConnectionTimeouts timeouts;
std::pair<String, String> user_password;
String interserver_scheme;
InterserverCredentialsPtr credentials;
std::optional<CurrentlySubmergingEmergingTagger> tagger_ptr;
std::function<MutableDataPartPtr()> get_part;
@ -3772,10 +3773,10 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
address.fromString(zookeeper->get(source_replica_path + "/host"));
timeouts = getFetchPartHTTPTimeouts(global_context);
user_password = global_context.getInterserverCredentials();
credentials = global_context.getInterserverCredentials();
interserver_scheme = global_context.getInterserverScheme();
get_part = [&, address, timeouts, user_password, interserver_scheme]()
get_part = [&, address, timeouts, credentials, interserver_scheme]()
{
if (interserver_scheme != address.scheme)
throw Exception("Interserver schemes are different: '" + interserver_scheme
@ -3789,8 +3790,8 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
address.host,
address.replication_port,
timeouts,
user_password.first,
user_password.second,
credentials->getUser(),
credentials->getPassword(),
interserver_scheme,
to_detached,
"",
@ -3928,10 +3929,10 @@ bool StorageReplicatedMergeTree::fetchExistsPart(const String & part_name, const
ReplicatedMergeTreeAddress address(zookeeper->get(source_replica_path + "/host"));
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(global_context);
auto user_password = global_context.getInterserverCredentials();
auto credentials = global_context.getInterserverCredentials();
String interserver_scheme = global_context.getInterserverScheme();
get_part = [&, address, timeouts, user_password, interserver_scheme]()
get_part = [&, address, timeouts, interserver_scheme, credentials]()
{
if (interserver_scheme != address.scheme)
throw Exception("Interserver schemes are different: '" + interserver_scheme
@ -3941,7 +3942,7 @@ bool StorageReplicatedMergeTree::fetchExistsPart(const String & part_name, const
return fetcher.fetchPart(
metadata_snapshot, part_name, source_replica_path,
address.host, address.replication_port,
timeouts, user_password.first, user_password.second, interserver_scheme, false, "", nullptr, true,
timeouts, credentials->getUser(), credentials->getPassword(), interserver_scheme, false, "", nullptr, true,
replaced_disk);
};

View File

@ -9,7 +9,6 @@ def _fill_nodes(nodes, shard):
node.query(
'''
CREATE DATABASE test;
CREATE TABLE test_table(date Date, id UInt32, dummy UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test{shard}/replicated', '{replica}', date, id, 8192);
'''.format(shard=shard, replica=node.name))
@ -114,6 +113,32 @@ def test_different_credentials(different_credentials_cluster):
assert node5.query("SELECT id FROM test_table order by id") == '111\n'
assert node6.query("SELECT id FROM test_table order by id") == '222\n'
add_old = """
<yandex>
<interserver_http_port>9009</interserver_http_port>
<interserver_http_credentials>
<user>admin</user>
<password>222</password>
<old>
<user>root</user>
<password>111</password>
</old>
<old>
<user>aaa</user>
<password>333</password>
</old>
</interserver_http_credentials>
</yandex>
"""
node5.replace_config("/etc/clickhouse-server/config.d/credentials1.xml", add_old)
node5.query("SYSTEM RELOAD CONFIG")
node5.query("INSERT INTO test_table values('2017-06-21', 333, 1)")
node6.query("SYSTEM SYNC REPLICA test_table", timeout=10)
assert node6.query("SELECT id FROM test_table order by id") == '111\n222\n333\n'
node7 = cluster.add_instance('node7', main_configs=['configs/remote_servers.xml', 'configs/credentials1.xml'],
with_zookeeper=True)
@ -146,3 +171,23 @@ def test_credentials_and_no_credentials(credentials_and_no_credentials_cluster):
assert node7.query("SELECT id FROM test_table order by id") == '111\n'
assert node8.query("SELECT id FROM test_table order by id") == '222\n'
allow_empty = """
<yandex>
<interserver_http_port>9009</interserver_http_port>
<interserver_http_credentials>
<user>admin</user>
<password>222</password>
<allow_empty>true</allow_empty>
</interserver_http_credentials>
</yandex>
"""
# change state: Flip node7 to mixed auth/non-auth (allow node8)
node7.replace_config("/etc/clickhouse-server/config.d/credentials1.xml",
allow_empty)
node7.query("SYSTEM RELOAD CONFIG")
node7.query("insert into test_table values ('2017-06-22', 333, 1)")
node8.query("SYSTEM SYNC REPLICA test_table", timeout=10)
assert node8.query("SELECT id FROM test_table order by id") == '111\n222\n333\n'