Merge branch 'master' into dist/fix-startup-race

Antonio Andelic 2023-01-18 09:44:52 +01:00 committed by GitHub
commit f3469ee077
39 changed files with 575 additions and 288 deletions

contrib/poco vendored

@ -1 +1 @@
Subproject commit 799234226187c0ae0b8c90f23465b25ed7956e56
Subproject commit 0ab9bba7ccad3c8dacce04a35cb3b78218547ab4

View File

@ -128,6 +128,7 @@ function run_tests()
if [[ "${HIGH_LEVEL_COVERAGE}" = "YES" ]]; then
ADDITIONAL_OPTIONS+=('--report-coverage')
ADDITIONAL_OPTIONS+=('--report-logs-stats')
fi
set +e

View File

@ -289,6 +289,7 @@ if __name__ == "__main__":
"--database=system",
"--hung-check",
"--stress",
"--report-logs-stats",
"00001_select_1",
]
)

View File

@ -182,6 +182,31 @@ No matter what pool is used for a job, at start `ThreadStatus` instance is creat
If a thread is related to query execution, then the most important thing attached to `ThreadStatus` is the query context `ContextPtr`. Every query has its master thread in the server pool. The master thread does the attachment by holding a `ThreadStatus::QueryScope query_scope(query_context)` object. The master thread also creates a thread group represented by a `ThreadGroupStatus` object. Every additional thread that is allocated during this query execution is attached to its thread group by a `CurrentThread::attachTo(thread_group)` call. Thread groups are used to aggregate profile event counters and to track memory consumption by all threads dedicated to a single task (see the `MemoryTracker` and `ProfileEvents::Counters` classes for more information).
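For illustration, a minimal sketch of this attachment pattern (simplified signatures; `executeQueryPart` is a placeholder, not an actual ClickHouse function):

```cpp
void masterThread(ContextPtr query_context)
{
    /// The master thread attaches to the query context and creates
    /// a thread group for the whole query.
    ThreadStatus::QueryScope query_scope(query_context);
    auto thread_group = CurrentThread::getGroup();

    std::thread worker([thread_group]
    {
        /// Every additional thread joins the master's thread group, so its
        /// profile event counters and memory usage are aggregated per query.
        CurrentThread::attachTo(thread_group);
        executeQueryPart(); /// placeholder for real query work
    });
    worker.join();
}
```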
## Concurrency control {#concurrency-control}
A query that can be parallelized uses the `max_threads` setting to limit itself. The default value for this setting is selected in a way that allows a single query to utilize all CPU cores in the best way. But what if there are multiple concurrent queries and each of them uses the default `max_threads` setting value? Then the queries will share CPU resources. The OS will ensure fairness by constantly switching threads, which introduces some performance penalty. `ConcurrencyControl` helps to deal with this penalty and avoid allocating a lot of threads. The configuration setting `concurrent_threads_soft_limit_num` is used to limit how many concurrent threads can be allocated before applying some kind of CPU pressure.
:::note
`concurrent_threads_soft_limit_num` and `concurrent_threads_soft_limit_ratio_to_cores` are disabled (equal to 0) by default, so this feature must be enabled before use.
:::
The notion of a CPU `slot` is introduced. A slot is a unit of concurrency: to run a thread, a query has to acquire a slot in advance and release it when the thread stops. The number of slots is limited globally per server. Multiple concurrent queries compete for CPU slots if the total demand exceeds the total number of slots. `ConcurrencyControl` is responsible for resolving this competition by scheduling CPU slots in a fair manner.
Each slot can be seen as an independent state machine with the following states:
* `free`: the slot is available to be allocated by any query.
* `granted`: the slot is `allocated` by a specific query, but not yet acquired by any thread.
* `acquired`: the slot is `allocated` by a specific query and acquired by a thread.
Note that an `allocated` slot can be in two different states: `granted` and `acquired`. The former is a transitional state that should actually be short (from the instant when a slot is allocated to a query until the moment when the up-scaling procedure is run by any thread of that query).
![state diagram](@site/docs/en/development/images/concurrency.png)
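The same lifecycle can be written down as a hypothetical enum (for illustration only; ClickHouse does not necessarily represent the states this way):

```cpp
enum class SlotState
{
    Free,      /// available to be allocated by any query
    Granted,   /// allocated to a query, but not yet acquired by a thread
    Acquired,  /// allocated to a query and held by a running thread
};
```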
The API of `ConcurrencyControl` consists of the following functions:
1. Create a resource allocation for a query: `auto slots = ConcurrencyControl::instance().allocate(1, max_threads);`. It will allocate at least 1 and at most `max_threads` slots. Note that the first slot is granted immediately, but the remaining slots may be granted later. Thus the limit is soft, because every query obtains at least one thread.
2. For every thread, a slot has to be acquired from the allocation: `while (auto slot = slots->tryAcquire()) spawnThread([slot = std::move(slot)] { ... });`.
3. Update the total number of slots: `ConcurrencyControl::setMaxConcurrency(concurrent_threads_soft_limit_num)`. This can be done at runtime, without a server restart.
This API allows queries to start with at least one thread (even under CPU pressure) and later scale up to `max_threads`.
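Putting the first two calls together, the up-scaling loop looks roughly like this (a minimal sketch; `spawnThread`, `max_threads`, and the worker body are placeholders):

```cpp
/// Allocate at least 1 and at most max_threads CPU slots for this query.
auto slots = ConcurrencyControl::instance().allocate(1, max_threads);

/// The first slot is granted immediately, so at least one worker is spawned;
/// later iterations succeed only if additional slots have been granted.
while (auto slot = slots->tryAcquire())
    spawnThread([slot = std::move(slot)]
    {
        /// ... process a share of the query ...
        /// The slot is released when `slot` goes out of scope.
    });
```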
## Distributed Query Execution {#distributed-query-execution}
Servers in a cluster setup are mostly independent. You can create a `Distributed` table on one or all servers in a cluster. The `Distributed` table does not store data itself; it only provides a “view” to all local tables on multiple nodes of a cluster. When you SELECT from a `Distributed` table, it rewrites that query, chooses remote nodes according to the load balancing settings, and sends the query to them. The `Distributed` table asks remote servers to process the query only up to the stage where intermediate results from different servers can be merged; then it receives the intermediate results and merges them. The distributed table tries to distribute as much work as possible to the remote servers and to send as little intermediate data as possible over the network.

Binary file not shown (new image, 34 KiB).

View File

@ -757,6 +757,10 @@ Possible values:
Default value: `0`.
**See Also**
- [Concurrency Control](/docs/en/development/architecture.md#concurrency-control)
## concurrent_threads_soft_limit_ratio_to_cores {#concurrent_threads_soft_limit_ratio_to_cores}
The maximum number of query processing threads, set as a multiple of the number of logical cores. For example, a value of `3` on a server with 16 logical cores yields a soft limit of 48 threads.
More details: [concurrent_threads_soft_limit_num](#concurrent-threads-soft-limit-num).
@ -768,6 +772,12 @@ Possible values:
Default value: `0`.
**Example**
``` xml
<concurrent_threads_soft_limit_ratio_to_cores>3</concurrent_threads_soft_limit_ratio_to_cores>
```
## max_concurrent_queries {#max-concurrent-queries}
The maximum number of simultaneously processed queries.

View File

@ -2,6 +2,7 @@
#include <Common/SipHash.h>
#include <Common/FieldVisitorToString.h>
#include <DataTypes/IDataType.h>
#include <Analyzer/ConstantNode.h>
#include <IO/WriteBufferFromString.h>
@ -31,6 +32,15 @@ FunctionNode::FunctionNode(String function_name_)
children[arguments_child_index] = std::make_shared<ListNode>();
}
const DataTypes & FunctionNode::getArgumentTypes() const
{
if (!function)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Function {} is not resolved",
function_name);
return function->getArgumentTypes();
}
ColumnsWithTypeAndName FunctionNode::getArgumentColumns() const
{
const auto & arguments = getArguments().getNodes();

View File

@ -85,6 +85,7 @@ public:
/// Get arguments node
QueryTreeNodePtr & getArgumentsNode() { return children[arguments_child_index]; }
const DataTypes & getArgumentTypes() const;
ColumnsWithTypeAndName getArgumentColumns() const;
/// Returns true if function node has window, false otherwise

View File

@ -3,6 +3,7 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/IAggregateFunction.h>
#include <Functions/FunctionFactory.h>
#include <Functions/IFunction.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>
@ -47,19 +48,23 @@ Field zeroField(const Field & value)
class AggregateFunctionsArithmericOperationsVisitor : public InDepthQueryTreeVisitor<AggregateFunctionsArithmericOperationsVisitor>
{
public:
explicit AggregateFunctionsArithmericOperationsVisitor(ContextPtr context_)
: context(std::move(context_))
{}
/// Traverse tree bottom to top
static bool shouldTraverseTopToBottom()
{
return false;
}
static void visitImpl(QueryTreeNodePtr & node)
void visitImpl(QueryTreeNodePtr & node)
{
auto * aggregate_function_node = node->as<FunctionNode>();
if (!aggregate_function_node || !aggregate_function_node->isAggregateFunction())
return;
static std::unordered_map<std::string_view, std::unordered_set<std::string_view>> supported_functions
static std::unordered_map<std::string_view, std::unordered_set<std::string_view>> supported_aggregate_functions
= {{"sum", {"multiply", "divide"}},
{"min", {"multiply", "divide", "plus", "minus"}},
{"max", {"multiply", "divide", "plus", "minus"}},
@ -69,88 +74,111 @@ public:
if (aggregate_function_arguments_nodes.size() != 1)
return;
auto * inner_function_node = aggregate_function_arguments_nodes[0]->as<FunctionNode>();
if (!inner_function_node)
const auto & arithmetic_function_node = aggregate_function_arguments_nodes[0];
auto * arithmetic_function_node_typed = arithmetic_function_node->as<FunctionNode>();
if (!arithmetic_function_node_typed)
return;
const auto & inner_function_arguments_nodes = inner_function_node->getArguments().getNodes();
if (inner_function_arguments_nodes.size() != 2)
const auto & arithmetic_function_arguments_nodes = arithmetic_function_node_typed->getArguments().getNodes();
if (arithmetic_function_arguments_nodes.size() != 2)
return;
/// Aggregate function names [sum|min|max|avg] are case-insensitive, so we use the lowercase name
auto lower_function_name = Poco::toLower(aggregate_function_node->getFunctionName());
auto lower_aggregate_function_name = Poco::toLower(aggregate_function_node->getFunctionName());
auto supported_function_it = supported_functions.find(lower_function_name);
if (supported_function_it == supported_functions.end())
auto supported_aggregate_function_it = supported_aggregate_functions.find(lower_aggregate_function_name);
if (supported_aggregate_function_it == supported_aggregate_functions.end())
return;
const auto & inner_function_name = inner_function_node->getFunctionName();
if (!supported_function_it->second.contains(inner_function_name))
const auto & arithmetic_function_name = arithmetic_function_node_typed->getFunctionName();
if (!supported_aggregate_function_it->second.contains(arithmetic_function_name))
return;
const auto * left_argument_constant_node = inner_function_arguments_nodes[0]->as<ConstantNode>();
const auto * right_argument_constant_node = inner_function_arguments_nodes[1]->as<ConstantNode>();
const auto * left_argument_constant_node = arithmetic_function_arguments_nodes[0]->as<ConstantNode>();
const auto * right_argument_constant_node = arithmetic_function_arguments_nodes[1]->as<ConstantNode>();
/** If we extract a negative constant, the aggregate function name must be updated.
*
* Example: SELECT min(-1 * id);
* Result: SELECT -1 * max(id);
*/
std::string function_name_if_constant_is_negative;
if (inner_function_name == "multiply" || inner_function_name == "divide")
std::string aggregate_function_name_if_constant_is_negative;
if (arithmetic_function_name == "multiply" || arithmetic_function_name == "divide")
{
if (lower_function_name == "min")
function_name_if_constant_is_negative = "max";
else if (lower_function_name == "max")
function_name_if_constant_is_negative = "min";
if (lower_aggregate_function_name == "min")
aggregate_function_name_if_constant_is_negative = "max";
else if (lower_aggregate_function_name == "max")
aggregate_function_name_if_constant_is_negative = "min";
}
size_t arithmetic_function_argument_index = 0;
if (left_argument_constant_node && !right_argument_constant_node)
{
/// Do not rewrite `sum(1/n)` as `sum(1) * div(1/n)` because of loss of accuracy
if (inner_function_name == "divide")
if (arithmetic_function_name == "divide")
return;
/// Rewrite `aggregate_function(inner_function(constant, argument))` into `inner_function(constant, aggregate_function(argument))`
const auto & left_argument_constant_value_literal = left_argument_constant_node->getValue();
if (!function_name_if_constant_is_negative.empty() &&
if (!aggregate_function_name_if_constant_is_negative.empty() &&
left_argument_constant_value_literal < zeroField(left_argument_constant_value_literal))
{
lower_function_name = function_name_if_constant_is_negative;
lower_aggregate_function_name = aggregate_function_name_if_constant_is_negative;
}
auto inner_function_clone = inner_function_node->clone();
auto & inner_function_clone_arguments = inner_function_clone->as<FunctionNode &>().getArguments();
auto & inner_function_clone_arguments_nodes = inner_function_clone_arguments.getNodes();
auto inner_function_clone_right_argument = inner_function_clone_arguments_nodes[1];
aggregate_function_arguments_nodes = {inner_function_clone_right_argument};
resolveAggregateFunctionNode(*aggregate_function_node, inner_function_clone_right_argument, lower_function_name);
inner_function_clone_arguments_nodes[1] = node;
node = std::move(inner_function_clone);
arithmetic_function_argument_index = 1;
}
else if (right_argument_constant_node)
{
/// Rewrite `aggregate_function(inner_function(argument, constant))` into `inner_function(aggregate_function(argument), constant)`
const auto & right_argument_constant_value_literal = right_argument_constant_node->getValue();
if (!function_name_if_constant_is_negative.empty() &&
if (!aggregate_function_name_if_constant_is_negative.empty() &&
right_argument_constant_value_literal < zeroField(right_argument_constant_value_literal))
{
lower_function_name = function_name_if_constant_is_negative;
lower_aggregate_function_name = aggregate_function_name_if_constant_is_negative;
}
auto inner_function_clone = inner_function_node->clone();
auto & inner_function_clone_arguments = inner_function_clone->as<FunctionNode &>().getArguments();
auto & inner_function_clone_arguments_nodes = inner_function_clone_arguments.getNodes();
auto inner_function_clone_left_argument = inner_function_clone_arguments_nodes[0];
aggregate_function_arguments_nodes = {inner_function_clone_left_argument};
resolveAggregateFunctionNode(*aggregate_function_node, inner_function_clone_left_argument, lower_function_name);
inner_function_clone_arguments_nodes[0] = node;
node = std::move(inner_function_clone);
arithmetic_function_argument_index = 0;
}
auto optimized_function_node = cloneArithmeticFunctionAndWrapArgumentIntoAggregateFunction(arithmetic_function_node,
arithmetic_function_argument_index,
node,
lower_aggregate_function_name);
if (optimized_function_node->getResultType()->equals(*node->getResultType()))
node = std::move(optimized_function_node);
}
private:
QueryTreeNodePtr cloneArithmeticFunctionAndWrapArgumentIntoAggregateFunction(
const QueryTreeNodePtr & arithmetic_function,
size_t arithmetic_function_argument_index,
const QueryTreeNodePtr & aggregate_function,
const std::string & result_aggregate_function_name)
{
auto arithmetic_function_clone = arithmetic_function->clone();
auto & arithmetic_function_clone_typed = arithmetic_function_clone->as<FunctionNode &>();
auto & arithmetic_function_clone_arguments_nodes = arithmetic_function_clone_typed.getArguments().getNodes();
auto & arithmetic_function_clone_argument = arithmetic_function_clone_arguments_nodes[arithmetic_function_argument_index];
auto aggregate_function_clone = aggregate_function->clone();
auto & aggregate_function_clone_typed = aggregate_function_clone->as<FunctionNode &>();
aggregate_function_clone_typed.getArguments().getNodes() = { arithmetic_function_clone_argument };
resolveAggregateFunctionNode(aggregate_function_clone_typed, arithmetic_function_clone_argument, result_aggregate_function_name);
arithmetic_function_clone_arguments_nodes[arithmetic_function_argument_index] = std::move(aggregate_function_clone);
resolveOrdinaryFunctionNode(arithmetic_function_clone_typed, arithmetic_function_clone_typed.getFunctionName());
return arithmetic_function_clone;
}
inline void resolveOrdinaryFunctionNode(FunctionNode & function_node, const String & function_name) const
{
auto function = FunctionFactory::instance().get(function_name, context);
function_node.resolveAsFunction(function->build(function_node.getArgumentColumns()));
}
static inline void resolveAggregateFunctionNode(FunctionNode & function_node, const QueryTreeNodePtr & argument, const String & aggregate_function_name)
{
auto function_aggregate_function = function_node.getAggregateFunction();
@ -163,13 +191,15 @@ private:
function_node.resolveAsAggregateFunction(std::move(aggregate_function));
}
ContextPtr context;
};
}
void AggregateFunctionsArithmericOperationsPass::run(QueryTreeNodePtr query_tree_node, ContextPtr)
void AggregateFunctionsArithmericOperationsPass::run(QueryTreeNodePtr query_tree_node, ContextPtr context)
{
AggregateFunctionsArithmericOperationsVisitor visitor;
AggregateFunctionsArithmericOperationsVisitor visitor(std::move(context));
visitor.visit(query_tree_node);
}

View File

@ -1,6 +1,7 @@
#include <Analyzer/Passes/QueryAnalysisPass.h>
#include <Common/NamePrompter.h>
#include <Common/ProfileEvents.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteHelpers.h>
@ -66,6 +67,14 @@
#include <Analyzer/UnionNode.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Analyzer/QueryTreeBuilder.h>
#include <Analyzer/IQueryTreeNode.h>
#include <Analyzer/HashUtils.h>
namespace ProfileEvents
{
extern const Event ScalarSubqueriesGlobalCacheHit;
extern const Event ScalarSubqueriesCacheMiss;
}
#include <Common/checkStackSize.h>
@ -1097,7 +1106,7 @@ private:
static QueryTreeNodePtr tryGetLambdaFromSQLUserDefinedFunctions(const std::string & function_name, ContextPtr context);
static void evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & query_tree_node, size_t subquery_depth, ContextPtr context);
void evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & query_tree_node, size_t subquery_depth, ContextPtr context);
static void mergeWindowWithParentWindow(const QueryTreeNodePtr & window_node, const QueryTreeNodePtr & parent_window_node, IdentifierResolveScope & scope);
@ -1207,6 +1216,9 @@ private:
/// Global resolve expression node to projection names map
std::unordered_map<QueryTreeNodePtr, ProjectionNames> resolved_expressions;
/// Results of scalar sub queries
std::unordered_map<QueryTreeNodeConstRawPtrWithHash, std::shared_ptr<ConstantValue>> scalars;
};
/// Utility functions implementation
@ -1687,6 +1699,16 @@ void QueryAnalyzer::evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & node, size
node->getNodeTypeName(),
node->formatASTForErrorMessage());
auto scalars_iterator = scalars.find(node.get());
if (scalars_iterator != scalars.end())
{
ProfileEvents::increment(ProfileEvents::ScalarSubqueriesGlobalCacheHit);
node = std::make_shared<ConstantNode>(scalars_iterator->second, node);
return;
}
ProfileEvents::increment(ProfileEvents::ScalarSubqueriesCacheMiss);
auto subquery_context = Context::createCopy(context);
Settings subquery_settings = context->getSettings();
@ -1699,10 +1721,11 @@ void QueryAnalyzer::evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & node, size
auto io = interpreter->execute();
Block block;
PullingAsyncPipelineExecutor executor(io.pipeline);
io.pipeline.setProgressCallback(context->getProgressCallback());
Block block;
while (block.rows() == 0 && executor.pull(block))
{
}
@ -1743,7 +1766,6 @@ void QueryAnalyzer::evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & node, size
block = materializeBlock(block);
size_t columns = block.columns();
// Block scalar;
Field scalar_value;
DataTypePtr scalar_type;
@ -1770,6 +1792,7 @@ void QueryAnalyzer::evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & node, size
}
auto constant_value = std::make_shared<ConstantValue>(std::move(scalar_value), std::move(scalar_type));
scalars[node.get()] = constant_value;
node = std::make_shared<ConstantNode>(std::move(constant_value), node);
}

View File

@ -1,5 +1,18 @@
#include <Analyzer/QueryTreePassManager.h>
#include <Common/Exception.h>
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>
#include <DataTypes/IDataType.h>
#include <Interpreters/Context.h>
#include <Analyzer/ColumnNode.h>
#include <Analyzer/FunctionNode.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Analyzer/Utils.h>
#include <Analyzer/Passes/QueryAnalysisPass.h>
#include <Analyzer/Passes/CountDistinctPass.h>
#include <Analyzer/Passes/FunctionToSubcolumnsPass.h>
@ -17,15 +30,6 @@
#include <Analyzer/Passes/IfTransformStringsToEnumPass.h>
#include <Analyzer/Passes/OptimizeRedundantFunctionsInOrderByPass.h>
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>
#include <Interpreters/Context.h>
#include <Analyzer/ColumnNode.h>
#include <Analyzer/FunctionNode.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Common/Exception.h>
namespace DB
{
@ -45,24 +49,6 @@ namespace
*/
class ValidationChecker : public InDepthQueryTreeVisitor<ValidationChecker>
{
String pass_name;
void visitColumn(ColumnNode * column) const
{
if (column->getColumnSourceOrNull() == nullptr)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Column {} {} query tree node does not have valid source node after running {} pass",
column->getColumnName(), column->getColumnType(), pass_name);
}
void visitFunction(FunctionNode * function) const
{
if (!function->isResolved())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Function {} is not resolved after running {} pass",
function->toAST()->formatForErrorMessage(), pass_name);
}
public:
explicit ValidationChecker(String pass_name_)
: pass_name(std::move(pass_name_))
@ -75,6 +61,57 @@ public:
else if (auto * function = node->as<FunctionNode>())
return visitFunction(function);
}
private:
void visitColumn(ColumnNode * column) const
{
if (column->getColumnSourceOrNull() == nullptr)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Column {} {} query tree node does not have valid source node after running {} pass",
column->getColumnName(), column->getColumnType(), pass_name);
}
void visitFunction(FunctionNode * function) const
{
if (!function->isResolved())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Function {} is not resolved after running {} pass",
function->toAST()->formatForErrorMessage(), pass_name);
if (isNameOfInFunction(function->getFunctionName()))
return;
const auto & expected_argument_types = function->getArgumentTypes();
size_t expected_argument_types_size = expected_argument_types.size();
auto actual_argument_columns = function->getArgumentColumns();
if (expected_argument_types_size != actual_argument_columns.size())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Function {} expects {} arguments but has {} after running {} pass",
function->toAST()->formatForErrorMessage(),
expected_argument_types_size,
actual_argument_columns.size(),
pass_name);
for (size_t i = 0; i < expected_argument_types_size; ++i)
{
// Skip lambdas
if (WhichDataType(expected_argument_types[i]).isFunction())
continue;
if (!expected_argument_types[i]->equals(*actual_argument_columns[i].type))
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Function {} expects {} argument to have {} type but receives {} after running {} pass",
function->toAST()->formatForErrorMessage(),
i + 1,
expected_argument_types[i]->getName(),
actual_argument_columns[i].type->getName(),
pass_name);
}
}
}
String pass_name;
};
#endif

View File

@ -7,6 +7,29 @@
#include <Poco/Message.h>
#include <Common/CurrentThread.h>
/// This wrapper is useful to save a formatted message into a String before sending it to a logger
class LogToStrImpl
{
String & out_str;
Poco::Logger * logger;
bool propagate_to_actual_log = true;
public:
LogToStrImpl(String & out_str_, Poco::Logger * logger_) : out_str(out_str_) , logger(logger_) {}
LogToStrImpl & operator -> () { return *this; }
bool is(Poco::Message::Priority priority) { propagate_to_actual_log &= logger->is(priority); return true; }
LogToStrImpl * getChannel() {return this; }
const String & name() const { return logger->name(); }
void log(const Poco::Message & message)
{
out_str = message.getText();
if (!propagate_to_actual_log)
return;
if (auto * channel = logger->getChannel())
channel->log(message);
}
};
#define LogToStr(x, y) std::make_unique<LogToStrImpl>(x, y)
namespace
{
@ -17,8 +40,37 @@ namespace
[[maybe_unused]] const ::Poco::Logger * getLogger(const ::Poco::Logger * logger) { return logger; };
[[maybe_unused]] const ::Poco::Logger * getLogger(const std::atomic<::Poco::Logger *> & logger) { return logger.load(); };
[[maybe_unused]] std::unique_ptr<LogToStrImpl> getLogger(std::unique_ptr<LogToStrImpl> && logger) { return logger; };
template<typename T> struct is_fmt_runtime : std::false_type {};
template<typename T> struct is_fmt_runtime<fmt::basic_runtime<T>> : std::true_type {};
/// Usually we use LOG_*(...) macros with either string literals or fmt::runtime(whatever) as a format string.
/// This function is useful to get a string_view to a static format string passed to LOG_* macro.
template <typename T> constexpr std::string_view tryGetStaticFormatString(T && x)
{
if constexpr (is_fmt_runtime<T>::value)
{
/// It definitely was fmt::runtime(something).
/// We are not sure about a lifetime of the string, so return empty view.
/// Also it can be arbitrary string, not a formatting pattern.
/// So returning empty pattern will not pollute the set of patterns.
return std::string_view();
}
else
{
/// Most likely it was a string literal.
/// Unfortunately, there's no good way to check if something is a string literal.
/// But fmtlib requires a format string to be compile-time constant unless fmt::runtime is used.
static_assert(std::is_nothrow_convertible<T, const char * const>::value);
static_assert(!std::is_pointer<T>::value);
return std::string_view(x);
}
}
}
#define LOG_IMPL_FIRST_ARG(X, ...) X
/// Logs a message to a specified logger with that level.
/// If more than one argument is provided,
/// the first argument is interpreted as template with {}-substitutions
@ -30,7 +82,7 @@ namespace
auto _logger = ::getLogger(logger); \
const bool _is_clients_log = (DB::CurrentThread::getGroup() != nullptr) && \
(DB::CurrentThread::getGroup()->client_logs_level >= (priority)); \
if (_logger->is((PRIORITY)) || _is_clients_log) \
if (_is_clients_log || _logger->is((PRIORITY))) \
{ \
std::string formatted_message = numArgs(__VA_ARGS__) > 1 ? fmt::format(__VA_ARGS__) : firstArg(__VA_ARGS__); \
if (auto _channel = _logger->getChannel()) \
@ -40,7 +92,7 @@ namespace
file_function += "; "; \
file_function += __PRETTY_FUNCTION__; \
Poco::Message poco_message(_logger->name(), formatted_message, \
(PRIORITY), file_function.c_str(), __LINE__); \
(PRIORITY), file_function.c_str(), __LINE__, tryGetStaticFormatString(LOG_IMPL_FIRST_ARG(__VA_ARGS__))); \
_channel->log(poco_message); \
} \
} \

View File

@ -49,7 +49,9 @@ NamesAndTypesList TextLogElement::getNamesAndTypes()
{"revision", std::make_shared<DataTypeUInt32>()},
{"source_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"source_line", std::make_shared<DataTypeUInt64>()}
{"source_line", std::make_shared<DataTypeUInt64>()},
{"message_format_string", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
};
}
@ -74,6 +76,8 @@ void TextLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insert(source_file);
columns[i++]->insert(source_line);
columns[i++]->insert(message_format_string);
}
TextLog::TextLog(ContextPtr context_, const String & database_name_,

View File

@ -28,6 +28,8 @@ struct TextLogElement
String source_file;
UInt64 source_line{};
std::string_view message_format_string;
static std::string name() { return "TextLog"; }
static NamesAndTypesList getNamesAndTypes();
static NamesAndAliases getNamesAndAliases() { return {}; }

View File

@ -133,6 +133,8 @@ void OwnSplitChannel::logSplit(const Poco::Message & msg)
elem.source_file = msg.getSourceFile();
elem.source_line = msg.getSourceLine();
elem.message_format_string = msg.getFormatString();
std::shared_ptr<TextLog> text_log_locked{};
{
std::lock_guard<std::mutex> lock(text_log_mutex);

View File

@ -196,7 +196,7 @@ void PipelineExecutor::executeSingleThread(size_t thread_num)
#ifndef NDEBUG
auto & context = tasks.getThreadContext(thread_num);
LOG_TRACE(log,
LOG_TEST(log,
"Thread finished. Total time: {} sec. Execution time: {} sec. Processing time: {} sec. Wait time: {} sec.",
context.total_time_ns / 1e9,
context.execution_time_ns / 1e9,

View File

@ -402,14 +402,10 @@ void TCPHandler::runImpl()
{
auto callback = [this]()
{
{
std::lock_guard task_callback_lock(task_callback_mutex);
std::scoped_lock lock(task_callback_mutex, fatal_error_mutex);
if (isQueryCancelled())
return true;
}
std::lock_guard lock(fatal_error_mutex);
if (isQueryCancelled())
return true;
sendProgress();
sendSelectProfileEvents();
@ -424,6 +420,9 @@ void TCPHandler::runImpl()
}
state.io.onFinish();
std::lock_guard lock(task_callback_mutex);
/// Send final progress after calling onFinish(), since it will update the progress.
///
/// NOTE: we cannot send Progress for regular INSERT (with VALUES)
@ -446,8 +445,11 @@ void TCPHandler::runImpl()
if (state.is_connection_closed)
break;
sendLogs();
sendEndOfStream();
{
std::lock_guard lock(task_callback_mutex);
sendLogs();
sendEndOfStream();
}
/// QueryState should be cleared before QueryScope, since otherwise
/// the MemoryTracker will be wrong for possible deallocations.
@ -760,6 +762,9 @@ void TCPHandler::processOrdinaryQueryWithProcessors()
}
}
/// Defer locking to cover a part of the scope below and everything after it
std::unique_lock progress_lock(task_callback_mutex, std::defer_lock);
{
PullingAsyncPipelineExecutor executor(pipeline);
CurrentMetrics::Increment query_thread_metric_increment{CurrentMetrics::QueryThread};
@ -796,6 +801,11 @@ void TCPHandler::processOrdinaryQueryWithProcessors()
}
}
/// This lock wasn't acquired before and we make .lock() call here
/// so everything under this line is covered even together
/// with sendProgress() out of the scope
progress_lock.lock();
/** If data has run out, we will send the profiling data and total values to
* the last zero block to be able to use
* this information in the suffix output of stream.

View File

@ -59,6 +59,7 @@ namespace ErrorCodes
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
extern const int CANNOT_COMPILE_REGEXP;
}
namespace
{
@ -75,6 +76,9 @@ namespace
const size_t next_slash = suffix_with_globs.find('/', 1);
re2::RE2 matcher(makeRegexpPatternFromGlobs(suffix_with_globs.substr(0, next_slash)));
if (!matcher.ok())
throw Exception(ErrorCodes::CANNOT_COMPILE_REGEXP,
"Cannot compile regex from glob ({}): {}", for_match, matcher.error());
HDFSFileInfo ls;
ls.file_info = hdfsListDirectory(fs.get(), prefix_without_globs.data(), &ls.length);

View File

@ -651,7 +651,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchSelectedPart(
}
#endif
LOG_WARNING(log, fmt::runtime(e.message() + " Will retry fetching part without zero-copy."));
LOG_WARNING(log, "Will retry fetching part without zero-copy: {}", e.message());
/// It's important to release session from HTTP pool. Otherwise it's possible to get deadlock
/// on http pool.

View File

@ -109,10 +109,11 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
/// 2. We have some larger merged part which covers new_part_name (and therefore it covers source_part_name too)
/// 3. We have two intersecting parts, both cover source_part_name. It's logical error.
/// TODO Why 1 and 2 can happen? Do we need more assertions here or somewhere else?
constexpr const char * message = "Part {} is covered by {} but should be merged into {}. This shouldn't happen often.";
LOG_WARNING(log, fmt::runtime(message), source_part_name, source_part_or_covering->name, entry.new_part_name);
constexpr auto fmt_string = "Part {} is covered by {} but should be merged into {}. This shouldn't happen often.";
String message;
LOG_WARNING(LogToStr(message, log), fmt_string, source_part_name, source_part_or_covering->name, entry.new_part_name);
if (!source_part_or_covering->info.contains(MergeTreePartInfo::fromPartName(entry.new_part_name, storage.format_version)))
throw Exception(ErrorCodes::LOGICAL_ERROR, message, source_part_name, source_part_or_covering->name, entry.new_part_name);
throw Exception(ErrorCodes::LOGICAL_ERROR, message);
return PrepareResult{
.prepared_successfully = false,

View File

@ -3899,7 +3899,9 @@ void MergeTreeData::forcefullyMovePartToDetachedAndRemoveFromMemory(const MergeT
if (error)
{
LOG_ERROR(log, "The set of parts restored in place of {} looks incomplete. There might or might not be a data loss.{}", part->name, (error_parts.empty() ? "" : " Suspicious parts: " + error_parts));
LOG_WARNING(log, "The set of parts restored in place of {} looks incomplete. "
"SELECT queries may observe gaps in data until this replica is synchronized with other replicas.{}",
part->name, (error_parts.empty() ? "" : " Suspicious parts: " + error_parts));
}
}

View File

@ -373,9 +373,9 @@ CheckResult ReplicatedMergeTreePartCheckThread::checkPart(const String & part_na
throw;
tryLogCurrentException(log, __PRETTY_FUNCTION__);
String message = "Part " + part_name + " looks broken. Removing it and will try to fetch.";
LOG_ERROR(log, fmt::runtime(message));
constexpr auto fmt_string = "Part {} looks broken. Removing it and will try to fetch.";
String message = fmt::format(fmt_string, part_name);
LOG_ERROR(log, fmt_string, part_name);
/// Delete part locally.
storage.outdateBrokenPartAndCloneToDetached(part, "broken");
@ -392,9 +392,9 @@ CheckResult ReplicatedMergeTreePartCheckThread::checkPart(const String & part_na
/// Probably, someone just wrote down the part, and has not yet added to ZK.
/// Therefore, delete only if the part is old (not very reliable).
ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);
String message = "Unexpected part " + part_name + " in filesystem. Removing.";
LOG_ERROR(log, fmt::runtime(message));
constexpr auto fmt_string = "Unexpected part {} in filesystem. Removing.";
String message = fmt::format(fmt_string, part_name);
LOG_ERROR(log, fmt_string, part_name);
storage.outdateBrokenPartAndCloneToDetached(part, "unexpected");
return {part_name, false, message};
}

View File

@ -1191,12 +1191,10 @@ bool ReplicatedMergeTreeQueue::isCoveredByFuturePartsImpl(const LogEntry & entry
if (entry_for_same_part_it != future_parts.end())
{
const LogEntry & another_entry = *entry_for_same_part_it->second;
out_reason = fmt::format(
"Not executing log entry {} of type {} for part {} "
"because another log entry {} of type {} for the same part ({}) is being processed.",
entry.znode_name, entry.type, entry.new_part_name,
another_entry.znode_name, another_entry.type, another_entry.new_part_name);
LOG_INFO(log, fmt::runtime(out_reason));
constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} "
"because another log entry {} of type {} for the same part ({}) is being processed.";
LOG_INFO(LogToStr(out_reason, log), fmt_string, entry.znode_name, entry.type, entry.new_part_name,
another_entry.znode_name, another_entry.type, another_entry.new_part_name);
return true;
/** When the corresponding action is completed, then `isNotCoveredByFuturePart` next time, will succeed,
@ -1238,11 +1236,9 @@ bool ReplicatedMergeTreeQueue::isCoveredByFuturePartsImpl(const LogEntry & entry
{
if (entry.znode_name < future_part_elem.second->znode_name)
{
out_reason = fmt::format(
"Not executing log entry {} for part {} "
"because it is not disjoint with part {} that is currently executing and another entry {} is newer.",
entry.znode_name, new_part_name, future_part_elem.first, future_part_elem.second->znode_name);
LOG_TRACE(log, fmt::runtime(out_reason));
constexpr auto fmt_string = "Not executing log entry {} for part {} "
"because it is not disjoint with part {} that is currently executing and another entry {} is newer.";
LOG_TRACE(LogToStr(out_reason, log), fmt_string, entry.znode_name, new_part_name, future_part_elem.first, future_part_elem.second->znode_name);
return true;
}
@ -1250,11 +1246,9 @@ bool ReplicatedMergeTreeQueue::isCoveredByFuturePartsImpl(const LogEntry & entry
continue;
}
out_reason = fmt::format(
"Not executing log entry {} for part {} "
"because it is not disjoint with part {} that is currently executing.",
entry.znode_name, new_part_name, future_part_elem.first);
LOG_TRACE(log, fmt::runtime(out_reason));
constexpr auto fmt_string = "Not executing log entry {} for part {} "
"because it is not disjoint with part {} that is currently executing.";
LOG_TEST(LogToStr(out_reason, log), fmt_string, entry.znode_name, new_part_name, future_part_elem.first);
return true;
}
@ -1337,11 +1331,9 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
{
if (future_parts.contains(name))
{
out_postpone_reason = fmt::format(
"Not executing log entry {} of type {} for part {} "
"because part {} is not ready yet (log entry for that part is being processed).",
entry.znode_name, entry.typeToString(), entry.new_part_name, name);
LOG_TRACE(log, fmt::runtime(out_postpone_reason));
constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} "
"because part {} is not ready yet (log entry for that part is being processed).";
LOG_TRACE(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name, entry.typeToString(), entry.new_part_name, name);
return false;
}
@ -1357,10 +1349,8 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
if (merger_mutator.merges_blocker.isCancelled())
{
out_postpone_reason = fmt::format(
"Not executing log entry {} of type {} for part {} because merges and mutations are cancelled now.",
entry.znode_name, entry.typeToString(), entry.new_part_name);
LOG_DEBUG(log, fmt::runtime(out_postpone_reason));
constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} because merges and mutations are cancelled now.";
LOG_DEBUG(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name, entry.typeToString(), entry.new_part_name);
return false;
}
@ -1375,8 +1365,8 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
if (!disks.empty() && only_s3_storage && storage.checkZeroCopyLockExists(entry.new_part_name, disks[0]))
{
out_postpone_reason = "Not executing merge/mutation for the part " + entry.new_part_name
+ ", waiting other replica to execute it and will fetch after.";
constexpr auto fmt_string = "Not executing merge/mutation for the part {}, waiting other replica to execute it and will fetch after.";
out_postpone_reason = fmt::format(fmt_string, entry.new_part_name);
return false;
}
}
@ -1387,9 +1377,8 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
if (replica_to_execute_merge && !merge_strategy_picker.isMergeFinishedByReplica(replica_to_execute_merge.value(), entry))
{
String reason = "Not executing merge for the part " + entry.new_part_name
+ ", waiting for " + replica_to_execute_merge.value() + " to execute merge.";
out_postpone_reason = reason;
constexpr auto fmt_string = "Not executing merge for the part {}, waiting for {} to execute merge.";
out_postpone_reason = fmt::format(fmt_string, entry.new_part_name, replica_to_execute_merge.value());
return false;
}
}
@ -1411,20 +1400,16 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
{
if (merger_mutator.ttl_merges_blocker.isCancelled())
{
out_postpone_reason = fmt::format(
"Not executing log entry {} for part {} because merges with TTL are cancelled now.",
entry.znode_name, entry.new_part_name);
LOG_DEBUG(log, fmt::runtime(out_postpone_reason));
constexpr auto fmt_string = "Not executing log entry {} for part {} because merges with TTL are cancelled now.";
LOG_DEBUG(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name, entry.new_part_name);
return false;
}
size_t total_merges_with_ttl = data.getTotalMergesWithTTLInMergeList();
if (total_merges_with_ttl >= data_settings->max_number_of_merges_with_ttl_in_pool)
{
out_postpone_reason = fmt::format(
"Not executing log entry {} for part {} because {} merges with TTL already executing, maximum {}.",
entry.znode_name, entry.new_part_name, total_merges_with_ttl,
data_settings->max_number_of_merges_with_ttl_in_pool);
LOG_DEBUG(log, fmt::runtime(out_postpone_reason));
constexpr auto fmt_string = "Not executing log entry {} for part {} because {} merges with TTL already executing, maximum {}.";
LOG_DEBUG(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name, entry.new_part_name, total_merges_with_ttl,
data_settings->max_number_of_merges_with_ttl_in_pool);
return false;
}
}
@ -1432,12 +1417,10 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
if (!ignore_max_size && sum_parts_size_in_bytes > max_source_parts_size)
{
out_postpone_reason = fmt::format("Not executing log entry {} of type {} for part {}"
" because source parts size ({}) is greater than the current maximum ({}).",
entry.znode_name, entry.typeToString(), entry.new_part_name,
ReadableSize(sum_parts_size_in_bytes), ReadableSize(max_source_parts_size));
LOG_DEBUG(log, fmt::runtime(out_postpone_reason));
constexpr auto fmt_string = "Not executing log entry {} of type {} for part {}"
" because source parts size ({}) is greater than the current maximum ({}).";
LOG_DEBUG(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name, entry.typeToString(), entry.new_part_name,
ReadableSize(sum_parts_size_in_bytes), ReadableSize(max_source_parts_size));
return false;
}
@ -1450,10 +1433,8 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
if (!alter_sequence.canExecuteMetaAlter(entry.alter_version, state_lock))
{
int head_alter = alter_sequence.getHeadAlterVersion(state_lock);
out_postpone_reason = fmt::format(
"Cannot execute alter metadata {} with version {} because another alter {} must be executed before",
entry.znode_name, entry.alter_version, head_alter);
LOG_TRACE(log, fmt::runtime(out_postpone_reason));
constexpr auto fmt_string = "Cannot execute alter metadata {} with version {} because another alter {} must be executed before";
LOG_TRACE(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name, entry.alter_version, head_alter);
return false;
}
}
@ -1466,17 +1447,13 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
int head_alter = alter_sequence.getHeadAlterVersion(state_lock);
if (head_alter == entry.alter_version)
{
out_postpone_reason = fmt::format(
"Cannot execute alter data {} with version {} because metadata still not altered",
entry.znode_name, entry.alter_version);
LOG_TRACE(log, fmt::runtime(out_postpone_reason));
constexpr auto fmt_string = "Cannot execute alter data {} with version {} because metadata still not altered";
LOG_TRACE(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name, entry.alter_version);
}
else
{
out_postpone_reason = fmt::format(
"Cannot execute alter data {} with version {} because another alter {} must be executed before",
entry.znode_name, entry.alter_version, head_alter);
LOG_TRACE(log, fmt::runtime(out_postpone_reason));
constexpr auto fmt_string = "Cannot execute alter data {} with version {} because another alter {} must be executed before";
LOG_TRACE(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name, entry.alter_version, head_alter);
}
return false;
@ -1498,14 +1475,12 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
{
if (drop_range_info.isDisjoint(info))
continue;
out_postpone_reason = fmt::format(
"Not executing log entry {} of type {} for part {} "
"because another DROP_RANGE or REPLACE_RANGE entry with not disjoint range {} is currently executing.",
entry.znode_name,
entry.typeToString(),
entry.new_part_name,
info.getPartNameForLogs());
LOG_TRACE(log, fmt::runtime(out_postpone_reason));
constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} "
"because another DROP_RANGE or REPLACE_RANGE entry with not disjoint range {} is currently executing.";
LOG_TRACE(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name,
entry.typeToString(),
entry.new_part_name,
info.getPartNameForLogs());
return false;
}
}
@ -1531,11 +1506,10 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
auto new_part_info = MergeTreePartInfo::fromPartName(new_part_name, format_version);
if (!new_part_info.isDisjoint(drop_part_info))
{
out_postpone_reason = fmt::format(
"Not executing log entry {} of type {} for part {} "
"because it probably depends on {} (REPLACE_RANGE).",
entry.znode_name, entry.typeToString(), entry.new_part_name, replace_entry->znode_name);
LOG_TRACE(log, fmt::runtime(out_postpone_reason));
constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} "
"because it probably depends on {} (REPLACE_RANGE).";
LOG_TRACE(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name, entry.typeToString(),
entry.new_part_name, replace_entry->znode_name);
return false;
}
}

View File

@ -80,6 +80,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
extern const int CANNOT_APPEND_TO_FILE;
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
extern const int CANNOT_COMPILE_REGEXP;
}
namespace
@ -105,6 +106,9 @@ void listFilesWithRegexpMatchingImpl(
auto regexp = makeRegexpPatternFromGlobs(current_glob);
re2::RE2 matcher(regexp);
if (!matcher.ok())
throw Exception(ErrorCodes::CANNOT_COMPILE_REGEXP,
"Cannot compile regex from glob ({}): {}", for_match, matcher.error());
bool skip_regex = current_glob == "/*" ? true : false;
if (!recursive)

View File

@ -1303,10 +1303,10 @@ bool StorageMergeTree::optimize(
&disable_reason,
local_context->getSettingsRef().optimize_skip_merged_partitions))
{
constexpr const char * message = "Cannot OPTIMIZE table: {}";
constexpr auto message = "Cannot OPTIMIZE table: {}";
if (disable_reason.empty())
disable_reason = "unknown reason";
LOG_INFO(log, fmt::runtime(message), disable_reason);
LOG_INFO(log, message, disable_reason);
if (local_context->getSettingsRef().optimize_throw_if_noop)
throw Exception(ErrorCodes::CANNOT_ASSIGN_OPTIMIZE, message, disable_reason);
@ -1330,10 +1330,10 @@ bool StorageMergeTree::optimize(
&disable_reason,
local_context->getSettingsRef().optimize_skip_merged_partitions))
{
constexpr const char * message = "Cannot OPTIMIZE table: {}";
constexpr auto message = "Cannot OPTIMIZE table: {}";
if (disable_reason.empty())
disable_reason = "unknown reason";
LOG_INFO(log, fmt::runtime(message), disable_reason);
LOG_INFO(log, message, disable_reason);
if (local_context->getSettingsRef().optimize_throw_if_noop)
throw Exception(ErrorCodes::CANNOT_ASSIGN_OPTIMIZE, message, disable_reason);

View File

@ -1272,12 +1272,12 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
const auto storage_settings_ptr = getSettings();
bool insane = uncovered_unexpected_parts_rows > total_rows_on_filesystem * storage_settings_ptr->replicated_max_ratio_of_wrong_parts;
constexpr const char * sanity_report_fmt = "The local set of parts of table {} doesn't look like the set of parts in ZooKeeper: "
constexpr auto sanity_report_fmt = "The local set of parts of table {} doesn't look like the set of parts in ZooKeeper: "
"{} rows of {} total rows in filesystem are suspicious. "
"There are {} uncovered unexpected parts with {} rows ({} of them is not just-written with {} rows), "
"{} missing parts (with {} blocks), {} covered unexpected parts (with {} rows).";
constexpr const char * sanity_report_debug_fmt = "Uncovered unexpected parts: {}. Missing parts: {}. Covered unexpected parts: {}. Expected parts: {}.";
constexpr auto sanity_report_debug_fmt = "Uncovered unexpected parts: {}. Missing parts: {}. Covered unexpected parts: {}. Expected parts: {}.";
if (insane && !skip_sanity_checks)
{
@ -1293,7 +1293,7 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
{
LOG_DEBUG(log, sanity_report_debug_fmt, fmt::join(uncovered_unexpected_parts, ", "), fmt::join(parts_to_fetch, ", "),
fmt::join(covered_unexpected_parts, ", "), fmt::join(expected_parts, ", "));
LOG_WARNING(log, fmt::runtime(sanity_report_fmt), getStorageID().getNameForLogs(),
LOG_WARNING(log, sanity_report_fmt, getStorageID().getNameForLogs(),
formatReadableQuantity(uncovered_unexpected_parts_rows), formatReadableQuantity(total_rows_on_filesystem),
uncovered_unexpected_parts.size(), uncovered_unexpected_parts_rows, unexpected_parts_nonnew, unexpected_parts_nonnew_rows,
parts_to_fetch.size(), parts_to_fetch_blocks, covered_unexpected_parts.size(), unexpected_parts_rows - uncovered_unexpected_parts_rows);

View File

@ -109,6 +109,7 @@ namespace ErrorCodes
extern const int DATABASE_ACCESS_DENIED;
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
extern const int NOT_IMPLEMENTED;
extern const int CANNOT_COMPILE_REGEXP;
}
class IOutputFormat;
@ -174,6 +175,10 @@ public:
outcome_future = listObjectsAsync();
matcher = std::make_unique<re2::RE2>(makeRegexpPatternFromGlobs(globbed_uri.key));
if (!matcher->ok())
throw Exception(ErrorCodes::CANNOT_COMPILE_REGEXP,
"Cannot compile regex from glob ({}): {}", globbed_uri.key, matcher->error());
recursive = globbed_uri.key == "/**" ? true : false;
fillInternalBufferAssumeLocked();
}

View File

@ -176,6 +176,24 @@ private:
}
ReaderHolder() = default;
ReaderHolder(const ReaderHolder & other) = delete;
ReaderHolder & operator=(const ReaderHolder & other) = delete;
ReaderHolder(ReaderHolder && other) noexcept
{
*this = std::move(other);
}
ReaderHolder & operator=(ReaderHolder && other) noexcept
{
/// The order of destruction is important.
/// reader uses pipeline, pipeline uses read_buf.
reader = std::move(other.reader);
pipeline = std::move(other.pipeline);
read_buf = std::move(other.read_buf);
path = std::move(other.path);
return *this;
}
explicit operator bool() const { return reader != nullptr; }
PullingPipelineExecutor * operator->() { return reader.get(); }

View File

@ -345,9 +345,11 @@ namespace
// to check if Range header is supported, we need to send a request with it set
const bool supports_ranges = (res.has("Accept-Ranges") && res.get("Accept-Ranges") == "bytes")
|| (res.has("Content-Range") && res.get("Content-Range").starts_with("bytes"));
LOG_TRACE(
&Poco::Logger::get("StorageURLSource"),
fmt::runtime(supports_ranges ? "HTTP Range is supported" : "HTTP Range is not supported"));
if (supports_ranges)
LOG_TRACE(&Poco::Logger::get("StorageURLSource"), "HTTP Range is supported");
else
LOG_TRACE(&Poco::Logger::get("StorageURLSource"), "HTTP Range is not supported");
if (supports_ranges && res.getStatus() == Poco::Net::HTTPResponse::HTTP_PARTIAL_CONTENT

View File

@ -1817,6 +1817,56 @@ def reportCoverage(args):
"""
)
def reportLogStats(args):
query = """
WITH
120 AS mins,
(
SELECT (count(), sum(length(message)))
FROM system.text_log
WHERE (now() - toIntervalMinute(mins)) < event_time
) AS total
SELECT
count() AS count,
round(count / (total.1), 3) AS `count_%`,
formatReadableSize(sum(length(message))) AS size,
round(sum(length(message)) / (total.2), 3) AS `size_%`,
countDistinct(logger_name) AS uniq_loggers,
countDistinct(thread_id) AS uniq_threads,
groupArrayDistinct(toString(level)) AS levels,
round(sum(query_id = '') / count, 3) AS `background_%`,
message_format_string
FROM system.text_log
WHERE (now() - toIntervalMinute(mins)) < event_time
GROUP BY message_format_string
ORDER BY count DESC
LIMIT 100
FORMAT TSVWithNamesAndTypes
"""
value = clickhouse_execute(args, query).decode()
print("\nTop patterns of log messages:\n")
print(value)
print("\n")
query = """
WITH
120 AS mins
SELECT
count() AS count,
substr(replaceRegexpAll(message, '[^A-Za-z]+', ''), 1, 32) AS pattern,
substr(any(message), 1, 256) as runtime_message
FROM system.text_log
WHERE (now() - toIntervalMinute(mins)) < event_time AND message_format_string = ''
GROUP BY pattern
ORDER BY count DESC
LIMIT 50
FORMAT TSVWithNamesAndTypes
"""
value = clickhouse_execute(args, query).decode()
print("\nTop messages without format string (fmt::runtime):\n")
print(value)
print("\n")
def main(args):
global server_died
@ -1951,6 +2001,9 @@ def main(args):
else:
print("All tests have finished.")
if args.report_logs_stats:
reportLogStats(args)
if args.report_coverage and not reportCoverage(args):
exit_code.value = 1
@ -2242,6 +2295,12 @@ if __name__ == "__main__":
default=False,
help="Check what high-level server components were covered by tests",
)
parser.add_argument(
"--report-logs-stats",
action="store_true",
default=False,
help="Report statistics about log messages",
)
args = parser.parse_args()
if args.queries and not os.path.isdir(args.queries):

View File

@ -1,2 +1,3 @@
02177_CTE_GLOBAL_ON 5 500 11 0 5
02177_CTE_GLOBAL_OFF 1 100 5 0 1
02177_CTE_NEW_ANALYZER 2 200 3 0 2

View File

@ -18,6 +18,16 @@ SELECT '02177_CTE_GLOBAL_OFF', a5 FROM system.numbers LIMIT 100
FORMAT Null
SETTINGS enable_global_with_statement = 0;
WITH
( SELECT sleep(0.0001) FROM system.one ),
( SELECT sleep(0.0001) FROM system.one ),
( SELECT sleep(0.0001) FROM system.one ),
( SELECT sleep(0.0001) FROM system.one ),
( SELECT sleep(0.0001) FROM system.one ) as a5
SELECT '02177_CTE_NEW_ANALYZER', a5 FROM system.numbers LIMIT 100
FORMAT Null
SETTINGS allow_experimental_analyzer = 1;
SYSTEM FLUSH LOGS;
SELECT
'02177_CTE_GLOBAL_ON',
@ -46,3 +56,17 @@ WHERE
AND type = 'QueryFinish'
AND query LIKE '%02177_CTE_GLOBAL_OFF%'
AND event_date >= yesterday() AND event_time > now() - interval 10 minute;
SELECT
'02177_CTE_NEW_ANALYZER',
ProfileEvents['SleepFunctionCalls'] as sleep_calls,
ProfileEvents['SleepFunctionMicroseconds'] as sleep_microseconds,
ProfileEvents['ScalarSubqueriesGlobalCacheHit'] as scalar_cache_global_hit,
ProfileEvents['ScalarSubqueriesLocalCacheHit'] as scalar_cache_local_hit,
ProfileEvents['ScalarSubqueriesCacheMiss'] as scalar_cache_miss
FROM system.query_log
WHERE
current_database = currentDatabase()
AND type = 'QueryFinish'
AND query LIKE '%02177_CTE_NEW_ANALYZER%'
AND event_date >= yesterday() AND event_time > now() - interval 10 minute;

View File

@ -0,0 +1,64 @@
-- { echoOn }
EXPLAIN QUERY TREE SELECT avg(log(2) * number) FROM numbers(10);
QUERY id: 0
PROJECTION COLUMNS
avg(multiply(log(2), number)) Float64
PROJECTION
LIST id: 1, nodes: 1
FUNCTION id: 2, function_name: multiply, function_type: ordinary, result_type: Float64
ARGUMENTS
LIST id: 3, nodes: 2
CONSTANT id: 4, constant_value: Float64_0.6931471805599453, constant_value_type: Float64
EXPRESSION
FUNCTION id: 5, function_name: log, function_type: ordinary, result_type: Float64
ARGUMENTS
LIST id: 6, nodes: 1
CONSTANT id: 7, constant_value: UInt64_2, constant_value_type: UInt8
FUNCTION id: 8, function_name: avg, function_type: aggregate, result_type: Float64
ARGUMENTS
LIST id: 9, nodes: 1
COLUMN id: 10, column_name: number, result_type: UInt64, source_id: 11
JOIN TREE
TABLE_FUNCTION id: 11, table_function_name: numbers
ARGUMENTS
LIST id: 12, nodes: 1
CONSTANT id: 13, constant_value: UInt64_10, constant_value_type: UInt8
EXPLAIN QUERY TREE SELECT avg(number * log(2)) FROM numbers(10);
QUERY id: 0
PROJECTION COLUMNS
avg(multiply(number, log(2))) Float64
PROJECTION
LIST id: 1, nodes: 1
FUNCTION id: 2, function_name: multiply, function_type: ordinary, result_type: Float64
ARGUMENTS
LIST id: 3, nodes: 2
FUNCTION id: 4, function_name: avg, function_type: aggregate, result_type: Float64
ARGUMENTS
LIST id: 5, nodes: 1
COLUMN id: 6, column_name: number, result_type: UInt64, source_id: 7
CONSTANT id: 8, constant_value: Float64_0.6931471805599453, constant_value_type: Float64
EXPRESSION
FUNCTION id: 9, function_name: log, function_type: ordinary, result_type: Float64
ARGUMENTS
LIST id: 10, nodes: 1
CONSTANT id: 11, constant_value: UInt64_2, constant_value_type: UInt8
JOIN TREE
TABLE_FUNCTION id: 7, table_function_name: numbers
ARGUMENTS
LIST id: 12, nodes: 1
CONSTANT id: 13, constant_value: UInt64_10, constant_value_type: UInt8
SELECT round(avg(log(2) * number), 6) AS k FROM numbers(10000000) GROUP BY number % 3, number % 2;
3465734.516505
3465735.209653
3465735.9028
3465736.595947
3465735.209653
3465735.9028
SELECT round(avg(number * log(2)), 6) AS k FROM numbers(10000000) GROUP BY number % 3, number % 2;
3465734.516505
3465735.209653
3465735.9028
3465736.595947
3465735.209653
3465735.9028

View File

@ -0,0 +1,14 @@
SET allow_experimental_analyzer = 1;
SET optimize_arithmetic_operations_in_aggregate_functions = 1;
-- { echoOn }
EXPLAIN QUERY TREE SELECT avg(log(2) * number) FROM numbers(10);
EXPLAIN QUERY TREE SELECT avg(number * log(2)) FROM numbers(10);
SELECT round(avg(log(2) * number), 6) AS k FROM numbers(10000000) GROUP BY number % 3, number % 2;
SELECT round(avg(number * log(2)), 6) AS k FROM numbers(10000000) GROUP BY number % 3, number % 2;
-- { echoOff }

View File

@ -1,81 +0,0 @@
-- { echoOn }
EXPLAIN actions=1
(
SELECT round(avg(log(2) * number), 6) AS k
FROM numbers(10000000)
GROUP BY number % 3, number % 2
)
SETTINGS allow_experimental_analyzer=1;
Expression ((Project names + Projection))
Actions: INPUT : 0 -> avg(number_0) Float64 : 0
COLUMN Const(Float64) -> 0.6931471805599453_Float64 Float64 : 1
COLUMN Const(UInt8) -> 6_UInt8 UInt8 : 2
FUNCTION multiply(0.6931471805599453_Float64 :: 1, avg(number_0) :: 0) -> multiply(0.6931471805599453_Float64, avg(number_0)) Float64 : 3
FUNCTION round(multiply(0.6931471805599453_Float64, avg(number_0)) :: 3, 6_UInt8 :: 2) -> round(multiply(0.6931471805599453_Float64, avg(number_0)), 6_UInt8) Float64 : 0
ALIAS round(multiply(0.6931471805599453_Float64, avg(number_0)), 6_UInt8) :: 0 -> k Float64 : 2
Positions: 2
Aggregating
Keys: modulo(number_0, 3_UInt8), modulo(number_0, 2_UInt8)
Aggregates:
avg(number_0)
Function: avg(UInt64) → Float64
Arguments: number_0
Expression ((Before GROUP BY + Change column names to column identifiers))
Actions: INPUT : 0 -> number UInt64 : 0
COLUMN Const(UInt8) -> 3_UInt8 UInt8 : 1
COLUMN Const(UInt8) -> 2_UInt8 UInt8 : 2
ALIAS number :: 0 -> number_0 UInt64 : 3
FUNCTION modulo(number_0 : 3, 3_UInt8 :: 1) -> modulo(number_0, 3_UInt8) UInt8 : 0
FUNCTION modulo(number_0 : 3, 2_UInt8 :: 2) -> modulo(number_0, 2_UInt8) UInt8 : 1
Positions: 0 1 3
ReadFromStorage (SystemNumbers)
EXPLAIN actions=1
(
SELECT round(log(2) * avg(number), 6) AS k
FROM numbers(10000000)
GROUP BY number % 3, number % 2
)
SETTINGS allow_experimental_analyzer=1;
Expression ((Project names + Projection))
Actions: INPUT : 0 -> avg(number_0) Float64 : 0
COLUMN Const(Float64) -> 0.6931471805599453_Float64 Float64 : 1
COLUMN Const(UInt8) -> 6_UInt8 UInt8 : 2
FUNCTION multiply(0.6931471805599453_Float64 :: 1, avg(number_0) :: 0) -> multiply(0.6931471805599453_Float64, avg(number_0)) Float64 : 3
FUNCTION round(multiply(0.6931471805599453_Float64, avg(number_0)) :: 3, 6_UInt8 :: 2) -> round(multiply(0.6931471805599453_Float64, avg(number_0)), 6_UInt8) Float64 : 0
ALIAS round(multiply(0.6931471805599453_Float64, avg(number_0)), 6_UInt8) :: 0 -> k Float64 : 2
Positions: 2
Aggregating
Keys: modulo(number_0, 3_UInt8), modulo(number_0, 2_UInt8)
Aggregates:
avg(number_0)
Function: avg(UInt64) → Float64
Arguments: number_0
Expression ((Before GROUP BY + Change column names to column identifiers))
Actions: INPUT : 0 -> number UInt64 : 0
COLUMN Const(UInt8) -> 3_UInt8 UInt8 : 1
COLUMN Const(UInt8) -> 2_UInt8 UInt8 : 2
ALIAS number :: 0 -> number_0 UInt64 : 3
FUNCTION modulo(number_0 : 3, 3_UInt8 :: 1) -> modulo(number_0, 3_UInt8) UInt8 : 0
FUNCTION modulo(number_0 : 3, 2_UInt8 :: 2) -> modulo(number_0, 2_UInt8) UInt8 : 1
Positions: 0 1 3
ReadFromStorage (SystemNumbers)
SELECT round(avg(log(2) * number), 6) AS k
FROM numbers(10000000)
GROUP BY number % 3, number % 2
SETTINGS allow_experimental_analyzer=1;
3465734.516505
3465735.209653
3465735.9028
3465736.595947
3465735.209653
3465735.9028
SELECT round(log(2) * avg(number), 6) AS k
FROM numbers(10000000)
GROUP BY number % 3, number % 2
SETTINGS allow_experimental_analyzer=0;
3465734.516505
3465735.209653
3465735.9028
3465736.595947
3465735.209653
3465735.9028

View File

@ -1,26 +0,0 @@
-- { echoOn }
EXPLAIN actions=1
(
SELECT round(avg(log(2) * number), 6) AS k
FROM numbers(10000000)
GROUP BY number % 3, number % 2
)
SETTINGS allow_experimental_analyzer=1;
EXPLAIN actions=1
(
SELECT round(log(2) * avg(number), 6) AS k
FROM numbers(10000000)
GROUP BY number % 3, number % 2
)
SETTINGS allow_experimental_analyzer=1;
SELECT round(avg(log(2) * number), 6) AS k
FROM numbers(10000000)
GROUP BY number % 3, number % 2
SETTINGS allow_experimental_analyzer=1;
SELECT round(log(2) * avg(number), 6) AS k
FROM numbers(10000000)
GROUP BY number % 3, number % 2
SETTINGS allow_experimental_analyzer=0;

View File

@ -0,0 +1,4 @@
1 2 3
4 5 6
7 8 9
0 0 0

View File

@ -0,0 +1,5 @@
-- Tags: no-fasttest
-- Tag no-fasttest: Depends on AWS
select * from s3('http://localhost:11111/test/a.tsv', CustomSeparated);

View File

@ -363,7 +363,7 @@ int decompressFiles(int input_fd, char * path, char * name, bool & have_compress
#if !defined(OS_DARWIN) && !defined(OS_FREEBSD)
uint32_t getInode(const char * self)
uint64_t getInode(const char * self)
{
std::ifstream maps("/proc/self/maps");
if (maps.fail())
@ -380,7 +380,7 @@ uint32_t getInode(const char * self)
{
std::stringstream ss(line); // STYLE_CHECK_ALLOW_STD_STRING_STREAM
std::string addr, mode, offset, id, path;
uint32_t inode = 0;
uint64_t inode = 0;
if (ss >> addr >> mode >> offset >> id >> inode >> path && path == self)
return inode;
}
@ -415,7 +415,7 @@ int main(int/* argc*/, char* argv[])
#if !defined(OS_DARWIN) && !defined(OS_FREEBSD)
/// get inode of this executable
uint32_t inode = getInode(self);
uint64_t inode = getInode(self);
if (inode == 0)
{
std::cerr << "Unable to obtain inode." << std::endl;
@ -447,6 +447,11 @@ int main(int/* argc*/, char* argv[])
return 1;
}
/// inconsistency in WSL1 Ubuntu - inode reported in /proc/self/maps is a 64bit to
/// 32bit conversion of input_info.st_ino
if (input_info.st_ino & 0xFFFFFFFF00000000 && !(inode & 0xFFFFFFFF00000000))
input_info.st_ino &= 0x00000000FFFFFFFF;
/// if decompression was performed by another process since this copy was started
/// then file referred by path "self" is already pointing to different inode
if (input_info.st_ino != inode)