From e347fa2f459f2841869f937b8cd07f4f25d1bae5 Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Thu, 11 Aug 2022 02:34:10 +0000 Subject: [PATCH 01/28] add fuzzer for table definition --- docker/test/fuzzer/allow-nullable-key.xml | 6 + docker/test/fuzzer/run-fuzzer.sh | 1 + programs/client/Client.cpp | 188 +++++++++++------ programs/client/Client.h | 1 + src/Client/ClientBase.h | 1 + src/Client/QueryFuzzer.cpp | 245 ++++++++++++++++++++++ src/Client/QueryFuzzer.h | 14 ++ 7 files changed, 390 insertions(+), 66 deletions(-) create mode 100644 docker/test/fuzzer/allow-nullable-key.xml diff --git a/docker/test/fuzzer/allow-nullable-key.xml b/docker/test/fuzzer/allow-nullable-key.xml new file mode 100644 index 00000000000..5a0c2c20e1c --- /dev/null +++ b/docker/test/fuzzer/allow-nullable-key.xml @@ -0,0 +1,6 @@ + + + + 1 + + \ No newline at end of file diff --git a/docker/test/fuzzer/run-fuzzer.sh b/docker/test/fuzzer/run-fuzzer.sh index f74760e3339..393c980afba 100755 --- a/docker/test/fuzzer/run-fuzzer.sh +++ b/docker/test/fuzzer/run-fuzzer.sh @@ -85,6 +85,7 @@ function configure # TODO figure out which ones are needed cp -av --dereference "$repo_dir"/tests/config/config.d/listen.xml db/config.d cp -av --dereference "$script_dir"/query-fuzzer-tweaks-users.xml db/users.d + cp -av --dereference "$script_dir"/allow-nullable-key.xml db/config.d } function watchdog diff --git a/programs/client/Client.cpp b/programs/client/Client.cpp index 584806951cf..6ef0db93669 100644 --- a/programs/client/Client.cpp +++ b/programs/client/Client.cpp @@ -12,6 +12,7 @@ #include #include "Client.h" #include "Core/Protocol.h" +#include "Parsers/formatAST.h" #include @@ -513,6 +514,66 @@ static bool queryHasWithClause(const IAST & ast) return false; } +std::optional Client::processFuzzingStep(const String & query_to_execute, const ASTPtr & parsed_query) +{ + processParsedSingleQuery(query_to_execute, query_to_execute, parsed_query); + + const auto * exception = server_exception ? 
server_exception.get() : client_exception.get(); + // Sometimes you may get TOO_DEEP_RECURSION from the server, + // and TOO_DEEP_RECURSION should not fail the fuzzer check. + if (have_error && exception->code() == ErrorCodes::TOO_DEEP_RECURSION) + { + have_error = false; + server_exception.reset(); + client_exception.reset(); + return true; + } + + if (have_error) + { + fmt::print(stderr, "Error on processing query '{}': {}\n", parsed_query->formatForErrorMessage(), exception->message()); + + // Try to reconnect after errors, for two reasons: + // 1. We might not have realized that the server died, e.g. if + // it sent us a trace and closed connection properly. + // 2. The connection might have gotten into a wrong state and + // the next query will get false positive about + // "Unknown packet from server". + try + { + connection->forceConnected(connection_parameters.timeouts); + } + catch (...) + { + // Just report it, we'll terminate below. + fmt::print(stderr, + "Error while reconnecting to the server: {}\n", + getCurrentExceptionMessage(true)); + + // The reconnection might fail, but we'll still be connected + // in the sense of `connection->isConnected() = true`, + // in case when the requested database doesn't exist. + // Disconnect manually now, so that the following code doesn't + // have any doubts, and the connection state is predictable. + connection->disconnect(); + } + } + + if (!connection->isConnected()) + { + // Probably the server is dead because we found an assertion + // failure. Fail fast. + fmt::print(stderr, "Lost connection to the server.\n"); + + // Print the changed settings because they might be needed to + // reproduce the error. + printChangedSettings(); + + return false; + } + + return std::nullopt; +} /// Returns false when server is not available. 
bool Client::processWithFuzzing(const String & full_query) @@ -557,18 +618,28 @@ bool Client::processWithFuzzing(const String & full_query) // - SET -- The time to fuzz the settings has not yet come // (see comments in Client/QueryFuzzer.cpp) size_t this_query_runs = query_fuzzer_runs; - if (orig_ast->as() || - orig_ast->as() || - orig_ast->as() || - orig_ast->as()) + ASTs inserts_for_fuzzed_tables; + + if (orig_ast->as() || orig_ast->as()) { this_query_runs = 1; } + else if (const auto * create = orig_ast->as()) + { + if (create->columns_list) + this_query_runs = create_query_fuzzer_runs; + else + this_query_runs = 1; + } + else if (const auto * insert = orig_ast->as()) + { + this_query_runs = 1; + inserts_for_fuzzed_tables = fuzzer.getInsertQueriesForFuzzedTables(full_query); + } String query_to_execute; - ASTPtr parsed_query; - ASTPtr fuzz_base = orig_ast; + for (size_t fuzz_step = 0; fuzz_step < this_query_runs; ++fuzz_step) { fmt::print(stderr, "Fuzzing step {} out of {}\n", fuzz_step, this_query_runs); @@ -629,9 +700,9 @@ bool Client::processWithFuzzing(const String & full_query) continue; } - parsed_query = ast_to_process; - query_to_execute = parsed_query->formatForErrorMessage(); - processParsedSingleQuery(full_query, query_to_execute, parsed_query); + query_to_execute = ast_to_process->formatForErrorMessage(); + if (auto res = processFuzzingStep(query_to_execute, ast_to_process)) + return *res; } catch (...) { @@ -644,60 +715,6 @@ bool Client::processWithFuzzing(const String & full_query) have_error = true; } - const auto * exception = server_exception ? server_exception.get() : client_exception.get(); - // Sometimes you may get TOO_DEEP_RECURSION from the server, - // and TOO_DEEP_RECURSION should not fail the fuzzer check. 
- if (have_error && exception->code() == ErrorCodes::TOO_DEEP_RECURSION) - { - have_error = false; - server_exception.reset(); - client_exception.reset(); - return true; - } - - if (have_error) - { - fmt::print(stderr, "Error on processing query '{}': {}\n", ast_to_process->formatForErrorMessage(), exception->message()); - - // Try to reconnect after errors, for two reasons: - // 1. We might not have realized that the server died, e.g. if - // it sent us a trace and closed connection properly. - // 2. The connection might have gotten into a wrong state and - // the next query will get false positive about - // "Unknown packet from server". - try - { - connection->forceConnected(connection_parameters.timeouts); - } - catch (...) - { - // Just report it, we'll terminate below. - fmt::print(stderr, - "Error while reconnecting to the server: {}\n", - getCurrentExceptionMessage(true)); - - // The reconnection might fail, but we'll still be connected - // in the sense of `connection->isConnected() = true`, - // in case when the requested database doesn't exist. - // Disconnect manually now, so that the following code doesn't - // have any doubts, and the connection state is predictable. - connection->disconnect(); - } - } - - if (!connection->isConnected()) - { - // Probably the server is dead because we found an assertion - // failure. Fail fast. - fmt::print(stderr, "Lost connection to the server.\n"); - - // Print the changed settings because they might be needed to - // reproduce the error. - printChangedSettings(); - - return false; - } - // Check that after the query is formatted, we can parse it back, // format again and get the same result. Unfortunately, we can't // compare the ASTs, which would be more sensitive to errors. This @@ -728,13 +745,12 @@ bool Client::processWithFuzzing(const String & full_query) // query, but second and third. // If you have to add any more workarounds to this check, just remove // it altogether, it's not so useful. 
- if (parsed_query && !have_error && !queryHasWithClause(*parsed_query)) + if (ast_to_process && !have_error && !queryHasWithClause(*ast_to_process)) { ASTPtr ast_2; try { const auto * tmp_pos = query_to_execute.c_str(); - ast_2 = parseQuery(tmp_pos, tmp_pos + query_to_execute.size(), false /* allow_multi_statements */); } catch (Exception & e) @@ -761,7 +777,7 @@ bool Client::processWithFuzzing(const String & full_query) "Got the following (different) text after formatting the fuzzed query and parsing it back:\n'{}'\n, expected:\n'{}'\n", text_3, text_2); fmt::print(stderr, "In more detail:\n"); - fmt::print(stderr, "AST-1 (generated by fuzzer):\n'{}'\n", parsed_query->dumpTree()); + fmt::print(stderr, "AST-1 (generated by fuzzer):\n'{}'\n", ast_to_process->dumpTree()); fmt::print(stderr, "Text-1 (AST-1 formatted):\n'{}'\n", query_to_execute); fmt::print(stderr, "AST-2 (Text-1 parsed):\n'{}'\n", ast_2->dumpTree()); fmt::print(stderr, "Text-2 (AST-2 formatted):\n'{}'\n", text_2); @@ -799,6 +815,34 @@ bool Client::processWithFuzzing(const String & full_query) } } + for (const auto & insert_query : inserts_for_fuzzed_tables) + { + std::cout << std::endl; + WriteBufferFromOStream ast_buf(std::cout, 4096); + formatAST(*insert_query, ast_buf, false /*highlight*/); + ast_buf.next(); + std::cout << std::endl << std::endl; + + try + { + query_to_execute = insert_query->formatForErrorMessage(); + if (auto res = processFuzzingStep(query_to_execute, insert_query)) + return *res; + } + catch (...) 
+ { + client_exception = std::make_unique(getCurrentExceptionMessage(print_stack_trace), getCurrentExceptionCode()); + have_error = true; + } + + if (have_error) + { + server_exception.reset(); + client_exception.reset(); + have_error = false; + } + } + return true; } @@ -833,6 +877,7 @@ void Client::addOptions(OptionsDescription & options_description) ("compression", po::value(), "enable or disable compression (enabled by default for remote communication and disabled for localhost communication).") ("query-fuzzer-runs", po::value()->default_value(0), "After executing every SELECT query, do random mutations in it and run again specified number of times. This is used for testing to discover unexpected corner cases.") + ("create-query-fuzzer-runs", po::value()->default_value(0), "") ("interleave-queries-file", po::value>()->multitoken(), "file path with queries to execute before every file from 'queries-file'; multiple files can be specified (--queries-file file1 file2...); this is needed to enable more aggressive fuzzing of newly added tests (see 'query-fuzzer-runs' option)") @@ -985,6 +1030,17 @@ void Client::processOptions(const OptionsDescription & options_description, ignore_error = true; } + if ((create_query_fuzzer_runs = options["create-query-fuzzer-runs"].as())) + { + // Fuzzer implies multiquery. + config().setBool("multiquery", true); + // Ignore errors in parsing queries. 
+ config().setBool("ignore-error", true); + + global_context->setSetting("allow_suspicious_low_cardinality_types", true); + ignore_error = true; + } + if (options.count("opentelemetry-traceparent")) { String traceparent = options["opentelemetry-traceparent"].as(); diff --git a/programs/client/Client.h b/programs/client/Client.h index 1fec282be51..63f28ca96a2 100644 --- a/programs/client/Client.h +++ b/programs/client/Client.h @@ -17,6 +17,7 @@ public: protected: bool processWithFuzzing(const String & full_query) override; + std::optional processFuzzingStep(const String & query_to_execute, const ASTPtr & parsed_query); void connect() override; diff --git a/src/Client/ClientBase.h b/src/Client/ClientBase.h index 6b19c1b8e02..212c9745a14 100644 --- a/src/Client/ClientBase.h +++ b/src/Client/ClientBase.h @@ -247,6 +247,7 @@ protected: QueryFuzzer fuzzer; int query_fuzzer_runs = 0; + int create_query_fuzzer_runs = 0; struct { diff --git a/src/Client/QueryFuzzer.cpp b/src/Client/QueryFuzzer.cpp index 787fad5990a..be265e13caa 100644 --- a/src/Client/QueryFuzzer.cpp +++ b/src/Client/QueryFuzzer.cpp @@ -1,4 +1,21 @@ #include "QueryFuzzer.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include #include @@ -35,6 +52,7 @@ namespace DB namespace ErrorCodes { extern const int TOO_DEEP_RECURSION; + extern const int LOGICAL_ERROR; } Field QueryFuzzer::getRandomField(int type) @@ -398,6 +416,228 @@ void QueryFuzzer::fuzzWindowFrame(ASTWindowDefinition & def) } } +void QueryFuzzer::fuzzCreateQuery(ASTCreateQuery & create) +{ + if (create.columns_list && create.columns_list->columns) + { + for (auto & ast : create.columns_list->columns->children) + { + if (auto * column = ast->as()) + { + fuzzColumnDeclaration(*column); + } + } + } + + if (create.storage && create.storage->engine) + { + auto & engine_name = create.storage->engine->name; + if (startsWith(engine_name, 
"Replicated")) + engine_name = engine_name.substr(strlen("Replicated")); + } + + auto full_name = create.getTable(); + auto original_name = full_name.substr(0, full_name.find("__fuzz_")); + + size_t index = index_of_fuzzed_table[original_name]++; + auto new_name = original_name + "__fuzz_" + toString(index); + + create.setTable(new_name); + + SipHash sip_hash; + sip_hash.update(original_name); + if (create.columns_list) + create.columns_list->updateTreeHash(sip_hash); + if (create.storage) + create.columns_list->updateTreeHash(sip_hash); + + IAST::Hash hash; + sip_hash.get128(hash); + if (created_tables_hashes.insert(hash).second) + original_table_name_to_fuzzed[original_name].push_back(new_name); +} + +void QueryFuzzer::fuzzColumnDeclaration(ASTColumnDeclaration & column) +{ + if (column.type) + { + auto data_type = fuzzDataType(DataTypeFactory::instance().get(column.type)); + + ParserDataType parser; + column.type = parseQuery(parser, data_type->getName(), DBMS_DEFAULT_MAX_QUERY_SIZE, DBMS_DEFAULT_MAX_PARSER_DEPTH); + } +} + +DataTypePtr QueryFuzzer::fuzzDataType(DataTypePtr type) +{ + /// Do not replace Array with not Array to often. + const auto * type_array = typeid_cast(type.get()); + if (type_array && fuzz_rand() % 5 != 0) + return std::make_shared(fuzzDataType(type_array->getNestedType())); + + const auto * type_tuple = typeid_cast(type.get()); + if (type_tuple && fuzz_rand() % 5 != 0) + { + DataTypes elements; + for (const auto & element : type_tuple->getElements()) + elements.push_back(fuzzDataType(element)); + + return type_tuple->haveExplicitNames() + ? 
std::make_shared(elements, type_tuple->getElementNames()) + : std::make_shared(elements); + } + + const auto * type_map = typeid_cast(type.get()); + if (type_map && fuzz_rand() % 5 != 0) + { + auto key_type = fuzzDataType(type_map->getKeyType()); + auto value_type = fuzzDataType(type_map->getValueType()); + if (!DataTypeMap::checkKeyType(key_type)) + key_type = type_map->getKeyType(); + + return std::make_shared(key_type, value_type); + } + + const auto * type_nullable = typeid_cast(type.get()); + if (type_nullable) + { + size_t tmp = fuzz_rand() % 3; + if (tmp == 0) + return type_nullable->getNestedType(); + + if (tmp == 1) + { + auto nested_type = fuzzDataType(type_nullable->getNestedType()); + if (nested_type->canBeInsideNullable()) + return std::make_shared(nested_type); + } + } + + const auto * type_low_cardinality = typeid_cast(type.get()); + if (type_low_cardinality) + { + size_t tmp = fuzz_rand() % 3; + if (tmp == 0) + return type_low_cardinality->getDictionaryType(); + + if (tmp == 1) + { + auto nested_type = fuzzDataType(type_low_cardinality->getDictionaryType()); + if (nested_type->canBeInsideLowCardinality()) + return std::make_shared(nested_type); + } + } + + size_t tmp = fuzz_rand() % 10; + if (tmp <= 1 && type->canBeInsideNullable()) + return std::make_shared(type); + + if (tmp <= 3 && type->canBeInsideLowCardinality()) + return std::make_shared(type); + + if (tmp == 4) + return getRandomType(); + + return type; +} + +DataTypePtr QueryFuzzer::getRandomType() +{ + auto type_id = static_cast(fuzz_rand() % static_cast(TypeIndex::Tuple) + 1); + + if (type_id == TypeIndex::Tuple) + { + size_t tuple_size = fuzz_rand() % 6 + 1; + DataTypes elements; + for (size_t i = 0; i < tuple_size; ++i) + elements.push_back(getRandomType()); + return std::make_shared(elements); + } + + if (type_id == TypeIndex::Array) + return std::make_shared(getRandomType()); + +#define DISPATCH(DECIMAL) \ + if (type_id == TypeIndex::DECIMAL) \ + return std::make_shared>( \ + 
DataTypeDecimal::maxPrecision(), DataTypeDecimal::maxPrecision()); // NOLINT + + DISPATCH(Decimal32) + DISPATCH(Decimal64) + DISPATCH(Decimal128) + DISPATCH(Decimal256) +#undef DISPATCH + + if (type_id == TypeIndex::FixedString) + return std::make_shared(fuzz_rand() % 20); + + if (type_id == TypeIndex::Enum8) + return std::make_shared(); + + if (type_id == TypeIndex::Enum16) + return std::make_shared(); + + return DataTypeFactory::instance().get(String(magic_enum::enum_name(type_id))); +} + +void QueryFuzzer::fuzzTableName(ASTTableExpression & table) +{ + if (!table.database_and_table_name || fuzz_rand() % 3 == 0) + return; + + const auto * identifier = table.database_and_table_name->as(); + if (!identifier) + return; + + auto table_id = identifier->getTableId(); + if (table_id.empty()) + return; + + auto it = original_table_name_to_fuzzed.find(table_id.getTableName()); + if (it != original_table_name_to_fuzzed.end() && !it->second.empty()) + { + const auto & new_table_name = it->second[fuzz_rand() % it->second.size()]; + StorageID new_table_id(table_id.database_name, new_table_name); + table.database_and_table_name = std::make_shared(new_table_id); + } +} + +static ASTPtr tryParseInsertQuery(const String & full_query) +{ + const char * pos = full_query.data(); + const char * end = full_query.data() + full_query.size(); + + ParserInsertQuery parser(end, false); + String message; + + return tryParseQuery(parser, pos, end, message, false, "", false, DBMS_DEFAULT_MAX_QUERY_SIZE, DBMS_DEFAULT_MAX_PARSER_DEPTH); +} + +ASTs QueryFuzzer::getInsertQueriesForFuzzedTables(const String & full_query) +{ + auto parsed_query = tryParseInsertQuery(full_query); + if (!parsed_query) + return {}; + + const auto & insert = *parsed_query->as(); + if (!insert.table) + return {}; + + auto table_name = insert.getTable(); + auto it = original_table_name_to_fuzzed.find(table_name); + if (it == original_table_name_to_fuzzed.end()) + return {}; + + ASTs queries; + for (const auto & 
fuzzed_name : it->second) + { + auto & query = queries.emplace_back(tryParseInsertQuery(full_query)); + query->as()->setTable(fuzzed_name); + } + + return queries; +} + void QueryFuzzer::fuzz(ASTs & asts) { for (auto & ast : asts) @@ -465,6 +705,7 @@ void QueryFuzzer::fuzz(ASTPtr & ast) } else if (auto * table_expr = typeid_cast(ast.get())) { + fuzzTableName(*table_expr); fuzz(table_expr->children); } else if (auto * expr_list = typeid_cast(ast.get())) @@ -531,6 +772,10 @@ void QueryFuzzer::fuzz(ASTPtr & ast) literal->value = fuzzField(literal->value); } } + else if (auto * create_query = typeid_cast(ast.get())) + { + fuzzCreateQuery(*create_query); + } else { fuzz(ast->children); diff --git a/src/Client/QueryFuzzer.h b/src/Client/QueryFuzzer.h index 25bd7f0c88d..40c203b6351 100644 --- a/src/Client/QueryFuzzer.h +++ b/src/Client/QueryFuzzer.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -16,6 +17,10 @@ namespace DB class ASTExpressionList; class ASTOrderByElement; +class ASTCreateQuery; +class ASTInsertQuery; +class ASTColumnDeclaration; +struct ASTTableExpression; struct ASTWindowDefinition; /* @@ -54,6 +59,9 @@ struct QueryFuzzer std::unordered_set debug_visited_nodes; ASTPtr * debug_top_ast = nullptr; + std::unordered_map> original_table_name_to_fuzzed; + std::unordered_map index_of_fuzzed_table; + std::set created_tables_hashes; // This is the only function you have to call -- it will modify the passed // ASTPtr to point to new AST with some random changes. 
@@ -63,12 +71,18 @@ struct QueryFuzzer Field getRandomField(int type); Field fuzzField(Field field); ASTPtr getRandomColumnLike(); + DataTypePtr fuzzDataType(DataTypePtr type); + DataTypePtr getRandomType(); + ASTs getInsertQueriesForFuzzedTables(const String & full_query); void replaceWithColumnLike(ASTPtr & ast); void replaceWithTableLike(ASTPtr & ast); void fuzzOrderByElement(ASTOrderByElement * elem); void fuzzOrderByList(IAST * ast); void fuzzColumnLikeExpressionList(IAST * ast); void fuzzWindowFrame(ASTWindowDefinition & def); + void fuzzCreateQuery(ASTCreateQuery & create); + void fuzzColumnDeclaration(ASTColumnDeclaration & column); + void fuzzTableName(ASTTableExpression & table); void fuzz(ASTs & asts); void fuzz(ASTPtr & ast); void collectFuzzInfoMain(ASTPtr ast); From 00144bb6a93dec87f76d13891b2e17b0e2fc6734 Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Thu, 11 Aug 2022 02:43:47 +0000 Subject: [PATCH 02/28] fix style check --- src/Client/QueryFuzzer.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Client/QueryFuzzer.cpp b/src/Client/QueryFuzzer.cpp index be265e13caa..e1b72b0f830 100644 --- a/src/Client/QueryFuzzer.cpp +++ b/src/Client/QueryFuzzer.cpp @@ -52,7 +52,6 @@ namespace DB namespace ErrorCodes { extern const int TOO_DEEP_RECURSION; - extern const int LOGICAL_ERROR; } Field QueryFuzzer::getRandomField(int type) From 0ba886f763ee7f74a95031db92605c4c3465b254 Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Thu, 11 Aug 2022 12:24:15 +0000 Subject: [PATCH 03/28] enable fuzzing of table definitions --- docker/test/fuzzer/run-fuzzer.sh | 1 + src/Client/QueryFuzzer.cpp | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/docker/test/fuzzer/run-fuzzer.sh b/docker/test/fuzzer/run-fuzzer.sh index 393c980afba..d8ce62348d2 100755 --- a/docker/test/fuzzer/run-fuzzer.sh +++ b/docker/test/fuzzer/run-fuzzer.sh @@ -220,6 +220,7 @@ quit --receive_data_timeout_ms=10000 \ --stacktrace \ --query-fuzzer-runs=1000 \ + 
--create-query-fuzzer-runs=30 \ --queries-file $(ls -1 ch/tests/queries/0_stateless/*.sql | sort -R) \ $NEW_TESTS_OPT \ > >(tail -n 100000 > fuzzer.log) \ diff --git a/src/Client/QueryFuzzer.cpp b/src/Client/QueryFuzzer.cpp index e1b72b0f830..995b4b455b0 100644 --- a/src/Client/QueryFuzzer.cpp +++ b/src/Client/QueryFuzzer.cpp @@ -448,7 +448,7 @@ void QueryFuzzer::fuzzCreateQuery(ASTCreateQuery & create) if (create.columns_list) create.columns_list->updateTreeHash(sip_hash); if (create.storage) - create.columns_list->updateTreeHash(sip_hash); + create.storage->updateTreeHash(sip_hash); IAST::Hash hash; sip_hash.get128(hash); @@ -558,7 +558,7 @@ DataTypePtr QueryFuzzer::getRandomType() #define DISPATCH(DECIMAL) \ if (type_id == TypeIndex::DECIMAL) \ - return std::make_shared>( \ + return std::make_shared>( \ // NOLINT DataTypeDecimal::maxPrecision(), DataTypeDecimal::maxPrecision()); // NOLINT DISPATCH(Decimal32) From 0d74ae33632a6ec1a2d2ce41bbe42bf8bddb376a Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Thu, 11 Aug 2022 13:39:15 +0000 Subject: [PATCH 04/28] fix build --- src/Client/QueryFuzzer.cpp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/Client/QueryFuzzer.cpp b/src/Client/QueryFuzzer.cpp index 995b4b455b0..73637a8b4c2 100644 --- a/src/Client/QueryFuzzer.cpp +++ b/src/Client/QueryFuzzer.cpp @@ -556,10 +556,12 @@ DataTypePtr QueryFuzzer::getRandomType() if (type_id == TypeIndex::Array) return std::make_shared(getRandomType()); +/// NOLINTNEXTLINE #define DISPATCH(DECIMAL) \ if (type_id == TypeIndex::DECIMAL) \ - return std::make_shared>( \ // NOLINT - DataTypeDecimal::maxPrecision(), DataTypeDecimal::maxPrecision()); // NOLINT + return std::make_shared>( \ + DataTypeDecimal::maxPrecision(), \ + fuzz_rand() % DataTypeDecimal::maxPrecision() + 1); DISPATCH(Decimal32) DISPATCH(Decimal64) From 038325038331577597845eb514f10ea8e5265729 Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Fri, 12 Aug 2022 00:32:21 +0000 Subject: [PATCH 
05/28] execute DROP queries for fuzzed tables --- programs/client/Client.cpp | 13 +++++++++---- src/Client/QueryFuzzer.cpp | 24 ++++++++++++++++++++++++ src/Client/QueryFuzzer.h | 2 ++ 3 files changed, 35 insertions(+), 4 deletions(-) diff --git a/programs/client/Client.cpp b/programs/client/Client.cpp index 6ef0db93669..a0eccae957b 100644 --- a/programs/client/Client.cpp +++ b/programs/client/Client.cpp @@ -618,9 +618,9 @@ bool Client::processWithFuzzing(const String & full_query) // - SET -- The time to fuzz the settings has not yet come // (see comments in Client/QueryFuzzer.cpp) size_t this_query_runs = query_fuzzer_runs; - ASTs inserts_for_fuzzed_tables; + ASTs queries_for_fuzzed_tables; - if (orig_ast->as() || orig_ast->as()) + if (orig_ast->as()) { this_query_runs = 1; } @@ -634,7 +634,12 @@ bool Client::processWithFuzzing(const String & full_query) else if (const auto * insert = orig_ast->as()) { this_query_runs = 1; - inserts_for_fuzzed_tables = fuzzer.getInsertQueriesForFuzzedTables(full_query); + queries_for_fuzzed_tables = fuzzer.getInsertQueriesForFuzzedTables(full_query); + } + else if (const auto * drop = orig_ast->as()) + { + this_query_runs = 1; + queries_for_fuzzed_tables = fuzzer.getDropQueriesForFuzzedTables(*drop); } String query_to_execute; @@ -815,7 +820,7 @@ bool Client::processWithFuzzing(const String & full_query) } } - for (const auto & insert_query : inserts_for_fuzzed_tables) + for (const auto & insert_query : queries_for_fuzzed_tables) { std::cout << std::endl; WriteBufferFromOStream ast_buf(std::cout, 4096); diff --git a/src/Client/QueryFuzzer.cpp b/src/Client/QueryFuzzer.cpp index 73637a8b4c2..0035bd7e990 100644 --- a/src/Client/QueryFuzzer.cpp +++ b/src/Client/QueryFuzzer.cpp @@ -16,6 +16,7 @@ #include #include #include +#include #include @@ -632,6 +633,9 @@ ASTs QueryFuzzer::getInsertQueriesForFuzzedTables(const String & full_query) ASTs queries; for (const auto & fuzzed_name : it->second) { + /// Parse query from scratch for each 
table instead of clone, + /// to store proper pointers to inlined data, + /// which are not copies during clone. auto & query = queries.emplace_back(tryParseInsertQuery(full_query)); query->as()->setTable(fuzzed_name); } @@ -639,6 +643,26 @@ ASTs QueryFuzzer::getInsertQueriesForFuzzedTables(const String & full_query) return queries; } +ASTs QueryFuzzer::getDropQueriesForFuzzedTables(const ASTDropQuery & drop_query) +{ + if (drop_query.kind != ASTDropQuery::Drop) + return {}; + + auto table_name = drop_query.getTable(); + auto it = original_table_name_to_fuzzed.find(table_name); + if (it == original_table_name_to_fuzzed.end()) + return {}; + + ASTs queries; + for (const auto & fuzzed_name : it->second) + { + auto & query = queries.emplace_back(drop_query.clone()); + query->as()->setTable(fuzzed_name); + } + + return queries; +} + void QueryFuzzer::fuzz(ASTs & asts) { for (auto & ast : asts) diff --git a/src/Client/QueryFuzzer.h b/src/Client/QueryFuzzer.h index 40c203b6351..ffe9fc7c91e 100644 --- a/src/Client/QueryFuzzer.h +++ b/src/Client/QueryFuzzer.h @@ -20,6 +20,7 @@ class ASTOrderByElement; class ASTCreateQuery; class ASTInsertQuery; class ASTColumnDeclaration; +class ASTDropQuery; struct ASTTableExpression; struct ASTWindowDefinition; @@ -74,6 +75,7 @@ struct QueryFuzzer DataTypePtr fuzzDataType(DataTypePtr type); DataTypePtr getRandomType(); ASTs getInsertQueriesForFuzzedTables(const String & full_query); + ASTs getDropQueriesForFuzzedTables(const ASTDropQuery & drop_query); void replaceWithColumnLike(ASTPtr & ast); void replaceWithTableLike(ASTPtr & ast); void fuzzOrderByElement(ASTOrderByElement * elem); From 9b35d54e8fc1f9b0e6e7d43817bc76c4568f5ef3 Mon Sep 17 00:00:00 2001 From: Nikita Mikhaylov Date: Fri, 26 Aug 2022 12:21:30 +0000 Subject: [PATCH 06/28] First try --- programs/server/Server.cpp | 1 - src/Core/Settings.h | 2 +- src/Interpreters/AsynchronousInsertQueue.cpp | 16 ++++++++++------ src/Interpreters/AsynchronousInsertQueue.h | 7 ++++--- 4 files 
changed, 15 insertions(+), 11 deletions(-) diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index 14f97923ce3..c2eedbbb99c 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -1417,7 +1417,6 @@ int Server::main(const std::vector & /*args*/) global_context->setAsynchronousInsertQueue(std::make_shared( global_context, settings.async_insert_threads, - settings.async_insert_max_data_size, AsynchronousInsertQueue::Timeout{.busy = settings.async_insert_busy_timeout_ms, .stale = settings.async_insert_stale_timeout_ms})); /// Size of cache for marks (index of MergeTree family of tables). diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 9dd87904a56..c718f05d2d9 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -592,7 +592,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value) M(Bool, async_insert, false, "If true, data from INSERT query is stored in queue and later flushed to table in background. Makes sense only for inserts via HTTP protocol. If wait_for_async_insert is false, INSERT query is processed almost instantly, otherwise client will wait until data will be flushed to table", 0) \ M(Bool, wait_for_async_insert, true, "If true wait for processing of asynchronous insertion", 0) \ M(Seconds, wait_for_async_insert_timeout, DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC, "Timeout for waiting for processing asynchronous insertion", 0) \ - M(UInt64, async_insert_max_data_size, 100000, "Maximum size in bytes of unparsed data collected per query before being inserted", 0) \ + M(UInt64, async_insert_max_data_size, 1000000, "Maximum size in bytes of unparsed data collected per query before being inserted", 0) \ M(Milliseconds, async_insert_busy_timeout_ms, 200, "Maximum time to wait before dumping collected data per query since the first data appeared", 0) \ M(Milliseconds, async_insert_stale_timeout_ms, 0, "Maximum time to wait before dumping collected data per query since the last data appeared. 
Zero means no timeout at all", 0) \ \ diff --git a/src/Interpreters/AsynchronousInsertQueue.cpp b/src/Interpreters/AsynchronousInsertQueue.cpp index cad2200c5ec..55ce518a591 100644 --- a/src/Interpreters/AsynchronousInsertQueue.cpp +++ b/src/Interpreters/AsynchronousInsertQueue.cpp @@ -120,9 +120,8 @@ std::exception_ptr AsynchronousInsertQueue::InsertData::Entry::getException() co } -AsynchronousInsertQueue::AsynchronousInsertQueue(ContextPtr context_, size_t pool_size, size_t max_data_size_, const Timeout & timeouts) +AsynchronousInsertQueue::AsynchronousInsertQueue(ContextPtr context_, size_t pool_size, const Timeout & timeouts) : WithContext(context_) - , max_data_size(max_data_size_) , busy_timeout(timeouts.busy) , stale_timeout(timeouts.stale) , pool(pool_size) @@ -250,7 +249,10 @@ void AsynchronousInsertQueue::pushImpl(InsertData::EntryPtr entry, QueueIterator LOG_TRACE(log, "Have {} pending inserts with total {} bytes of data for query '{}'", data->entries.size(), data->size, queryToString(it->first.query)); - if (data->size > max_data_size) + /// Here we check whether we hit the limit on maximum data size in the buffer. + /// And use setting from query context! + /// It works, because queries with the same set of settings are already grouped together. 
+ if (data->size > it->first.settings.async_insert_max_data_size) scheduleDataProcessingJob(it->first, std::move(data), getContext()); CurrentMetrics::add(CurrentMetrics::PendingAsyncInsert); @@ -290,17 +292,19 @@ void AsynchronousInsertQueue::busyCheck() timeout = busy_timeout; std::shared_lock read_lock(rwlock); + const auto now = std::chrono::steady_clock::now(); + for (auto & [key, elem] : queue) { std::lock_guard data_lock(elem->mutex); if (!elem->data) continue; - auto lag = std::chrono::steady_clock::now() - elem->data->first_update; - if (lag >= busy_timeout) + const auto deadline = elem->data->first_update + std::chrono::milliseconds(key.settings.async_insert_busy_timeout_ms); + if (now >= deadline) scheduleDataProcessingJob(key, std::move(elem->data), getContext()); else - timeout = std::min(timeout, std::chrono::ceil(busy_timeout - lag)); + timeout = std::min(timeout, std::chrono::ceil(deadline - now)); } } } diff --git a/src/Interpreters/AsynchronousInsertQueue.h b/src/Interpreters/AsynchronousInsertQueue.h index 6d9aeb7f55d..9f99c334752 100644 --- a/src/Interpreters/AsynchronousInsertQueue.h +++ b/src/Interpreters/AsynchronousInsertQueue.h @@ -25,7 +25,7 @@ public: Milliseconds stale; }; - AsynchronousInsertQueue(ContextPtr context_, size_t pool_size, size_t max_data_size, const Timeout & timeouts); + AsynchronousInsertQueue(ContextPtr context_, size_t pool_size, const Timeout & timeouts); ~AsynchronousInsertQueue(); void push(ASTPtr query, ContextPtr query_context); @@ -109,9 +109,10 @@ private: /// grow for a long period of time and users will be able to select new data in deterministic manner. /// - stale_timeout: if queue is stale for too long, then we dump the data too, so that users will be able to select the last /// piece of inserted data. - /// - max_data_size: if the maximum size of data is reached, then again we dump the data. 
+ /// + /// During processing incoming INSERT queries we can also check whether the maximum size of data in buffer is reached (async_insert_max_data_size setting) + /// If so, then again we dump the data. - const size_t max_data_size; /// in bytes const Milliseconds busy_timeout; const Milliseconds stale_timeout; From 636cedf488c76c4649726786f5890e1830cf7309 Mon Sep 17 00:00:00 2001 From: Nikita Mikhaylov Date: Fri, 26 Aug 2022 20:29:26 +0000 Subject: [PATCH 07/28] Better --- programs/server/Server.cpp | 3 +- src/Core/Settings.h | 2 +- src/Interpreters/AsynchronousInsertQueue.cpp | 73 ++++++++----------- src/Interpreters/AsynchronousInsertQueue.h | 35 ++++----- .../StorageSystemAsynchronousInserts.cpp | 3 +- 5 files changed, 48 insertions(+), 68 deletions(-) diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index c2eedbbb99c..758c8b5999d 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -1416,8 +1416,7 @@ int Server::main(const std::vector & /*args*/) if (settings.async_insert_threads) global_context->setAsynchronousInsertQueue(std::make_shared( global_context, - settings.async_insert_threads, - AsynchronousInsertQueue::Timeout{.busy = settings.async_insert_busy_timeout_ms, .stale = settings.async_insert_stale_timeout_ms})); + settings.async_insert_threads)); /// Size of cache for marks (index of MergeTree family of tables). 
size_t mark_cache_size = config().getUInt64("mark_cache_size", 5368709120); diff --git a/src/Core/Settings.h b/src/Core/Settings.h index c718f05d2d9..2d47024d2b1 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -594,7 +594,6 @@ static constexpr UInt64 operator""_GiB(unsigned long long value) M(Seconds, wait_for_async_insert_timeout, DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC, "Timeout for waiting for processing asynchronous insertion", 0) \ M(UInt64, async_insert_max_data_size, 1000000, "Maximum size in bytes of unparsed data collected per query before being inserted", 0) \ M(Milliseconds, async_insert_busy_timeout_ms, 200, "Maximum time to wait before dumping collected data per query since the first data appeared", 0) \ - M(Milliseconds, async_insert_stale_timeout_ms, 0, "Maximum time to wait before dumping collected data per query since the last data appeared. Zero means no timeout at all", 0) \ \ M(UInt64, remote_fs_read_max_backoff_ms, 10000, "Max wait time when trying to read data for remote disk", 0) \ M(UInt64, remote_fs_read_backoff_max_tries, 5, "Max attempts to read with backoff", 0) \ @@ -667,6 +666,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value) MAKE_OBSOLETE(M, Bool, allow_experimental_database_atomic, true) \ MAKE_OBSOLETE(M, Bool, allow_experimental_bigint_types, true) \ MAKE_OBSOLETE(M, Bool, allow_experimental_window_functions, true) \ + MAKE_OBSOLETE(M, Milliseconds, async_insert_stale_timeout_ms, 0) \ MAKE_OBSOLETE(M, HandleKafkaErrorMode, handle_kafka_error_mode, HandleKafkaErrorMode::DEFAULT) \ MAKE_OBSOLETE(M, Bool, database_replicated_ddl_output, true) \ MAKE_OBSOLETE(M, UInt64, replication_alter_columns_timeout, 60) \ diff --git a/src/Interpreters/AsynchronousInsertQueue.cpp b/src/Interpreters/AsynchronousInsertQueue.cpp index 55ce518a591..af3dadd3c1d 100644 --- a/src/Interpreters/AsynchronousInsertQueue.cpp +++ b/src/Interpreters/AsynchronousInsertQueue.cpp @@ -120,10 +120,8 @@ std::exception_ptr 
AsynchronousInsertQueue::InsertData::Entry::getException() co } -AsynchronousInsertQueue::AsynchronousInsertQueue(ContextPtr context_, size_t pool_size, const Timeout & timeouts) +AsynchronousInsertQueue::AsynchronousInsertQueue(ContextPtr context_, size_t pool_size) : WithContext(context_) - , busy_timeout(timeouts.busy) - , stale_timeout(timeouts.stale) , pool(pool_size) , dump_by_first_update_thread(&AsynchronousInsertQueue::busyCheck, this) , cleanup_thread(&AsynchronousInsertQueue::cleanup, this) @@ -131,9 +129,6 @@ AsynchronousInsertQueue::AsynchronousInsertQueue(ContextPtr context_, size_t poo using namespace std::chrono; assert(pool_size); - - if (stale_timeout > 0ms) - dump_by_last_update_thread = ThreadFromGlobalPool(&AsynchronousInsertQueue::staleCheck, this); } AsynchronousInsertQueue::~AsynchronousInsertQueue() @@ -154,9 +149,6 @@ AsynchronousInsertQueue::~AsynchronousInsertQueue() assert(cleanup_thread.joinable()); cleanup_thread.join(); - if (dump_by_last_update_thread.joinable()) - dump_by_last_update_thread.join(); - pool.wait(); std::lock_guard lock(currently_processing_mutex); @@ -233,12 +225,15 @@ void AsynchronousInsertQueue::pushImpl(InsertData::EntryPtr entry, QueueIterator std::lock_guard data_lock(data_mutex); if (!data) - data = std::make_unique(); + { + auto now = std::chrono::steady_clock::now(); + data = std::make_unique(now); + deadline_queue.insert({now, it}); + } size_t entry_data_size = entry->bytes.size(); data->size += entry_data_size; - data->last_update = std::chrono::steady_clock::now(); data->entries.emplace_back(entry); { @@ -284,46 +279,37 @@ void AsynchronousInsertQueue::waitForProcessingQuery(const String & query_id, co void AsynchronousInsertQueue::busyCheck() { - auto timeout = busy_timeout; - - while (!waitForShutdown(timeout)) + while (!shutdown) { - /// TODO: use priority queue instead of raw unsorted queue. 
- timeout = busy_timeout; - std::shared_lock read_lock(rwlock); + std::unique_lock lock(deadline_mutex); + are_tasks_available.wait(lock, [this]() + { + if (shutdown) + return true; + + if (!deadline_queue.empty() && deadline_queue.begin()->first >= std::chrono::steady_clock::now()) + return true; + + return false; + }); const auto now = std::chrono::steady_clock::now(); - for (auto & [key, elem] : queue) + while (true) { + if (deadline_queue.empty() || deadline_queue.begin()->first < now) + break; + + + std::shared_lock read_lock(rwlock); + auto main_queue_it = deadline_queue.begin()->second; + auto & [key, elem] = *main_queue_it; + std::lock_guard data_lock(elem->mutex); if (!elem->data) continue; - const auto deadline = elem->data->first_update + std::chrono::milliseconds(key.settings.async_insert_busy_timeout_ms); - if (now >= deadline) - scheduleDataProcessingJob(key, std::move(elem->data), getContext()); - else - timeout = std::min(timeout, std::chrono::ceil(deadline - now)); - } - } -} - -void AsynchronousInsertQueue::staleCheck() -{ - while (!waitForShutdown(stale_timeout)) - { - std::shared_lock read_lock(rwlock); - - for (auto & [key, elem] : queue) - { - std::lock_guard data_lock(elem->mutex); - if (!elem->data) - continue; - - auto lag = std::chrono::steady_clock::now() - elem->data->last_update; - if (lag >= stale_timeout) - scheduleDataProcessingJob(key, std::move(elem->data), getContext()); + scheduleDataProcessingJob(key, std::move(elem->data), getContext()); } } } @@ -332,7 +318,8 @@ void AsynchronousInsertQueue::cleanup() { /// Do not run cleanup too often, /// because it holds exclusive lock. - auto timeout = busy_timeout * 5; + /// FIXME: Come up with another mechanism. 
+ auto timeout = Milliseconds(1000); while (!waitForShutdown(timeout)) { diff --git a/src/Interpreters/AsynchronousInsertQueue.h b/src/Interpreters/AsynchronousInsertQueue.h index 9f99c334752..cc1e8048612 100644 --- a/src/Interpreters/AsynchronousInsertQueue.h +++ b/src/Interpreters/AsynchronousInsertQueue.h @@ -18,14 +18,7 @@ class AsynchronousInsertQueue : public WithContext public: using Milliseconds = std::chrono::milliseconds; - /// Using structure to allow and benefit from designated initialization and not mess with a positional arguments in ctor. - struct Timeout - { - Milliseconds busy; - Milliseconds stale; - }; - - AsynchronousInsertQueue(ContextPtr context_, size_t pool_size, const Timeout & timeouts); + AsynchronousInsertQueue(ContextPtr context_, size_t pool_size); ~AsynchronousInsertQueue(); void push(ASTPtr query, ContextPtr query_context); @@ -69,6 +62,10 @@ private: std::exception_ptr exception; }; + explicit InsertData(std::chrono::steady_clock::time_point now) + : first_update(now) + {} + using EntryPtr = std::shared_ptr; std::list entries; @@ -76,11 +73,7 @@ private: /// Timestamp of the first insert into queue, or after the last queue dump. /// Used to detect for how long the queue is active, so we can dump it by timer. - std::chrono::time_point first_update = std::chrono::steady_clock::now(); - - /// Timestamp of the last insert into queue. - /// Used to detect for how long the queue is stale, so we can dump it by another timer. 
- std::chrono::time_point last_update; + std::chrono::time_point first_update; }; using InsertDataPtr = std::unique_ptr; @@ -96,10 +89,17 @@ private: using Queue = std::unordered_map, InsertQuery::Hash>; using QueueIterator = Queue::iterator; + /// Ordered container + using DeadlineQueue = std::map; + mutable std::shared_mutex rwlock; Queue queue; + mutable std::mutex deadline_mutex; + mutable std::condition_variable are_tasks_available; + DeadlineQueue deadline_queue; + using QueryIdToEntry = std::unordered_map; mutable std::mutex currently_processing_mutex; QueryIdToEntry currently_processing_queries; @@ -113,22 +113,15 @@ private: /// During processing incoming INSERT queries we can also check whether the maximum size of data in buffer is reached (async_insert_max_data_size setting) /// If so, then again we dump the data. - const Milliseconds busy_timeout; - const Milliseconds stale_timeout; - - std::mutex shutdown_mutex; - std::condition_variable shutdown_cv; - bool shutdown{false}; + std::atomic shutdown{false}; ThreadPool pool; /// dump the data only inside this pool. ThreadFromGlobalPool dump_by_first_update_thread; /// uses busy_timeout and busyCheck() - ThreadFromGlobalPool dump_by_last_update_thread; /// uses stale_timeout and staleCheck() ThreadFromGlobalPool cleanup_thread; /// uses busy_timeout and cleanup() Poco::Logger * log = &Poco::Logger::get("AsynchronousInsertQueue"); void busyCheck(); - void staleCheck(); void cleanup(); /// Should be called with shared or exclusively locked 'rwlock'. 
diff --git a/src/Storages/System/StorageSystemAsynchronousInserts.cpp b/src/Storages/System/StorageSystemAsynchronousInserts.cpp index 80fc070c83a..7c100a831c3 100644 --- a/src/Storages/System/StorageSystemAsynchronousInserts.cpp +++ b/src/Storages/System/StorageSystemAsynchronousInserts.cpp @@ -77,7 +77,8 @@ void StorageSystemAsynchronousInserts::fillData(MutableColumns & res_columns, Co res_columns[i++]->insert(insert_query.format); res_columns[i++]->insert(time_in_microseconds(elem->data->first_update)); - res_columns[i++]->insert(time_in_microseconds(elem->data->last_update)); + /// FIXME: + res_columns[i++]->insert(time_in_microseconds(std::chrono::steady_clock::now())); res_columns[i++]->insert(elem->data->size); Array arr_query_id; From a5eacc2e029e596484b9e9ac5eeb250df4d65cbb Mon Sep 17 00:00:00 2001 From: Nikita Mikhaylov Date: Fri, 26 Aug 2022 20:41:34 +0000 Subject: [PATCH 08/28] Even better --- src/Interpreters/AsynchronousInsertQueue.cpp | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/src/Interpreters/AsynchronousInsertQueue.cpp b/src/Interpreters/AsynchronousInsertQueue.cpp index af3dadd3c1d..af546855f05 100644 --- a/src/Interpreters/AsynchronousInsertQueue.cpp +++ b/src/Interpreters/AsynchronousInsertQueue.cpp @@ -143,6 +143,11 @@ AsynchronousInsertQueue::~AsynchronousInsertQueue() shutdown_cv.notify_all(); } + { + std::lock_guard lock(deadline_mutex); + are_tasks_available.notify_one(); + } + assert(dump_by_first_update_thread.joinable()); dump_by_first_update_thread.join(); @@ -228,7 +233,10 @@ void AsynchronousInsertQueue::pushImpl(InsertData::EntryPtr entry, QueueIterator { auto now = std::chrono::steady_clock::now(); data = std::make_unique(now); - deadline_queue.insert({now, it}); + + std::lock_guard lock(deadline_mutex); + deadline_queue.insert({now + Milliseconds{it->first.settings.async_insert_busy_timeout_ms}, it}); + are_tasks_available.notify_one(); } size_t entry_data_size = entry->bytes.size(); @@ 
-282,7 +290,7 @@ void AsynchronousInsertQueue::busyCheck() while (!shutdown) { std::unique_lock lock(deadline_mutex); - are_tasks_available.wait(lock, [this]() + are_tasks_available.wait_for(lock, Milliseconds(getContext()->getSettingsRef().async_insert_busy_timeout_ms), [this]() { if (shutdown) return true; @@ -293,6 +301,9 @@ void AsynchronousInsertQueue::busyCheck() return false; }); + if (shutdown) + return; + const auto now = std::chrono::steady_clock::now(); while (true) @@ -302,9 +313,12 @@ void AsynchronousInsertQueue::busyCheck() std::shared_lock read_lock(rwlock); + std::unique_lock deadline_lock(deadline_mutex); auto main_queue_it = deadline_queue.begin()->second; auto & [key, elem] = *main_queue_it; + deadline_queue.erase(deadline_queue.begin()); + std::lock_guard data_lock(elem->mutex); if (!elem->data) continue; From a089ab9238caa8c5844d60e2f87190ef4d85cc9d Mon Sep 17 00:00:00 2001 From: Nikita Mikhaylov Date: Mon, 29 Aug 2022 10:53:27 +0000 Subject: [PATCH 09/28] Style --- src/Storages/System/StorageSystemAsynchronousInserts.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/System/StorageSystemAsynchronousInserts.cpp b/src/Storages/System/StorageSystemAsynchronousInserts.cpp index 7c100a831c3..0aca98b9864 100644 --- a/src/Storages/System/StorageSystemAsynchronousInserts.cpp +++ b/src/Storages/System/StorageSystemAsynchronousInserts.cpp @@ -77,7 +77,7 @@ void StorageSystemAsynchronousInserts::fillData(MutableColumns & res_columns, Co res_columns[i++]->insert(insert_query.format); res_columns[i++]->insert(time_in_microseconds(elem->data->first_update)); - /// FIXME: + /// FIXME: res_columns[i++]->insert(time_in_microseconds(std::chrono::steady_clock::now())); res_columns[i++]->insert(elem->data->size); From fa587ce05796f4deb104ee65d2fa672b2e1acadb Mon Sep 17 00:00:00 2001 From: Nikita Mikhaylov Date: Tue, 30 Aug 2022 17:31:10 +0000 Subject: [PATCH 10/28] Better --- src/Interpreters/AsynchronousInsertQueue.cpp | 5 ++--- 
1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/Interpreters/AsynchronousInsertQueue.cpp b/src/Interpreters/AsynchronousInsertQueue.cpp index af546855f05..6e670efeae6 100644 --- a/src/Interpreters/AsynchronousInsertQueue.cpp +++ b/src/Interpreters/AsynchronousInsertQueue.cpp @@ -295,7 +295,7 @@ void AsynchronousInsertQueue::busyCheck() if (shutdown) return true; - if (!deadline_queue.empty() && deadline_queue.begin()->first >= std::chrono::steady_clock::now()) + if (!deadline_queue.empty() && deadline_queue.begin()->first < std::chrono::steady_clock::now()) return true; return false; @@ -308,12 +308,11 @@ void AsynchronousInsertQueue::busyCheck() while (true) { - if (deadline_queue.empty() || deadline_queue.begin()->first < now) + if (deadline_queue.empty() || deadline_queue.begin()->first > now) break; std::shared_lock read_lock(rwlock); - std::unique_lock deadline_lock(deadline_mutex); auto main_queue_it = deadline_queue.begin()->second; auto & [key, elem] = *main_queue_it; From 852d084950079c2704537491040464d52c93747a Mon Sep 17 00:00:00 2001 From: Nikita Mikhaylov Date: Wed, 14 Sep 2022 20:31:19 +0000 Subject: [PATCH 11/28] Save --- src/Core/Settings.h | 1 + src/Interpreters/AsynchronousInsertQueue.cpp | 30 ++++----- src/Interpreters/AsynchronousInsertQueue.h | 9 ++- .../tests/gtest_async_inserts.cpp | 63 +++++++++++++++++++ .../StorageSystemAsynchronousInserts.cpp | 3 - 5 files changed, 83 insertions(+), 23 deletions(-) create mode 100644 src/Interpreters/tests/gtest_async_inserts.cpp diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 2d47024d2b1..c41dd92df61 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -594,6 +594,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value) M(Seconds, wait_for_async_insert_timeout, DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC, "Timeout for waiting for processing asynchronous insertion", 0) \ M(UInt64, async_insert_max_data_size, 1000000, "Maximum size in bytes of unparsed data 
collected per query before being inserted", 0) \ M(Milliseconds, async_insert_busy_timeout_ms, 200, "Maximum time to wait before dumping collected data per query since the first data appeared", 0) \ + M(Milliseconds, async_insert_cleanup_timeout_ms, 1000, "Time to wait before each iteration of cleaning up buffers for INSERT queries which don't appear anymore. Only has meaning at server startup.", 0) \ \ M(UInt64, remote_fs_read_max_backoff_ms, 10000, "Max wait time when trying to read data for remote disk", 0) \ M(UInt64, remote_fs_read_backoff_max_tries, 5, "Max attempts to read with backoff", 0) \ diff --git a/src/Interpreters/AsynchronousInsertQueue.cpp b/src/Interpreters/AsynchronousInsertQueue.cpp index 6e670efeae6..4681a9b6bd5 100644 --- a/src/Interpreters/AsynchronousInsertQueue.cpp +++ b/src/Interpreters/AsynchronousInsertQueue.cpp @@ -120,8 +120,9 @@ std::exception_ptr AsynchronousInsertQueue::InsertData::Entry::getException() co } -AsynchronousInsertQueue::AsynchronousInsertQueue(ContextPtr context_, size_t pool_size) +AsynchronousInsertQueue::AsynchronousInsertQueue(ContextPtr context_, size_t pool_size, Milliseconds cleanup_timeout_) : WithContext(context_) + , cleanup_timeout(cleanup_timeout_) , pool(pool_size) , dump_by_first_update_thread(&AsynchronousInsertQueue::busyCheck, this) , cleanup_thread(&AsynchronousInsertQueue::cleanup, this) @@ -137,14 +138,9 @@ AsynchronousInsertQueue::~AsynchronousInsertQueue() LOG_TRACE(log, "Shutting down the asynchronous insertion queue"); - { - std::lock_guard lock(shutdown_mutex); - shutdown = true; - shutdown_cv.notify_all(); - } - { std::lock_guard lock(deadline_mutex); + shutdown = true; are_tasks_available.notify_one(); } @@ -329,13 +325,16 @@ void AsynchronousInsertQueue::busyCheck() void AsynchronousInsertQueue::cleanup() { - /// Do not run cleanup too often, - /// because it holds exclusive lock. - /// FIXME: Come up with another mechanism. 
- auto timeout = Milliseconds(1000); - - while (!waitForShutdown(timeout)) + while (true) { + { + std::unique_lock shutdown_lock(shutdown_mutex); + shutdown_cv.wait_for(shutdown_lock, Milliseconds(cleanup_timeout), [this]() { return shutdown; }); + + if (shutdown) + return; + } + std::vector keys_to_remove; { @@ -387,11 +386,6 @@ void AsynchronousInsertQueue::cleanup() } } -bool AsynchronousInsertQueue::waitForShutdown(const Milliseconds & timeout) -{ - std::unique_lock shutdown_lock(shutdown_mutex); - return shutdown_cv.wait_for(shutdown_lock, timeout, [this]() { return shutdown; }); -} // static void AsynchronousInsertQueue::processData(InsertQuery key, InsertDataPtr data, ContextPtr global_context) diff --git a/src/Interpreters/AsynchronousInsertQueue.h b/src/Interpreters/AsynchronousInsertQueue.h index cc1e8048612..aa5f02203fa 100644 --- a/src/Interpreters/AsynchronousInsertQueue.h +++ b/src/Interpreters/AsynchronousInsertQueue.h @@ -18,7 +18,7 @@ class AsynchronousInsertQueue : public WithContext public: using Milliseconds = std::chrono::milliseconds; - AsynchronousInsertQueue(ContextPtr context_, size_t pool_size); + AsynchronousInsertQueue(ContextPtr context_, size_t pool_size, Milliseconds cleanup_timeout); ~AsynchronousInsertQueue(); void push(ASTPtr query, ContextPtr query_context); @@ -96,6 +96,9 @@ private: mutable std::shared_mutex rwlock; Queue queue; + mutable std::mutex shutdown_mutex; + mutable std::condition_variable shutdown_cv; + mutable std::mutex deadline_mutex; mutable std::condition_variable are_tasks_available; DeadlineQueue deadline_queue; @@ -113,7 +116,9 @@ private: /// During processing incoming INSERT queries we can also check whether the maximum size of data in buffer is reached (async_insert_max_data_size setting) /// If so, then again we dump the data. - std::atomic shutdown{false}; + const Milliseconds cleanup_timeout; + + bool shutdown{false}; ThreadPool pool; /// dump the data only inside this pool. 
ThreadFromGlobalPool dump_by_first_update_thread; /// uses busy_timeout and busyCheck() diff --git a/src/Interpreters/tests/gtest_async_inserts.cpp b/src/Interpreters/tests/gtest_async_inserts.cpp new file mode 100644 index 00000000000..0720592b8a7 --- /dev/null +++ b/src/Interpreters/tests/gtest_async_inserts.cpp @@ -0,0 +1,63 @@ +#include + +#include +#include +#include "Processors/Executors/PullingPipelineExecutor.h" + +#include +#include +#include +#include +#include +#include +#include +#include "Common/Exception.h" + +using namespace DB; + +static SharedContextHolder shared_context; +static ContextMutablePtr context; + +static bool initialize() +{ + try + { + shared_context = Context::createShared(); + context = Context::createGlobal(shared_context.get()); + context->makeGlobalContext(); + context->setApplicationType(Context::ApplicationType::LOCAL); + + // registerFunctions(); + // registerAggregateFunctions(); + // registerTableFunctions(); + // registerStorages(); + // registerDictionaries(); + // registerDisks(); + // registerFormats(); + + return true; + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + throw; + } +} + +[[ maybe_unused ]] static bool initialized = initialize(); + +TEST(AsyncInsertQueue, SimpleTest) +{ + try + { + auto io = executeQuery("CREATE TABLE SimpleTest ENGINE=Memory()", context, true, QueryProcessingStage::Complete); + PullingPipelineExecutor executor(io.pipeline); + Block res; + while (!res && executor.pull(res)); + } + catch (...) 
+ { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + +} diff --git a/src/Storages/System/StorageSystemAsynchronousInserts.cpp b/src/Storages/System/StorageSystemAsynchronousInserts.cpp index 0aca98b9864..5ebdb828c34 100644 --- a/src/Storages/System/StorageSystemAsynchronousInserts.cpp +++ b/src/Storages/System/StorageSystemAsynchronousInserts.cpp @@ -24,7 +24,6 @@ NamesAndTypesList StorageSystemAsynchronousInserts::getNamesAndTypes() {"table", std::make_shared()}, {"format", std::make_shared()}, {"first_update", std::make_shared(TIME_SCALE)}, - {"last_update", std::make_shared(TIME_SCALE)}, {"total_bytes", std::make_shared()}, {"entries.query_id", std::make_shared(std::make_shared())}, {"entries.bytes", std::make_shared(std::make_shared())}, @@ -77,8 +76,6 @@ void StorageSystemAsynchronousInserts::fillData(MutableColumns & res_columns, Co res_columns[i++]->insert(insert_query.format); res_columns[i++]->insert(time_in_microseconds(elem->data->first_update)); - /// FIXME: - res_columns[i++]->insert(time_in_microseconds(std::chrono::steady_clock::now())); res_columns[i++]->insert(elem->data->size); Array arr_query_id; From 502338560c4175e8c50083db22464e15a005c1a6 Mon Sep 17 00:00:00 2001 From: Nikita Mikhaylov Date: Tue, 20 Sep 2022 22:46:40 +0000 Subject: [PATCH 12/28] Added a test --- programs/server/Server.cpp | 3 +- ...51_async_insert_user_level_settings.python | 62 +++++++++++++++++++ ...async_insert_user_level_settings.reference | 1 + .../02451_async_insert_user_level_settings.sh | 9 +++ 4 files changed, 74 insertions(+), 1 deletion(-) create mode 100644 tests/queries/0_stateless/02451_async_insert_user_level_settings.python create mode 100644 tests/queries/0_stateless/02451_async_insert_user_level_settings.reference create mode 100755 tests/queries/0_stateless/02451_async_insert_user_level_settings.sh diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index 758c8b5999d..8c0ebba694d 100644 --- a/programs/server/Server.cpp +++ 
b/programs/server/Server.cpp @@ -1416,7 +1416,8 @@ int Server::main(const std::vector & /*args*/) if (settings.async_insert_threads) global_context->setAsynchronousInsertQueue(std::make_shared( global_context, - settings.async_insert_threads)); + settings.async_insert_threads, + settings.async_insert_cleanup_timeout_ms)); /// Size of cache for marks (index of MergeTree family of tables). size_t mark_cache_size = config().getUInt64("mark_cache_size", 5368709120); diff --git a/tests/queries/0_stateless/02451_async_insert_user_level_settings.python b/tests/queries/0_stateless/02451_async_insert_user_level_settings.python new file mode 100644 index 00000000000..9fe6142edc6 --- /dev/null +++ b/tests/queries/0_stateless/02451_async_insert_user_level_settings.python @@ -0,0 +1,62 @@ +#!/usr/bin/env python3 +import os +import sys +import time + +CURDIR = os.path.dirname(os.path.realpath(__file__)) +sys.path.insert(0, os.path.join(CURDIR, "helpers")) + +CLICKHOUSE_URL = os.environ.get("CLICKHOUSE_URL") +CLICKHOUSE_TMP = os.environ.get("CLICKHOUSE_TMP") + +from pure_http_client import ClickHouseClient + +client = ClickHouseClient() + +NUM_RUNS = 20 +TIME_TO_WAIT_MS = 500 + +# The purpose of this test is to check that AsyncInsertQueue +# respects timeouts specified in the scope of query. +# Like if we execute NUM_RUNS subsequent inserts +# then we should spend at least (NUM_RUNS - 1) * TIME_TO_WAIT_MS +# Because each query corresponds to a timepoint when it has been flushed +# And time period between first and last flush is exactly such +# as described above. 
+# Note that this doesn't include the time to process the query itself +# and this time maybe different depending on the build type (release or with sanitizer) + +gen_data_query = "SELECT number + {} AS id, toString(id) AS s, range(id) AS arr FROM numbers(10) FORMAT TSV" +insert_query = "INSERT INTO t_async_insert_user_settings FORMAT TSV" +settings = { + "async_insert": 1, + "wait_for_async_insert": 1, + "async_insert_busy_timeout_ms": TIME_TO_WAIT_MS, +} + +all_data = [] + +for i in range(NUM_RUNS): + all_data.append( + client.query(gen_data_query.format(i * 10), settings={}, binary_result=True) + ) + +client.query("DROP TABLE IF EXISTS t_async_insert_user_settings") +client.query( + "CREATE TABLE t_async_insert_user_settings (id UInt64, s String, arr Array(UInt64)) ENGINE = Memory" +) + +start_ms = time.time() * 1000.0 +for i in range(NUM_RUNS): + client.query_with_data(insert_query, all_data[i], settings=settings) +end_ms = time.time() * 1000.0 + +duration = end_ms - start_ms + +expected = (NUM_RUNS - 1) * TIME_TO_WAIT_MS +if duration >= expected: + print("Ok.") +else: + print(f"Fail. Duration: {duration}. Expected: {expected}") + +client.query("DROP TABLE IF EXISTS t_async_insert_user_settings") diff --git a/tests/queries/0_stateless/02451_async_insert_user_level_settings.reference b/tests/queries/0_stateless/02451_async_insert_user_level_settings.reference new file mode 100644 index 00000000000..587579af915 --- /dev/null +++ b/tests/queries/0_stateless/02451_async_insert_user_level_settings.reference @@ -0,0 +1 @@ +Ok. diff --git a/tests/queries/0_stateless/02451_async_insert_user_level_settings.sh b/tests/queries/0_stateless/02451_async_insert_user_level_settings.sh new file mode 100755 index 00000000000..3d627e273b9 --- /dev/null +++ b/tests/queries/0_stateless/02451_async_insert_user_level_settings.sh @@ -0,0 +1,9 @@ +#!/usr/bin/env bash +# Tags: no-fasttest + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. 
"$CURDIR"/../shell_config.sh + +# We should have correct env vars from shell_config.sh to run this test +python3 "$CURDIR"/02451_async_insert_user_level_settings.python From 4d1d87da2c4370fbeafbc35407e3a6b56c637e51 Mon Sep 17 00:00:00 2001 From: Nikita Mikhaylov Date: Tue, 20 Sep 2022 22:53:09 +0000 Subject: [PATCH 13/28] Delete garbage --- .../tests/gtest_async_inserts.cpp | 63 ------------------- 1 file changed, 63 deletions(-) delete mode 100644 src/Interpreters/tests/gtest_async_inserts.cpp diff --git a/src/Interpreters/tests/gtest_async_inserts.cpp b/src/Interpreters/tests/gtest_async_inserts.cpp deleted file mode 100644 index 0720592b8a7..00000000000 --- a/src/Interpreters/tests/gtest_async_inserts.cpp +++ /dev/null @@ -1,63 +0,0 @@ -#include - -#include -#include -#include "Processors/Executors/PullingPipelineExecutor.h" - -#include -#include -#include -#include -#include -#include -#include -#include "Common/Exception.h" - -using namespace DB; - -static SharedContextHolder shared_context; -static ContextMutablePtr context; - -static bool initialize() -{ - try - { - shared_context = Context::createShared(); - context = Context::createGlobal(shared_context.get()); - context->makeGlobalContext(); - context->setApplicationType(Context::ApplicationType::LOCAL); - - // registerFunctions(); - // registerAggregateFunctions(); - // registerTableFunctions(); - // registerStorages(); - // registerDictionaries(); - // registerDisks(); - // registerFormats(); - - return true; - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - throw; - } -} - -[[ maybe_unused ]] static bool initialized = initialize(); - -TEST(AsyncInsertQueue, SimpleTest) -{ - try - { - auto io = executeQuery("CREATE TABLE SimpleTest ENGINE=Memory()", context, true, QueryProcessingStage::Complete); - PullingPipelineExecutor executor(io.pipeline); - Block res; - while (!res && executor.pull(res)); - } - catch (...) 
- { - tryLogCurrentException(__PRETTY_FUNCTION__); - } - -} From 3e6cc4421e782292812a688ebbce5960256d4e8d Mon Sep 17 00:00:00 2001 From: Nikita Mikhaylov Date: Wed, 21 Sep 2022 17:27:22 +0000 Subject: [PATCH 14/28] Fix fast test --- .../queries/0_stateless/02117_show_create_table_system.reference | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/queries/0_stateless/02117_show_create_table_system.reference b/tests/queries/0_stateless/02117_show_create_table_system.reference index 02a0d339e3a..7653a27b34a 100644 --- a/tests/queries/0_stateless/02117_show_create_table_system.reference +++ b/tests/queries/0_stateless/02117_show_create_table_system.reference @@ -12,7 +12,6 @@ CREATE TABLE system.asynchronous_inserts `table` String, `format` String, `first_update` DateTime64(6), - `last_update` DateTime64(6), `total_bytes` UInt64, `entries.query_id` Array(String), `entries.bytes` Array(UInt64), From c99305126555919ba42aa760a10856ed6fc42217 Mon Sep 17 00:00:00 2001 From: Nikita Mikhaylov Date: Mon, 26 Sep 2022 15:17:34 +0000 Subject: [PATCH 15/28] Fix deadlock and flaky test --- src/Interpreters/AsynchronousInsertQueue.cpp | 58 +++++++++++-------- src/Interpreters/AsynchronousInsertQueue.h | 8 ++- ...51_async_insert_user_level_settings.python | 11 +--- 3 files changed, 41 insertions(+), 36 deletions(-) diff --git a/src/Interpreters/AsynchronousInsertQueue.cpp b/src/Interpreters/AsynchronousInsertQueue.cpp index 4681a9b6bd5..7f03bc25b62 100644 --- a/src/Interpreters/AsynchronousInsertQueue.cpp +++ b/src/Interpreters/AsynchronousInsertQueue.cpp @@ -138,11 +138,15 @@ AsynchronousInsertQueue::~AsynchronousInsertQueue() LOG_TRACE(log, "Shutting down the asynchronous insertion queue"); + shutdown = true; { std::lock_guard lock(deadline_mutex); - shutdown = true; are_tasks_available.notify_one(); } + { + std::lock_guard lock(cleanup_mutex); + cleanup_can_run.notify_one(); + } assert(dump_by_first_update_thread.joinable()); dump_by_first_update_thread.join(); @@ -285,35 
+289,39 @@ void AsynchronousInsertQueue::busyCheck() { while (!shutdown) { - std::unique_lock lock(deadline_mutex); - are_tasks_available.wait_for(lock, Milliseconds(getContext()->getSettingsRef().async_insert_busy_timeout_ms), [this]() + std::vector entries_to_flush; { + std::unique_lock deadline_lock(deadline_mutex); + are_tasks_available.wait_for(deadline_lock, Milliseconds(getContext()->getSettingsRef().async_insert_busy_timeout_ms), [this]() + { + if (shutdown) + return true; + + if (!deadline_queue.empty() && deadline_queue.begin()->first < std::chrono::steady_clock::now()) + return true; + + return false; + }); + if (shutdown) - return true; + return; - if (!deadline_queue.empty() && deadline_queue.begin()->first < std::chrono::steady_clock::now()) - return true; + const auto now = std::chrono::steady_clock::now(); - return false; - }); + while (true) + { + if (deadline_queue.empty() || deadline_queue.begin()->first > now) + break; - if (shutdown) - return; + entries_to_flush.emplace_back(deadline_queue.begin()->second); + deadline_queue.erase(deadline_queue.begin()); + } + } - const auto now = std::chrono::steady_clock::now(); - - while (true) + std::shared_lock read_lock(rwlock); + for (auto & entry : entries_to_flush) { - if (deadline_queue.empty() || deadline_queue.begin()->first > now) - break; - - - std::shared_lock read_lock(rwlock); - auto main_queue_it = deadline_queue.begin()->second; - auto & [key, elem] = *main_queue_it; - - deadline_queue.erase(deadline_queue.begin()); - + auto & [key, elem] = *entry; std::lock_guard data_lock(elem->mutex); if (!elem->data) continue; @@ -328,8 +336,8 @@ void AsynchronousInsertQueue::cleanup() while (true) { { - std::unique_lock shutdown_lock(shutdown_mutex); - shutdown_cv.wait_for(shutdown_lock, Milliseconds(cleanup_timeout), [this]() { return shutdown; }); + std::unique_lock cleanup_lock(cleanup_mutex); + cleanup_can_run.wait_for(cleanup_lock, Milliseconds(cleanup_timeout), [this]() -> bool { return shutdown; 
}); if (shutdown) return; diff --git a/src/Interpreters/AsynchronousInsertQueue.h b/src/Interpreters/AsynchronousInsertQueue.h index aa5f02203fa..93483301ee6 100644 --- a/src/Interpreters/AsynchronousInsertQueue.h +++ b/src/Interpreters/AsynchronousInsertQueue.h @@ -5,6 +5,7 @@ #include #include +#include #include @@ -96,8 +97,9 @@ private: mutable std::shared_mutex rwlock; Queue queue; - mutable std::mutex shutdown_mutex; - mutable std::condition_variable shutdown_cv; + /// This is needed only for using inside cleanup() function and correct signaling about shutdown + mutable std::mutex cleanup_mutex; + mutable std::condition_variable cleanup_can_run; mutable std::mutex deadline_mutex; mutable std::condition_variable are_tasks_available; @@ -118,7 +120,7 @@ private: const Milliseconds cleanup_timeout; - bool shutdown{false}; + std::atomic shutdown{false}; ThreadPool pool; /// dump the data only inside this pool. ThreadFromGlobalPool dump_by_first_update_thread; /// uses busy_timeout and busyCheck() diff --git a/tests/queries/0_stateless/02451_async_insert_user_level_settings.python b/tests/queries/0_stateless/02451_async_insert_user_level_settings.python index 9fe6142edc6..8c75f4898c4 100644 --- a/tests/queries/0_stateless/02451_async_insert_user_level_settings.python +++ b/tests/queries/0_stateless/02451_async_insert_user_level_settings.python @@ -13,7 +13,7 @@ from pure_http_client import ClickHouseClient client = ClickHouseClient() -NUM_RUNS = 20 +NUM_RUNS = 5 TIME_TO_WAIT_MS = 500 # The purpose of this test is to check that AsyncInsertQueue @@ -27,7 +27,7 @@ TIME_TO_WAIT_MS = 500 # and this time maybe different depending on the build type (release or with sanitizer) gen_data_query = "SELECT number + {} AS id, toString(id) AS s, range(id) AS arr FROM numbers(10) FORMAT TSV" -insert_query = "INSERT INTO t_async_insert_user_settings FORMAT TSV" +insert_query = "INSERT INTO t_async_insert_user_settings VALUES ({}, '{}', [{}])" settings = { "async_insert": 1, 
"wait_for_async_insert": 1, @@ -36,11 +36,6 @@ settings = { all_data = [] -for i in range(NUM_RUNS): - all_data.append( - client.query(gen_data_query.format(i * 10), settings={}, binary_result=True) - ) - client.query("DROP TABLE IF EXISTS t_async_insert_user_settings") client.query( "CREATE TABLE t_async_insert_user_settings (id UInt64, s String, arr Array(UInt64)) ENGINE = Memory" @@ -48,7 +43,7 @@ client.query( start_ms = time.time() * 1000.0 for i in range(NUM_RUNS): - client.query_with_data(insert_query, all_data[i], settings=settings) + client.query(query = insert_query.format(i,i,i), settings=settings) end_ms = time.time() * 1000.0 duration = end_ms - start_ms From 4ac5df25400d23234fce6dbe7bad365c1350c5a3 Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Wed, 28 Sep 2022 14:52:10 +0000 Subject: [PATCH 16/28] drop all fuzzed tables --- docker/test/fuzzer/allow-nullable-key.xml | 2 +- docker/test/fuzzer/run-fuzzer.sh | 2 +- programs/client/Client.cpp | 10 +++---- src/Client/QueryFuzzer.cpp | 32 ++++++++++++++++------- src/Client/QueryFuzzer.h | 2 ++ 5 files changed, 32 insertions(+), 16 deletions(-) diff --git a/docker/test/fuzzer/allow-nullable-key.xml b/docker/test/fuzzer/allow-nullable-key.xml index 5a0c2c20e1c..331012a2254 100644 --- a/docker/test/fuzzer/allow-nullable-key.xml +++ b/docker/test/fuzzer/allow-nullable-key.xml @@ -3,4 +3,4 @@ 1 - \ No newline at end of file + diff --git a/docker/test/fuzzer/run-fuzzer.sh b/docker/test/fuzzer/run-fuzzer.sh index fd7acf88e05..7248728864e 100755 --- a/docker/test/fuzzer/run-fuzzer.sh +++ b/docker/test/fuzzer/run-fuzzer.sh @@ -241,7 +241,7 @@ quit --receive_data_timeout_ms=10000 \ --stacktrace \ --query-fuzzer-runs=1000 \ - --create-query-fuzzer-runs=30 \ + --create-query-fuzzer-runs=50 \ --queries-file $(ls -1 ch/tests/queries/0_stateless/*.sql | sort -R) \ $NEW_TESTS_OPT \ > >(tail -n 100000 > fuzzer.log) \ diff --git a/programs/client/Client.cpp b/programs/client/Client.cpp index 175b7a0fbd3..d44827d7bec 100644 
--- a/programs/client/Client.cpp +++ b/programs/client/Client.cpp @@ -627,7 +627,7 @@ bool Client::processWithFuzzing(const String & full_query) } else if (const auto * create = orig_ast->as()) { - if (create->columns_list) + if (QueryFuzzer::isSuitableForFuzzing(*create)) this_query_runs = create_query_fuzzer_runs; else this_query_runs = 1; @@ -821,18 +821,18 @@ bool Client::processWithFuzzing(const String & full_query) } } - for (const auto & insert_query : queries_for_fuzzed_tables) + for (const auto & query : queries_for_fuzzed_tables) { std::cout << std::endl; WriteBufferFromOStream ast_buf(std::cout, 4096); - formatAST(*insert_query, ast_buf, false /*highlight*/); + formatAST(*query, ast_buf, false /*highlight*/); ast_buf.next(); std::cout << std::endl << std::endl; try { - query_to_execute = insert_query->formatForErrorMessage(); - if (auto res = processFuzzingStep(query_to_execute, insert_query)) + query_to_execute = query->formatForErrorMessage(); + if (auto res = processFuzzingStep(query_to_execute, query)) return *res; } catch (...) diff --git a/src/Client/QueryFuzzer.cpp b/src/Client/QueryFuzzer.cpp index 9986d81cd7b..3f786f84e2a 100644 --- a/src/Client/QueryFuzzer.cpp +++ b/src/Client/QueryFuzzer.cpp @@ -448,6 +448,11 @@ void QueryFuzzer::fuzzWindowFrame(ASTWindowDefinition & def) } } +bool QueryFuzzer::isSuitableForFuzzing(const ASTCreateQuery & create) +{ + return create.columns_list && create.columns_list->columns; +} + void QueryFuzzer::fuzzCreateQuery(ASTCreateQuery & create) { if (create.columns_list && create.columns_list->columns) @@ -485,6 +490,8 @@ void QueryFuzzer::fuzzCreateQuery(ASTCreateQuery & create) IAST::Hash hash; sip_hash.get128(hash); + + /// Save only tables with unique definition. 
if (created_tables_hashes.insert(hash).second) original_table_name_to_fuzzed[original_name].push_back(new_name); } @@ -502,7 +509,7 @@ void QueryFuzzer::fuzzColumnDeclaration(ASTColumnDeclaration & column) DataTypePtr QueryFuzzer::fuzzDataType(DataTypePtr type) { - /// Do not replace Array with not Array to often. + /// Do not replace Array/Tuple/etc. with not Array/Tuple too often. const auto * type_array = typeid_cast(type.get()); if (type_array && fuzz_rand() % 5 != 0) return std::make_shared(fuzzDataType(type_array->getNestedType())); @@ -560,14 +567,17 @@ DataTypePtr QueryFuzzer::fuzzDataType(DataTypePtr type) } } - size_t tmp = fuzz_rand() % 10; + size_t tmp = fuzz_rand() % 8; + if (tmp == 0) + return std::make_shared(type); + if (tmp <= 1 && type->canBeInsideNullable()) return std::make_shared(type); - if (tmp <= 3 && type->canBeInsideLowCardinality()) + if (tmp <= 2 && type->canBeInsideLowCardinality()) return std::make_shared(type); - if (tmp == 4) + if (tmp <= 3) return getRandomType(); return type; @@ -594,7 +604,7 @@ DataTypePtr QueryFuzzer::getRandomType() if (type_id == TypeIndex::DECIMAL) \ return std::make_shared>( \ DataTypeDecimal::maxPrecision(), \ - fuzz_rand() % DataTypeDecimal::maxPrecision() + 1); + (fuzz_rand() % DataTypeDecimal::maxPrecision()) + 1); DISPATCH(Decimal32) DISPATCH(Decimal64) @@ -667,7 +677,7 @@ ASTs QueryFuzzer::getInsertQueriesForFuzzedTables(const String & full_query) { /// Parse query from scratch for each table instead of clone, /// to store proper pointers to inlined data, - /// which are not copies during clone. + /// which are not copied during clone. 
auto & query = queries.emplace_back(tryParseInsertQuery(full_query)); query->as()->setTable(fuzzed_name); } @@ -681,15 +691,19 @@ ASTs QueryFuzzer::getDropQueriesForFuzzedTables(const ASTDropQuery & drop_query) return {}; auto table_name = drop_query.getTable(); - auto it = original_table_name_to_fuzzed.find(table_name); - if (it == original_table_name_to_fuzzed.end()) + auto it = index_of_fuzzed_table.find(table_name); + if (it == index_of_fuzzed_table.end()) return {}; ASTs queries; - for (const auto & fuzzed_name : it->second) + /// Drop all created tables, not only unique ones. + for (size_t i = 0; i < it->second; ++i) { + auto fuzzed_name = table_name + "__fuzz_" + toString(i); auto & query = queries.emplace_back(drop_query.clone()); query->as()->setTable(fuzzed_name); + /// Just in case add IF EXISTS to avoid exceptions. + query->as()->if_exists = true; } return queries; diff --git a/src/Client/QueryFuzzer.h b/src/Client/QueryFuzzer.h index ffe9fc7c91e..3771d2bc61a 100644 --- a/src/Client/QueryFuzzer.h +++ b/src/Client/QueryFuzzer.h @@ -91,6 +91,8 @@ struct QueryFuzzer void addTableLike(ASTPtr ast); void addColumnLike(ASTPtr ast); void collectFuzzInfoRecurse(ASTPtr ast); + + static bool isSuitableForFuzzing(const ASTCreateQuery & create); }; } From 34bc16cd5b4364c5531076eaeb3f18595582d4df Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Wed, 28 Sep 2022 15:55:39 +0000 Subject: [PATCH 17/28] avoid more useless errors --- programs/client/Client.cpp | 1 + src/Client/QueryFuzzer.cpp | 38 +++++++++++++++++-- src/Client/QueryFuzzer.h | 3 +- .../MergeTree/registerStorageMergeTree.cpp | 2 +- 4 files changed, 39 insertions(+), 5 deletions(-) diff --git a/programs/client/Client.cpp b/programs/client/Client.cpp index d44827d7bec..9171f58b2e8 100644 --- a/programs/client/Client.cpp +++ b/programs/client/Client.cpp @@ -805,6 +805,7 @@ bool Client::processWithFuzzing(const String & full_query) // so that it doesn't influence the exit code. 
server_exception.reset(); client_exception.reset(); + fuzzer.notifyQueryFailed(ast_to_process); have_error = false; } else if (ast_to_process->formatForErrorMessage().size() > 500) diff --git a/src/Client/QueryFuzzer.cpp b/src/Client/QueryFuzzer.cpp index 3f786f84e2a..f7309695920 100644 --- a/src/Client/QueryFuzzer.cpp +++ b/src/Client/QueryFuzzer.cpp @@ -468,9 +468,21 @@ void QueryFuzzer::fuzzCreateQuery(ASTCreateQuery & create) if (create.storage && create.storage->engine) { + /// Replace ReplicatedMergeTree to ordinary MergeTree + /// to avoid inconsistency of metadata in zookeeper. auto & engine_name = create.storage->engine->name; if (startsWith(engine_name, "Replicated")) + { engine_name = engine_name.substr(strlen("Replicated")); + if (auto & arguments = create.storage->engine->arguments) + { + auto & children = arguments->children; + if (children.size() <= 2) + arguments.reset(); + else + children.erase(children.begin(), children.begin() + 2); + } + } } auto full_name = create.getTable(); @@ -493,7 +505,7 @@ void QueryFuzzer::fuzzCreateQuery(ASTCreateQuery & create) /// Save only tables with unique definition. 
if (created_tables_hashes.insert(hash).second) - original_table_name_to_fuzzed[original_name].push_back(new_name); + original_table_name_to_fuzzed[original_name].insert(new_name); } void QueryFuzzer::fuzzColumnDeclaration(ASTColumnDeclaration & column) @@ -640,8 +652,9 @@ void QueryFuzzer::fuzzTableName(ASTTableExpression & table) auto it = original_table_name_to_fuzzed.find(table_id.getTableName()); if (it != original_table_name_to_fuzzed.end() && !it->second.empty()) { - const auto & new_table_name = it->second[fuzz_rand() % it->second.size()]; - StorageID new_table_id(table_id.database_name, new_table_name); + auto new_table_name = it->second.begin(); + std::advance(new_table_name, fuzz_rand() % it->second.size()); + StorageID new_table_id(table_id.database_name, *new_table_name); table.database_and_table_name = std::make_shared(new_table_id); } } @@ -709,6 +722,25 @@ ASTs QueryFuzzer::getDropQueriesForFuzzedTables(const ASTDropQuery & drop_query) return queries; } +void QueryFuzzer::notifyQueryFailed(ASTPtr ast) +{ + auto remove_fuzzed_table = [this](const auto & table_name) + { + auto pos = table_name.find("__fuzz_"); + if (pos != std::string::npos) + { + auto original_name = table_name.substr(0, pos); + original_table_name_to_fuzzed[original_name].erase(table_name); + } + }; + + if (const auto * create = ast->as()) + remove_fuzzed_table(create->getTable()); + + if (const auto * insert = ast->as()) + remove_fuzzed_table(insert->getTable()); +} + void QueryFuzzer::fuzz(ASTs & asts) { for (auto & ast : asts) diff --git a/src/Client/QueryFuzzer.h b/src/Client/QueryFuzzer.h index 3771d2bc61a..9afe7867dd2 100644 --- a/src/Client/QueryFuzzer.h +++ b/src/Client/QueryFuzzer.h @@ -60,7 +60,7 @@ struct QueryFuzzer std::unordered_set debug_visited_nodes; ASTPtr * debug_top_ast = nullptr; - std::unordered_map> original_table_name_to_fuzzed; + std::unordered_map> original_table_name_to_fuzzed; std::unordered_map index_of_fuzzed_table; std::set created_tables_hashes; @@ 
-76,6 +76,7 @@ struct QueryFuzzer DataTypePtr getRandomType(); ASTs getInsertQueriesForFuzzedTables(const String & full_query); ASTs getDropQueriesForFuzzedTables(const ASTDropQuery & drop_query); + void notifyQueryFailed(ASTPtr ast); void replaceWithColumnLike(ASTPtr & ast); void replaceWithTableLike(ASTPtr & ast); void fuzzOrderByElement(ASTOrderByElement * elem); diff --git a/src/Storages/MergeTree/registerStorageMergeTree.cpp b/src/Storages/MergeTree/registerStorageMergeTree.cpp index 6982521f76a..7e2d5e1727b 100644 --- a/src/Storages/MergeTree/registerStorageMergeTree.cpp +++ b/src/Storages/MergeTree/registerStorageMergeTree.cpp @@ -265,7 +265,7 @@ static StoragePtr create(const StorageFactory::Arguments & args) if (max_num_params == 0) msg += "no parameters"; - if (min_num_params == max_num_params) + else if (min_num_params == max_num_params) msg += fmt::format("{} parameters: {}", min_num_params, needed_params); else msg += fmt::format("{} to {} parameters: {}", min_num_params, max_num_params, needed_params); From 986f00f6d9228d2d5a989d6b463ae627d32ef00f Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Wed, 28 Sep 2022 21:06:50 +0000 Subject: [PATCH 18/28] clear fuzzed tables after drop --- src/Client/QueryFuzzer.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/Client/QueryFuzzer.cpp b/src/Client/QueryFuzzer.cpp index f7309695920..1fa7de65ce9 100644 --- a/src/Client/QueryFuzzer.cpp +++ b/src/Client/QueryFuzzer.cpp @@ -719,6 +719,9 @@ ASTs QueryFuzzer::getDropQueriesForFuzzedTables(const ASTDropQuery & drop_query) query->as()->if_exists = true; } + index_of_fuzzed_table.erase(it); + original_table_name_to_fuzzed.erase(table_name); + return queries; } From 14119fc30673e7073917604b4d7cb0a660fd754a Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Wed, 28 Sep 2022 21:10:41 +0000 Subject: [PATCH 19/28] more often mutations for complex type --- src/Client/QueryFuzzer.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git 
a/src/Client/QueryFuzzer.cpp b/src/Client/QueryFuzzer.cpp index 1fa7de65ce9..a1e6565fe98 100644 --- a/src/Client/QueryFuzzer.cpp +++ b/src/Client/QueryFuzzer.cpp @@ -523,11 +523,11 @@ DataTypePtr QueryFuzzer::fuzzDataType(DataTypePtr type) { /// Do not replace Array/Tuple/etc. with not Array/Tuple too often. const auto * type_array = typeid_cast(type.get()); - if (type_array && fuzz_rand() % 5 != 0) + if (type_array && fuzz_rand() % 4 != 0) return std::make_shared(fuzzDataType(type_array->getNestedType())); const auto * type_tuple = typeid_cast(type.get()); - if (type_tuple && fuzz_rand() % 5 != 0) + if (type_tuple && fuzz_rand() % 4 != 0) { DataTypes elements; for (const auto & element : type_tuple->getElements()) @@ -539,7 +539,7 @@ DataTypePtr QueryFuzzer::fuzzDataType(DataTypePtr type) } const auto * type_map = typeid_cast(type.get()); - if (type_map && fuzz_rand() % 5 != 0) + if (type_map && fuzz_rand() % 4 != 0) { auto key_type = fuzzDataType(type_map->getKeyType()); auto value_type = fuzzDataType(type_map->getValueType()); From c97bec829aa5d30eca7378723bed93a506738af8 Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Wed, 28 Sep 2022 21:47:10 +0000 Subject: [PATCH 20/28] slightly better --- programs/client/Client.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/programs/client/Client.cpp b/programs/client/Client.cpp index 9171f58b2e8..cc0acfeab0b 100644 --- a/programs/client/Client.cpp +++ b/programs/client/Client.cpp @@ -846,6 +846,7 @@ bool Client::processWithFuzzing(const String & full_query) { server_exception.reset(); client_exception.reset(); + fuzzer.notifyQueryFailed(query); have_error = false; } } From a3009ed9e44c9878bc30b98174a84e50dee24ca1 Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Fri, 30 Sep 2022 13:54:05 +0000 Subject: [PATCH 21/28] fix clang-tidy --- src/Client/QueryFuzzer.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Client/QueryFuzzer.cpp b/src/Client/QueryFuzzer.cpp index a1e6565fe98..77b13eb0f48 
100644 --- a/src/Client/QueryFuzzer.cpp +++ b/src/Client/QueryFuzzer.cpp @@ -611,7 +611,7 @@ DataTypePtr QueryFuzzer::getRandomType() if (type_id == TypeIndex::Array) return std::make_shared(getRandomType()); -/// NOLINTNEXTLINE +/// NOLINTBEGIN(bugprone-macro-parentheses) #define DISPATCH(DECIMAL) \ if (type_id == TypeIndex::DECIMAL) \ return std::make_shared>( \ @@ -623,6 +623,7 @@ DataTypePtr QueryFuzzer::getRandomType() DISPATCH(Decimal128) DISPATCH(Decimal256) #undef DISPATCH +/// NOLINTEND(bugprone-macro-parentheses) if (type_id == TypeIndex::FixedString) return std::make_shared(fuzz_rand() % 20); From 620b0673d0b8a4c37cbf48ff3bd22327338b6db9 Mon Sep 17 00:00:00 2001 From: kssenii Date: Mon, 3 Oct 2022 17:24:57 +0200 Subject: [PATCH 22/28] Update AMQP --- contrib/AMQP-CPP | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/AMQP-CPP b/contrib/AMQP-CPP index 1a6c51f4ac5..818c2d8ad96 160000 --- a/contrib/AMQP-CPP +++ b/contrib/AMQP-CPP @@ -1 +1 @@ -Subproject commit 1a6c51f4ac51ac56610fa95081bd2f349911375a +Subproject commit 818c2d8ad96a08a5d20fece7d1e1e8855a2b0860 From 3dfbd5eb153cf2db3ab2ef3f0362cae46ff4fef0 Mon Sep 17 00:00:00 2001 From: Nikita Mikhaylov Date: Mon, 3 Oct 2022 18:07:19 +0000 Subject: [PATCH 23/28] Fix test --- .../02451_async_insert_user_level_settings.python | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/tests/queries/0_stateless/02451_async_insert_user_level_settings.python b/tests/queries/0_stateless/02451_async_insert_user_level_settings.python index 8c75f4898c4..1660fb78d2b 100644 --- a/tests/queries/0_stateless/02451_async_insert_user_level_settings.python +++ b/tests/queries/0_stateless/02451_async_insert_user_level_settings.python @@ -41,6 +41,10 @@ client.query( "CREATE TABLE t_async_insert_user_settings (id UInt64, s String, arr Array(UInt64)) ENGINE = Memory" ) +VALUE_IN_CI = int(client.query( + "select value from system.settings where name='async_insert_busy_timeout_ms';" 
+)) + start_ms = time.time() * 1000.0 for i in range(NUM_RUNS): client.query(query = insert_query.format(i,i,i), settings=settings) @@ -48,8 +52,15 @@ end_ms = time.time() * 1000.0 duration = end_ms - start_ms -expected = (NUM_RUNS - 1) * TIME_TO_WAIT_MS -if duration >= expected: +expected = (NUM_RUNS - 1) * VALUE_IN_CI + +def check_inequality(duration, expected): + if TIME_TO_WAIT_MS >= VALUE_IN_CI: + return duration >= expected + else: + return duration < expected + +if check_inequality(duration, expected): print("Ok.") else: print(f"Fail. Duration: {duration}. Expected: {expected}") From 052e7d74b37fec16b6beef9a1236ee2c5744ffd7 Mon Sep 17 00:00:00 2001 From: Nikita Mikhaylov Date: Tue, 4 Oct 2022 11:27:44 +0000 Subject: [PATCH 24/28] Delete test --- ...51_async_insert_user_level_settings.python | 68 ------------------- ...async_insert_user_level_settings.reference | 1 - .../02451_async_insert_user_level_settings.sh | 9 --- 3 files changed, 78 deletions(-) delete mode 100644 tests/queries/0_stateless/02451_async_insert_user_level_settings.python delete mode 100644 tests/queries/0_stateless/02451_async_insert_user_level_settings.reference delete mode 100755 tests/queries/0_stateless/02451_async_insert_user_level_settings.sh diff --git a/tests/queries/0_stateless/02451_async_insert_user_level_settings.python b/tests/queries/0_stateless/02451_async_insert_user_level_settings.python deleted file mode 100644 index 1660fb78d2b..00000000000 --- a/tests/queries/0_stateless/02451_async_insert_user_level_settings.python +++ /dev/null @@ -1,68 +0,0 @@ -#!/usr/bin/env python3 -import os -import sys -import time - -CURDIR = os.path.dirname(os.path.realpath(__file__)) -sys.path.insert(0, os.path.join(CURDIR, "helpers")) - -CLICKHOUSE_URL = os.environ.get("CLICKHOUSE_URL") -CLICKHOUSE_TMP = os.environ.get("CLICKHOUSE_TMP") - -from pure_http_client import ClickHouseClient - -client = ClickHouseClient() - -NUM_RUNS = 5 -TIME_TO_WAIT_MS = 500 - -# The purpose of this test is to 
check that AsyncInsertQueue -# respects timeouts specified in the scope of query. -# Like if we execute NUM_RUNS subsequent inserts -# then we should spend at least (NUM_RUNS - 1) * TIME_TO_WAIT_MS -# Because each query corresponds to a timepoint when it been flushed -# And time period between first and last flush is exactly such -# as descibed above. -# Note that this doesn't include the time to process the query itself -# and this time maybe different depending on the build type (release or with sanitizer) - -gen_data_query = "SELECT number + {} AS id, toString(id) AS s, range(id) AS arr FROM numbers(10) FORMAT TSV" -insert_query = "INSERT INTO t_async_insert_user_settings VALUES ({}, '{}', [{}])" -settings = { - "async_insert": 1, - "wait_for_async_insert": 1, - "async_insert_busy_timeout_ms": TIME_TO_WAIT_MS, -} - -all_data = [] - -client.query("DROP TABLE IF EXISTS t_async_insert_user_settings") -client.query( - "CREATE TABLE t_async_insert_user_settings (id UInt64, s String, arr Array(UInt64)) ENGINE = Memory" -) - -VALUE_IN_CI = int(client.query( - "select value from system.settings where name='async_insert_busy_timeout_ms';" -)) - -start_ms = time.time() * 1000.0 -for i in range(NUM_RUNS): - client.query(query = insert_query.format(i,i,i), settings=settings) -end_ms = time.time() * 1000.0 - -duration = end_ms - start_ms - -expected = (NUM_RUNS - 1) * VALUE_IN_CI - -def check_inequality(duration, expected): - if TIME_TO_WAIT_MS >= VALUE_IN_CI: - return duration >= expected - else: - return duration < expected - -if check_inequality(duration, expected): - print("Ok.") -else: - print(f"Fail. Duration: {duration}. 
Expected: {expected}") - -client.query("DROP TABLE IF EXISTS t_async_insert_user_settings") diff --git a/tests/queries/0_stateless/02451_async_insert_user_level_settings.reference b/tests/queries/0_stateless/02451_async_insert_user_level_settings.reference deleted file mode 100644 index 587579af915..00000000000 --- a/tests/queries/0_stateless/02451_async_insert_user_level_settings.reference +++ /dev/null @@ -1 +0,0 @@ -Ok. diff --git a/tests/queries/0_stateless/02451_async_insert_user_level_settings.sh b/tests/queries/0_stateless/02451_async_insert_user_level_settings.sh deleted file mode 100755 index 3d627e273b9..00000000000 --- a/tests/queries/0_stateless/02451_async_insert_user_level_settings.sh +++ /dev/null @@ -1,9 +0,0 @@ -#!/usr/bin/env bash -# Tags: no-fasttest - -CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) -# shellcheck source=../shell_config.sh -. "$CURDIR"/../shell_config.sh - -# We should have correct env vars from shell_config.sh to run this test -python3 "$CURDIR"/02451_async_insert_user_level_settings.python From 59605a6c7ed3d56dc0b942f90c8306c575c8b50f Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Tue, 4 Oct 2022 13:40:59 +0200 Subject: [PATCH 25/28] Apply suggestions from code review Co-authored-by: Nikita Mikhaylov --- src/Client/QueryFuzzer.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Client/QueryFuzzer.cpp b/src/Client/QueryFuzzer.cpp index 77b13eb0f48..e6905d8dc3c 100644 --- a/src/Client/QueryFuzzer.cpp +++ b/src/Client/QueryFuzzer.cpp @@ -554,7 +554,7 @@ DataTypePtr QueryFuzzer::fuzzDataType(DataTypePtr type) { size_t tmp = fuzz_rand() % 3; if (tmp == 0) - return type_nullable->getNestedType(); + return fuzzDataType(type_nullable->getNestedType()); if (tmp == 1) { @@ -569,7 +569,7 @@ DataTypePtr QueryFuzzer::fuzzDataType(DataTypePtr type) { size_t tmp = fuzz_rand() % 3; if (tmp == 0) - return type_low_cardinality->getDictionaryType(); + return fuzzDataType(type_low_cardinality->getDictionaryType()); 
if (tmp == 1) { From aacb314a410c6c08b23a9c9168dbb87f9ce4de29 Mon Sep 17 00:00:00 2001 From: Kseniia Sumarokova <54203879+kssenii@users.noreply.github.com> Date: Tue, 4 Oct 2022 16:15:33 +0200 Subject: [PATCH 26/28] Update cmake --- contrib/amqpcpp-cmake/CMakeLists.txt | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/contrib/amqpcpp-cmake/CMakeLists.txt b/contrib/amqpcpp-cmake/CMakeLists.txt index 6f6a0188e6f..c65c8e9ce31 100644 --- a/contrib/amqpcpp-cmake/CMakeLists.txt +++ b/contrib/amqpcpp-cmake/CMakeLists.txt @@ -4,6 +4,11 @@ if (NOT ENABLE_AMQPCPP) message(STATUS "Not using AMQP-CPP") return() endif() +if (OS_DARWIN AND ARCH_AARCH64) + message(STATUS "Not using AMQP-CPP because libuv is disabled") + return() +endif() + # can be removed once libuv build on MacOS with GCC is possible if (NOT TARGET ch_contrib::uv) From 7935337513b16c213183a576f295505c0645f558 Mon Sep 17 00:00:00 2001 From: Kseniia Sumarokova <54203879+kssenii@users.noreply.github.com> Date: Tue, 4 Oct 2022 19:59:08 +0200 Subject: [PATCH 27/28] Update CMakeLists.txt --- contrib/amqpcpp-cmake/CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/amqpcpp-cmake/CMakeLists.txt b/contrib/amqpcpp-cmake/CMakeLists.txt index c65c8e9ce31..631f40e6ed3 100644 --- a/contrib/amqpcpp-cmake/CMakeLists.txt +++ b/contrib/amqpcpp-cmake/CMakeLists.txt @@ -4,7 +4,7 @@ if (NOT ENABLE_AMQPCPP) message(STATUS "Not using AMQP-CPP") return() endif() -if (OS_DARWIN AND ARCH_AARCH64) +if (OS_FREEBSD) message(STATUS "Not using AMQP-CPP because libuv is disabled") return() endif() From 5341bf445a64a085b1930ed04c9064b380383d01 Mon Sep 17 00:00:00 2001 From: Denny Crane Date: Tue, 4 Oct 2022 15:08:29 -0300 Subject: [PATCH 28/28] Update settings.md --- docs/en/operations/settings/settings.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/operations/settings/settings.md b/docs/en/operations/settings/settings.md index dc74b607289..bb8198b8b72 100644 --- 
a/docs/en/operations/settings/settings.md +++ b/docs/en/operations/settings/settings.md @@ -668,7 +668,7 @@ log_query_views=1 ## log_formatted_queries {#settings-log-formatted-queries} -Allows to log formatted queries to the [system.query_log](../../operations/system-tables/query_log.md) system table. +Allows to log formatted queries to the [system.query_log](../../operations/system-tables/query_log.md) system table (populates `formatted_query` column in the [system.query_log](../../operations/system-tables/query_log.md)). Possible values: