fix a test, add retries for sql tests

This commit is contained in:
Alexander Tokmakov 2024-07-25 00:56:41 +02:00
parent 3d00badef7
commit 8df648b3c8
4 changed files with 76 additions and 8 deletions

View File

@ -2230,6 +2230,8 @@ bool ClientBase::executeMultiQuery(const String & all_queries_text)
ASTPtr parsed_query; ASTPtr parsed_query;
std::unique_ptr<Exception> current_exception; std::unique_ptr<Exception> current_exception;
size_t retries_count = 0;
while (true) while (true)
{ {
auto stage = analyzeMultiQueryText(this_query_begin, this_query_end, all_queries_end, auto stage = analyzeMultiQueryText(this_query_begin, this_query_end, all_queries_end,
@ -2310,7 +2312,12 @@ bool ClientBase::executeMultiQuery(const String & all_queries_text)
// Check whether the error (or its absence) matches the test hints // Check whether the error (or its absence) matches the test hints
// (or their absence). // (or their absence).
bool error_matches_hint = true; bool error_matches_hint = true;
if (have_error) bool need_retry = test_hint.needRetry(server_exception, &retries_count);
if (need_retry)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
}
else if (have_error)
{ {
if (test_hint.hasServerErrors()) if (test_hint.hasServerErrors())
{ {
@ -2404,6 +2411,7 @@ bool ClientBase::executeMultiQuery(const String & all_queries_text)
if (have_error && !ignore_error) if (have_error && !ignore_error)
return is_interactive; return is_interactive;
if (!need_retry)
this_query_begin = this_query_end; this_query_begin = this_query_end;
break; break;
} }

View File

@ -10,6 +10,7 @@
namespace DB::ErrorCodes namespace DB::ErrorCodes
{ {
extern const int CANNOT_PARSE_TEXT; extern const int CANNOT_PARSE_TEXT;
extern const int OK;
} }
namespace DB namespace DB
@ -62,9 +63,28 @@ bool TestHint::hasExpectedServerError(int error)
return std::find(server_errors.begin(), server_errors.end(), error) != server_errors.end(); return std::find(server_errors.begin(), server_errors.end(), error) != server_errors.end();
} }
bool TestHint::needRetry(const std::unique_ptr<Exception> & server_exception, size_t * retries_counter)
{
chassert(retries_counter);
if (max_retries <= *retries_counter)
return false;
++*retries_counter;
int error = ErrorCodes::OK;
if (server_exception)
error = server_exception->code();
if (retry_until)
return !hasExpectedServerError(error); /// retry until we get the expected error
else
return hasExpectedServerError(error); /// retry while we have the expected error
}
void TestHint::parse(Lexer & comment_lexer, bool is_leading_hint) void TestHint::parse(Lexer & comment_lexer, bool is_leading_hint)
{ {
std::unordered_set<std::string_view> commands{"echo", "echoOn", "echoOff"}; std::unordered_set<std::string_view> commands{"echo", "echoOn", "echoOff", "retry"};
std::unordered_set<std::string_view> command_errors{ std::unordered_set<std::string_view> command_errors{
"serverError", "serverError",
@ -73,6 +93,9 @@ void TestHint::parse(Lexer & comment_lexer, bool is_leading_hint)
for (Token token = comment_lexer.nextToken(); !token.isEnd(); token = comment_lexer.nextToken()) for (Token token = comment_lexer.nextToken(); !token.isEnd(); token = comment_lexer.nextToken())
{ {
if (token.type == TokenType::Whitespace)
continue;
String item = String(token.begin, token.end); String item = String(token.begin, token.end);
if (token.type == TokenType::BareWord && commands.contains(item)) if (token.type == TokenType::BareWord && commands.contains(item))
{ {
@ -82,6 +105,30 @@ void TestHint::parse(Lexer & comment_lexer, bool is_leading_hint)
echo.emplace(true); echo.emplace(true);
if (item == "echoOff") if (item == "echoOff")
echo.emplace(false); echo.emplace(false);
if (item == "retry")
{
token = comment_lexer.nextToken();
while (token.type == TokenType::Whitespace)
token = comment_lexer.nextToken();
if (token.type != TokenType::Number)
throw DB::Exception(DB::ErrorCodes::CANNOT_PARSE_TEXT, "Could not parse the number of retries: {}",
std::string_view(token.begin, token.end));
max_retries = std::stoul(std::string(token.begin, token.end));
token = comment_lexer.nextToken();
while (token.type == TokenType::Whitespace)
token = comment_lexer.nextToken();
if (token.type != TokenType::BareWord ||
(std::string_view(token.begin, token.end) != "until" &&
std::string_view(token.begin, token.end) != "while"))
throw DB::Exception(DB::ErrorCodes::CANNOT_PARSE_TEXT, "Expected 'until' or 'while' after the number of retries, got: {}",
std::string_view(token.begin, token.end));
retry_until = std::string_view(token.begin, token.end) == "until";
}
} }
else if (!is_leading_hint && token.type == TokenType::BareWord && command_errors.contains(item)) else if (!is_leading_hint && token.type == TokenType::BareWord && command_errors.contains(item))
{ {
@ -133,6 +180,9 @@ void TestHint::parse(Lexer & comment_lexer, bool is_leading_hint)
break; break;
} }
} }
if (max_retries && server_errors.size() != 1)
throw DB::Exception(DB::ErrorCodes::CANNOT_PARSE_TEXT, "Expected one serverError after the 'retry N while|until' command");
} }
} }

View File

@ -6,6 +6,7 @@
#include <fmt/format.h> #include <fmt/format.h>
#include <Core/Types.h> #include <Core/Types.h>
#include <Common/Exception.h>
namespace DB namespace DB
@ -65,12 +66,17 @@ public:
bool hasExpectedClientError(int error); bool hasExpectedClientError(int error);
bool hasExpectedServerError(int error); bool hasExpectedServerError(int error);
bool needRetry(const std::unique_ptr<Exception> & server_exception, size_t * retries_counter);
private: private:
const String & query; const String & query;
ErrorVector server_errors{}; ErrorVector server_errors{};
ErrorVector client_errors{}; ErrorVector client_errors{};
std::optional<bool> echo; std::optional<bool> echo;
size_t max_retries = 0;
bool retry_until = false;
void parse(Lexer & comment_lexer, bool is_leading_hint); void parse(Lexer & comment_lexer, bool is_leading_hint);
bool allErrorsExpected(int actual_server_error, int actual_client_error) const bool allErrorsExpected(int actual_server_error, int actual_client_error) const

View File

@ -7,7 +7,7 @@ create table rmt2 (n int, m int, k int) engine=ReplicatedMergeTree('/test/02446/
settings storage_policy='s3_cache', allow_remote_fs_zero_copy_replication=1, old_parts_lifetime=0, cleanup_delay_period=0, max_cleanup_delay_period=1, cleanup_delay_period_random_add=1, min_bytes_for_wide_part=0; settings storage_policy='s3_cache', allow_remote_fs_zero_copy_replication=1, old_parts_lifetime=0, cleanup_delay_period=0, max_cleanup_delay_period=1, cleanup_delay_period_random_add=1, min_bytes_for_wide_part=0;
-- FIXME zero-copy locks may remain in ZooKeeper forever if we failed to insert a part. -- FIXME zero-copy locks may remain in ZooKeeper forever if we failed to insert a part.
-- Probably that's why we have to replace repsistent lock with ephemeral sometimes. -- Probably that's why we have to replace persistent lock with ephemeral sometimes.
-- See also "Replacing persistent lock with ephemeral for path {}. It can happen only in case of local part loss" -- See also "Replacing persistent lock with ephemeral for path {}. It can happen only in case of local part loss"
-- in StorageReplicatedMergeTree::createZeroCopyLockNode -- in StorageReplicatedMergeTree::createZeroCopyLockNode
set insert_keeper_fault_injection_probability=0; set insert_keeper_fault_injection_probability=0;
@ -23,6 +23,10 @@ select sleepEachRow(0.5) as test_does_not_rely_on_this;
insert into rmt1 values(5, 5, 5); insert into rmt1 values(5, 5, 5);
alter table rmt2 update m = m * 10 where 1 settings mutations_sync=2; alter table rmt2 update m = m * 10 where 1 settings mutations_sync=2;
-- wait for parts to be merged
select throwIf(name = 'all_0_5_1_6') from system.parts where database=currentDatabase() and table like 'rmt%' and active
format Null; -- { retry 30 until serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO }
system sync replica rmt2; system sync replica rmt2;
set optimize_throw_if_noop=1; set optimize_throw_if_noop=1;
optimize table rmt2 final; optimize table rmt2 final;
@ -32,10 +36,10 @@ select 1, * from rmt1 order by n;
system sync replica rmt1; system sync replica rmt1;
select 2, * from rmt2 order by n; select 2, * from rmt2 order by n;
-- a funny way to wait for outdated parts to be removed -- wait for outdated parts to be removed
select sleep(1), sleepEachRow(0.1) from url('http://localhost:8123/?param_tries={1..10}&query=' || encodeURLComponent( select throwIf(count() = 0) from (
'select *, _state from system.parts where database=''' || currentDatabase() || ''' and table like ''rmt%'' and active=0' select *, _state from system.parts where database=currentDatabase() and table like 'rmt%' and active=0
), 'LineAsString', 's String') settings max_threads=1 format Null; ) format Null; -- { retry 30 until serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO }
select *, _state from system.parts where database=currentDatabase() and table like 'rmt%' and active=0; select *, _state from system.parts where database=currentDatabase() and table like 'rmt%' and active=0;