mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
Minor cleanup in Client.cpp before fuzzing
This commit is contained in:
parent
751319d0ef
commit
a739a10556
@ -177,10 +177,10 @@ private:
|
||||
ASTPtr parsed_query;
|
||||
|
||||
/// The last exception that was received from the server. Is used for the return code in batch mode.
|
||||
std::unique_ptr<Exception> last_exception;
|
||||
std::unique_ptr<Exception> last_exception_received_from_server;
|
||||
|
||||
/// If the last query resulted in exception.
|
||||
bool got_exception = false;
|
||||
bool received_exception_from_server = false;
|
||||
int expected_server_error = 0;
|
||||
int expected_client_error = 0;
|
||||
int actual_server_error = 0;
|
||||
@ -616,7 +616,7 @@ private:
|
||||
|
||||
try
|
||||
{
|
||||
if (!process(input))
|
||||
if (!processQueryText(input))
|
||||
break;
|
||||
}
|
||||
catch (const Exception & e)
|
||||
@ -657,8 +657,8 @@ private:
|
||||
nonInteractive();
|
||||
|
||||
/// If exception code isn't zero, we should return non-zero return code anyway.
|
||||
if (last_exception)
|
||||
return last_exception->code() != 0 ? last_exception->code() : -1;
|
||||
if (last_exception_received_from_server)
|
||||
return last_exception_received_from_server->code() != 0 ? last_exception_received_from_server->code() : -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
@ -753,111 +753,114 @@ private:
|
||||
readStringUntilEOF(text, in);
|
||||
}
|
||||
|
||||
process(text);
|
||||
processQueryText(text);
|
||||
}
|
||||
|
||||
|
||||
bool process(const String & text)
|
||||
bool processQueryText(const String & text)
|
||||
{
|
||||
if (exit_strings.end() != exit_strings.find(trim(text, [](char c){ return isWhitespaceASCII(c) || c == ';'; })))
|
||||
return false;
|
||||
|
||||
const bool test_mode = config().has("testmode");
|
||||
if (config().has("multiquery"))
|
||||
if (!config().has("multiquery"))
|
||||
{
|
||||
{ /// disable logs if expects errors
|
||||
TestHint test_hint(test_mode, text);
|
||||
if (test_hint.clientError() || test_hint.serverError())
|
||||
process("SET send_logs_level = 'none'");
|
||||
}
|
||||
|
||||
/// Several queries separated by ';'.
|
||||
/// INSERT data is ended by the end of line, not ';'.
|
||||
|
||||
const char * begin = text.data();
|
||||
const char * end = begin + text.size();
|
||||
|
||||
while (begin < end)
|
||||
{
|
||||
const char * pos = begin;
|
||||
ASTPtr ast = parseQuery(pos, end, true);
|
||||
|
||||
if (!ast)
|
||||
{
|
||||
if (ignore_error)
|
||||
{
|
||||
Tokens tokens(begin, end);
|
||||
IParser::Pos token_iterator(tokens, context.getSettingsRef().max_parser_depth);
|
||||
while (token_iterator->type != TokenType::Semicolon && token_iterator.isValid())
|
||||
++token_iterator;
|
||||
begin = token_iterator->end;
|
||||
|
||||
continue;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
auto * insert = ast->as<ASTInsertQuery>();
|
||||
|
||||
if (insert && insert->data)
|
||||
{
|
||||
pos = find_first_symbols<'\n'>(insert->data, end);
|
||||
insert->end = pos;
|
||||
}
|
||||
|
||||
String str = text.substr(begin - text.data(), pos - begin);
|
||||
|
||||
begin = pos;
|
||||
while (isWhitespaceASCII(*begin) || *begin == ';')
|
||||
++begin;
|
||||
|
||||
TestHint test_hint(test_mode, str);
|
||||
expected_client_error = test_hint.clientError();
|
||||
expected_server_error = test_hint.serverError();
|
||||
|
||||
try
|
||||
{
|
||||
auto ast_to_process = ast;
|
||||
if (insert && insert->data)
|
||||
ast_to_process = nullptr;
|
||||
|
||||
if (!processSingleQuery(str, ast_to_process) && !ignore_error)
|
||||
return false;
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
last_exception = std::make_unique<Exception>(getCurrentExceptionMessage(true), getCurrentExceptionCode());
|
||||
actual_client_error = last_exception->code();
|
||||
if (!ignore_error && (!actual_client_error || actual_client_error != expected_client_error))
|
||||
std::cerr << "Error on processing query: " << str << std::endl << last_exception->message();
|
||||
got_exception = true;
|
||||
}
|
||||
|
||||
if (!test_hint.checkActual(actual_server_error, actual_client_error, got_exception, last_exception))
|
||||
connection->forceConnected(connection_parameters.timeouts);
|
||||
|
||||
if (got_exception && !ignore_error)
|
||||
{
|
||||
if (is_interactive)
|
||||
break;
|
||||
else
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
processSingleQuery(text);
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
return processSingleQuery(text);
|
||||
|
||||
return processMultiQuery(text);
|
||||
}
|
||||
|
||||
bool processMultiQuery(const String & text)
|
||||
{
|
||||
const bool test_mode = config().has("testmode");
|
||||
|
||||
{ /// disable logs if expects errors
|
||||
TestHint test_hint(test_mode, text);
|
||||
if (test_hint.clientError() || test_hint.serverError())
|
||||
processSingleQuery("SET send_logs_level = 'none'");
|
||||
}
|
||||
|
||||
/// Several queries separated by ';'.
|
||||
/// INSERT data is ended by the end of line, not ';'.
|
||||
|
||||
const char * begin = text.data();
|
||||
const char * end = begin + text.size();
|
||||
|
||||
while (begin < end)
|
||||
{
|
||||
const char * pos = begin;
|
||||
ASTPtr orig_ast = parseQuery(pos, end, true);
|
||||
|
||||
if (!orig_ast)
|
||||
{
|
||||
if (ignore_error)
|
||||
{
|
||||
Tokens tokens(begin, end);
|
||||
IParser::Pos token_iterator(tokens, context.getSettingsRef().max_parser_depth);
|
||||
while (token_iterator->type != TokenType::Semicolon && token_iterator.isValid())
|
||||
++token_iterator;
|
||||
begin = token_iterator->end;
|
||||
|
||||
continue;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
auto * insert = orig_ast->as<ASTInsertQuery>();
|
||||
|
||||
if (insert && insert->data)
|
||||
{
|
||||
pos = find_first_symbols<'\n'>(insert->data, end);
|
||||
insert->end = pos;
|
||||
}
|
||||
|
||||
String str = text.substr(begin - text.data(), pos - begin);
|
||||
|
||||
begin = pos;
|
||||
while (isWhitespaceASCII(*begin) || *begin == ';')
|
||||
++begin;
|
||||
|
||||
TestHint test_hint(test_mode, str);
|
||||
expected_client_error = test_hint.clientError();
|
||||
expected_server_error = test_hint.serverError();
|
||||
|
||||
try
|
||||
{
|
||||
auto ast_to_process = orig_ast;
|
||||
if (insert && insert->data)
|
||||
ast_to_process = nullptr;
|
||||
|
||||
processSingleQuery(str, ast_to_process);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
last_exception_received_from_server = std::make_unique<Exception>(getCurrentExceptionMessage(true), getCurrentExceptionCode());
|
||||
actual_client_error = last_exception_received_from_server->code();
|
||||
if (!ignore_error && (!actual_client_error || actual_client_error != expected_client_error))
|
||||
std::cerr << "Error on processing query: " << str << std::endl << last_exception_received_from_server->message();
|
||||
received_exception_from_server = true;
|
||||
}
|
||||
|
||||
if (!test_hint.checkActual(actual_server_error, actual_client_error, received_exception_from_server, last_exception_received_from_server))
|
||||
connection->forceConnected(connection_parameters.timeouts);
|
||||
|
||||
if (received_exception_from_server && !ignore_error)
|
||||
{
|
||||
if (is_interactive)
|
||||
break;
|
||||
else
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
bool processSingleQuery(const String & line, ASTPtr parsed_query_ = nullptr)
|
||||
void processSingleQuery(const String & line, ASTPtr parsed_query_ = nullptr)
|
||||
{
|
||||
resetOutput();
|
||||
got_exception = false;
|
||||
received_exception_from_server = false;
|
||||
|
||||
if (echo_queries)
|
||||
{
|
||||
@ -880,7 +883,7 @@ private:
|
||||
}
|
||||
|
||||
if (!parsed_query)
|
||||
return true;
|
||||
return;
|
||||
|
||||
processed_rows = 0;
|
||||
progress.reset();
|
||||
@ -924,7 +927,7 @@ private:
|
||||
}
|
||||
|
||||
/// Do not change context (current DB, settings) in case of an exception.
|
||||
if (!got_exception)
|
||||
if (!received_exception_from_server)
|
||||
{
|
||||
if (const auto * set_query = parsed_query->as<ASTSetQuery>())
|
||||
{
|
||||
@ -962,8 +965,6 @@ private:
|
||||
{
|
||||
std::cerr << watch.elapsedSeconds() << "\n";
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
@ -998,9 +999,11 @@ private:
|
||||
query = serializeAST(*parsed_query);
|
||||
}
|
||||
|
||||
static constexpr size_t max_retries = 10;
|
||||
for (size_t retry = 0; retry < max_retries; ++retry)
|
||||
int retries_left = 10;
|
||||
for (;;)
|
||||
{
|
||||
assert(retries_left > 0);
|
||||
|
||||
try
|
||||
{
|
||||
connection->sendQuery(
|
||||
@ -1019,11 +1022,19 @@ private:
|
||||
}
|
||||
catch (const Exception & e)
|
||||
{
|
||||
/// Retry when the server said "Client should retry" and no rows has been received yet.
|
||||
if (processed_rows == 0 && e.code() == ErrorCodes::DEADLOCK_AVOIDED && retry + 1 < max_retries)
|
||||
continue;
|
||||
|
||||
throw;
|
||||
/// Retry when the server said "Client should retry" and no rows
|
||||
/// has been received yet.
|
||||
if (processed_rows == 0
|
||||
&& e.code() == ErrorCodes::DEADLOCK_AVOIDED
|
||||
&& --retries_left)
|
||||
{
|
||||
std::cerr << "Got a transient error from the server, will"
|
||||
<< " retry (" << retries_left << " retries left)";
|
||||
}
|
||||
else
|
||||
{
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1310,8 +1321,8 @@ private:
|
||||
return true;
|
||||
|
||||
case Protocol::Server::Exception:
|
||||
onException(*packet.exception);
|
||||
last_exception = std::move(packet.exception);
|
||||
onReceiveExceptionFromServer(*packet.exception);
|
||||
last_exception_received_from_server = std::move(packet.exception);
|
||||
return false;
|
||||
|
||||
case Protocol::Server::Log:
|
||||
@ -1342,8 +1353,8 @@ private:
|
||||
return true;
|
||||
|
||||
case Protocol::Server::Exception:
|
||||
onException(*packet.exception);
|
||||
last_exception = std::move(packet.exception);
|
||||
onReceiveExceptionFromServer(*packet.exception);
|
||||
last_exception_received_from_server = std::move(packet.exception);
|
||||
return false;
|
||||
|
||||
case Protocol::Server::Log:
|
||||
@ -1376,8 +1387,8 @@ private:
|
||||
return true;
|
||||
|
||||
case Protocol::Server::Exception:
|
||||
onException(*packet.exception);
|
||||
last_exception = std::move(packet.exception);
|
||||
onReceiveExceptionFromServer(*packet.exception);
|
||||
last_exception_received_from_server = std::move(packet.exception);
|
||||
return false;
|
||||
|
||||
case Protocol::Server::Log:
|
||||
@ -1660,10 +1671,10 @@ private:
|
||||
}
|
||||
|
||||
|
||||
void onException(const Exception & e)
|
||||
void onReceiveExceptionFromServer(const Exception & e)
|
||||
{
|
||||
resetOutput();
|
||||
got_exception = true;
|
||||
received_exception_from_server = true;
|
||||
|
||||
actual_server_error = e.code();
|
||||
if (expected_server_error)
|
||||
|
Loading…
Reference in New Issue
Block a user