Merge branch 'master' into patch-1

This commit is contained in:
Nikolay Degterinsky 2022-10-05 16:18:08 +02:00 committed by GitHub
commit 61ec576900
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 572 additions and 151 deletions

2
contrib/AMQP-CPP vendored

@ -1 +1 @@
Subproject commit 1a6c51f4ac51ac56610fa95081bd2f349911375a
Subproject commit 818c2d8ad96a08a5d20fece7d1e1e8855a2b0860

View File

@ -4,6 +4,11 @@ if (NOT ENABLE_AMQPCPP)
message(STATUS "Not using AMQP-CPP")
return()
endif()
if (OS_FREEBSD)
message(STATUS "Not using AMQP-CPP because libuv is disabled")
return()
endif()
# can be removed once libuv build on MacOS with GCC is possible
if (NOT TARGET ch_contrib::uv)

View File

@ -0,0 +1,6 @@
<clickhouse>
<!-- Allow nullable key to avoid errors while fuzzing definitions of tables -->
<merge_tree>
<allow_nullable_key>1</allow_nullable_key>
</merge_tree>
</clickhouse>

View File

@ -94,6 +94,7 @@ function configure
# TODO figure out which ones are needed
cp -av --dereference "$repo_dir"/tests/config/config.d/listen.xml db/config.d
cp -av --dereference "$script_dir"/query-fuzzer-tweaks-users.xml db/users.d
cp -av --dereference "$script_dir"/allow-nullable-key.xml db/config.d
cat > db/config.d/core.xml <<EOL
<clickhouse>
@ -240,6 +241,7 @@ quit
--receive_data_timeout_ms=10000 \
--stacktrace \
--query-fuzzer-runs=1000 \
--create-query-fuzzer-runs=50 \
--queries-file $(ls -1 ch/tests/queries/0_stateless/*.sql | sort -R) \
$NEW_TESTS_OPT \
> >(tail -n 100000 > fuzzer.log) \

View File

@ -668,7 +668,7 @@ log_query_views=1
## log_formatted_queries {#settings-log-formatted-queries}
Allows to log formatted queries to the [system.query_log](../../operations/system-tables/query_log.md) system table.
Allows logging formatted queries to the [system.query_log](../../operations/system-tables/query_log.md) system table (populates the `formatted_query` column in the [system.query_log](../../operations/system-tables/query_log.md) table).
Possible values:

View File

@ -12,6 +12,7 @@
#include <string>
#include "Client.h"
#include "Core/Protocol.h"
#include "Parsers/formatAST.h"
#include <base/find_symbols.h>
@ -514,6 +515,66 @@ static bool queryHasWithClause(const IAST & ast)
return false;
}
std::optional<bool> Client::processFuzzingStep(const String & query_to_execute, const ASTPtr & parsed_query)
{
processParsedSingleQuery(query_to_execute, query_to_execute, parsed_query);
const auto * exception = server_exception ? server_exception.get() : client_exception.get();
// Sometimes you may get TOO_DEEP_RECURSION from the server,
// and TOO_DEEP_RECURSION should not fail the fuzzer check.
if (have_error && exception->code() == ErrorCodes::TOO_DEEP_RECURSION)
{
have_error = false;
server_exception.reset();
client_exception.reset();
return true;
}
if (have_error)
{
fmt::print(stderr, "Error on processing query '{}': {}\n", parsed_query->formatForErrorMessage(), exception->message());
// Try to reconnect after errors, for two reasons:
// 1. We might not have realized that the server died, e.g. if
// it sent us a <Fatal> trace and closed connection properly.
// 2. The connection might have gotten into a wrong state and
// the next query will get false positive about
// "Unknown packet from server".
try
{
connection->forceConnected(connection_parameters.timeouts);
}
catch (...)
{
// Just report it, we'll terminate below.
fmt::print(stderr,
"Error while reconnecting to the server: {}\n",
getCurrentExceptionMessage(true));
// The reconnection might fail, but we'll still be connected
// in the sense of `connection->isConnected() = true`,
// in case when the requested database doesn't exist.
// Disconnect manually now, so that the following code doesn't
// have any doubts, and the connection state is predictable.
connection->disconnect();
}
}
if (!connection->isConnected())
{
// Probably the server is dead because we found an assertion
// failure. Fail fast.
fmt::print(stderr, "Lost connection to the server.\n");
// Print the changed settings because they might be needed to
// reproduce the error.
printChangedSettings();
return false;
}
return std::nullopt;
}
/// Returns false when server is not available.
bool Client::processWithFuzzing(const String & full_query)
@ -558,18 +619,33 @@ bool Client::processWithFuzzing(const String & full_query)
// - SET -- The time to fuzz the settings has not yet come
// (see comments in Client/QueryFuzzer.cpp)
size_t this_query_runs = query_fuzzer_runs;
if (orig_ast->as<ASTInsertQuery>() ||
orig_ast->as<ASTCreateQuery>() ||
orig_ast->as<ASTDropQuery>() ||
orig_ast->as<ASTSetQuery>())
ASTs queries_for_fuzzed_tables;
if (orig_ast->as<ASTSetQuery>())
{
this_query_runs = 1;
}
else if (const auto * create = orig_ast->as<ASTCreateQuery>())
{
if (QueryFuzzer::isSuitableForFuzzing(*create))
this_query_runs = create_query_fuzzer_runs;
else
this_query_runs = 1;
}
else if (const auto * insert = orig_ast->as<ASTInsertQuery>())
{
this_query_runs = 1;
queries_for_fuzzed_tables = fuzzer.getInsertQueriesForFuzzedTables(full_query);
}
else if (const auto * drop = orig_ast->as<ASTDropQuery>())
{
this_query_runs = 1;
queries_for_fuzzed_tables = fuzzer.getDropQueriesForFuzzedTables(*drop);
}
String query_to_execute;
ASTPtr parsed_query;
ASTPtr fuzz_base = orig_ast;
for (size_t fuzz_step = 0; fuzz_step < this_query_runs; ++fuzz_step)
{
fmt::print(stderr, "Fuzzing step {} out of {}\n", fuzz_step, this_query_runs);
@ -630,9 +706,9 @@ bool Client::processWithFuzzing(const String & full_query)
continue;
}
parsed_query = ast_to_process;
query_to_execute = parsed_query->formatForErrorMessage();
processParsedSingleQuery(full_query, query_to_execute, parsed_query);
query_to_execute = ast_to_process->formatForErrorMessage();
if (auto res = processFuzzingStep(query_to_execute, ast_to_process))
return *res;
}
catch (...)
{
@ -645,60 +721,6 @@ bool Client::processWithFuzzing(const String & full_query)
have_error = true;
}
const auto * exception = server_exception ? server_exception.get() : client_exception.get();
// Sometimes you may get TOO_DEEP_RECURSION from the server,
// and TOO_DEEP_RECURSION should not fail the fuzzer check.
if (have_error && exception->code() == ErrorCodes::TOO_DEEP_RECURSION)
{
have_error = false;
server_exception.reset();
client_exception.reset();
return true;
}
if (have_error)
{
fmt::print(stderr, "Error on processing query '{}': {}\n", ast_to_process->formatForErrorMessage(), exception->message());
// Try to reconnect after errors, for two reasons:
// 1. We might not have realized that the server died, e.g. if
// it sent us a <Fatal> trace and closed connection properly.
// 2. The connection might have gotten into a wrong state and
// the next query will get false positive about
// "Unknown packet from server".
try
{
connection->forceConnected(connection_parameters.timeouts);
}
catch (...)
{
// Just report it, we'll terminate below.
fmt::print(stderr,
"Error while reconnecting to the server: {}\n",
getCurrentExceptionMessage(true));
// The reconnection might fail, but we'll still be connected
// in the sense of `connection->isConnected() = true`,
// in case when the requested database doesn't exist.
// Disconnect manually now, so that the following code doesn't
// have any doubts, and the connection state is predictable.
connection->disconnect();
}
}
if (!connection->isConnected())
{
// Probably the server is dead because we found an assertion
// failure. Fail fast.
fmt::print(stderr, "Lost connection to the server.\n");
// Print the changed settings because they might be needed to
// reproduce the error.
printChangedSettings();
return false;
}
// Check that after the query is formatted, we can parse it back,
// format again and get the same result. Unfortunately, we can't
// compare the ASTs, which would be more sensitive to errors. This
@ -729,13 +751,12 @@ bool Client::processWithFuzzing(const String & full_query)
// query, but second and third.
// If you have to add any more workarounds to this check, just remove
// it altogether, it's not so useful.
if (parsed_query && !have_error && !queryHasWithClause(*parsed_query))
if (ast_to_process && !have_error && !queryHasWithClause(*ast_to_process))
{
ASTPtr ast_2;
try
{
const auto * tmp_pos = query_to_execute.c_str();
ast_2 = parseQuery(tmp_pos, tmp_pos + query_to_execute.size(), false /* allow_multi_statements */);
}
catch (Exception & e)
@ -762,7 +783,7 @@ bool Client::processWithFuzzing(const String & full_query)
"Got the following (different) text after formatting the fuzzed query and parsing it back:\n'{}'\n, expected:\n'{}'\n",
text_3, text_2);
fmt::print(stderr, "In more detail:\n");
fmt::print(stderr, "AST-1 (generated by fuzzer):\n'{}'\n", parsed_query->dumpTree());
fmt::print(stderr, "AST-1 (generated by fuzzer):\n'{}'\n", ast_to_process->dumpTree());
fmt::print(stderr, "Text-1 (AST-1 formatted):\n'{}'\n", query_to_execute);
fmt::print(stderr, "AST-2 (Text-1 parsed):\n'{}'\n", ast_2->dumpTree());
fmt::print(stderr, "Text-2 (AST-2 formatted):\n'{}'\n", text_2);
@ -784,6 +805,7 @@ bool Client::processWithFuzzing(const String & full_query)
// so that it doesn't influence the exit code.
server_exception.reset();
client_exception.reset();
fuzzer.notifyQueryFailed(ast_to_process);
have_error = false;
}
else if (ast_to_process->formatForErrorMessage().size() > 500)
@ -800,6 +822,35 @@ bool Client::processWithFuzzing(const String & full_query)
}
}
for (const auto & query : queries_for_fuzzed_tables)
{
std::cout << std::endl;
WriteBufferFromOStream ast_buf(std::cout, 4096);
formatAST(*query, ast_buf, false /*highlight*/);
ast_buf.next();
std::cout << std::endl << std::endl;
try
{
query_to_execute = query->formatForErrorMessage();
if (auto res = processFuzzingStep(query_to_execute, query))
return *res;
}
catch (...)
{
client_exception = std::make_unique<Exception>(getCurrentExceptionMessage(print_stack_trace), getCurrentExceptionCode());
have_error = true;
}
if (have_error)
{
server_exception.reset();
client_exception.reset();
fuzzer.notifyQueryFailed(query);
have_error = false;
}
}
return true;
}
@ -834,6 +885,7 @@ void Client::addOptions(OptionsDescription & options_description)
("compression", po::value<bool>(), "enable or disable compression (enabled by default for remote communication and disabled for localhost communication).")
("query-fuzzer-runs", po::value<int>()->default_value(0), "After executing every SELECT query, do random mutations in it and run again specified number of times. This is used for testing to discover unexpected corner cases.")
("create-query-fuzzer-runs", po::value<int>()->default_value(0), "")
("interleave-queries-file", po::value<std::vector<std::string>>()->multitoken(),
"file path with queries to execute before every file from 'queries-file'; multiple files can be specified (--queries-file file1 file2...); this is needed to enable more aggressive fuzzing of newly added tests (see 'query-fuzzer-runs' option)")
@ -994,6 +1046,17 @@ void Client::processOptions(const OptionsDescription & options_description,
ignore_error = true;
}
if ((create_query_fuzzer_runs = options["create-query-fuzzer-runs"].as<int>()))
{
// Fuzzer implies multiquery.
config().setBool("multiquery", true);
// Ignore errors in parsing queries.
config().setBool("ignore-error", true);
global_context->setSetting("allow_suspicious_low_cardinality_types", true);
ignore_error = true;
}
if (options.count("opentelemetry-traceparent"))
{
String traceparent = options["opentelemetry-traceparent"].as<std::string>();

View File

@ -17,6 +17,7 @@ public:
protected:
bool processWithFuzzing(const String & full_query) override;
std::optional<bool> processFuzzingStep(const String & query_to_execute, const ASTPtr & parsed_query);
void connect() override;

View File

@ -1418,8 +1418,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->setAsynchronousInsertQueue(std::make_shared<AsynchronousInsertQueue>(
global_context,
settings.async_insert_threads,
settings.async_insert_max_data_size,
AsynchronousInsertQueue::Timeout{.busy = settings.async_insert_busy_timeout_ms, .stale = settings.async_insert_stale_timeout_ms}));
settings.async_insert_cleanup_timeout_ms));
/// Size of cache for marks (index of MergeTree family of tables).
size_t mark_cache_size = config().getUInt64("mark_cache_size", 5368709120);

