Small improvements

This commit is contained in:
alesapin 2021-04-07 16:52:11 +03:00
parent feff1175f4
commit 2987bbc948
9 changed files with 132 additions and 144 deletions

View File

@ -689,7 +689,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
}
}
LOG_DEBUG(log, "Initiaializing InterserverCredentials.");
LOG_DEBUG(log, "Initiailizing interserver credentials.");
global_context->updateInterserverCredentials(config());
if (config().has("macros"))

View File

@ -324,8 +324,7 @@ struct ContextShared
String interserver_io_host; /// The host name by which this server is available for other servers.
UInt16 interserver_io_port = 0; /// and port.
String interserver_scheme; /// http or https
mutable std::mutex interserver_io_credentials_mutex;
std::shared_ptr<BaseInterserverCredentials> interserver_io_credentials;
MultiVersion<InterserverCredentials> interserver_io_credentials;
String path; /// Path to the data directory, with a slash at the end.
String flags_path; /// Path to the directory with some control flags for server maintenance.
@ -1735,40 +1734,15 @@ bool Context::hasAuxiliaryZooKeeper(const String & name) const
return getConfigRef().has("auxiliary_zookeepers." + name);
}
std::shared_ptr<BaseInterserverCredentials> Context::getInterserverCredential()
InterserverCredentialsPtr Context::getInterserverCredentials()
{
std::lock_guard lock(shared->interserver_io_credentials_mutex);
return shared->interserver_io_credentials;
}
void Context::setInterserverCredentials(std::shared_ptr<BaseInterserverCredentials> credentials)
{
std::lock_guard lock(shared->interserver_io_credentials_mutex);
shared->interserver_io_credentials = credentials;
return shared->interserver_io_credentials.get();
}
void Context::updateInterserverCredentials(const Poco::Util::AbstractConfiguration & config)
{
std::shared_ptr<BaseInterserverCredentials> interserver_credentials = nullptr;
if (config.has("interserver_http_credentials"))
{
interserver_credentials = ConfigInterserverCredentials::make(config, "interserver_http_credentials");
}
else
{
interserver_credentials = NullInterserverCredentials::make();
}
global_context->setInterserverCredentials(interserver_credentials);
}
std::pair<String, String> Context::getInterserverCredentials() const
{
std::lock_guard lock(shared->interserver_io_credentials_mutex);
auto & credentials = shared->interserver_io_credentials;
return { credentials->getUser(), credentials->getPassword() };
auto credentials = InterserverCredentials::make(config, "interserver_http_credentials");
shared->interserver_io_credentials.set(std::move(credentials));
}
void Context::setInterserverIOAddress(const String & host, UInt16 port)

View File

@ -61,7 +61,8 @@ class AccessRightsElements;
class EmbeddedDictionaries;
class ExternalDictionariesLoader;
class ExternalModelsLoader;
class BaseInterserverCredentials;
class InterserverCredentials;
using InterserverCredentialsPtr = std::shared_ptr<const InterserverCredentials>;
class InterserverIOHandler;
class BackgroundSchedulePool;
class MergeList;
@ -524,8 +525,7 @@ public:
/// Credentials which server will use to communicate with others
void updateInterserverCredentials(const Poco::Util::AbstractConfiguration & config);
std::shared_ptr<BaseInterserverCredentials> getInterserverCredential();
std::pair<String, String> getInterserverCredentials() const;
InterserverCredentialsPtr getInterserverCredentials();
/// Interserver requests scheme (http or https)
void setInterserverScheme(const String & scheme);
@ -793,8 +793,6 @@ private:
/// If the password is not set, the password will not be checked
void setUserImpl(const String & name, const std::optional<String> & password, const Poco::Net::SocketAddress & address);
void setInterserverCredentials(std::shared_ptr<BaseInterserverCredentials> credentials);
};

View File

