mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
Merge pull request #11976 from ClickHouse/aku/client-cleanup
Minor cleanup in Client.cpp before fuzzing
This commit is contained in:
commit
e7e8ee79df
@ -132,7 +132,12 @@ private:
|
|||||||
|
|
||||||
std::unique_ptr<Connection> connection; /// Connection to DB.
|
std::unique_ptr<Connection> connection; /// Connection to DB.
|
||||||
String query_id; /// Current query_id.
|
String query_id; /// Current query_id.
|
||||||
String query; /// Current query.
|
String full_query; /// Current query as it was given to the client.
|
||||||
|
|
||||||
|
// Current query as it will be sent to the server. It may differ from the
|
||||||
|
// full query for INSERT queries, for which the data that follows the query
|
||||||
|
// is stripped and sent separately.
|
||||||
|
String query_to_send;
|
||||||
|
|
||||||
String format; /// Query results output format.
|
String format; /// Query results output format.
|
||||||
bool is_default_format = true; /// false, if format is set in the config or command line.
|
bool is_default_format = true; /// false, if format is set in the config or command line.
|
||||||
@ -177,10 +182,10 @@ private:
|
|||||||
ASTPtr parsed_query;
|
ASTPtr parsed_query;
|
||||||
|
|
||||||
/// The last exception that was received from the server. Is used for the return code in batch mode.
|
/// The last exception that was received from the server. Is used for the return code in batch mode.
|
||||||
std::unique_ptr<Exception> last_exception;
|
std::unique_ptr<Exception> last_exception_received_from_server;
|
||||||
|
|
||||||
/// If the last query resulted in exception.
|
/// If the last query resulted in exception.
|
||||||
bool got_exception = false;
|
bool received_exception_from_server = false;
|
||||||
int expected_server_error = 0;
|
int expected_server_error = 0;
|
||||||
int expected_client_error = 0;
|
int expected_client_error = 0;
|
||||||
int actual_server_error = 0;
|
int actual_server_error = 0;
|
||||||
@ -616,7 +621,7 @@ private:
|
|||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
if (!process(input))
|
if (!processQueryText(input))
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
catch (const Exception & e)
|
catch (const Exception & e)
|
||||||
@ -657,8 +662,8 @@ private:
|
|||||||
nonInteractive();
|
nonInteractive();
|
||||||
|
|
||||||
/// If exception code isn't zero, we should return non-zero return code anyway.
|
/// If exception code isn't zero, we should return non-zero return code anyway.
|
||||||
if (last_exception)
|
if (last_exception_received_from_server)
|
||||||
return last_exception->code() != 0 ? last_exception->code() : -1;
|
return last_exception_received_from_server->code() != 0 ? last_exception_received_from_server->code() : -1;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@ -753,135 +758,163 @@ private:
|
|||||||
readStringUntilEOF(text, in);
|
readStringUntilEOF(text, in);
|
||||||
}
|
}
|
||||||
|
|
||||||
process(text);
|
processQueryText(text);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool processQueryText(const String & text)
|
||||||
bool process(const String & text)
|
|
||||||
{
|
{
|
||||||
if (exit_strings.end() != exit_strings.find(trim(text, [](char c){ return isWhitespaceASCII(c) || c == ';'; })))
|
if (exit_strings.end() != exit_strings.find(trim(text, [](char c){ return isWhitespaceASCII(c) || c == ';'; })))
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
const bool test_mode = config().has("testmode");
|
if (!config().has("multiquery"))
|
||||||
if (config().has("multiquery"))
|
|
||||||
{
|
{
|
||||||
{ /// disable logs if expects errors
|
processTextAsSingleQuery(text);
|
||||||
TestHint test_hint(test_mode, text);
|
|
||||||
if (test_hint.clientError() || test_hint.serverError())
|
|
||||||
process("SET send_logs_level = 'none'");
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Several queries separated by ';'.
|
|
||||||
/// INSERT data is ended by the end of line, not ';'.
|
|
||||||
|
|
||||||
const char * begin = text.data();
|
|
||||||
const char * end = begin + text.size();
|
|
||||||
|
|
||||||
while (begin < end)
|
|
||||||
{
|
|
||||||
const char * pos = begin;
|
|
||||||
ASTPtr ast = parseQuery(pos, end, true);
|
|
||||||
|
|
||||||
if (!ast)
|
|
||||||
{
|
|
||||||
if (ignore_error)
|
|
||||||
{
|
|
||||||
Tokens tokens(begin, end);
|
|
||||||
IParser::Pos token_iterator(tokens, context.getSettingsRef().max_parser_depth);
|
|
||||||
while (token_iterator->type != TokenType::Semicolon && token_iterator.isValid())
|
|
||||||
++token_iterator;
|
|
||||||
begin = token_iterator->end;
|
|
||||||
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
auto * insert = ast->as<ASTInsertQuery>();
|
|
||||||
|
|
||||||
if (insert && insert->data)
|
|
||||||
{
|
|
||||||
pos = find_first_symbols<'\n'>(insert->data, end);
|
|
||||||
insert->end = pos;
|
|
||||||
}
|
|
||||||
|
|
||||||
String str = text.substr(begin - text.data(), pos - begin);
|
|
||||||
|
|
||||||
begin = pos;
|
|
||||||
while (isWhitespaceASCII(*begin) || *begin == ';')
|
|
||||||
++begin;
|
|
||||||
|
|
||||||
TestHint test_hint(test_mode, str);
|
|
||||||
expected_client_error = test_hint.clientError();
|
|
||||||
expected_server_error = test_hint.serverError();
|
|
||||||
|
|
||||||
try
|
|
||||||
{
|
|
||||||
auto ast_to_process = ast;
|
|
||||||
if (insert && insert->data)
|
|
||||||
ast_to_process = nullptr;
|
|
||||||
|
|
||||||
if (!processSingleQuery(str, ast_to_process) && !ignore_error)
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
catch (...)
|
|
||||||
{
|
|
||||||
last_exception = std::make_unique<Exception>(getCurrentExceptionMessage(true), getCurrentExceptionCode());
|
|
||||||
actual_client_error = last_exception->code();
|
|
||||||
if (!ignore_error && (!actual_client_error || actual_client_error != expected_client_error))
|
|
||||||
std::cerr << "Error on processing query: " << str << std::endl << last_exception->message();
|
|
||||||
got_exception = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!test_hint.checkActual(actual_server_error, actual_client_error, got_exception, last_exception))
|
|
||||||
connection->forceConnected(connection_parameters.timeouts);
|
|
||||||
|
|
||||||
if (got_exception && !ignore_error)
|
|
||||||
{
|
|
||||||
if (is_interactive)
|
|
||||||
break;
|
|
||||||
else
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
else
|
|
||||||
{
|
return processMultiQuery(text);
|
||||||
return processSingleQuery(text);
|
}
|
||||||
|
|
||||||
|
bool processMultiQuery(const String & text)
|
||||||
|
{
|
||||||
|
const bool test_mode = config().has("testmode");
|
||||||
|
|
||||||
|
{ /// disable logs if expects errors
|
||||||
|
TestHint test_hint(test_mode, text);
|
||||||
|
if (test_hint.clientError() || test_hint.serverError())
|
||||||
|
processTextAsSingleQuery("SET send_logs_level = 'none'");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Several queries separated by ';'.
|
||||||
|
/// INSERT data is ended by the end of line, not ';'.
|
||||||
|
|
||||||
|
const char * begin = text.data();
|
||||||
|
const char * end = begin + text.size();
|
||||||
|
|
||||||
|
while (begin < end)
|
||||||
|
{
|
||||||
|
const char * pos = begin;
|
||||||
|
ASTPtr orig_ast = parseQuery(pos, end, true);
|
||||||
|
|
||||||
|
if (!orig_ast)
|
||||||
|
{
|
||||||
|
if (ignore_error)
|
||||||
|
{
|
||||||
|
Tokens tokens(begin, end);
|
||||||
|
IParser::Pos token_iterator(tokens, context.getSettingsRef().max_parser_depth);
|
||||||
|
while (token_iterator->type != TokenType::Semicolon && token_iterator.isValid())
|
||||||
|
++token_iterator;
|
||||||
|
begin = token_iterator->end;
|
||||||
|
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto * insert = orig_ast->as<ASTInsertQuery>();
|
||||||
|
|
||||||
|
if (insert && insert->data)
|
||||||
|
{
|
||||||
|
pos = find_first_symbols<'\n'>(insert->data, end);
|
||||||
|
insert->end = pos;
|
||||||
|
}
|
||||||
|
|
||||||
|
String str = text.substr(begin - text.data(), pos - begin);
|
||||||
|
|
||||||
|
begin = pos;
|
||||||
|
while (isWhitespaceASCII(*begin) || *begin == ';')
|
||||||
|
++begin;
|
||||||
|
|
||||||
|
TestHint test_hint(test_mode, str);
|
||||||
|
expected_client_error = test_hint.clientError();
|
||||||
|
expected_server_error = test_hint.serverError();
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
auto ast_to_process = orig_ast;
|
||||||
|
if (insert && insert->data)
|
||||||
|
{
|
||||||
|
ast_to_process = nullptr;
|
||||||
|
processTextAsSingleQuery(str);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
parsed_query = ast_to_process;
|
||||||
|
full_query = str;
|
||||||
|
query_to_send = str;
|
||||||
|
processParsedSingleQuery();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (...)
|
||||||
|
{
|
||||||
|
last_exception_received_from_server = std::make_unique<Exception>(getCurrentExceptionMessage(true), getCurrentExceptionCode());
|
||||||
|
actual_client_error = last_exception_received_from_server->code();
|
||||||
|
if (!ignore_error && (!actual_client_error || actual_client_error != expected_client_error))
|
||||||
|
std::cerr << "Error on processing query: " << str << std::endl << last_exception_received_from_server->message();
|
||||||
|
received_exception_from_server = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!test_hint.checkActual(actual_server_error, actual_client_error, received_exception_from_server, last_exception_received_from_server))
|
||||||
|
connection->forceConnected(connection_parameters.timeouts);
|
||||||
|
|
||||||
|
if (received_exception_from_server && !ignore_error)
|
||||||
|
{
|
||||||
|
if (is_interactive)
|
||||||
|
break;
|
||||||
|
else
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
bool processSingleQuery(const String & line, ASTPtr parsed_query_ = nullptr)
|
void processTextAsSingleQuery(const String & text_)
|
||||||
|
{
|
||||||
|
full_query = text_;
|
||||||
|
|
||||||
|
/// Some parts of a query (result output and formatting) are executed
|
||||||
|
/// client-side. Thus we need to parse the query.
|
||||||
|
const char * begin = full_query.data();
|
||||||
|
parsed_query = parseQuery(begin, begin + full_query.size(), false);
|
||||||
|
|
||||||
|
if (!parsed_query)
|
||||||
|
return;
|
||||||
|
|
||||||
|
// An INSERT query may have the data that follow query text. Remove the
|
||||||
|
/// Send part of query without data, because data will be sent separately.
|
||||||
|
auto * insert = parsed_query->as<ASTInsertQuery>();
|
||||||
|
if (insert && insert->data)
|
||||||
|
{
|
||||||
|
query_to_send = full_query.substr(0, insert->data - full_query.data());
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
query_to_send = full_query;
|
||||||
|
}
|
||||||
|
|
||||||
|
processParsedSingleQuery();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parameters are in global variables:
|
||||||
|
// 'parsed_query' -- the query AST,
|
||||||
|
// 'query_to_send' -- the query text that is sent to server,
|
||||||
|
// 'full_query' -- for INSERT queries, contains the query and the data that
|
||||||
|
// follow it. Its memory is referenced by ASTInsertQuery::begin, end.
|
||||||
|
void processParsedSingleQuery()
|
||||||
{
|
{
|
||||||
resetOutput();
|
resetOutput();
|
||||||
got_exception = false;
|
received_exception_from_server = false;
|
||||||
|
|
||||||
if (echo_queries)
|
if (echo_queries)
|
||||||
{
|
{
|
||||||
writeString(line, std_out);
|
writeString(full_query, std_out);
|
||||||
writeChar('\n', std_out);
|
writeChar('\n', std_out);
|
||||||
std_out.next();
|
std_out.next();
|
||||||
}
|
}
|
||||||
|
|
||||||
watch.restart();
|
watch.restart();
|
||||||
|
|
||||||
query = line;
|
|
||||||
|
|
||||||
/// Some parts of a query (result output and formatting) are executed client-side.
|
|
||||||
/// Thus we need to parse the query.
|
|
||||||
parsed_query = parsed_query_;
|
|
||||||
if (!parsed_query)
|
|
||||||
{
|
|
||||||
const char * begin = query.data();
|
|
||||||
parsed_query = parseQuery(begin, begin + query.size(), false);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!parsed_query)
|
|
||||||
return true;
|
|
||||||
|
|
||||||
processed_rows = 0;
|
processed_rows = 0;
|
||||||
progress.reset();
|
progress.reset();
|
||||||
show_progress_bar = false;
|
show_progress_bar = false;
|
||||||
@ -924,7 +957,7 @@ private:
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Do not change context (current DB, settings) in case of an exception.
|
/// Do not change context (current DB, settings) in case of an exception.
|
||||||
if (!got_exception)
|
if (!received_exception_from_server)
|
||||||
{
|
{
|
||||||
if (const auto * set_query = parsed_query->as<ASTSetQuery>())
|
if (const auto * set_query = parsed_query->as<ASTSetQuery>())
|
||||||
{
|
{
|
||||||
@ -962,8 +995,6 @@ private:
|
|||||||
{
|
{
|
||||||
std::cerr << watch.elapsedSeconds() << "\n";
|
std::cerr << watch.elapsedSeconds() << "\n";
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -995,17 +1026,19 @@ private:
|
|||||||
visitor.visit(parsed_query);
|
visitor.visit(parsed_query);
|
||||||
|
|
||||||
/// Get new query after substitutions. Note that it cannot be done for INSERT query with embedded data.
|
/// Get new query after substitutions. Note that it cannot be done for INSERT query with embedded data.
|
||||||
query = serializeAST(*parsed_query);
|
query_to_send = serializeAST(*parsed_query);
|
||||||
}
|
}
|
||||||
|
|
||||||
static constexpr size_t max_retries = 10;
|
int retries_left = 10;
|
||||||
for (size_t retry = 0; retry < max_retries; ++retry)
|
for (;;)
|
||||||
{
|
{
|
||||||
|
assert(retries_left > 0);
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
connection->sendQuery(
|
connection->sendQuery(
|
||||||
connection_parameters.timeouts,
|
connection_parameters.timeouts,
|
||||||
query,
|
query_to_send,
|
||||||
query_id,
|
query_id,
|
||||||
QueryProcessingStage::Complete,
|
QueryProcessingStage::Complete,
|
||||||
&context.getSettingsRef(),
|
&context.getSettingsRef(),
|
||||||
@ -1019,11 +1052,19 @@ private:
|
|||||||
}
|
}
|
||||||
catch (const Exception & e)
|
catch (const Exception & e)
|
||||||
{
|
{
|
||||||
/// Retry when the server said "Client should retry" and no rows has been received yet.
|
/// Retry when the server said "Client should retry" and no rows
|
||||||
if (processed_rows == 0 && e.code() == ErrorCodes::DEADLOCK_AVOIDED && retry + 1 < max_retries)
|
/// has been received yet.
|
||||||
continue;
|
if (processed_rows == 0
|
||||||
|
&& e.code() == ErrorCodes::DEADLOCK_AVOIDED
|
||||||
throw;
|
&& --retries_left)
|
||||||
|
{
|
||||||
|
std::cerr << "Got a transient error from the server, will"
|
||||||
|
<< " retry (" << retries_left << " retries left)";
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
throw;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1032,18 +1073,13 @@ private:
|
|||||||
/// Process the query that requires transferring data blocks to the server.
|
/// Process the query that requires transferring data blocks to the server.
|
||||||
void processInsertQuery()
|
void processInsertQuery()
|
||||||
{
|
{
|
||||||
/// Send part of query without data, because data will be sent separately.
|
const auto parsed_insert_query = parsed_query->as<ASTInsertQuery &>();
|
||||||
const auto & parsed_insert_query = parsed_query->as<ASTInsertQuery &>();
|
|
||||||
String query_without_data = parsed_insert_query.data
|
|
||||||
? query.substr(0, parsed_insert_query.data - query.data())
|
|
||||||
: query;
|
|
||||||
|
|
||||||
if (!parsed_insert_query.data && (is_interactive || (!stdin_is_a_tty && std_in.eof())))
|
if (!parsed_insert_query.data && (is_interactive || (!stdin_is_a_tty && std_in.eof())))
|
||||||
throw Exception("No data to insert", ErrorCodes::NO_DATA_TO_INSERT);
|
throw Exception("No data to insert", ErrorCodes::NO_DATA_TO_INSERT);
|
||||||
|
|
||||||
connection->sendQuery(
|
connection->sendQuery(
|
||||||
connection_parameters.timeouts,
|
connection_parameters.timeouts,
|
||||||
query_without_data,
|
query_to_send,
|
||||||
query_id,
|
query_id,
|
||||||
QueryProcessingStage::Complete,
|
QueryProcessingStage::Complete,
|
||||||
&context.getSettingsRef(),
|
&context.getSettingsRef(),
|
||||||
@ -1310,8 +1346,8 @@ private:
|
|||||||
return true;
|
return true;
|
||||||
|
|
||||||
case Protocol::Server::Exception:
|
case Protocol::Server::Exception:
|
||||||
onException(*packet.exception);
|
onReceiveExceptionFromServer(*packet.exception);
|
||||||
last_exception = std::move(packet.exception);
|
last_exception_received_from_server = std::move(packet.exception);
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
case Protocol::Server::Log:
|
case Protocol::Server::Log:
|
||||||
@ -1342,8 +1378,8 @@ private:
|
|||||||
return true;
|
return true;
|
||||||
|
|
||||||
case Protocol::Server::Exception:
|
case Protocol::Server::Exception:
|
||||||
onException(*packet.exception);
|
onReceiveExceptionFromServer(*packet.exception);
|
||||||
last_exception = std::move(packet.exception);
|
last_exception_received_from_server = std::move(packet.exception);
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
case Protocol::Server::Log:
|
case Protocol::Server::Log:
|
||||||
@ -1376,8 +1412,8 @@ private:
|
|||||||
return true;
|
return true;
|
||||||
|
|
||||||
case Protocol::Server::Exception:
|
case Protocol::Server::Exception:
|
||||||
onException(*packet.exception);
|
onReceiveExceptionFromServer(*packet.exception);
|
||||||
last_exception = std::move(packet.exception);
|
last_exception_received_from_server = std::move(packet.exception);
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
case Protocol::Server::Log:
|
case Protocol::Server::Log:
|
||||||
@ -1660,10 +1696,10 @@ private:
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void onException(const Exception & e)
|
void onReceiveExceptionFromServer(const Exception & e)
|
||||||
{
|
{
|
||||||
resetOutput();
|
resetOutput();
|
||||||
got_exception = true;
|
received_exception_from_server = true;
|
||||||
|
|
||||||
actual_server_error = e.code();
|
actual_server_error = e.code();
|
||||||
if (expected_server_error)
|
if (expected_server_error)
|
||||||
|
Loading…
Reference in New Issue
Block a user