From 8f77855981b80f142ce2997b7cf6761cd64f7e79 Mon Sep 17 00:00:00 2001 From: kssenii Date: Sat, 4 Sep 2021 21:19:01 +0300 Subject: [PATCH] Some review fixes --- programs/client/Client.cpp | 152 +++++++++++++++------------- programs/client/Client.h | 9 +- programs/local/LocalServer.cpp | 30 ++++-- programs/local/LocalServer.h | 12 +-- src/Client/ClientBase.cpp | 38 ++----- src/Client/ClientBase.h | 87 ++++++++-------- src/Client/ConnectionParameters.cpp | 2 +- src/Client/LocalConnection.cpp | 38 +++---- src/Client/LocalConnection.h | 6 +- 9 files changed, 186 insertions(+), 188 deletions(-) diff --git a/programs/client/Client.cpp b/programs/client/Client.cpp index f25a20d5e36..bcdd574206f 100644 --- a/programs/client/Client.cpp +++ b/programs/client/Client.cpp @@ -42,6 +42,7 @@ #include #include #include +#include #include @@ -116,7 +117,7 @@ void Client::processError(const String & query) const } -void Client::processSingleQuery(const String & query_to_execute, ASTPtr parsed_query) +void Client::executeSignleQuery(const String & query_to_execute, ASTPtr parsed_query) { client_exception.reset(); server_exception.reset(); @@ -190,7 +191,7 @@ void Client::processSingleQuery(const String & query_to_execute, ASTPtr parsed_q } -bool Client::processMultiQuery(const String & all_queries_text) +bool Client::executeMultiQuery(const String & all_queries_text) { // It makes sense not to base any control flow on this, so that it is // the same in tests and in normal usage. The only difference is that in @@ -471,91 +472,100 @@ void Client::loadSuggestionData(Suggest & suggest) } -int Client::mainImpl() +int Client::main(const std::vector & /*args*/) +try { - try + UseSSL use_ssl; + MainThreadStatus::getInstance(); + + std::cout << std::fixed << std::setprecision(3); + std::cerr << std::fixed << std::setprecision(3); + + /// Limit on total memory usage + size_t max_client_memory_usage = config().getInt64("max_memory_usage_in_client", 0 /*default value*/); + + if (max_client_memory_usage != 0) { - MainThreadStatus::getInstance(); + total_memory_tracker.setHardLimit(max_client_memory_usage); + total_memory_tracker.setDescription("(total)"); + total_memory_tracker.setMetric(CurrentMetrics::MemoryTracking); + } - /// Limit on total memory usage - size_t max_client_memory_usage = config().getInt64("max_memory_usage_in_client", 0 /*default value*/); + registerFormats(); + registerFunctions(); + registerAggregateFunctions(); - if (max_client_memory_usage != 0) + processConfig(); + + if (is_interactive) + { + clearTerminal(); + showClientVersion(); + } + + connect(); + + if (is_interactive) + { + /// Load Warnings at the beginning of connection + if (!config().has("no-warnings")) { - total_memory_tracker.setHardLimit(max_client_memory_usage); - total_memory_tracker.setDescription("(total)"); - total_memory_tracker.setMetric(CurrentMetrics::MemoryTracking); - } - - registerFormats(); - registerFunctions(); - registerAggregateFunctions(); - - processConfig(); - connect(); - - if (is_interactive) - { - /// Load Warnings at the beginning of connection - if (!config().has("no-warnings")) + try { - try + std::vector messages = loadWarningMessages(); + if (!messages.empty()) { - std::vector messages = loadWarningMessages(); - if (!messages.empty()) - { - std::cout << "Warnings:" << std::endl; - for (const auto & message : messages) - std::cout << " * " << message << std::endl; - std::cout << std::endl; - } - } - catch (...) - { - /// Ignore exception + std::cout << "Warnings:" << std::endl; + for (const auto & message : messages) + std::cout << " * " << message << std::endl; + std::cout << std::endl; } } - - runInteractive(); + catch (...) + { + /// Ignore exception + } } - else + + runInteractive(); + } + else + { + connection->setDefaultDatabase(connection_parameters.default_database); + + runNonInteractive(); + + // If exception code isn't zero, we should return non-zero return + // code anyway. + const auto * exception = server_exception ? server_exception.get() : client_exception.get(); + + if (exception) { - connection->setDefaultDatabase(connection_parameters.default_database); - - runNonInteractive(); - - // If exception code isn't zero, we should return non-zero return - // code anyway. - const auto * exception = server_exception ? server_exception.get() : client_exception.get(); - - if (exception) - { - return exception->code() != 0 ? exception->code() : -1; - } - - if (have_error) - { - // Shouldn't be set without an exception, but check it just in - // case so that at least we don't lose an error. - return -1; - } + return exception->code() != 0 ? exception->code() : -1; + } + + if (have_error) + { + // Shouldn't be set without an exception, but check it just in + // case so that at least we don't lose an error. + return -1; } - } - catch (const Exception & e) - { - bool print_stack_trace = config().getBool("stacktrace", false) && e.code() != ErrorCodes::NETWORK_ERROR; - std::cerr << getExceptionMessage(e, print_stack_trace, true) << std::endl << std::endl; - /// If exception code isn't zero, we should return non-zero return code anyway. - return e.code() ? e.code() : -1; - } - catch (...) - { - std::cerr << getCurrentExceptionMessage(false) << std::endl; - return getCurrentExceptionCode(); } return 0; } +catch (const Exception & e) +{ + bool print_stack_trace = config().getBool("stacktrace", false) && e.code() != ErrorCodes::NETWORK_ERROR; + std::cerr << getExceptionMessage(e, print_stack_trace, true) << std::endl << std::endl; + /// If exception code isn't zero, we should return non-zero return code anyway. + return e.code() ? e.code() : -1; +} +catch (...) +{ + std::cerr << getCurrentExceptionMessage(false) << std::endl; + return getCurrentExceptionCode(); +} void Client::connect() diff --git a/programs/client/Client.h b/programs/client/Client.h index a06b1ee9c3b..3fe5b3d80d0 100644 --- a/programs/client/Client.h +++ b/programs/client/Client.h @@ -11,18 +11,17 @@ class Client : public ClientBase public: Client() = default; void initialize(Poco::Util::Application & self) override; + int main(const std::vector & /*args*/) override; protected: - void processSingleQuery(const String & query_to_execute, ASTPtr parsed_query) override; - bool processMultiQuery(const String & all_queries_text) override; + void executeSignleQuery(const String & query_to_execute, ASTPtr parsed_query) override; + bool executeMultiQuery(const String & all_queries_text) override; bool processWithFuzzing(const String & full_query) override; + void connect() override; void processError(const String & query) const override; void loadSuggestionData(Suggest & suggest) override; - void connect() override; - int mainImpl() override; - void readArguments(int argc, char ** argv, Arguments & common_arguments, std::vector & external_tables_arguments) override; diff --git a/programs/local/LocalServer.cpp b/programs/local/LocalServer.cpp index a3ed5c655b5..5f4a2475218 100644 --- a/programs/local/LocalServer.cpp +++ b/programs/local/LocalServer.cpp @@ -30,6 +30,7 @@ #include #include #include +#include #include #include #include @@ -83,7 +84,7 @@ void LocalServer::processError(const String & query) const } -void LocalServer::processSingleQuery(const String & query_to_execute, ASTPtr parsed_query) +void LocalServer::executeSignleQuery(const String & query_to_execute, ASTPtr parsed_query) { /// To support previous behaviour of clickhouse-local do not reset first exception in case --ignore-error, /// it needs to be thrown after multiquery is finished (test 00385). But I do not think it is ok to output only @@ -123,7 +124,7 @@ void LocalServer::processSingleQuery(const String & query_to_execute, ASTPtr par } -bool LocalServer::processMultiQuery(const String & all_queries_text) +bool LocalServer::executeMultiQuery(const String & all_queries_text) { bool echo_query = echo_queries; @@ -329,6 +330,13 @@ static void attachSystemTables(ContextPtr context) void LocalServer::cleanup() { + connection.reset(); + + global_context->shutdown(); + global_context.reset(); + + status.reset(); + // Delete the temporary directory if needed. if (temporary_directory_to_delete) { @@ -438,11 +446,15 @@ void LocalServer::connect() } -int LocalServer::mainImpl() +int LocalServer::main(const std::vector & /*args*/) try { + UseSSL use_ssl; ThreadStatus thread_status; + std::cout << std::fixed << std::setprecision(3); + std::cerr << std::fixed << std::setprecision(3); + /// We will terminate process on error static KillingErrorHandler error_handler; Poco::ErrorHandler::set(&error_handler); @@ -463,7 +475,10 @@ try if (is_interactive) { - std::cout << std::endl; + clearTerminal(); + showClientVersion(); + std::cerr << std::endl; + runInteractive(); } else @@ -476,14 +491,7 @@ try client_exception->rethrow(); } - connection.reset(); - - global_context->shutdown(); - global_context.reset(); - - status.reset(); cleanup(); - return Application::EXIT_OK; } catch (const Exception & e) diff --git a/programs/local/LocalServer.h b/programs/local/LocalServer.h index dcf633a4256..ecfb59c4d91 100644 --- a/programs/local/LocalServer.h +++ b/programs/local/LocalServer.h @@ -25,18 +25,16 @@ class LocalServer : public ClientBase, public Loggers { public: LocalServer() = default; - void initialize(Poco::Util::Application & self) override; + int main(const std::vector & /*args*/) override; protected: - void processSingleQuery(const String & query_to_execute, ASTPtr parsed_query) override; - bool processMultiQuery(const String & all_queries_text) override; - - void processError(const String & query) const override; - void loadSuggestionData(Suggest &) override; + void executeSignleQuery(const String & query_to_execute, ASTPtr parsed_query) override; + bool executeMultiQuery(const String & all_queries_text) override; void connect() override; - int mainImpl() override; + void processError(const String & query) const override; + void loadSuggestionData(Suggest &) override; String getQueryTextPrefix() override; void printHelpMessage(const OptionsDescription & options_description) override; diff --git a/src/Client/ClientBase.cpp b/src/Client/ClientBase.cpp index e00a9b87691..d7a6f859dfa 100644 --- a/src/Client/ClientBase.cpp +++ b/src/Client/ClientBase.cpp @@ -46,7 +46,6 @@ #include #include #include -#include #include #include @@ -261,7 +260,10 @@ void ClientBase::initBlockOutputStream(const Block & block, ASTPtr parsed_query) if (!pager.empty()) { signal(SIGPIPE, SIG_IGN); - pager_cmd = ShellCommand::execute(pager, true); + + ShellCommand::Config config(pager); + config.pipe_stdin_only = true; + pager_cmd = ShellCommand::execute(config); out_buf = &pager_cmd->in; } else @@ -390,10 +392,8 @@ void ClientBase::processOrdinaryQuery(const String & query_to_execute, ASTPtr pa } int retries_left = 10; - for (;;) + while (retries_left) { - assert(retries_left > 0); - try { connection->sendQuery( @@ -425,6 +425,7 @@ void ClientBase::processOrdinaryQuery(const String & query_to_execute, ASTPtr pa } } } + assert(retries_left > 0); } @@ -892,7 +893,7 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin written_first_block = false; progress_indication.resetProgress(); - processSingleQuery(query_to_execute, parsed_query); + executeSignleQuery(query_to_execute, parsed_query); if (is_interactive) { @@ -1021,7 +1022,7 @@ bool ClientBase::processQueryText(const String & text) return true; } - return processMultiQuery(text); + return executeMultiQuery(text); } @@ -1167,7 +1168,7 @@ void ClientBase::runNonInteractive() readStringUntilEOF(queries_from_file, in); text += queries_from_file; - return processMultiQuery(text); + return executeMultiQuery(text); }; /// Read all queries into `text`. @@ -1207,7 +1208,7 @@ void ClientBase::runNonInteractive() } -static void clearTerminal() +void ClientBase::clearTerminal() { /// Clear from cursor until end of screen. /// It is needed if garbage is left in terminal. @@ -1218,29 +1219,12 @@ static void clearTerminal() } -static void showClientVersion() +void ClientBase::showClientVersion() { std::cout << DBMS_NAME << " client version " << VERSION_STRING << VERSION_OFFICIAL << "." << std::endl; } -int ClientBase::main(const std::vector & /*args*/) -{ - UseSSL use_ssl; - - std::cout << std::fixed << std::setprecision(3); - std::cerr << std::fixed << std::setprecision(3); - - if (is_interactive) - { - clearTerminal(); - showClientVersion(); - } - - return mainImpl(); -} - - void ClientBase::init(int argc, char ** argv) { namespace po = boost::program_options; diff --git a/src/Client/ClientBase.h b/src/Client/ClientBase.h index 1fbb67ca31d..1e2b1fb79a5 100644 --- a/src/Client/ClientBase.h +++ b/src/Client/ClientBase.h @@ -36,23 +36,20 @@ public: using Arguments = std::vector; void init(int argc, char ** argv); - int main(const std::vector & /*args*/) override; protected: void runInteractive(); void runNonInteractive(); - virtual void connect() = 0; - virtual int mainImpl() = 0; - virtual bool processWithFuzzing(const String &) { throw Exception("Query processing with fuzzing is not implemented", ErrorCodes::NOT_IMPLEMENTED); } - virtual void processSingleQuery(const String & query_to_execute, ASTPtr parsed_query) = 0; - virtual bool processMultiQuery(const String & all_queries_text) = 0; + virtual void executeSignleQuery(const String & query_to_execute, ASTPtr parsed_query) = 0; + virtual bool executeMultiQuery(const String & all_queries_text) = 0; + virtual void connect() = 0; virtual void processError(const String & query) const = 0; virtual void loadSuggestionData(Suggest &) = 0; @@ -74,6 +71,9 @@ protected: /// For non-interactive multi-query mode get queries text prefix. virtual String getQueryTextPrefix() { return ""; } + void clearTerminal(); + void showClientVersion(); + using ProgramOptionsDescription = boost::program_options::options_description; using CommandLineOptions = boost::program_options::variables_map; @@ -134,37 +134,34 @@ protected: bool ignore_error = false; /// In case of errors, don't print error message, continue to next query. Only applicable for non-interactive mode. bool print_time_to_stderr = false; /// Output execution time to stderr in batch mode. - String home_path; std::vector queries_files; /// If not empty, queries will be read from these files - String history_file; /// Path to a file containing command history. std::vector interleave_queries_files; /// If not empty, run queries from these files before processing every file from 'queries_files'. - bool has_vertical_output_suffix = false; /// Is \G present at the end of the query string? - String prompt_by_server_display_name; - String server_display_name; - - ProgressIndication progress_indication; - bool need_render_progress = true; - bool written_first_block = false; - size_t processed_rows = 0; /// How many rows have been read or written. - bool stdin_is_a_tty = false; /// stdin is a terminal. bool stdout_is_a_tty = false; /// stdout is a terminal. uint64_t terminal_width = 0; + std::unique_ptr connection; + ConnectionParameters connection_parameters; + + String format; /// Query results output format. + bool is_default_format = true; /// false, if format is set in the config or command line. + size_t format_max_block_size = 0; /// Max block size for console output. + String insert_format; /// Format of INSERT data that is read from stdin in batch mode. + size_t insert_format_max_block_size = 0; /// Max block size when reading INSERT data. + size_t max_client_network_bandwidth = 0; /// The maximum speed of data exchange over the network for the client in bytes per second. + + bool has_vertical_output_suffix = false; /// Is \G present at the end of the query string? + + /// We will format query_id in interactive mode in various ways, the default is just to print Query id: ... + std::vector> query_id_formats; + /// Settings specified via command line args Settings cmd_settings; SharedContextHolder shared_context; ContextMutablePtr global_context; - QueryFuzzer fuzzer; - int query_fuzzer_runs = 0; - - /// If the last query resulted in exception. `server_exception` or - /// `client_exception` must be set. - bool have_error = false; - /// Buffer that reads from stdin in batch mode. ReadBufferFromFileDescriptor std_in{STDIN_FILENO}; /// Console output. @@ -180,17 +177,20 @@ protected: String server_logs_file; BlockOutputStreamPtr logs_out_stream; - /// We will format query_id in interactive mode in various ways, the default is just to print Query id: ... - std::vector> query_id_formats; + String home_path; + String history_file; /// Path to a file containing command history. - /// Dictionary with query parameters for prepared statements. - NameToNameMap query_parameters; + String current_profile; - std::unique_ptr connection; - ConnectionParameters connection_parameters; + UInt64 server_revision = 0; + String server_version; + String prompt_by_server_display_name; + String server_display_name; - String format; /// Query results output format. - bool is_default_format = true; /// false, if format is set in the config or command line. + ProgressIndication progress_indication; + bool need_render_progress = true; + bool written_first_block = false; + size_t processed_rows = 0; /// How many rows have been read or written. /// The last exception that was received from the server. Is used for the /// return code in batch mode. @@ -198,20 +198,17 @@ protected: /// Likewise, the last exception that occurred on the client. std::unique_ptr client_exception; + /// If the last query resulted in exception. `server_exception` or + /// `client_exception` must be set. + bool have_error = false; + + std::list external_tables; /// External tables info. + NameToNameMap query_parameters; /// Dictionary with query parameters for prepared statements. + + QueryFuzzer fuzzer; + int query_fuzzer_runs = 0; + QueryProcessingStage::Enum query_processing_stage; - - /// External tables info. - std::list external_tables; - String current_profile; - - size_t format_max_block_size = 0; /// Max block size for console output. - String insert_format; /// Format of INSERT data that is read from stdin in batch mode. - size_t insert_format_max_block_size = 0; /// Max block size when reading INSERT data. - size_t max_client_network_bandwidth = 0; /// The maximum speed of data exchange over the network for the client in bytes per second. - - UInt64 server_revision = 0; - String server_version; - }; } diff --git a/src/Client/ConnectionParameters.cpp b/src/Client/ConnectionParameters.cpp index 94854fc1b5b..79dbe265d84 100644 --- a/src/Client/ConnectionParameters.cpp +++ b/src/Client/ConnectionParameters.cpp @@ -13,7 +13,7 @@ #include #if !defined(ARCADIA_BUILD) -#include +#include // Y_IGNORE #endif diff --git a/src/Client/LocalConnection.cpp b/src/Client/LocalConnection.cpp index 94ddfdfe434..e62742f9d80 100644 --- a/src/Client/LocalConnection.cpp +++ b/src/Client/LocalConnection.cpp @@ -9,11 +9,12 @@ namespace ErrorCodes { extern const int UNKNOWN_PACKET_FROM_SERVER; extern const int UNKNOWN_EXCEPTION; + extern const int NOT_IMPLEMENTED; } LocalConnection::LocalConnection(ContextPtr context_) : WithContext(context_) - , session(getContext(), ClientInfo::Interface::TCP) + , session(getContext(), ClientInfo::Interface::LOCAL) { /// Authenticate and create a context to execute queries. session.authenticate("default", "", Poco::Net::SocketAddress{}); @@ -76,6 +77,9 @@ void LocalConnection::sendQuery( if (state->io.out) { + /** Made above the rest of the lines, so that in case of `writePrefix` function throws an exception, + * client receive exception before sending data. + */ state->io.out->writePrefix(); state->block = state->io.out->getHeader(); } @@ -288,10 +292,7 @@ bool LocalConnection::pollImpl() auto next_read = pullBlock(block); if (block) { - if (state->io.null_format) - state->block.emplace(); - else - state->block.emplace(block); + state->block.emplace(block); } else if (!next_read) { @@ -358,31 +359,34 @@ Packet LocalConnection::receivePacket() void LocalConnection::getServerVersion( const ConnectionTimeouts & /* timeouts */, String & /* name */, UInt64 & /* version_major */, UInt64 & /* version_minor */, - UInt64 & /* version_patch */, UInt64 & /* revision */) { } - -void LocalConnection::setDefaultDatabase(const String & name) + UInt64 & /* version_patch */, UInt64 & /* revision */) { - default_database = name; + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented"); +} + +void LocalConnection::setDefaultDatabase(const String &) +{ + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented"); } UInt64 LocalConnection::getServerRevision(const ConnectionTimeouts &) { - return server_revision; -} - -const String & LocalConnection::getDescription() const -{ - return description; + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented"); } const String & LocalConnection::getServerTimezone(const ConnectionTimeouts &) { - return server_timezone; + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented"); } const String & LocalConnection::getServerDisplayName(const ConnectionTimeouts &) { - return server_display_name; + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented"); +} + +void LocalConnection::sendExternalTablesData(ExternalTablesData &) +{ + /// Do nothing. } } diff --git a/src/Client/LocalConnection.h b/src/Client/LocalConnection.h index c10bc09aa56..d72e5713ded 100644 --- a/src/Client/LocalConnection.h +++ b/src/Client/LocalConnection.h @@ -35,7 +35,6 @@ struct LocalQueryState /// Is request cancelled bool is_cancelled = false; - /// Is query finished == !has_pending_data bool is_finished = false; bool sent_totals = false; @@ -66,11 +65,10 @@ public: UInt64 & revision) override; UInt64 getServerRevision(const ConnectionTimeouts & timeouts) override; - const String & getServerTimezone(const ConnectionTimeouts & timeouts) override; const String & getServerDisplayName(const ConnectionTimeouts & timeouts) override; - const String & getDescription() const override; + const String & getDescription() const override { return description; } void sendQuery( const ConnectionTimeouts & timeouts, @@ -85,7 +83,7 @@ public: void sendData(const Block &, const String &, bool) override; - void sendExternalTablesData(ExternalTablesData &) override {} + void sendExternalTablesData(ExternalTablesData &) override; bool poll(size_t timeout_microseconds = 0) override;