View File

@ -251,6 +251,7 @@ protected:
QueryFuzzer fuzzer;
int query_fuzzer_runs = 0;
int create_query_fuzzer_runs = 0;
struct
{

View File

@ -1,4 +1,22 @@
#include "QueryFuzzer.h"
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/IDataType.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <Parsers/ASTColumnDeclaration.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/IAST_fwd.h>
#include <Parsers/ParserDataType.h>
#include <Parsers/ParserInsertQuery.h>
#include <Parsers/ASTDropQuery.h>
#include <unordered_set>
@ -430,6 +448,303 @@ void QueryFuzzer::fuzzWindowFrame(ASTWindowDefinition & def)
}
}
/// A CREATE query can be fuzzed only if it carries an explicit list of columns.
bool QueryFuzzer::isSuitableForFuzzing(const ASTCreateQuery & create)
{
    if (!create.columns_list)
        return false;

    return create.columns_list->columns != nullptr;
}
/// Mutates a CREATE query in place:
///  1. fuzzes every column declaration;
///  2. turns a Replicated* engine into its non-replicated counterpart;
///  3. renames the table to `<original>__fuzz_<N>`;
///  4. remembers the new table if its definition has not been seen before.
void QueryFuzzer::fuzzCreateQuery(ASTCreateQuery & create)
{
    if (create.columns_list && create.columns_list->columns)
    {
        for (auto & child : create.columns_list->columns->children)
            if (auto * declaration = child->as<ASTColumnDeclaration>())
                fuzzColumnDeclaration(*declaration);
    }

    if (create.storage && create.storage->engine)
    {
        /// ReplicatedMergeTree is replaced with ordinary MergeTree
        /// to avoid inconsistency of metadata in zookeeper.
        auto & engine_name = create.storage->engine->name;
        if (startsWith(engine_name, "Replicated"))
        {
            engine_name = engine_name.substr(strlen("Replicated"));

            auto & engine_arguments = create.storage->engine->arguments;
            if (engine_arguments)
            {
                /// Drop the first two engine arguments
                /// (presumably the zookeeper path and the replica name -- verify).
                auto & argument_list = engine_arguments->children;
                if (argument_list.size() > 2)
                    argument_list.erase(argument_list.begin(), argument_list.begin() + 2);
                else
                    engine_arguments.reset();
            }
        }
    }

    /// The query may already have been fuzzed, so strip a previous suffix first.
    const auto current_name = create.getTable();
    const auto base_name = current_name.substr(0, current_name.find("__fuzz_"));

    const size_t ordinal = index_of_fuzzed_table[base_name]++;
    const auto fuzzed_name = base_name + "__fuzz_" + toString(ordinal);
    create.setTable(fuzzed_name);

    /// The hash covers the original name, the columns and the storage definition.
    SipHash definition_hasher;
    definition_hasher.update(base_name);

    if (create.columns_list)
        create.columns_list->updateTreeHash(definition_hasher);

    if (create.storage)
        create.storage->updateTreeHash(definition_hasher);

    IAST::Hash definition_hash;
    definition_hasher.get128(definition_hash);

    /// Save only tables with unique definition.
    const bool is_new_definition = created_tables_hashes.insert(definition_hash).second;
    if (is_new_definition)
        original_table_name_to_fuzzed[base_name].insert(fuzzed_name);
}
/// Replaces the type of a column with a fuzzed one.
/// Columns without an explicit type are left untouched.
void QueryFuzzer::fuzzColumnDeclaration(ASTColumnDeclaration & column)
{
    if (!column.type)
        return;

    const auto original_type = DataTypeFactory::instance().get(column.type);
    const auto mutated_type = fuzzDataType(original_type);

    /// Convert the data type back to AST by parsing its name.
    ParserDataType type_parser;
    column.type = parseQuery(type_parser, mutated_type->getName(), DBMS_DEFAULT_MAX_QUERY_SIZE, DBMS_DEFAULT_MAX_PARSER_DEPTH);
}
/// Returns a randomly mutated variant of `type`.
///
/// Array, Tuple and Map are usually (when `fuzz_rand() % 4 != 0`) kept as they are
/// and only their nested types are fuzzed recursively. Nullable and LowCardinality
/// wrappers may be dropped, or kept around a fuzzed nested type. Otherwise the type
/// may get wrapped into Array / Nullable / LowCardinality, be replaced by a random
/// type, or stay unchanged.
DataTypePtr QueryFuzzer::fuzzDataType(DataTypePtr type)
{
    /// Do not replace Array/Tuple/etc. with not Array/Tuple too often.
    if (const auto * array_type = typeid_cast<const DataTypeArray *>(type.get()))
    {
        if (fuzz_rand() % 4 != 0)
            return std::make_shared<DataTypeArray>(fuzzDataType(array_type->getNestedType()));
    }

    if (const auto * tuple_type = typeid_cast<const DataTypeTuple *>(type.get()))
    {
        if (fuzz_rand() % 4 != 0)
        {
            DataTypes fuzzed_elements;
            for (const auto & element : tuple_type->getElements())
                fuzzed_elements.push_back(fuzzDataType(element));

            /// Element names are kept if the tuple was a named one.
            if (tuple_type->haveExplicitNames())
                return std::make_shared<DataTypeTuple>(fuzzed_elements, tuple_type->getElementNames());

            return std::make_shared<DataTypeTuple>(fuzzed_elements);
        }
    }

    if (const auto * map_type = typeid_cast<const DataTypeMap *>(type.get()))
    {
        if (fuzz_rand() % 4 != 0)
        {
            auto fuzzed_key = fuzzDataType(map_type->getKeyType());
            auto fuzzed_value = fuzzDataType(map_type->getValueType());

            /// Fall back to the original key type if the fuzzed one cannot be a Map key.
            if (!DataTypeMap::checkKeyType(fuzzed_key))
                fuzzed_key = map_type->getKeyType();

            return std::make_shared<DataTypeMap>(fuzzed_key, fuzzed_value);
        }
    }

    if (const auto * nullable_type = typeid_cast<const DataTypeNullable *>(type.get()))
    {
        switch (fuzz_rand() % 3)
        {
            case 0:
                /// Drop the Nullable wrapper.
                return fuzzDataType(nullable_type->getNestedType());
            case 1:
            {
                /// Keep the wrapper if the fuzzed nested type allows it.
                auto fuzzed_nested = fuzzDataType(nullable_type->getNestedType());
                if (fuzzed_nested->canBeInsideNullable())
                    return std::make_shared<DataTypeNullable>(fuzzed_nested);
                break;
            }
            default:
                break;
        }
    }

    if (const auto * low_cardinality_type = typeid_cast<const DataTypeLowCardinality *>(type.get()))
    {
        switch (fuzz_rand() % 3)
        {
            case 0:
                /// Drop the LowCardinality wrapper.
                return fuzzDataType(low_cardinality_type->getDictionaryType());
            case 1:
            {
                /// Keep the wrapper if the fuzzed nested type allows it.
                auto fuzzed_nested = fuzzDataType(low_cardinality_type->getDictionaryType());
                if (fuzzed_nested->canBeInsideLowCardinality())
                    return std::make_shared<DataTypeLowCardinality>(fuzzed_nested);
                break;
            }
            default:
                break;
        }
    }

    /// Generic mutations. A variant that is not applicable to `type`
    /// falls through to the next one.
    const size_t roll = fuzz_rand() % 8;

    if (roll == 0)
        return std::make_shared<DataTypeArray>(type);

    if (roll <= 1 && type->canBeInsideNullable())
        return std::make_shared<DataTypeNullable>(type);

    if (roll <= 2 && type->canBeInsideLowCardinality())
        return std::make_shared<DataTypeLowCardinality>(type);

    if (roll <= 3)
        return getRandomType();

    return type;
}
/// Generates a random data type with TypeIndex in range [1, TypeIndex::Tuple].
///
/// Tuple and Array are built recursively from random nested types.
/// Decimals get the maximum precision and a random scale in range [1, max precision].
/// FixedString gets a random length in range [1, 20].
/// Enum8 / Enum16 are replaced with UInt8 / UInt16
/// (presumably because an Enum cannot be created without its values -- verify).
/// All other types are created by name through DataTypeFactory.
DataTypePtr QueryFuzzer::getRandomType()
{
    auto type_id = static_cast<TypeIndex>(fuzz_rand() % static_cast<size_t>(TypeIndex::Tuple) + 1);

    if (type_id == TypeIndex::Tuple)
    {
        size_t tuple_size = fuzz_rand() % 6 + 1;
        DataTypes elements;
        for (size_t i = 0; i < tuple_size; ++i)
            elements.push_back(getRandomType());
        return std::make_shared<DataTypeTuple>(elements);
    }

    if (type_id == TypeIndex::Array)
        return std::make_shared<DataTypeArray>(getRandomType());

/// NOLINTBEGIN(bugprone-macro-parentheses)
#define DISPATCH(DECIMAL) \
    if (type_id == TypeIndex::DECIMAL) \
        return std::make_shared<DataTypeDecimal<DECIMAL>>( \
            DataTypeDecimal<DECIMAL>::maxPrecision(), \
            (fuzz_rand() % DataTypeDecimal<DECIMAL>::maxPrecision()) + 1);

    DISPATCH(Decimal32)
    DISPATCH(Decimal64)
    DISPATCH(Decimal128)
    DISPATCH(Decimal256)
#undef DISPATCH
/// NOLINTEND(bugprone-macro-parentheses)

    /// The length of FixedString must be positive, so zero is excluded from the range.
    if (type_id == TypeIndex::FixedString)
        return std::make_shared<DataTypeFixedString>(fuzz_rand() % 20 + 1);

    if (type_id == TypeIndex::Enum8)
        return std::make_shared<DataTypeUInt8>();

    if (type_id == TypeIndex::Enum16)
        return std::make_shared<DataTypeUInt16>();

    return DataTypeFactory::instance().get(String(magic_enum::enum_name(type_id)));
}
/// With probability 2/3 (when `fuzz_rand() % 3 != 0`) replaces the table in a table
/// expression with a randomly chosen fuzzed version of it, if any is known.
/// The database name is kept.
void QueryFuzzer::fuzzTableName(ASTTableExpression & table)
{
    /// Nothing to replace, or this time we decided to keep the original table.
    if (!table.database_and_table_name || fuzz_rand() % 3 == 0)
        return;

    const auto * identifier = table.database_and_table_name->as<ASTTableIdentifier>();
    if (!identifier)
        return;

    auto table_id = identifier->getTableId();
    if (table_id.empty())
        return;

    auto fuzzed_it = original_table_name_to_fuzzed.find(table_id.getTableName());
    if (fuzzed_it == original_table_name_to_fuzzed.end())
        return;

    const auto & fuzzed_names = fuzzed_it->second;
    if (fuzzed_names.empty())
        return;

    /// Pick a random element of the set.
    auto chosen_name = fuzzed_names.begin();
    std::advance(chosen_name, fuzz_rand() % fuzzed_names.size());

    StorageID replacement_id(table_id.database_name, *chosen_name);
    table.database_and_table_name = std::make_shared<ASTTableIdentifier>(replacement_id);
}
/// Parses the whole text of a query as an INSERT query.
/// Returns the result of tryParseQuery; the error message it produces is discarded.
static ASTPtr tryParseInsertQuery(const String & full_query)
{
    const char * begin = full_query.data();
    const char * const query_end = begin + full_query.size();

    ParserInsertQuery insert_parser(query_end, false);
    String ignored_error_message;

    return tryParseQuery(
        insert_parser,
        begin,
        query_end,
        ignored_error_message,
        false,
        "",
        false,
        DBMS_DEFAULT_MAX_QUERY_SIZE,
        DBMS_DEFAULT_MAX_PARSER_DEPTH);
}
/// For an INSERT into a table that has fuzzed versions, returns the same INSERT
/// retargeted to each of those fuzzed tables. Returns an empty list if the query
/// cannot be parsed as INSERT, has no table, or the table has no known fuzzed versions.
ASTs QueryFuzzer::getInsertQueriesForFuzzedTables(const String & full_query)
{
    auto original_ast = tryParseInsertQuery(full_query);
    if (!original_ast)
        return {};

    const auto & original_insert = *original_ast->as<ASTInsertQuery>();
    if (!original_insert.table)
        return {};

    auto fuzzed_it = original_table_name_to_fuzzed.find(original_insert.getTable());
    if (fuzzed_it == original_table_name_to_fuzzed.end())
        return {};

    ASTs result;
    for (const auto & fuzzed_name : fuzzed_it->second)
    {
        /// The query is parsed from scratch for each table instead of being cloned,
        /// to store proper pointers to inlined data, which are not copied during clone.
        ASTPtr reparsed = tryParseInsertQuery(full_query);
        reparsed->as<ASTInsertQuery>()->setTable(fuzzed_name);
        result.push_back(reparsed);
    }

    return result;
}
/// For a DROP of a table that has fuzzed versions, returns `DROP ... IF EXISTS`
/// queries for all of them and forgets the table in the fuzzer's bookkeeping.
/// Queries of other kinds than Drop and unknown tables produce an empty list.
ASTs QueryFuzzer::getDropQueriesForFuzzedTables(const ASTDropQuery & drop_query)
{
    if (drop_query.kind != ASTDropQuery::Drop)
        return {};

    auto original_name = drop_query.getTable();

    auto counter_it = index_of_fuzzed_table.find(original_name);
    if (counter_it == index_of_fuzzed_table.end())
        return {};

    /// Drop all created tables, not only unique ones.
    const size_t created_count = counter_it->second;

    ASTs result;
    for (size_t index = 0; index < created_count; ++index)
    {
        ASTPtr cloned = drop_query.clone();
        auto * fuzzed_drop = cloned->as<ASTDropQuery>();

        fuzzed_drop->setTable(original_name + "__fuzz_" + toString(index));
        /// Just in case add IF EXISTS to avoid exceptions.
        fuzzed_drop->if_exists = true;

        result.push_back(cloned);
    }

    index_of_fuzzed_table.erase(counter_it);
    original_table_name_to_fuzzed.erase(original_name);

    return result;
}
/// Must be called when execution of a query has failed.
/// If it was a CREATE of or an INSERT into a fuzzed table (`<original>__fuzz_<N>`),
/// the table is removed from the set of fuzzed tables of `<original>`,
/// so it is no longer substituted into other queries.
/// A null `ast` is ignored.
void QueryFuzzer::notifyQueryFailed(ASTPtr ast)
{
    if (!ast)
        return;

    auto remove_fuzzed_table = [this](const auto & table_name)
    {
        auto pos = table_name.find("__fuzz_");
        if (pos == std::string::npos)
            return;

        /// Look the entry up instead of using operator[],
        /// which would insert an empty set for an unknown original name.
        auto it = original_table_name_to_fuzzed.find(table_name.substr(0, pos));
        if (it != original_table_name_to_fuzzed.end())
            it->second.erase(table_name);
    };

    if (const auto * create = ast->as<ASTCreateQuery>())
        remove_fuzzed_table(create->getTable());

    if (const auto * insert = ast->as<ASTInsertQuery>())
        remove_fuzzed_table(insert->getTable());
}
void QueryFuzzer::fuzz(ASTs & asts)
{
for (auto & ast : asts)
@ -497,6 +812,7 @@ void QueryFuzzer::fuzz(ASTPtr & ast)
}
else if (auto * table_expr = typeid_cast<ASTTableExpression *>(ast.get()))
{
fuzzTableName(*table_expr);
fuzz(table_expr->children);
}
else if (auto * expr_list = typeid_cast<ASTExpressionList *>(ast.get()))
@ -563,6 +879,10 @@ void QueryFuzzer::fuzz(ASTPtr & ast)
literal->value = fuzzField(literal->value);
}
}
else if (auto * create_query = typeid_cast<ASTCreateQuery *>(ast.get()))
{
fuzzCreateQuery(*create_query);
}
else
{
fuzz(ast->children);

View File

@ -1,5 +1,6 @@
#pragma once
#include <DataTypes/IDataType.h>
#include <unordered_set>
#include <unordered_map>
#include <vector>
@ -16,6 +17,11 @@ namespace DB
class ASTExpressionList;
class ASTOrderByElement;
class ASTCreateQuery;
class ASTInsertQuery;
class ASTColumnDeclaration;
class ASTDropQuery;
struct ASTTableExpression;
struct ASTWindowDefinition;
/*
@ -54,6 +60,9 @@ struct QueryFuzzer
std::unordered_set<const IAST *> debug_visited_nodes;
ASTPtr * debug_top_ast = nullptr;
std::unordered_map<std::string, std::unordered_set<std::string>> original_table_name_to_fuzzed;
std::unordered_map<std::string, size_t> index_of_fuzzed_table;
std::set<IAST::Hash> created_tables_hashes;
// This is the only function you have to call -- it will modify the passed
// ASTPtr to point to new AST with some random changes.
@ -63,18 +72,28 @@ struct QueryFuzzer
Field getRandomField(int type);
Field fuzzField(Field field);
ASTPtr getRandomColumnLike();
DataTypePtr fuzzDataType(DataTypePtr type);
DataTypePtr getRandomType();
ASTs getInsertQueriesForFuzzedTables(const String & full_query);
ASTs getDropQueriesForFuzzedTables(const ASTDropQuery & drop_query);
void notifyQueryFailed(ASTPtr ast);
void replaceWithColumnLike(ASTPtr & ast);
void replaceWithTableLike(ASTPtr & ast);
void fuzzOrderByElement(ASTOrderByElement * elem);
void fuzzOrderByList(IAST * ast);
void fuzzColumnLikeExpressionList(IAST * ast);
void fuzzWindowFrame(ASTWindowDefinition & def);
void fuzzCreateQuery(ASTCreateQuery & create);
void fuzzColumnDeclaration(ASTColumnDeclaration & column);
void fuzzTableName(ASTTableExpression & table);
void fuzz(ASTs & asts);
void fuzz(ASTPtr & ast);
void collectFuzzInfoMain(ASTPtr ast);
void addTableLike(ASTPtr ast);
void addColumnLike(ASTPtr ast);
void collectFuzzInfoRecurse(ASTPtr ast);
static bool isSuitableForFuzzing(const ASTCreateQuery & create);
};
}

View File

@ -596,9 +596,9 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Bool, async_insert, false, "If true, data from INSERT query is stored in queue and later flushed to table in background. Makes sense only for inserts via HTTP protocol. If wait_for_async_insert is false, INSERT query is processed almost instantly, otherwise client will wait until data will be flushed to table", 0) \
M(Bool, wait_for_async_insert, true, "If true wait for processing of asynchronous insertion", 0) \
M(Seconds, wait_for_async_insert_timeout, DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC, "Timeout for waiting for processing asynchronous insertion", 0) \
M(UInt64, async_insert_max_data_size, 100000, "Maximum size in bytes of unparsed data collected per query before being inserted", 0) \
M(UInt64, async_insert_max_data_size, 1000000, "Maximum size in bytes of unparsed data collected per query before being inserted", 0) \
M(Milliseconds, async_insert_busy_timeout_ms, 200, "Maximum time to wait before dumping collected data per query since the first data appeared", 0) \
M(Milliseconds, async_insert_stale_timeout_ms, 0, "Maximum time to wait before dumping collected data per query since the last data appeared. Zero means no timeout at all", 0) \
M(Milliseconds, async_insert_cleanup_timeout_ms, 1000, "Time to wait before each iteration of cleaning up buffers for INSERT queries which don't appear anymore. Only has meaning at server startup.", 0) \
\
M(UInt64, remote_fs_read_max_backoff_ms, 10000, "Max wait time when trying to read data for remote disk", 0) \
M(UInt64, remote_fs_read_backoff_max_tries, 5, "Max attempts to read with backoff", 0) \
@ -671,6 +671,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
MAKE_OBSOLETE(M, Bool, allow_experimental_database_atomic, true) \
MAKE_OBSOLETE(M, Bool, allow_experimental_bigint_types, true) \
MAKE_OBSOLETE(M, Bool, allow_experimental_window_functions, true) \
MAKE_OBSOLETE(M, Milliseconds, async_insert_stale_timeout_ms, 0) \
MAKE_OBSOLETE(M, HandleKafkaErrorMode, handle_kafka_error_mode, HandleKafkaErrorMode::DEFAULT) \
MAKE_OBSOLETE(M, Bool, database_replicated_ddl_output, true) \
MAKE_OBSOLETE(M, UInt64, replication_alter_columns_timeout, 60) \

View File

@ -120,11 +120,9 @@ std::exception_ptr AsynchronousInsertQueue::InsertData::Entry::getException() co
}
AsynchronousInsertQueue::AsynchronousInsertQueue(ContextPtr context_, size_t pool_size, size_t max_data_size_, const Timeout & timeouts)
AsynchronousInsertQueue::AsynchronousInsertQueue(ContextPtr context_, size_t pool_size, Milliseconds cleanup_timeout_)
: WithContext(context_)
, max_data_size(max_data_size_)
, busy_timeout(timeouts.busy)
, stale_timeout(timeouts.stale)
, cleanup_timeout(cleanup_timeout_)
, pool(pool_size)
, dump_by_first_update_thread(&AsynchronousInsertQueue::busyCheck, this)
, cleanup_thread(&AsynchronousInsertQueue::cleanup, this)
@ -132,9 +130,6 @@ AsynchronousInsertQueue::AsynchronousInsertQueue(ContextPtr context_, size_t poo
using namespace std::chrono;
assert(pool_size);
if (stale_timeout > 0ms)
dump_by_last_update_thread = ThreadFromGlobalPool(&AsynchronousInsertQueue::staleCheck, this);
}
AsynchronousInsertQueue::~AsynchronousInsertQueue()
@ -143,10 +138,14 @@ AsynchronousInsertQueue::~AsynchronousInsertQueue()
LOG_TRACE(log, "Shutting down the asynchronous insertion queue");
shutdown = true;
{
std::lock_guard lock(shutdown_mutex);
shutdown = true;
shutdown_cv.notify_all();
std::lock_guard lock(deadline_mutex);
are_tasks_available.notify_one();
}
{
std::lock_guard lock(cleanup_mutex);
cleanup_can_run.notify_one();
}
assert(dump_by_first_update_thread.joinable());
@ -155,9 +154,6 @@ AsynchronousInsertQueue::~AsynchronousInsertQueue()
assert(cleanup_thread.joinable());
cleanup_thread.join();
if (dump_by_last_update_thread.joinable())
dump_by_last_update_thread.join();
pool.wait();
std::lock_guard lock(currently_processing_mutex);
@ -234,12 +230,18 @@ void AsynchronousInsertQueue::pushImpl(InsertData::EntryPtr entry, QueueIterator
std::lock_guard data_lock(data_mutex);
if (!data)
data = std::make_unique<InsertData>();
{
auto now = std::chrono::steady_clock::now();
data = std::make_unique<InsertData>(now);
std::lock_guard lock(deadline_mutex);
deadline_queue.insert({now + Milliseconds{it->first.settings.async_insert_busy_timeout_ms}, it});
are_tasks_available.notify_one();
}
size_t entry_data_size = entry->bytes.size();
data->size += entry_data_size;
data->last_update = std::chrono::steady_clock::now();
data->entries.emplace_back(entry);
{
@ -250,7 +252,10 @@ void AsynchronousInsertQueue::pushImpl(InsertData::EntryPtr entry, QueueIterator
LOG_TRACE(log, "Have {} pending inserts with total {} bytes of data for query '{}'",
data->entries.size(), data->size, queryToString(it->first.query));
if (data->size > max_data_size)
/// Here we check whether we hit the limit on maximum data size in the buffer.
/// And use setting from query context!
/// It works, because queries with the same set of settings are already grouped together.
if (data->size > it->first.settings.async_insert_max_data_size)
scheduleDataProcessingJob(it->first, std::move(data), getContext());
CurrentMetrics::add(CurrentMetrics::PendingAsyncInsert);
@ -282,56 +287,62 @@ void AsynchronousInsertQueue::waitForProcessingQuery(const String & query_id, co
void AsynchronousInsertQueue::busyCheck()
{
auto timeout = busy_timeout;
while (!waitForShutdown(timeout))
while (!shutdown)
{
/// TODO: use priority queue instead of raw unsorted queue.
timeout = busy_timeout;
std::shared_lock read_lock(rwlock);
for (auto & [key, elem] : queue)
std::vector<QueueIterator> entries_to_flush;
{
std::lock_guard data_lock(elem->mutex);
if (!elem->data)
continue;
std::unique_lock deadline_lock(deadline_mutex);
are_tasks_available.wait_for(deadline_lock, Milliseconds(getContext()->getSettingsRef().async_insert_busy_timeout_ms), [this]()
{
if (shutdown)
return true;
auto lag = std::chrono::steady_clock::now() - elem->data->first_update;
if (lag >= busy_timeout)
scheduleDataProcessingJob(key, std::move(elem->data), getContext());
else
timeout = std::min(timeout, std::chrono::ceil<std::chrono::milliseconds>(busy_timeout - lag));
if (!deadline_queue.empty() && deadline_queue.begin()->first < std::chrono::steady_clock::now())
return true;
return false;
});
if (shutdown)
return;
const auto now = std::chrono::steady_clock::now();
while (true)
{
if (deadline_queue.empty() || deadline_queue.begin()->first > now)
break;
entries_to_flush.emplace_back(deadline_queue.begin()->second);
deadline_queue.erase(deadline_queue.begin());
}
}
}
}
void AsynchronousInsertQueue::staleCheck()
{
while (!waitForShutdown(stale_timeout))
{
std::shared_lock read_lock(rwlock);
for (auto & [key, elem] : queue)
for (auto & entry : entries_to_flush)
{
auto & [key, elem] = *entry;
std::lock_guard data_lock(elem->mutex);
if (!elem->data)
continue;
auto lag = std::chrono::steady_clock::now() - elem->data->last_update;
if (lag >= stale_timeout)
scheduleDataProcessingJob(key, std::move(elem->data), getContext());
scheduleDataProcessingJob(key, std::move(elem->data), getContext());
}
}
}
void AsynchronousInsertQueue::cleanup()
{
/// Do not run cleanup too often,
/// because it holds exclusive lock.
auto timeout = busy_timeout * 5;
while (!waitForShutdown(timeout))
while (true)
{
{
std::unique_lock cleanup_lock(cleanup_mutex);
cleanup_can_run.wait_for(cleanup_lock, Milliseconds(cleanup_timeout), [this]() -> bool { return shutdown; });
if (shutdown)
return;
}
std::vector<InsertQuery> keys_to_remove;
{
@ -383,11 +394,6 @@ void AsynchronousInsertQueue::cleanup()
}
}
/// Waits on `shutdown_cv` for at most `timeout`.
/// Returns the value of `shutdown` at the moment the wait is over.
bool AsynchronousInsertQueue::waitForShutdown(const Milliseconds & timeout)
{
    const auto is_shutting_down = [this]() { return shutdown; };

    std::unique_lock guard(shutdown_mutex);
    return shutdown_cv.wait_for(guard, timeout, is_shutting_down);
}
// static
void AsynchronousInsertQueue::processData(InsertQuery key, InsertDataPtr data, ContextPtr global_context)

View File

@ -5,6 +5,7 @@
#include <Core/Settings.h>
#include <Poco/Logger.h>
#include <atomic>
#include <unordered_map>
@ -18,14 +19,7 @@ class AsynchronousInsertQueue : public WithContext
public:
using Milliseconds = std::chrono::milliseconds;
/// A structure is used so that callers can benefit from designated initialization
/// instead of juggling positional arguments in the constructor.
struct Timeout
{
/// NOTE(review): presumably the source of `busy_timeout` (see busyCheck()) — confirm in the ctor.
Milliseconds busy;
/// NOTE(review): presumably the source of `stale_timeout` (see staleCheck()) — confirm in the ctor.
Milliseconds stale;
};
AsynchronousInsertQueue(ContextPtr context_, size_t pool_size, size_t max_data_size, const Timeout & timeouts);
AsynchronousInsertQueue(ContextPtr context_, size_t pool_size, Milliseconds cleanup_timeout);
~AsynchronousInsertQueue();
void push(ASTPtr query, ContextPtr query_context);
@ -69,6 +63,10 @@ private:
std::exception_ptr exception;
};
/// Records `now` as the time of the first update of this data.
explicit InsertData(std::chrono::steady_clock::time_point now) : first_update{now} {}
using EntryPtr = std::shared_ptr<Entry>;
std::list<EntryPtr> entries;
@ -76,11 +74,7 @@ private:
/// Timestamp of the first insert into queue, or after the last queue dump.
/// Used to detect for how long the queue is active, so we can dump it by timer.
std::chrono::time_point<std::chrono::steady_clock> first_update = std::chrono::steady_clock::now();
/// Timestamp of the last insert into queue.
/// Used to detect for how long the queue is stale, so we can dump it by another timer.
std::chrono::time_point<std::chrono::steady_clock> last_update;
std::chrono::time_point<std::chrono::steady_clock> first_update;
};
using InsertDataPtr = std::unique_ptr<InsertData>;
@ -96,10 +90,21 @@ private:
using Queue = std::unordered_map<InsertQuery, std::shared_ptr<Container>, InsertQuery::Hash>;
using QueueIterator = Queue::iterator;
/// Ordered container
using DeadlineQueue = std::map<std::chrono::steady_clock::time_point, QueueIterator>;
mutable std::shared_mutex rwlock;
Queue queue;
/// Used only inside cleanup(): lets it wait between runs and be correctly signaled about shutdown.
mutable std::mutex cleanup_mutex;
mutable std::condition_variable cleanup_can_run;
mutable std::mutex deadline_mutex;
mutable std::condition_variable are_tasks_available;
DeadlineQueue deadline_queue;
using QueryIdToEntry = std::unordered_map<String, InsertData::EntryPtr>;
mutable std::mutex currently_processing_mutex;
QueryIdToEntry currently_processing_queries;
@ -109,25 +114,21 @@ private:
/// grow for a long period of time and users will be able to select new data in a deterministic manner.
/// - stale_timeout: if queue is stale for too long, then we dump the data too, so that users will be able to select the last
/// piece of inserted data.
/// - max_data_size: if the maximum size of data is reached, then again we dump the data.
///
/// During processing incoming INSERT queries we can also check whether the maximum size of data in buffer is reached (async_insert_max_data_size setting)
/// If so, then again we dump the data.
const size_t max_data_size; /// in bytes
const Milliseconds busy_timeout;
const Milliseconds stale_timeout;
const Milliseconds cleanup_timeout;
std::mutex shutdown_mutex;
std::condition_variable shutdown_cv;
bool shutdown{false};
std::atomic<bool> shutdown{false};
ThreadPool pool; /// dump the data only inside this pool.
ThreadFromGlobalPool dump_by_first_update_thread; /// uses busy_timeout and busyCheck()
ThreadFromGlobalPool dump_by_last_update_thread; /// uses stale_timeout and staleCheck()
ThreadFromGlobalPool cleanup_thread; /// uses busy_timeout and cleanup()
Poco::Logger * log = &Poco::Logger::get("AsynchronousInsertQueue");
void busyCheck();
void staleCheck();
void cleanup();
/// Should be called with shared or exclusively locked 'rwlock'.

View File

@ -265,7 +265,7 @@ static StoragePtr create(const StorageFactory::Arguments & args)
if (max_num_params == 0)
msg += "no parameters";
if (min_num_params == max_num_params)
else if (min_num_params == max_num_params)
msg += fmt::format("{} parameters: {}", min_num_params, needed_params);
else
msg += fmt::format("{} to {} parameters: {}", min_num_params, max_num_params, needed_params);

View File

@ -24,7 +24,6 @@ NamesAndTypesList StorageSystemAsynchronousInserts::getNamesAndTypes()
{"table", std::make_shared<DataTypeString>()},
{"format", std::make_shared<DataTypeString>()},
{"first_update", std::make_shared<DataTypeDateTime64>(TIME_SCALE)},
{"last_update", std::make_shared<DataTypeDateTime64>(TIME_SCALE)},
{"total_bytes", std::make_shared<DataTypeUInt64>()},
{"entries.query_id", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
{"entries.bytes", std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>())},
@ -77,7 +76,6 @@ void StorageSystemAsynchronousInserts::fillData(MutableColumns & res_columns, Co
res_columns[i++]->insert(insert_query.format);
res_columns[i++]->insert(time_in_microseconds(elem->data->first_update));
res_columns[i++]->insert(time_in_microseconds(elem->data->last_update));
res_columns[i++]->insert(elem->data->size);
Array arr_query_id;

View File

@ -12,7 +12,6 @@ CREATE TABLE system.asynchronous_inserts
`table` String,
`format` String,
`first_update` DateTime64(6),
`last_update` DateTime64(6),
`total_bytes` UInt64,
`entries.query_id` Array(String),
`entries.bytes` Array(UInt64),