ClickHouse/src/Client/ConnectionPool.h

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

200 lines
5.6 KiB
C++
Raw Normal View History

#pragma once
#include <Common/PoolBase.h>
#include <Client/Connection.h>
#include <IO/ConnectionTimeouts.h>
#include <Core/Settings.h>
Support for Clang Thread Safety Analysis (TSA) - TSA is a static analyzer build by Google which finds race conditions and deadlocks at compile time. - It works by associating a shared member variable with a synchronization primitive that protects it. The compiler can then check at each access if proper locking happened before. A good introduction are [0] and [1]. - TSA requires some help by the programmer via annotations. Luckily, LLVM's libcxx already has annotations for std::mutex, std::lock_guard, std::shared_mutex and std::scoped_lock. This commit enables them (--> contrib/libcxx-cmake/CMakeLists.txt). - Further, this commit adds convenience macros for the low-level annotations for use in ClickHouse (--> base/defines.h). For demonstration, they are leveraged in a few places. - As we compile with "-Wall -Wextra -Weverything", the required compiler flag "-Wthread-safety-analysis" was already enabled. Negative checks are an experimental feature of TSA and disabled (--> cmake/warnings.cmake). Compile times did not increase noticeably. - TSA is used in a few places with simple locking. I tried TSA also where locking is more complex. The problem was usually that it is unclear which data is protected by which lock :-(. But there was definitely some weird code where locking looked broken. So there is some potential to find bugs. *** Limitations of TSA besides the ones listed in [1]: - The programmer needs to know which lock protects which piece of shared data. This is not always easy for large classes. - Two synchronization primitives used in ClickHouse are not annotated in libcxx: (1) std::unique_lock: A releaseable lock handle often together with std::condition_variable, e.g. in solve producer-consumer problems. (2) std::recursive_mutex: A re-entrant mutex variant. Its usage can be considered a design flaw + typically it is slower than a standard mutex. In this commit, one std::recursive_mutex was converted to std::mutex and annotated with TSA. - For free-standing functions (e.g. helper functions) which are passed shared data members, it can be tricky to specify the associated lock. This is because the annotations use the normal C++ rules for symbol resolution. [0] https://clang.llvm.org/docs/ThreadSafetyAnalysis.html [1] https://static.googleusercontent.com/media/research.google.com/en//pubs/archive/42958.pdf
2022-06-14 22:35:55 +00:00
#include <base/defines.h>
namespace DB
{
2017-03-09 00:56:38 +00:00
/** Interface for connection pools.
*
2017-03-09 00:56:38 +00:00
* Usage (using the usual `ConnectionPool` example)
* ConnectionPool pool(...);
*
* void thread()
* {
2020-05-17 05:45:20 +00:00
* auto connection = pool.get();
* connection->sendQuery(...);
* }
*/
class IConnectionPool : private boost::noncopyable
{
public:
using Entry = PoolBase<Connection>::Entry;
2015-10-12 14:53:16 +00:00
2021-03-19 21:49:18 +00:00
virtual ~IConnectionPool() = default;
/// Selects the connection to work.
/// If force_connected is false, the client must manually ensure that returned connection is good.
virtual Entry get(const ConnectionTimeouts & timeouts, /// NOLINT
const Settings * settings = nullptr,
bool force_connected = true) = 0;
virtual Int64 getPriority() const { return 1; }
};
using ConnectionPoolPtr = std::shared_ptr<IConnectionPool>;
using ConnectionPoolPtrs = std::vector<ConnectionPoolPtr>;
2017-03-09 00:56:38 +00:00
/** A common connection pool, without fault tolerance.
*/
class ConnectionPool : public IConnectionPool, private PoolBase<Connection>
{
public:
using Entry = IConnectionPool::Entry;
using Base = PoolBase<Connection>;
ConnectionPool(unsigned max_connections_,
const String & host_,
UInt16 port_,
2015-05-28 21:41:28 +00:00
const String & default_database_,
const String & user_,
const String & password_,
2022-08-03 19:44:08 +00:00
const String & quota_key_,
const String & cluster_,
const String & cluster_secret_,
2021-03-29 00:39:10 +00:00
const String & client_name_,
Protocol::Compression compression_,
Protocol::Secure secure_,
Int64 priority_ = 1)
: Base(max_connections_,
2020-05-30 21:57:37 +00:00
&Poco::Logger::get("ConnectionPool (" + host_ + ":" + toString(port_) + ")")),
host(host_),
port(port_),
default_database(default_database_),
user(user_),
password(password_),
2022-08-03 19:44:08 +00:00
quota_key(quota_key_),
cluster(cluster_),
cluster_secret(cluster_secret_),
client_name(client_name_),
compression(compression_),
secure(secure_),
priority(priority_)
{
}
Entry get(const ConnectionTimeouts & timeouts, /// NOLINT
const Settings * settings = nullptr,
bool force_connected = true) override
{
Entry entry;
if (settings)
entry = Base::get(settings->connection_pool_max_wait_ms.totalMilliseconds());
else
entry = Base::get(-1);
if (force_connected)
entry->forceConnected(timeouts);
return entry;
}
const std::string & getHost() const
{
return host;
}
2019-08-21 08:53:41 +00:00
std::string getDescription() const
{
return host + ":" + toString(port);
}
Int64 getPriority() const override
{
return priority;
}
protected:
2017-03-09 04:26:17 +00:00
/** Creates a new object to put in the pool. */
ConnectionPtr allocObject() override
{
return std::make_shared<Connection>(
host, port,
2022-08-03 19:44:08 +00:00
default_database, user, password, quota_key,
cluster, cluster_secret,
client_name, compression, secure);
}
private:
String host;
UInt16 port;
String default_database;
2013-08-10 09:04:45 +00:00
String user;
String password;
2022-08-03 19:44:08 +00:00
String quota_key;
/// For inter-server authorization
String cluster;
String cluster_secret;
String client_name;
Protocol::Compression compression; /// Whether to compress data when interacting with the server.
Protocol::Secure secure; /// Whether to encrypt data when interacting with the server.
Int64 priority; /// priority from <remote_servers>
};
2021-07-01 10:18:29 +00:00
/**
* Connection pool factory. Responsible for creating new connection pools and reuse existing ones.
*/
class ConnectionPoolFactory final : private boost::noncopyable
{
public:
struct Key
{
unsigned max_connections;
String host;
UInt16 port;
String default_database;
String user;
String password;
2022-08-03 19:44:08 +00:00
String quota_key;
2021-07-01 10:18:29 +00:00
String cluster;
String cluster_secret;
String client_name;
Protocol::Compression compression;
Protocol::Secure secure;
Int64 priority;
};
struct KeyHash
{
size_t operator()(const ConnectionPoolFactory::Key & k) const;
};
static ConnectionPoolFactory & instance();
ConnectionPoolPtr
get(unsigned max_connections,
String host,
UInt16 port,
String default_database,
String user,
String password,
2022-08-03 19:44:08 +00:00
String quota_key,
2021-07-01 10:18:29 +00:00
String cluster,
String cluster_secret,
String client_name,
Protocol::Compression compression,
Protocol::Secure secure,
Int64 priority);
private:
mutable std::mutex mutex;
using ConnectionPoolWeakPtr = std::weak_ptr<IConnectionPool>;
Support for Clang Thread Safety Analysis (TSA) - TSA is a static analyzer build by Google which finds race conditions and deadlocks at compile time. - It works by associating a shared member variable with a synchronization primitive that protects it. The compiler can then check at each access if proper locking happened before. A good introduction are [0] and [1]. - TSA requires some help by the programmer via annotations. Luckily, LLVM's libcxx already has annotations for std::mutex, std::lock_guard, std::shared_mutex and std::scoped_lock. This commit enables them (--> contrib/libcxx-cmake/CMakeLists.txt). - Further, this commit adds convenience macros for the low-level annotations for use in ClickHouse (--> base/defines.h). For demonstration, they are leveraged in a few places. - As we compile with "-Wall -Wextra -Weverything", the required compiler flag "-Wthread-safety-analysis" was already enabled. Negative checks are an experimental feature of TSA and disabled (--> cmake/warnings.cmake). Compile times did not increase noticeably. - TSA is used in a few places with simple locking. I tried TSA also where locking is more complex. The problem was usually that it is unclear which data is protected by which lock :-(. But there was definitely some weird code where locking looked broken. So there is some potential to find bugs. *** Limitations of TSA besides the ones listed in [1]: - The programmer needs to know which lock protects which piece of shared data. This is not always easy for large classes. - Two synchronization primitives used in ClickHouse are not annotated in libcxx: (1) std::unique_lock: A releaseable lock handle often together with std::condition_variable, e.g. in solve producer-consumer problems. (2) std::recursive_mutex: A re-entrant mutex variant. Its usage can be considered a design flaw + typically it is slower than a standard mutex. In this commit, one std::recursive_mutex was converted to std::mutex and annotated with TSA. - For free-standing functions (e.g. helper functions) which are passed shared data members, it can be tricky to specify the associated lock. This is because the annotations use the normal C++ rules for symbol resolution. [0] https://clang.llvm.org/docs/ThreadSafetyAnalysis.html [1] https://static.googleusercontent.com/media/research.google.com/en//pubs/archive/42958.pdf
2022-06-14 22:35:55 +00:00
std::unordered_map<Key, ConnectionPoolWeakPtr, KeyHash> pools TSA_GUARDED_BY(mutex);
2021-07-01 10:18:29 +00:00
};
inline bool operator==(const ConnectionPoolFactory::Key & lhs, const ConnectionPoolFactory::Key & rhs)
{
return lhs.max_connections == rhs.max_connections && lhs.host == rhs.host && lhs.port == rhs.port
&& lhs.default_database == rhs.default_database && lhs.user == rhs.user && lhs.password == rhs.password
&& lhs.quota_key == rhs.quota_key
2021-07-01 10:18:29 +00:00
&& lhs.cluster == rhs.cluster && lhs.cluster_secret == rhs.cluster_secret && lhs.client_name == rhs.client_name
&& lhs.compression == rhs.compression && lhs.secure == rhs.secure && lhs.priority == rhs.priority;
}
}