ClickHouse/programs/keeper-client/KeeperClient.cpp
Azat Khuzhin e623ad041f Make C-z ignorance configurable (ignore_shell_suspend) in clickhouse-client
C-z is extermelly useful for some users (like myself), so provide a way
to configure it in client and avoid it's ignorance in clickhouse-disks
(I hope it is OK since it is not that known utility and it does not have
it's own configuration, while cli option is useless, one should remeber
about it).

Honestly I've never seen any interactive client that forbids C-z, so
ignoring it my default looks strange to me.

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2024-08-19 08:33:53 +02:00

454 lines
13 KiB
C++

#include "KeeperClient.h"
#include "Commands.h"
#include <Client/ReplxxLineReader.h>
#include <Client/ClientBase.h>
#include "Common/VersionNumber.h"
#include <Common/Config/ConfigProcessor.h>
#include <Common/EventNotifier.h>
#include <Common/filesystemHelpers.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Parsers/parseQuery.h>
#include <Poco/Util/HelpFormatter.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
String KeeperClient::executeFourLetterCommand(const String & command)
{
/// We need to create a new socket every time because ZooKeeper forcefully shuts down the connection after a four-letter-word command.
Poco::Net::StreamSocket socket;
socket.connect(Poco::Net::SocketAddress{zk_args.hosts[0]}, zk_args.connection_timeout_ms * 1000);
socket.setReceiveTimeout(zk_args.operation_timeout_ms * 1000);
socket.setSendTimeout(zk_args.operation_timeout_ms * 1000);
socket.setNoDelay(true);
ReadBufferFromPocoSocket in(socket);
WriteBufferFromPocoSocket out(socket);
out.write(command.data(), command.size());
out.next();
String result;
readStringUntilEOF(result, in);
in.next();
return result;
}
std::vector<String> KeeperClient::getCompletions(const String & prefix) const
{
Tokens tokens(prefix.data(), prefix.data() + prefix.size(), 0, false);
IParser::Pos pos(tokens, DBMS_DEFAULT_MAX_PARSER_DEPTH, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS);
if (pos->type != TokenType::BareWord)
return registered_commands_and_four_letter_words;
++pos;
if (pos->isEnd())
return registered_commands_and_four_letter_words;
ParserToken{TokenType::Whitespace}.ignore(pos);
std::vector<String> result;
String string_path;
Expected expected;
if (!parseKeeperPath(pos, expected, string_path))
string_path = cwd;
if (!pos->isEnd())
return result;
fs::path path = string_path;
String parent_path;
if (string_path.ends_with("/"))
parent_path = getAbsolutePath(string_path);
else
parent_path = getAbsolutePath(path.parent_path());
try
{
for (const auto & child : zookeeper->getChildren(parent_path))
result.push_back(child);
}
catch (Coordination::Exception &) {} // NOLINT(bugprone-empty-catch)
std::sort(result.begin(), result.end());
return result;
}
void KeeperClient::askConfirmation(const String & prompt, std::function<void()> && callback)
{
if (!ask_confirmation)
{
callback();
return;
}
std::cout << prompt << " Continue?\n";
waiting_confirmation = true;
confirmation_callback = callback;
}
fs::path KeeperClient::getAbsolutePath(const String & relative) const
{
String result;
if (relative.starts_with('/'))
result = fs::weakly_canonical(relative);
else
result = fs::weakly_canonical(cwd / relative);
if (result.ends_with('/') && result.size() > 1)
result.pop_back();
return result;
}
void KeeperClient::loadCommands(std::vector<Command> && new_commands)
{
for (const auto & command : new_commands)
{
String name = command->getName();
commands.insert({name, command});
registered_commands_and_four_letter_words.push_back(std::move(name));
}
for (const auto & command : four_letter_word_commands)
registered_commands_and_four_letter_words.push_back(command);
std::sort(registered_commands_and_four_letter_words.begin(), registered_commands_and_four_letter_words.end());
}
void KeeperClient::defineOptions(Poco::Util::OptionSet & options)
{
Poco::Util::Application::defineOptions(options);
options.addOption(
Poco::Util::Option("help", "", "show help and exit")
.binding("help"));
options.addOption(
Poco::Util::Option("host", "h", "server hostname. default `localhost`")
.argument("<host>")
.binding("host"));
options.addOption(
Poco::Util::Option("port", "p", "server port. default `9181`")
.argument("<port>")
.binding("port"));
options.addOption(
Poco::Util::Option("query", "q", "will execute given query, then exit.")
.argument("<query>")
.binding("query"));
options.addOption(
Poco::Util::Option("connection-timeout", "", "set connection timeout in seconds. default 10s.")
.argument("<seconds>")
.binding("connection-timeout"));
options.addOption(
Poco::Util::Option("session-timeout", "", "set session timeout in seconds. default 10s.")
.argument("<seconds>")
.binding("session-timeout"));
options.addOption(
Poco::Util::Option("operation-timeout", "", "set operation timeout in seconds. default 10s.")
.argument("<seconds>")
.binding("operation-timeout"));
options.addOption(
Poco::Util::Option("config-file", "c", "if set, will try to get a connection string from clickhouse config. default `config.xml`")
.argument("<file>")
.binding("config-file"));
options.addOption(
Poco::Util::Option("history-file", "", "set path of history file. default `~/.keeper-client-history`")
.argument("<file>")
.binding("history-file"));
options.addOption(
Poco::Util::Option("log-level", "", "set log level")
.argument("<level>")
.binding("log-level"));
options.addOption(
Poco::Util::Option("no-confirmation", "", "if set, will not require a confirmation on several commands. default false for interactive and true for query")
.binding("no-confirmation"));
options.addOption(
Poco::Util::Option("tests-mode", "", "run keeper-client in a special mode for tests. all commands output are separated by special symbols. default false")
.binding("tests-mode"));
}
void KeeperClient::initialize(Poco::Util::Application & /* self */)
{
suggest.setCompletionsCallback(
[&](const String & prefix, size_t /* prefix_length */) { return getCompletions(prefix); });
loadCommands({
std::make_shared<LSCommand>(),
std::make_shared<CDCommand>(),
std::make_shared<SetCommand>(),
std::make_shared<CreateCommand>(),
std::make_shared<TouchCommand>(),
std::make_shared<GetCommand>(),
std::make_shared<ExistsCommand>(),
std::make_shared<GetStatCommand>(),
std::make_shared<FindSuperNodes>(),
std::make_shared<DeleteStaleBackups>(),
std::make_shared<FindBigFamily>(),
std::make_shared<RMCommand>(),
std::make_shared<RMRCommand>(),
std::make_shared<ReconfigCommand>(),
std::make_shared<SyncCommand>(),
std::make_shared<HelpCommand>(),
std::make_shared<FourLetterWordCommand>(),
std::make_shared<GetDirectChildrenNumberCommand>(),
std::make_shared<GetAllChildrenNumberCommand>(),
});
String home_path;
const char * home_path_cstr = getenv("HOME"); // NOLINT(concurrency-mt-unsafe)
if (home_path_cstr)
home_path = home_path_cstr;
if (config().has("history-file"))
history_file = config().getString("history-file");
else
history_file = home_path + "/.keeper-client-history";
if (!history_file.empty() && !fs::exists(history_file))
{
try
{
FS::createFile(history_file);
}
catch (const ErrnoException & e)
{
if (e.getErrno() != EEXIST)
throw;
}
}
String default_log_level;
if (config().has("query"))
/// We don't want to see any information log in query mode, unless it was set explicitly
default_log_level = "error";
else
default_log_level = "information";
Poco::Logger::root().setLevel(config().getString("log-level", default_log_level));
EventNotifier::init();
}
bool KeeperClient::processQueryText(const String & text)
{
if (exit_strings.find(text) != exit_strings.end())
return false;
try
{
if (waiting_confirmation)
{
waiting_confirmation = false;
if (text.size() == 1 && (text == "y" || text == "Y"))
confirmation_callback();
return true;
}
KeeperParser parser;
const char * begin = text.data();
const char * end = begin + text.size();
while (begin < end)
{
String message;
ASTPtr res = tryParseQuery(
parser,
begin,
end,
/* out_error_message = */ message,
/* hilite = */ true,
/* description = */ "",
/* allow_multi_statements = */ true,
/* max_query_size = */ 0,
/* max_parser_depth = */ 0,
/* max_parser_backtracks = */ 0,
/* skip_insignificant = */ false);
if (!res)
{
std::cerr << message << "\n";
return true;
}
auto * query = res->as<ASTKeeperQuery>();
auto command = KeeperClient::commands.find(query->command);
command->second->execute(query, this);
}
}
catch (Coordination::Exception & err)
{
std::cerr << err.message() << "\n";
}
return true;
}
void KeeperClient::runInteractiveReplxx()
{
LineReader::Patterns query_extenders = {"\\"};
LineReader::Patterns query_delimiters = {};
char word_break_characters[] = " \t\v\f\a\b\r\n/";
ReplxxLineReader lr(
suggest,
history_file,
/* multiline= */ false,
/* ignore_shell_suspend= */ false,
query_extenders,
query_delimiters,
word_break_characters,
/* highlighter_= */ {});
lr.enableBracketedPaste();
while (true)
{
String prompt;
if (waiting_confirmation)
prompt = "[y/n] ";
else
prompt = cwd.string() + " :) ";
auto input = lr.readLine(prompt, ":-] ");
if (input.empty())
break;
if (!processQueryText(input))
break;
}
}
void KeeperClient::runInteractiveInputStream()
{
for (String input; std::getline(std::cin, input);)
{
if (!processQueryText(input))
break;
std::cout << "\a\a\a\a" << std::endl;
std::cerr << std::flush;
}
}
void KeeperClient::runInteractive()
{
if (config().hasOption("tests-mode"))
runInteractiveInputStream();
else
runInteractiveReplxx();
}
int KeeperClient::main(const std::vector<String> & /* args */)
{
if (config().hasOption("help"))
{
Poco::Util::HelpFormatter help_formatter(KeeperClient::options());
auto header_str = fmt::format("{} [OPTION]\n", commandName());
help_formatter.setHeader(header_str);
help_formatter.format(std::cout);
return 0;
}
ConfigProcessor config_processor(config().getString("config-file", "config.xml"));
/// This will handle a situation when clickhouse is running on the embedded config, but config.d folder is also present.
ConfigProcessor::registerEmbeddedConfig("config.xml", "<clickhouse/>");
auto clickhouse_config = config_processor.loadConfig();
Poco::Util::AbstractConfiguration::Keys keys;
clickhouse_config.configuration->keys("zookeeper", keys);
if (!config().has("host") && !config().has("port") && !keys.empty())
{
LOG_INFO(getLogger("KeeperClient"), "Found keeper node in the config.xml, will use it for connection");
for (const auto & key : keys)
{
if (key != "node")
continue;
String prefix = "zookeeper." + key;
String host = clickhouse_config.configuration->getString(prefix + ".host");
String port = clickhouse_config.configuration->getString(prefix + ".port");
if (clickhouse_config.configuration->has(prefix + ".secure"))
host = "secure://" + host;
zk_args.hosts.push_back(host + ":" + port);
}
}
else
{
String host = config().getString("host", "localhost");
String port = config().getString("port", "9181");
zk_args.hosts.push_back(host + ":" + port);
}
zk_args.availability_zones.resize(zk_args.hosts.size());
zk_args.connection_timeout_ms = config().getInt("connection-timeout", 10) * 1000;
zk_args.session_timeout_ms = config().getInt("session-timeout", 10) * 1000;
zk_args.operation_timeout_ms = config().getInt("operation-timeout", 10) * 1000;
zookeeper = zkutil::ZooKeeper::createWithoutKillingPreviousSessions(zk_args);
if (config().has("no-confirmation") || config().has("query"))
ask_confirmation = false;
if (config().has("query"))
{
processQueryText(config().getString("query"));
}
else
runInteractive();
return 0;
}
}
int mainEntryClickHouseKeeperClient(int argc, char ** argv)
{
try
{
DB::KeeperClient client;
client.init(argc, argv);
return client.run();
}
catch (const DB::Exception & e)
{
std::cerr << DB::getExceptionMessage(e, false) << std::endl;
return 1;
}
catch (const boost::program_options::error & e)
{
std::cerr << "Bad arguments: " << e.what() << std::endl;
return DB::ErrorCodes::BAD_ARGUMENTS;
}
catch (...)
{
std::cerr << DB::getCurrentExceptionMessage(true) << std::endl;
return 1;
}
}