diff --git a/programs/client/Client.cpp b/programs/client/Client.cpp index be148ddb3e3..cfd7a22c11f 100644 --- a/programs/client/Client.cpp +++ b/programs/client/Client.cpp @@ -177,10 +177,10 @@ private: ASTPtr parsed_query; /// The last exception that was received from the server. Is used for the return code in batch mode. - std::unique_ptr last_exception; + std::unique_ptr last_exception_received_from_server; /// If the last query resulted in exception. - bool got_exception = false; + bool received_exception_from_server = false; int expected_server_error = 0; int expected_client_error = 0; int actual_server_error = 0; @@ -616,7 +616,7 @@ private: try { - if (!process(input)) + if (!processQueryText(input)) break; } catch (const Exception & e) @@ -657,8 +657,8 @@ private: nonInteractive(); /// If exception code isn't zero, we should return non-zero return code anyway. - if (last_exception) - return last_exception->code() != 0 ? last_exception->code() : -1; + if (last_exception_received_from_server) + return last_exception_received_from_server->code() != 0 ? last_exception_received_from_server->code() : -1; return 0; } @@ -753,111 +753,114 @@ private: readStringUntilEOF(text, in); } - process(text); + processQueryText(text); } - - bool process(const String & text) + bool processQueryText(const String & text) { if (exit_strings.end() != exit_strings.find(trim(text, [](char c){ return isWhitespaceASCII(c) || c == ';'; }))) return false; - const bool test_mode = config().has("testmode"); - if (config().has("multiquery")) + if (!config().has("multiquery")) { - { /// disable logs if expects errors - TestHint test_hint(test_mode, text); - if (test_hint.clientError() || test_hint.serverError()) - process("SET send_logs_level = 'none'"); - } - - /// Several queries separated by ';'. - /// INSERT data is ended by the end of line, not ';'. - - const char * begin = text.data(); - const char * end = begin + text.size(); - - while (begin < end) - { - const char * pos = begin; - ASTPtr ast = parseQuery(pos, end, true); - - if (!ast) - { - if (ignore_error) - { - Tokens tokens(begin, end); - IParser::Pos token_iterator(tokens, context.getSettingsRef().max_parser_depth); - while (token_iterator->type != TokenType::Semicolon && token_iterator.isValid()) - ++token_iterator; - begin = token_iterator->end; - - continue; - } - return true; - } - - auto * insert = ast->as(); - - if (insert && insert->data) - { - pos = find_first_symbols<'\n'>(insert->data, end); - insert->end = pos; - } - - String str = text.substr(begin - text.data(), pos - begin); - - begin = pos; - while (isWhitespaceASCII(*begin) || *begin == ';') - ++begin; - - TestHint test_hint(test_mode, str); - expected_client_error = test_hint.clientError(); - expected_server_error = test_hint.serverError(); - - try - { - auto ast_to_process = ast; - if (insert && insert->data) - ast_to_process = nullptr; - - if (!processSingleQuery(str, ast_to_process) && !ignore_error) - return false; - } - catch (...) - { - last_exception = std::make_unique(getCurrentExceptionMessage(true), getCurrentExceptionCode()); - actual_client_error = last_exception->code(); - if (!ignore_error && (!actual_client_error || actual_client_error != expected_client_error)) - std::cerr << "Error on processing query: " << str << std::endl << last_exception->message(); - got_exception = true; - } - - if (!test_hint.checkActual(actual_server_error, actual_client_error, got_exception, last_exception)) - connection->forceConnected(connection_parameters.timeouts); - - if (got_exception && !ignore_error) - { - if (is_interactive) - break; - else - return false; - } - } - + processSingleQuery(text); return true; } - else - { - return processSingleQuery(text); + + return processMultiQuery(text); + } + + bool processMultiQuery(const String & text) + { + const bool test_mode = config().has("testmode"); + + { /// disable logs if expects errors + TestHint test_hint(test_mode, text); + if (test_hint.clientError() || test_hint.serverError()) + processSingleQuery("SET send_logs_level = 'none'"); } + + /// Several queries separated by ';'. + /// INSERT data is ended by the end of line, not ';'. + + const char * begin = text.data(); + const char * end = begin + text.size(); + + while (begin < end) + { + const char * pos = begin; + ASTPtr orig_ast = parseQuery(pos, end, true); + + if (!orig_ast) + { + if (ignore_error) + { + Tokens tokens(begin, end); + IParser::Pos token_iterator(tokens, context.getSettingsRef().max_parser_depth); + while (token_iterator->type != TokenType::Semicolon && token_iterator.isValid()) + ++token_iterator; + begin = token_iterator->end; + + continue; + } + return true; + } + + auto * insert = orig_ast->as(); + + if (insert && insert->data) + { + pos = find_first_symbols<'\n'>(insert->data, end); + insert->end = pos; + } + + String str = text.substr(begin - text.data(), pos - begin); + + begin = pos; + while (isWhitespaceASCII(*begin) || *begin == ';') + ++begin; + + TestHint test_hint(test_mode, str); + expected_client_error = test_hint.clientError(); + expected_server_error = test_hint.serverError(); + + try + { + auto ast_to_process = orig_ast; + if (insert && insert->data) + ast_to_process = nullptr; + + processSingleQuery(str, ast_to_process); + } + catch (...) + { + last_exception_received_from_server = std::make_unique(getCurrentExceptionMessage(true), getCurrentExceptionCode()); + actual_client_error = last_exception_received_from_server->code(); + if (!ignore_error && (!actual_client_error || actual_client_error != expected_client_error)) + std::cerr << "Error on processing query: " << str << std::endl << last_exception_received_from_server->message(); + received_exception_from_server = true; + } + + if (!test_hint.checkActual(actual_server_error, actual_client_error, received_exception_from_server, last_exception_received_from_server)) + connection->forceConnected(connection_parameters.timeouts); + + if (received_exception_from_server && !ignore_error) + { + if (is_interactive) + break; + else + return false; + } + } + + return true; } - bool processSingleQuery(const String & line, ASTPtr parsed_query_ = nullptr) + void processSingleQuery(const String & line, ASTPtr parsed_query_ = nullptr) { resetOutput(); - got_exception = false; + received_exception_from_server = false; if (echo_queries) { @@ -880,7 +883,7 @@ private: } if (!parsed_query) - return true; + return; processed_rows = 0; progress.reset(); @@ -924,7 +927,7 @@ private: } /// Do not change context (current DB, settings) in case of an exception. - if (!got_exception) + if (!received_exception_from_server) { if (const auto * set_query = parsed_query->as()) { @@ -962,8 +965,6 @@ private: { std::cerr << watch.elapsedSeconds() << "\n"; } - - return true; } @@ -998,9 +999,11 @@ private: query = serializeAST(*parsed_query); } - static constexpr size_t max_retries = 10; - for (size_t retry = 0; retry < max_retries; ++retry) + int retries_left = 10; + for (;;) { + assert(retries_left > 0); + try { connection->sendQuery( @@ -1019,11 +1022,19 @@ private: } catch (const Exception & e) { - /// Retry when the server said "Client should retry" and no rows has been received yet. - if (processed_rows == 0 && e.code() == ErrorCodes::DEADLOCK_AVOIDED && retry + 1 < max_retries) - continue; - - throw; + /// Retry when the server said "Client should retry" and no rows + /// has been received yet. + if (processed_rows == 0 + && e.code() == ErrorCodes::DEADLOCK_AVOIDED + && --retries_left) + { + std::cerr << "Got a transient error from the server, will" + << " retry (" << retries_left << " retries left)"; + } + else + { + throw; + } } } } @@ -1310,8 +1321,8 @@ private: return true; case Protocol::Server::Exception: - onException(*packet.exception); - last_exception = std::move(packet.exception); + onReceiveExceptionFromServer(*packet.exception); + last_exception_received_from_server = std::move(packet.exception); return false; case Protocol::Server::Log: @@ -1342,8 +1353,8 @@ private: return true; case Protocol::Server::Exception: - onException(*packet.exception); - last_exception = std::move(packet.exception); + onReceiveExceptionFromServer(*packet.exception); + last_exception_received_from_server = std::move(packet.exception); return false; case Protocol::Server::Log: @@ -1376,8 +1387,8 @@ private: return true; case Protocol::Server::Exception: - onException(*packet.exception); - last_exception = std::move(packet.exception); + onReceiveExceptionFromServer(*packet.exception); + last_exception_received_from_server = std::move(packet.exception); return false; case Protocol::Server::Log: @@ -1660,10 +1671,10 @@ private: } - void onException(const Exception & e) + void onReceiveExceptionFromServer(const Exception & e) { resetOutput(); - got_exception = true; + received_exception_from_server = true; actual_server_error = e.code(); if (expected_server_error)