Refactoring

This commit is contained in:
kssenii 2021-07-23 23:54:49 +03:00
parent 59b0ce21e4
commit 8c05e4f038
6 changed files with 132 additions and 144 deletions

View File

@ -216,20 +216,6 @@ void Client::processMainImplException(const Exception & e)
} }
bool Client::isInteractive()
{
/// Batch mode is enabled if one of the following is true:
/// - -e (--query) command line option is present.
/// The value of the option is used as the text of query (or of multiple queries).
/// If stdin is not a terminal, INSERT data for the first query is read from it.
/// - stdin is not a terminal. In this case queries are read from it.
/// - -qf (--queries-file) command line option is present.
/// The value of the option is used as file with query (or of multiple queries) to execute.
return stdin_is_a_tty && !config().has("query") && queries_files.empty();
}
void Client::loadSuggestionDataIfPossible() void Client::loadSuggestionDataIfPossible()
{ {
if (server_revision >= Suggest::MIN_SERVER_REVISION && !config().getBool("disable_suggestion", false)) if (server_revision >= Suggest::MIN_SERVER_REVISION && !config().getBool("disable_suggestion", false))
@ -271,8 +257,6 @@ bool Client::processQueryFromInteractive(const String & input)
int Client::childMainImpl() int Client::childMainImpl()
{ {
UseSSL use_ssl;
registerFormats(); registerFormats();
registerFunctions(); registerFunctions();
registerAggregateFunctions(); registerAggregateFunctions();
@ -1881,6 +1865,27 @@ void Client::processOptions(const OptionsDescription & options_description,
void Client::processConfig() void Client::processConfig()
{ {
/// Batch mode is enabled if one of the following is true:
/// - -e (--query) command line option is present.
/// The value of the option is used as the text of query (or of multiple queries).
/// If stdin is not a terminal, INSERT data for the first query is read from it.
/// - stdin is not a terminal. In this case queries are read from it.
/// - -qf (--queries-file) command line option is present.
/// The value of the option is used as file with query (or of multiple queries) to execute.
if (stdin_is_a_tty && !config().has("query") && queries_files.empty())
{
if (config().has("query") && config().has("queries-file"))
throw Exception("Specify either `query` or `queries-file` option", ErrorCodes::BAD_ARGUMENTS);
is_interactive = true;
}
else
{
need_render_progress = config().getBool("progress", false);
echo_queries = config().getBool("echo", false);
ignore_error = config().getBool("ignore-error", false);
}
if (config().has("multiquery")) if (config().has("multiquery"))
is_multiquery = true; is_multiquery = true;

View File

@ -13,6 +13,8 @@ public:
Client() = default; Client() = default;
protected: protected:
int childMainImpl() override;
bool supportPasswordOption() const override { return true; } bool supportPasswordOption() const override { return true; }
bool splitQueries() const override { return true; } bool splitQueries() const override { return true; }
@ -23,7 +25,7 @@ protected:
connect(); connect();
} }
bool processFile(const String & file) override bool processMultiQueryFromFile(const String & file) override
{ {
connection->setDefaultDatabase(connection_parameters.default_database); connection->setDefaultDatabase(connection_parameters.default_database);
String text; String text;
@ -36,10 +38,9 @@ protected:
void initializeChild() override; void initializeChild() override;
void processMainImplException(const Exception & e) override; void processMainImplException(const Exception & e) override;
bool isInteractive() override;
void loadSuggestionDataIfPossible() override; void loadSuggestionDataIfPossible() override;
bool processQueryFromInteractive(const String & input) override; bool processQueryFromInteractive(const String & input) override;
int childMainImpl() override;
bool checkErrorMatchesHints(const TestHint & test_hint, bool had_error) override; bool checkErrorMatchesHints(const TestHint & test_hint, bool had_error) override;
void reportQueryError() const override; void reportQueryError() const override;
bool processWithFuzzing(const String & text) override; bool processWithFuzzing(const String & text) override;

View File

@ -26,12 +26,10 @@
#include <IO/ReadBufferFromFile.h> #include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromString.h> #include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromFileDescriptor.h> #include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/UseSSL.h>
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
#include <Parsers/parseQuery.h> #include <Parsers/parseQuery.h>
#include <Parsers/IAST.h> #include <Parsers/IAST.h>
#include <common/ErrorHandlers.h> #include <common/ErrorHandlers.h>
#include <Common/StatusFile.h>
#include <Functions/registerFunctions.h> #include <Functions/registerFunctions.h>
#include <AggregateFunctions/registerAggregateFunctions.h> #include <AggregateFunctions/registerAggregateFunctions.h>
#include <TableFunctions/registerTableFunctions.h> #include <TableFunctions/registerTableFunctions.h>
@ -357,40 +355,14 @@ void LocalServer::setupUsers()
int LocalServer::childMainImpl() int LocalServer::childMainImpl()
{ {
Poco::Logger * log = &logger();
ThreadStatus thread_status; ThreadStatus thread_status;
UseSSL use_ssl;
if (!is_interactive && !config().has("query") && !config().has("table-structure") && !config().has("queries-file")) /// Nothing to process
{
if (config().hasOption("verbose"))
std::cerr << "There are no queries to process." << '\n';
return Application::EXIT_OK;
}
if (config().has("query") && config().has("queries-file"))
throw Exception("Specify either `query` or `queries-file` option", ErrorCodes::BAD_ARGUMENTS);
/// Prompt may contain the following substitutions in a form of {name}. /// Prompt may contain the following substitutions in a form of {name}.
std::map<String, String> prompt_substitutions{{"display_name", server_display_name}}; std::map<String, String> prompt_substitutions{{"display_name", server_display_name}};
/// Quite suboptimal. /// Quite suboptimal.
for (const auto & [key, value] : prompt_substitutions) for (const auto & [key, value] : prompt_substitutions)
boost::replace_all(prompt_by_server_display_name, "{" + key + "}", value); boost::replace_all(prompt_by_server_display_name, "{" + key + "}", value);
shared_context = Context::createShared();
global_context = Context::createGlobal(shared_context.get());
global_context->makeGlobalContext();
global_context->setApplicationType(Context::ApplicationType::LOCAL);
tryInitPath();
std::optional<StatusFile> status;
/// Skip temp path installation
/// We will terminate process on error /// We will terminate process on error
static KillingErrorHandler error_handler; static KillingErrorHandler error_handler;
Poco::ErrorHandler::set(&error_handler); Poco::ErrorHandler::set(&error_handler);
@ -405,6 +377,82 @@ int LocalServer::childMainImpl()
registerDisks(); registerDisks();
registerFormats(); registerFormats();
/// we can't mutate global_context (can lead to races, as it was already passed to some background threads)
/// so we can't reuse it safely as a query context and need a copy here
query_context = Context::createCopy(global_context);
query_context->makeSessionContext();
query_context->makeQueryContext();
query_context->setUser("default", "", Poco::Net::SocketAddress{});
query_context->setCurrentQueryId("");
applyCmdSettings(query_context);
/// Use the same query_id (and thread group) for all queries
CurrentThread::QueryScope query_scope_holder(query_context);
if (need_render_progress)
{
/// Set progress callback, which can be run from multiple threads.
query_context->setProgressCallback([&](const Progress & value)
{
/// Write progress only if progress was updated
if (progress_indication.updateProgress(value))
progress_indication.writeProgress();
});
/// Set callback for file processing progress.
progress_indication.setFileProgressCallback(query_context);
}
if (is_interactive)
{
runInteractive();
}
else
{
runNonInteractive();
}
global_context->shutdown();
global_context.reset();
status.reset();
cleanup();
return Application::EXIT_OK;
}
void LocalServer::processConfig()
{
if (stdin_is_a_tty && !config().has("query") && !config().has("table-structure") && queries_files.empty())
{
if (config().has("query") && config().has("queries-file"))
throw Exception("Specify either `query` or `queries-file` option", ErrorCodes::BAD_ARGUMENTS);
is_interactive = true;
}
else
{
/// For clickhouse-local non-interactive mode always assumes multiqeury can be used by default.
is_multiquery = true;
need_render_progress = config().getBool("progress", false);
echo_queries = config().getBool("echo", false);
ignore_error = config().getBool("ignore-error", false);
}
shared_context = Context::createShared();
global_context = Context::createGlobal(shared_context.get());
global_context->makeGlobalContext();
global_context->setApplicationType(Context::ApplicationType::LOCAL);
tryInitPath();
Poco::Logger * log = &logger();
/// Maybe useless /// Maybe useless
if (config().has("macros")) if (config().has("macros"))
global_context->setMacros(std::make_unique<Macros>(config(), "macros", log)); global_context->setMacros(std::make_unique<Macros>(config(), "macros", log));
@ -454,15 +502,17 @@ int LocalServer::childMainImpl()
String path = global_context->getPath(); String path = global_context->getPath();
/// Lock path directory before read /// Lock path directory before read
status.emplace(path + "status", StatusFile::write_full_info); status.emplace(fs::path(path) / "status", StatusFile::write_full_info);
LOG_DEBUG(log, "Loading metadata from {}", path); LOG_DEBUG(log, "Loading metadata from {}", path);
fs::create_directories(fs::path(path) / "data/"); fs::create_directories(fs::path(path) / "data/");
fs::create_directories(fs::path(path) / "metadata/"); fs::create_directories(fs::path(path) / "metadata/");
loadMetadataSystem(global_context); loadMetadataSystem(global_context);
attachSystemTables(global_context); attachSystemTables(global_context);
loadMetadata(global_context); loadMetadata(global_context);
DatabaseCatalog::instance().loadDatabases(); DatabaseCatalog::instance().loadDatabases();
LOG_DEBUG(log, "Loaded metadata."); LOG_DEBUG(log, "Loaded metadata.");
} }
else if (!config().has("no-system-tables")) else if (!config().has("no-system-tables"))
@ -470,53 +520,9 @@ int LocalServer::childMainImpl()
attachSystemTables(global_context); attachSystemTables(global_context);
} }
if (!is_interactive) echo_queries = config().hasOption("echo") || config().hasOption("verbose");
is_multiquery = true; prompt_by_server_display_name = config().getRawString("prompt_by_server_display_name.default", "{display_name} :) ");
server_display_name = config().getString("display_name", getFQDNOrHostName());
/// we can't mutate global_context (can lead to races, as it was already passed to some background threads)
/// so we can't reuse it safely as a query context and need a copy here
query_context = Context::createCopy(global_context);
query_context->makeSessionContext();
query_context->makeQueryContext();
query_context->setUser("default", "", Poco::Net::SocketAddress{});
query_context->setCurrentQueryId("");
applyCmdSettings(query_context);
/// Use the same query_id (and thread group) for all queries
CurrentThread::QueryScope query_scope_holder(query_context);
if (need_render_progress)
{
/// Set progress callback, which can be run from multiple threads.
query_context->setProgressCallback([&](const Progress & value)
{
/// Write progress only if progress was updated
if (progress_indication.updateProgress(value))
progress_indication.writeProgress();
});
/// Set callback for file processing progress.
progress_indication.setFileProgressCallback(query_context);
}
if (is_interactive)
{
runInteractive();
}
else
{
processQueries();
}
global_context->shutdown();
global_context.reset();
status.reset();
cleanup();
return Application::EXIT_OK;
} }
@ -547,13 +553,6 @@ static std::string getHelpFooter()
} }
bool LocalServer::isInteractive()
{
/// Nothing to process
return stdin_is_a_tty && !config().has("query") && !config().has("table-structure") && queries_files.empty();
}
void LocalServer::printHelpMessage(const OptionsDescription & options_description) void LocalServer::printHelpMessage(const OptionsDescription & options_description)
{ {
std::cout << getHelpHeader() << "\n"; std::cout << getHelpHeader() << "\n";
@ -662,14 +661,6 @@ void LocalServer::processOptions(const OptionsDescription &, const CommandLineOp
queries_files = options["queries-file"].as<std::vector<std::string>>(); queries_files = options["queries-file"].as<std::vector<std::string>>();
} }
void LocalServer::processConfig()
{
echo_queries = config().hasOption("echo") || config().hasOption("verbose");
prompt_by_server_display_name = config().getRawString("prompt_by_server_display_name.default", "{display_name} :) ");
server_display_name = config().getString("display_name", getFQDNOrHostName());
}
} }

