Reorganize multiquery processing

This commit is contained in:
kssenii 2021-08-20 17:10:59 +03:00
parent 41d735fd56
commit 21f9622cad
4 changed files with 279 additions and 259 deletions

View File

@ -37,6 +37,7 @@ protected:
private:
void connect() override;
void printChangedSettings() const;
bool receiveSampleBlock(Block & out, ColumnsDescription & columns_description, ASTPtr parsed_query);

View File

@ -37,11 +37,6 @@ public:
protected:
void executeSingleQuery(const String & query_to_execute, ASTPtr parsed_query) override;
void connect() override
{
}
void processError(const String & query) const override;
void loadSuggestionData(Suggest &) override;
String getQueryTextPrefix() override;

View File

@ -909,148 +909,95 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
}
bool ClientBase::processMultiQueryImpl(const String & all_queries_text,
std::function<void(const String &, const String &, ASTPtr)> execute_single_query,
std::function<void(const String &, Exception &)> process_parse_query_error)
ClientBase::MultiQueryProcessingStage ClientBase::analyzeMultiQueryText(
const char *& this_query_begin, const char *& this_query_end, const char * all_queries_end,
String & query_to_execute, ASTPtr & parsed_query, const String & all_queries_text,
std::optional<Exception> & current_exception)
{
if (this_query_begin >= all_queries_end)
return MultiQueryProcessingStage::QUERIES_END;
/// Several queries separated by ';'.
/// INSERT data is ended by the end of line, not ';'.
/// An exception is VALUES format where we also support semicolon in
/// addition to end of line.
const char * this_query_begin = all_queries_text.data();
const char * all_queries_end = all_queries_text.data() + all_queries_text.size();
// Remove leading empty newlines and other whitespace, because they
// are annoying to filter in query log. This is mostly relevant for
// the tests.
while (this_query_begin < all_queries_end && isWhitespaceASCII(*this_query_begin))
++this_query_begin;
String full_query; // full_query is the query + inline INSERT data + trailing comments (the latter is our best guess for now).
String query_to_execute;
ASTPtr parsed_query;
if (this_query_begin >= all_queries_end)
return MultiQueryProcessingStage::QUERIES_END;
while (this_query_begin < all_queries_end)
// If there are only comments left until the end of file, we just
// stop. The parser can't handle this situation because it always
// expects that there is some query that it can parse.
// We can get into this situation because the parser also doesn't
// skip the trailing comments after parsing a query. This is because
// they may as well be the leading comments for the next query,
// and it makes more sense to treat them as such.
{
// Remove leading empty newlines and other whitespace, because they
// are annoying to filter in query log. This is mostly relevant for
// the tests.
while (this_query_begin < all_queries_end && isWhitespaceASCII(*this_query_begin))
++this_query_begin;
Tokens tokens(this_query_begin, all_queries_end);
IParser::Pos token_iterator(tokens, global_context->getSettingsRef().max_parser_depth);
if (!token_iterator.isValid())
return MultiQueryProcessingStage::QUERIES_END;
}
if (this_query_begin >= all_queries_end)
break;
this_query_end = this_query_begin;
try
{
parsed_query = parseQuery(this_query_end, all_queries_end, true);
}
catch (Exception & e)
{
current_exception.emplace(e);
return MultiQueryProcessingStage::PARSING_EXCEPTION;
}
// If there are only comments left until the end of file, we just
// stop. The parser can't handle this situation because it always
// expects that there is some query that it can parse.
// We can get into this situation because the parser also doesn't
// skip the trailing comments after parsing a query. This is because
// they may as well be the leading comments for the next query,
// and it makes more sense to treat them as such.
if (!parsed_query)
{
if (ignore_error)
{
Tokens tokens(this_query_begin, all_queries_end);
IParser::Pos token_iterator(tokens, global_context->getSettingsRef().max_parser_depth);
if (!token_iterator.isValid())
{
break;
}
while (token_iterator->type != TokenType::Semicolon && token_iterator.isValid())
++token_iterator;
this_query_begin = token_iterator->end;
return MultiQueryProcessingStage::CONTINUE_PARSING;
}
const char * this_query_end = this_query_begin;
try
{
parsed_query = parseQuery(this_query_end, all_queries_end, true);
}
catch (Exception & e)
{
this_query_end = find_first_symbols<'\n'>(this_query_end, all_queries_end);
process_parse_query_error(String(this_query_begin, this_query_end - this_query_begin), e);
/// It's expected syntax error, skip the line
this_query_begin = this_query_end;
continue;
}
if (!parsed_query)
{
if (ignore_error)
{
Tokens tokens(this_query_begin, all_queries_end);
IParser::Pos token_iterator(tokens, global_context->getSettingsRef().max_parser_depth);
while (token_iterator->type != TokenType::Semicolon && token_iterator.isValid())
++token_iterator;
this_query_begin = token_iterator->end;
continue;
}
return true;
}
// INSERT queries may have the inserted data in the query text
// that follow the query itself, e.g. "insert into t format CSV 1;2".
// They need special handling. First of all, here we find where the
// inserted data ends. In multy-query mode, it is delimited by a
// newline.
// The VALUES format needs even more handling -- we also allow the
// data to be delimited by semicolon. This case is handled later by
// the format parser itself.
// We can't do multiline INSERTs with inline data, because most
// row input formats (e.g. TSV) can't tell when the input stops,
// unlike VALUES.
auto * insert_ast = parsed_query->as<ASTInsertQuery>();
if (insert_ast && insert_ast->data)
{
this_query_end = find_first_symbols<'\n'>(insert_ast->data, all_queries_end);
insert_ast->end = this_query_end;
query_to_execute = all_queries_text.substr(this_query_begin - all_queries_text.data(), insert_ast->data - this_query_begin);
}
else
{
query_to_execute = all_queries_text.substr(this_query_begin - all_queries_text.data(), this_query_end - this_query_begin);
}
// Try to include the trailing comment with test hints. It is just
// a guess for now, because we don't yet know where the query ends
// if it is an INSERT query with inline data. We will do it again
// after we have processed the query. But even this guess is
// beneficial so that we see proper trailing comments in "echo" and
// server log.
adjustQueryEnd(this_query_end, all_queries_end, global_context->getSettingsRef().max_parser_depth);
full_query = all_queries_text.substr(this_query_begin - all_queries_text.data(), this_query_end - this_query_begin);
if (query_fuzzer_runs)
{
if (!processWithFuzzing(full_query))
return false;
this_query_begin = this_query_end;
continue;
}
execute_single_query(String(this_query_begin, this_query_end - this_query_begin), query_to_execute, parsed_query);
// For INSERTs with inline data: use the end of inline data as
// reported by the format parser (it is saved in sendData()).
// This allows us to handle queries like:
// insert into t values (1); select 1
// , where the inline data is delimited by semicolon and not by a
// newline.
if (insert_ast && insert_ast->data)
{
this_query_end = insert_ast->end;
adjustQueryEnd(this_query_end, all_queries_end, global_context->getSettingsRef().max_parser_depth);
}
// Report error.
if (have_error)
processError(full_query);
// Stop processing queries if needed.
if (have_error && !ignore_error)
{
if (is_interactive)
break;
else
return false;
}
this_query_begin = this_query_end;
return MultiQueryProcessingStage::PARSING_FAILED;
}
return true;
// INSERT queries may have the inserted data in the query text
// that follow the query itself, e.g. "insert into t format CSV 1;2".
// They need special handling. First of all, here we find where the
// inserted data ends. In multy-query mode, it is delimited by a
// newline.
// The VALUES format needs even more handling -- we also allow the
// data to be delimited by semicolon. This case is handled later by
// the format parser itself.
// We can't do multiline INSERTs with inline data, because most
// row input formats (e.g. TSV) can't tell when the input stops,
// unlike VALUES.
auto * insert_ast = parsed_query->as<ASTInsertQuery>();
if (insert_ast && insert_ast->data)
{
this_query_end = find_first_symbols<'\n'>(insert_ast->data, all_queries_end);
insert_ast->end = this_query_end;
query_to_execute = all_queries_text.substr(this_query_begin - all_queries_text.data(), insert_ast->data - this_query_begin);
}
else
{
query_to_execute = all_queries_text.substr(this_query_begin - all_queries_text.data(), this_query_end - this_query_begin);
}
// Try to include the trailing comment with test hints. It is just
// a guess for now, because we don't yet know where the query ends
// if it is an INSERT query with inline data. We will do it again
// after we have processed the query. But even this guess is
// beneficial so that we see proper trailing comments in "echo" and
// server log.
adjustQueryEnd(this_query_end, all_queries_end, global_context->getSettingsRef().max_parser_depth);
return MultiQueryProcessingStage::EXECUTE_QUERY;
}
@ -1091,114 +1038,185 @@ bool ClientBase::processMultiQuery(const String & all_queries_text)
}
bool echo_query = echo_queries;
auto process_single_query = [&](const String & full_query, const String & query_to_execute, ASTPtr parsed_query)
{
// Now we know for sure where the query ends.
// Look for the hint in the text of query + insert data + trailing
// comments,
// e.g. insert into t format CSV 'a' -- { serverError 123 }.
// Use the updated query boundaries we just calculated.
TestHint test_hint(test_mode, full_query);
// Echo all queries if asked; makes for a more readable reference
// file.
echo_query = test_hint.echoQueries().value_or(echo_query);
try
{
processParsedSingleQuery(full_query, query_to_execute, parsed_query, echo_query, false);
}
catch (...)
{
// Surprisingly, this is a client error. A server error would
// have been reported w/o throwing (see onReceiveSeverException()).
client_exception = std::make_unique<Exception>(getCurrentExceptionMessage(true), getCurrentExceptionCode());
have_error = true;
}
// Check whether the error (or its absence) matches the test hints
// (or their absence).
bool error_matches_hint = true;
if (have_error)
{
if (test_hint.serverError())
{
if (!server_exception)
{
error_matches_hint = false;
fmt::print(stderr, "Expected server error code '{}' but got no server error.\n", test_hint.serverError());
}
else if (server_exception->code() != test_hint.serverError())
{
error_matches_hint = false;
std::cerr << "Expected server error code: " << test_hint.serverError() << " but got: " << server_exception->code()
<< "." << std::endl;
}
}
if (test_hint.clientError())
{
if (!client_exception)
{
error_matches_hint = false;
fmt::print(stderr, "Expected client error code '{}' but got no client error.\n", test_hint.clientError());
}
else if (client_exception->code() != test_hint.clientError())
{
error_matches_hint = false;
fmt::print(
stderr, "Expected client error code '{}' but got '{}'.\n", test_hint.clientError(), client_exception->code());
}
}
if (!test_hint.clientError() && !test_hint.serverError())
{
// No error was expected but it still occurred. This is the
// default case w/o test hint, doesn't need additional
// diagnostics.
error_matches_hint = false;
}
}
else
{
if (test_hint.clientError())
{
fmt::print(stderr, "The query succeeded but the client error '{}' was expected.\n", test_hint.clientError());
error_matches_hint = false;
}
if (test_hint.serverError())
{
fmt::print(stderr, "The query succeeded but the server error '{}' was expected.\n", test_hint.serverError());
error_matches_hint = false;
}
}
// If the error is expected, force reconnect and ignore it.
if (have_error && error_matches_hint)
{
client_exception.reset();
server_exception.reset();
have_error = false;
if (!connection->checkConnected())
connect();
}
};
auto process_parse_query_error = [&](const String & query, Exception & e)
{
// Try to find test hint for syntax error. We don't know where
// the query ends because we failed to parse it, so we consume
// the entire line.
TestHint hint(test_mode, query);
if (hint.serverError())
{
// Syntax errors are considered as client errors
e.addMessage("\nExpected server error '{}'.", hint.serverError());
throw;
}
if (hint.clientError() != e.code())
{
if (hint.clientError())
e.addMessage("\nExpected client error: " + std::to_string(hint.clientError()));
throw;
}
};
/// Several queries separated by ';'.
/// INSERT data is ended by the end of line, not ';'.
/// An exception is VALUES format where we also support semicolon in
/// addition to end of line.
const char * this_query_begin = all_queries_text.data();
const char * this_query_end;
const char * all_queries_end = all_queries_text.data() + all_queries_text.size();
return processMultiQueryImpl(all_queries_text, process_single_query, process_parse_query_error);
String full_query; // full_query is the query + inline INSERT data + trailing comments (the latter is our best guess for now).
String query_to_execute;
ASTPtr parsed_query;
std::optional<Exception> current_exception;
while (true)
{
auto stage = analyzeMultiQueryText(this_query_begin, this_query_end, all_queries_end,
query_to_execute, parsed_query, all_queries_text, current_exception);
switch (stage)
{
case MultiQueryProcessingStage::QUERIES_END:
case MultiQueryProcessingStage::PARSING_FAILED:
{
return true;
}
case MultiQueryProcessingStage::CONTINUE_PARSING:
{
continue;
}
case MultiQueryProcessingStage::PARSING_EXCEPTION:
{
this_query_end = find_first_symbols<'\n'>(this_query_end, all_queries_end);
// Try to find test hint for syntax error. We don't know where
// the query ends because we failed to parse it, so we consume
// the entire line.
TestHint hint(test_mode, String(this_query_begin, this_query_end - this_query_begin));
if (hint.serverError())
{
// Syntax errors are considered as client errors
current_exception->addMessage("\nExpected server error '{}'.", hint.serverError());
current_exception->rethrow();
}
if (hint.clientError() != current_exception->code())
{
if (hint.clientError())
current_exception->addMessage("\nExpected client error: " + std::to_string(hint.clientError()));
current_exception->rethrow();
}
/// It's expected syntax error, skip the line
this_query_begin = this_query_end;
current_exception.reset();
continue;
}
case MultiQueryProcessingStage::EXECUTE_QUERY:
{
full_query = all_queries_text.substr(this_query_begin - all_queries_text.data(), this_query_end - this_query_begin);
if (query_fuzzer_runs)
{
if (!processWithFuzzing(full_query))
return false;
this_query_begin = this_query_end;
continue;
}
// Now we know for sure where the query ends.
// Look for the hint in the text of query + insert data + trailing
// comments,
// e.g. insert into t format CSV 'a' -- { serverError 123 }.
// Use the updated query boundaries we just calculated.
TestHint test_hint(test_mode, full_query);
// Echo all queries if asked; makes for a more readable reference
// file.
echo_query = test_hint.echoQueries().value_or(echo_query);
try
{
processParsedSingleQuery(full_query, query_to_execute, parsed_query, echo_query, false);
}
catch (...)
{
// Surprisingly, this is a client error. A server error would
// have been reported w/o throwing (see onReceiveSeverException()).
client_exception = std::make_unique<Exception>(getCurrentExceptionMessage(true), getCurrentExceptionCode());
have_error = true;
}
// Check whether the error (or its absence) matches the test hints
// (or their absence).
bool error_matches_hint = true;
if (have_error)
{
if (test_hint.serverError())
{
if (!server_exception)
{
error_matches_hint = false;
fmt::print(stderr, "Expected server error code '{}' but got no server error.\n", test_hint.serverError());
}
else if (server_exception->code() != test_hint.serverError())
{
error_matches_hint = false;
std::cerr << "Expected server error code: " << test_hint.serverError() << " but got: " << server_exception->code()
<< "." << std::endl;
}
}
if (test_hint.clientError())
{
if (!client_exception)
{
error_matches_hint = false;
fmt::print(stderr, "Expected client error code '{}' but got no client error.\n", test_hint.clientError());
}
else if (client_exception->code() != test_hint.clientError())
{
error_matches_hint = false;
fmt::print(
stderr, "Expected client error code '{}' but got '{}'.\n", test_hint.clientError(), client_exception->code());
}
}
if (!test_hint.clientError() && !test_hint.serverError())
{
// No error was expected but it still occurred. This is the
// default case w/o test hint, doesn't need additional
// diagnostics.
error_matches_hint = false;
}
}
else
{
if (test_hint.clientError())
{
fmt::print(stderr, "The query succeeded but the client error '{}' was expected.\n", test_hint.clientError());
error_matches_hint = false;
}
if (test_hint.serverError())
{
fmt::print(stderr, "The query succeeded but the server error '{}' was expected.\n", test_hint.serverError());
error_matches_hint = false;
}
}
// If the error is expected, force reconnect and ignore it.
if (have_error && error_matches_hint)
{
client_exception.reset();
server_exception.reset();
have_error = false;
if (!connection->checkConnected())
connect();
}
// For INSERTs with inline data: use the end of inline data as
// reported by the format parser (it is saved in sendData()).
// This allows us to handle queries like:
// insert into t values (1); select 1
// , where the inline data is delimited by semicolon and not by a
// newline.
auto * insert_ast = parsed_query->as<ASTInsertQuery>();
if (insert_ast && insert_ast->data)
{
this_query_end = insert_ast->end;
adjustQueryEnd(this_query_end, all_queries_end, global_context->getSettingsRef().max_parser_depth);
}
// Report error.
if (have_error)
processError(full_query);
// Stop processing queries if needed.
if (have_error && !ignore_error)
return is_interactive;
this_query_begin = this_query_end;
break;
}
}
}
}

