mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
Optional secured communication between ClickHouse and Zookeeper
This commit is contained in:
parent
f48fdda678
commit
b666f60af8
@ -37,7 +37,7 @@
|
|||||||
<preferServerCiphers>true</preferServerCiphers>
|
<preferServerCiphers>true</preferServerCiphers>
|
||||||
</server>
|
</server>
|
||||||
|
|
||||||
<client> <!-- Used for connecting to https dictionary source -->
|
<client> <!-- Used for connecting to https dictionary source and secured Zookeeper communication -->
|
||||||
<loadDefaultCAFile>true</loadDefaultCAFile>
|
<loadDefaultCAFile>true</loadDefaultCAFile>
|
||||||
<cacheSessions>true</cacheSessions>
|
<cacheSessions>true</cacheSessions>
|
||||||
<disableProtocols>sslv2,sslv3</disableProtocols>
|
<disableProtocols>sslv2,sslv3</disableProtocols>
|
||||||
|
@ -7,6 +7,10 @@ add_library(clickhouse_common_zookeeper ${clickhouse_common_zookeeper_headers} $
|
|||||||
target_link_libraries (clickhouse_common_zookeeper PUBLIC clickhouse_common_io common PRIVATE string_utils PUBLIC ${Poco_Util_LIBRARY})
|
target_link_libraries (clickhouse_common_zookeeper PUBLIC clickhouse_common_io common PRIVATE string_utils PUBLIC ${Poco_Util_LIBRARY})
|
||||||
target_include_directories(clickhouse_common_zookeeper PUBLIC ${DBMS_INCLUDE_DIR})
|
target_include_directories(clickhouse_common_zookeeper PUBLIC ${DBMS_INCLUDE_DIR})
|
||||||
|
|
||||||
|
if (USE_POCO_NETSSL)
|
||||||
|
target_link_libraries (clickhouse_common_zookeeper PRIVATE ${Poco_NetSSL_LIBRARY} ${Poco_Crypto_LIBRARY})
|
||||||
|
endif()
|
||||||
|
|
||||||
if (ENABLE_TESTS)
|
if (ENABLE_TESTS)
|
||||||
add_subdirectory (tests)
|
add_subdirectory (tests)
|
||||||
endif ()
|
endif ()
|
||||||
|
@ -59,30 +59,36 @@ void ZooKeeper::init(const std::string & implementation_, const std::string & ho
|
|||||||
if (implementation == "zookeeper")
|
if (implementation == "zookeeper")
|
||||||
{
|
{
|
||||||
if (hosts.empty())
|
if (hosts.empty())
|
||||||
throw KeeperException("No addresses passed to ZooKeeper constructor.", Coordination::ZBADARGUMENTS);
|
throw KeeperException("No hosts passed to ZooKeeper constructor.", Coordination::ZBADARGUMENTS);
|
||||||
|
|
||||||
std::vector<std::string> addresses_strings;
|
std::vector<std::string> hosts_strings;
|
||||||
splitInto<','>(addresses_strings, hosts);
|
splitInto<','>(hosts_strings, hosts);
|
||||||
Coordination::ZooKeeper::Addresses addresses;
|
Coordination::ZooKeeper::Nodes nodes;
|
||||||
addresses.reserve(addresses_strings.size());
|
nodes.reserve(hosts_strings.size());
|
||||||
|
|
||||||
for (const auto & address_string : addresses_strings)
|
for (auto & host_string : hosts_strings)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
addresses.emplace_back(address_string);
|
bool secure = bool(startsWith(host_string, "secure://"));
|
||||||
|
|
||||||
|
if (secure) {
|
||||||
|
host_string.erase(0, strlen("secure://"));
|
||||||
|
}
|
||||||
|
|
||||||
|
nodes.emplace_back(Coordination::ZooKeeper::Node{Poco::Net::SocketAddress{host_string}, secure});
|
||||||
}
|
}
|
||||||
catch (const Poco::Net::DNSException & e)
|
catch (const Poco::Net::DNSException & e)
|
||||||
{
|
{
|
||||||
LOG_ERROR(log, "Cannot use ZooKeeper address " << address_string << ", reason: " << e.displayText());
|
LOG_ERROR(log, "Cannot use ZooKeeper host " << host_string << ", reason: " << e.displayText());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (addresses.empty())
|
if (nodes.empty())
|
||||||
throw KeeperException("Cannot use any of provided ZooKeeper addresses", Coordination::ZBADARGUMENTS);
|
throw KeeperException("Cannot use any of provided ZooKeeper nodes", Coordination::ZBADARGUMENTS);
|
||||||
|
|
||||||
impl = std::make_unique<Coordination::ZooKeeper>(
|
impl = std::make_unique<Coordination::ZooKeeper>(
|
||||||
addresses,
|
nodes,
|
||||||
chroot,
|
chroot,
|
||||||
identity_.empty() ? "" : "digest",
|
identity_.empty() ? "" : "digest",
|
||||||
identity_,
|
identity_,
|
||||||
@ -130,6 +136,7 @@ struct ZooKeeperArgs
|
|||||||
if (startsWith(key, "node"))
|
if (startsWith(key, "node"))
|
||||||
{
|
{
|
||||||
hosts_strings.push_back(
|
hosts_strings.push_back(
|
||||||
|
(config.getBool(config_name + "." + key + ".secure", false) ? "secure://" : "") +
|
||||||
config.getString(config_name + "." + key + ".host") + ":"
|
config.getString(config_name + "." + key + ".host") + ":"
|
||||||
+ config.getString(config_name + "." + key + ".port", "2181")
|
+ config.getString(config_name + "." + key + ".port", "2181")
|
||||||
);
|
);
|
||||||
|
@ -63,10 +63,14 @@ public:
|
|||||||
<node>
|
<node>
|
||||||
<host>example1</host>
|
<host>example1</host>
|
||||||
<port>2181</port>
|
<port>2181</port>
|
||||||
|
<!-- Optional. Enables communication over SSL . -->
|
||||||
|
<secure>1</secure>
|
||||||
</node>
|
</node>
|
||||||
<node>
|
<node>
|
||||||
<host>example2</host>
|
<host>example2</host>
|
||||||
<port>2181</port>
|
<port>2181</port>
|
||||||
|
<!-- Optional. Enables communication over SSL . -->
|
||||||
|
<secure>1</secure>
|
||||||
</node>
|
</node>
|
||||||
<session_timeout_ms>30000</session_timeout_ms>
|
<session_timeout_ms>30000</session_timeout_ms>
|
||||||
<operation_timeout_ms>10000</operation_timeout_ms>
|
<operation_timeout_ms>10000</operation_timeout_ms>
|
||||||
|
@ -11,6 +11,11 @@
|
|||||||
#include <Poco/Exception.h>
|
#include <Poco/Exception.h>
|
||||||
#include <Poco/Net/NetException.h>
|
#include <Poco/Net/NetException.h>
|
||||||
|
|
||||||
|
#include <Common/config.h>
|
||||||
|
#if USE_POCO_NETSSL
|
||||||
|
#include <Poco/Net/SecureStreamSocket.h>
|
||||||
|
#endif
|
||||||
|
|
||||||
#include <array>
|
#include <array>
|
||||||
|
|
||||||
|
|
||||||
@ -44,6 +49,13 @@ namespace CurrentMetrics
|
|||||||
extern const Metric ZooKeeperWatch;
|
extern const Metric ZooKeeperWatch;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
namespace ErrorCodes
|
||||||
|
{
|
||||||
|
extern const int SUPPORT_IS_DISABLED;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/** ZooKeeper wire protocol.
|
/** ZooKeeper wire protocol.
|
||||||
|
|
||||||
@ -817,7 +829,7 @@ ZooKeeper::~ZooKeeper()
|
|||||||
|
|
||||||
|
|
||||||
ZooKeeper::ZooKeeper(
|
ZooKeeper::ZooKeeper(
|
||||||
const Addresses & addresses,
|
const Nodes & nodes,
|
||||||
const String & root_path_,
|
const String & root_path_,
|
||||||
const String & auth_scheme,
|
const String & auth_scheme,
|
||||||
const String & auth_data,
|
const String & auth_data,
|
||||||
@ -851,7 +863,7 @@ ZooKeeper::ZooKeeper(
|
|||||||
default_acls.emplace_back(std::move(acl));
|
default_acls.emplace_back(std::move(acl));
|
||||||
}
|
}
|
||||||
|
|
||||||
connect(addresses, connection_timeout);
|
connect(nodes, connection_timeout);
|
||||||
|
|
||||||
if (!auth_scheme.empty())
|
if (!auth_scheme.empty())
|
||||||
sendAuth(auth_scheme, auth_data);
|
sendAuth(auth_scheme, auth_data);
|
||||||
@ -864,11 +876,11 @@ ZooKeeper::ZooKeeper(
|
|||||||
|
|
||||||
|
|
||||||
void ZooKeeper::connect(
|
void ZooKeeper::connect(
|
||||||
const Addresses & addresses,
|
const Nodes & nodes,
|
||||||
Poco::Timespan connection_timeout)
|
Poco::Timespan connection_timeout)
|
||||||
{
|
{
|
||||||
if (addresses.empty())
|
if (nodes.empty())
|
||||||
throw Exception("No addresses passed to ZooKeeper constructor", ZBADARGUMENTS);
|
throw Exception("No nodes passed to ZooKeeper constructor", ZBADARGUMENTS);
|
||||||
|
|
||||||
static constexpr size_t num_tries = 3;
|
static constexpr size_t num_tries = 3;
|
||||||
bool connected = false;
|
bool connected = false;
|
||||||
@ -876,12 +888,25 @@ void ZooKeeper::connect(
|
|||||||
WriteBufferFromOwnString fail_reasons;
|
WriteBufferFromOwnString fail_reasons;
|
||||||
for (size_t try_no = 0; try_no < num_tries; ++try_no)
|
for (size_t try_no = 0; try_no < num_tries; ++try_no)
|
||||||
{
|
{
|
||||||
for (const auto & address : addresses)
|
for (const auto & node : nodes)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
socket = Poco::Net::StreamSocket(); /// Reset the state of previous attempt.
|
/// Reset the state of previous attempt.
|
||||||
socket.connect(address, connection_timeout);
|
if (node.secure)
|
||||||
|
{
|
||||||
|
#if USE_POCO_NETSSL
|
||||||
|
socket = Poco::Net::SecureStreamSocket();
|
||||||
|
#else
|
||||||
|
throw Exception{"Communication with ZooKeeper over SSL is disabled because poco library was built without NetSSL support.", ErrorCodes::SUPPORT_IS_DISABLED};
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
socket = Poco::Net::StreamSocket();
|
||||||
|
}
|
||||||
|
|
||||||
|
socket.connect(node.address, connection_timeout);
|
||||||
|
|
||||||
socket.setReceiveTimeout(operation_timeout);
|
socket.setReceiveTimeout(operation_timeout);
|
||||||
socket.setSendTimeout(operation_timeout);
|
socket.setSendTimeout(operation_timeout);
|
||||||
@ -915,7 +940,7 @@ void ZooKeeper::connect(
|
|||||||
}
|
}
|
||||||
catch (...)
|
catch (...)
|
||||||
{
|
{
|
||||||
fail_reasons << "\n" << getCurrentExceptionMessage(false) << ", " << address.toString();
|
fail_reasons << "\n" << getCurrentExceptionMessage(false) << ", " << node.address.toString();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -926,15 +951,19 @@ void ZooKeeper::connect(
|
|||||||
if (!connected)
|
if (!connected)
|
||||||
{
|
{
|
||||||
WriteBufferFromOwnString message;
|
WriteBufferFromOwnString message;
|
||||||
message << "All connection tries failed while connecting to ZooKeeper. Addresses: ";
|
message << "All connection tries failed while connecting to ZooKeeper. nodes: ";
|
||||||
bool first = true;
|
bool first = true;
|
||||||
for (const auto & address : addresses)
|
for (const auto & node : nodes)
|
||||||
{
|
{
|
||||||
if (first)
|
if (first)
|
||||||
first = false;
|
first = false;
|
||||||
else
|
else
|
||||||
message << ", ";
|
message << ", ";
|
||||||
message << address.toString();
|
|
||||||
|
if (node.secure)
|
||||||
|
message << "secure://";
|
||||||
|
|
||||||
|
message << node.address.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
message << fail_reasons.str() << "\n";
|
message << fail_reasons.str() << "\n";
|
||||||
|
@ -93,17 +93,22 @@ struct ZooKeeperRequest;
|
|||||||
class ZooKeeper : public IKeeper
|
class ZooKeeper : public IKeeper
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
using Addresses = std::vector<Poco::Net::SocketAddress>;
|
struct Node {
|
||||||
|
Poco::Net::SocketAddress address;
|
||||||
|
bool secure;
|
||||||
|
};
|
||||||
|
|
||||||
|
using Nodes = std::vector<Node>;
|
||||||
|
|
||||||
using XID = int32_t;
|
using XID = int32_t;
|
||||||
using OpNum = int32_t;
|
using OpNum = int32_t;
|
||||||
|
|
||||||
/** Connection to addresses is performed in order. If you want, shuffle them manually.
|
/** Connection to nodes is performed in order. If you want, shuffle them manually.
|
||||||
* Operation timeout couldn't be greater than session timeout.
|
* Operation timeout couldn't be greater than session timeout.
|
||||||
* Operation timeout applies independently for network read, network write, waiting for events and synchronization.
|
* Operation timeout applies independently for network read, network write, waiting for events and synchronization.
|
||||||
*/
|
*/
|
||||||
ZooKeeper(
|
ZooKeeper(
|
||||||
const Addresses & addresses,
|
const Nodes & nodes,
|
||||||
const String & root_path,
|
const String & root_path,
|
||||||
const String & auth_scheme,
|
const String & auth_scheme,
|
||||||
const String & auth_data,
|
const String & auth_data,
|
||||||
@ -213,7 +218,7 @@ private:
|
|||||||
ThreadFromGlobalPool receive_thread;
|
ThreadFromGlobalPool receive_thread;
|
||||||
|
|
||||||
void connect(
|
void connect(
|
||||||
const Addresses & addresses,
|
const Nodes & node,
|
||||||
Poco::Timespan connection_timeout);
|
Poco::Timespan connection_timeout);
|
||||||
|
|
||||||
void sendHandshake();
|
void sendHandshake();
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
#include <Poco/ConsoleChannel.h>
|
#include <Poco/ConsoleChannel.h>
|
||||||
#include <Poco/Logger.h>
|
#include <Poco/Logger.h>
|
||||||
#include <Poco/Event.h>
|
#include <Poco/Event.h>
|
||||||
|
#include <Common/StringUtils/StringUtils.h>
|
||||||
#include <Common/ZooKeeper/ZooKeeperImpl.h>
|
#include <Common/ZooKeeper/ZooKeeperImpl.h>
|
||||||
#include <Common/typeid_cast.h>
|
#include <Common/typeid_cast.h>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
@ -23,15 +24,23 @@ try
|
|||||||
Poco::Logger::root().setChannel(channel);
|
Poco::Logger::root().setChannel(channel);
|
||||||
Poco::Logger::root().setLevel("trace");
|
Poco::Logger::root().setLevel("trace");
|
||||||
|
|
||||||
std::string addresses_arg = argv[1];
|
std::string hosts_arg = argv[1];
|
||||||
std::vector<std::string> addresses_strings;
|
std::vector<std::string> hosts_strings;
|
||||||
splitInto<','>(addresses_strings, addresses_arg);
|
splitInto<','>(hosts_strings, hosts_arg);
|
||||||
ZooKeeper::Addresses addresses;
|
ZooKeeper::Nodes nodes;
|
||||||
addresses.reserve(addresses_strings.size());
|
nodes.reserve(hosts_strings.size());
|
||||||
for (const auto & address_string : addresses_strings)
|
for (auto & host_string : hosts_strings) {
|
||||||
addresses.emplace_back(address_string);
|
bool secure = bool(startsWith(host_string, "secure://"));
|
||||||
|
|
||||||
ZooKeeper zk(addresses, {}, {}, {}, {5, 0}, {0, 50000}, {0, 50000});
|
if (secure) {
|
||||||
|
host_string.erase(0, strlen("secure://"));
|
||||||
|
}
|
||||||
|
|
||||||
|
nodes.emplace_back(ZooKeeper::Node{Poco::Net::SocketAddress{host_string},secure});
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
ZooKeeper zk(nodes, {}, {}, {}, {5, 0}, {0, 50000}, {0, 50000});
|
||||||
|
|
||||||
Poco::Event event(true);
|
Poco::Event event(true);
|
||||||
|
|
||||||
|
@ -5,7 +5,7 @@
|
|||||||
int main()
|
int main()
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
Coordination::ZooKeeper zookeeper({Poco::Net::SocketAddress{"localhost:2181"}}, "", "", "", {30, 0}, {0, 50000}, {0, 50000});
|
Coordination::ZooKeeper zookeeper({Coordination::ZooKeeper::Node{Poco::Net::SocketAddress{"localhost:2181"}, false}}, "", "", "", {30, 0}, {0, 50000}, {0, 50000});
|
||||||
|
|
||||||
zookeeper.create("/test", "hello", false, false, {}, [](const Coordination::CreateResponse & response)
|
zookeeper.create("/test", "hello", false, false, {}, [](const Coordination::CreateResponse & response)
|
||||||
{
|
{
|
||||||
|
@ -37,7 +37,7 @@
|
|||||||
<preferServerCiphers>true</preferServerCiphers>
|
<preferServerCiphers>true</preferServerCiphers>
|
||||||
</server>
|
</server>
|
||||||
|
|
||||||
<client> <!-- Used for connecting to https dictionary source -->
|
<client> <!-- Used for connecting to https dictionary source and secured Zookeeper communication -->
|
||||||
<loadDefaultCAFile>true</loadDefaultCAFile>
|
<loadDefaultCAFile>true</loadDefaultCAFile>
|
||||||
<cacheSessions>true</cacheSessions>
|
<cacheSessions>true</cacheSessions>
|
||||||
<disableProtocols>sslv2,sslv3</disableProtocols>
|
<disableProtocols>sslv2,sslv3</disableProtocols>
|
||||||
|
@ -31,7 +31,7 @@
|
|||||||
<preferServerCiphers>true</preferServerCiphers>
|
<preferServerCiphers>true</preferServerCiphers>
|
||||||
</server>
|
</server>
|
||||||
|
|
||||||
<client> <!-- Used for connecting to https dictionary source -->
|
<client> <!-- Used for connecting to https dictionary source and secured Zookeeper communication -->
|
||||||
<loadDefaultCAFile>true</loadDefaultCAFile>
|
<loadDefaultCAFile>true</loadDefaultCAFile>
|
||||||
<cacheSessions>true</cacheSessions>
|
<cacheSessions>true</cacheSessions>
|
||||||
<disableProtocols>sslv2,sslv3</disableProtocols>
|
<disableProtocols>sslv2,sslv3</disableProtocols>
|
||||||
|
Loading…
Reference in New Issue
Block a user