ClickHouse/programs/copier/ClusterCopierApp.cpp

245 lines
10 KiB
C++
Raw Normal View History

2020-02-19 15:01:08 +00:00
#include "ClusterCopierApp.h"
2020-07-04 13:54:24 +00:00
#include <Common/StatusFile.h>
#include <Common/TerminalSize.h>
#include <IO/ConnectionTimeoutsContext.h>
2020-10-29 03:39:43 +00:00
#include <Formats/registerFormats.h>
2021-10-02 07:13:14 +00:00
#include <base/scope_guard_safe.h>
#include <unistd.h>
2021-05-16 22:06:09 +00:00
#include <filesystem>
2020-07-04 13:54:24 +00:00
2021-05-16 22:06:09 +00:00
namespace fs = std::filesystem;
2020-02-19 15:01:08 +00:00
namespace DB
{
/// ClusterCopierApp
void ClusterCopierApp::initialize(Poco::Util::Application & self)
{
is_help = config().has("help");
if (is_help)
return;
config_xml_path = config().getString("config-file");
task_path = config().getString("task-path");
2021-04-22 18:04:32 +00:00
log_level = config().getString("log-level", "info");
2020-02-19 15:01:08 +00:00
is_safe_mode = config().has("safe-mode");
2021-04-29 19:16:51 +00:00
is_status_mode = config().has("status");
2020-02-19 15:01:08 +00:00
if (config().has("copy-fault-probability"))
copy_fault_probability = std::max(std::min(config().getDouble("copy-fault-probability"), 1.0), 0.0);
2020-03-16 21:05:38 +00:00
if (config().has("move-fault-probability"))
move_fault_probability = std::max(std::min(config().getDouble("move-fault-probability"), 1.0), 0.0);
2021-05-16 22:06:09 +00:00
base_dir = (config().has("base-dir")) ? config().getString("base-dir") : fs::current_path().string();
2020-04-21 17:37:40 +00:00
max_table_tries = std::max<size_t>(config().getUInt("max-table-tries", 3), 1);
max_shard_partition_tries = std::max<size_t>(config().getUInt("max-shard-partition-tries", 3), 1);
max_shard_partition_piece_tries_for_alter = std::max<size_t>(config().getUInt("max-shard-partition-piece-tries-for-alter", 10), 1);
retry_delay_ms = std::chrono::milliseconds(std::max<size_t>(config().getUInt("retry-delay-ms", 1000), 100));
2020-04-21 17:37:40 +00:00
if (config().has("experimental-use-sample-offset"))
experimental_use_sample_offset = config().getBool("experimental-use-sample-offset");
2020-02-19 15:01:08 +00:00
// process_id is '<hostname>#<start_timestamp>_<pid>'
time_t timestamp = Poco::Timestamp().epochTime();
auto curr_pid = Poco::Process::id();
process_id = std::to_string(DateLUT::instance().toNumYYYYMMDDhhmmss(timestamp)) + "_" + std::to_string(curr_pid);
host_id = escapeForFileName(getFQDNOrHostName()) + '#' + process_id;
2021-05-24 16:03:09 +00:00
process_path = fs::weakly_canonical(fs::path(base_dir) / ("clickhouse-copier_" + process_id));
2021-05-16 22:06:09 +00:00
fs::create_directories(process_path);
2020-02-19 15:01:08 +00:00
/// Override variables for BaseDaemon
if (config().has("log-level"))
config().setString("logger.level", config().getString("log-level"));
if (config().has("base-dir") || !config().has("logger.log"))
2021-05-16 22:06:09 +00:00
config().setString("logger.log", fs::path(process_path) / "log.log");
2020-02-19 15:01:08 +00:00
if (config().has("base-dir") || !config().has("logger.errorlog"))
2021-05-16 22:06:09 +00:00
config().setString("logger.errorlog", fs::path(process_path) / "log.err.log");
2020-02-19 15:01:08 +00:00
Base::initialize(self);
}
void ClusterCopierApp::handleHelp(const std::string &, const std::string &)
{
uint16_t terminal_width = 0;
if (isatty(STDIN_FILENO))
terminal_width = getTerminalWidth();
Poco::Util::HelpFormatter help_formatter(options());
if (terminal_width)
help_formatter.setWidth(terminal_width);
help_formatter.setCommand(commandName());
help_formatter.setHeader("Copies tables from one cluster to another");
help_formatter.setUsage("--config-file <config-file> --task-path <task-path>");
help_formatter.format(std::cerr);
2020-02-19 15:01:08 +00:00
stopOptionsProcessing();
}
void ClusterCopierApp::defineOptions(Poco::Util::OptionSet & options)
{
Base::defineOptions(options);
options.addOption(Poco::Util::Option("task-path", "", "path to task in ZooKeeper")
2020-02-11 18:34:48 +00:00
.argument("task-path").binding("task-path"));
2020-02-19 15:01:08 +00:00
options.addOption(Poco::Util::Option("task-file", "", "path to task file for uploading in ZooKeeper to task-path")
2020-02-11 18:34:48 +00:00
.argument("task-file").binding("task-file"));
2020-02-19 15:01:08 +00:00
options.addOption(Poco::Util::Option("task-upload-force", "", "Force upload task-file even node already exists")
2020-02-11 18:34:48 +00:00
.argument("task-upload-force").binding("task-upload-force"));
2020-02-19 15:01:08 +00:00
options.addOption(Poco::Util::Option("safe-mode", "", "disables ALTER DROP PARTITION in case of errors")
2020-02-11 18:34:48 +00:00
.binding("safe-mode"));
2020-02-19 15:01:08 +00:00
options.addOption(Poco::Util::Option("copy-fault-probability", "", "the copying fails with specified probability (used to test partition state recovering)")
2020-02-11 18:34:48 +00:00
.argument("copy-fault-probability").binding("copy-fault-probability"));
2020-03-16 21:05:38 +00:00
options.addOption(Poco::Util::Option("move-fault-probability", "", "the moving fails with specified probability (used to test partition state recovering)")
.argument("move-fault-probability").binding("move-fault-probability"));
2020-02-19 15:01:08 +00:00
options.addOption(Poco::Util::Option("log-level", "", "sets log level")
2020-02-11 18:34:48 +00:00
.argument("log-level").binding("log-level"));
2020-02-19 15:01:08 +00:00
options.addOption(Poco::Util::Option("base-dir", "", "base directory for copiers, consecutive copier launches will populate /base-dir/launch_id/* directories")
2020-02-11 18:34:48 +00:00
.argument("base-dir").binding("base-dir"));
2020-04-21 17:37:40 +00:00
options.addOption(Poco::Util::Option("experimental-use-sample-offset", "", "Use SAMPLE OFFSET query instead of cityHash64(PRIMARY KEY) % n == k")
.argument("experimental-use-sample-offset").binding("experimental-use-sample-offset"));
2021-04-29 19:16:51 +00:00
options.addOption(Poco::Util::Option("status", "", "Get for status for current execution").binding("status"));
2020-02-19 15:01:08 +00:00
options.addOption(Poco::Util::Option("max-table-tries", "", "Number of tries for the copy table task")
.argument("max-table-tries").binding("max-table-tries"));
options.addOption(Poco::Util::Option("max-shard-partition-tries", "", "Number of tries for the copy one partition task")
.argument("max-shard-partition-tries").binding("max-shard-partition-tries"));
options.addOption(Poco::Util::Option("max-shard-partition-piece-tries-for-alter", "", "Number of tries for final ALTER ATTACH to destination table")
.argument("max-shard-partition-piece-tries-for-alter").binding("max-shard-partition-piece-tries-for-alter"));
options.addOption(Poco::Util::Option("retry-delay-ms", "", "Delay between task retries")
.argument("retry-delay-ms").binding("retry-delay-ms"));
2020-02-19 15:01:08 +00:00
using Me = std::decay_t<decltype(*this)>;
options.addOption(Poco::Util::Option("help", "", "produce this help message").binding("help")
2020-02-11 18:34:48 +00:00
.callback(Poco::Util::OptionCallback<Me>(this, &Me::handleHelp)));
2020-02-19 15:01:08 +00:00
}
void ClusterCopierApp::mainImpl()
{
2021-04-29 19:16:51 +00:00
/// Status command
{
if (is_status_mode)
{
SharedContextHolder shared_context = Context::createShared();
auto context = Context::createGlobal(shared_context.get());
context->makeGlobalContext();
SCOPE_EXIT_SAFE(context->shutdown());
auto zookeeper = context->getZooKeeper();
auto status_json = zookeeper->get(task_path + "/status");
LOG_INFO(&logger(), "{}", status_json);
std::cout << status_json << std::endl;
context->resetZooKeeper();
return;
}
}
2020-07-04 13:54:24 +00:00
StatusFile status_file(process_path + "/status", StatusFile::write_full_info);
2020-02-19 15:01:08 +00:00
ThreadStatus thread_status;
2020-05-18 08:08:55 +00:00
auto * log = &logger();
2020-09-17 12:15:05 +00:00
LOG_INFO(log, "Starting clickhouse-copier (id {}, host_id {}, path {}, revision {})", process_id, host_id, process_path, ClickHouseRevision::getVersionRevision());
2020-02-19 15:01:08 +00:00
SharedContextHolder shared_context = Context::createShared();
auto context = Context::createGlobal(shared_context.get());
2020-02-19 15:01:08 +00:00
context->makeGlobalContext();
SCOPE_EXIT_SAFE(context->shutdown());
2020-02-19 15:01:08 +00:00
context->setConfig(loaded_config.configuration);
context->setApplicationType(Context::ApplicationType::LOCAL);
2020-07-26 16:23:08 +00:00
context->setPath(process_path + "/");
2020-02-19 15:01:08 +00:00
registerFunctions();
registerAggregateFunctions();
registerTableFunctions();
registerStorages();
registerDictionaries();
registerDisks();
2020-10-29 03:39:43 +00:00
registerFormats();
2020-02-19 15:01:08 +00:00
static const std::string default_database = "_local";
DatabaseCatalog::instance().attachDatabase(default_database, std::make_shared<DatabaseMemory>(default_database, context));
2020-02-19 15:01:08 +00:00
context->setCurrentDatabase(default_database);
/// Disable queries logging, since:
/// - There are bits that is not allowed for global context, like adding factories info (for the query_log)
/// - And anyway it is useless for copier.
context->setSetting("log_queries", false);
auto local_context = Context::createCopy(context);
2020-02-19 15:01:08 +00:00
/// Initialize query scope just in case.
CurrentThread::QueryScope query_scope(local_context);
2020-02-19 15:01:08 +00:00
auto copier = std::make_unique<ClusterCopier>(
task_path, host_id, default_database, local_context, log);
2020-02-19 15:01:08 +00:00
copier->setSafeMode(is_safe_mode);
copier->setCopyFaultProbability(copy_fault_probability);
2020-03-16 21:05:38 +00:00
copier->setMoveFaultProbability(move_fault_probability);
copier->setMaxTableTries(max_table_tries);
copier->setMaxShardPartitionTries(max_shard_partition_tries);
copier->setMaxShardPartitionPieceTriesForAlter(max_shard_partition_piece_tries_for_alter);
copier->setRetryDelayMs(retry_delay_ms);
2020-04-21 17:37:40 +00:00
copier->setExperimentalUseSampleOffset(experimental_use_sample_offset);
2020-02-19 15:01:08 +00:00
auto task_file = config().getString("task-file", "");
if (!task_file.empty())
copier->uploadTaskDescription(task_path, task_file, config().getBool("task-upload-force", false));
copier->init();
copier->process(ConnectionTimeouts::getTCPTimeoutsWithoutFailover(context->getSettingsRef()));
/// Reset ZooKeeper before removing ClusterCopier.
/// Otherwise zookeeper watch can call callback which use already removed ClusterCopier object.
context->resetZooKeeper();
}
int ClusterCopierApp::main(const std::vector<std::string> &)
{
if (is_help)
return 0;
try
{
mainImpl();
}
catch (...)
{
tryLogCurrentException(&Poco::Logger::root(), __PRETTY_FUNCTION__);
auto code = getCurrentExceptionCode();
return (code) ? code : -1;
}
return 0;
}
}
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wmissing-declarations"
int mainEntryClickHouseClusterCopier(int argc, char ** argv)
{
try
{
DB::ClusterCopierApp app;
return app.run(argc, argv);
}
catch (...)
{
std::cerr << DB::getCurrentExceptionMessage(true) << "\n";
auto code = DB::getCurrentExceptionCode();
return (code) ? code : -1;
}
}