@ -1,59 +1,76 @@
#include <Interpreters/InterserverCredentials.h>
#include <common/logger_useful.h>
#include <Common/StringUtils/StringUtils.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NO_ELEMENTS_IN_CONFIG;
}
std::shared_ptr<ConfigInterserverCredentials>
ConfigInterserverCredentials::make(const Poco::Util::AbstractConfiguration & config, const std::string root_tag)
std::unique_ptr<InterserverCredentials>
InterserverCredentials::make(const Poco::Util::AbstractConfiguration & config, const std::string & root_tag)
{
const auto user = config.getString(root_tag + ".user", "");
const auto password = config.getString(root_tag + ".password", "");
if (config.has("user") && !config.has("password"))
throw Exception("Configuration parameter interserver_http_credentials.password can't be empty", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
if (user.empty())
throw Exception("Configuration parameter interserver_http_credentials user can't be empty", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
if (!config.has("user") && config.has("password"))
throw Exception("Configuration parameter interserver_http_credentials.user can't be empty if user specified", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
auto store = makeCredentialStore(user, password, config, root_tag);
/// They both can be empty
auto user = config.getString(root_tag + ".user", "");
auto password = config.getString(root_tag + ".password", "");
return std::make_shared<ConfigInterserverCredentials>(user, password, store);
auto store = parseCredentialsFromConfig(user, password, config, root_tag);
return std::make_unique<InterserverCredentials>(user, password, store);
}
ConfigInterserverCredentials::Store ConfigInterserverCredentials::makeCredentialStore(
const std::string current_user_,
const std::string current_password_,
InterserverCredentials::CurrentCredentials InterserverCredentials::parseCredentialsFromConfig(
const std::string & current_user_,
const std::string & current_password_,
const Poco::Util::AbstractConfiguration & config,
const std::string root_tag)
const std::string & root_tag)
{
Store store;
store.insert({{current_user_, current_password_}, true});
if (config.has(root_tag + ".allow_empty") && config.getBool(root_tag + ".allow_empty"))
auto * log = &Poco::Logger::get("InterserverCredentials");
CurrentCredentials store;
store.emplace_back(current_user_, current_password_);
if (config.getBool(root_tag + ".allow_empty", false))
{
LOG_DEBUG(log, "Allowing empty credentials");
/// Allow empty credential to support migrating from no auth
store.insert({{"", ""}, true});
store.emplace_back("", "");
}
Poco::Util::AbstractConfiguration::Keys old_users;
config.keys(root_tag, old_users);
Poco::Util::AbstractConfiguration::Keys users;
config.keys(root_tag + ".users", users);
for (const auto & user : users)
for (const auto & user_key : old_users)
{
LOG_DEBUG(&Poco::Logger::get("InterserverCredentials"), "Adding credential for {}", user);
const auto password = config.getString(root_tag + ".users." + user);
store.insert({{user, password}, true});
if (startsWith(user_key, "old"))
{
std::string full_prefix = root_tag + "." + user_key;
std::string old_user_name = config.getString(full_prefix + ".user");
LOG_DEBUG(log, "Adding credentials for old user {}", old_user_name);
std::string old_user_password = config.getString(full_prefix + ".password");
store.emplace_back(old_user_name, old_user_password);
}
}
return store;
}
std::pair<String, bool> ConfigInterserverCredentials::isValidUser(const std::pair<std::string, std::string> credentials)
InterserverCredentials::CheckResult InterserverCredentials::isValidUser(const UserWithPassword & credentials) const
{
const auto & valid = store.find(credentials);
if (valid == store.end())
auto itr = std::find(all_users_store.begin(), all_users_store.end(), credentials);
if (itr == all_users_store.end())
return {"Incorrect user or password in HTTP basic authentication: " + credentials.first, false};
return {"", true};
}

View File

@ -3,92 +3,68 @@
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/Exception.h>
#include <common/logger_useful.h>
#include <unordered_set>
namespace DB
{
/// InterserverCredentials holds credentials for server (store) and client
/// credentials (current_*). The container is constructed through `make` and a
/// shared_ptr is captured inside Context.
class BaseInterserverCredentials
{
public:
BaseInterserverCredentials(std::string current_user_, std::string current_password_)
: current_user(current_user_), current_password(current_password_)
{ }
virtual ~BaseInterserverCredentials() { }
/// isValidUser returns true or throws WRONG_PASSWORD
virtual std::pair<String, bool> isValidUser(const std::pair<std::string, std::string> credentials) = 0;
std::string getUser() { return current_user; }
std::string getPassword() { return current_password; }
protected:
std::string current_user;
std::string current_password;
};
/// NullInterserverCredentials are used when authentication is not configured
class NullInterserverCredentials : public BaseInterserverCredentials
{
public:
NullInterserverCredentials(const NullInterserverCredentials &) = delete;
NullInterserverCredentials() : BaseInterserverCredentials("", "") { }
~NullInterserverCredentials() override { }
static std::shared_ptr<NullInterserverCredentials> make() { return std::make_shared<NullInterserverCredentials>(); }
std::pair<String, bool> isValidUser(const std::pair<std::string, std::string> credentials) override
{
std::ignore = credentials;
return {"", true};
}
};
/// ConfigInterserverCredentials implements authentication using a Store, which
/// InterserverCredentials implements authentication using a CurrentCredentials, which
/// is configured, e.g.
/// <interserver_http_credentials>
/// <user>admin</user>
/// <password>222</password>
/// <!-- To support mix of un/authenticated clients -->
/// <!-- <allow_empty>true</allow_empty> -->
/// <users>
/// <old>
/// <!-- Allow authentication using previous passwords during rotation -->
/// <admin>111</admin>
/// </users>
/// <user>admin</user>
/// <password>qqq</password>
/// </old>
/// <old>
/// <!-- Allow authentication using previous users during rotation -->
/// <user>johny</user>
/// <password>333</password>
/// </old>
/// </interserver_http_credentials>
class ConfigInterserverCredentials : public BaseInterserverCredentials
class InterserverCredentials
{
public:
using Store = std::map<std::pair<std::string, std::string>, bool>;
using UserWithPassword = std::pair<std::string, std::string>;
using CheckResult = std::pair<std::string, bool>;
using CurrentCredentials = std::vector<UserWithPassword>;
ConfigInterserverCredentials(const ConfigInterserverCredentials &) = delete;
InterserverCredentials(const InterserverCredentials &) = delete;
static std::shared_ptr<ConfigInterserverCredentials> make(const Poco::Util::AbstractConfiguration & config, const std::string root_tag);
static std::unique_ptr<InterserverCredentials> make(const Poco::Util::AbstractConfiguration & config, const std::string & root_tag);
~ConfigInterserverCredentials() override { }
InterserverCredentials(const std::string & current_user_, const std::string & current_password_, const CurrentCredentials & all_users_store_)
: current_user(current_user_)
, current_password(current_password_)
, all_users_store(all_users_store_)
{}
ConfigInterserverCredentials(const std::string current_user_, const std::string current_password_, const Store & store_)
: BaseInterserverCredentials(current_user_, current_password_), store(std::move(store_))
{
}
CheckResult isValidUser(const UserWithPassword & credentials) const;
std::string getUser() const { return current_user; }
std::string getPassword() const { return current_password; }
std::pair<String, bool> isValidUser(const std::pair<std::string, std::string> credentials) override;
private:
Store store;
std::string current_user;
std::string current_password;
static Store makeCredentialStore(
const std::string current_user_,
const std::string current_password_,
/// In common situation this store contains one record
CurrentCredentials all_users_store;
static CurrentCredentials parseCredentialsFromConfig(
const std::string & current_user_,
const std::string & current_password_,
const Poco::Util::AbstractConfiguration & config,
const std::string root_tag);
const std::string & root_tag);
};
using InterserverCredentialsPtr = std::shared_ptr<const InterserverCredentials>;
}

View File

@ -26,11 +26,11 @@ namespace ErrorCodes
std::pair<String, bool> InterserverIOHTTPHandler::checkAuthentication(HTTPServerRequest & request) const
{
auto server_credentials = server.context().getInterserverCredential();
auto server_credentials = server.context().getInterserverCredentials();
if (server_credentials)
{
if (!request.hasCredentials())
return server_credentials->isValidUser(std::make_pair(default_user, default_password));
return server_credentials->isValidUser(std::make_pair("", ""));
String scheme, info;
request.getCredentials(scheme, info);

View File

@ -46,8 +46,6 @@ private:
void processQuery(HTTPServerRequest & request, HTTPServerResponse & response, Output & used_output);
std::pair<String, bool> checkAuthentication(HTTPServerRequest & request) const;
const std::string default_user;
const std::string default_password;
};
}

View File

@ -52,6 +52,7 @@
#include <Interpreters/PartLog.h>
#include <Interpreters/Context.h>
#include <Interpreters/DDLTask.h>
#include <Interpreters/InterserverCredentials.h>
#include <DataStreams/RemoteBlockInputStream.h>
#include <DataStreams/copyData.h>
@ -2401,7 +2402,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
ReplicatedMergeTreeAddress address(getZooKeeper()->get(source_replica_path + "/host"));
auto timeouts = getFetchPartHTTPTimeouts(global_context);
auto [user, password] = global_context.getInterserverCredentials();
auto credentials = global_context.getInterserverCredentials();
String interserver_scheme = global_context.getInterserverScheme();
if (interserver_scheme != address.scheme)
@ -2409,7 +2410,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
part_desc->res_part = fetcher.fetchPart(
metadata_snapshot, part_desc->found_new_part_name, source_replica_path,
address.host, address.replication_port, timeouts, user, password, interserver_scheme, false, TMP_PREFIX + "fetch_");
address.host, address.replication_port, timeouts, credentials->getUser(), credentials->getPassword(), interserver_scheme, false, TMP_PREFIX + "fetch_");
/// TODO: check columns_version of fetched part
@ -3755,7 +3756,6 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
ReplicatedMergeTreeAddress address;
ConnectionTimeouts timeouts;
std::pair<String, String> user_password;
String interserver_scheme;
std::optional<CurrentlySubmergingEmergingTagger> tagger_ptr;
std::function<MutableDataPartPtr()> get_part;
@ -3772,10 +3772,10 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
address.fromString(zookeeper->get(source_replica_path + "/host"));
timeouts = getFetchPartHTTPTimeouts(global_context);
user_password = global_context.getInterserverCredentials();
auto credentials = global_context.getInterserverCredentials();
interserver_scheme = global_context.getInterserverScheme();
get_part = [&, address, timeouts, user_password, interserver_scheme]()
get_part = [&, address, timeouts, credentials, interserver_scheme]()
{
if (interserver_scheme != address.scheme)
throw Exception("Interserver schemes are different: '" + interserver_scheme
@ -3789,8 +3789,8 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
address.host,
address.replication_port,
timeouts,
user_password.first,
user_password.second,
credentials->getUser(),
credentials->getPassword(),
interserver_scheme,
to_detached,
"",
@ -3928,10 +3928,10 @@ bool StorageReplicatedMergeTree::fetchExistsPart(const String & part_name, const
ReplicatedMergeTreeAddress address(zookeeper->get(source_replica_path + "/host"));
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(global_context);
auto user_password = global_context.getInterserverCredentials();
auto credentials = global_context.getInterserverCredentials();
String interserver_scheme = global_context.getInterserverScheme();
get_part = [&, address, timeouts, user_password, interserver_scheme]()
get_part = [&, address, timeouts, interserver_scheme, credentials]()
{
if (interserver_scheme != address.scheme)
throw Exception("Interserver schemes are different: '" + interserver_scheme
@ -3941,7 +3941,7 @@ bool StorageReplicatedMergeTree::fetchExistsPart(const String & part_name, const
return fetcher.fetchPart(
metadata_snapshot, part_name, source_replica_path,
address.host, address.replication_port,
timeouts, user_password.first, user_password.second, interserver_scheme, false, "", nullptr, true,
timeouts, credentials->getUser(), credentials->getPassword(), interserver_scheme, false, "", nullptr, true,
replaced_disk);
};

View File

@ -113,6 +113,32 @@ def test_different_credentials(different_credentials_cluster):
assert node5.query("SELECT id FROM test_table order by id") == '111\n'
assert node6.query("SELECT id FROM test_table order by id") == '222\n'
add_old = """
<yandex>
<interserver_http_port>9009</interserver_http_port>
<interserver_http_credentials>
<user>admin</user>
<password>222</password>
<old>
<user>root</user>
<password>111</password>
</old>
<old>
<user>aaa</user>
<password>333</password>
</old>
</interserver_http_credentials>
</yandex>
"""
node5.replace_config("/etc/clickhouse-server/config.d/credentials1.xml", add_old)
node5.query("SYSTEM RELOAD CONFIG")
node5.query("INSERT INTO test_table values('2017-06-21', 333, 1)")
node6.query("SYSTEM SYNC REPLICA test_table", timeout=10)
assert node6.query("SELECT id FROM test_table order by id") == '111\n222\n333\n'
node7 = cluster.add_instance('node7', main_configs=['configs/remote_servers.xml', 'configs/credentials1.xml'],
with_zookeeper=True)
@ -134,7 +160,6 @@ def credentials_and_no_credentials_cluster():
def test_credentials_and_no_credentials(credentials_and_no_credentials_cluster):
# Initial state: node7 requires auth; node8 open
node7.query("insert into test_table values ('2017-06-21', 111, 0)")
time.sleep(1)
@ -144,7 +169,7 @@ def test_credentials_and_no_credentials(credentials_and_no_credentials_cluster):
node8.query("insert into test_table values ('2017-06-22', 222, 1)")
time.sleep(1)
assert node7.query("SELECT id FROM test_table order by id") == '111\n222\n'
assert node7.query("SELECT id FROM test_table order by id") == '111\n'
assert node8.query("SELECT id FROM test_table order by id") == '222\n'
allow_empty = """
@ -161,8 +186,8 @@ def test_credentials_and_no_credentials(credentials_and_no_credentials_cluster):
# change state: Flip node7 to mixed auth/non-auth (allow node8)
node7.replace_config("/etc/clickhouse-server/config.d/credentials1.xml",
allow_empty)
node7.query("SYSTEM RELOAD CONFIG")
node7.query("insert into test_table values ('2017-06-22', 333, 1)")
node8.query("DETACH TABLE test_table")
node8.query("ATTACH TABLE test_table")
time.sleep(3)
node8.query("SYSTEM SYNC REPLICA test_table", timeout=10)
assert node8.query("SELECT id FROM test_table order by id") == '111\n222\n333\n'