diff --git a/contrib/poco b/contrib/poco
index 79923422618..0ab9bba7cca 160000
--- a/contrib/poco
+++ b/contrib/poco
@@ -1 +1 @@
-Subproject commit 799234226187c0ae0b8c90f23465b25ed7956e56
+Subproject commit 0ab9bba7ccad3c8dacce04a35cb3b78218547ab4
diff --git a/docker/test/stateless/run.sh b/docker/test/stateless/run.sh
index 2165045e565..ee4b5d7c156 100755
--- a/docker/test/stateless/run.sh
+++ b/docker/test/stateless/run.sh
@@ -128,6 +128,7 @@ function run_tests()
if [[ "${HIGH_LEVEL_COVERAGE}" = "YES" ]]; then
ADDITIONAL_OPTIONS+=('--report-coverage')
+ ADDITIONAL_OPTIONS+=('--report-logs-stats')
fi
set +e
diff --git a/docker/test/stress/stress b/docker/test/stress/stress
index cf92b86c18f..4afd2745526 100755
--- a/docker/test/stress/stress
+++ b/docker/test/stress/stress
@@ -289,6 +289,7 @@ if __name__ == "__main__":
"--database=system",
"--hung-check",
"--stress",
+ "--report-logs-stats",
"00001_select_1",
]
)
diff --git a/docs/en/development/architecture.md b/docs/en/development/architecture.md
index fe644c43889..4d4271ab79f 100644
--- a/docs/en/development/architecture.md
+++ b/docs/en/development/architecture.md
@@ -182,6 +182,31 @@ No matter what pool is used for a job, at start `ThreadStatus` instance is creat
If thread is related to query execution, then the most important thing attached to `ThreadStatus` is query context `ContextPtr`. Every query has its master thread in the server pool. Master thread does the attachment by holding an `ThreadStatus::QueryScope query_scope(query_context)` object. Master thread also creates a thread group represented with `ThreadGroupStatus` object. Every additional thread that is allocated during this query execution is attached to its thread group by `CurrentThread::attachTo(thread_group)` call. Thread groups are used to aggregate profile event counters and track memory consumption by all threads dedicated to a single task (see `MemoryTracker` and `ProfileEvents::Counters` classes for more information).
+## Concurrency control {#concurrency-control}
+A query that can be parallelized uses the `max_threads` setting to limit itself. The default value for this setting is chosen so that a single query can utilize all CPU cores in the best way. But what if there are multiple concurrent queries and each of them uses the default `max_threads` value? Then the queries will share CPU resources. The OS will ensure fairness by constantly switching threads, which introduces some performance penalty. `ConcurrencyControl` helps to deal with this penalty and avoid allocating a lot of threads. The configuration setting `concurrent_threads_soft_limit_num` limits how many concurrent threads can be allocated before some kind of CPU pressure is applied.
+
+:::note
+`concurrent_threads_soft_limit_num` and `concurrent_threads_soft_limit_ratio_to_cores` are disabled (equal to 0) by default, so this feature must be enabled before use.
+:::
+
+The notion of a CPU `slot` is introduced. A slot is a unit of concurrency: to run a thread, a query has to acquire a slot in advance and release it when the thread stops. The number of slots is limited globally per server. Multiple concurrent queries compete for CPU slots if the total demand exceeds the total number of slots. `ConcurrencyControl` resolves this competition by scheduling CPU slots in a fair manner.
+
+Each slot can be seen as an independent state machine with the following states:
+ * `free`: the slot is available to be allocated by any query.
+ * `granted`: the slot is `allocated` by a specific query, but not yet acquired by any thread.
+ * `acquired`: the slot is `allocated` by a specific query and acquired by a thread.
+
+Note that an `allocated` slot can be in two different states: `granted` and `acquired`. The former is a transitional state that should actually be short (from the instant when a slot is allocated to a query until the moment when the up-scaling procedure is run by any thread of that query).
+
+![state diagram](@site/docs/en/development/images/concurrency.png)
+
+The API of `ConcurrencyControl` consists of the following functions:
+1. Create a resource allocation for a query: `auto slots = ConcurrencyControl::instance().allocate(1, max_threads);`. It will allocate at least 1 and at most `max_threads` slots. Note that the first slot is granted immediately, but the remaining slots may be granted later. Thus the limit is soft, because every query will obtain at least one thread.
+2. For every thread, a slot has to be acquired from the allocation: `while (auto slot = slots->tryAcquire()) spawnThread([slot = std::move(slot)] { ... });`.
+3. Update the total number of slots: `ConcurrencyControl::setMaxConcurrency(concurrent_threads_soft_limit_num)`. This can be done at runtime, without a server restart.
+
+This API allows a query to start with at least one thread (even under CPU pressure) and later scale up to `max_threads`.
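+
+Putting these pieces together, a query's thread up-scaling might look roughly like the sketch below (illustrative only: `spawnThread` and `doWork` are hypothetical placeholders for the thread pool call and the worker routine, not part of the `ConcurrencyControl` API):
+
+``` cpp
+/// Scale a query up to `max_threads` threads while respecting the global soft limit.
+void startQueryThreads(size_t max_threads)
+{
+    /// Allocate at least 1 and at most `max_threads` CPU slots.
+    /// The first slot is granted immediately; the rest may be granted later.
+    auto slots = ConcurrencyControl::instance().allocate(1, max_threads);
+
+    /// Spawn one thread per currently granted slot; each thread holds its slot
+    /// and releases it when the thread stops.
+    while (auto slot = slots->tryAcquire())
+        spawnThread([slot = std::move(slot)] { doWork(); });
+}
+```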
+
## Distributed Query Execution {#distributed-query-execution}
Servers in a cluster setup are mostly independent. You can create a `Distributed` table on one or all servers in a cluster. The `Distributed` table does not store data itself – it only provides a “view” to all local tables on multiple nodes of a cluster. When you SELECT from a `Distributed` table, it rewrites that query, chooses remote nodes according to load balancing settings, and sends the query to them. The `Distributed` table requests remote servers to process a query just up to a stage where intermediate results from different servers can be merged. Then it receives the intermediate results and merges them. The distributed table tries to distribute as much work as possible to remote servers and does not send much intermediate data over the network.
diff --git a/docs/en/development/images/concurrency.png b/docs/en/development/images/concurrency.png
new file mode 100644
index 00000000000..ffd344a54fb
Binary files /dev/null and b/docs/en/development/images/concurrency.png differ
diff --git a/docs/en/operations/server-configuration-parameters/settings.md b/docs/en/operations/server-configuration-parameters/settings.md
index 02f7b5008d5..cf400780826 100644
--- a/docs/en/operations/server-configuration-parameters/settings.md
+++ b/docs/en/operations/server-configuration-parameters/settings.md
@@ -757,6 +757,10 @@ Possible values:
Default value: `0`.
+**See Also**
+
+- [Concurrency Control](/docs/en/development/architecture.md#concurrency-control)
+
## concurrent_threads_soft_limit_ratio_to_cores {#concurrent_threads_soft_limit_ratio_to_cores}
The maximum number of query processing threads as multiple of number of logical cores.
More details: [concurrent_threads_soft_limit_num](#concurrent-threads-soft-limit-num).
@@ -768,6 +772,12 @@ Possible values:
Default value: `0`.
+**Example**
+
+``` xml
+<concurrent_threads_soft_limit_ratio_to_cores>3</concurrent_threads_soft_limit_ratio_to_cores>
+```
+
## max_concurrent_queries {#max-concurrent-queries}
The maximum number of simultaneously processed queries.
diff --git a/src/Analyzer/FunctionNode.cpp b/src/Analyzer/FunctionNode.cpp
index 84a1d5025e3..7961bfbae31 100644
--- a/src/Analyzer/FunctionNode.cpp
+++ b/src/Analyzer/FunctionNode.cpp
@@ -2,6 +2,7 @@
#include
#include
+#include
#include
#include
@@ -31,6 +32,15 @@ FunctionNode::FunctionNode(String function_name_)
children[arguments_child_index] = std::make_shared();
}
+const DataTypes & FunctionNode::getArgumentTypes() const
+{
+ if (!function)
+ throw Exception(ErrorCodes::LOGICAL_ERROR,
+ "Function {} is not resolved",
+ function_name);
+ return function->getArgumentTypes();
+}
+
ColumnsWithTypeAndName FunctionNode::getArgumentColumns() const
{
const auto & arguments = getArguments().getNodes();
diff --git a/src/Analyzer/FunctionNode.h b/src/Analyzer/FunctionNode.h
index 49e66ba32c1..92d3ab84722 100644
--- a/src/Analyzer/FunctionNode.h
+++ b/src/Analyzer/FunctionNode.h
@@ -85,6 +85,7 @@ public:
/// Get arguments node
QueryTreeNodePtr & getArgumentsNode() { return children[arguments_child_index]; }
+ const DataTypes & getArgumentTypes() const;
ColumnsWithTypeAndName getArgumentColumns() const;
/// Returns true if function node has window, false otherwise
diff --git a/src/Analyzer/Passes/AggregateFunctionsArithmericOperationsPass.cpp b/src/Analyzer/Passes/AggregateFunctionsArithmericOperationsPass.cpp
index 149af61e002..03236848903 100644
--- a/src/Analyzer/Passes/AggregateFunctionsArithmericOperationsPass.cpp
+++ b/src/Analyzer/Passes/AggregateFunctionsArithmericOperationsPass.cpp
@@ -3,6 +3,7 @@
#include
#include
+#include
#include
#include
@@ -47,19 +48,23 @@ Field zeroField(const Field & value)
class AggregateFunctionsArithmericOperationsVisitor : public InDepthQueryTreeVisitor
{
public:
+ explicit AggregateFunctionsArithmericOperationsVisitor(ContextPtr context_)
+ : context(std::move(context_))
+ {}
+
/// Traverse tree bottom to top
static bool shouldTraverseTopToBottom()
{
return false;
}
- static void visitImpl(QueryTreeNodePtr & node)
+ void visitImpl(QueryTreeNodePtr & node)
{
auto * aggregate_function_node = node->as();
if (!aggregate_function_node || !aggregate_function_node->isAggregateFunction())
return;
- static std::unordered_map> supported_functions
+ static std::unordered_map> supported_aggregate_functions
= {{"sum", {"multiply", "divide"}},
{"min", {"multiply", "divide", "plus", "minus"}},
{"max", {"multiply", "divide", "plus", "minus"}},
@@ -69,88 +74,111 @@ public:
if (aggregate_function_arguments_nodes.size() != 1)
return;
- auto * inner_function_node = aggregate_function_arguments_nodes[0]->as();
- if (!inner_function_node)
+ const auto & arithmetic_function_node = aggregate_function_arguments_nodes[0];
+ auto * arithmetic_function_node_typed = arithmetic_function_node->as();
+ if (!arithmetic_function_node_typed)
return;
- const auto & inner_function_arguments_nodes = inner_function_node->getArguments().getNodes();
- if (inner_function_arguments_nodes.size() != 2)
+ const auto & arithmetic_function_arguments_nodes = arithmetic_function_node_typed->getArguments().getNodes();
+ if (arithmetic_function_arguments_nodes.size() != 2)
return;
/// Aggregate functions[sum|min|max|avg] is case-insensitive, so we use lower cases name
- auto lower_function_name = Poco::toLower(aggregate_function_node->getFunctionName());
+ auto lower_aggregate_function_name = Poco::toLower(aggregate_function_node->getFunctionName());
- auto supported_function_it = supported_functions.find(lower_function_name);
- if (supported_function_it == supported_functions.end())
+ auto supported_aggregate_function_it = supported_aggregate_functions.find(lower_aggregate_function_name);
+ if (supported_aggregate_function_it == supported_aggregate_functions.end())
return;
- const auto & inner_function_name = inner_function_node->getFunctionName();
-
- if (!supported_function_it->second.contains(inner_function_name))
+ const auto & arithmetic_function_name = arithmetic_function_node_typed->getFunctionName();
+ if (!supported_aggregate_function_it->second.contains(arithmetic_function_name))
return;
- const auto * left_argument_constant_node = inner_function_arguments_nodes[0]->as();
- const auto * right_argument_constant_node = inner_function_arguments_nodes[1]->as();
+ const auto * left_argument_constant_node = arithmetic_function_arguments_nodes[0]->as();
+ const auto * right_argument_constant_node = arithmetic_function_arguments_nodes[1]->as();
/** If we extract negative constant, aggregate function name must be updated.
*
* Example: SELECT min(-1 * id);
* Result: SELECT -1 * max(id);
*/
- std::string function_name_if_constant_is_negative;
- if (inner_function_name == "multiply" || inner_function_name == "divide")
+ std::string aggregate_function_name_if_constant_is_negative;
+ if (arithmetic_function_name == "multiply" || arithmetic_function_name == "divide")
{
- if (lower_function_name == "min")
- function_name_if_constant_is_negative = "max";
- else if (lower_function_name == "max")
- function_name_if_constant_is_negative = "min";
+ if (lower_aggregate_function_name == "min")
+ aggregate_function_name_if_constant_is_negative = "max";
+ else if (lower_aggregate_function_name == "max")
+ aggregate_function_name_if_constant_is_negative = "min";
}
+ size_t arithmetic_function_argument_index = 0;
+
if (left_argument_constant_node && !right_argument_constant_node)
{
/// Do not rewrite `sum(1/n)` with `sum(1) * div(1/n)` because of lose accuracy
- if (inner_function_name == "divide")
+ if (arithmetic_function_name == "divide")
return;
/// Rewrite `aggregate_function(inner_function(constant, argument))` into `inner_function(constant, aggregate_function(argument))`
const auto & left_argument_constant_value_literal = left_argument_constant_node->getValue();
- if (!function_name_if_constant_is_negative.empty() &&
+ if (!aggregate_function_name_if_constant_is_negative.empty() &&
left_argument_constant_value_literal < zeroField(left_argument_constant_value_literal))
{
- lower_function_name = function_name_if_constant_is_negative;
+ lower_aggregate_function_name = aggregate_function_name_if_constant_is_negative;
}
- auto inner_function_clone = inner_function_node->clone();
- auto & inner_function_clone_arguments = inner_function_clone->as().getArguments();
- auto & inner_function_clone_arguments_nodes = inner_function_clone_arguments.getNodes();
- auto inner_function_clone_right_argument = inner_function_clone_arguments_nodes[1];
- aggregate_function_arguments_nodes = {inner_function_clone_right_argument};
- resolveAggregateFunctionNode(*aggregate_function_node, inner_function_clone_right_argument, lower_function_name);
- inner_function_clone_arguments_nodes[1] = node;
- node = std::move(inner_function_clone);
+ arithmetic_function_argument_index = 1;
}
else if (right_argument_constant_node)
{
/// Rewrite `aggregate_function(inner_function(argument, constant))` into `inner_function(aggregate_function(argument), constant)`
const auto & right_argument_constant_value_literal = right_argument_constant_node->getValue();
- if (!function_name_if_constant_is_negative.empty() &&
+ if (!aggregate_function_name_if_constant_is_negative.empty() &&
right_argument_constant_value_literal < zeroField(right_argument_constant_value_literal))
{
- lower_function_name = function_name_if_constant_is_negative;
+ lower_aggregate_function_name = aggregate_function_name_if_constant_is_negative;
}
- auto inner_function_clone = inner_function_node->clone();
- auto & inner_function_clone_arguments = inner_function_clone->as().getArguments();
- auto & inner_function_clone_arguments_nodes = inner_function_clone_arguments.getNodes();
- auto inner_function_clone_left_argument = inner_function_clone_arguments_nodes[0];
- aggregate_function_arguments_nodes = {inner_function_clone_left_argument};
- resolveAggregateFunctionNode(*aggregate_function_node, inner_function_clone_left_argument, lower_function_name);
- inner_function_clone_arguments_nodes[0] = node;
- node = std::move(inner_function_clone);
+ arithmetic_function_argument_index = 0;
}
+
+ auto optimized_function_node = cloneArithmeticFunctionAndWrapArgumentIntoAggregateFunction(arithmetic_function_node,
+ arithmetic_function_argument_index,
+ node,
+ lower_aggregate_function_name);
+ if (optimized_function_node->getResultType()->equals(*node->getResultType()))
+ node = std::move(optimized_function_node);
}
private:
+ QueryTreeNodePtr cloneArithmeticFunctionAndWrapArgumentIntoAggregateFunction(
+ const QueryTreeNodePtr & arithmetic_function,
+ size_t arithmetic_function_argument_index,
+ const QueryTreeNodePtr & aggregate_function,
+ const std::string & result_aggregate_function_name)
+ {
+ auto arithmetic_function_clone = arithmetic_function->clone();
+ auto & arithmetic_function_clone_typed = arithmetic_function_clone->as();
+ auto & arithmetic_function_clone_arguments_nodes = arithmetic_function_clone_typed.getArguments().getNodes();
+ auto & arithmetic_function_clone_argument = arithmetic_function_clone_arguments_nodes[arithmetic_function_argument_index];
+
+ auto aggregate_function_clone = aggregate_function->clone();
+ auto & aggregate_function_clone_typed = aggregate_function_clone->as();
+ aggregate_function_clone_typed.getArguments().getNodes() = { arithmetic_function_clone_argument };
+ resolveAggregateFunctionNode(aggregate_function_clone_typed, arithmetic_function_clone_argument, result_aggregate_function_name);
+
+ arithmetic_function_clone_arguments_nodes[arithmetic_function_argument_index] = std::move(aggregate_function_clone);
+ resolveOrdinaryFunctionNode(arithmetic_function_clone_typed, arithmetic_function_clone_typed.getFunctionName());
+
+ return arithmetic_function_clone;
+ }
+
+ inline void resolveOrdinaryFunctionNode(FunctionNode & function_node, const String & function_name) const
+ {
+ auto function = FunctionFactory::instance().get(function_name, context);
+ function_node.resolveAsFunction(function->build(function_node.getArgumentColumns()));
+ }
+
static inline void resolveAggregateFunctionNode(FunctionNode & function_node, const QueryTreeNodePtr & argument, const String & aggregate_function_name)
{
auto function_aggregate_function = function_node.getAggregateFunction();
@@ -163,13 +191,15 @@ private:
function_node.resolveAsAggregateFunction(std::move(aggregate_function));
}
+
+ ContextPtr context;
};
}
-void AggregateFunctionsArithmericOperationsPass::run(QueryTreeNodePtr query_tree_node, ContextPtr)
+void AggregateFunctionsArithmericOperationsPass::run(QueryTreeNodePtr query_tree_node, ContextPtr context)
{
- AggregateFunctionsArithmericOperationsVisitor visitor;
+ AggregateFunctionsArithmericOperationsVisitor visitor(std::move(context));
visitor.visit(query_tree_node);
}
diff --git a/src/Analyzer/Passes/QueryAnalysisPass.cpp b/src/Analyzer/Passes/QueryAnalysisPass.cpp
index 1c9dd01e2a5..f92eac67804 100644
--- a/src/Analyzer/Passes/QueryAnalysisPass.cpp
+++ b/src/Analyzer/Passes/QueryAnalysisPass.cpp
@@ -1,6 +1,7 @@
#include
#include
+#include
#include
#include
@@ -66,6 +67,14 @@
#include
#include
#include
+#include
+#include
+
+namespace ProfileEvents
+{
+ extern const Event ScalarSubqueriesGlobalCacheHit;
+ extern const Event ScalarSubqueriesCacheMiss;
+}
#include
@@ -1097,7 +1106,7 @@ private:
static QueryTreeNodePtr tryGetLambdaFromSQLUserDefinedFunctions(const std::string & function_name, ContextPtr context);
- static void evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & query_tree_node, size_t subquery_depth, ContextPtr context);
+ void evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & query_tree_node, size_t subquery_depth, ContextPtr context);
static void mergeWindowWithParentWindow(const QueryTreeNodePtr & window_node, const QueryTreeNodePtr & parent_window_node, IdentifierResolveScope & scope);
@@ -1207,6 +1216,9 @@ private:
/// Global resolve expression node to projection names map
std::unordered_map resolved_expressions;
+ /// Results of scalar sub queries
+ std::unordered_map> scalars;
+
};
/// Utility functions implementation
@@ -1687,6 +1699,16 @@ void QueryAnalyzer::evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & node, size
node->getNodeTypeName(),
node->formatASTForErrorMessage());
+ auto scalars_iterator = scalars.find(node.get());
+ if (scalars_iterator != scalars.end())
+ {
+ ProfileEvents::increment(ProfileEvents::ScalarSubqueriesGlobalCacheHit);
+ node = std::make_shared(scalars_iterator->second, node);
+ return;
+ }
+
+ ProfileEvents::increment(ProfileEvents::ScalarSubqueriesCacheMiss);
+
auto subquery_context = Context::createCopy(context);
Settings subquery_settings = context->getSettings();
@@ -1699,10 +1721,11 @@ void QueryAnalyzer::evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & node, size
auto io = interpreter->execute();
- Block block;
PullingAsyncPipelineExecutor executor(io.pipeline);
io.pipeline.setProgressCallback(context->getProgressCallback());
+ Block block;
+
while (block.rows() == 0 && executor.pull(block))
{
}
@@ -1743,7 +1766,6 @@ void QueryAnalyzer::evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & node, size
block = materializeBlock(block);
size_t columns = block.columns();
- // Block scalar;
Field scalar_value;
DataTypePtr scalar_type;
@@ -1770,6 +1792,7 @@ void QueryAnalyzer::evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & node, size
}
auto constant_value = std::make_shared(std::move(scalar_value), std::move(scalar_type));
+ scalars[node.get()] = constant_value;
node = std::make_shared(std::move(constant_value), node);
}
diff --git a/src/Analyzer/QueryTreePassManager.cpp b/src/Analyzer/QueryTreePassManager.cpp
index 8efe0dd4602..4a1ab74130d 100644
--- a/src/Analyzer/QueryTreePassManager.cpp
+++ b/src/Analyzer/QueryTreePassManager.cpp
@@ -1,5 +1,18 @@
#include
+#include
+
+#include
+#include
+
+#include
+
+#include
+
+#include
+#include
+#include
+#include
#include
#include
#include
@@ -17,15 +30,6 @@
#include
#include
-#include
-#include
-
-#include
-#include
-#include
-#include
-#include
-
namespace DB
{
@@ -45,24 +49,6 @@ namespace
*/
class ValidationChecker : public InDepthQueryTreeVisitor
{
- String pass_name;
-
- void visitColumn(ColumnNode * column) const
- {
- if (column->getColumnSourceOrNull() == nullptr)
- throw Exception(ErrorCodes::LOGICAL_ERROR,
- "Column {} {} query tree node does not have valid source node after running {} pass",
- column->getColumnName(), column->getColumnType(), pass_name);
- }
-
- void visitFunction(FunctionNode * function) const
- {
- if (!function->isResolved())
- throw Exception(ErrorCodes::LOGICAL_ERROR,
- "Function {} is not resolved after running {} pass",
- function->toAST()->formatForErrorMessage(), pass_name);
- }
-
public:
explicit ValidationChecker(String pass_name_)
: pass_name(std::move(pass_name_))
@@ -75,6 +61,57 @@ public:
else if (auto * function = node->as())
return visitFunction(function);
}
+private:
+ void visitColumn(ColumnNode * column) const
+ {
+ if (column->getColumnSourceOrNull() == nullptr)
+ throw Exception(ErrorCodes::LOGICAL_ERROR,
+ "Column {} {} query tree node does not have valid source node after running {} pass",
+ column->getColumnName(), column->getColumnType(), pass_name);
+ }
+
+ void visitFunction(FunctionNode * function) const
+ {
+ if (!function->isResolved())
+ throw Exception(ErrorCodes::LOGICAL_ERROR,
+ "Function {} is not resolved after running {} pass",
+ function->toAST()->formatForErrorMessage(), pass_name);
+
+ if (isNameOfInFunction(function->getFunctionName()))
+ return;
+
+ const auto & expected_argument_types = function->getArgumentTypes();
+ size_t expected_argument_types_size = expected_argument_types.size();
+ auto actual_argument_columns = function->getArgumentColumns();
+
+ if (expected_argument_types_size != actual_argument_columns.size())
+ throw Exception(ErrorCodes::LOGICAL_ERROR,
+ "Function {} expects {} arguments but has {} after running {} pass",
+ function->toAST()->formatForErrorMessage(),
+ expected_argument_types_size,
+ actual_argument_columns.size(),
+ pass_name);
+
+ for (size_t i = 0; i < expected_argument_types_size; ++i)
+ {
+ // Skip lambdas
+ if (WhichDataType(expected_argument_types[i]).isFunction())
+ continue;
+
+ if (!expected_argument_types[i]->equals(*actual_argument_columns[i].type))
+ {
+ throw Exception(ErrorCodes::LOGICAL_ERROR,
+ "Function {} expects {} argument to have {} type but receives {} after running {} pass",
+ function->toAST()->formatForErrorMessage(),
+ i + 1,
+ expected_argument_types[i]->getName(),
+ actual_argument_columns[i].type->getName(),
+ pass_name);
+ }
+ }
+ }
+
+ String pass_name;
};
#endif
diff --git a/src/Backups/BackupIO_S3.cpp b/src/Backups/BackupIO_S3.cpp
index 4529f34e2a7..aa667fde06f 100644
--- a/src/Backups/BackupIO_S3.cpp
+++ b/src/Backups/BackupIO_S3.cpp
@@ -156,10 +156,9 @@ void BackupWriterS3::copyObjectImpl(
const String & src_key,
const String & dst_bucket,
const String & dst_key,
- const Aws::S3::Model::HeadObjectResult & head,
+ size_t size,
const std::optional & metadata) const
{
- size_t size = head.GetContentLength();
LOG_TRACE(log, "Copying {} bytes using single-operation copy", size);
Aws::S3::Model::CopyObjectRequest request;
@@ -177,7 +176,7 @@ void BackupWriterS3::copyObjectImpl(
if (!outcome.IsSuccess() && (outcome.GetError().GetExceptionName() == "EntityTooLarge"
|| outcome.GetError().GetExceptionName() == "InvalidRequest"))
{ // Can't come here with MinIO, MinIO allows single part upload for large objects.
- copyObjectMultipartImpl(src_bucket, src_key, dst_bucket, dst_key, head, metadata);
+ copyObjectMultipartImpl(src_bucket, src_key, dst_bucket, dst_key, size, metadata);
return;
}
@@ -191,10 +190,9 @@ void BackupWriterS3::copyObjectMultipartImpl(
const String & src_key,
const String & dst_bucket,
const String & dst_key,
- const Aws::S3::Model::HeadObjectResult & head,
+ size_t size,
const std::optional & metadata) const
{
- size_t size = head.GetContentLength();
LOG_TRACE(log, "Copying {} bytes using multipart upload copy", size);
String multipart_upload_id;
@@ -309,16 +307,16 @@ void BackupWriterS3::copyFileNative(DiskPtr from_disk, const String & file_name_
std::string source_bucket = object_storage->getObjectsNamespace();
auto file_path = fs::path(s3_uri.key) / file_name_to;
- auto head = S3::headObject(*client, source_bucket, objects[0].absolute_path).GetResult();
- if (static_cast(head.GetContentLength()) < request_settings.getUploadSettings().max_single_operation_copy_size)
+ auto size = S3::getObjectSize(*client, source_bucket, objects[0].absolute_path);
+ if (size < request_settings.getUploadSettings().max_single_operation_copy_size)
{
copyObjectImpl(
- source_bucket, objects[0].absolute_path, s3_uri.bucket, file_path, head);
+ source_bucket, objects[0].absolute_path, s3_uri.bucket, file_path, size);
}
else
{
copyObjectMultipartImpl(
- source_bucket, objects[0].absolute_path, s3_uri.bucket, file_path, head);
+ source_bucket, objects[0].absolute_path, s3_uri.bucket, file_path, size);
}
}
}
diff --git a/src/Backups/BackupIO_S3.h b/src/Backups/BackupIO_S3.h
index 634b35c1e74..70487717c48 100644
--- a/src/Backups/BackupIO_S3.h
+++ b/src/Backups/BackupIO_S3.h
@@ -67,7 +67,7 @@ private:
const String & src_key,
const String & dst_bucket,
const String & dst_key,
- const Aws::S3::Model::HeadObjectResult & head,
+ size_t size,
const std::optional & metadata = std::nullopt) const;
void copyObjectMultipartImpl(
@@ -75,7 +75,7 @@ private:
const String & src_key,
const String & dst_bucket,
const String & dst_key,
- const Aws::S3::Model::HeadObjectResult & head,
+ size_t size,
const std::optional & metadata = std::nullopt) const;
void removeFilesBatch(const Strings & file_names);
diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp
index 1b387c5a080..3732af0d4f3 100644
--- a/src/Common/ProfileEvents.cpp
+++ b/src/Common/ProfileEvents.cpp
@@ -309,6 +309,8 @@ The server successfully detected this situation and will download merged part fr
M(S3CopyObject, "Number of S3 API CopyObject calls.") \
M(S3ListObjects, "Number of S3 API ListObjects calls.") \
M(S3HeadObject, "Number of S3 API HeadObject calls.") \
+ M(S3GetObjectAttributes, "Number of S3 API GetObjectAttributes calls.") \
+ M(S3GetObjectMetadata, "Number of S3 API GetObject calls for getting metadata.") \
M(S3CreateMultipartUpload, "Number of S3 API CreateMultipartUpload calls.") \
M(S3UploadPartCopy, "Number of S3 API UploadPartCopy calls.") \
M(S3UploadPart, "Number of S3 API UploadPart calls.") \
@@ -321,6 +323,8 @@ The server successfully detected this situation and will download merged part fr
M(DiskS3CopyObject, "Number of DiskS3 API CopyObject calls.") \
M(DiskS3ListObjects, "Number of DiskS3 API ListObjects calls.") \
M(DiskS3HeadObject, "Number of DiskS3 API HeadObject calls.") \
+ M(DiskS3GetObjectAttributes, "Number of DiskS3 API GetObjectAttributes calls.") \
+ M(DiskS3GetObjectMetadata, "Number of DiskS3 API GetObject calls for getting metadata.") \
M(DiskS3CreateMultipartUpload, "Number of DiskS3 API CreateMultipartUpload calls.") \
M(DiskS3UploadPartCopy, "Number of DiskS3 API UploadPartCopy calls.") \
M(DiskS3UploadPart, "Number of DiskS3 API UploadPart calls.") \
diff --git a/src/Common/logger_useful.h b/src/Common/logger_useful.h
index 1e84efd8085..e83245c0fa1 100644
--- a/src/Common/logger_useful.h
+++ b/src/Common/logger_useful.h
@@ -7,6 +7,29 @@
#include
#include
+/// This wrapper is useful to save a formatted message into a String before sending it to a logger
+class LogToStrImpl
+{
+ String & out_str;
+ Poco::Logger * logger;
+ bool propagate_to_actual_log = true;
+public:
+ LogToStrImpl(String & out_str_, Poco::Logger * logger_) : out_str(out_str_) , logger(logger_) {}
+ LogToStrImpl & operator -> () { return *this; }
+ bool is(Poco::Message::Priority priority) { propagate_to_actual_log &= logger->is(priority); return true; }
+ LogToStrImpl * getChannel() {return this; }
+ const String & name() const { return logger->name(); }
+ void log(const Poco::Message & message)
+ {
+ out_str = message.getText();
+ if (!propagate_to_actual_log)
+ return;
+ if (auto * channel = logger->getChannel())
+ channel->log(message);
+ }
+};
+
+#define LogToStr(x, y) std::make_unique(x, y)
namespace
{
@@ -17,8 +40,37 @@ namespace
[[maybe_unused]] const ::Poco::Logger * getLogger(const ::Poco::Logger * logger) { return logger; };
[[maybe_unused]] const ::Poco::Logger * getLogger(const std::atomic<::Poco::Logger *> & logger) { return logger.load(); };
+ [[maybe_unused]] std::unique_ptr getLogger(std::unique_ptr && logger) { return logger; };
+
+ template struct is_fmt_runtime : std::false_type {};
+ template struct is_fmt_runtime> : std::true_type {};
+
+ /// Usually we use LOG_*(...) macros with either string literals or fmt::runtime(whatever) as a format string.
+ /// This function is useful to get a string_view to a static format string passed to a LOG_* macro.
+ template constexpr std::string_view tryGetStaticFormatString(T && x)
+ {
+ if constexpr (is_fmt_runtime::value)
+ {
+ /// It definitely was fmt::runtime(something).
+ /// We are not sure about the lifetime of the string, so return an empty view.
+ /// Also it can be an arbitrary string, not a formatting pattern,
+ /// so returning an empty pattern will not pollute the set of patterns.
+ return std::string_view();
+ }
+ else
+ {
+ /// Most likely it was a string literal.
+ /// Unfortunately, there's no good way to check if something is a string literal.
+ /// But fmtlib requires a format string to be compile-time constant unless fmt::runtime is used.
+ static_assert(std::is_nothrow_convertible::value);
+ static_assert(!std::is_pointer::value);
+ return std::string_view(x);
+ }
+ }
}
+#define LOG_IMPL_FIRST_ARG(X, ...) X
+
/// Logs a message to a specified logger with that level.
/// If more than one argument is provided,
/// the first argument is interpreted as template with {}-substitutions
@@ -30,7 +82,7 @@ namespace
auto _logger = ::getLogger(logger); \
const bool _is_clients_log = (DB::CurrentThread::getGroup() != nullptr) && \
(DB::CurrentThread::getGroup()->client_logs_level >= (priority)); \
- if (_logger->is((PRIORITY)) || _is_clients_log) \
+ if (_is_clients_log || _logger->is((PRIORITY))) \
{ \
std::string formatted_message = numArgs(__VA_ARGS__) > 1 ? fmt::format(__VA_ARGS__) : firstArg(__VA_ARGS__); \
if (auto _channel = _logger->getChannel()) \
@@ -40,7 +92,7 @@ namespace
file_function += "; "; \
file_function += __PRETTY_FUNCTION__; \
Poco::Message poco_message(_logger->name(), formatted_message, \
- (PRIORITY), file_function.c_str(), __LINE__); \
+ (PRIORITY), file_function.c_str(), __LINE__, tryGetStaticFormatString(LOG_IMPL_FIRST_ARG(__VA_ARGS__))); \
_channel->log(poco_message); \
} \
} \
diff --git a/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp b/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp
index ad1d690f4a9..711f1553ce6 100644
--- a/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp
+++ b/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp
@@ -171,8 +171,9 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
if (!hasPendingDataToRead())
return false;
- size_t size, offset;
+ chassert(file_offset_of_buffer_end <= impl->getFileSize());
+ size_t size, offset;
if (prefetch_future.valid())
{
ProfileEventTimeIncrement watch(ProfileEvents::AsynchronousRemoteReadWaitMicroseconds);
@@ -210,8 +211,8 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
/// In case of multiple files for the same file in clickhouse (i.e. log family)
/// file_offset_of_buffer_end will not match getImplementationBufferOffset()
/// so we use [impl->getImplementationBufferOffset(), impl->getFileSize()]
- assert(file_offset_of_buffer_end >= impl->getImplementationBufferOffset());
- assert(file_offset_of_buffer_end <= impl->getFileSize());
+ chassert(file_offset_of_buffer_end >= impl->getImplementationBufferOffset());
+ chassert(file_offset_of_buffer_end <= impl->getFileSize());
return bytes_read;
}
@@ -277,6 +278,15 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset, int whence)
/// First reset the buffer so the next read will fetch new data to the buffer.
resetWorkingBuffer();
+ if (read_until_position && new_pos > *read_until_position)
+ {
+ ProfileEvents::increment(ProfileEvents::RemoteFSSeeksWithReset);
+ impl->reset();
+
+ file_offset_of_buffer_end = new_pos = *read_until_position; /// read_until_position is a non-included boundary.
+ return new_pos;
+ }
+
/**
* Lazy ignore. Save number of bytes to ignore and ignore it either for prefetch buffer or current buffer.
* Note: we read in range [file_offset_of_buffer_end, read_until_position).
diff --git a/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp b/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp
index 01d4154199a..712a9a7c3b1 100644
--- a/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp
+++ b/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp
@@ -256,7 +256,7 @@ size_t ReadBufferFromRemoteFSGather::getFileSize() const
String ReadBufferFromRemoteFSGather::getInfoForLog()
{
if (!current_buf)
- throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot get info: buffer not initialized");
+ return "";
return current_buf->getInfoForLog();
}
diff --git a/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp b/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp
index 3c620ca819e..a56a78d6722 100644
--- a/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp
+++ b/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp
@@ -125,14 +125,19 @@ std::string S3ObjectStorage::generateBlobNameForPath(const std::string & /* path
getRandomASCIIString(key_name_total_size - key_name_prefix_size));
}
-Aws::S3::Model::HeadObjectOutcome S3ObjectStorage::requestObjectHeadData(const std::string & bucket_from, const std::string & key) const
+size_t S3ObjectStorage::getObjectSize(const std::string & bucket_from, const std::string & key) const
{
- return S3::headObject(*client.get(), bucket_from, key, "", true);
+ return S3::getObjectSize(*client.get(), bucket_from, key, {}, /* for_disk_s3= */ true);
}
bool S3ObjectStorage::exists(const StoredObject & object) const
{
- return S3::objectExists(*client.get(), bucket, object.absolute_path, "", true);
+ return S3::objectExists(*client.get(), bucket, object.absolute_path, {}, /* for_disk_s3= */ true);
+}
+
+void S3ObjectStorage::checkObjectExists(const std::string & bucket_from, const std::string & key, std::string_view description) const
+{
+ return S3::checkObjectExists(*client.get(), bucket_from, key, {}, /* for_disk_s3= */ true, description);
}
std::unique_ptr S3ObjectStorage::readObjects( /// NOLINT
@@ -409,13 +414,10 @@ ObjectMetadata S3ObjectStorage::getObjectMetadata(const std::string & path) cons
{
ObjectMetadata result;
- auto object_head = requestObjectHeadData(bucket, path);
- throwIfError(object_head);
-
- auto & object_head_result = object_head.GetResult();
- result.size_bytes = object_head_result.GetContentLength();
- result.last_modified = object_head_result.GetLastModified().Millis();
- result.attributes = object_head_result.GetMetadata();
+ auto object_info = S3::getObjectInfo(*client.get(), bucket, path, {}, /* for_disk_s3= */ true);
+ result.size_bytes = object_info.size;
+ result.last_modified = object_info.last_modification_time;
+ result.attributes = S3::getObjectMetadata(*client.get(), bucket, path, {}, /* for_disk_s3= */ true);
return result;
}
@@ -442,7 +444,7 @@ void S3ObjectStorage::copyObjectImpl(
const String & src_key,
const String & dst_bucket,
const String & dst_key,
- std::optional head,
+ size_t size,
std::optional metadata) const
{
auto client_ptr = client.get();
@@ -464,7 +466,7 @@ void S3ObjectStorage::copyObjectImpl(
if (!outcome.IsSuccess() && (outcome.GetError().GetExceptionName() == "EntityTooLarge"
|| outcome.GetError().GetExceptionName() == "InvalidRequest"))
{ // Can't come here with MinIO, MinIO allows single part upload for large objects.
- copyObjectMultipartImpl(src_bucket, src_key, dst_bucket, dst_key, head, metadata);
+ copyObjectMultipartImpl(src_bucket, src_key, dst_bucket, dst_key, size, metadata);
return;
}
@@ -472,12 +474,7 @@ void S3ObjectStorage::copyObjectImpl(
auto settings_ptr = s3_settings.get();
if (settings_ptr->request_settings.check_objects_after_upload)
- {
- auto object_head = requestObjectHeadData(dst_bucket, dst_key);
- if (!object_head.IsSuccess())
- throw Exception(ErrorCodes::S3_ERROR, "Object {} from bucket {} disappeared immediately after upload, it's a bug in S3 or S3 API.", dst_key, dst_bucket);
- }
-
+ checkObjectExists(dst_bucket, dst_key, "Immediately after upload");
}
void S3ObjectStorage::copyObjectMultipartImpl(
@@ -485,15 +482,11 @@ void S3ObjectStorage::copyObjectMultipartImpl(
const String & src_key,
const String & dst_bucket,
const String & dst_key,
- std::optional head,
+ size_t size,
std::optional metadata) const
{
- if (!head)
- head = requestObjectHeadData(src_bucket, src_key).GetResult();
-
auto settings_ptr = s3_settings.get();
auto client_ptr = client.get();
- size_t size = head->GetContentLength();
String multipart_upload_id;
@@ -569,29 +562,24 @@ void S3ObjectStorage::copyObjectMultipartImpl(
}
if (settings_ptr->request_settings.check_objects_after_upload)
- {
- auto object_head = requestObjectHeadData(dst_bucket, dst_key);
- if (!object_head.IsSuccess())
- throw Exception(ErrorCodes::S3_ERROR, "Object {} from bucket {} disappeared immediately after upload, it's a bug in S3 or S3 API.", dst_key, dst_bucket);
- }
-
+ checkObjectExists(dst_bucket, dst_key, "Immediately after upload");
}
void S3ObjectStorage::copyObject( // NOLINT
const StoredObject & object_from, const StoredObject & object_to, std::optional object_to_attributes)
{
- auto head = requestObjectHeadData(bucket, object_from.absolute_path).GetResult();
+ auto size = getObjectSize(bucket, object_from.absolute_path);
static constexpr int64_t multipart_upload_threashold = 5UL * 1024 * 1024 * 1024;
- if (head.GetContentLength() >= multipart_upload_threashold)
+ if (size >= multipart_upload_threashold)
{
copyObjectMultipartImpl(
- bucket, object_from.absolute_path, bucket, object_to.absolute_path, head, object_to_attributes);
+ bucket, object_from.absolute_path, bucket, object_to.absolute_path, size, object_to_attributes);
}
else
{
copyObjectImpl(
- bucket, object_from.absolute_path, bucket, object_to.absolute_path, head, object_to_attributes);
+ bucket, object_from.absolute_path, bucket, object_to.absolute_path, size, object_to_attributes);
}
}
diff --git a/src/Disks/ObjectStorages/S3/S3ObjectStorage.h b/src/Disks/ObjectStorages/S3/S3ObjectStorage.h
index 0a07639e253..a6318bf59b8 100644
--- a/src/Disks/ObjectStorages/S3/S3ObjectStorage.h
+++ b/src/Disks/ObjectStorages/S3/S3ObjectStorage.h
@@ -172,7 +172,7 @@ private:
const String & src_key,
const String & dst_bucket,
const String & dst_key,
- std::optional head = std::nullopt,
+ size_t size,
std::optional metadata = std::nullopt) const;
void copyObjectMultipartImpl(
@@ -180,13 +180,14 @@ private:
const String & src_key,
const String & dst_bucket,
const String & dst_key,
- std::optional head = std::nullopt,
+ size_t size,
std::optional metadata = std::nullopt) const;
void removeObjectImpl(const StoredObject & object, bool if_exists);
void removeObjectsImpl(const StoredObjects & objects, bool if_exists);
- Aws::S3::Model::HeadObjectOutcome requestObjectHeadData(const std::string & bucket_from, const std::string & key) const;
+ size_t getObjectSize(const std::string & bucket_from, const std::string & key) const;
+ void checkObjectExists(const std::string & bucket_from, const std::string & key, std::string_view description) const;
std::string bucket;
diff --git a/src/IO/ReadBufferFromS3.cpp b/src/IO/ReadBufferFromS3.cpp
index 2d274435a74..69d2a244097 100644
--- a/src/IO/ReadBufferFromS3.cpp
+++ b/src/IO/ReadBufferFromS3.cpp
@@ -250,7 +250,7 @@ size_t ReadBufferFromS3::getFileSize()
if (file_size)
return *file_size;
- auto object_size = S3::getObjectSize(*client_ptr, bucket, key, version_id, true, read_settings.for_object_storage);
+ auto object_size = S3::getObjectSize(*client_ptr, bucket, key, version_id, /* for_disk_s3= */ read_settings.for_object_storage);
file_size = object_size;
return *file_size;
diff --git a/src/IO/S3Common.cpp b/src/IO/S3Common.cpp
index 0c3a63b46ea..a18fcf70566 100644
--- a/src/IO/S3Common.cpp
+++ b/src/IO/S3Common.cpp
@@ -27,6 +27,8 @@
# include
# include
# include
+# include
+# include
# include
# include
@@ -40,7 +42,11 @@
namespace ProfileEvents
{
+ extern const Event S3GetObjectAttributes;
+ extern const Event S3GetObjectMetadata;
extern const Event S3HeadObject;
+ extern const Event DiskS3GetObjectAttributes;
+ extern const Event DiskS3GetObjectMetadata;
extern const Event DiskS3HeadObject;
}
@@ -699,6 +705,92 @@ public:
}
};
+/// Extracts the endpoint from a constructed S3 client.
+String getEndpoint(const Aws::S3::S3Client & client)
+{
+ const auto * endpoint_provider = dynamic_cast(const_cast(client).accessEndpointProvider().get());
+ if (!endpoint_provider)
+ return {};
+ String endpoint;
+ endpoint_provider->GetBuiltInParameters().GetParameter("Endpoint").GetString(endpoint);
+ return endpoint;
+}
+
+/// Performs a request to get the size and last modification time of an object.
+/// The function performs either HeadObject or GetObjectAttributes request depending on the endpoint.
+std::pair, Aws::S3::S3Error> tryGetObjectInfo(
+ const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3)
+{
+ auto endpoint = getEndpoint(client);
+ bool use_get_object_attributes_request = (endpoint.find(".amazonaws.com") != String::npos);
+
+ if (use_get_object_attributes_request)
+ {
+ /// It's better not to use `HeadObject` requests for AWS S3 because they don't work well with the global region.
+ /// Details: a `HeadObject` request never returns a response body (even if there is an error). However,
+ /// if a request was sent without specifying a region in the endpoint (for example "https://test.s3.amazonaws.com/mydata.csv"
+ /// instead of "https://test.s3-us-west-2.amazonaws.com/mydata.csv"), then that response body is one of the main ways
+ /// to determine the correct region so that the request can be repeated with it.
+ /// The AWS SDK does that for any other request type (`GetObject`, `ListObjects`, etc.) because they have response bodies,
+ /// but `HeadObject` has no response body, so this way doesn't work. That's why we use a `GetObjectAttributes` request instead.
+ /// See https://github.com/aws/aws-sdk-cpp/issues/1558 and also the function S3ErrorMarshaller::ExtractRegion() for more information.
+
+ ProfileEvents::increment(ProfileEvents::S3GetObjectAttributes);
+ if (for_disk_s3)
+ ProfileEvents::increment(ProfileEvents::DiskS3GetObjectAttributes);
+
+ Aws::S3::Model::GetObjectAttributesRequest req;
+ req.SetBucket(bucket);
+ req.SetKey(key);
+
+ if (!version_id.empty())
+ req.SetVersionId(version_id);
+
+ req.SetObjectAttributes({Aws::S3::Model::ObjectAttributes::ObjectSize});
+
+ auto outcome = client.GetObjectAttributes(req);
+ if (outcome.IsSuccess())
+ {
+ const auto & result = outcome.GetResult();
+ DB::S3::ObjectInfo object_info;
+ object_info.size = static_cast(result.GetObjectSize());
+ object_info.last_modification_time = result.GetLastModified().Millis() / 1000;
+ return {object_info, {}};
+ }
+
+ return {std::nullopt, outcome.GetError()};
+ }
+ else
+ {
+ /// By default we use `HeadObject` requests.
+ /// We cannot always use `GetObjectAttributes` requests because some S3 providers (e.g. MinIO)
+ /// don't support them.
+
+ ProfileEvents::increment(ProfileEvents::S3HeadObject);
+ if (for_disk_s3)
+ ProfileEvents::increment(ProfileEvents::DiskS3HeadObject);
+
+ Aws::S3::Model::HeadObjectRequest req;
+ req.SetBucket(bucket);
+ req.SetKey(key);
+
+ if (!version_id.empty())
+ req.SetVersionId(version_id);
+
+ auto outcome = client.HeadObject(req);
+ if (outcome.IsSuccess())
+ {
+ const auto & result = outcome.GetResult();
+ DB::S3::ObjectInfo object_info;
+ object_info.size = static_cast(result.GetContentLength());
+ object_info.last_modification_time = result.GetLastModified().Millis() / 1000;
+ return {object_info, {}};
+ }
+
+ return {std::nullopt, outcome.GetError()};
+ }
+}
+
}
@@ -894,54 +986,33 @@ namespace S3
return error == Aws::S3::S3Errors::RESOURCE_NOT_FOUND || error == Aws::S3::S3Errors::NO_SUCH_KEY;
}
- Aws::S3::Model::HeadObjectOutcome headObject(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3)
+ ObjectInfo getObjectInfo(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3, bool throw_on_error)
{
- ProfileEvents::increment(ProfileEvents::S3HeadObject);
- if (for_disk_s3)
- ProfileEvents::increment(ProfileEvents::DiskS3HeadObject);
-
- Aws::S3::Model::HeadObjectRequest req;
- req.SetBucket(bucket);
- req.SetKey(key);
-
- if (!version_id.empty())
- req.SetVersionId(version_id);
-
- return client.HeadObject(req);
- }
-
- S3::ObjectInfo getObjectInfo(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool throw_on_error, bool for_disk_s3)
- {
- auto outcome = headObject(client, bucket, key, version_id, for_disk_s3);
-
- if (outcome.IsSuccess())
+ auto [object_info, error] = tryGetObjectInfo(client, bucket, key, version_id, for_disk_s3);
+ if (object_info)
{
- auto read_result = outcome.GetResultWithOwnership();
- return {.size = static_cast(read_result.GetContentLength()), .last_modification_time = read_result.GetLastModified().Millis() / 1000};
+ return *object_info;
}
else if (throw_on_error)
{
- const auto & error = outcome.GetError();
throw DB::Exception(ErrorCodes::S3_ERROR,
- "Failed to HEAD object: {}. HTTP response code: {}",
+ "Failed to get object attributes: {}. HTTP response code: {}",
error.GetMessage(), static_cast(error.GetResponseCode()));
}
return {};
}
- size_t getObjectSize(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool throw_on_error, bool for_disk_s3)
+ size_t getObjectSize(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3, bool throw_on_error)
{
- return getObjectInfo(client, bucket, key, version_id, throw_on_error, for_disk_s3).size;
+ return getObjectInfo(client, bucket, key, version_id, for_disk_s3, throw_on_error).size;
}
bool objectExists(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3)
{
- auto outcome = headObject(client, bucket, key, version_id, for_disk_s3);
-
- if (outcome.IsSuccess())
+ auto [object_info, error] = tryGetObjectInfo(client, bucket, key, version_id, for_disk_s3);
+ if (object_info)
return true;
- const auto & error = outcome.GetError();
if (isNotFoundError(error.GetErrorType()))
return false;
@@ -949,6 +1020,48 @@ namespace S3
"Failed to check existence of key {} in bucket {}: {}",
key, bucket, error.GetMessage());
}
+
+ void checkObjectExists(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3, std::string_view description)
+ {
+ auto [object_info, error] = tryGetObjectInfo(client, bucket, key, version_id, for_disk_s3);
+ if (object_info)
+ return;
+ throw S3Exception(error.GetErrorType(), "{}Object {} in bucket {} suddenly disappeared: {}",
+ (description.empty() ? "" : (String(description) + ": ")), key, bucket, error.GetMessage());
+ }
+
+ std::map getObjectMetadata(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3, bool throw_on_error)
+ {
+ ProfileEvents::increment(ProfileEvents::S3GetObjectMetadata);
+ if (for_disk_s3)
+ ProfileEvents::increment(ProfileEvents::DiskS3GetObjectMetadata);
+
+ /// We must not use the `HeadObject` request, see the comment about `HeadObjectRequest` in S3Common.h.
+
+ Aws::S3::Model::GetObjectRequest req;
+ req.SetBucket(bucket);
+ req.SetKey(key);
+
+ /// Only the first byte will be read.
+ /// We don't need that first byte, but the range must be set, otherwise the entire object would be read.
+ req.SetRange("bytes=0-0");
+
+ if (!version_id.empty())
+ req.SetVersionId(version_id);
+
+ auto outcome = client.GetObject(req);
+
+ if (outcome.IsSuccess())
+ return outcome.GetResult().GetMetadata();
+
+ if (!throw_on_error)
+ return {};
+
+ const auto & error = outcome.GetError();
+ throw S3Exception(error.GetErrorType(),
+ "Failed to get metadata of key {} in bucket {}: {}",
+ key, bucket, error.GetMessage());
+ }
}
}
diff --git a/src/IO/S3Common.h b/src/IO/S3Common.h
index f0844c05abc..69ae1cbb4f4 100644
--- a/src/IO/S3Common.h
+++ b/src/IO/S3Common.h
@@ -11,15 +11,15 @@
#if USE_AWS_S3
#include
-#include
-#include
-#include
-#include
-#include
-
#include
#include
+#include
+#include
+#include
+
+
+namespace Aws::S3 { class S3Client; }
namespace DB
{
@@ -121,22 +121,29 @@ struct URI
static void validateBucket(const String & bucket, const Poco::URI & uri);
};
+/// WARNING: Don't use `HeadObjectRequest`! Use the functions below instead.
+/// For explanation see the comment about `HeadObject` request in the function tryGetObjectInfo().
+
struct ObjectInfo
{
size_t size = 0;
time_t last_modification_time = 0;
};
-bool isNotFoundError(Aws::S3::S3Errors error);
+ObjectInfo getObjectInfo(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id = "", bool for_disk_s3 = false, bool throw_on_error = true);
-Aws::S3::Model::HeadObjectOutcome headObject(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id = "", bool for_disk_s3 = false);
-
-S3::ObjectInfo getObjectInfo(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool throw_on_error, bool for_disk_s3);
-
-size_t getObjectSize(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool throw_on_error, bool for_disk_s3);
+size_t getObjectSize(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id = "", bool for_disk_s3 = false, bool throw_on_error = true);
bool objectExists(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id = "", bool for_disk_s3 = false);
+/// Throws an exception if a specified object doesn't exist. `description` is used as a part of the error message.
+void checkObjectExists(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id = "", bool for_disk_s3 = false, std::string_view description = {});
+
+bool isNotFoundError(Aws::S3::S3Errors error);
+
+/// Returns the object's metadata.
+std::map getObjectMetadata(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id = "", bool for_disk_s3 = false, bool throw_on_error = true);
+
}
#endif
diff --git a/src/IO/WriteBufferFromS3.cpp b/src/IO/WriteBufferFromS3.cpp
index ec77fc44de6..7d279c07e03 100644
--- a/src/IO/WriteBufferFromS3.cpp
+++ b/src/IO/WriteBufferFromS3.cpp
@@ -182,12 +182,8 @@ void WriteBufferFromS3::finalizeImpl()
if (check_objects_after_upload)
{
LOG_TRACE(log, "Checking object {} exists after upload", key);
-
- auto response = S3::headObject(*client_ptr, bucket, key, "", write_settings.for_object_storage);
- if (!response.IsSuccess())
- throw S3Exception(fmt::format("Object {} from bucket {} disappeared immediately after upload, it's a bug in S3 or S3 API.", key, bucket), response.GetError().GetErrorType());
- else
- LOG_TRACE(log, "Object {} exists after upload", key);
+ S3::checkObjectExists(*client_ptr, bucket, key, {}, /* for_disk_s3= */ write_settings.for_object_storage, "Immediately after upload");
+ LOG_TRACE(log, "Object {} exists after upload", key);
}
}
diff --git a/src/Interpreters/TextLog.cpp b/src/Interpreters/TextLog.cpp
index 6d490e3e95f..45d5a7b2344 100644
--- a/src/Interpreters/TextLog.cpp
+++ b/src/Interpreters/TextLog.cpp
@@ -49,7 +49,9 @@ NamesAndTypesList TextLogElement::getNamesAndTypes()
{"revision", std::make_shared()},
{"source_file", std::make_shared(std::make_shared())},
- {"source_line", std::make_shared()}
+ {"source_line", std::make_shared()},
+
+ {"message_format_string", std::make_shared(std::make_shared())},
};
}
@@ -74,6 +76,8 @@ void TextLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insert(source_file);
columns[i++]->insert(source_line);
+
+ columns[i++]->insert(message_format_string);
}
TextLog::TextLog(ContextPtr context_, const String & database_name_,
diff --git a/src/Interpreters/TextLog.h b/src/Interpreters/TextLog.h
index 243e001fc52..6efc1c906d4 100644
--- a/src/Interpreters/TextLog.h
+++ b/src/Interpreters/TextLog.h
@@ -28,6 +28,8 @@ struct TextLogElement
String source_file;
UInt64 source_line{};
+ std::string_view message_format_string;
+
static std::string name() { return "TextLog"; }
static NamesAndTypesList getNamesAndTypes();
static NamesAndAliases getNamesAndAliases() { return {}; }
diff --git a/src/Loggers/OwnSplitChannel.cpp b/src/Loggers/OwnSplitChannel.cpp
index 35a6d4ad86a..7974d5212e1 100644
--- a/src/Loggers/OwnSplitChannel.cpp
+++ b/src/Loggers/OwnSplitChannel.cpp
@@ -133,6 +133,8 @@ void OwnSplitChannel::logSplit(const Poco::Message & msg)
elem.source_file = msg.getSourceFile();
elem.source_line = msg.getSourceLine();
+ elem.message_format_string = msg.getFormatString();
+
std::shared_ptr text_log_locked{};
{
std::lock_guard lock(text_log_mutex);
diff --git a/src/Processors/Executors/PipelineExecutor.cpp b/src/Processors/Executors/PipelineExecutor.cpp
index a9083d8c4a8..ad504e65f94 100644
--- a/src/Processors/Executors/PipelineExecutor.cpp
+++ b/src/Processors/Executors/PipelineExecutor.cpp
@@ -196,7 +196,7 @@ void PipelineExecutor::executeSingleThread(size_t thread_num)
#ifndef NDEBUG
auto & context = tasks.getThreadContext(thread_num);
- LOG_TRACE(log,
+ LOG_TEST(log,
"Thread finished. Total time: {} sec. Execution time: {} sec. Processing time: {} sec. Wait time: {} sec.",
context.total_time_ns / 1e9,
context.execution_time_ns / 1e9,
diff --git a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp
index da7f18260a9..8c6cd8bd91b 100644
--- a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp
+++ b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp
@@ -304,7 +304,9 @@ AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(avro::Node
};
}
- if (null_as_default)
+ /// If the Union is ['Null', Nested-Type] and the Nested-Type cannot be put inside
+ /// Nullable, the target type will be the Nested-Type itself instead of a Nullable type.
+ if (null_as_default || !target.isNullable())
{
auto nested_deserialize = this->createDeserializeFn(root_node->leafAt(non_null_union_index), target_type);
return [non_null_union_index, nested_deserialize](IColumn & column, avro::Decoder & decoder)
@@ -1001,7 +1003,7 @@ DataTypePtr AvroSchemaReader::avroNodeToDataType(avro::NodePtr node)
case avro::Type::AVRO_STRING:
return std::make_shared();
case avro::Type::AVRO_BYTES:
- return std::make_shared();
+ return std::make_shared();
case avro::Type::AVRO_ENUM:
{
if (node->names() < 128)
diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp
index d8ca1c124ae..5b3d18c66f6 100644
--- a/src/Server/TCPHandler.cpp
+++ b/src/Server/TCPHandler.cpp
@@ -402,14 +402,10 @@ void TCPHandler::runImpl()
{
auto callback = [this]()
{
- {
- std::lock_guard task_callback_lock(task_callback_mutex);
+ std::scoped_lock lock(task_callback_mutex, fatal_error_mutex);
- if (isQueryCancelled())
- return true;
- }
-
- std::lock_guard lock(fatal_error_mutex);
+ if (isQueryCancelled())
+ return true;
sendProgress();
sendSelectProfileEvents();
@@ -424,6 +420,9 @@ void TCPHandler::runImpl()
}
state.io.onFinish();
+
+ std::lock_guard lock(task_callback_mutex);
+
/// Send final progress after calling onFinish(), since it will update the progress.
///
/// NOTE: we cannot send Progress for regular INSERT (with VALUES)
@@ -446,8 +445,11 @@ void TCPHandler::runImpl()
if (state.is_connection_closed)
break;
- sendLogs();
- sendEndOfStream();
+ {
+ std::lock_guard lock(task_callback_mutex);
+ sendLogs();
+ sendEndOfStream();
+ }
/// QueryState should be cleared before QueryScope, since otherwise
/// the MemoryTracker will be wrong for possible deallocations.
@@ -760,6 +762,9 @@ void TCPHandler::processOrdinaryQueryWithProcessors()
}
}
+ /// Defer locking so that the lock covers part of the scope below and everything after it
+ std::unique_lock progress_lock(task_callback_mutex, std::defer_lock);
+
{
PullingAsyncPipelineExecutor executor(pipeline);
CurrentMetrics::Increment query_thread_metric_increment{CurrentMetrics::QueryThread};
@@ -796,6 +801,11 @@ void TCPHandler::processOrdinaryQueryWithProcessors()
}
}
+ /// The lock was not acquired earlier; calling .lock() here ensures that everything
+ /// below this line is protected, together with the sendProgress() call outside
+ /// of this scope
+ progress_lock.lock();
+
/** If data has run out, we will send the profiling data and total values to
* the last zero block to be able to use
* this information in the suffix output of stream.
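For context, the locking pattern used in this TCPHandler change (a combined scoped_lock plus a deferred unique_lock that is acquired later) can be sketched roughly as follows; the mutex and function names here are hypothetical stand-ins, not the actual TCPHandler members:

#include <mutex>

std::mutex task_callback_mutex;
std::mutex fatal_error_mutex;

bool onTaskCallback(bool query_cancelled)
{
    // Lock both mutexes at once instead of nesting two lock_guard blocks;
    // std::scoped_lock acquires them together without risking a lock-order inversion.
    std::scoped_lock lock(task_callback_mutex, fatal_error_mutex);
    if (query_cancelled)
        return true;
    // ... send progress and profile events while both locks are held ...
    return false;
}

void processQuery()
{
    // Construct the lock deferred: nothing is locked yet.
    std::unique_lock<std::mutex> progress_lock(task_callback_mutex, std::defer_lock);
    {
        // ... run the pipeline without holding the lock ...
    }
    // Acquire it here so the tail of the function (final progress, totals,
    // end-of-stream) runs under the same mutex as the task callback.
    progress_lock.lock();
    // ... send final progress, totals, end of stream ...
}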
diff --git a/src/Storages/HDFS/StorageHDFS.cpp b/src/Storages/HDFS/StorageHDFS.cpp
index c7008a317c3..b0bad44092a 100644
--- a/src/Storages/HDFS/StorageHDFS.cpp
+++ b/src/Storages/HDFS/StorageHDFS.cpp
@@ -59,6 +59,7 @@ namespace ErrorCodes
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
+ extern const int CANNOT_COMPILE_REGEXP;
}
namespace
{
@@ -75,6 +76,9 @@ namespace
const size_t next_slash = suffix_with_globs.find('/', 1);
re2::RE2 matcher(makeRegexpPatternFromGlobs(suffix_with_globs.substr(0, next_slash)));
+ if (!matcher.ok())
+ throw Exception(ErrorCodes::CANNOT_COMPILE_REGEXP,
+ "Cannot compile regex from glob ({}): {}", for_match, matcher.error());
HDFSFileInfo ls;
ls.file_info = hdfsListDirectory(fs.get(), prefix_without_globs.data(), &ls.length);
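The same "compile the glob-derived pattern and fail loudly" check is added in several storages below; a minimal standalone sketch of the idea, assuming only that re2::RE2 exposes ok() and error() as used in the patch (the helper name is hypothetical):

#include <re2/re2.h>
#include <stdexcept>
#include <string>

// Validate that a regexp produced from a glob actually compiles before using it.
void validateGlobRegexp(const std::string & pattern, const std::string & original_glob)
{
    re2::RE2 matcher(pattern);
    if (!matcher.ok())
        throw std::runtime_error(
            "Cannot compile regex from glob (" + original_glob + "): " + std::string(matcher.error()));
}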
diff --git a/src/Storages/MergeTree/DataPartsExchange.cpp b/src/Storages/MergeTree/DataPartsExchange.cpp
index 66f91aa6cd2..1f5c688cf53 100644
--- a/src/Storages/MergeTree/DataPartsExchange.cpp
+++ b/src/Storages/MergeTree/DataPartsExchange.cpp
@@ -651,7 +651,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchSelectedPart(
}
#endif
- LOG_WARNING(log, fmt::runtime(e.message() + " Will retry fetching part without zero-copy."));
+ LOG_WARNING(log, "Will retry fetching part without zero-copy: {}", e.message());
/// It's important to release session from HTTP pool. Otherwise it's possible to get deadlock
/// on http pool.
diff --git a/src/Storages/MergeTree/MergeFromLogEntryTask.cpp b/src/Storages/MergeTree/MergeFromLogEntryTask.cpp
index d5627774052..97226825629 100644
--- a/src/Storages/MergeTree/MergeFromLogEntryTask.cpp
+++ b/src/Storages/MergeTree/MergeFromLogEntryTask.cpp
@@ -109,10 +109,11 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
/// 2. We have some larger merged part which covers new_part_name (and therefore it covers source_part_name too)
/// 3. We have two intersecting parts, both cover source_part_name. It's logical error.
/// TODO Why 1 and 2 can happen? Do we need more assertions here or somewhere else?
- constexpr const char * message = "Part {} is covered by {} but should be merged into {}. This shouldn't happen often.";
- LOG_WARNING(log, fmt::runtime(message), source_part_name, source_part_or_covering->name, entry.new_part_name);
+ constexpr auto fmt_string = "Part {} is covered by {} but should be merged into {}. This shouldn't happen often.";
+ String message;
+ LOG_WARNING(LogToStr(message, log), fmt_string, source_part_name, source_part_or_covering->name, entry.new_part_name);
if (!source_part_or_covering->info.contains(MergeTreePartInfo::fromPartName(entry.new_part_name, storage.format_version)))
- throw Exception(ErrorCodes::LOGICAL_ERROR, message, source_part_name, source_part_or_covering->name, entry.new_part_name);
+ throw Exception(ErrorCodes::LOGICAL_ERROR, message);
return PrepareResult{
.prepared_successfully = false,
diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp
index 89424e6bfd0..b1384dc799f 100644
--- a/src/Storages/MergeTree/MergeTreeData.cpp
+++ b/src/Storages/MergeTree/MergeTreeData.cpp
@@ -3899,7 +3899,9 @@ void MergeTreeData::forcefullyMovePartToDetachedAndRemoveFromMemory(const MergeT
if (error)
{
- LOG_ERROR(log, "The set of parts restored in place of {} looks incomplete. There might or might not be a data loss.{}", part->name, (error_parts.empty() ? "" : " Suspicious parts: " + error_parts));
+ LOG_WARNING(log, "The set of parts restored in place of {} looks incomplete. "
+ "SELECT queries may observe gaps in data until this replica is synchronized with other replicas.{}",
+ part->name, (error_parts.empty() ? "" : " Suspicious parts: " + error_parts));
}
}
diff --git a/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp
index 71f69cb25cd..6f1d8dd93e7 100644
--- a/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp
+++ b/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp
@@ -373,9 +373,9 @@ CheckResult ReplicatedMergeTreePartCheckThread::checkPart(const String & part_na
throw;
tryLogCurrentException(log, __PRETTY_FUNCTION__);
-
- String message = "Part " + part_name + " looks broken. Removing it and will try to fetch.";
- LOG_ERROR(log, fmt::runtime(message));
+ constexpr auto fmt_string = "Part {} looks broken. Removing it and will try to fetch.";
+ String message = fmt::format(fmt_string, part_name);
+ LOG_ERROR(log, fmt_string, part_name);
/// Delete part locally.
storage.outdateBrokenPartAndCloneToDetached(part, "broken");
@@ -392,9 +392,9 @@ CheckResult ReplicatedMergeTreePartCheckThread::checkPart(const String & part_na
/// Probably, someone just wrote down the part, and has not yet added to ZK.
/// Therefore, delete only if the part is old (not very reliable).
ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);
-
- String message = "Unexpected part " + part_name + " in filesystem. Removing.";
- LOG_ERROR(log, fmt::runtime(message));
+ constexpr auto fmt_string = "Unexpected part {} in filesystem. Removing.";
+ String message = fmt::format(fmt_string, part_name);
+ LOG_ERROR(log, fmt_string, part_name);
storage.outdateBrokenPartAndCloneToDetached(part, "unexpected");
return {part_name, false, message};
}
diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
index bde9ce33224..bd75d76109a 100644
--- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
+++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
@@ -1191,12 +1191,10 @@ bool ReplicatedMergeTreeQueue::isCoveredByFuturePartsImpl(const LogEntry & entry
if (entry_for_same_part_it != future_parts.end())
{
const LogEntry & another_entry = *entry_for_same_part_it->second;
- out_reason = fmt::format(
- "Not executing log entry {} of type {} for part {} "
- "because another log entry {} of type {} for the same part ({}) is being processed.",
- entry.znode_name, entry.type, entry.new_part_name,
- another_entry.znode_name, another_entry.type, another_entry.new_part_name);
- LOG_INFO(log, fmt::runtime(out_reason));
+ constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} "
+ "because another log entry {} of type {} for the same part ({}) is being processed.";
+ LOG_INFO(LogToStr(out_reason, log), fmt_string, entry.znode_name, entry.type, entry.new_part_name,
+ another_entry.znode_name, another_entry.type, another_entry.new_part_name);
return true;
/** When the corresponding action is completed, then `isNotCoveredByFuturePart` next time, will succeed,
@@ -1238,11 +1236,9 @@ bool ReplicatedMergeTreeQueue::isCoveredByFuturePartsImpl(const LogEntry & entry
{
if (entry.znode_name < future_part_elem.second->znode_name)
{
- out_reason = fmt::format(
- "Not executing log entry {} for part {} "
- "because it is not disjoint with part {} that is currently executing and another entry {} is newer.",
- entry.znode_name, new_part_name, future_part_elem.first, future_part_elem.second->znode_name);
- LOG_TRACE(log, fmt::runtime(out_reason));
+ constexpr auto fmt_string = "Not executing log entry {} for part {} "
+ "because it is not disjoint with part {} that is currently executing and another entry {} is newer.";
+ LOG_TRACE(LogToStr(out_reason, log), fmt_string, entry.znode_name, new_part_name, future_part_elem.first, future_part_elem.second->znode_name);
return true;
}
@@ -1250,11 +1246,9 @@ bool ReplicatedMergeTreeQueue::isCoveredByFuturePartsImpl(const LogEntry & entry
continue;
}
- out_reason = fmt::format(
- "Not executing log entry {} for part {} "
- "because it is not disjoint with part {} that is currently executing.",
- entry.znode_name, new_part_name, future_part_elem.first);
- LOG_TRACE(log, fmt::runtime(out_reason));
+ constexpr auto fmt_string = "Not executing log entry {} for part {} "
+ "because it is not disjoint with part {} that is currently executing.";
+ LOG_TEST(LogToStr(out_reason, log), fmt_string, entry.znode_name, new_part_name, future_part_elem.first);
return true;
}
@@ -1337,11 +1331,9 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
{
if (future_parts.contains(name))
{
- out_postpone_reason = fmt::format(
- "Not executing log entry {} of type {} for part {} "
- "because part {} is not ready yet (log entry for that part is being processed).",
- entry.znode_name, entry.typeToString(), entry.new_part_name, name);
- LOG_TRACE(log, fmt::runtime(out_postpone_reason));
+ constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} "
+ "because part {} is not ready yet (log entry for that part is being processed).";
+ LOG_TRACE(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name, entry.typeToString(), entry.new_part_name, name);
return false;
}
@@ -1357,10 +1349,8 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
if (merger_mutator.merges_blocker.isCancelled())
{
- out_postpone_reason = fmt::format(
- "Not executing log entry {} of type {} for part {} because merges and mutations are cancelled now.",
- entry.znode_name, entry.typeToString(), entry.new_part_name);
- LOG_DEBUG(log, fmt::runtime(out_postpone_reason));
+ constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} because merges and mutations are cancelled now.";
+ LOG_DEBUG(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name, entry.typeToString(), entry.new_part_name);
return false;
}
@@ -1375,8 +1365,8 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
if (!disks.empty() && only_s3_storage && storage.checkZeroCopyLockExists(entry.new_part_name, disks[0]))
{
- out_postpone_reason = "Not executing merge/mutation for the part " + entry.new_part_name
- + ", waiting other replica to execute it and will fetch after.";
+ constexpr auto fmt_string = "Not executing merge/mutation for the part {}, waiting other replica to execute it and will fetch after.";
+ out_postpone_reason = fmt::format(fmt_string, entry.new_part_name);
return false;
}
}
@@ -1387,9 +1377,8 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
if (replica_to_execute_merge && !merge_strategy_picker.isMergeFinishedByReplica(replica_to_execute_merge.value(), entry))
{
- String reason = "Not executing merge for the part " + entry.new_part_name
- + ", waiting for " + replica_to_execute_merge.value() + " to execute merge.";
- out_postpone_reason = reason;
+ constexpr auto fmt_string = "Not executing merge for the part {}, waiting for {} to execute merge.";
+ out_postpone_reason = fmt::format(fmt_string, entry.new_part_name, replica_to_execute_merge.value());
return false;
}
}
@@ -1411,20 +1400,16 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
{
if (merger_mutator.ttl_merges_blocker.isCancelled())
{
- out_postpone_reason = fmt::format(
- "Not executing log entry {} for part {} because merges with TTL are cancelled now.",
- entry.znode_name, entry.new_part_name);
- LOG_DEBUG(log, fmt::runtime(out_postpone_reason));
+ constexpr auto fmt_string = "Not executing log entry {} for part {} because merges with TTL are cancelled now.";
+ LOG_DEBUG(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name, entry.new_part_name);
return false;
}
size_t total_merges_with_ttl = data.getTotalMergesWithTTLInMergeList();
if (total_merges_with_ttl >= data_settings->max_number_of_merges_with_ttl_in_pool)
{
- out_postpone_reason = fmt::format(
- "Not executing log entry {} for part {} because {} merges with TTL already executing, maximum {}.",
- entry.znode_name, entry.new_part_name, total_merges_with_ttl,
- data_settings->max_number_of_merges_with_ttl_in_pool);
- LOG_DEBUG(log, fmt::runtime(out_postpone_reason));
+ constexpr auto fmt_string = "Not executing log entry {} for part {} because {} merges with TTL already executing, maximum {}.";
+ LOG_DEBUG(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name, entry.new_part_name, total_merges_with_ttl,
+ data_settings->max_number_of_merges_with_ttl_in_pool);
return false;
}
}
@@ -1432,12 +1417,10 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
if (!ignore_max_size && sum_parts_size_in_bytes > max_source_parts_size)
{
- out_postpone_reason = fmt::format("Not executing log entry {} of type {} for part {}"
- " because source parts size ({}) is greater than the current maximum ({}).",
- entry.znode_name, entry.typeToString(), entry.new_part_name,
- ReadableSize(sum_parts_size_in_bytes), ReadableSize(max_source_parts_size));
-
- LOG_DEBUG(log, fmt::runtime(out_postpone_reason));
+ constexpr auto fmt_string = "Not executing log entry {} of type {} for part {}"
+ " because source parts size ({}) is greater than the current maximum ({}).";
+ LOG_DEBUG(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name, entry.typeToString(), entry.new_part_name,
+ ReadableSize(sum_parts_size_in_bytes), ReadableSize(max_source_parts_size));
return false;
}
@@ -1450,10 +1433,8 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
if (!alter_sequence.canExecuteMetaAlter(entry.alter_version, state_lock))
{
int head_alter = alter_sequence.getHeadAlterVersion(state_lock);
- out_postpone_reason = fmt::format(
- "Cannot execute alter metadata {} with version {} because another alter {} must be executed before",
- entry.znode_name, entry.alter_version, head_alter);
- LOG_TRACE(log, fmt::runtime(out_postpone_reason));
+ constexpr auto fmt_string = "Cannot execute alter metadata {} with version {} because another alter {} must be executed before";
+ LOG_TRACE(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name, entry.alter_version, head_alter);
return false;
}
}
@@ -1466,17 +1447,13 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
int head_alter = alter_sequence.getHeadAlterVersion(state_lock);
if (head_alter == entry.alter_version)
{
- out_postpone_reason = fmt::format(
- "Cannot execute alter data {} with version {} because metadata still not altered",
- entry.znode_name, entry.alter_version);
- LOG_TRACE(log, fmt::runtime(out_postpone_reason));
+ constexpr auto fmt_string = "Cannot execute alter data {} with version {} because metadata still not altered";
+ LOG_TRACE(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name, entry.alter_version);
}
else
{
- out_postpone_reason = fmt::format(
- "Cannot execute alter data {} with version {} because another alter {} must be executed before",
- entry.znode_name, entry.alter_version, head_alter);
- LOG_TRACE(log, fmt::runtime(out_postpone_reason));
+ constexpr auto fmt_string = "Cannot execute alter data {} with version {} because another alter {} must be executed before";
+ LOG_TRACE(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name, entry.alter_version, head_alter);
}
return false;
@@ -1498,14 +1475,12 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
{
if (drop_range_info.isDisjoint(info))
continue;
- out_postpone_reason = fmt::format(
- "Not executing log entry {} of type {} for part {} "
- "because another DROP_RANGE or REPLACE_RANGE entry with not disjoint range {} is currently executing.",
- entry.znode_name,
- entry.typeToString(),
- entry.new_part_name,
- info.getPartNameForLogs());
- LOG_TRACE(log, fmt::runtime(out_postpone_reason));
+ constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} "
+ "because another DROP_RANGE or REPLACE_RANGE entry with not disjoint range {} is currently executing.";
+ LOG_TRACE(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name,
+ entry.typeToString(),
+ entry.new_part_name,
+ info.getPartNameForLogs());
return false;
}
}
@@ -1531,11 +1506,10 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
auto new_part_info = MergeTreePartInfo::fromPartName(new_part_name, format_version);
if (!new_part_info.isDisjoint(drop_part_info))
{
- out_postpone_reason = fmt::format(
- "Not executing log entry {} of type {} for part {} "
- "because it probably depends on {} (REPLACE_RANGE).",
- entry.znode_name, entry.typeToString(), entry.new_part_name, replace_entry->znode_name);
- LOG_TRACE(log, fmt::runtime(out_postpone_reason));
+ constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} "
+ "because it probably depends on {} (REPLACE_RANGE).";
+ LOG_TRACE(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name, entry.typeToString(),
+ entry.new_part_name, replace_entry->znode_name);
return false;
}
}
diff --git a/src/Storages/StorageFile.cpp b/src/Storages/StorageFile.cpp
index c293530db46..419929dbef3 100644
--- a/src/Storages/StorageFile.cpp
+++ b/src/Storages/StorageFile.cpp
@@ -80,6 +80,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
extern const int CANNOT_APPEND_TO_FILE;
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
+ extern const int CANNOT_COMPILE_REGEXP;
}
namespace
@@ -105,6 +106,9 @@ void listFilesWithRegexpMatchingImpl(
auto regexp = makeRegexpPatternFromGlobs(current_glob);
re2::RE2 matcher(regexp);
+ if (!matcher.ok())
+ throw Exception(ErrorCodes::CANNOT_COMPILE_REGEXP,
+ "Cannot compile regex from glob ({}): {}", for_match, matcher.error());
bool skip_regex = current_glob == "/*" ? true : false;
if (!recursive)
diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp
index c1e5a3d4f8e..c8dc53f5036 100644
--- a/src/Storages/StorageMergeTree.cpp
+++ b/src/Storages/StorageMergeTree.cpp
@@ -1303,10 +1303,10 @@ bool StorageMergeTree::optimize(
&disable_reason,
local_context->getSettingsRef().optimize_skip_merged_partitions))
{
- constexpr const char * message = "Cannot OPTIMIZE table: {}";
+ constexpr auto message = "Cannot OPTIMIZE table: {}";
if (disable_reason.empty())
disable_reason = "unknown reason";
- LOG_INFO(log, fmt::runtime(message), disable_reason);
+ LOG_INFO(log, message, disable_reason);
if (local_context->getSettingsRef().optimize_throw_if_noop)
throw Exception(ErrorCodes::CANNOT_ASSIGN_OPTIMIZE, message, disable_reason);
@@ -1330,10 +1330,10 @@ bool StorageMergeTree::optimize(
&disable_reason,
local_context->getSettingsRef().optimize_skip_merged_partitions))
{
- constexpr const char * message = "Cannot OPTIMIZE table: {}";
+ constexpr auto message = "Cannot OPTIMIZE table: {}";
if (disable_reason.empty())
disable_reason = "unknown reason";
- LOG_INFO(log, fmt::runtime(message), disable_reason);
+ LOG_INFO(log, message, disable_reason);
if (local_context->getSettingsRef().optimize_throw_if_noop)
throw Exception(ErrorCodes::CANNOT_ASSIGN_OPTIMIZE, message, disable_reason);
diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp
index 7c1894b3652..036b46c5e28 100644
--- a/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/src/Storages/StorageReplicatedMergeTree.cpp
@@ -1272,12 +1272,12 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
const auto storage_settings_ptr = getSettings();
bool insane = uncovered_unexpected_parts_rows > total_rows_on_filesystem * storage_settings_ptr->replicated_max_ratio_of_wrong_parts;
- constexpr const char * sanity_report_fmt = "The local set of parts of table {} doesn't look like the set of parts in ZooKeeper: "
+ constexpr auto sanity_report_fmt = "The local set of parts of table {} doesn't look like the set of parts in ZooKeeper: "
"{} rows of {} total rows in filesystem are suspicious. "
"There are {} uncovered unexpected parts with {} rows ({} of them is not just-written with {} rows), "
"{} missing parts (with {} blocks), {} covered unexpected parts (with {} rows).";
- constexpr const char * sanity_report_debug_fmt = "Uncovered unexpected parts: {}. Missing parts: {}. Covered unexpected parts: {}. Expected parts: {}.";
+ constexpr auto sanity_report_debug_fmt = "Uncovered unexpected parts: {}. Missing parts: {}. Covered unexpected parts: {}. Expected parts: {}.";
if (insane && !skip_sanity_checks)
{
@@ -1293,7 +1293,7 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
{
LOG_DEBUG(log, sanity_report_debug_fmt, fmt::join(uncovered_unexpected_parts, ", "), fmt::join(parts_to_fetch, ", "),
fmt::join(covered_unexpected_parts, ", "), fmt::join(expected_parts, ", "));
- LOG_WARNING(log, fmt::runtime(sanity_report_fmt), getStorageID().getNameForLogs(),
+ LOG_WARNING(log, sanity_report_fmt, getStorageID().getNameForLogs(),
formatReadableQuantity(uncovered_unexpected_parts_rows), formatReadableQuantity(total_rows_on_filesystem),
uncovered_unexpected_parts.size(), uncovered_unexpected_parts_rows, unexpected_parts_nonnew, unexpected_parts_nonnew_rows,
parts_to_fetch.size(), parts_to_fetch_blocks, covered_unexpected_parts.size(), unexpected_parts_rows - uncovered_unexpected_parts_rows);
diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp
index 9cb992bd24f..5c393f3864b 100644
--- a/src/Storages/StorageS3.cpp
+++ b/src/Storages/StorageS3.cpp
@@ -109,6 +109,7 @@ namespace ErrorCodes
extern const int DATABASE_ACCESS_DENIED;
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
extern const int NOT_IMPLEMENTED;
+ extern const int CANNOT_COMPILE_REGEXP;
}
class IOutputFormat;
@@ -174,6 +175,10 @@ public:
outcome_future = listObjectsAsync();
matcher = std::make_unique(makeRegexpPatternFromGlobs(globbed_uri.key));
+ if (!matcher->ok())
+ throw Exception(ErrorCodes::CANNOT_COMPILE_REGEXP,
+ "Cannot compile regex from glob ({}): {}", globbed_uri.key, matcher->error());
+
recursive = globbed_uri.key == "/**" ? true : false;
fillInternalBufferAssumeLocked();
}
@@ -444,7 +449,7 @@ public:
/// (which means we eventually need this info anyway, so it should be ok to do it now)
if (object_infos_)
{
- info = S3::getObjectInfo(client_, bucket, key, version_id_, true, false);
+ info = S3::getObjectInfo(client_, bucket, key, version_id_);
total_size += info->size;
String path = fs::path(bucket) / key;
@@ -569,9 +574,7 @@ StorageS3Source::ReaderHolder StorageS3Source::createReader()
if (current_key.empty())
return {};
- size_t object_size = info
- ? info->size
- : S3::getObjectSize(*client, bucket, current_key, version_id, true, false);
+ size_t object_size = info ? info->size : S3::getObjectSize(*client, bucket, current_key, version_id);
int zstd_window_log_max = static_cast(getContext()->getSettingsRef().zstd_window_log_max);
auto read_buf = wrapReadBufferWithCompressionMethod(
@@ -1523,7 +1526,7 @@ std::optional StorageS3::tryGetColumnsFromCache(
/// Note that in case of exception in getObjectInfo returned info will be empty,
/// but schema cache will handle this case and won't return columns from cache
/// because we can't say that it's valid without last modification time.
- info = S3::getObjectInfo(*s3_configuration.client, s3_configuration.uri.bucket, *it, s3_configuration.uri.version_id, false, false);
+ info = S3::getObjectInfo(*s3_configuration.client, s3_configuration.uri.bucket, *it, s3_configuration.uri.version_id, {}, /* throw_on_error= */ false);
if (object_infos)
(*object_infos)[path] = info;
}
diff --git a/src/Storages/StorageS3.h b/src/Storages/StorageS3.h
index 0f6fc013d51..02fcb7d624c 100644
--- a/src/Storages/StorageS3.h
+++ b/src/Storages/StorageS3.h
@@ -176,6 +176,24 @@ private:
}
ReaderHolder() = default;
+ ReaderHolder(const ReaderHolder & other) = delete;
+ ReaderHolder & operator=(const ReaderHolder & other) = delete;
+
+ ReaderHolder(ReaderHolder && other) noexcept
+ {
+ *this = std::move(other);
+ }
+
+ ReaderHolder & operator=(ReaderHolder && other) noexcept
+ {
+ /// The order of destruction is important.
+ /// reader uses pipeline, pipeline uses read_buf.
+ reader = std::move(other.reader);
+ pipeline = std::move(other.pipeline);
+ read_buf = std::move(other.read_buf);
+ path = std::move(other.path);
+ return *this;
+ }
explicit operator bool() const { return reader != nullptr; }
PullingPipelineExecutor * operator->() { return reader.get(); }
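The move-only holder added above relies on member-wise moves happening in a specific order; a stripped-down sketch of the same idea, with empty placeholder types instead of the real ReadBuffer / QueryPipeline / PullingPipelineExecutor:

#include <memory>
#include <utility>

struct ReadBuffer {};
struct Pipeline {};
struct Executor {};

class Holder
{
public:
    Holder() = default;
    Holder(const Holder &) = delete;
    Holder & operator=(const Holder &) = delete;

    Holder(Holder && other) noexcept { *this = std::move(other); }

    Holder & operator=(Holder && other) noexcept
    {
        // Overwrite (and therefore destroy the old) members in dependency order:
        // the old reader goes away before the old pipeline, which goes away before
        // the old read buffer, because the reader uses the pipeline and the pipeline
        // uses the read buffer.
        reader = std::move(other.reader);
        pipeline = std::move(other.pipeline);
        read_buf = std::move(other.read_buf);
        return *this;
    }

private:
    // Declaration order also matters: members are destroyed in reverse order,
    // so the reader is destroyed first and the read buffer last.
    std::unique_ptr<ReadBuffer> read_buf;
    std::unique_ptr<Pipeline> pipeline;
    std::unique_ptr<Executor> reader;
};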
diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp
index dbe99d0ce9a..7559d9c720e 100644
--- a/src/Storages/StorageURL.cpp
+++ b/src/Storages/StorageURL.cpp
@@ -345,9 +345,11 @@ namespace
// to check if Range header is supported, we need to send a request with it set
const bool supports_ranges = (res.has("Accept-Ranges") && res.get("Accept-Ranges") == "bytes")
|| (res.has("Content-Range") && res.get("Content-Range").starts_with("bytes"));
- LOG_TRACE(
- &Poco::Logger::get("StorageURLSource"),
- fmt::runtime(supports_ranges ? "HTTP Range is supported" : "HTTP Range is not supported"));
+
+ if (supports_ranges)
+ LOG_TRACE(&Poco::Logger::get("StorageURLSource"), "HTTP Range is supported");
+ else
+ LOG_TRACE(&Poco::Logger::get("StorageURLSource"), "HTTP Range is not supported");
if (supports_ranges && res.getStatus() == Poco::Net::HTTPResponse::HTTP_PARTIAL_CONTENT
diff --git a/tests/clickhouse-test b/tests/clickhouse-test
index 2c9b807574f..a01df7c0ded 100755
--- a/tests/clickhouse-test
+++ b/tests/clickhouse-test
@@ -1817,6 +1817,56 @@ def reportCoverage(args):
"""
)
+def reportLogStats(args):
+ query = """
+ WITH
+ 120 AS mins,
+ (
+ SELECT (count(), sum(length(message)))
+ FROM system.text_log
+ WHERE (now() - toIntervalMinute(mins)) < event_time
+ ) AS total
+ SELECT
+ count() AS count,
+ round(count / (total.1), 3) AS `count_%`,
+ formatReadableSize(sum(length(message))) AS size,
+ round(sum(length(message)) / (total.2), 3) AS `size_%`,
+ countDistinct(logger_name) AS uniq_loggers,
+ countDistinct(thread_id) AS uniq_threads,
+ groupArrayDistinct(toString(level)) AS levels,
+ round(sum(query_id = '') / count, 3) AS `background_%`,
+ message_format_string
+ FROM system.text_log
+ WHERE (now() - toIntervalMinute(mins)) < event_time
+ GROUP BY message_format_string
+ ORDER BY count DESC
+ LIMIT 100
+ FORMAT TSVWithNamesAndTypes
+ """
+ value = clickhouse_execute(args, query).decode()
+ print("\nTop patterns of log messages:\n")
+ print(value)
+ print("\n")
+
+ query = """
+ WITH
+ 120 AS mins
+ SELECT
+ count() AS count,
+ substr(replaceRegexpAll(message, '[^A-Za-z]+', ''), 1, 32) AS pattern,
+ substr(any(message), 1, 256) as runtime_message
+ FROM system.text_log
+ WHERE (now() - toIntervalMinute(mins)) < event_time AND message_format_string = ''
+ GROUP BY pattern
+ ORDER BY count DESC
+ LIMIT 50
+ FORMAT TSVWithNamesAndTypes
+ """
+ value = clickhouse_execute(args, query).decode()
+ print("\nTop messages without format string (fmt::runtime):\n")
+ print(value)
+ print("\n")
+
def main(args):
global server_died
@@ -1951,6 +2001,9 @@ def main(args):
else:
print("All tests have finished.")
+ if args.report_logs_stats:
+ reportLogStats(args)
+
if args.report_coverage and not reportCoverage(args):
exit_code.value = 1
@@ -2242,6 +2295,12 @@ if __name__ == "__main__":
default=False,
help="Check what high-level server components were covered by tests",
)
+ parser.add_argument(
+ "--report-logs-stats",
+ action="store_true",
+ default=False,
+ help="Report statistics about log messages",
+ )
args = parser.parse_args()
if args.queries and not os.path.isdir(args.queries):
diff --git a/tests/queries/0_stateless/02174_cte_scalar_cache.reference b/tests/queries/0_stateless/02174_cte_scalar_cache.reference
index 88456b1e7ea..817116eda88 100644
--- a/tests/queries/0_stateless/02174_cte_scalar_cache.reference
+++ b/tests/queries/0_stateless/02174_cte_scalar_cache.reference
@@ -1,2 +1,3 @@
02177_CTE_GLOBAL_ON 5 500 11 0 5
02177_CTE_GLOBAL_OFF 1 100 5 0 1
+02177_CTE_NEW_ANALYZER 2 200 3 0 2
diff --git a/tests/queries/0_stateless/02174_cte_scalar_cache.sql b/tests/queries/0_stateless/02174_cte_scalar_cache.sql
index 4b015cdd007..9ed80d08cff 100644
--- a/tests/queries/0_stateless/02174_cte_scalar_cache.sql
+++ b/tests/queries/0_stateless/02174_cte_scalar_cache.sql
@@ -18,6 +18,16 @@ SELECT '02177_CTE_GLOBAL_OFF', a5 FROM system.numbers LIMIT 100
FORMAT Null
SETTINGS enable_global_with_statement = 0;
+WITH
+ ( SELECT sleep(0.0001) FROM system.one ),
+ ( SELECT sleep(0.0001) FROM system.one ),
+ ( SELECT sleep(0.0001) FROM system.one ),
+ ( SELECT sleep(0.0001) FROM system.one ),
+ ( SELECT sleep(0.0001) FROM system.one ) as a5
+SELECT '02177_CTE_NEW_ANALYZER', a5 FROM system.numbers LIMIT 100
+ FORMAT Null
+SETTINGS allow_experimental_analyzer = 1;
+
SYSTEM FLUSH LOGS;
SELECT
'02177_CTE_GLOBAL_ON',
@@ -46,3 +56,17 @@ WHERE
AND type = 'QueryFinish'
AND query LIKE '%02177_CTE_GLOBAL_OFF%'
AND event_date >= yesterday() AND event_time > now() - interval 10 minute;
+
+SELECT
+ '02177_CTE_NEW_ANALYZER',
+ ProfileEvents['SleepFunctionCalls'] as sleep_calls,
+ ProfileEvents['SleepFunctionMicroseconds'] as sleep_microseconds,
+ ProfileEvents['ScalarSubqueriesGlobalCacheHit'] as scalar_cache_global_hit,
+ ProfileEvents['ScalarSubqueriesLocalCacheHit'] as scalar_cache_local_hit,
+ ProfileEvents['ScalarSubqueriesCacheMiss'] as scalar_cache_miss
+FROM system.query_log
+WHERE
+ current_database = currentDatabase()
+ AND type = 'QueryFinish'
+ AND query LIKE '%02177_CTE_NEW_ANALYZER%'
+ AND event_date >= yesterday() AND event_time > now() - interval 10 minute;
diff --git a/tests/queries/0_stateless/02481_analyzer_optimize_aggregation_arithmetics.reference b/tests/queries/0_stateless/02481_analyzer_optimize_aggregation_arithmetics.reference
new file mode 100644
index 00000000000..ab6d42000fe
--- /dev/null
+++ b/tests/queries/0_stateless/02481_analyzer_optimize_aggregation_arithmetics.reference
@@ -0,0 +1,64 @@
+-- { echoOn }
+
+EXPLAIN QUERY TREE SELECT avg(log(2) * number) FROM numbers(10);
+QUERY id: 0
+ PROJECTION COLUMNS
+ avg(multiply(log(2), number)) Float64
+ PROJECTION
+ LIST id: 1, nodes: 1
+ FUNCTION id: 2, function_name: multiply, function_type: ordinary, result_type: Float64
+ ARGUMENTS
+ LIST id: 3, nodes: 2
+ CONSTANT id: 4, constant_value: Float64_0.6931471805599453, constant_value_type: Float64
+ EXPRESSION
+ FUNCTION id: 5, function_name: log, function_type: ordinary, result_type: Float64
+ ARGUMENTS
+ LIST id: 6, nodes: 1
+ CONSTANT id: 7, constant_value: UInt64_2, constant_value_type: UInt8
+ FUNCTION id: 8, function_name: avg, function_type: aggregate, result_type: Float64
+ ARGUMENTS
+ LIST id: 9, nodes: 1
+ COLUMN id: 10, column_name: number, result_type: UInt64, source_id: 11
+ JOIN TREE
+ TABLE_FUNCTION id: 11, table_function_name: numbers
+ ARGUMENTS
+ LIST id: 12, nodes: 1
+ CONSTANT id: 13, constant_value: UInt64_10, constant_value_type: UInt8
+EXPLAIN QUERY TREE SELECT avg(number * log(2)) FROM numbers(10);
+QUERY id: 0
+ PROJECTION COLUMNS
+ avg(multiply(number, log(2))) Float64
+ PROJECTION
+ LIST id: 1, nodes: 1
+ FUNCTION id: 2, function_name: multiply, function_type: ordinary, result_type: Float64
+ ARGUMENTS
+ LIST id: 3, nodes: 2
+ FUNCTION id: 4, function_name: avg, function_type: aggregate, result_type: Float64
+ ARGUMENTS
+ LIST id: 5, nodes: 1
+ COLUMN id: 6, column_name: number, result_type: UInt64, source_id: 7
+ CONSTANT id: 8, constant_value: Float64_0.6931471805599453, constant_value_type: Float64
+ EXPRESSION
+ FUNCTION id: 9, function_name: log, function_type: ordinary, result_type: Float64
+ ARGUMENTS
+ LIST id: 10, nodes: 1
+ CONSTANT id: 11, constant_value: UInt64_2, constant_value_type: UInt8
+ JOIN TREE
+ TABLE_FUNCTION id: 7, table_function_name: numbers
+ ARGUMENTS
+ LIST id: 12, nodes: 1
+ CONSTANT id: 13, constant_value: UInt64_10, constant_value_type: UInt8
+SELECT round(avg(log(2) * number), 6) AS k FROM numbers(10000000) GROUP BY number % 3, number % 2;
+3465734.516505
+3465735.209653
+3465735.9028
+3465736.595947
+3465735.209653
+3465735.9028
+SELECT round(avg(number * log(2)), 6) AS k FROM numbers(10000000) GROUP BY number % 3, number % 2;
+3465734.516505
+3465735.209653
+3465735.9028
+3465736.595947
+3465735.209653
+3465735.9028
diff --git a/tests/queries/0_stateless/02481_analyzer_optimize_aggregation_arithmetics.sql b/tests/queries/0_stateless/02481_analyzer_optimize_aggregation_arithmetics.sql
new file mode 100644
index 00000000000..ca91d137bf4
--- /dev/null
+++ b/tests/queries/0_stateless/02481_analyzer_optimize_aggregation_arithmetics.sql
@@ -0,0 +1,14 @@
+SET allow_experimental_analyzer = 1;
+SET optimize_arithmetic_operations_in_aggregate_functions = 1;
+
+-- { echoOn }
+
+EXPLAIN QUERY TREE SELECT avg(log(2) * number) FROM numbers(10);
+
+EXPLAIN QUERY TREE SELECT avg(number * log(2)) FROM numbers(10);
+
+SELECT round(avg(log(2) * number), 6) AS k FROM numbers(10000000) GROUP BY number % 3, number % 2;
+
+SELECT round(avg(number * log(2)), 6) AS k FROM numbers(10000000) GROUP BY number % 3, number % 2;
+
+-- { echoOff }
diff --git a/tests/queries/0_stateless/02481_optimize_aggregation_arithmetics.reference b/tests/queries/0_stateless/02481_optimize_aggregation_arithmetics.reference
deleted file mode 100644
index 1cc6fc5d4b1..00000000000
--- a/tests/queries/0_stateless/02481_optimize_aggregation_arithmetics.reference
+++ /dev/null
@@ -1,81 +0,0 @@
--- { echoOn }
-EXPLAIN actions=1
- (
- SELECT round(avg(log(2) * number), 6) AS k
- FROM numbers(10000000)
- GROUP BY number % 3, number % 2
- )
-SETTINGS allow_experimental_analyzer=1;
-Expression ((Project names + Projection))
-Actions: INPUT : 0 -> avg(number_0) Float64 : 0
- COLUMN Const(Float64) -> 0.6931471805599453_Float64 Float64 : 1
- COLUMN Const(UInt8) -> 6_UInt8 UInt8 : 2
- FUNCTION multiply(0.6931471805599453_Float64 :: 1, avg(number_0) :: 0) -> multiply(0.6931471805599453_Float64, avg(number_0)) Float64 : 3
- FUNCTION round(multiply(0.6931471805599453_Float64, avg(number_0)) :: 3, 6_UInt8 :: 2) -> round(multiply(0.6931471805599453_Float64, avg(number_0)), 6_UInt8) Float64 : 0
- ALIAS round(multiply(0.6931471805599453_Float64, avg(number_0)), 6_UInt8) :: 0 -> k Float64 : 2
-Positions: 2
- Aggregating
- Keys: modulo(number_0, 3_UInt8), modulo(number_0, 2_UInt8)
- Aggregates:
- avg(number_0)
- Function: avg(UInt64) → Float64
- Arguments: number_0
- Expression ((Before GROUP BY + Change column names to column identifiers))
- Actions: INPUT : 0 -> number UInt64 : 0
- COLUMN Const(UInt8) -> 3_UInt8 UInt8 : 1
- COLUMN Const(UInt8) -> 2_UInt8 UInt8 : 2
- ALIAS number :: 0 -> number_0 UInt64 : 3
- FUNCTION modulo(number_0 : 3, 3_UInt8 :: 1) -> modulo(number_0, 3_UInt8) UInt8 : 0
- FUNCTION modulo(number_0 : 3, 2_UInt8 :: 2) -> modulo(number_0, 2_UInt8) UInt8 : 1
- Positions: 0 1 3
- ReadFromStorage (SystemNumbers)
-EXPLAIN actions=1
- (
- SELECT round(log(2) * avg(number), 6) AS k
- FROM numbers(10000000)
- GROUP BY number % 3, number % 2
- )
-SETTINGS allow_experimental_analyzer=1;
-Expression ((Project names + Projection))
-Actions: INPUT : 0 -> avg(number_0) Float64 : 0
- COLUMN Const(Float64) -> 0.6931471805599453_Float64 Float64 : 1
- COLUMN Const(UInt8) -> 6_UInt8 UInt8 : 2
- FUNCTION multiply(0.6931471805599453_Float64 :: 1, avg(number_0) :: 0) -> multiply(0.6931471805599453_Float64, avg(number_0)) Float64 : 3
- FUNCTION round(multiply(0.6931471805599453_Float64, avg(number_0)) :: 3, 6_UInt8 :: 2) -> round(multiply(0.6931471805599453_Float64, avg(number_0)), 6_UInt8) Float64 : 0
- ALIAS round(multiply(0.6931471805599453_Float64, avg(number_0)), 6_UInt8) :: 0 -> k Float64 : 2
-Positions: 2
- Aggregating
- Keys: modulo(number_0, 3_UInt8), modulo(number_0, 2_UInt8)
- Aggregates:
- avg(number_0)
- Function: avg(UInt64) → Float64
- Arguments: number_0
- Expression ((Before GROUP BY + Change column names to column identifiers))
- Actions: INPUT : 0 -> number UInt64 : 0
- COLUMN Const(UInt8) -> 3_UInt8 UInt8 : 1
- COLUMN Const(UInt8) -> 2_UInt8 UInt8 : 2
- ALIAS number :: 0 -> number_0 UInt64 : 3
- FUNCTION modulo(number_0 : 3, 3_UInt8 :: 1) -> modulo(number_0, 3_UInt8) UInt8 : 0
- FUNCTION modulo(number_0 : 3, 2_UInt8 :: 2) -> modulo(number_0, 2_UInt8) UInt8 : 1
- Positions: 0 1 3
- ReadFromStorage (SystemNumbers)
-SELECT round(avg(log(2) * number), 6) AS k
-FROM numbers(10000000)
-GROUP BY number % 3, number % 2
-SETTINGS allow_experimental_analyzer=1;
-3465734.516505
-3465735.209653
-3465735.9028
-3465736.595947
-3465735.209653
-3465735.9028
-SELECT round(log(2) * avg(number), 6) AS k
-FROM numbers(10000000)
-GROUP BY number % 3, number % 2
-SETTINGS allow_experimental_analyzer=0;
-3465734.516505
-3465735.209653
-3465735.9028
-3465736.595947
-3465735.209653
-3465735.9028
diff --git a/tests/queries/0_stateless/02481_optimize_aggregation_arithmetics.sql b/tests/queries/0_stateless/02481_optimize_aggregation_arithmetics.sql
deleted file mode 100644
index 5fec5a79813..00000000000
--- a/tests/queries/0_stateless/02481_optimize_aggregation_arithmetics.sql
+++ /dev/null
@@ -1,26 +0,0 @@
--- { echoOn }
-EXPLAIN actions=1
- (
- SELECT round(avg(log(2) * number), 6) AS k
- FROM numbers(10000000)
- GROUP BY number % 3, number % 2
- )
-SETTINGS allow_experimental_analyzer=1;
-
-EXPLAIN actions=1
- (
- SELECT round(log(2) * avg(number), 6) AS k
- FROM numbers(10000000)
- GROUP BY number % 3, number % 2
- )
-SETTINGS allow_experimental_analyzer=1;
-
-SELECT round(avg(log(2) * number), 6) AS k
-FROM numbers(10000000)
-GROUP BY number % 3, number % 2
-SETTINGS allow_experimental_analyzer=1;
-
-SELECT round(log(2) * avg(number), 6) AS k
-FROM numbers(10000000)
-GROUP BY number % 3, number % 2
-SETTINGS allow_experimental_analyzer=0;
diff --git a/tests/queries/0_stateless/02521_avro_union_null_nested.reference b/tests/queries/0_stateless/02521_avro_union_null_nested.reference
new file mode 100644
index 00000000000..e4818b4bcac
--- /dev/null
+++ b/tests/queries/0_stateless/02521_avro_union_null_nested.reference
@@ -0,0 +1,15 @@
+manifest_path String
+manifest_length Int64
+partition_spec_id Int32
+added_snapshot_id Nullable(Int64)
+added_data_files_count Nullable(Int32)
+existing_data_files_count Nullable(Int32)
+deleted_data_files_count Nullable(Int32)
+partitions Array(Tuple(contains_null Bool, contains_nan Nullable(Bool), lower_bound Nullable(String), upper_bound Nullable(String)))
+added_rows_count Nullable(Int64)
+existing_rows_count Nullable(Int64)
+deleted_rows_count Nullable(Int64)
+file:/warehouse/nyc.db/taxis/metadata/f9e891e9-fbd3-4411-a5c6-0cc14a2f1392-m0.avro 6488 0 1793608066486471262 8 0 0 [(false,false,'\0\0\0\0\0\0\0','\b\0\0\0\0\0\0\0')] 12 0 0
+file:/warehouse/nyc.db/taxis/metadata/a51dd31d-ea86-42dd-82d1-1981332a0f6d-m0.avro 6363 0 5735460159761889536 4 0 0 [(false,false,'\0\0\0\0\0\0\0','\b\0\0\0\0\0\0\0')] 4 0 0
+file:/warehouse/nyc.db/taxis/metadata/7ae325bd-fe20-4a55-917c-36cb8f6a488c-m0.avro 6370 0 7171740521400098346 4 0 0 [(false,false,'\0\0\0\0\0\0\0','\0\0\0\0\0\0\0')] 4 0 0
+file:/warehouse/nyc.db/taxis/metadata/5e3c62a9-1537-455f-98e5-0a067af5752a-m0.avro 6324 0 6850377589038341628 2 0 0 [(false,false,'\0\0\0\0\0\0\0','\0\0\0\0\0\0\0')] 4 0 0
diff --git a/tests/queries/0_stateless/02521_avro_union_null_nested.sh b/tests/queries/0_stateless/02521_avro_union_null_nested.sh
new file mode 100755
index 00000000000..d4d14c8ca8e
--- /dev/null
+++ b/tests/queries/0_stateless/02521_avro_union_null_nested.sh
@@ -0,0 +1,13 @@
+#!/usr/bin/env bash
+# Tags: no-parallel, no-fasttest
+
+set -e
+
+CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
+. "$CUR_DIR"/../shell_config.sh
+
+DATA_DIR=$CUR_DIR/data_avro
+
+$CLICKHOUSE_LOCAL -q "desc file('$DATA_DIR/union_null_nested.avro')"
+$CLICKHOUSE_LOCAL -q "select * from file('$DATA_DIR/union_null_nested.avro')"
diff --git a/tests/queries/0_stateless/02522_avro_complicate_schema.reference b/tests/queries/0_stateless/02522_avro_complicate_schema.reference
new file mode 100644
index 00000000000..55c0369020f
--- /dev/null
+++ b/tests/queries/0_stateless/02522_avro_complicate_schema.reference
@@ -0,0 +1,5 @@
+status Int32
+snapshot_id Nullable(Int64)
+data_file Tuple(file_path String, file_format String, partition Tuple(vendor_id Nullable(Int64)), record_count Int64, file_size_in_bytes Int64, block_size_in_bytes Int64, column_sizes Array(Tuple(key Int32, value Int64)), value_counts Array(Tuple(key Int32, value Int64)), null_value_counts Array(Tuple(key Int32, value Int64)), nan_value_counts Array(Tuple(key Int32, value Int64)), lower_bounds Array(Tuple(key Int32, value String)), upper_bounds Array(Tuple(key Int32, value String)), key_metadata Nullable(String), split_offsets Array(Int64), sort_order_id Nullable(Int32))
+1 6850377589038341628 ('file:/warehouse/nyc.db/taxis/data/vendor_id=1/00000-0-c070e655-dc44-43d2-a01a-484f107210cb-00001.parquet','PARQUET',(1),2,1565,67108864,[(1,87),(2,51),(3,51),(4,57),(5,51)],[(1,2),(2,2),(3,2),(4,2),(5,2)],[(1,0),(2,0),(3,0),(4,0),(5,0)],[(3,0),(4,0)],[(1,'\0\0\0\0\0\0\0'),(2,'C\0\0\0\0\0'),(3,'ff?'),(4,'p=\nף.@'),(5,'N')],[(1,'\0\0\0\0\0\0\0'),(2,'C\0\0\0\0\0'),(3,'ffA'),(4,'q=\nףE@'),(5,'Y')],NULL,[4],0)
+1 6850377589038341628 ('file:/warehouse/nyc.db/taxis/data/vendor_id=2/00000-0-c070e655-dc44-43d2-a01a-484f107210cb-00002.parquet','PARQUET',(2),2,1620,67108864,[(1,87),(2,51),(3,51),(4,57),(5,89)],[(1,2),(2,2),(3,2),(4,2),(5,2)],[(1,0),(2,0),(3,0),(4,0),(5,0)],[(3,0),(4,0)],[(1,'\0\0\0\0\0\0\0'),(2,'C\0\0\0\0\0'),(3,'fff?'),(4,'Q"@'),(5,'N')],[(1,'\0\0\0\0\0\0\0'),(2,'C\0\0\0\0\0'),(3,'\0\0 @'),(4,'fffff&6@'),(5,'N')],NULL,[4],0)
diff --git a/tests/queries/0_stateless/02522_avro_complicate_schema.sh b/tests/queries/0_stateless/02522_avro_complicate_schema.sh
new file mode 100755
index 00000000000..fa23c7e6f34
--- /dev/null
+++ b/tests/queries/0_stateless/02522_avro_complicate_schema.sh
@@ -0,0 +1,13 @@
+#!/usr/bin/env bash
+# Tags: no-parallel, no-fasttest
+
+set -e
+
+CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
+. "$CUR_DIR"/../shell_config.sh
+
+DATA_DIR=$CUR_DIR/data_avro
+
+$CLICKHOUSE_LOCAL -q "desc file('$DATA_DIR/complicated_schema.avro')"
+$CLICKHOUSE_LOCAL -q "select * from file('$DATA_DIR/complicated_schema.avro')"
diff --git a/tests/queries/0_stateless/02534_s3_heap_use_after_free.reference b/tests/queries/0_stateless/02534_s3_heap_use_after_free.reference
new file mode 100644
index 00000000000..acd7c60768b
--- /dev/null
+++ b/tests/queries/0_stateless/02534_s3_heap_use_after_free.reference
@@ -0,0 +1,4 @@
+1 2 3
+4 5 6
+7 8 9
+0 0 0
diff --git a/tests/queries/0_stateless/02534_s3_heap_use_after_free.sql b/tests/queries/0_stateless/02534_s3_heap_use_after_free.sql
new file mode 100644
index 00000000000..b9f815e5a07
--- /dev/null
+++ b/tests/queries/0_stateless/02534_s3_heap_use_after_free.sql
@@ -0,0 +1,5 @@
+-- Tags: no-fasttest
+-- Tag no-fasttest: Depends on AWS
+
+select * from s3('http://localhost:11111/test/a.tsv', CustomSeparated);
+
diff --git a/tests/queries/0_stateless/data_avro/complicated_schema.avro b/tests/queries/0_stateless/data_avro/complicated_schema.avro
new file mode 100644
index 00000000000..a3385f4b23b
Binary files /dev/null and b/tests/queries/0_stateless/data_avro/complicated_schema.avro differ
diff --git a/tests/queries/0_stateless/data_avro/union_null_nested.avro b/tests/queries/0_stateless/data_avro/union_null_nested.avro
new file mode 100644
index 00000000000..c5246c3dd64
Binary files /dev/null and b/tests/queries/0_stateless/data_avro/union_null_nested.avro differ
diff --git a/utils/self-extracting-executable/decompressor.cpp b/utils/self-extracting-executable/decompressor.cpp
index be25d315d68..cd9cc51c45b 100644
--- a/utils/self-extracting-executable/decompressor.cpp
+++ b/utils/self-extracting-executable/decompressor.cpp
@@ -363,7 +363,7 @@ int decompressFiles(int input_fd, char * path, char * name, bool & have_compress
#if !defined(OS_DARWIN) && !defined(OS_FREEBSD)
-uint32_t getInode(const char * self)
+uint64_t getInode(const char * self)
{
std::ifstream maps("/proc/self/maps");
if (maps.fail())
@@ -380,7 +380,7 @@ uint32_t getInode(const char * self)
{
std::stringstream ss(line); // STYLE_CHECK_ALLOW_STD_STRING_STREAM
std::string addr, mode, offset, id, path;
- uint32_t inode = 0;
+ uint64_t inode = 0;
if (ss >> addr >> mode >> offset >> id >> inode >> path && path == self)
return inode;
}
@@ -415,7 +415,7 @@ int main(int/* argc*/, char* argv[])
#if !defined(OS_DARWIN) && !defined(OS_FREEBSD)
/// get inode of this executable
- uint32_t inode = getInode(self);
+ uint64_t inode = getInode(self);
if (inode == 0)
{
std::cerr << "Unable to obtain inode." << std::endl;
@@ -447,6 +447,11 @@ int main(int/* argc*/, char* argv[])
return 1;
}
+ /// Inconsistency in WSL1 Ubuntu: the inode reported in /proc/self/maps is the
+ /// 64-bit input_info.st_ino truncated to 32 bits
+ if (input_info.st_ino & 0xFFFFFFFF00000000 && !(inode & 0xFFFFFFFF00000000))
+ input_info.st_ino &= 0x00000000FFFFFFFF;
+
/// if decompression was performed by another process since this copy was started
/// then file referred by path "self" is already pointing to different inode
if (input_info.st_ino != inode)
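A minimal standalone illustration of the masking logic used in the WSL1 workaround above; the inode values are made up:

#include <cassert>
#include <cstdint>

int main()
{
    // stat() reports a 64-bit inode, while /proc/self/maps under WSL1 exposes
    // only its low 32 bits.
    uint64_t st_ino = 0x0000000A00000003ULL;
    uint64_t maps_inode = 0x0000000000000003ULL;

    // Same check as in the patch: if stat() has high bits but the maps inode
    // does not, compare only the low 32 bits.
    if ((st_ino & 0xFFFFFFFF00000000ULL) && !(maps_inode & 0xFFFFFFFF00000000ULL))
        st_ino &= 0x00000000FFFFFFFFULL;

    assert(st_ino == maps_inode);
    return 0;
}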