Refactoring

This commit is contained in:
kssenii 2021-07-23 23:56:47 +03:00
parent 8c05e4f038
commit accd6d5c0f
6 changed files with 181 additions and 265 deletions

View File

@ -14,51 +14,40 @@
#include <Poco/String.h>
#include <filesystem>
#include "Client.h"
#include <common/argsToConfig.h>
#include <common/find_symbols.h>
#if !defined(ARCADIA_BUILD)
# include <Common/config_version.h>
#endif
#include "Client.h"
#include <Columns/ColumnString.h>
#include <Poco/Util/Application.h>
#include <Processors/Formats/IInputFormat.h>
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Columns/ColumnString.h>
#include <common/find_symbols.h>
#include <common/LineReader.h>
#include <Common/ClickHouseRevision.h>
#include <Common/Exception.h>
#include <Common/UnicodeBar.h>
#include <Common/formatReadable.h>
#include <Common/NetException.h>
#include <Common/Throttler.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/typeid_cast.h>
#include <Common/Config/ConfigProcessor.h>
#include <Common/PODArray.h>
#include <common/argsToConfig.h>
#include <Common/TerminalSize.h>
#include <Common/Config/configReadClient.h>
#include <Common/InterruptListener.h>
#include <Client/Connection.h>
#include <Core/QueryProcessingStage.h>
#include <Client/Connection.h>
#include <Columns/ColumnString.h>
#include <Poco/Util/Application.h>
#include <Processors/Formats/IInputFormat.h>
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>
#include <IO/UseSSL.h>
#include <IO/WriteBufferFromOStream.h>
#include <DataStreams/InternalTextLogsRowOutputStream.h>
#include <DataStreams/NullBlockOutputStream.h>
@ -68,15 +57,10 @@
#include <Parsers/ASTUseQuery.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTQueryWithOutput.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/formatAST.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ParserQuery.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterSetQuery.h>
#include <Interpreters/ReplaceQueryParameterVisitor.h>
@ -84,8 +68,6 @@
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <Formats/registerFormats.h>
#include <Formats/FormatFactory.h>
#include <Common/Config/configReadClient.h>
#include <Storages/ColumnsDescription.h>
#ifndef __clang__
#pragma GCC optimize("-fno-var-tracking-assignments")
@ -93,11 +75,11 @@
namespace fs = std::filesystem;
namespace DB
{
namespace ErrorCodes
{
extern const int NETWORK_ERROR;
extern const int NO_DATA_TO_INSERT;
extern const int BAD_ARGUMENTS;
extern const int UNKNOWN_PACKET_FROM_SERVER;
@ -184,8 +166,10 @@ std::vector<String> Client::loadWarningMessages()
}
void Client::initializeChild()
void Client::initialize(Poco::Util::Application & self)
{
Poco::Util::Application::initialize(self);
const char * home_path_cstr = getenv("HOME");
if (home_path_cstr)
home_path = home_path_cstr;
@ -209,13 +193,6 @@ void Client::initializeChild()
}
void Client::processMainImplException(const Exception & e)
{
bool print_stack_trace = config().getBool("stacktrace", false) && e.code() != ErrorCodes::NETWORK_ERROR;
std::cerr << getExceptionMessage(e, print_stack_trace, true) << std::endl << std::endl;
}
void Client::loadSuggestionDataIfPossible()
{
if (server_revision >= Suggest::MIN_SERVER_REVISION && !config().getBool("disable_suggestion", false))
@ -226,36 +203,7 @@ void Client::loadSuggestionDataIfPossible()
}
bool Client::processQueryFromInteractive(const String & input)
{
try
{
return processQueryText(input);
}
catch (const Exception & e)
{
/// We don't need to handle the test hints in the interactive mode.
bool print_stack_trace = config().getBool("stacktrace", false);
std::cerr << "Exception on client:" << std::endl << getExceptionMessage(e, print_stack_trace, true) << std::endl << std::endl;
client_exception = std::make_unique<Exception>(e);
}
if (client_exception)
{
/// client_exception may have been set above or elsewhere.
/// Client-side exception during query execution can result in the loss of
/// sync in the connection protocol.
/// So we reconnect and allow to enter the next query.
connect();
}
/// Continue processing queries.
return true;
}
int Client::childMainImpl()
int Client::mainImpl()
{
registerFormats();
registerFunctions();
@ -289,19 +237,17 @@ int Client::childMainImpl()
}
else
{
auto query_id = config().getString("query_id", "");
if (!query_id.empty())
global_context->setCurrentQueryId(query_id);
runNonInteractive();
// If exception code isn't zero, we should return non-zero return
// code anyway.
const auto * exception = server_exception ? server_exception.get() : client_exception.get();
if (exception)
{
return exception->code() != 0 ? exception->code() : -1;
}
if (have_error)
{
// Shouldn't be set without an exception, but check it just in
@ -1884,6 +1830,10 @@ void Client::processConfig()
need_render_progress = config().getBool("progress", false);
echo_queries = config().getBool("echo", false);
ignore_error = config().getBool("ignore-error", false);
auto query_id = config().getString("query_id", "");
if (!query_id.empty())
global_context->setCurrentQueryId(query_id);
}
if (config().has("multiquery"))

View File

@ -12,8 +12,10 @@ class Client : public ClientBase
public:
Client() = default;
void initialize(Poco::Util::Application & self) override;
protected:
int childMainImpl() override;
int mainImpl() override;
bool supportPasswordOption() const override { return true; }
@ -36,14 +38,13 @@ protected:
std::vector<String> loadWarningMessages();
void initializeChild() override;
void processMainImplException(const Exception & e) override;
void loadSuggestionDataIfPossible() override;
bool processQueryFromInteractive(const String & input) override;
bool checkErrorMatchesHints(const TestHint & test_hint, bool had_error) override;
void reportQueryError() const override;
bool processWithFuzzing(const String & text) override;
void executeParsedQueryPrefix() override;
void executeParsedQueryImpl() override;
void executeParsedQuerySuffix() override;
@ -81,8 +82,6 @@ private:
ConnectionParameters connection_parameters;
void nonInteractive();
void connect();
void printChangedSettings() const;
void sendExternalTables();

View File

@ -59,10 +59,10 @@ namespace ErrorCodes
}
LocalServer::LocalServer() = default;
void LocalServer::initializeChild()
void LocalServer::initialize(Poco::Util::Application & self)
{
Poco::Util::Application::initialize(self);
/// Load config files if exists
if (config().has("config-file") || fs::exists("config.xml"))
{
@ -126,7 +126,7 @@ void LocalServer::tryInitPath()
parent_folder = std::filesystem::temp_directory_path();
}
catch (const std::filesystem::filesystem_error& e)
catch (const fs::filesystem_error& e)
{
// tmp folder don't exists? misconfiguration? chroot?
LOG_DEBUG(log, "Can not get temporary folder: {}", e.what());
@ -192,26 +192,6 @@ void LocalServer::cleanup()
}
void LocalServer::processMainImplException(const Exception &)
{
try
{
cleanup();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
std::cerr << getCurrentExceptionMessage(config().hasOption("stacktrace")) << '\n';
}
void LocalServer::reportQueryError() const
{
}
std::string LocalServer::getInitialCreateTableQuery()
{
if (!config().has("table-structure"))
@ -270,37 +250,6 @@ void LocalServer::executeParsedQueryImpl()
}
void LocalServer::processQueries()
{
String initial_create_query = getInitialCreateTableQuery();
String queries_str = initial_create_query;
if (config().has("query"))
queries_str += config().getRawString("query");
else
{
String queries_from_file;
ReadBufferFromFile in(config().getString("queries-file"));
readStringUntilEOF(queries_from_file, in);
queries_str += queries_from_file;
}
const auto & settings = global_context->getSettingsRef();
std::vector<String> queries;
auto parse_res = splitMultipartQuery(queries_str, queries, settings.max_query_size, settings.max_parser_depth);
if (!parse_res.second)
throw Exception("Cannot parse and execute the following part of query: " + String(parse_res.first), ErrorCodes::SYNTAX_ERROR);
for (const auto & query : queries)
prepareAndExecuteQuery(query);
if (exception)
std::rethrow_exception(exception);
}
static ConfigurationPtr getConfigurationFromXMLString(const char * xml_data)
{
std::stringstream ss{std::string{xml_data}}; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
@ -353,22 +302,22 @@ void LocalServer::setupUsers()
}
int LocalServer::childMainImpl()
int LocalServer::mainImpl()
{
ThreadStatus thread_status;
/// Prompt may contain the following substitutions in a form of {name}.
std::map<String, String> prompt_substitutions{{"display_name", server_display_name}};
/// Quite suboptimal.
for (const auto & [key, value] : prompt_substitutions)
boost::replace_all(prompt_by_server_display_name, "{" + key + "}", value);
if (is_interactive)
{
std::map<String, String> prompt_substitutions{{"display_name", server_display_name}};
for (const auto & [key, value] : prompt_substitutions)
boost::replace_all(prompt_by_server_display_name, "{" + key + "}", value);
}
/// We will terminate process on error
static KillingErrorHandler error_handler;
Poco::ErrorHandler::set(&error_handler);
/// Don't initialize DateLUT
registerFunctions();
registerAggregateFunctions();
registerTableFunctions();
@ -380,14 +329,12 @@ int LocalServer::childMainImpl()
/// we can't mutate global_context (can lead to races, as it was already passed to some background threads)
/// so we can't reuse it safely as a query context and need a copy here
query_context = Context::createCopy(global_context);
query_context->makeSessionContext();
query_context->makeQueryContext();
query_context->setUser("default", "", Poco::Net::SocketAddress{});
query_context->setCurrentQueryId("");
applyCmdSettings(query_context);
applyCmdSettings(query_context);
/// Use the same query_id (and thread group) for all queries
CurrentThread::QueryScope query_scope_holder(query_context);
@ -406,13 +353,9 @@ int LocalServer::childMainImpl()
}
if (is_interactive)
{
runInteractive();
}
else
{
runNonInteractive();
}
global_context->shutdown();
global_context.reset();

View File

@ -23,7 +23,9 @@ namespace DB
class LocalServer : public ClientBase, public Loggers
{
public:
LocalServer();
LocalServer() = default;
void initialize(Poco::Util::Application & self) override;
~LocalServer() override
{
@ -31,6 +33,55 @@ public:
global_context->shutdown(); /// required for properly exception handling
}
protected:
void readArguments(int argc, char ** argv, Arguments & common_arguments, std::vector<Arguments> &) override;
void addOptions(OptionsDescription & options_description) override;
void processOptions(const OptionsDescription & options_description,
const CommandLineOptions & options,
const std::vector<Arguments> &) override;
void processConfig() override;
int mainImpl() override;
void shutdown() override
{
try
{
cleanup();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void executeParsedQueryImpl() override;
void reportQueryError() const override {}
void printHelpMessage(const OptionsDescription & options_description) override;
bool supportPasswordOption() const override { return false; }
bool processMultiQueryFromFile(const String & file) override
{
auto text = getInitialCreateTableQuery();
String queries_from_file;
ReadBufferFromFile in(file);
readStringUntilEOF(queries_from_file, in);
text += queries_from_file;
return processMultiQuery(text);
}
void checkExceptions() override
{
if (exception)
std::rethrow_exception(exception);
}
private:
/** Composes CREATE subquery based on passed arguments (--structure --file --table and --input-format)
* This query will be executed first, before queries passed through --query argument
@ -50,46 +101,6 @@ private:
void cleanup();
protected:
void processMainImplException(const Exception & e) override;
void initializeChild() override;
int childMainImpl() override;
bool processQueryFromInteractive(const String & input) override
{
return processQueryText(input);
}
void executeParsedQueryImpl() override;
void reportQueryError() const override;
void printHelpMessage(const OptionsDescription & options_description) override;
void readArguments(int argc, char ** argv, Arguments & common_arguments, std::vector<Arguments> &) override;
void addOptions(OptionsDescription & options_description) override;
void processOptions(const OptionsDescription & options_description,
const CommandLineOptions & options,
const std::vector<Arguments> &) override;
void processConfig() override;
bool supportPasswordOption() const override { return false; }
bool processMultiQueryFromFile(const String & file) override
{
auto text = getInitialCreateTableQuery();
String queries_from_file;
ReadBufferFromFile in(file);
readStringUntilEOF(queries_from_file, in);
text += queries_from_file;
return processMultiQuery(text);
}
private:
ContextMutablePtr query_context;
std::optional<StatusFile> status;

View File

@ -49,6 +49,7 @@ namespace ErrorCodes
{
extern const int UNRECOGNIZED_ARGUMENTS;
extern const int BAD_ARGUMENTS;
extern const int NETWORK_ERROR;
}
@ -572,8 +573,28 @@ void ClientBase::runInteractive()
has_vertical_output_suffix = true;
}
if (!processQueryFromInteractive(input))
break;
try
{
if (!processQueryText(input))
break;
}
catch (const Exception & e)
{
/// We don't need to handle the test hints in the interactive mode.
bool print_stack_trace = config().getBool("stacktrace", false);
std::cerr << "Exception on client:" << std::endl << getExceptionMessage(e, print_stack_trace, true) << std::endl << std::endl;
client_exception = std::make_unique<Exception>(e);
}
if (client_exception)
{
/// client_exception may have been set above or elsewhere.
/// Client-side exception during query execution can result in the loss of
/// sync in the connection protocol.
/// So we reconnect and allow to enter the next query.
reconnectIfNeeded();
}
}
while (true);
@ -622,10 +643,12 @@ void ClientBase::runNonInteractive()
processWithFuzzing(text);
else
processQueryText(text);
checkExceptions();
}
void ClientBase::clearTerminal()
static void clearTerminal()
{
/// Clear from cursor until end of screen.
/// It is needed if garbage is left in terminal.
@ -642,40 +665,31 @@ static void showClientVersion()
}
int ClientBase::mainImpl()
{
UseSSL use_ssl;
processConfig();
std::cout << std::fixed << std::setprecision(3);
std::cerr << std::fixed << std::setprecision(3);
if (is_interactive)
{
clearTerminal();
showClientVersion();
}
return childMainImpl();
}
void ClientBase::initialize(Poco::Util::Application & self)
{
Poco::Util::Application::initialize(self);
initializeChild();
}
int ClientBase::main(const std::vector<std::string> & /*args*/)
{
try
{
UseSSL use_ssl;
processConfig();
std::cout << std::fixed << std::setprecision(3);
std::cerr << std::fixed << std::setprecision(3);
if (is_interactive)
{
clearTerminal();
showClientVersion();
}
return mainImpl();
}
catch (const Exception & e)
{
processMainImplException(e);
shutdown();
bool print_stack_trace = config().getBool("stacktrace", false) && e.code() != ErrorCodes::NETWORK_ERROR;
std::cerr << getExceptionMessage(e, print_stack_trace, true) << std::endl << std::endl;
/// If exception code isn't zero, we should return non-zero return code anyway.
return e.code() ? e.code() : -1;

View File

@ -29,51 +29,16 @@ class ClientBase : public Poco::Util::Application
public:
using Arguments = std::vector<String>;
void initialize(Poco::Util::Application & self) override;
/// Read args, process options, add args to config.
void init(int argc, char ** argv);
int main(const std::vector<String> & /*args*/) override;
protected:
virtual int childMainImpl() = 0;
/// Prepare for and start either interactive or non-interactive mode.
virtual int mainImpl() = 0;
bool processMultiQuery(const String & all_queries_text);
bool processQueryText(const String & text);
void runInteractive();
void runNonInteractive();
ASTPtr parseQuery(const char *& pos, const char * end, bool allow_multi_statements) const;
void resetOutput();
virtual void executeParsedQueryPrefix() {}
virtual void executeParsedQueryImpl() = 0;
virtual void executeParsedQuerySuffix() {}
void prepareAndExecuteQuery(const String & query);
void executeParsedQuery(std::optional<bool> echo_query_ = {}, bool report_error = true);
virtual bool processQueryFromInteractive(const String & input) = 0;
virtual void reportQueryError() const = 0;
virtual void loadSuggestionDataIfPossible() {}
virtual bool checkErrorMatchesHints(const TestHint & /* test_hint */, bool /* had_error */) { return false; }
virtual void reconnectIfNeeded() {}
virtual void processMainImplException(const Exception & e) = 0;
virtual void initializeChild() = 0;
/// If some work is required on destroying.
virtual void shutdown() {}
virtual void readArguments(int argc, char ** argv,
Arguments & common_arguments, std::vector<Arguments> &) = 0;
@ -94,20 +59,52 @@ protected:
virtual void processOptions(const OptionsDescription & options_description,
const CommandLineOptions & options,
const std::vector<Arguments> & external_tables_arguments) = 0;
virtual void processConfig() = 0;
void runInteractive();
void runNonInteractive();
bool processMultiQuery(const String & all_queries_text);
/// Process single file (with queries) from non-interactive mode.
virtual bool processMultiQueryFromFile(const String & file) = 0;
ASTPtr parseQuery(const char *& pos, const char * end, bool allow_multi_statements) const;
void prepareAndExecuteQuery(const String & query);
void executeParsedQuery(std::optional<bool> echo_query_ = {}, bool report_error = true);
virtual void executeParsedQueryPrefix() {}
virtual void executeParsedQueryImpl() = 0;
virtual void executeParsedQuerySuffix() {}
void resetOutput();
virtual void checkExceptions() {}
virtual void reportQueryError() const = 0;
virtual void loadSuggestionDataIfPossible() {}
virtual bool checkErrorMatchesHints(const TestHint & /* test_hint */, bool /* had_error */) { return false; }
virtual void reconnectIfNeeded() {}
virtual bool supportPasswordOption() const = 0;
virtual bool splitQueries() const { return false; }
virtual bool processWithFuzzing(const String &) { return true; }
/// Process single file from non-interactive mode.
virtual bool processMultiQueryFromFile(const String & file) = 0;
private:
static void clearTerminal();
inline String prompt() const
{
return boost::replace_all_copy(prompt_by_server_display_name, "{database}", config().getString("database", "default"));
@ -115,6 +112,9 @@ private:
void outputQueryInfo(bool echo_query_);
/// Process query text (multiquery or single query) accroding to options.
bool processQueryText(const String & text);
protected:
bool is_interactive = false; /// Use either interactive line editing interface or batch mode.
@ -194,7 +194,6 @@ private:
NameSet exit_strings{"exit", "quit", "logout", "учше", "йгше", "дщпщге", "exit;", "quit;", "logout;", "учшеж",
"йгшеж", "дщпщгеж", "q", "й", "\\q", "\\Q", "\\й", "\\Й", ":q", "Жй"};
int mainImpl();
};
}