#include "TestHint.h" #include "ConnectionParameters.h" #include "QueryFuzzer.h" #include "Suggest.h" #if USE_REPLXX # include #elif defined(USE_READLINE) && USE_READLINE # include #else # include #endif #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #if !defined(ARCADIA_BUILD) # include #endif #ifndef __clang__ #pragma GCC optimize("-fno-var-tracking-assignments") #endif /// http://en.wikipedia.org/wiki/ANSI_escape_code #define CLEAR_TO_END_OF_LINE "\033[K" namespace DB { namespace ErrorCodes { extern const int NETWORK_ERROR; extern const int NO_DATA_TO_INSERT; extern const int BAD_ARGUMENTS; extern const int UNKNOWN_PACKET_FROM_SERVER; extern const int UNEXPECTED_PACKET_FROM_SERVER; extern const int CLIENT_OUTPUT_FORMAT_SPECIFIED; extern const int INVALID_USAGE_OF_INPUT; extern const int DEADLOCK_AVOIDED; extern const int UNRECOGNIZED_ARGUMENTS; } class Client : public Poco::Util::Application { public: Client() = default; private: using StringSet = std::unordered_set; StringSet exit_strings { "exit", "quit", "logout", "учше", "йгше", "дщпщге", "exit;", "quit;", "logout;", "учшеж", "йгшеж", "дщпщгеж", "q", "й", "\\q", "\\Q", "\\й", "\\Й", ":q", "Жй" }; bool is_interactive = true; /// Use either interactive line editing interface or batch mode. bool need_render_progress = true; /// Render query execution progress. bool echo_queries = false; /// Print queries before execution in batch mode. bool ignore_error = false; /// In case of errors, don't print error message, continue to next query. Only applicable for non-interactive mode. bool print_time_to_stderr = false; /// Output execution time to stderr in batch mode. bool stdin_is_a_tty = false; /// stdin is a terminal. bool stdout_is_a_tty = false; /// stdout is a terminal. std::unique_ptr connection; /// Connection to DB. String query_id; /// Current query_id. String full_query; /// Current query as it was given to the client. // Current query as it will be sent to the server. It may differ from the // full query for INSERT queries, for which the data that follows the query // is stripped and sent separately. String query_to_send; String format; /// Query results output format. bool is_default_format = true; /// false, if format is set in the config or command line. size_t format_max_block_size = 0; /// Max block size for console output. String insert_format; /// Format of INSERT data that is read from stdin in batch mode. size_t insert_format_max_block_size = 0; /// Max block size when reading INSERT data. size_t max_client_network_bandwidth = 0; /// The maximum speed of data exchange over the network for the client in bytes per second. bool has_vertical_output_suffix = false; /// Is \G present at the end of the query string? SharedContextHolder shared_context = Context::createShared(); Context context = Context::createGlobal(shared_context.get()); /// Buffer that reads from stdin in batch mode. ReadBufferFromFileDescriptor std_in {STDIN_FILENO}; /// Console output. WriteBufferFromFileDescriptor std_out {STDOUT_FILENO}; std::unique_ptr pager_cmd; /// The user can specify to redirect query output to a file. std::optional out_file_buf; BlockOutputStreamPtr block_out_stream; /// The user could specify special file for server logs (stderr by default) std::unique_ptr out_logs_buf; String server_logs_file; BlockOutputStreamPtr logs_out_stream; String home_path; String current_profile; String prompt_by_server_display_name; /// Path to a file containing command history. String history_file; /// How many rows have been read or written. size_t processed_rows = 0; /// Parsed query. Is used to determine some settings (e.g. format, output file). ASTPtr parsed_query; /// The last exception that was received from the server. Is used for the return code in batch mode. std::unique_ptr last_exception_received_from_server; /// If the last query resulted in exception. bool received_exception_from_server = false; int expected_server_error = 0; int expected_client_error = 0; int actual_server_error = 0; int actual_client_error = 0; UInt64 server_revision = 0; String server_version; String server_display_name; Stopwatch watch; /// The server periodically sends information about how much data was read since last time. Progress progress; bool show_progress_bar = false; size_t written_progress_chars = 0; bool written_first_block = false; /// External tables info. std::list external_tables; /// Dictionary with query parameters for prepared statements. NameToNameMap query_parameters; ConnectionParameters connection_parameters; QueryFuzzer fuzzer; int query_fuzzer_runs = 0; void initialize(Poco::Util::Application & self) override { Poco::Util::Application::initialize(self); const char * home_path_cstr = getenv("HOME"); if (home_path_cstr) home_path = home_path_cstr; configReadClient(config(), home_path); context.setApplicationType(Context::ApplicationType::CLIENT); context.setQueryParameters(query_parameters); /// settings and limits could be specified in config file, but passed settings has higher priority for (const auto & setting : context.getSettingsRef().allUnchanged()) { const auto & name = setting.getName(); if (config().has(name)) context.setSetting(name, config().getString(name)); } /// Set path for format schema files if (config().has("format_schema_path")) context.setFormatSchemaPath(Poco::Path(config().getString("format_schema_path")).toString()); } int main(const std::vector & /*args*/) override { try { return mainImpl(); } catch (const Exception & e) { bool print_stack_trace = config().getBool("stacktrace", false); std::string text = e.displayText(); /** If exception is received from server, then stack trace is embedded in message. * If exception is thrown on client, then stack trace is in separate field. */ auto embedded_stack_trace_pos = text.find("Stack trace"); if (std::string::npos != embedded_stack_trace_pos && !print_stack_trace) text.resize(embedded_stack_trace_pos); std::cerr << "Code: " << e.code() << ". " << text << std::endl << std::endl; /// Don't print the stack trace on the client if it was logged on the server. /// Also don't print the stack trace in case of network errors. if (print_stack_trace && e.code() != ErrorCodes::NETWORK_ERROR && std::string::npos == embedded_stack_trace_pos) { std::cerr << "Stack trace:" << std::endl << e.getStackTraceString(); } /// If exception code isn't zero, we should return non-zero return code anyway. return e.code() ? e.code() : -1; } catch (...) { std::cerr << getCurrentExceptionMessage(false) << std::endl; return getCurrentExceptionCode(); } } /// Should we celebrate a bit? static bool isNewYearMode() { time_t current_time = time(nullptr); /// It's bad to be intrusive. if (current_time % 3 != 0) return false; LocalDate now(current_time); return (now.month() == 12 && now.day() >= 20) || (now.month() == 1 && now.day() <= 5); } static bool isChineseNewYearMode(const String & local_tz) { /// Days of Dec. 20 in Chinese calendar starting from year 2019 to year 2105 static constexpr UInt16 chineseNewYearIndicators[] = {18275, 18659, 19014, 19368, 19752, 20107, 20491, 20845, 21199, 21583, 21937, 22292, 22676, 23030, 23414, 23768, 24122, 24506, 24860, 25215, 25599, 25954, 26308, 26692, 27046, 27430, 27784, 28138, 28522, 28877, 29232, 29616, 29970, 30354, 30708, 31062, 31446, 31800, 32155, 32539, 32894, 33248, 33632, 33986, 34369, 34724, 35078, 35462, 35817, 36171, 36555, 36909, 37293, 37647, 38002, 38386, 38740, 39095, 39479, 39833, 40187, 40571, 40925, 41309, 41664, 42018, 42402, 42757, 43111, 43495, 43849, 44233, 44587, 44942, 45326, 45680, 46035, 46418, 46772, 47126, 47510, 47865, 48249, 48604, 48958, 49342}; /// All time zone names are acquired from https://www.iana.org/time-zones static constexpr const char * chineseNewYearTimeZoneIndicators[] = { /// Time zones celebrating Chinese new year. "Asia/Shanghai", "Asia/Chongqing", "Asia/Harbin", "Asia/Urumqi", "Asia/Hong_Kong", "Asia/Chungking", "Asia/Macao", "Asia/Macau", "Asia/Taipei", "Asia/Singapore", /// Time zones celebrating Chinese new year but with different festival names. Let's not print the message for now. // "Asia/Brunei", // "Asia/Ho_Chi_Minh", // "Asia/Hovd", // "Asia/Jakarta", // "Asia/Jayapura", // "Asia/Kashgar", // "Asia/Kuala_Lumpur", // "Asia/Kuching", // "Asia/Makassar", // "Asia/Pontianak", // "Asia/Pyongyang", // "Asia/Saigon", // "Asia/Seoul", // "Asia/Ujung_Pandang", // "Asia/Ulaanbaatar", // "Asia/Ulan_Bator", }; static constexpr size_t M = sizeof(chineseNewYearTimeZoneIndicators) / sizeof(chineseNewYearTimeZoneIndicators[0]); time_t current_time = time(nullptr); if (chineseNewYearTimeZoneIndicators + M == std::find_if(chineseNewYearTimeZoneIndicators, chineseNewYearTimeZoneIndicators + M, [&local_tz](const char * tz) { return tz == local_tz; })) return false; /// It's bad to be intrusive. if (current_time % 3 != 0) return false; auto days = DateLUT::instance().toDayNum(current_time).toUnderType(); for (auto d : chineseNewYearIndicators) { /// Let's celebrate until Lantern Festival if (d <= days && d + 25u >= days) return true; else if (d > days) return false; } return false; } #if USE_REPLXX static void highlight(const String & query, std::vector & colors) { using namespace replxx; static const std::unordered_map token_to_color = { { TokenType::Whitespace, Replxx::Color::DEFAULT }, { TokenType::Comment, Replxx::Color::GRAY }, { TokenType::BareWord, Replxx::Color::DEFAULT }, { TokenType::Number, Replxx::Color::GREEN }, { TokenType::StringLiteral, Replxx::Color::CYAN }, { TokenType::QuotedIdentifier, Replxx::Color::MAGENTA }, { TokenType::OpeningRoundBracket, Replxx::Color::BROWN }, { TokenType::ClosingRoundBracket, Replxx::Color::BROWN }, { TokenType::OpeningSquareBracket, Replxx::Color::BROWN }, { TokenType::ClosingSquareBracket, Replxx::Color::BROWN }, { TokenType::OpeningCurlyBrace, Replxx::Color::INTENSE }, { TokenType::ClosingCurlyBrace, Replxx::Color::INTENSE }, { TokenType::Comma, Replxx::Color::INTENSE }, { TokenType::Semicolon, Replxx::Color::INTENSE }, { TokenType::Dot, Replxx::Color::INTENSE }, { TokenType::Asterisk, Replxx::Color::INTENSE }, { TokenType::Plus, Replxx::Color::INTENSE }, { TokenType::Minus, Replxx::Color::INTENSE }, { TokenType::Slash, Replxx::Color::INTENSE }, { TokenType::Percent, Replxx::Color::INTENSE }, { TokenType::Arrow, Replxx::Color::INTENSE }, { TokenType::QuestionMark, Replxx::Color::INTENSE }, { TokenType::Colon, Replxx::Color::INTENSE }, { TokenType::Equals, Replxx::Color::INTENSE }, { TokenType::NotEquals, Replxx::Color::INTENSE }, { TokenType::Less, Replxx::Color::INTENSE }, { TokenType::Greater, Replxx::Color::INTENSE }, { TokenType::LessOrEquals, Replxx::Color::INTENSE }, { TokenType::GreaterOrEquals, Replxx::Color::INTENSE }, { TokenType::Concatenation, Replxx::Color::INTENSE }, { TokenType::At, Replxx::Color::INTENSE }, { TokenType::DoubleAt, Replxx::Color::MAGENTA }, { TokenType::EndOfStream, Replxx::Color::DEFAULT }, { TokenType::Error, Replxx::Color::RED }, { TokenType::ErrorMultilineCommentIsNotClosed, Replxx::Color::RED }, { TokenType::ErrorSingleQuoteIsNotClosed, Replxx::Color::RED }, { TokenType::ErrorDoubleQuoteIsNotClosed, Replxx::Color::RED }, { TokenType::ErrorSinglePipeMark, Replxx::Color::RED }, { TokenType::ErrorWrongNumber, Replxx::Color::RED }, { TokenType::ErrorMaxQuerySizeExceeded, Replxx::Color::RED } }; const Replxx::Color unknown_token_color = Replxx::Color::RED; Lexer lexer(query.data(), query.data() + query.size()); size_t pos = 0; for (Token token = lexer.nextToken(); !token.isEnd(); token = lexer.nextToken()) { size_t utf8_len = UTF8::countCodePoints(reinterpret_cast(token.begin), token.size()); for (size_t code_point_index = 0; code_point_index < utf8_len; ++code_point_index) { if (token_to_color.find(token.type) != token_to_color.end()) colors[pos + code_point_index] = token_to_color.at(token.type); else colors[pos + code_point_index] = unknown_token_color; } pos += utf8_len; } } #endif int mainImpl() { UseSSL use_ssl; registerFunctions(); registerAggregateFunctions(); /// Batch mode is enabled if one of the following is true: /// - -e (--query) command line option is present. /// The value of the option is used as the text of query (or of multiple queries). /// If stdin is not a terminal, INSERT data for the first query is read from it. /// - stdin is not a terminal. In this case queries are read from it. if (!stdin_is_a_tty || config().has("query")) is_interactive = false; std::cout << std::fixed << std::setprecision(3); std::cerr << std::fixed << std::setprecision(3); if (is_interactive) showClientVersion(); is_default_format = !config().has("vertical") && !config().has("format"); if (config().has("vertical")) format = config().getString("format", "Vertical"); else format = config().getString("format", is_interactive ? "PrettyCompact" : "TabSeparated"); format_max_block_size = config().getInt("format_max_block_size", context.getSettingsRef().max_block_size); insert_format = "Values"; /// Setting value from cmd arg overrides one from config if (context.getSettingsRef().max_insert_block_size.changed) insert_format_max_block_size = context.getSettingsRef().max_insert_block_size; else insert_format_max_block_size = config().getInt("insert_format_max_block_size", context.getSettingsRef().max_insert_block_size); if (!is_interactive) { need_render_progress = config().getBool("progress", false); echo_queries = config().getBool("echo", false); ignore_error = config().getBool("ignore-error", false); } ClientInfo & client_info = context.getClientInfo(); client_info.setInitialQuery(); client_info.quota_key = config().getString("quota_key", ""); connect(); /// Initialize DateLUT here to avoid counting time spent here as query execution time. const auto local_tz = DateLUT::instance().getTimeZone(); if (!context.getSettingsRef().use_client_time_zone) { const auto & time_zone = connection->getServerTimezone(connection_parameters.timeouts); if (!time_zone.empty()) { try { DateLUT::setDefaultTimezone(time_zone); } catch (...) { std::cerr << "Warning: could not switch to server time zone: " << time_zone << ", reason: " << getCurrentExceptionMessage(/* with_stacktrace = */ false) << std::endl << "Proceeding with local time zone." << std::endl << std::endl; } } else { std::cerr << "Warning: could not determine server time zone. " << "Proceeding with local time zone." << std::endl << std::endl; } } Strings keys; prompt_by_server_display_name = config().getRawString("prompt_by_server_display_name.default", "{display_name} :) "); config().keys("prompt_by_server_display_name", keys); for (const String & key : keys) { if (key != "default" && server_display_name.find(key) != std::string::npos) { prompt_by_server_display_name = config().getRawString("prompt_by_server_display_name." + key); break; } } /// Prompt may contain escape sequences including \e[ or \x1b[ sequences to set terminal color. { String unescaped_prompt_by_server_display_name; ReadBufferFromString in(prompt_by_server_display_name); readEscapedString(unescaped_prompt_by_server_display_name, in); prompt_by_server_display_name = std::move(unescaped_prompt_by_server_display_name); } /// Prompt may contain the following substitutions in a form of {name}. std::map prompt_substitutions { {"host", connection_parameters.host}, {"port", toString(connection_parameters.port)}, {"user", connection_parameters.user}, {"display_name", server_display_name}, }; /// Quite suboptimal. for (const auto & [key, value]: prompt_substitutions) boost::replace_all(prompt_by_server_display_name, "{" + key + "}", value); if (is_interactive) { if (!query_id.empty()) throw Exception("query_id could be specified only in non-interactive mode", ErrorCodes::BAD_ARGUMENTS); if (print_time_to_stderr) throw Exception("time option could be specified only in non-interactive mode", ErrorCodes::BAD_ARGUMENTS); if (server_revision >= Suggest::MIN_SERVER_REVISION && !config().getBool("disable_suggestion", false)) { /// Load suggestion data from the server. Suggest::instance().load(connection_parameters, config().getInt("suggestion_limit")); } /// Load command history if present. if (config().has("history_file")) history_file = config().getString("history_file"); else { auto * history_file_from_env = getenv("CLICKHOUSE_HISTORY_FILE"); if (history_file_from_env) history_file = history_file_from_env; else if (!home_path.empty()) history_file = home_path + "/.clickhouse-client-history"; } if (!history_file.empty() && !Poco::File(history_file).exists()) Poco::File(history_file).createFile(); LineReader::Patterns query_extenders = {"\\"}; LineReader::Patterns query_delimiters = {";", "\\G"}; #if USE_REPLXX replxx::Replxx::highlighter_callback_t highlight_callback{}; if (config().getBool("highlight")) highlight_callback = highlight; ReplxxLineReader lr( Suggest::instance(), history_file, config().has("multiline"), query_extenders, query_delimiters, highlight_callback); #elif defined(USE_READLINE) && USE_READLINE ReadlineLineReader lr(Suggest::instance(), history_file, config().has("multiline"), query_extenders, query_delimiters); #else LineReader lr(history_file, config().has("multiline"), query_extenders, query_delimiters); #endif /// Enable bracketed-paste-mode only when multiquery is enabled and multiline is /// disabled, so that we are able to paste and execute multiline queries in a whole /// instead of erroring out, while be less intrusive. if (config().has("multiquery") && !config().has("multiline")) lr.enableBracketedPaste(); do { auto input = lr.readLine(prompt(), ":-] "); if (input.empty()) break; has_vertical_output_suffix = false; if (input.ends_with("\\G")) { input.resize(input.size() - 2); has_vertical_output_suffix = true; } try { if (!processQueryText(input)) break; } catch (const Exception & e) { actual_client_error = e.code(); if (!actual_client_error || actual_client_error != expected_client_error) { std::cerr << std::endl << "Exception on client:" << std::endl << "Code: " << e.code() << ". " << e.displayText() << std::endl; if (config().getBool("stacktrace", false)) std::cerr << "Stack trace:" << std::endl << e.getStackTraceString() << std::endl; std::cerr << std::endl; } /// Client-side exception during query execution can result in the loss of /// sync in the connection protocol. /// So we reconnect and allow to enter the next query. connect(); } } while (true); if (isNewYearMode()) std::cout << "Happy new year." << std::endl; else if (isChineseNewYearMode(local_tz)) std::cout << "Happy Chinese new year. 春节快乐!" << std::endl; else std::cout << "Bye." << std::endl; return 0; } else { query_id = config().getString("query_id", ""); if (query_fuzzer_runs) { nonInteractiveWithFuzzing(); } else { nonInteractive(); } /// If exception code isn't zero, we should return non-zero return code anyway. if (last_exception_received_from_server) return last_exception_received_from_server->code() != 0 ? last_exception_received_from_server->code() : -1; return 0; } } void connect() { connection_parameters = ConnectionParameters(config()); if (is_interactive) std::cout << "Connecting to " << (!connection_parameters.default_database.empty() ? "database " + connection_parameters.default_database + " at " : "") << connection_parameters.host << ":" << connection_parameters.port << (!connection_parameters.user.empty() ? " as user " + connection_parameters.user : "") << "." << std::endl; connection = std::make_unique( connection_parameters.host, connection_parameters.port, connection_parameters.default_database, connection_parameters.user, connection_parameters.password, "", /* cluster */ "", /* cluster_secret */ "client", connection_parameters.compression, connection_parameters.security); String server_name; UInt64 server_version_major = 0; UInt64 server_version_minor = 0; UInt64 server_version_patch = 0; if (max_client_network_bandwidth) { ThrottlerPtr throttler = std::make_shared(max_client_network_bandwidth, 0, ""); connection->setThrottler(throttler); } connection->getServerVersion(connection_parameters.timeouts, server_name, server_version_major, server_version_minor, server_version_patch, server_revision); server_version = toString(server_version_major) + "." + toString(server_version_minor) + "." + toString(server_version_patch); if (server_display_name = connection->getServerDisplayName(connection_parameters.timeouts); server_display_name.empty()) { server_display_name = config().getString("host", "localhost"); } if (is_interactive) { std::cout << "Connected to " << server_name << " server version " << server_version << " revision " << server_revision << "." << std::endl << std::endl; auto client_version_tuple = std::make_tuple(VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH); auto server_version_tuple = std::make_tuple(server_version_major, server_version_minor, server_version_patch); if (client_version_tuple < server_version_tuple) { std::cout << "ClickHouse client version is older than ClickHouse server. " << "It may lack support for new features." << std::endl << std::endl; } else if (client_version_tuple > server_version_tuple) { std::cout << "ClickHouse server version is older than ClickHouse client. " << "It may indicate that the server is out of date and can be upgraded." << std::endl << std::endl; } } } inline String prompt() const { return boost::replace_all_copy(prompt_by_server_display_name, "{database}", config().getString("database", "default")); } void nonInteractive() { String text; if (config().has("query")) text = config().getRawString("query"); /// Poco configuration should not process substitutions in form of ${...} inside query. else { /// If 'query' parameter is not set, read a query from stdin. /// The query is read entirely into memory (streaming is disabled). ReadBufferFromFileDescriptor in(STDIN_FILENO); readStringUntilEOF(text, in); } processQueryText(text); } void nonInteractiveWithFuzzing() { if (config().has("query")) { // Poco configuration should not process substitutions in form of // ${...} inside query processWithFuzzing(config().getRawString("query")); return; } // Try to stream the queries from stdin, without reading all of them // into memory. The interface of the parser does not support streaming, // in particular, it can't distinguish the end of partial input buffer // and the final end of input file. This means we have to try to split // the input into separate queries here. Two patterns of input are // especially interesting: // 1) multiline query: // select 1 // from system.numbers; // // 2) csv insert with in-place data: // insert into t format CSV 1;2 // // (1) means we can't split on new line, and (2) means we can't split on // semicolon. Solution: split on ';\n'. This sequence is frequent enough // in the SQL tests which are our principal input for fuzzing. Now we // have another interesting case: // 3) escaped semicolon followed by newline, e.g. // select '; // ' // // To handle (3), parse until we can, and read more data if the parser // complains. Hopefully this should be enough... ReadBufferFromFileDescriptor in(STDIN_FILENO); std::string text; while (!in.eof()) { // Read until separator. while (!in.eof()) { char * next_separator = find_first_symbols<';'>(in.position(), in.buffer().end()); if (next_separator < in.buffer().end()) { next_separator++; if (next_separator < in.buffer().end() && *next_separator == '\n') { // Found ';\n', append it to the query text and try to // parse. next_separator++; text.append(in.position(), next_separator - in.position()); in.position() = next_separator; break; } } // Didn't find the semicolon and reached the end of buffer. text.append(in.position(), next_separator - in.position()); in.position() = next_separator; if (text.size() > 1024 * 1024) { // We've read a lot of text and still haven't seen a separator. // Likely some pathological input, just fall through to prevent // too long loops. break; } } // Parse and execute what we've read. const auto * new_end = processWithFuzzing(text); if (new_end > &text[0]) { const auto rest_size = text.size() - (new_end - &text[0]); memcpy(&text[0], new_end, rest_size); text.resize(rest_size); } else { // We didn't read enough text to parse a query. Will read more. } // Ensure that we're still connected to the server. If the server died, // the reconnect is going to fail with an exception, and the fuzzer // will exit. The ping() would be the best match here, but it's // private, probably for a good reason that the protocol doesn't allow // pings at any possible moment. // Don't forget to reset the default database which might have changed. connection->setDefaultDatabase(""); connection->forceConnected(connection_parameters.timeouts); if (text.size() > 4 * 1024) { // Some pathological situation where the text is larger than 4kB // and we still cannot parse a single query in it. Abort. std::cerr << "Read too much text and still can't parse a query." " Aborting." << std::endl; exit(1); } } } bool processQueryText(const String & text) { if (exit_strings.end() != exit_strings.find(trim(text, [](char c){ return isWhitespaceASCII(c) || c == ';'; }))) return false; if (!config().has("multiquery")) { assert(!query_fuzzer_runs); processTextAsSingleQuery(text); return true; } if (query_fuzzer_runs) { processWithFuzzing(text); return true; } return processMultiQuery(text); } bool processMultiQuery(const String & all_queries_text) { const bool test_mode = config().has("testmode"); { /// disable logs if expects errors TestHint test_hint(test_mode, all_queries_text); if (test_hint.clientError() || test_hint.serverError()) processTextAsSingleQuery("SET send_logs_level = 'none'"); } /// Several queries separated by ';'. /// INSERT data is ended by the end of line, not ';'. /// An exception is VALUES format where we also support semicolon in /// addition to end of line. const char * this_query_begin = all_queries_text.data(); const char * all_queries_end = all_queries_text.data() + all_queries_text.size(); while (this_query_begin < all_queries_end) { // Use the token iterator to skip any whitespace, semicolons and // comments at the beginning of the query. An example from regression // tests: // insert into table t values ('invalid'); -- { serverError 469 } // select 1 // Here the test hint comment gets parsed as a part of second query. // We parse the `INSERT VALUES` up to the semicolon, and the rest // looks like a two-line query: // -- { serverError 469 } // select 1 // and we expect it to fail with error 469, but this hint is actually // for the previous query. Test hints should go after the query, so // we can fix this by skipping leading comments. Token iterator skips // comments and whitespace by itself, so we only have to check for // semicolons. // The code block is to limit visibility of `tokens` because we have // another such variable further down the code, and get warnings for // that. { Tokens tokens(this_query_begin, all_queries_end); IParser::Pos token_iterator(tokens, context.getSettingsRef().max_parser_depth); while (token_iterator->type == TokenType::Semicolon && token_iterator.isValid()) { ++token_iterator; } this_query_begin = token_iterator->begin; if (this_query_begin >= all_queries_end) { break; } } // Try to parse the query. const char * this_query_end = this_query_begin; try { parsed_query = parseQuery(this_query_end, all_queries_end, true); } catch (Exception & e) { if (!test_mode) throw; /// Try find test hint for syntax error const char * end_of_line = find_first_symbols<'\n'>(this_query_begin,all_queries_end); TestHint hint(true, String(this_query_end, end_of_line - this_query_end)); if (hint.serverError()) /// Syntax errors are considered as client errors throw; if (hint.clientError() != e.code()) { if (hint.clientError()) e.addMessage("\nExpected clinet error: " + std::to_string(hint.clientError())); throw; } /// It's expected syntax error, skip the line this_query_begin = end_of_line; continue; } if (!parsed_query) { if (ignore_error) { Tokens tokens(this_query_begin, all_queries_end); IParser::Pos token_iterator(tokens, context.getSettingsRef().max_parser_depth); while (token_iterator->type != TokenType::Semicolon && token_iterator.isValid()) ++token_iterator; this_query_begin = token_iterator->end; continue; } return true; } // INSERT queries may have the inserted data in the query text // that follow the query itself, e.g. "insert into t format CSV 1;2". // They need special handling. First of all, here we find where the // inserted data ends. In multy-query mode, it is delimited by a // newline. // The VALUES format needs even more handling -- we also allow the // data to be delimited by semicolon. This case is handled later by // the format parser itself. auto * insert_ast = parsed_query->as(); if (insert_ast && insert_ast->data) { this_query_end = find_first_symbols<'\n'>(insert_ast->data, all_queries_end); insert_ast->end = this_query_end; query_to_send = all_queries_text.substr( this_query_begin - all_queries_text.data(), insert_ast->data - this_query_begin); } else { query_to_send = all_queries_text.substr( this_query_begin - all_queries_text.data(), this_query_end - this_query_begin); } // full_query is the query + inline INSERT data. full_query = all_queries_text.substr( this_query_begin - all_queries_text.data(), this_query_end - this_query_begin); // Look for the hint in the text of query + insert data, if any. // e.g. insert into t format CSV 'a' -- { serverError 123 }. TestHint test_hint(test_mode, full_query); expected_client_error = test_hint.clientError(); expected_server_error = test_hint.serverError(); try { processParsedSingleQuery(); if (insert_ast && insert_ast->data) { // For VALUES format: use the end of inline data as reported // by the format parser (it is saved in sendData()). This // allows us to handle queries like: // insert into t values (1); select 1 //, where the inline data is delimited by semicolon and not // by a newline. this_query_end = parsed_query->as()->end; } } catch (...) { last_exception_received_from_server = std::make_unique(getCurrentExceptionMessage(true), getCurrentExceptionCode()); actual_client_error = last_exception_received_from_server->code(); if (!ignore_error && (!actual_client_error || actual_client_error != expected_client_error)) std::cerr << "Error on processing query: " << full_query << std::endl << last_exception_received_from_server->message(); received_exception_from_server = true; } if (!test_hint.checkActual(actual_server_error, actual_client_error, received_exception_from_server, last_exception_received_from_server)) connection->forceConnected(connection_parameters.timeouts); if (received_exception_from_server && !ignore_error) { if (is_interactive) break; else return false; } this_query_begin = this_query_end; } return true; } // Returns the last position we could parse. const char * processWithFuzzing(const String & text) { /// Several queries separated by ';'. /// INSERT data is ended by the end of line, not ';'. const char * begin = text.data(); const char * end = begin + text.size(); while (begin < end) { // Skip whitespace before the query while (isWhitespaceASCII(*begin) || *begin == ';') { ++begin; } const auto * this_query_begin = begin; ASTPtr orig_ast = parseQuery(begin, end, true); if (!orig_ast) { // Can't continue after a parsing error return begin; } auto * as_insert = orig_ast->as(); if (as_insert && as_insert->data) { // INSERT data is ended by newline as_insert->end = find_first_symbols<'\n'>(as_insert->data, end); begin = as_insert->end; } full_query = text.substr(this_query_begin - text.data(), begin - text.data()); // Don't repeat inserts, the tables grow too big. Also don't repeat // creates because first we run the unmodified query, it will succeed, // and the subsequent queries will fail. When we run out of fuzzer // errors, it may be interesting to add fuzzing of create queries that // wraps columns into LowCardinality or Nullable. Also there are other // kinds of create queries such as CREATE DICTIONARY, we could fuzz // them as well. int this_query_runs = query_fuzzer_runs; if (as_insert || orig_ast->as()) { this_query_runs = 1; } ASTPtr fuzz_base = orig_ast; for (int fuzz_step = 0; fuzz_step < this_query_runs; fuzz_step++) { fprintf(stderr, "fuzzing step %d out of %d for query at pos %zd\n", fuzz_step, this_query_runs, this_query_begin - text.data()); ASTPtr ast_to_process; try { std::stringstream dump_before_fuzz; fuzz_base->dumpTree(dump_before_fuzz); auto base_before_fuzz = fuzz_base->formatForErrorMessage(); ast_to_process = fuzz_base->clone(); std::stringstream dump_of_cloned_ast; ast_to_process->dumpTree(dump_of_cloned_ast); // Run the original query as well. if (fuzz_step > 0) { fuzzer.fuzzMain(ast_to_process); } auto base_after_fuzz = fuzz_base->formatForErrorMessage(); // Debug AST cloning errors. if (base_before_fuzz != base_after_fuzz) { fprintf(stderr, "base before fuzz: %s\n" "base after fuzz: %s\n", base_before_fuzz.c_str(), base_after_fuzz.c_str()); fprintf(stderr, "dump before fuzz:\n%s\n", dump_before_fuzz.str().c_str()); fprintf(stderr, "dump of cloned ast:\n%s\n", dump_of_cloned_ast.str().c_str()); fprintf(stderr, "dump after fuzz:\n"); fuzz_base->dumpTree(std::cerr); fmt::print(stderr, "IAST::clone() is broken for some AST node. This is a bug. The original AST ('dump before fuzz') and its cloned copy ('dump of cloned AST') refer to the same nodes, which must never happen. This means that their parent node doesn't implement clone() correctly."); assert(false); } auto fuzzed_text = ast_to_process->formatForErrorMessage(); if (fuzz_step > 0 && fuzzed_text == base_before_fuzz) { fprintf(stderr, "got boring ast\n"); continue; } parsed_query = ast_to_process; query_to_send = parsed_query->formatForErrorMessage(); processParsedSingleQuery(); } catch (...) { last_exception_received_from_server = std::make_unique(getCurrentExceptionMessage(true), getCurrentExceptionCode()); received_exception_from_server = true; fmt::print(stderr, "Error on processing query '{}': {}\n", ast_to_process->formatForErrorMessage(), last_exception_received_from_server->message()); } if (!connection->isConnected()) { // Probably the server is dead because we found an assertion // failure. Fail fast. return begin; } if (received_exception_from_server) { // Query completed with error, ignore it and fuzz again. fprintf(stderr, "Got error, will fuzz again\n"); received_exception_from_server = false; last_exception_received_from_server.reset(); continue; } else if (ast_to_process->formatForErrorMessage().size() > 500) { // ast too long, start from original ast fprintf(stderr, "current ast too long, won't elaborate\n"); fuzz_base = orig_ast; } else { // fuzz starting from this successful query fprintf(stderr, "using this ast as etalon\n"); fuzz_base = ast_to_process; } } } return begin; } void processTextAsSingleQuery(const String & text_) { full_query = text_; /// Some parts of a query (result output and formatting) are executed /// client-side. Thus we need to parse the query. const char * begin = full_query.data(); parsed_query = parseQuery(begin, begin + full_query.size(), false); if (!parsed_query) return; // An INSERT query may have the data that follow query text. Remove the /// Send part of query without data, because data will be sent separately. auto * insert = parsed_query->as(); if (insert && insert->data) { query_to_send = full_query.substr(0, insert->data - full_query.data()); } else { query_to_send = full_query; } processParsedSingleQuery(); } // Parameters are in global variables: // 'parsed_query' -- the query AST, // 'query_to_send' -- the query text that is sent to server, // 'full_query' -- for INSERT queries, contains the query and the data that // follow it. Its memory is referenced by ASTInsertQuery::begin, end. void processParsedSingleQuery() { resetOutput(); last_exception_received_from_server.reset(); received_exception_from_server = false; if (echo_queries) { writeString(full_query, std_out); writeChar('\n', std_out); std_out.next(); } watch.restart(); processed_rows = 0; progress.reset(); show_progress_bar = false; written_progress_chars = 0; written_first_block = false; { /// Temporarily apply query settings to context. std::optional old_settings; SCOPE_EXIT({ if (old_settings) context.setSettings(*old_settings); }); auto apply_query_settings = [&](const IAST & settings_ast) { if (!old_settings) old_settings.emplace(context.getSettingsRef()); context.applySettingsChanges(settings_ast.as()->changes); }; const auto * insert = parsed_query->as(); if (insert && insert->settings_ast) apply_query_settings(*insert->settings_ast); /// FIXME: try to prettify this cast using `as<>()` const auto * with_output = dynamic_cast(parsed_query.get()); if (with_output && with_output->settings_ast) apply_query_settings(*with_output->settings_ast); connection->forceConnected(connection_parameters.timeouts); ASTPtr input_function; if (insert && insert->select) insert->tryFindInputFunction(input_function); /// INSERT query for which data transfer is needed (not an INSERT SELECT or input()) is processed separately. if (insert && (!insert->select || input_function) && !insert->watch) { if (input_function && insert->format.empty()) throw Exception("FORMAT must be specified for function input()", ErrorCodes::INVALID_USAGE_OF_INPUT); processInsertQuery(); } else processOrdinaryQuery(); } /// Do not change context (current DB, settings) in case of an exception. if (!received_exception_from_server) { if (const auto * set_query = parsed_query->as()) { /// Save all changes in settings to avoid losing them if the connection is lost. for (const auto & change : set_query->changes) { if (change.name == "profile") current_profile = change.value.safeGet(); else context.applySettingChange(change); } } if (const auto * use_query = parsed_query->as()) { const String & new_database = use_query->database; /// If the client initiates the reconnection, it takes the settings from the config. config().setString("database", new_database); /// If the connection initiates the reconnection, it uses its variable. connection->setDefaultDatabase(new_database); } } if (is_interactive) { std::cout << std::endl << processed_rows << " rows in set. Elapsed: " << watch.elapsedSeconds() << " sec. "; if (progress.read_rows >= 1000) writeFinalProgress(); std::cout << std::endl << std::endl; } else if (print_time_to_stderr) { std::cerr << watch.elapsedSeconds() << "\n"; } } /// Convert external tables to ExternalTableData and send them using the connection. void sendExternalTables() { const auto * select = parsed_query->as(); if (!select && !external_tables.empty()) throw Exception("External tables could be sent only with select query", ErrorCodes::BAD_ARGUMENTS); std::vector data; for (auto & table : external_tables) data.emplace_back(table.getData(context)); connection->sendExternalTablesData(data); } /// Process the query that doesn't require transferring data blocks to the server. void processOrdinaryQuery() { /// Rewrite query only when we have query parameters. /// Note that if query is rewritten, comments in query are lost. /// But the user often wants to see comments in server logs, query log, processlist, etc. if (!query_parameters.empty()) { /// Replace ASTQueryParameter with ASTLiteral for prepared statements. ReplaceQueryParameterVisitor visitor(query_parameters); visitor.visit(parsed_query); /// Get new query after substitutions. Note that it cannot be done for INSERT query with embedded data. query_to_send = serializeAST(*parsed_query); } int retries_left = 10; for (;;) { assert(retries_left > 0); try { connection->sendQuery( connection_parameters.timeouts, query_to_send, query_id, QueryProcessingStage::Complete, &context.getSettingsRef(), &context.getClientInfo(), true); sendExternalTables(); receiveResult(); break; } catch (const Exception & e) { /// Retry when the server said "Client should retry" and no rows /// has been received yet. if (processed_rows == 0 && e.code() == ErrorCodes::DEADLOCK_AVOIDED && --retries_left) { std::cerr << "Got a transient error from the server, will" << " retry (" << retries_left << " retries left)"; } else { throw; } } } } /// Process the query that requires transferring data blocks to the server. void processInsertQuery() { const auto parsed_insert_query = parsed_query->as(); if (!parsed_insert_query.data && (is_interactive || (!stdin_is_a_tty && std_in.eof()))) throw Exception("No data to insert", ErrorCodes::NO_DATA_TO_INSERT); connection->sendQuery( connection_parameters.timeouts, query_to_send, query_id, QueryProcessingStage::Complete, &context.getSettingsRef(), &context.getClientInfo(), true); sendExternalTables(); /// Receive description of table structure. Block sample; ColumnsDescription columns_description; if (receiveSampleBlock(sample, columns_description)) { /// If structure was received (thus, server has not thrown an exception), /// send our data with that structure. sendData(sample, columns_description); receiveEndOfQuery(); } } ASTPtr parseQuery(const char * & pos, const char * end, bool allow_multi_statements) { ParserQuery parser(end, true); ASTPtr res; const auto & settings = context.getSettingsRef(); size_t max_length = 0; if (!allow_multi_statements) max_length = settings.max_query_size; if (is_interactive || ignore_error) { String message; res = tryParseQuery(parser, pos, end, message, true, "", allow_multi_statements, max_length, settings.max_parser_depth); if (!res) { std::cerr << std::endl << message << std::endl << std::endl; return nullptr; } } else res = parseQueryAndMovePosition(parser, pos, end, "", allow_multi_statements, max_length, settings.max_parser_depth); if (is_interactive) { std::cout << std::endl; formatAST(*res, std::cout); std::cout << std::endl << std::endl; } return res; } void sendData(Block & sample, const ColumnsDescription & columns_description) { /// If INSERT data must be sent. auto * parsed_insert_query = parsed_query->as(); if (!parsed_insert_query) return; if (parsed_insert_query->data) { /// Send data contained in the query. ReadBufferFromMemory data_in(parsed_insert_query->data, parsed_insert_query->end - parsed_insert_query->data); try { sendDataFrom(data_in, sample, columns_description); } catch (Exception & e) { /// The following query will use data from input // "INSERT INTO data FORMAT TSV\n " < data.csv // And may be pretty hard to debug, so add information about data source to make it easier. e.addMessage("data for INSERT was parsed from query"); throw; } // Remember where the data ended. We use this info later to determine // where the next query begins. parsed_insert_query->end = data_in.buffer().begin() + data_in.count(); } else if (!is_interactive) { /// Send data read from stdin. try { sendDataFrom(std_in, sample, columns_description); } catch (Exception & e) { e.addMessage("data for INSERT was parsed from stdin"); throw; } } else throw Exception("No data to insert", ErrorCodes::NO_DATA_TO_INSERT); } void sendDataFrom(ReadBuffer & buf, Block & sample, const ColumnsDescription & columns_description) { String current_format = insert_format; /// Data format can be specified in the INSERT query. if (const auto * insert = parsed_query->as()) { if (!insert->format.empty()) current_format = insert->format; } BlockInputStreamPtr block_input = context.getInputFormat( current_format, buf, sample, insert_format_max_block_size); const auto & column_defaults = columns_description.getDefaults(); if (!column_defaults.empty()) block_input = std::make_shared(block_input, column_defaults, context); BlockInputStreamPtr async_block_input = std::make_shared(block_input); async_block_input->readPrefix(); while (true) { Block block = async_block_input->read(); /// Check if server send Log packet receiveLogs(); /// Check if server send Exception packet auto packet_type = connection->checkPacket(); if (packet_type && *packet_type == Protocol::Server::Exception) { /* * We're exiting with error, so it makes sense to kill the * input stream without waiting for it to complete. */ async_block_input->cancel(true); return; } connection->sendData(block); processed_rows += block.rows(); if (!block) break; } async_block_input->readSuffix(); } /// Flush all buffers. void resetOutput() { block_out_stream.reset(); logs_out_stream.reset(); if (pager_cmd) { pager_cmd->in.close(); pager_cmd->wait(); } pager_cmd = nullptr; if (out_file_buf) { out_file_buf->next(); out_file_buf.reset(); } if (out_logs_buf) { out_logs_buf->next(); out_logs_buf.reset(); } std_out.next(); } /// Receives and processes packets coming from server. /// Also checks if query execution should be cancelled. void receiveResult() { InterruptListener interrupt_listener; bool cancelled = false; // TODO: get the poll_interval from commandline. const auto receive_timeout = connection_parameters.timeouts.receive_timeout; constexpr size_t default_poll_interval = 1000000; /// in microseconds constexpr size_t min_poll_interval = 5000; /// in microseconds const size_t poll_interval = std::max(min_poll_interval, std::min(receive_timeout.totalMicroseconds(), default_poll_interval)); while (true) { Stopwatch receive_watch(CLOCK_MONOTONIC_COARSE); while (true) { /// Has the Ctrl+C been pressed and thus the query should be cancelled? /// If this is the case, inform the server about it and receive the remaining packets /// to avoid losing sync. if (!cancelled) { auto cancel_query = [&] { connection->sendCancel(); cancelled = true; if (is_interactive) { clearProgress(); std::cout << "Cancelling query." << std::endl; } /// Pressing Ctrl+C twice results in shut down. interrupt_listener.unblock(); }; if (interrupt_listener.check()) { cancel_query(); } else { double elapsed = receive_watch.elapsedSeconds(); if (elapsed > receive_timeout.totalSeconds()) { std::cout << "Timeout exceeded while receiving data from server." << " Waited for " << static_cast(elapsed) << " seconds," << " timeout is " << receive_timeout.totalSeconds() << " seconds." << std::endl; cancel_query(); } } } /// Poll for changes after a cancellation check, otherwise it never reached /// because of progress updates from server. if (connection->poll(poll_interval)) break; } if (!receiveAndProcessPacket(cancelled)) break; } if (cancelled && is_interactive) std::cout << "Query was cancelled." << std::endl; } /// Receive a part of the result, or progress info or an exception and process it. /// Returns true if one should continue receiving packets. /// Output of result is suppressed if query was cancelled. bool receiveAndProcessPacket(bool cancelled) { Packet packet = connection->receivePacket(); switch (packet.type) { case Protocol::Server::Data: if (!cancelled) onData(packet.block); return true; case Protocol::Server::Progress: onProgress(packet.progress); return true; case Protocol::Server::ProfileInfo: onProfileInfo(packet.profile_info); return true; case Protocol::Server::Totals: if (!cancelled) onTotals(packet.block); return true; case Protocol::Server::Extremes: if (!cancelled) onExtremes(packet.block); return true; case Protocol::Server::Exception: onReceiveExceptionFromServer(*packet.exception); last_exception_received_from_server = std::move(packet.exception); return false; case Protocol::Server::Log: onLogData(packet.block); return true; case Protocol::Server::EndOfStream: onEndOfStream(); return false; default: throw Exception(ErrorCodes::UNKNOWN_PACKET_FROM_SERVER, "Unknown packet {} from server {}", packet.type, connection->getDescription()); } } /// Receive the block that serves as an example of the structure of table where data will be inserted. bool receiveSampleBlock(Block & out, ColumnsDescription & columns_description) { while (true) { Packet packet = connection->receivePacket(); switch (packet.type) { case Protocol::Server::Data: out = packet.block; return true; case Protocol::Server::Exception: onReceiveExceptionFromServer(*packet.exception); last_exception_received_from_server = std::move(packet.exception); return false; case Protocol::Server::Log: onLogData(packet.block); break; case Protocol::Server::TableColumns: columns_description = ColumnsDescription::parse(packet.multistring_message[1]); return receiveSampleBlock(out, columns_description); default: throw NetException("Unexpected packet from server (expected Data, Exception or Log, got " + String(Protocol::Server::toString(packet.type)) + ")", ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER); } } } /// Process Log packets, exit when receive Exception or EndOfStream bool receiveEndOfQuery() { while (true) { Packet packet = connection->receivePacket(); switch (packet.type) { case Protocol::Server::EndOfStream: onEndOfStream(); return true; case Protocol::Server::Exception: onReceiveExceptionFromServer(*packet.exception); last_exception_received_from_server = std::move(packet.exception); return false; case Protocol::Server::Log: onLogData(packet.block); break; default: throw NetException("Unexpected packet from server (expected Exception, EndOfStream or Log, got " + String(Protocol::Server::toString(packet.type)) + ")", ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER); } } } /// Process Log packets, used when inserting data by blocks void receiveLogs() { auto packet_type = connection->checkPacket(); while (packet_type && *packet_type == Protocol::Server::Log) { receiveAndProcessPacket(false); packet_type = connection->checkPacket(); } } void initBlockOutputStream(const Block & block) { if (!block_out_stream) { WriteBuffer * out_buf = nullptr; String pager = config().getString("pager", ""); if (!pager.empty()) { signal(SIGPIPE, SIG_IGN); pager_cmd = ShellCommand::execute(pager, true); out_buf = &pager_cmd->in; } else { out_buf = &std_out; } String current_format = format; /// The query can specify output format or output file. /// FIXME: try to prettify this cast using `as<>()` if (const auto * query_with_output = dynamic_cast(parsed_query.get())) { if (query_with_output->out_file) { const auto & out_file_node = query_with_output->out_file->as(); const auto & out_file = out_file_node.value.safeGet(); out_file_buf.emplace(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT); out_buf = &*out_file_buf; // We are writing to file, so default format is the same as in non-interactive mode. if (is_interactive && is_default_format) current_format = "TabSeparated"; } if (query_with_output->format != nullptr) { if (has_vertical_output_suffix) throw Exception("Output format already specified", ErrorCodes::CLIENT_OUTPUT_FORMAT_SPECIFIED); const auto & id = query_with_output->format->as(); current_format = id.name; } } if (has_vertical_output_suffix) current_format = "Vertical"; block_out_stream = context.getOutputFormat(current_format, *out_buf, block); block_out_stream->writePrefix(); } } void initLogsOutputStream() { if (!logs_out_stream) { WriteBuffer * wb = out_logs_buf.get(); if (!out_logs_buf) { if (server_logs_file.empty()) { /// Use stderr by default out_logs_buf = std::make_unique(STDERR_FILENO); wb = out_logs_buf.get(); } else if (server_logs_file == "-") { /// Use stdout if --server_logs_file=- specified wb = &std_out; } else { out_logs_buf = std::make_unique( server_logs_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_APPEND | O_CREAT); wb = out_logs_buf.get(); } } logs_out_stream = std::make_shared(*wb, stdout_is_a_tty); logs_out_stream->writePrefix(); } } void onData(Block & block) { if (!block) return; processed_rows += block.rows(); initBlockOutputStream(block); /// The header block containing zero rows was used to initialize /// block_out_stream, do not output it. /// Also do not output too much data if we're fuzzing. if (block.rows() != 0 && (query_fuzzer_runs == 0 || processed_rows < 100)) { block_out_stream->write(block); written_first_block = true; } bool clear_progess = std_out.offset() > 0; if (clear_progess) clearProgress(); /// Received data block is immediately displayed to the user. block_out_stream->flush(); /// Restore progress bar after data block. if (clear_progess) writeProgress(); } void onLogData(Block & block) { initLogsOutputStream(); clearProgress(); logs_out_stream->write(block); logs_out_stream->flush(); } void onTotals(Block & block) { initBlockOutputStream(block); block_out_stream->setTotals(block); } void onExtremes(Block & block) { initBlockOutputStream(block); block_out_stream->setExtremes(block); } void onProgress(const Progress & value) { if (!progress.incrementPiecewiseAtomically(value)) { // Just a keep-alive update. return; } if (block_out_stream) block_out_stream->onProgress(value); writeProgress(); } void clearProgress() { if (written_progress_chars) { written_progress_chars = 0; std::cerr << "\r" CLEAR_TO_END_OF_LINE; } } void writeProgress() { if (!need_render_progress) return; /// Output all progress bar commands to stderr at once to avoid flicker. WriteBufferFromFileDescriptor message(STDERR_FILENO, 1024); static size_t increment = 0; static const char * indicators[8] = { "\033[1;30m→\033[0m", "\033[1;31m↘\033[0m", "\033[1;32m↓\033[0m", "\033[1;33m↙\033[0m", "\033[1;34m←\033[0m", "\033[1;35m↖\033[0m", "\033[1;36m↑\033[0m", "\033[1m↗\033[0m", }; const char * indicator = indicators[increment % 8]; size_t terminal_width = getTerminalWidth(); if (!written_progress_chars) { /// If the current line is not empty, the progress must be output on the next line. /// The trick is found here: https://www.vidarholen.net/contents/blog/?p=878 message << std::string(terminal_width, ' '); } message << '\r'; size_t prefix_size = message.count(); message << indicator << " Progress: "; message << formatReadableQuantity(progress.read_rows) << " rows, " << formatReadableSizeWithDecimalSuffix(progress.read_bytes); size_t elapsed_ns = watch.elapsed(); if (elapsed_ns) message << " (" << formatReadableQuantity(progress.read_rows * 1000000000.0 / elapsed_ns) << " rows/s., " << formatReadableSizeWithDecimalSuffix(progress.read_bytes * 1000000000.0 / elapsed_ns) << "/s.) "; else message << ". "; written_progress_chars = message.count() - prefix_size - (strlen(indicator) - 2); /// Don't count invisible output (escape sequences). /// If the approximate number of rows to process is known, we can display a progress bar and percentage. if (progress.total_rows_to_read > 0) { size_t total_rows_corrected = std::max(progress.read_rows, progress.total_rows_to_read); /// To avoid flicker, display progress bar only if .5 seconds have passed since query execution start /// and the query is less than halfway done. if (elapsed_ns > 500000000) { /// Trigger to start displaying progress bar. If query is mostly done, don't display it. if (progress.read_rows * 2 < total_rows_corrected) show_progress_bar = true; if (show_progress_bar) { ssize_t width_of_progress_bar = static_cast(terminal_width) - written_progress_chars - strlen(" 99%"); if (width_of_progress_bar > 0) { std::string bar = UnicodeBar::render(UnicodeBar::getWidth(progress.read_rows, 0, total_rows_corrected, width_of_progress_bar)); message << "\033[0;32m" << bar << "\033[0m"; if (width_of_progress_bar > static_cast(bar.size() / UNICODE_BAR_CHAR_SIZE)) message << std::string(width_of_progress_bar - bar.size() / UNICODE_BAR_CHAR_SIZE, ' '); } } } /// Underestimate percentage a bit to avoid displaying 100%. message << ' ' << (99 * progress.read_rows / total_rows_corrected) << '%'; } message << CLEAR_TO_END_OF_LINE; ++increment; message.next(); } void writeFinalProgress() { std::cout << "Processed " << formatReadableQuantity(progress.read_rows) << " rows, " << formatReadableSizeWithDecimalSuffix(progress.read_bytes); size_t elapsed_ns = watch.elapsed(); if (elapsed_ns) std::cout << " (" << formatReadableQuantity(progress.read_rows * 1000000000.0 / elapsed_ns) << " rows/s., " << formatReadableSizeWithDecimalSuffix(progress.read_bytes * 1000000000.0 / elapsed_ns) << "/s.) "; else std::cout << ". "; } void onReceiveExceptionFromServer(const Exception & e) { resetOutput(); received_exception_from_server = true; actual_server_error = e.code(); if (expected_server_error) { if (actual_server_error == expected_server_error) return; std::cerr << "Expected error code: " << expected_server_error << " but got: " << actual_server_error << "." << std::endl; } std::string text = e.displayText(); auto embedded_stack_trace_pos = text.find("Stack trace"); if (std::string::npos != embedded_stack_trace_pos && !config().getBool("stacktrace", false)) text.resize(embedded_stack_trace_pos); /// If we probably have progress bar, we should add additional newline, /// otherwise exception may display concatenated with the progress bar. if (need_render_progress) std::cerr << '\n'; std::cerr << "Received exception from server (version " << server_version << "):" << std::endl << "Code: " << e.code() << ". " << text << std::endl; } void onProfileInfo(const BlockStreamProfileInfo & profile_info) { if (profile_info.hasAppliedLimit() && block_out_stream) block_out_stream->setRowsBeforeLimit(profile_info.getRowsBeforeLimit()); } void onEndOfStream() { clearProgress(); if (block_out_stream) block_out_stream->writeSuffix(); if (logs_out_stream) logs_out_stream->writeSuffix(); resetOutput(); if (is_interactive && !written_first_block) { clearProgress(); std::cout << "Ok." << std::endl; } } static void showClientVersion() { std::cout << DBMS_NAME << " client version " << VERSION_STRING << VERSION_OFFICIAL << "." << std::endl; } public: void init(int argc, char ** argv) { /// Don't parse options with Poco library. We need more sophisticated processing. stopOptionsProcessing(); /** We allow different groups of arguments: * - common arguments; * - arguments for any number of external tables each in form "--external args...", * where possible args are file, name, format, structure, types; * - param arguments for prepared statements. * Split these groups before processing. */ using Arguments = std::vector; Arguments common_arguments{""}; /// 0th argument is ignored. std::vector external_tables_arguments; bool in_external_group = false; for (int arg_num = 1; arg_num < argc; ++arg_num) { const char * arg = argv[arg_num]; if (0 == strcmp(arg, "--external")) { in_external_group = true; external_tables_arguments.emplace_back(Arguments{""}); } /// Options with value after equal sign. else if (in_external_group && (0 == strncmp(arg, "--file=", strlen("--file=")) || 0 == strncmp(arg, "--name=", strlen("--name=")) || 0 == strncmp(arg, "--format=", strlen("--format=")) || 0 == strncmp(arg, "--structure=", strlen("--structure=")) || 0 == strncmp(arg, "--types=", strlen("--types=")))) { external_tables_arguments.back().emplace_back(arg); } /// Options with value after whitespace. else if (in_external_group && (0 == strcmp(arg, "--file") || 0 == strcmp(arg, "--name") || 0 == strcmp(arg, "--format") || 0 == strcmp(arg, "--structure") || 0 == strcmp(arg, "--types"))) { if (arg_num + 1 < argc) { external_tables_arguments.back().emplace_back(arg); ++arg_num; arg = argv[arg_num]; external_tables_arguments.back().emplace_back(arg); } else break; } else { in_external_group = false; /// Parameter arg after underline. if (startsWith(arg, "--param_")) { const char * param_continuation = arg + strlen("--param_"); const char * equal_pos = strchr(param_continuation, '='); if (equal_pos == param_continuation) throw Exception("Parameter name cannot be empty", ErrorCodes::BAD_ARGUMENTS); if (equal_pos) { /// param_name=value query_parameters.emplace(String(param_continuation, equal_pos), String(equal_pos + 1)); } else { /// param_name value ++arg_num; arg = argv[arg_num]; query_parameters.emplace(String(param_continuation), String(arg)); } } else common_arguments.emplace_back(arg); } } stdin_is_a_tty = isatty(STDIN_FILENO); stdout_is_a_tty = isatty(STDOUT_FILENO); uint64_t terminal_width = 0; if (stdin_is_a_tty) terminal_width = getTerminalWidth(); namespace po = boost::program_options; /// Main commandline options related to client functionality and all parameters from Settings. po::options_description main_description = createOptionsDescription("Main options", terminal_width); main_description.add_options() ("help", "produce help message") ("config-file,C", po::value(), "config-file path") ("config,c", po::value(), "config-file path (another shorthand)") ("host,h", po::value()->default_value("localhost"), "server host") ("port", po::value()->default_value(9000), "server port") ("secure,s", "Use TLS connection") ("user,u", po::value()->default_value("default"), "user") /** If "--password [value]" is used but the value is omitted, the bad argument exception will be thrown. * implicit_value is used to avoid this exception (to allow user to type just "--password") * Since currently boost provides no way to check if a value has been set implicitly for an option, * the "\n" is used to distinguish this case because there is hardly a chance an user would use "\n" * as the password. */ ("password", po::value()->implicit_value("\n", ""), "password") ("ask-password", "ask-password") ("quota_key", po::value(), "A string to differentiate quotas when the user have keyed quotas configured on server") ("query_id", po::value(), "query_id") ("query,q", po::value(), "query") ("database,d", po::value(), "database") ("pager", po::value(), "pager") ("disable_suggestion,A", "Disable loading suggestion data. Note that suggestion data is loaded asynchronously through a second connection to ClickHouse server. Also it is reasonable to disable suggestion if you want to paste a query with TAB characters. Shorthand option -A is for those who get used to mysql client.") ("suggestion_limit", po::value()->default_value(10000), "Suggestion limit for how many databases, tables and columns to fetch.") ("multiline,m", "multiline") ("multiquery,n", "multiquery") ("format,f", po::value(), "default output format") ("testmode,T", "enable test hints in comments") ("ignore-error", "do not stop processing in multiquery mode") ("vertical,E", "vertical output format, same as --format=Vertical or FORMAT Vertical or \\G at end of command") ("time,t", "print query execution time to stderr in non-interactive mode (for benchmarks)") ("stacktrace", "print stack traces of exceptions") ("progress", "print progress even in non-interactive mode") ("version,V", "print version information and exit") ("version-clean", "print version in machine-readable format and exit") ("echo", "in batch mode, print query before execution") ("max_client_network_bandwidth", po::value(), "the maximum speed of data exchange over the network for the client in bytes per second.") ("compression", po::value(), "enable or disable compression") ("highlight", po::value()->default_value(true), "enable or disable basic syntax highlight in interactive command line") ("log-level", po::value(), "client log level") ("server_logs_file", po::value(), "put server logs into specified file") ("query-fuzzer-runs", po::value()->default_value(0), "query fuzzer runs") ; Settings cmd_settings; cmd_settings.addProgramOptions(main_description); /// Commandline options related to external tables. po::options_description external_description = createOptionsDescription("External tables options", terminal_width); external_description.add_options() ("file", po::value(), "data file or - for stdin") ("name", po::value()->default_value("_data"), "name of the table") ("format", po::value()->default_value("TabSeparated"), "data format") ("structure", po::value(), "structure") ("types", po::value(), "types") ; /// Parse main commandline options. po::parsed_options parsed = po::command_line_parser(common_arguments).options(main_description).run(); auto unrecognized_options = po::collect_unrecognized(parsed.options, po::collect_unrecognized_mode::include_positional); // unrecognized_options[0] is "", I don't understand why we need "" as the first argument which unused if (unrecognized_options.size() > 1) { throw Exception("Unrecognized option '" + unrecognized_options[1] + "'", ErrorCodes::UNRECOGNIZED_ARGUMENTS); } po::variables_map options; po::store(parsed, options); po::notify(options); if (options.count("version") || options.count("V")) { showClientVersion(); exit(0); } if (options.count("version-clean")) { std::cout << VERSION_STRING; exit(0); } /// Output of help message. if (options.count("help") || (options.count("host") && options["host"].as() == "elp")) /// If user writes -help instead of --help. { std::cout << main_description << "\n"; std::cout << external_description << "\n"; std::cout << "In addition, --param_name=value can be specified for substitution of parameters for parametrized queries.\n"; exit(0); } if (options.count("log-level")) Poco::Logger::root().setLevel(options["log-level"].as()); size_t number_of_external_tables_with_stdin_source = 0; for (size_t i = 0; i < external_tables_arguments.size(); ++i) { /// Parse commandline options related to external tables. po::parsed_options parsed_tables = po::command_line_parser(external_tables_arguments[i]).options(external_description).run(); po::variables_map external_options; po::store(parsed_tables, external_options); try { external_tables.emplace_back(external_options); if (external_tables.back().file == "-") ++number_of_external_tables_with_stdin_source; if (number_of_external_tables_with_stdin_source > 1) throw Exception("Two or more external tables has stdin (-) set as --file field", ErrorCodes::BAD_ARGUMENTS); } catch (const Exception & e) { std::string text = e.displayText(); std::cerr << "Code: " << e.code() << ". " << text << std::endl; std::cerr << "Table №" << i << std::endl << std::endl; /// Avoid the case when error exit code can possibly overflow to normal (zero). auto exit_code = e.code() % 256; if (exit_code == 0) exit_code = 255; exit(exit_code); } } context.makeGlobalContext(); context.setSettings(cmd_settings); /// Copy settings-related program options to config. /// TODO: Is this code necessary? for (const auto & setting : context.getSettingsRef().all()) { const auto & name = setting.getName(); if (options.count(name)) config().setString(name, options[name].as()); } if (options.count("config-file") && options.count("config")) throw Exception("Two or more configuration files referenced in arguments", ErrorCodes::BAD_ARGUMENTS); /// Save received data into the internal config. if (options.count("config-file")) config().setString("config-file", options["config-file"].as()); if (options.count("config")) config().setString("config-file", options["config"].as()); if (options.count("host") && !options["host"].defaulted()) config().setString("host", options["host"].as()); if (options.count("query_id")) config().setString("query_id", options["query_id"].as()); if (options.count("query")) config().setString("query", options["query"].as()); if (options.count("database")) config().setString("database", options["database"].as()); if (options.count("pager")) config().setString("pager", options["pager"].as()); if (options.count("port") && !options["port"].defaulted()) config().setInt("port", options["port"].as()); if (options.count("secure")) config().setBool("secure", true); if (options.count("user") && !options["user"].defaulted()) config().setString("user", options["user"].as()); if (options.count("password")) config().setString("password", options["password"].as()); if (options.count("ask-password")) config().setBool("ask-password", true); if (options.count("quota_key")) config().setString("quota_key", options["quota_key"].as()); if (options.count("multiline")) config().setBool("multiline", true); if (options.count("multiquery")) config().setBool("multiquery", true); if (options.count("testmode")) config().setBool("testmode", true); if (options.count("ignore-error")) config().setBool("ignore-error", true); if (options.count("format")) config().setString("format", options["format"].as()); if (options.count("vertical")) config().setBool("vertical", true); if (options.count("stacktrace")) config().setBool("stacktrace", true); if (options.count("progress")) config().setBool("progress", true); if (options.count("echo")) config().setBool("echo", true); if (options.count("time")) print_time_to_stderr = true; if (options.count("max_client_network_bandwidth")) max_client_network_bandwidth = options["max_client_network_bandwidth"].as(); if (options.count("compression")) config().setBool("compression", options["compression"].as()); if (options.count("server_logs_file")) server_logs_file = options["server_logs_file"].as(); if (options.count("disable_suggestion")) config().setBool("disable_suggestion", true); if (options.count("suggestion_limit")) config().setInt("suggestion_limit", options["suggestion_limit"].as()); if (options.count("highlight")) config().setBool("highlight", options["highlight"].as()); if ((query_fuzzer_runs = options["query-fuzzer-runs"].as())) { // Fuzzer implies multiquery. config().setBool("multiquery", true); // Ignore errors in parsing queries. // TODO stop using parseQuery. config().setBool("ignore-error", true); ignore_error = true; } argsToConfig(common_arguments, config(), 100); clearPasswordFromCommandLine(argc, argv); } }; } #pragma GCC diagnostic ignored "-Wunused-function" #pragma GCC diagnostic ignored "-Wmissing-declarations" int mainEntryClickHouseClient(int argc, char ** argv) { try { DB::Client client; client.init(argc, argv); return client.run(); } catch (const boost::program_options::error & e) { std::cerr << "Bad arguments: " << e.what() << std::endl; return 1; } catch (const DB::Exception & e) { std::string text = e.displayText(); std::cerr << "Code: " << e.code() << ". " << text << std::endl; return 1; } catch (...) { std::cerr << DB::getCurrentExceptionMessage(true) << std::endl; return 1; } }