mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-25 00:52:02 +00:00
Fix clickhouse server start when replicated access storage depend on keeper
This commit is contained in:
parent
9f12f4af13
commit
b8c6481d4c
@ -330,8 +330,6 @@ int Keeper::main(const std::vector<std::string> & /*args*/)
|
|||||||
|
|
||||||
DB::ServerUUID::load(path + "/uuid", log);
|
DB::ServerUUID::load(path + "/uuid", log);
|
||||||
|
|
||||||
const Settings & settings = global_context->getSettingsRef();
|
|
||||||
|
|
||||||
std::string include_from_path = config().getString("include_from", "/etc/metrika.xml");
|
std::string include_from_path = config().getString("include_from", "/etc/metrika.xml");
|
||||||
|
|
||||||
GlobalThreadPool::initialize(
|
GlobalThreadPool::initialize(
|
||||||
@ -377,8 +375,8 @@ int Keeper::main(const std::vector<std::string> & /*args*/)
|
|||||||
{
|
{
|
||||||
Poco::Net::ServerSocket socket;
|
Poco::Net::ServerSocket socket;
|
||||||
auto address = socketBindListen(socket, listen_host, port);
|
auto address = socketBindListen(socket, listen_host, port);
|
||||||
socket.setReceiveTimeout(settings.receive_timeout);
|
socket.setReceiveTimeout(config().getUInt64("keeper_server.socket_receive_timeout_sec", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC));
|
||||||
socket.setSendTimeout(settings.send_timeout);
|
socket.setSendTimeout(config().getUInt64("keeper_server.socket_send_timeout_sec", DBMS_DEFAULT_SEND_TIMEOUT_SEC));
|
||||||
servers->emplace_back(
|
servers->emplace_back(
|
||||||
listen_host,
|
listen_host,
|
||||||
port_name,
|
port_name,
|
||||||
@ -393,8 +391,8 @@ int Keeper::main(const std::vector<std::string> & /*args*/)
|
|||||||
#if USE_SSL
|
#if USE_SSL
|
||||||
Poco::Net::SecureServerSocket socket;
|
Poco::Net::SecureServerSocket socket;
|
||||||
auto address = socketBindListen(socket, listen_host, port, /* secure = */ true);
|
auto address = socketBindListen(socket, listen_host, port, /* secure = */ true);
|
||||||
socket.setReceiveTimeout(settings.receive_timeout);
|
socket.setReceiveTimeout(config().getUInt64("keeper_server.socket_receive_timeout_sec", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC));
|
||||||
socket.setSendTimeout(settings.send_timeout);
|
socket.setSendTimeout(config().getUInt64("keeper_server.socket_send_timeout_sec", DBMS_DEFAULT_SEND_TIMEOUT_SEC));
|
||||||
servers->emplace_back(
|
servers->emplace_back(
|
||||||
listen_host,
|
listen_host,
|
||||||
secure_port_name,
|
secure_port_name,
|
||||||
|
@ -967,6 +967,83 @@ if (ThreadFuzzer::instance().isEffective())
|
|||||||
},
|
},
|
||||||
/* already_loaded = */ false); /// Reload it right now (initial loading)
|
/* already_loaded = */ false); /// Reload it right now (initial loading)
|
||||||
|
|
||||||
|
const auto listen_hosts = getListenHosts(config());
|
||||||
|
const auto listen_try = getListenTry(config());
|
||||||
|
|
||||||
|
if (config().has("keeper_server"))
|
||||||
|
{
|
||||||
|
#if USE_NURAFT
|
||||||
|
//// If we don't have configured connection probably someone trying to use clickhouse-server instead
|
||||||
|
//// of clickhouse-keeper, so start synchronously.
|
||||||
|
bool can_initialize_keeper_async = false;
|
||||||
|
|
||||||
|
if (has_zookeeper) /// We have configured connection to some zookeeper cluster
|
||||||
|
{
|
||||||
|
/// If we cannot connect to some other node from our cluster then we have to wait our Keeper start
|
||||||
|
/// synchronously.
|
||||||
|
can_initialize_keeper_async = global_context->tryCheckClientConnectionToMyKeeperCluster();
|
||||||
|
}
|
||||||
|
/// Initialize keeper RAFT.
|
||||||
|
global_context->initializeKeeperDispatcher(can_initialize_keeper_async);
|
||||||
|
FourLetterCommandFactory::registerCommands(*global_context->getKeeperDispatcher());
|
||||||
|
|
||||||
|
for (const auto & listen_host : listen_hosts)
|
||||||
|
{
|
||||||
|
/// TCP Keeper
|
||||||
|
const char * port_name = "keeper_server.tcp_port";
|
||||||
|
createServer(
|
||||||
|
config(), listen_host, port_name, listen_try, /* start_server: */ false,
|
||||||
|
servers_to_start_before_tables,
|
||||||
|
[&](UInt16 port) -> ProtocolServerAdapter
|
||||||
|
{
|
||||||
|
Poco::Net::ServerSocket socket;
|
||||||
|
auto address = socketBindListen(socket, listen_host, port);
|
||||||
|
socket.setReceiveTimeout(config().getUInt64("keeper_server.socket_receive_timeout_sec", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC));
|
||||||
|
socket.setSendTimeout(config().getUInt64("keeper_server.socket_send_timeout_sec", DBMS_DEFAULT_SEND_TIMEOUT_SEC));
|
||||||
|
return ProtocolServerAdapter(
|
||||||
|
listen_host,
|
||||||
|
port_name,
|
||||||
|
"Keeper (tcp): " + address.toString(),
|
||||||
|
std::make_unique<TCPServer>(
|
||||||
|
new KeeperTCPHandlerFactory(*this, false), server_pool, socket));
|
||||||
|
});
|
||||||
|
|
||||||
|
const char * secure_port_name = "keeper_server.tcp_port_secure";
|
||||||
|
createServer(
|
||||||
|
config(), listen_host, secure_port_name, listen_try, /* start_server: */ false,
|
||||||
|
servers_to_start_before_tables,
|
||||||
|
[&](UInt16 port) -> ProtocolServerAdapter
|
||||||
|
{
|
||||||
|
#if USE_SSL
|
||||||
|
Poco::Net::SecureServerSocket socket;
|
||||||
|
auto address = socketBindListen(socket, listen_host, port, /* secure = */ true);
|
||||||
|
socket.setReceiveTimeout(config().getUInt64("keeper_server.socket_receive_timeout_sec", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC));
|
||||||
|
socket.setSendTimeout(config().getUInt64("keeper_server.socket_send_timeout_sec", DBMS_DEFAULT_SEND_TIMEOUT_SEC));
|
||||||
|
return ProtocolServerAdapter(
|
||||||
|
listen_host,
|
||||||
|
secure_port_name,
|
||||||
|
"Keeper with secure protocol (tcp_secure): " + address.toString(),
|
||||||
|
std::make_unique<TCPServer>(
|
||||||
|
new KeeperTCPHandlerFactory(*this, true), server_pool, socket));
|
||||||
|
#else
|
||||||
|
UNUSED(port);
|
||||||
|
throw Exception{"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.",
|
||||||
|
ErrorCodes::SUPPORT_IS_DISABLED};
|
||||||
|
#endif
|
||||||
|
});
|
||||||
|
}
|
||||||
|
#else
|
||||||
|
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "ClickHouse server built without NuRaft library. Cannot use internal coordination.");
|
||||||
|
#endif
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
for (auto & server : servers_to_start_before_tables)
|
||||||
|
{
|
||||||
|
server.start();
|
||||||
|
LOG_INFO(log, "Listening for {}", server.getDescription());
|
||||||
|
}
|
||||||
|
|
||||||
auto & access_control = global_context->getAccessControl();
|
auto & access_control = global_context->getAccessControl();
|
||||||
if (config().has("custom_settings_prefixes"))
|
if (config().has("custom_settings_prefixes"))
|
||||||
access_control.setCustomSettingsPrefixes(config().getString("custom_settings_prefixes"));
|
access_control.setCustomSettingsPrefixes(config().getString("custom_settings_prefixes"));
|
||||||
@ -1075,83 +1152,6 @@ if (ThreadFuzzer::instance().isEffective())
|
|||||||
/// try set up encryption. There are some errors in config, error will be printed and server wouldn't start.
|
/// try set up encryption. There are some errors in config, error will be printed and server wouldn't start.
|
||||||
CompressionCodecEncrypted::Configuration::instance().load(config(), "encryption_codecs");
|
CompressionCodecEncrypted::Configuration::instance().load(config(), "encryption_codecs");
|
||||||
|
|
||||||
const auto listen_hosts = getListenHosts(config());
|
|
||||||
const auto listen_try = getListenTry(config());
|
|
||||||
|
|
||||||
if (config().has("keeper_server"))
|
|
||||||
{
|
|
||||||
#if USE_NURAFT
|
|
||||||
//// If we don't have configured connection probably someone trying to use clickhouse-server instead
|
|
||||||
//// of clickhouse-keeper, so start synchronously.
|
|
||||||
bool can_initialize_keeper_async = false;
|
|
||||||
|
|
||||||
if (has_zookeeper) /// We have configured connection to some zookeeper cluster
|
|
||||||
{
|
|
||||||
/// If we cannot connect to some other node from our cluster then we have to wait our Keeper start
|
|
||||||
/// synchronously.
|
|
||||||
can_initialize_keeper_async = global_context->tryCheckClientConnectionToMyKeeperCluster();
|
|
||||||
}
|
|
||||||
/// Initialize keeper RAFT.
|
|
||||||
global_context->initializeKeeperDispatcher(can_initialize_keeper_async);
|
|
||||||
FourLetterCommandFactory::registerCommands(*global_context->getKeeperDispatcher());
|
|
||||||
|
|
||||||
for (const auto & listen_host : listen_hosts)
|
|
||||||
{
|
|
||||||
/// TCP Keeper
|
|
||||||
const char * port_name = "keeper_server.tcp_port";
|
|
||||||
createServer(
|
|
||||||
config(), listen_host, port_name, listen_try, /* start_server: */ false,
|
|
||||||
servers_to_start_before_tables,
|
|
||||||
[&](UInt16 port) -> ProtocolServerAdapter
|
|
||||||
{
|
|
||||||
Poco::Net::ServerSocket socket;
|
|
||||||
auto address = socketBindListen(socket, listen_host, port);
|
|
||||||
socket.setReceiveTimeout(settings.receive_timeout);
|
|
||||||
socket.setSendTimeout(settings.send_timeout);
|
|
||||||
return ProtocolServerAdapter(
|
|
||||||
listen_host,
|
|
||||||
port_name,
|
|
||||||
"Keeper (tcp): " + address.toString(),
|
|
||||||
std::make_unique<TCPServer>(
|
|
||||||
new KeeperTCPHandlerFactory(*this, false), server_pool, socket));
|
|
||||||
});
|
|
||||||
|
|
||||||
const char * secure_port_name = "keeper_server.tcp_port_secure";
|
|
||||||
createServer(
|
|
||||||
config(), listen_host, secure_port_name, listen_try, /* start_server: */ false,
|
|
||||||
servers_to_start_before_tables,
|
|
||||||
[&](UInt16 port) -> ProtocolServerAdapter
|
|
||||||
{
|
|
||||||
#if USE_SSL
|
|
||||||
Poco::Net::SecureServerSocket socket;
|
|
||||||
auto address = socketBindListen(socket, listen_host, port, /* secure = */ true);
|
|
||||||
socket.setReceiveTimeout(settings.receive_timeout);
|
|
||||||
socket.setSendTimeout(settings.send_timeout);
|
|
||||||
return ProtocolServerAdapter(
|
|
||||||
listen_host,
|
|
||||||
secure_port_name,
|
|
||||||
"Keeper with secure protocol (tcp_secure): " + address.toString(),
|
|
||||||
std::make_unique<TCPServer>(
|
|
||||||
new KeeperTCPHandlerFactory(*this, true), server_pool, socket));
|
|
||||||
#else
|
|
||||||
UNUSED(port);
|
|
||||||
throw Exception{"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.",
|
|
||||||
ErrorCodes::SUPPORT_IS_DISABLED};
|
|
||||||
#endif
|
|
||||||
});
|
|
||||||
}
|
|
||||||
#else
|
|
||||||
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "ClickHouse server built without NuRaft library. Cannot use internal coordination.");
|
|
||||||
#endif
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
for (auto & server : servers_to_start_before_tables)
|
|
||||||
{
|
|
||||||
server.start();
|
|
||||||
LOG_INFO(log, "Listening for {}", server.getDescription());
|
|
||||||
}
|
|
||||||
|
|
||||||
SCOPE_EXIT({
|
SCOPE_EXIT({
|
||||||
/// Stop reloading of the main config. This must be done before `global_context->shutdown()` because
|
/// Stop reloading of the main config. This must be done before `global_context->shutdown()` because
|
||||||
/// otherwise the reloading may pass a changed config to some destroyed parts of ContextSharedPart.
|
/// otherwise the reloading may pass a changed config to some destroyed parts of ContextSharedPart.
|
||||||
|
@ -0,0 +1 @@
|
|||||||
|
#!/usr/bin/env python3
|
@ -0,0 +1,36 @@
|
|||||||
|
<?xml version="1.0" encoding="utf-8"?>
|
||||||
|
<clickhouse>
|
||||||
|
<keeper_server>
|
||||||
|
<tcp_port>9181</tcp_port>
|
||||||
|
<server_id>1</server_id>
|
||||||
|
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
|
||||||
|
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
|
||||||
|
<coordination_settings>
|
||||||
|
<operation_timeout_ms>5000</operation_timeout_ms>
|
||||||
|
<raft_logs_level>trace</raft_logs_level>
|
||||||
|
<session_timeout_ms>10000</session_timeout_ms>
|
||||||
|
</coordination_settings>
|
||||||
|
<raft_configuration>
|
||||||
|
<server>
|
||||||
|
<can_become_leader>true</can_become_leader>
|
||||||
|
<hostname>node1</hostname>
|
||||||
|
<id>1</id>
|
||||||
|
<port>2888</port>
|
||||||
|
<priority>1</priority>
|
||||||
|
</server>
|
||||||
|
</raft_configuration>
|
||||||
|
</keeper_server>
|
||||||
|
|
||||||
|
<user_directories>
|
||||||
|
<replicated>
|
||||||
|
<zookeeper_path>/clickhouse/access</zookeeper_path>
|
||||||
|
</replicated>
|
||||||
|
</user_directories>
|
||||||
|
|
||||||
|
<zookeeper>
|
||||||
|
<node index="1">
|
||||||
|
<host>node1</host>
|
||||||
|
<port>9181</port>
|
||||||
|
</node>
|
||||||
|
</zookeeper>
|
||||||
|
</clickhouse>
|
21
tests/integration/test_keeper_and_access_storage/test.py
Normal file
21
tests/integration/test_keeper_and_access_storage/test.py
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from helpers.cluster import ClickHouseCluster
|
||||||
|
|
||||||
|
cluster = ClickHouseCluster(__file__)
|
||||||
|
|
||||||
|
node1 = cluster.add_instance('node1', main_configs=['configs/keeper.xml'], stay_alive=True)
|
||||||
|
|
||||||
|
# test that server is able to start
|
||||||
|
@pytest.fixture(scope="module")
|
||||||
|
def started_cluster():
|
||||||
|
try:
|
||||||
|
cluster.start()
|
||||||
|
yield cluster
|
||||||
|
finally:
|
||||||
|
cluster.shutdown()
|
||||||
|
|
||||||
|
def test_create_replicated(started_cluster):
|
||||||
|
assert node1.query("SELECT 1") == "1\n"
|
Loading…
Reference in New Issue
Block a user