View File

@ -14,15 +14,11 @@ namespace po = boost::program_options;
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
}
namespace DB
{
class ClientBase : public Poco::Util::Application
{
@ -37,29 +33,39 @@ protected:
void runInteractive();
void runNonInteractive();
/// Initialize `connection` object with `Connection` or `LocalConnection`.
virtual void connect() = 0;
/// Process single query. Can use processOrdinaryQuery(), processInsertQuery().
virtual void executeSingleQuery(const String & query_to_execute, ASTPtr parsed_query) = 0;
void processOrdinaryQuery(const String & query_to_execute, ASTPtr parsed_query);
void processInsertQuery(const String & query_to_execute, ASTPtr parsed_query);
virtual bool processWithFuzzing(const String &)
{
throw Exception("Query processing with fuzzing is not implemented", ErrorCodes::NOT_IMPLEMENTED);
}
/// Process single query.
virtual void executeSingleQuery(const String & query_to_execute, ASTPtr parsed_query) = 0;
/// Methods, which can be called from executeSingleQuery.
void processOrdinaryQuery(const String & query_to_execute, ASTPtr parsed_query);
void processInsertQuery(const String & query_to_execute, ASTPtr parsed_query);
void processParsedSingleQuery(const String & full_query, const String & query_to_execute,
ASTPtr parsed_query, std::optional<bool> echo_query_ = {}, bool report_error = false);
virtual void processError(const String & query) const = 0;
virtual void loadSuggestionData(Suggest &) = 0;
bool processMultiQueryImpl(const String & all_queries_text,
std::function<void(const String & full_query, const String & query_to_execute, ASTPtr parsed_query)> execute_single_query,
std::function<void(const String &, Exception &)> process_parse_query_error = {});
enum MultiQueryProcessingStage
{
QUERIES_END,
PARSING_EXCEPTION,
CONTINUE_PARSING,
EXECUTE_QUERY,
PARSING_FAILED,
};
void processParsedSingleQuery(const String & query, const String & query_to_execute, ASTPtr parsed_query,
std::optional<bool> echo_query_ = {}, bool report_error = false);
virtual void connect() {}
MultiQueryProcessingStage analyzeMultiQueryText(
const char *& this_query_begin, const char *& this_query_end, const char * all_queries_end,
String & query_to_execute, ASTPtr & parsed_query, const String & all_queries_text,
std::optional<Exception> & current_exception);
/// For non-interactive multi-query mode get queries text prefix.
virtual String getQueryTextPrefix() { return ""; }