From a739a105569b0d4d2505174b80735488b56aac49 Mon Sep 17 00:00:00 2001 From: Alexander Kuzmenkov Date: Fri, 26 Jun 2020 04:34:06 +0300 Subject: [PATCH 1/2] Minor cleanup in Client.cpp before fuzzing --- programs/client/Client.cpp | 239 +++++++++++++++++++------------------ 1 file changed, 125 insertions(+), 114 deletions(-) diff --git a/programs/client/Client.cpp b/programs/client/Client.cpp index be148ddb3e3..cfd7a22c11f 100644 --- a/programs/client/Client.cpp +++ b/programs/client/Client.cpp @@ -177,10 +177,10 @@ private: ASTPtr parsed_query; /// The last exception that was received from the server. Is used for the return code in batch mode. - std::unique_ptr last_exception; + std::unique_ptr last_exception_received_from_server; /// If the last query resulted in exception. - bool got_exception = false; + bool received_exception_from_server = false; int expected_server_error = 0; int expected_client_error = 0; int actual_server_error = 0; @@ -616,7 +616,7 @@ private: try { - if (!process(input)) + if (!processQueryText(input)) break; } catch (const Exception & e) @@ -657,8 +657,8 @@ private: nonInteractive(); /// If exception code isn't zero, we should return non-zero return code anyway. - if (last_exception) - return last_exception->code() != 0 ? last_exception->code() : -1; + if (last_exception_received_from_server) + return last_exception_received_from_server->code() != 0 ? last_exception_received_from_server->code() : -1; return 0; } @@ -753,111 +753,114 @@ private: readStringUntilEOF(text, in); } - process(text); + processQueryText(text); } - - bool process(const String & text) + bool processQueryText(const String & text) { if (exit_strings.end() != exit_strings.find(trim(text, [](char c){ return isWhitespaceASCII(c) || c == ';'; }))) return false; - const bool test_mode = config().has("testmode"); - if (config().has("multiquery")) + if (!config().has("multiquery")) { - { /// disable logs if expects errors - TestHint test_hint(test_mode, text); - if (test_hint.clientError() || test_hint.serverError()) - process("SET send_logs_level = 'none'"); - } - - /// Several queries separated by ';'. - /// INSERT data is ended by the end of line, not ';'. - - const char * begin = text.data(); - const char * end = begin + text.size(); - - while (begin < end) - { - const char * pos = begin; - ASTPtr ast = parseQuery(pos, end, true); - - if (!ast) - { - if (ignore_error) - { - Tokens tokens(begin, end); - IParser::Pos token_iterator(tokens, context.getSettingsRef().max_parser_depth); - while (token_iterator->type != TokenType::Semicolon && token_iterator.isValid()) - ++token_iterator; - begin = token_iterator->end; - - continue; - } - return true; - } - - auto * insert = ast->as(); - - if (insert && insert->data) - { - pos = find_first_symbols<'\n'>(insert->data, end); - insert->end = pos; - } - - String str = text.substr(begin - text.data(), pos - begin); - - begin = pos; - while (isWhitespaceASCII(*begin) || *begin == ';') - ++begin; - - TestHint test_hint(test_mode, str); - expected_client_error = test_hint.clientError(); - expected_server_error = test_hint.serverError(); - - try - { - auto ast_to_process = ast; - if (insert && insert->data) - ast_to_process = nullptr; - - if (!processSingleQuery(str, ast_to_process) && !ignore_error) - return false; - } - catch (...) - { - last_exception = std::make_unique(getCurrentExceptionMessage(true), getCurrentExceptionCode()); - actual_client_error = last_exception->code(); - if (!ignore_error && (!actual_client_error || actual_client_error != expected_client_error)) - std::cerr << "Error on processing query: " << str << std::endl << last_exception->message(); - got_exception = true; - } - - if (!test_hint.checkActual(actual_server_error, actual_client_error, got_exception, last_exception)) - connection->forceConnected(connection_parameters.timeouts); - - if (got_exception && !ignore_error) - { - if (is_interactive) - break; - else - return false; - } - } - + processSingleQuery(text); return true; } - else - { - return processSingleQuery(text); + + return processMultiQuery(text); + } + + bool processMultiQuery(const String & text) + { + const bool test_mode = config().has("testmode"); + + { /// disable logs if expects errors + TestHint test_hint(test_mode, text); + if (test_hint.clientError() || test_hint.serverError()) + processSingleQuery("SET send_logs_level = 'none'"); } + + /// Several queries separated by ';'. + /// INSERT data is ended by the end of line, not ';'. + + const char * begin = text.data(); + const char * end = begin + text.size(); + + while (begin < end) + { + const char * pos = begin; + ASTPtr orig_ast = parseQuery(pos, end, true); + + if (!orig_ast) + { + if (ignore_error) + { + Tokens tokens(begin, end); + IParser::Pos token_iterator(tokens, context.getSettingsRef().max_parser_depth); + while (token_iterator->type != TokenType::Semicolon && token_iterator.isValid()) + ++token_iterator; + begin = token_iterator->end; + + continue; + } + return true; + } + + auto * insert = orig_ast->as(); + + if (insert && insert->data) + { + pos = find_first_symbols<'\n'>(insert->data, end); + insert->end = pos; + } + + String str = text.substr(begin - text.data(), pos - begin); + + begin = pos; + while (isWhitespaceASCII(*begin) || *begin == ';') + ++begin; + + TestHint test_hint(test_mode, str); + expected_client_error = test_hint.clientError(); + expected_server_error = test_hint.serverError(); + + try + { + auto ast_to_process = orig_ast; + if (insert && insert->data) + ast_to_process = nullptr; + + processSingleQuery(str, ast_to_process); + } + catch (...) + { + last_exception_received_from_server = std::make_unique(getCurrentExceptionMessage(true), getCurrentExceptionCode()); + actual_client_error = last_exception_received_from_server->code(); + if (!ignore_error && (!actual_client_error || actual_client_error != expected_client_error)) + std::cerr << "Error on processing query: " << str << std::endl << last_exception_received_from_server->message(); + received_exception_from_server = true; + } + + if (!test_hint.checkActual(actual_server_error, actual_client_error, received_exception_from_server, last_exception_received_from_server)) + connection->forceConnected(connection_parameters.timeouts); + + if (received_exception_from_server && !ignore_error) + { + if (is_interactive) + break; + else + return false; + } + } + + return true; } - bool processSingleQuery(const String & line, ASTPtr parsed_query_ = nullptr) + void processSingleQuery(const String & line, ASTPtr parsed_query_ = nullptr) { resetOutput(); - got_exception = false; + received_exception_from_server = false; if (echo_queries) { @@ -880,7 +883,7 @@ private: } if (!parsed_query) - return true; + return; processed_rows = 0; progress.reset(); @@ -924,7 +927,7 @@ private: } /// Do not change context (current DB, settings) in case of an exception. - if (!got_exception) + if (!received_exception_from_server) { if (const auto * set_query = parsed_query->as()) { @@ -962,8 +965,6 @@ private: { std::cerr << watch.elapsedSeconds() << "\n"; } - - return true; } @@ -998,9 +999,11 @@ private: query = serializeAST(*parsed_query); } - static constexpr size_t max_retries = 10; - for (size_t retry = 0; retry < max_retries; ++retry) + int retries_left = 10; + for (;;) { + assert(retries_left > 0); + try { connection->sendQuery( @@ -1019,11 +1022,19 @@ private: } catch (const Exception & e) { - /// Retry when the server said "Client should retry" and no rows has been received yet. - if (processed_rows == 0 && e.code() == ErrorCodes::DEADLOCK_AVOIDED && retry + 1 < max_retries) - continue; - - throw; + /// Retry when the server said "Client should retry" and no rows + /// has been received yet. + if (processed_rows == 0 + && e.code() == ErrorCodes::DEADLOCK_AVOIDED + && --retries_left) + { + std::cerr << "Got a transient error from the server, will" + << " retry (" << retries_left << " retries left)"; + } + else + { + throw; + } } } } @@ -1310,8 +1321,8 @@ private: return true; case Protocol::Server::Exception: - onException(*packet.exception); - last_exception = std::move(packet.exception); + onReceiveExceptionFromServer(*packet.exception); + last_exception_received_from_server = std::move(packet.exception); return false; case Protocol::Server::Log: @@ -1342,8 +1353,8 @@ private: return true; case Protocol::Server::Exception: - onException(*packet.exception); - last_exception = std::move(packet.exception); + onReceiveExceptionFromServer(*packet.exception); + last_exception_received_from_server = std::move(packet.exception); return false; case Protocol::Server::Log: @@ -1376,8 +1387,8 @@ private: return true; case Protocol::Server::Exception: - onException(*packet.exception); - last_exception = std::move(packet.exception); + onReceiveExceptionFromServer(*packet.exception); + last_exception_received_from_server = std::move(packet.exception); return false; case Protocol::Server::Log: @@ -1660,10 +1671,10 @@ private: } - void onException(const Exception & e) + void onReceiveExceptionFromServer(const Exception & e) { resetOutput(); - got_exception = true; + received_exception_from_server = true; actual_server_error = e.code(); if (expected_server_error) From 4ebe1d34182600cc5498518eb8a8b4549c9b6e87 Mon Sep 17 00:00:00 2001 From: Alexander Kuzmenkov Date: Fri, 26 Jun 2020 07:37:18 +0300 Subject: [PATCH 2/2] more cleanup --- programs/client/Client.cpp | 87 ++++++++++++++++++++++++-------------- 1 file changed, 56 insertions(+), 31 deletions(-) diff --git a/programs/client/Client.cpp b/programs/client/Client.cpp index cfd7a22c11f..b0371550903 100644 --- a/programs/client/Client.cpp +++ b/programs/client/Client.cpp @@ -132,7 +132,12 @@ private: std::unique_ptr connection; /// Connection to DB. String query_id; /// Current query_id. - String query; /// Current query. + String full_query; /// Current query as it was given to the client. + + // Current query as it will be sent to the server. It may differ from the + // full query for INSERT queries, for which the data that follows the query + // is stripped and sent separately. + String query_to_send; String format; /// Query results output format. bool is_default_format = true; /// false, if format is set in the config or command line. @@ -763,7 +768,7 @@ private: if (!config().has("multiquery")) { - processSingleQuery(text); + processTextAsSingleQuery(text); return true; } @@ -777,7 +782,7 @@ private: { /// disable logs if expects errors TestHint test_hint(test_mode, text); if (test_hint.clientError() || test_hint.serverError()) - processSingleQuery("SET send_logs_level = 'none'"); + processTextAsSingleQuery("SET send_logs_level = 'none'"); } /// Several queries separated by ';'. @@ -828,9 +833,17 @@ private: { auto ast_to_process = orig_ast; if (insert && insert->data) + { ast_to_process = nullptr; - - processSingleQuery(str, ast_to_process); + processTextAsSingleQuery(str); + } + else + { + parsed_query = ast_to_process; + full_query = str; + query_to_send = str; + processParsedSingleQuery(); + } } catch (...) { @@ -857,34 +870,51 @@ private: } - void processSingleQuery(const String & line, ASTPtr parsed_query_ = nullptr) + void processTextAsSingleQuery(const String & text_) + { + full_query = text_; + + /// Some parts of a query (result output and formatting) are executed + /// client-side. Thus we need to parse the query. + const char * begin = full_query.data(); + parsed_query = parseQuery(begin, begin + full_query.size(), false); + + if (!parsed_query) + return; + + // An INSERT query may have the data that follow query text. Remove the + /// Send part of query without data, because data will be sent separately. + auto * insert = parsed_query->as(); + if (insert && insert->data) + { + query_to_send = full_query.substr(0, insert->data - full_query.data()); + } + else + { + query_to_send = full_query; + } + + processParsedSingleQuery(); + } + + // Parameters are in global variables: + // 'parsed_query' -- the query AST, + // 'query_to_send' -- the query text that is sent to server, + // 'full_query' -- for INSERT queries, contains the query and the data that + // follow it. Its memory is referenced by ASTInsertQuery::begin, end. + void processParsedSingleQuery() { resetOutput(); received_exception_from_server = false; if (echo_queries) { - writeString(line, std_out); + writeString(full_query, std_out); writeChar('\n', std_out); std_out.next(); } watch.restart(); - - query = line; - - /// Some parts of a query (result output and formatting) are executed client-side. - /// Thus we need to parse the query. - parsed_query = parsed_query_; - if (!parsed_query) - { - const char * begin = query.data(); - parsed_query = parseQuery(begin, begin + query.size(), false); - } - - if (!parsed_query) - return; - processed_rows = 0; progress.reset(); show_progress_bar = false; @@ -996,7 +1026,7 @@ private: visitor.visit(parsed_query); /// Get new query after substitutions. Note that it cannot be done for INSERT query with embedded data. - query = serializeAST(*parsed_query); + query_to_send = serializeAST(*parsed_query); } int retries_left = 10; @@ -1008,7 +1038,7 @@ private: { connection->sendQuery( connection_parameters.timeouts, - query, + query_to_send, query_id, QueryProcessingStage::Complete, &context.getSettingsRef(), @@ -1043,18 +1073,13 @@ private: /// Process the query that requires transferring data blocks to the server. void processInsertQuery() { - /// Send part of query without data, because data will be sent separately. - const auto & parsed_insert_query = parsed_query->as(); - String query_without_data = parsed_insert_query.data - ? query.substr(0, parsed_insert_query.data - query.data()) - : query; - + const auto parsed_insert_query = parsed_query->as(); if (!parsed_insert_query.data && (is_interactive || (!stdin_is_a_tty && std_in.eof()))) throw Exception("No data to insert", ErrorCodes::NO_DATA_TO_INSERT); connection->sendQuery( connection_parameters.timeouts, - query_without_data, + query_to_send, query_id, QueryProcessingStage::Complete, &context.getSettingsRef(),