View File

@ -3,12 +3,16 @@
#include <filesystem> #include <filesystem>
#include <memory> #include <memory>
#include <optional> #include <optional>
#include <Common/ProgressIndication.h>
#include <Common/StatusFile.h>
#include <Core/Settings.h> #include <Core/Settings.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <loggers/Loggers.h> #include <loggers/Loggers.h>
#include <Poco/Util/Application.h>
#include <Common/ProgressIndication.h>
#include <Client/ClientBase.h> #include <Client/ClientBase.h>
#include <Poco/Util/Application.h>
namespace DB namespace DB
{ {
@ -53,8 +57,6 @@ protected:
int childMainImpl() override; int childMainImpl() override;
bool isInteractive() override;
bool processQueryFromInteractive(const String & input) override bool processQueryFromInteractive(const String & input) override
{ {
return processQueryText(input); return processQueryText(input);
@ -77,7 +79,7 @@ protected:
bool supportPasswordOption() const override { return false; } bool supportPasswordOption() const override { return false; }
bool processFile(const String & file) override bool processMultiQueryFromFile(const String & file) override
{ {
auto text = getInitialCreateTableQuery(); auto text = getInitialCreateTableQuery();
String queries_from_file; String queries_from_file;
@ -90,6 +92,8 @@ protected:
private: private:
ContextMutablePtr query_context; ContextMutablePtr query_context;
std::optional<StatusFile> status;
std::exception_ptr exception; std::exception_ptr exception;
void processQuery(const String & query, std::exception_ptr exception); void processQuery(const String & query, std::exception_ptr exception);

View File

@ -4,16 +4,15 @@
#include <iomanip> #include <iomanip>
#include <filesystem> #include <filesystem>
#if !defined(ARCADIA_BUILD)
# include <Common/config_version.h>
#endif
#include <common/argsToConfig.h> #include <common/argsToConfig.h>
#include <common/DateLUT.h> #include <common/DateLUT.h>
#include <common/LocalDate.h> #include <common/LocalDate.h>
#include <common/LineReader.h> #include <common/LineReader.h>
#include <common/scope_guard_safe.h> #include <common/scope_guard_safe.h>
#if !defined(ARCADIA_BUILD)
# include <Common/config_version.h>
#endif
#include <Common/UTF8Helpers.h> #include <Common/UTF8Helpers.h>
#include <Common/TerminalSize.h> #include <Common/TerminalSize.h>
#include <Common/clearPasswordFromCommandLine.h> #include <Common/clearPasswordFromCommandLine.h>
@ -38,6 +37,7 @@
#include <Parsers/ASTIdentifier.h> #include <Parsers/ASTIdentifier.h>
#include <IO/WriteBufferFromOStream.h> #include <IO/WriteBufferFromOStream.h>
#include <IO/UseSSL.h>
namespace fs = std::filesystem; namespace fs = std::filesystem;
@ -461,7 +461,7 @@ bool ClientBase::processQueryText(const String & text)
if (exit_strings.end() != exit_strings.find(trim(text, [](char c) { return isWhitespaceASCII(c) || c == ';'; }))) if (exit_strings.end() != exit_strings.find(trim(text, [](char c) { return isWhitespaceASCII(c) || c == ';'; })))
return false; return false;
if (!config().has("multiquery")) if (!is_multiquery)
{ {
assert(!query_fuzzer_runs); assert(!query_fuzzer_runs);
prepareAndExecuteQuery(text); prepareAndExecuteQuery(text);
@ -596,10 +596,10 @@ void ClientBase::runNonInteractive()
for (const auto & queries_file : queries_files) for (const auto & queries_file : queries_files)
{ {
for (const auto & interleave_file : interleave_queries_files) for (const auto & interleave_file : interleave_queries_files)
if (!processFile(interleave_file)) if (!processMultiQueryFromFile(interleave_file))
return; return;
if (!processFile(queries_file)) if (!processMultiQueryFromFile(queries_file))
return; return;
} }
@ -644,11 +644,7 @@ static void showClientVersion()
int ClientBase::mainImpl() int ClientBase::mainImpl()
{ {
if (isInteractive()) UseSSL use_ssl;
is_interactive = true;
if (config().has("query") && !queries_files.empty())
throw Exception("Specify either `query` or `queries-file` option", ErrorCodes::BAD_ARGUMENTS);
processConfig(); processConfig();
@ -660,17 +656,10 @@ int ClientBase::mainImpl()
clearTerminal(); clearTerminal();
showClientVersion(); showClientVersion();
} }
else
{
need_render_progress = config().getBool("progress", false);
echo_queries = config().getBool("echo", false);
ignore_error = config().getBool("ignore-error", false);
}
return childMainImpl(); return childMainImpl();
} }
void ClientBase::initialize(Poco::Util::Application & self) void ClientBase::initialize(Poco::Util::Application & self)
{ {
Poco::Util::Application::initialize(self); Poco::Util::Application::initialize(self);
@ -755,8 +744,8 @@ void ClientBase::init(int argc, char ** argv)
Poco::Logger::root().setLevel(options["log-level"].as<std::string>()); Poco::Logger::root().setLevel(options["log-level"].as<std::string>());
processOptions(options_description, options, external_tables_arguments); processOptions(options_description, options, external_tables_arguments);
argsToConfig(common_arguments, config(), 100); argsToConfig(common_arguments, config(), 100);
if (supportPasswordOption()) if (supportPasswordOption())
clearPasswordFromCommandLine(argc, argv); clearPasswordFromCommandLine(argc, argv);
} }

View File

@ -29,15 +29,16 @@ class ClientBase : public Poco::Util::Application
public: public:
using Arguments = std::vector<String>; using Arguments = std::vector<String>;
int main(const std::vector<String> & /*args*/) override;
void initialize(Poco::Util::Application & self) override; void initialize(Poco::Util::Application & self) override;
int mainImpl(); /// Read args, process options, add args to config.
void init(int argc, char ** argv); void init(int argc, char ** argv);
int main(const std::vector<String> & /*args*/) override;
protected: protected:
virtual int childMainImpl() = 0;
bool processMultiQuery(const String & all_queries_text); bool processMultiQuery(const String & all_queries_text);
bool processQueryText(const String & text); bool processQueryText(const String & text);
@ -70,14 +71,10 @@ protected:
virtual void reconnectIfNeeded() {} virtual void reconnectIfNeeded() {}
virtual bool isInteractive() = 0;
virtual void processMainImplException(const Exception & e) = 0; virtual void processMainImplException(const Exception & e) = 0;
virtual void initializeChild() = 0; virtual void initializeChild() = 0;
virtual int childMainImpl() = 0;
virtual void readArguments(int argc, char ** argv, virtual void readArguments(int argc, char ** argv,
Arguments & common_arguments, std::vector<Arguments> &) = 0; Arguments & common_arguments, std::vector<Arguments> &) = 0;
@ -106,7 +103,7 @@ protected:
virtual bool processWithFuzzing(const String &) { return true; } virtual bool processWithFuzzing(const String &) { return true; }
/// Process single file from non-interactive mode. /// Process single file from non-interactive mode.
virtual bool processFile(const String & file) = 0; virtual bool processMultiQueryFromFile(const String & file) = 0;
private: private:
static void clearTerminal(); static void clearTerminal();
@ -197,6 +194,7 @@ private:
NameSet exit_strings{"exit", "quit", "logout", "учше", "йгше", "дщпщге", "exit;", "quit;", "logout;", "учшеж", NameSet exit_strings{"exit", "quit", "logout", "учше", "йгше", "дщпщге", "exit;", "quit;", "logout;", "учшеж",
"йгшеж", "дщпщгеж", "q", "й", "\\q", "\\Q", "\\й", "\\Й", ":q", "Жй"}; "йгшеж", "дщпщгеж", "q", "й", "\\q", "\\Q", "\\й", "\\Й", ":q", "Жй"};
int mainImpl();
}; };
} }