mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-25 09:02:00 +00:00
Refactor
This commit is contained in:
parent
65b99ec6a7
commit
a4d1cd514d
@ -1,4 +1,5 @@
|
|||||||
#include "ClusterCopierApp.h"
|
#include "ClusterCopierApp.h"
|
||||||
|
#include <Common/ZooKeeper/ZooKeeper.h>
|
||||||
#include <Common/StatusFile.h>
|
#include <Common/StatusFile.h>
|
||||||
#include <Common/TerminalSize.h>
|
#include <Common/TerminalSize.h>
|
||||||
#include <IO/ConnectionTimeouts.h>
|
#include <IO/ConnectionTimeouts.h>
|
||||||
@ -12,11 +13,6 @@ namespace fs = std::filesystem;
|
|||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
namespace ErrorCodes
|
|
||||||
{
|
|
||||||
extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// ClusterCopierApp
|
/// ClusterCopierApp
|
||||||
|
|
||||||
void ClusterCopierApp::initialize(Poco::Util::Application & self)
|
void ClusterCopierApp::initialize(Poco::Util::Application & self)
|
||||||
@ -197,8 +193,7 @@ void ClusterCopierApp::mainImpl()
|
|||||||
if (!task_file.empty())
|
if (!task_file.empty())
|
||||||
copier->uploadTaskDescription(task_path, task_file, config().getBool("task-upload-force", false));
|
copier->uploadTaskDescription(task_path, task_file, config().getBool("task-upload-force", false));
|
||||||
|
|
||||||
if (config().has("zookeeper") && config().has("keeper"))
|
zkutil::validateZooKeeperConfig(config());
|
||||||
throw Exception("Both ZooKeeper and Keeper are specified", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
|
|
||||||
|
|
||||||
copier->init();
|
copier->init();
|
||||||
copier->process(ConnectionTimeouts::getTCPTimeoutsWithoutFailover(context->getSettingsRef()));
|
copier->process(ConnectionTimeouts::getTCPTimeoutsWithoutFailover(context->getSettingsRef()));
|
||||||
|
@ -20,11 +20,6 @@
|
|||||||
|
|
||||||
#include <re2/re2.h>
|
#include <re2/re2.h>
|
||||||
|
|
||||||
namespace DB::ErrorCodes
|
|
||||||
{
|
|
||||||
extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void setupLogging(const std::string & log_level)
|
static void setupLogging(const std::string & log_level)
|
||||||
{
|
{
|
||||||
Poco::AutoPtr<Poco::ConsoleChannel> channel(new Poco::ConsoleChannel);
|
Poco::AutoPtr<Poco::ConsoleChannel> channel(new Poco::ConsoleChannel);
|
||||||
@ -95,11 +90,9 @@ static std::vector<std::string> extractFromConfig(
|
|||||||
{
|
{
|
||||||
DB::ConfigurationPtr bootstrap_configuration(new Poco::Util::XMLConfiguration(config_xml));
|
DB::ConfigurationPtr bootstrap_configuration(new Poco::Util::XMLConfiguration(config_xml));
|
||||||
|
|
||||||
if (bootstrap_configuration->has("zookeeper") && bootstrap_configuration->has("keeper"))
|
zkutil::validateZooKeeperConfig(*bootstrap_configuration);
|
||||||
throw DB::Exception(DB::ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG, "Both ZooKeeper and Keeper are specified");
|
|
||||||
|
|
||||||
zkutil::ZooKeeperPtr zookeeper;
|
zkutil::ZooKeeperPtr zookeeper = std::make_shared<zkutil::ZooKeeper>(
|
||||||
zookeeper = std::make_shared<zkutil::ZooKeeper>(
|
|
||||||
*bootstrap_configuration, bootstrap_configuration->has("zookeeper") ? "zookeeper" : "keeper", nullptr);
|
*bootstrap_configuration, bootstrap_configuration->has("zookeeper") ? "zookeeper" : "keeper", nullptr);
|
||||||
|
|
||||||
zkutil::ZooKeeperNodeCache zk_node_cache([&] { return zookeeper; });
|
zkutil::ZooKeeperNodeCache zk_node_cache([&] { return zookeeper; });
|
||||||
|
@ -815,10 +815,8 @@ try
|
|||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
if (config().has("zookeeper") && config().has("keeper"))
|
zkutil::validateZooKeeperConfig(config());
|
||||||
throw Exception(ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG, "Both ZooKeeper and Keeper are specified");
|
bool has_zookeeper = zkutil::hasZooKeeperConfig(config());
|
||||||
|
|
||||||
bool has_zookeeper = config().has("zookeeper") || config().has("keeper") || config().has("keeper_server");
|
|
||||||
|
|
||||||
zkutil::ZooKeeperNodeCache main_config_zk_node_cache([&] { return global_context->getZooKeeper(); });
|
zkutil::ZooKeeperNodeCache main_config_zk_node_cache([&] { return global_context->getZooKeeper(); });
|
||||||
zkutil::EventPtr main_config_zk_changed_event = std::make_shared<Poco::Event>();
|
zkutil::EventPtr main_config_zk_changed_event = std::make_shared<Poco::Event>();
|
||||||
@ -1310,7 +1308,7 @@ try
|
|||||||
{
|
{
|
||||||
/// We do not load ZooKeeper configuration on the first config loading
|
/// We do not load ZooKeeper configuration on the first config loading
|
||||||
/// because TestKeeper server is not started yet.
|
/// because TestKeeper server is not started yet.
|
||||||
if (config->has("zookeeper") || config->has("keeper") || config->has("keeper_server"))
|
if (zkutil::hasZooKeeperConfig(config))
|
||||||
global_context->reloadZooKeeperIfChanged(config);
|
global_context->reloadZooKeeperIfChanged(config);
|
||||||
|
|
||||||
global_context->reloadAuxiliaryZooKeepersConfigIfChanged(config);
|
global_context->reloadAuxiliaryZooKeepersConfigIfChanged(config);
|
||||||
|
@ -30,6 +30,7 @@ namespace ErrorCodes
|
|||||||
extern const int NOT_IMPLEMENTED;
|
extern const int NOT_IMPLEMENTED;
|
||||||
extern const int BAD_ARGUMENTS;
|
extern const int BAD_ARGUMENTS;
|
||||||
extern const int NO_ELEMENTS_IN_CONFIG;
|
extern const int NO_ELEMENTS_IN_CONFIG;
|
||||||
|
extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1335,16 +1336,29 @@ String getSequentialNodeName(const String & prefix, UInt64 number)
|
|||||||
return name;
|
return name;
|
||||||
}
|
}
|
||||||
|
|
||||||
String getZookeeperConfigName(const Poco::Util::AbstractConfiguration & config)
|
void validateZooKeeperConfig(const Poco::Util::AbstractConfiguration & config)
|
||||||
|
{
|
||||||
|
if (config.has("zookeeper") && config.has("keeper"))
|
||||||
|
throw DB::Exception(DB::ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG, "Both ZooKeeper and Keeper are specified");
|
||||||
|
}
|
||||||
|
|
||||||
|
bool hasZooKeeperConfig(const Poco::Util::AbstractConfiguration & config, bool allow_keeper_server)
|
||||||
|
{
|
||||||
|
return config.has("zookeeper") || config.has("keeper") || (allow_keeper_server && config.has("keeper_server"));
|
||||||
|
}
|
||||||
|
|
||||||
|
String getZooKeeperConfigName(const Poco::Util::AbstractConfiguration & config, bool allow_keeper_server)
|
||||||
{
|
{
|
||||||
if (config.has("zookeeper"))
|
if (config.has("zookeeper"))
|
||||||
return "zookeeper";
|
return "zookeeper";
|
||||||
else if (config.has("keeper"))
|
|
||||||
|
if (config.has("keeper"))
|
||||||
return "keeper";
|
return "keeper";
|
||||||
else if (config.has("keeper_server"))
|
|
||||||
|
if (allow_keeper_server && config.has("keeper_server"))
|
||||||
return "keeper_server";
|
return "keeper_server";
|
||||||
else
|
|
||||||
throw DB::Exception("There is no Zookeeper configuration in server config", DB::ErrorCodes::NO_ELEMENTS_IN_CONFIG);
|
throw DB::Exception(DB::ErrorCodes::NO_ELEMENTS_IN_CONFIG, "There is no Zookeeper configuration in server config");
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -667,6 +667,10 @@ String extractZooKeeperPath(const String & path, bool check_starts_with_slash, P
|
|||||||
|
|
||||||
String getSequentialNodeName(const String & prefix, UInt64 number);
|
String getSequentialNodeName(const String & prefix, UInt64 number);
|
||||||
|
|
||||||
String getZookeeperConfigName(const Poco::Util::AbstractConfiguration & config);
|
void validateZooKeeperConfig(const Poco::Util::AbstractConfiguration & config);
|
||||||
|
|
||||||
|
bool hasZooKeeperConfig(const Poco::Util::AbstractConfiguration & config, bool allow_keeper_server = true);
|
||||||
|
|
||||||
|
String getZooKeeperConfigName(const Poco::Util::AbstractConfiguration & config, bool allow_keeper_server = true);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -1,66 +0,0 @@
|
|||||||
#include <Common/Config/ConfigProcessor.h>
|
|
||||||
#include <Common/ZooKeeper/ZooKeeper.h>
|
|
||||||
#include <Poco/Event.h>
|
|
||||||
#include <iostream>
|
|
||||||
|
|
||||||
/// A tool for reproducing https://issues.apache.org/jira/browse/ZOOKEEPER-706
|
|
||||||
/// Original libzookeeper can't reconnect the session if the length of SET_WATCHES message
|
|
||||||
/// exceeds jute.maxbuffer (0xfffff by default).
|
|
||||||
/// This happens when the number of watches exceeds ~29000.
|
|
||||||
///
|
|
||||||
/// Session reconnect can be caused by forbidding packets to the current zookeeper server, e.g.
|
|
||||||
/// sudo ip6tables -A OUTPUT -d mtzoo01it.haze.yandex.net -j REJECT
|
|
||||||
|
|
||||||
const size_t N_THREADS = 100;
|
|
||||||
|
|
||||||
int main(int argc, char ** argv)
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
if (argc != 3)
|
|
||||||
{
|
|
||||||
std::cerr << "usage: " << argv[0] << " <zookeeper_config> <number_of_watches>" << std::endl;
|
|
||||||
return 3;
|
|
||||||
}
|
|
||||||
|
|
||||||
DB::ConfigProcessor processor(argv[1], false, true);
|
|
||||||
auto config = processor.loadConfig().configuration;
|
|
||||||
zkutil::ZooKeeper zk(*config, zkutil::getZookeeperConfigName(*config), nullptr);
|
|
||||||
zkutil::EventPtr watch = std::make_shared<Poco::Event>();
|
|
||||||
|
|
||||||
/// NOTE: setting watches in multiple threads because doing it in a single thread is too slow.
|
|
||||||
size_t watches_per_thread = std::stoull(argv[2]) / N_THREADS;
|
|
||||||
std::vector<std::thread> threads;
|
|
||||||
for (size_t i_thread = 0; i_thread < N_THREADS; ++i_thread)
|
|
||||||
{
|
|
||||||
threads.emplace_back([&, i_thread]
|
|
||||||
{
|
|
||||||
for (size_t i = 0; i < watches_per_thread; ++i)
|
|
||||||
zk.exists("/clickhouse/nonexistent_node" + std::to_string(i * N_THREADS + i_thread), nullptr, watch);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
for (size_t i_thread = 0; i_thread < N_THREADS; ++i_thread)
|
|
||||||
threads[i_thread].join();
|
|
||||||
|
|
||||||
while (true)
|
|
||||||
{
|
|
||||||
std::cerr << "WAITING..." << std::endl;
|
|
||||||
sleep(10);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
catch (Poco::Exception & e)
|
|
||||||
{
|
|
||||||
std::cerr << "Exception: " << e.displayText() << std::endl;
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
catch (std::exception & e)
|
|
||||||
{
|
|
||||||
std::cerr << "std::exception: " << e.what() << std::endl;
|
|
||||||
return 3;
|
|
||||||
}
|
|
||||||
catch (...)
|
|
||||||
{
|
|
||||||
std::cerr << "Some exception" << std::endl;
|
|
||||||
return 2;
|
|
||||||
}
|
|
||||||
}
|
|
@ -2360,7 +2360,7 @@ zkutil::ZooKeeperPtr Context::getZooKeeper() const
|
|||||||
|
|
||||||
const auto & config = shared->zookeeper_config ? *shared->zookeeper_config : getConfigRef();
|
const auto & config = shared->zookeeper_config ? *shared->zookeeper_config : getConfigRef();
|
||||||
if (!shared->zookeeper)
|
if (!shared->zookeeper)
|
||||||
shared->zookeeper = std::make_shared<zkutil::ZooKeeper>(config, zkutil::getZookeeperConfigName(config), getZooKeeperLog());
|
shared->zookeeper = std::make_shared<zkutil::ZooKeeper>(config, zkutil::getZooKeeperConfigName(config), getZooKeeperLog());
|
||||||
else if (shared->zookeeper->expired())
|
else if (shared->zookeeper->expired())
|
||||||
{
|
{
|
||||||
Stopwatch watch;
|
Stopwatch watch;
|
||||||
@ -2399,9 +2399,9 @@ bool Context::tryCheckClientConnectionToMyKeeperCluster() const
|
|||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
const auto config_name = zkutil::getZooKeeperConfigName(getConfigRef());
|
||||||
/// If our server is part of main Keeper cluster
|
/// If our server is part of main Keeper cluster
|
||||||
if (checkZooKeeperConfigIsLocal(getConfigRef(), "zookeeper") || checkZooKeeperConfigIsLocal(getConfigRef(), "keeper")
|
if (config_name == "keeper_server" || checkZooKeeperConfigIsLocal(getConfigRef(), config_name))
|
||||||
|| (!getConfigRef().has("zookeeper") && !getConfigRef().has("keeper") && getConfigRef().has("keeper_server")))
|
|
||||||
{
|
{
|
||||||
LOG_DEBUG(shared->log, "Keeper server is participant of the main zookeeper cluster, will try to connect to it");
|
LOG_DEBUG(shared->log, "Keeper server is participant of the main zookeeper cluster, will try to connect to it");
|
||||||
getZooKeeper();
|
getZooKeeper();
|
||||||
@ -2600,7 +2600,7 @@ void Context::reloadZooKeeperIfChanged(const ConfigurationPtr & config) const
|
|||||||
{
|
{
|
||||||
std::lock_guard lock(shared->zookeeper_mutex);
|
std::lock_guard lock(shared->zookeeper_mutex);
|
||||||
shared->zookeeper_config = config;
|
shared->zookeeper_config = config;
|
||||||
reloadZooKeeperIfChangedImpl(config, zkutil::getZookeeperConfigName(*config), shared->zookeeper, getZooKeeperLog());
|
reloadZooKeeperIfChangedImpl(config, zkutil::getZooKeeperConfigName(*config), shared->zookeeper, getZooKeeperLog());
|
||||||
}
|
}
|
||||||
|
|
||||||
void Context::reloadAuxiliaryZooKeepersConfigIfChanged(const ConfigurationPtr & config)
|
void Context::reloadAuxiliaryZooKeepersConfigIfChanged(const ConfigurationPtr & config)
|
||||||
|
@ -26,7 +26,7 @@ try
|
|||||||
auto config = processor.loadConfig().configuration;
|
auto config = processor.loadConfig().configuration;
|
||||||
String root_path = argv[2];
|
String root_path = argv[2];
|
||||||
|
|
||||||
zkutil::ZooKeeper zk(*config, zkutil::getZookeeperConfigName(*config), nullptr);
|
zkutil::ZooKeeper zk(*config, zkutil::getZooKeeperConfigName(*config), nullptr);
|
||||||
|
|
||||||
String temp_path = root_path + "/temp";
|
String temp_path = root_path + "/temp";
|
||||||
String blocks_path = root_path + "/block_numbers";
|
String blocks_path = root_path + "/block_numbers";
|
||||||
|
@ -29,7 +29,7 @@ try
|
|||||||
auto config = processor.loadConfig().configuration;
|
auto config = processor.loadConfig().configuration;
|
||||||
String zookeeper_path = argv[2];
|
String zookeeper_path = argv[2];
|
||||||
|
|
||||||
auto zookeeper = std::make_shared<zkutil::ZooKeeper>(*config, zkutil::getZookeeperConfigName(*config), nullptr);
|
auto zookeeper = std::make_shared<zkutil::ZooKeeper>(*config, zkutil::getZooKeeperConfigName(*config), nullptr);
|
||||||
|
|
||||||
std::unordered_map<String, std::set<Int64>> current_inserts;
|
std::unordered_map<String, std::set<Int64>> current_inserts;
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user