Merge branch 'master' into replicas_status_api_optimize

This commit is contained in:
mateng915 2023-01-18 11:06:36 +08:00 committed by GitHub
commit e22673b1c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
57 changed files with 845 additions and 395 deletions

2
contrib/poco vendored

@ -1 +1 @@
Subproject commit 799234226187c0ae0b8c90f23465b25ed7956e56
Subproject commit 0ab9bba7ccad3c8dacce04a35cb3b78218547ab4

View File

@ -128,6 +128,7 @@ function run_tests()
if [[ "${HIGH_LEVEL_COVERAGE}" = "YES" ]]; then
ADDITIONAL_OPTIONS+=('--report-coverage')
ADDITIONAL_OPTIONS+=('--report-logs-stats')
fi
set +e

View File

@ -289,6 +289,7 @@ if __name__ == "__main__":
"--database=system",
"--hung-check",
"--stress",
"--report-logs-stats",
"00001_select_1",
]
)

View File

@ -182,6 +182,31 @@ No matter what pool is used for a job, at start `ThreadStatus` instance is creat
If thread is related to query execution, then the most important thing attached to `ThreadStatus` is query context `ContextPtr`. Every query has its master thread in the server pool. Master thread does the attachment by holding an `ThreadStatus::QueryScope query_scope(query_context)` object. Master thread also creates a thread group represented with `ThreadGroupStatus` object. Every additional thread that is allocated during this query execution is attached to its thread group by `CurrentThread::attachTo(thread_group)` call. Thread groups are used to aggregate profile event counters and track memory consumption by all threads dedicated to a single task (see `MemoryTracker` and `ProfileEvents::Counters` classes for more information).
## Concurrency control {#concurrency-control}
Query that can be parallelized uses `max_threads` setting to limit itself. Default value for this setting is selected in a way that allows single query to utilize all CPU cores in the best way. But what if there are multiple concurrent queries and each of them uses default `max_threads` setting value? Then queries will share CPU resources. OS will ensure fairness by constantly switching threads, which introduce some performance penalty. `ConcurrencyControl` helps to deal with this penalty and avoid allocating a lot of threads. Configuration setting `concurrent_threads_soft_limit_num` is used to limit how many concurrent thread can be allocated before applying some kind of CPU pressure.
:::note
`concurrent_threads_soft_limit_num` and `concurrent_threads_soft_limit_ratio_to_cores` are disabled (equal 0) by default. So this feature must be enabled before use.
:::
Notion of CPU `slot` is introduced. Slot is a unit of concurrency: to run a thread query has to acquire a slot in advance and release it when thread stops. The number of slots is globally limited in a server. Multiple concurrent queries are competing for CPU slots if the total demand exceeds the total number of slots. `ConcurrencyControl` is responsible to resolve this competition by doing CPU slot scheduling in a fair manner.
Each slot can be seen as an independent state machine with the following states:
* `free`: slot is available to be allocated by any query.
* `granted`: slot is `allocated` by specific query, but not yet acquired by any thread.
* `acquired`: slot is `allocated` by specific query and acquired by a thread.
Note that `allocated` slot can be in two different states: `granted` and `acquired`. The former is a transitional state, that actually should be short (from the instant when a slot is allocated to a query till the moment when the up-scaling procedure is run by any thread of that query).
![state diagram](@site/docs/en/development/images/concurrency.png)
API of `ConcurrencyControl` consists of the following functions:
1. Create a resource allocation for a query: `auto slots = ConcurrencyControl::instance().allocate(1, max_threads);`. It will allocate at least 1 and at most `max_threads` slots. Note that the first slot is granted immediately, but the remaining slots may be granted later. Thus limit is soft, because every query will obtain at least one thread.
2. For every thread a slot has to be acquired from an allocation: `while (auto slot = slots->tryAcquire()) spawnThread([slot = std::move(slot)] { ... });`.
3. Update the total amount of slots: `ConcurrencyControl::setMaxConcurrency(concurrent_threads_soft_limit_num)`. Can be done in runtime, w/o server restart.
This API allows queries to start with at least one thread (in presence of CPU pressure) and later scale up to `max_threads`.
## Distributed Query Execution {#distributed-query-execution}
Servers in a cluster setup are mostly independent. You can create a `Distributed` table on one or all servers in a cluster. The `Distributed` table does not store data itself it only provides a “view” to all local tables on multiple nodes of a cluster. When you SELECT from a `Distributed` table, it rewrites that query, chooses remote nodes according to load balancing settings, and sends the query to them. The `Distributed` table requests remote servers to process a query just up to a stage where intermediate results from different servers can be merged. Then it receives the intermediate results and merges them. The distributed table tries to distribute as much work as possible to remote servers and does not send much intermediate data over the network.

Binary file not shown.

After

Width:  |  Height:  |  Size: 34 KiB

View File

@ -757,6 +757,10 @@ Possible values:
Default value: `0`.
**See Also**
- [Concurrency Control](/docs/en/development/architecture.md#concurrency-control)
## concurrent_threads_soft_limit_ratio_to_cores {#concurrent_threads_soft_limit_ratio_to_cores}
The maximum number of query processing threads as multiple of number of logical cores.
More details: [concurrent_threads_soft_limit_num](#concurrent-threads-soft-limit-num).
@ -768,6 +772,12 @@ Possible values:
Default value: `0`.
**Example**
``` xml
<concurrent_threads_soft_limit_ratio_to_cores>3</concurrent_threads_soft_limit_ratio_to_cores>
```
## max_concurrent_queries {#max-concurrent-queries}
The maximum number of simultaneously processed queries.

View File

@ -2,6 +2,7 @@
#include <Common/SipHash.h>
#include <Common/FieldVisitorToString.h>
#include <DataTypes/IDataType.h>
#include <Analyzer/ConstantNode.h>
#include <IO/WriteBufferFromString.h>
@ -31,6 +32,15 @@ FunctionNode::FunctionNode(String function_name_)
children[arguments_child_index] = std::make_shared<ListNode>();
}
const DataTypes & FunctionNode::getArgumentTypes() const
{
if (!function)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Function {} is not resolved",
function_name);
return function->getArgumentTypes();
}
ColumnsWithTypeAndName FunctionNode::getArgumentColumns() const
{
const auto & arguments = getArguments().getNodes();

View File

@ -85,6 +85,7 @@ public:
/// Get arguments node
QueryTreeNodePtr & getArgumentsNode() { return children[arguments_child_index]; }
const DataTypes & getArgumentTypes() const;
ColumnsWithTypeAndName getArgumentColumns() const;
/// Returns true if function node has window, false otherwise

View File

@ -3,6 +3,7 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/IAggregateFunction.h>
#include <Functions/FunctionFactory.h>
#include <Functions/IFunction.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>
@ -47,19 +48,23 @@ Field zeroField(const Field & value)
class AggregateFunctionsArithmericOperationsVisitor : public InDepthQueryTreeVisitor<AggregateFunctionsArithmericOperationsVisitor>
{
public:
explicit AggregateFunctionsArithmericOperationsVisitor(ContextPtr context_)
: context(std::move(context_))
{}
/// Traverse tree bottom to top
static bool shouldTraverseTopToBottom()
{
return false;
}
static void visitImpl(QueryTreeNodePtr & node)
void visitImpl(QueryTreeNodePtr & node)
{
auto * aggregate_function_node = node->as<FunctionNode>();
if (!aggregate_function_node || !aggregate_function_node->isAggregateFunction())
return;
static std::unordered_map<std::string_view, std::unordered_set<std::string_view>> supported_functions
static std::unordered_map<std::string_view, std::unordered_set<std::string_view>> supported_aggregate_functions
= {{"sum", {"multiply", "divide"}},
{"min", {"multiply", "divide", "plus", "minus"}},
{"max", {"multiply", "divide", "plus", "minus"}},
@ -69,88 +74,111 @@ public:
if (aggregate_function_arguments_nodes.size() != 1)
return;
auto * inner_function_node = aggregate_function_arguments_nodes[0]->as<FunctionNode>();
if (!inner_function_node)
const auto & arithmetic_function_node = aggregate_function_arguments_nodes[0];
auto * arithmetic_function_node_typed = arithmetic_function_node->as<FunctionNode>();
if (!arithmetic_function_node_typed)
return;
const auto & inner_function_arguments_nodes = inner_function_node->getArguments().getNodes();
if (inner_function_arguments_nodes.size() != 2)
const auto & arithmetic_function_arguments_nodes = arithmetic_function_node_typed->getArguments().getNodes();
if (arithmetic_function_arguments_nodes.size() != 2)
return;
/// Aggregate functions[sum|min|max|avg] is case-insensitive, so we use lower cases name
auto lower_function_name = Poco::toLower(aggregate_function_node->getFunctionName());
auto lower_aggregate_function_name = Poco::toLower(aggregate_function_node->getFunctionName());
auto supported_function_it = supported_functions.find(lower_function_name);
if (supported_function_it == supported_functions.end())
auto supported_aggregate_function_it = supported_aggregate_functions.find(lower_aggregate_function_name);
if (supported_aggregate_function_it == supported_aggregate_functions.end())
return;
const auto & inner_function_name = inner_function_node->getFunctionName();
if (!supported_function_it->second.contains(inner_function_name))
const auto & arithmetic_function_name = arithmetic_function_node_typed->getFunctionName();
if (!supported_aggregate_function_it->second.contains(arithmetic_function_name))
return;
const auto * left_argument_constant_node = inner_function_arguments_nodes[0]->as<ConstantNode>();
const auto * right_argument_constant_node = inner_function_arguments_nodes[1]->as<ConstantNode>();
const auto * left_argument_constant_node = arithmetic_function_arguments_nodes[0]->as<ConstantNode>();
const auto * right_argument_constant_node = arithmetic_function_arguments_nodes[1]->as<ConstantNode>();
/** If we extract negative constant, aggregate function name must be updated.
*
* Example: SELECT min(-1 * id);
* Result: SELECT -1 * max(id);
*/
std::string function_name_if_constant_is_negative;
if (inner_function_name == "multiply" || inner_function_name == "divide")
std::string aggregate_function_name_if_constant_is_negative;
if (arithmetic_function_name == "multiply" || arithmetic_function_name == "divide")
{
if (lower_function_name == "min")
function_name_if_constant_is_negative = "max";
else if (lower_function_name == "max")
function_name_if_constant_is_negative = "min";
if (lower_aggregate_function_name == "min")
aggregate_function_name_if_constant_is_negative = "max";
else if (lower_aggregate_function_name == "max")
aggregate_function_name_if_constant_is_negative = "min";
}
size_t arithmetic_function_argument_index = 0;
if (left_argument_constant_node && !right_argument_constant_node)
{
/// Do not rewrite `sum(1/n)` with `sum(1) * div(1/n)` because of lose accuracy
if (inner_function_name == "divide")
if (arithmetic_function_name == "divide")
return;
/// Rewrite `aggregate_function(inner_function(constant, argument))` into `inner_function(constant, aggregate_function(argument))`
const auto & left_argument_constant_value_literal = left_argument_constant_node->getValue();
if (!function_name_if_constant_is_negative.empty() &&
if (!aggregate_function_name_if_constant_is_negative.empty() &&
left_argument_constant_value_literal < zeroField(left_argument_constant_value_literal))
{
lower_function_name = function_name_if_constant_is_negative;
lower_aggregate_function_name = aggregate_function_name_if_constant_is_negative;
}
auto inner_function_clone = inner_function_node->clone();
auto & inner_function_clone_arguments = inner_function_clone->as<FunctionNode &>().getArguments();
auto & inner_function_clone_arguments_nodes = inner_function_clone_arguments.getNodes();
auto inner_function_clone_right_argument = inner_function_clone_arguments_nodes[1];
aggregate_function_arguments_nodes = {inner_function_clone_right_argument};
resolveAggregateFunctionNode(*aggregate_function_node, inner_function_clone_right_argument, lower_function_name);
inner_function_clone_arguments_nodes[1] = node;
node = std::move(inner_function_clone);
arithmetic_function_argument_index = 1;
}
else if (right_argument_constant_node)
{
/// Rewrite `aggregate_function(inner_function(argument, constant))` into `inner_function(aggregate_function(argument), constant)`
const auto & right_argument_constant_value_literal = right_argument_constant_node->getValue();
if (!function_name_if_constant_is_negative.empty() &&
if (!aggregate_function_name_if_constant_is_negative.empty() &&
right_argument_constant_value_literal < zeroField(right_argument_constant_value_literal))
{
lower_function_name = function_name_if_constant_is_negative;
lower_aggregate_function_name = aggregate_function_name_if_constant_is_negative;
}
auto inner_function_clone = inner_function_node->clone();
auto & inner_function_clone_arguments = inner_function_clone->as<FunctionNode &>().getArguments();
auto & inner_function_clone_arguments_nodes = inner_function_clone_arguments.getNodes();
auto inner_function_clone_left_argument = inner_function_clone_arguments_nodes[0];
aggregate_function_arguments_nodes = {inner_function_clone_left_argument};
resolveAggregateFunctionNode(*aggregate_function_node, inner_function_clone_left_argument, lower_function_name);
inner_function_clone_arguments_nodes[0] = node;
node = std::move(inner_function_clone);
arithmetic_function_argument_index = 0;
}
auto optimized_function_node = cloneArithmeticFunctionAndWrapArgumentIntoAggregateFunction(arithmetic_function_node,
arithmetic_function_argument_index,
node,
lower_aggregate_function_name);
if (optimized_function_node->getResultType()->equals(*node->getResultType()))
node = std::move(optimized_function_node);
}
private:
QueryTreeNodePtr cloneArithmeticFunctionAndWrapArgumentIntoAggregateFunction(
const QueryTreeNodePtr & arithmetic_function,
size_t arithmetic_function_argument_index,
const QueryTreeNodePtr & aggregate_function,
const std::string & result_aggregate_function_name)
{
auto arithmetic_function_clone = arithmetic_function->clone();
auto & arithmetic_function_clone_typed = arithmetic_function_clone->as<FunctionNode &>();
auto & arithmetic_function_clone_arguments_nodes = arithmetic_function_clone_typed.getArguments().getNodes();
auto & arithmetic_function_clone_argument = arithmetic_function_clone_arguments_nodes[arithmetic_function_argument_index];
auto aggregate_function_clone = aggregate_function->clone();
auto & aggregate_function_clone_typed = aggregate_function_clone->as<FunctionNode &>();
aggregate_function_clone_typed.getArguments().getNodes() = { arithmetic_function_clone_argument };
resolveAggregateFunctionNode(aggregate_function_clone_typed, arithmetic_function_clone_argument, result_aggregate_function_name);
arithmetic_function_clone_arguments_nodes[arithmetic_function_argument_index] = std::move(aggregate_function_clone);
resolveOrdinaryFunctionNode(arithmetic_function_clone_typed, arithmetic_function_clone_typed.getFunctionName());
return arithmetic_function_clone;
}
inline void resolveOrdinaryFunctionNode(FunctionNode & function_node, const String & function_name) const
{
auto function = FunctionFactory::instance().get(function_name, context);
function_node.resolveAsFunction(function->build(function_node.getArgumentColumns()));
}
static inline void resolveAggregateFunctionNode(FunctionNode & function_node, const QueryTreeNodePtr & argument, const String & aggregate_function_name)
{
auto function_aggregate_function = function_node.getAggregateFunction();
@ -163,13 +191,15 @@ private:
function_node.resolveAsAggregateFunction(std::move(aggregate_function));
}
ContextPtr context;
};
}
void AggregateFunctionsArithmericOperationsPass::run(QueryTreeNodePtr query_tree_node, ContextPtr)
void AggregateFunctionsArithmericOperationsPass::run(QueryTreeNodePtr query_tree_node, ContextPtr context)
{
AggregateFunctionsArithmericOperationsVisitor visitor;
AggregateFunctionsArithmericOperationsVisitor visitor(std::move(context));
visitor.visit(query_tree_node);
}

View File

@ -1,6 +1,7 @@
#include <Analyzer/Passes/QueryAnalysisPass.h>
#include <Common/NamePrompter.h>
#include <Common/ProfileEvents.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteHelpers.h>
@ -66,6 +67,14 @@
#include <Analyzer/UnionNode.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Analyzer/QueryTreeBuilder.h>
#include <Analyzer/IQueryTreeNode.h>
#include <Analyzer/HashUtils.h>
namespace ProfileEvents
{
extern const Event ScalarSubqueriesGlobalCacheHit;
extern const Event ScalarSubqueriesCacheMiss;
}
#include <Common/checkStackSize.h>
@ -1097,7 +1106,7 @@ private:
static QueryTreeNodePtr tryGetLambdaFromSQLUserDefinedFunctions(const std::string & function_name, ContextPtr context);
static void evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & query_tree_node, size_t subquery_depth, ContextPtr context);
void evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & query_tree_node, size_t subquery_depth, ContextPtr context);
static void mergeWindowWithParentWindow(const QueryTreeNodePtr & window_node, const QueryTreeNodePtr & parent_window_node, IdentifierResolveScope & scope);
@ -1207,6 +1216,9 @@ private:
/// Global resolve expression node to projection names map
std::unordered_map<QueryTreeNodePtr, ProjectionNames> resolved_expressions;
/// Results of scalar sub queries
std::unordered_map<QueryTreeNodeConstRawPtrWithHash, std::shared_ptr<ConstantValue>> scalars;
};
/// Utility functions implementation
@ -1687,6 +1699,16 @@ void QueryAnalyzer::evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & node, size
node->getNodeTypeName(),
node->formatASTForErrorMessage());
auto scalars_iterator = scalars.find(node.get());
if (scalars_iterator != scalars.end())
{
ProfileEvents::increment(ProfileEvents::ScalarSubqueriesGlobalCacheHit);
node = std::make_shared<ConstantNode>(scalars_iterator->second, node);
return;
}
ProfileEvents::increment(ProfileEvents::ScalarSubqueriesCacheMiss);
auto subquery_context = Context::createCopy(context);
Settings subquery_settings = context->getSettings();
@ -1699,10 +1721,11 @@ void QueryAnalyzer::evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & node, size
auto io = interpreter->execute();
Block block;
PullingAsyncPipelineExecutor executor(io.pipeline);
io.pipeline.setProgressCallback(context->getProgressCallback());
Block block;
while (block.rows() == 0 && executor.pull(block))
{
}
@ -1743,7 +1766,6 @@ void QueryAnalyzer::evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & node, size
block = materializeBlock(block);
size_t columns = block.columns();
// Block scalar;
Field scalar_value;
DataTypePtr scalar_type;
@ -1770,6 +1792,7 @@ void QueryAnalyzer::evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & node, size
}
auto constant_value = std::make_shared<ConstantValue>(std::move(scalar_value), std::move(scalar_type));
scalars[node.get()] = constant_value;
node = std::make_shared<ConstantNode>(std::move(constant_value), node);
}

View File

@ -1,5 +1,18 @@
#include <Analyzer/QueryTreePassManager.h>
#include <Common/Exception.h>
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>
#include <DataTypes/IDataType.h>
#include <Interpreters/Context.h>
#include <Analyzer/ColumnNode.h>
#include <Analyzer/FunctionNode.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Analyzer/Utils.h>
#include <Analyzer/Passes/QueryAnalysisPass.h>
#include <Analyzer/Passes/CountDistinctPass.h>
#include <Analyzer/Passes/FunctionToSubcolumnsPass.h>
@ -17,15 +30,6 @@
#include <Analyzer/Passes/IfTransformStringsToEnumPass.h>
#include <Analyzer/Passes/OptimizeRedundantFunctionsInOrderByPass.h>
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>
#include <Interpreters/Context.h>
#include <Analyzer/ColumnNode.h>
#include <Analyzer/FunctionNode.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Common/Exception.h>
namespace DB
{
@ -45,24 +49,6 @@ namespace
*/
class ValidationChecker : public InDepthQueryTreeVisitor<ValidationChecker>
{
String pass_name;
void visitColumn(ColumnNode * column) const
{
if (column->getColumnSourceOrNull() == nullptr)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Column {} {} query tree node does not have valid source node after running {} pass",
column->getColumnName(), column->getColumnType(), pass_name);
}
void visitFunction(FunctionNode * function) const
{
if (!function->isResolved())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Function {} is not resolved after running {} pass",
function->toAST()->formatForErrorMessage(), pass_name);
}
public:
explicit ValidationChecker(String pass_name_)
: pass_name(std::move(pass_name_))
@ -75,6 +61,57 @@ public:
else if (auto * function = node->as<FunctionNode>())
return visitFunction(function);
}
private:
void visitColumn(ColumnNode * column) const
{
if (column->getColumnSourceOrNull() == nullptr)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Column {} {} query tree node does not have valid source node after running {} pass",
column->getColumnName(), column->getColumnType(), pass_name);
}
void visitFunction(FunctionNode * function) const
{
if (!function->isResolved())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Function {} is not resolved after running {} pass",
function->toAST()->formatForErrorMessage(), pass_name);
if (isNameOfInFunction(function->getFunctionName()))
return;
const auto & expected_argument_types = function->getArgumentTypes();
size_t expected_argument_types_size = expected_argument_types.size();
auto actual_argument_columns = function->getArgumentColumns();
if (expected_argument_types_size != actual_argument_columns.size())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Function {} expects {} arguments but has {} after running {} pass",
function->toAST()->formatForErrorMessage(),
expected_argument_types_size,
actual_argument_columns.size(),
pass_name);
for (size_t i = 0; i < expected_argument_types_size; ++i)
{
// Skip lambdas
if (WhichDataType(expected_argument_types[i]).isFunction())
continue;
if (!expected_argument_types[i]->equals(*actual_argument_columns[i].type))
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Function {} expects {} argument to have {} type but receives {} after running {} pass",
function->toAST()->formatForErrorMessage(),
i + 1,
expected_argument_types[i]->getName(),
actual_argument_columns[i].type->getName(),
pass_name);
}
}
}
String pass_name;
};
#endif

View File

@ -156,10 +156,9 @@ void BackupWriterS3::copyObjectImpl(
const String & src_key,
const String & dst_bucket,
const String & dst_key,
const Aws::S3::Model::HeadObjectResult & head,
size_t size,
const std::optional<ObjectAttributes> & metadata) const
{
size_t size = head.GetContentLength();
LOG_TRACE(log, "Copying {} bytes using single-operation copy", size);
Aws::S3::Model::CopyObjectRequest request;
@ -177,7 +176,7 @@ void BackupWriterS3::copyObjectImpl(
if (!outcome.IsSuccess() && (outcome.GetError().GetExceptionName() == "EntityTooLarge"
|| outcome.GetError().GetExceptionName() == "InvalidRequest"))
{ // Can't come here with MinIO, MinIO allows single part upload for large objects.
copyObjectMultipartImpl(src_bucket, src_key, dst_bucket, dst_key, head, metadata);
copyObjectMultipartImpl(src_bucket, src_key, dst_bucket, dst_key, size, metadata);
return;
}
@ -191,10 +190,9 @@ void BackupWriterS3::copyObjectMultipartImpl(
const String & src_key,
const String & dst_bucket,
const String & dst_key,
const Aws::S3::Model::HeadObjectResult & head,
size_t size,
const std::optional<ObjectAttributes> & metadata) const
{
size_t size = head.GetContentLength();
LOG_TRACE(log, "Copying {} bytes using multipart upload copy", size);
String multipart_upload_id;
@ -309,16 +307,16 @@ void BackupWriterS3::copyFileNative(DiskPtr from_disk, const String & file_name_
std::string source_bucket = object_storage->getObjectsNamespace();
auto file_path = fs::path(s3_uri.key) / file_name_to;
auto head = S3::headObject(*client, source_bucket, objects[0].absolute_path).GetResult();
if (static_cast<size_t>(head.GetContentLength()) < request_settings.getUploadSettings().max_single_operation_copy_size)
auto size = S3::getObjectSize(*client, source_bucket, objects[0].absolute_path);
if (size < request_settings.getUploadSettings().max_single_operation_copy_size)
{
copyObjectImpl(
source_bucket, objects[0].absolute_path, s3_uri.bucket, file_path, head);
source_bucket, objects[0].absolute_path, s3_uri.bucket, file_path, size);
}
else
{
copyObjectMultipartImpl(
source_bucket, objects[0].absolute_path, s3_uri.bucket, file_path, head);
source_bucket, objects[0].absolute_path, s3_uri.bucket, file_path, size);
}
}
}

View File

@ -67,7 +67,7 @@ private:
const String & src_key,
const String & dst_bucket,
const String & dst_key,
const Aws::S3::Model::HeadObjectResult & head,
size_t size,
const std::optional<ObjectAttributes> & metadata = std::nullopt) const;
void copyObjectMultipartImpl(
@ -75,7 +75,7 @@ private:
const String & src_key,
const String & dst_bucket,
const String & dst_key,
const Aws::S3::Model::HeadObjectResult & head,
size_t size,
const std::optional<ObjectAttributes> & metadata = std::nullopt) const;
void removeFilesBatch(const Strings & file_names);

View File

@ -309,6 +309,8 @@ The server successfully detected this situation and will download merged part fr
M(S3CopyObject, "Number of S3 API CopyObject calls.") \
M(S3ListObjects, "Number of S3 API ListObjects calls.") \
M(S3HeadObject, "Number of S3 API HeadObject calls.") \
M(S3GetObjectAttributes, "Number of S3 API GetObjectAttributes calls.") \
M(S3GetObjectMetadata, "Number of S3 API GetObject calls for getting metadata.") \
M(S3CreateMultipartUpload, "Number of S3 API CreateMultipartUpload calls.") \
M(S3UploadPartCopy, "Number of S3 API UploadPartCopy calls.") \
M(S3UploadPart, "Number of S3 API UploadPart calls.") \
@ -321,6 +323,8 @@ The server successfully detected this situation and will download merged part fr
M(DiskS3CopyObject, "Number of DiskS3 API CopyObject calls.") \
M(DiskS3ListObjects, "Number of DiskS3 API ListObjects calls.") \
M(DiskS3HeadObject, "Number of DiskS3 API HeadObject calls.") \
M(DiskS3GetObjectAttributes, "Number of DiskS3 API GetObjectAttributes calls.") \
M(DiskS3GetObjectMetadata, "Number of DiskS3 API GetObject calls for getting metadata.") \
M(DiskS3CreateMultipartUpload, "Number of DiskS3 API CreateMultipartUpload calls.") \
M(DiskS3UploadPartCopy, "Number of DiskS3 API UploadPartCopy calls.") \
M(DiskS3UploadPart, "Number of DiskS3 API UploadPart calls.") \

View File

@ -7,6 +7,29 @@
#include <Poco/Message.h>
#include <Common/CurrentThread.h>
/// This wrapper is useful to save formatted message into a String before sending it to a logger
class LogToStrImpl
{
String & out_str;
Poco::Logger * logger;
bool propagate_to_actual_log = true;
public:
LogToStrImpl(String & out_str_, Poco::Logger * logger_) : out_str(out_str_) , logger(logger_) {}
LogToStrImpl & operator -> () { return *this; }
bool is(Poco::Message::Priority priority) { propagate_to_actual_log &= logger->is(priority); return true; }
LogToStrImpl * getChannel() {return this; }
const String & name() const { return logger->name(); }
void log(const Poco::Message & message)
{
out_str = message.getText();
if (!propagate_to_actual_log)
return;
if (auto * channel = logger->getChannel())
channel->log(message);
}
};
#define LogToStr(x, y) std::make_unique<LogToStrImpl>(x, y)
namespace
{
@ -17,8 +40,37 @@ namespace
[[maybe_unused]] const ::Poco::Logger * getLogger(const ::Poco::Logger * logger) { return logger; };
[[maybe_unused]] const ::Poco::Logger * getLogger(const std::atomic<::Poco::Logger *> & logger) { return logger.load(); };
[[maybe_unused]] std::unique_ptr<LogToStrImpl> getLogger(std::unique_ptr<LogToStrImpl> && logger) { return logger; };
template<typename T> struct is_fmt_runtime : std::false_type {};
template<typename T> struct is_fmt_runtime<fmt::basic_runtime<T>> : std::true_type {};
/// Usually we use LOG_*(...) macros with either string literals or fmt::runtime(whatever) as a format string.
/// This function is useful to get a string_view to a static format string passed to LOG_* macro.
template <typename T> constexpr std::string_view tryGetStaticFormatString(T && x)
{
if constexpr (is_fmt_runtime<T>::value)
{
/// It definitely was fmt::runtime(something).
/// We are not sure about a lifetime of the string, so return empty view.
/// Also it can be arbitrary string, not a formatting pattern.
/// So returning empty pattern will not pollute the set of patterns.
return std::string_view();
}
else
{
/// Most likely it was a string literal.
/// Unfortunately, there's no good way to check if something is a string literal.
/// But fmtlib requires a format string to be compile-time constant unless fmt::runtime is used.
static_assert(std::is_nothrow_convertible<T, const char * const>::value);
static_assert(!std::is_pointer<T>::value);
return std::string_view(x);
}
}
}
#define LOG_IMPL_FIRST_ARG(X, ...) X
/// Logs a message to a specified logger with that level.
/// If more than one argument is provided,
/// the first argument is interpreted as template with {}-substitutions
@ -30,7 +82,7 @@ namespace
auto _logger = ::getLogger(logger); \
const bool _is_clients_log = (DB::CurrentThread::getGroup() != nullptr) && \
(DB::CurrentThread::getGroup()->client_logs_level >= (priority)); \
if (_logger->is((PRIORITY)) || _is_clients_log) \
if (_is_clients_log || _logger->is((PRIORITY))) \
{ \
std::string formatted_message = numArgs(__VA_ARGS__) > 1 ? fmt::format(__VA_ARGS__) : firstArg(__VA_ARGS__); \
if (auto _channel = _logger->getChannel()) \
@ -40,7 +92,7 @@ namespace
file_function += "; "; \
file_function += __PRETTY_FUNCTION__; \
Poco::Message poco_message(_logger->name(), formatted_message, \
(PRIORITY), file_function.c_str(), __LINE__); \
(PRIORITY), file_function.c_str(), __LINE__, tryGetStaticFormatString(LOG_IMPL_FIRST_ARG(__VA_ARGS__))); \
_channel->log(poco_message); \
} \
} \

View File

@ -171,8 +171,9 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
if (!hasPendingDataToRead())
return false;
size_t size, offset;
chassert(file_offset_of_buffer_end <= impl->getFileSize());
size_t size, offset;
if (prefetch_future.valid())
{
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::AsynchronousRemoteReadWaitMicroseconds);
@ -210,8 +211,8 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
/// In case of multiple files for the same file in clickhouse (i.e. log family)
/// file_offset_of_buffer_end will not match getImplementationBufferOffset()
/// so we use [impl->getImplementationBufferOffset(), impl->getFileSize()]
assert(file_offset_of_buffer_end >= impl->getImplementationBufferOffset());
assert(file_offset_of_buffer_end <= impl->getFileSize());
chassert(file_offset_of_buffer_end >= impl->getImplementationBufferOffset());
chassert(file_offset_of_buffer_end <= impl->getFileSize());
return bytes_read;
}
@ -277,6 +278,15 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset, int whence)
/// First reset the buffer so the next read will fetch new data to the buffer.
resetWorkingBuffer();
if (read_until_position && new_pos > *read_until_position)
{
ProfileEvents::increment(ProfileEvents::RemoteFSSeeksWithReset);
impl->reset();
file_offset_of_buffer_end = new_pos = *read_until_position; /// read_until_position is a non-included boundary.
return new_pos;
}
/**
* Lazy ignore. Save number of bytes to ignore and ignore it either for prefetch buffer or current buffer.
* Note: we read in range [file_offset_of_buffer_end, read_until_position).

View File

@ -256,7 +256,7 @@ size_t ReadBufferFromRemoteFSGather::getFileSize() const
String ReadBufferFromRemoteFSGather::getInfoForLog()
{
if (!current_buf)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot get info: buffer not initialized");
return "";
return current_buf->getInfoForLog();
}

View File

@ -125,14 +125,19 @@ std::string S3ObjectStorage::generateBlobNameForPath(const std::string & /* path
getRandomASCIIString(key_name_total_size - key_name_prefix_size));
}
Aws::S3::Model::HeadObjectOutcome S3ObjectStorage::requestObjectHeadData(const std::string & bucket_from, const std::string & key) const
size_t S3ObjectStorage::getObjectSize(const std::string & bucket_from, const std::string & key) const
{
return S3::headObject(*client.get(), bucket_from, key, "", true);
return S3::getObjectSize(*client.get(), bucket_from, key, {}, /* for_disk_s3= */ true);
}
bool S3ObjectStorage::exists(const StoredObject & object) const
{
return S3::objectExists(*client.get(), bucket, object.absolute_path, "", true);
return S3::objectExists(*client.get(), bucket, object.absolute_path, {}, /* for_disk_s3= */ true);
}
void S3ObjectStorage::checkObjectExists(const std::string & bucket_from, const std::string & key, std::string_view description) const
{
return S3::checkObjectExists(*client.get(), bucket_from, key, {}, /* for_disk_s3= */ true, description);
}
std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
@ -409,13 +414,10 @@ ObjectMetadata S3ObjectStorage::getObjectMetadata(const std::string & path) cons
{
ObjectMetadata result;
auto object_head = requestObjectHeadData(bucket, path);
throwIfError(object_head);
auto & object_head_result = object_head.GetResult();
result.size_bytes = object_head_result.GetContentLength();
result.last_modified = object_head_result.GetLastModified().Millis();
result.attributes = object_head_result.GetMetadata();
auto object_info = S3::getObjectInfo(*client.get(), bucket, path, {}, /* for_disk_s3= */ true);
result.size_bytes = object_info.size;
result.last_modified = object_info.last_modification_time;
result.attributes = S3::getObjectMetadata(*client.get(), bucket, path, {}, /* for_disk_s3= */ true);
return result;
}
@ -442,7 +444,7 @@ void S3ObjectStorage::copyObjectImpl(
const String & src_key,
const String & dst_bucket,
const String & dst_key,
std::optional<Aws::S3::Model::HeadObjectResult> head,
size_t size,
std::optional<ObjectAttributes> metadata) const
{
auto client_ptr = client.get();
@ -464,7 +466,7 @@ void S3ObjectStorage::copyObjectImpl(
if (!outcome.IsSuccess() && (outcome.GetError().GetExceptionName() == "EntityTooLarge"
|| outcome.GetError().GetExceptionName() == "InvalidRequest"))
{ // Can't come here with MinIO, MinIO allows single part upload for large objects.
copyObjectMultipartImpl(src_bucket, src_key, dst_bucket, dst_key, head, metadata);
copyObjectMultipartImpl(src_bucket, src_key, dst_bucket, dst_key, size, metadata);
return;
}
@ -472,12 +474,7 @@ void S3ObjectStorage::copyObjectImpl(
auto settings_ptr = s3_settings.get();
if (settings_ptr->request_settings.check_objects_after_upload)
{
auto object_head = requestObjectHeadData(dst_bucket, dst_key);
if (!object_head.IsSuccess())
throw Exception(ErrorCodes::S3_ERROR, "Object {} from bucket {} disappeared immediately after upload, it's a bug in S3 or S3 API.", dst_key, dst_bucket);
}
checkObjectExists(dst_bucket, dst_key, "Immediately after upload");
}
void S3ObjectStorage::copyObjectMultipartImpl(
@ -485,15 +482,11 @@ void S3ObjectStorage::copyObjectMultipartImpl(
const String & src_key,
const String & dst_bucket,
const String & dst_key,
std::optional<Aws::S3::Model::HeadObjectResult> head,
size_t size,
std::optional<ObjectAttributes> metadata) const
{
if (!head)
head = requestObjectHeadData(src_bucket, src_key).GetResult();
auto settings_ptr = s3_settings.get();
auto client_ptr = client.get();
size_t size = head->GetContentLength();
String multipart_upload_id;
@ -569,29 +562,24 @@ void S3ObjectStorage::copyObjectMultipartImpl(
}
if (settings_ptr->request_settings.check_objects_after_upload)
{
auto object_head = requestObjectHeadData(dst_bucket, dst_key);
if (!object_head.IsSuccess())
throw Exception(ErrorCodes::S3_ERROR, "Object {} from bucket {} disappeared immediately after upload, it's a bug in S3 or S3 API.", dst_key, dst_bucket);
}
checkObjectExists(dst_bucket, dst_key, "Immediately after upload");
}
void S3ObjectStorage::copyObject( // NOLINT
const StoredObject & object_from, const StoredObject & object_to, std::optional<ObjectAttributes> object_to_attributes)
{
auto head = requestObjectHeadData(bucket, object_from.absolute_path).GetResult();
auto size = getObjectSize(bucket, object_from.absolute_path);
static constexpr int64_t multipart_upload_threashold = 5UL * 1024 * 1024 * 1024;
if (head.GetContentLength() >= multipart_upload_threashold)
if (size >= multipart_upload_threashold)
{
copyObjectMultipartImpl(
bucket, object_from.absolute_path, bucket, object_to.absolute_path, head, object_to_attributes);
bucket, object_from.absolute_path, bucket, object_to.absolute_path, size, object_to_attributes);
}
else
{
copyObjectImpl(
bucket, object_from.absolute_path, bucket, object_to.absolute_path, head, object_to_attributes);
bucket, object_from.absolute_path, bucket, object_to.absolute_path, size, object_to_attributes);
}
}

View File

@ -172,7 +172,7 @@ private:
const String & src_key,
const String & dst_bucket,
const String & dst_key,
std::optional<Aws::S3::Model::HeadObjectResult> head = std::nullopt,
size_t size,
std::optional<ObjectAttributes> metadata = std::nullopt) const;
void copyObjectMultipartImpl(
@ -180,13 +180,14 @@ private:
const String & src_key,
const String & dst_bucket,
const String & dst_key,
std::optional<Aws::S3::Model::HeadObjectResult> head = std::nullopt,
size_t size,
std::optional<ObjectAttributes> metadata = std::nullopt) const;
void removeObjectImpl(const StoredObject & object, bool if_exists);
void removeObjectsImpl(const StoredObjects & objects, bool if_exists);
Aws::S3::Model::HeadObjectOutcome requestObjectHeadData(const std::string & bucket_from, const std::string & key) const;
size_t getObjectSize(const std::string & bucket_from, const std::string & key) const;
void checkObjectExists(const std::string & bucket_from, const std::string & key, std::string_view description) const;
std::string bucket;

View File

@ -250,7 +250,7 @@ size_t ReadBufferFromS3::getFileSize()
if (file_size)
return *file_size;
auto object_size = S3::getObjectSize(*client_ptr, bucket, key, version_id, true, read_settings.for_object_storage);
auto object_size = S3::getObjectSize(*client_ptr, bucket, key, version_id, /* for_disk_s3= */ read_settings.for_object_storage);
file_size = object_size;
return *file_size;

View File

@ -27,6 +27,8 @@
# include <aws/core/utils/UUID.h>
# include <aws/core/http/HttpClientFactory.h>
# include <aws/s3/S3Client.h>
# include <aws/s3/model/GetObjectAttributesRequest.h>
# include <aws/s3/model/GetObjectRequest.h>
# include <aws/s3/model/HeadObjectRequest.h>
# include <IO/S3/PocoHTTPClientFactory.h>
@ -40,7 +42,11 @@
namespace ProfileEvents
{
extern const Event S3GetObjectAttributes;
extern const Event S3GetObjectMetadata;
extern const Event S3HeadObject;
extern const Event DiskS3GetObjectAttributes;
extern const Event DiskS3GetObjectMetadata;
extern const Event DiskS3HeadObject;
}
@ -699,6 +705,92 @@ public:
}
};
/// Extracts the endpoint from a constructed S3 client.
String getEndpoint(const Aws::S3::S3Client & client)
{
const auto * endpoint_provider = dynamic_cast<const Aws::S3::Endpoint::S3DefaultEpProviderBase *>(const_cast<Aws::S3::S3Client &>(client).accessEndpointProvider().get());
if (!endpoint_provider)
return {};
String endpoint;
endpoint_provider->GetBuiltInParameters().GetParameter("Endpoint").GetString(endpoint);
return endpoint;
}
/// Performs a request to get the size and last modification time of an object.
/// The function performs either HeadObject or GetObjectAttributes request depending on the endpoint.
std::pair<std::optional<DB::S3::ObjectInfo>, Aws::S3::S3Error> tryGetObjectInfo(
const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3)
{
auto endpoint = getEndpoint(client);
bool use_get_object_attributes_request = (endpoint.find(".amazonaws.com") != String::npos);
if (use_get_object_attributes_request)
{
/// It's better not to use `HeadObject` requests for AWS S3 because they don't work well with the global region.
/// Details: `HeadObject` request never returns a response body (even if there is an error) however
/// if the request was sent without specifying a region in the endpoint (i.e. for example "https://test.s3.amazonaws.com/mydata.csv"
/// instead of "https://test.s3-us-west-2.amazonaws.com/mydata.csv") then that response body is one of the main ways
/// to determine the correct region and try to repeat the request again with the correct region.
/// For any other request type (`GetObject`, `ListObjects`, etc.) AWS SDK does that because they have response bodies,
/// but for `HeadObject` there is no response body so this way doesn't work. That's why we use `GetObjectAttributes` request instead.
/// See https://github.com/aws/aws-sdk-cpp/issues/1558 and also the function S3ErrorMarshaller::ExtractRegion() for more information.
ProfileEvents::increment(ProfileEvents::S3GetObjectAttributes);
if (for_disk_s3)
ProfileEvents::increment(ProfileEvents::DiskS3GetObjectAttributes);
Aws::S3::Model::GetObjectAttributesRequest req;
req.SetBucket(bucket);
req.SetKey(key);
if (!version_id.empty())
req.SetVersionId(version_id);
req.SetObjectAttributes({Aws::S3::Model::ObjectAttributes::ObjectSize});
auto outcome = client.GetObjectAttributes(req);
if (outcome.IsSuccess())
{
const auto & result = outcome.GetResult();
DB::S3::ObjectInfo object_info;
object_info.size = static_cast<size_t>(result.GetObjectSize());
object_info.last_modification_time = result.GetLastModified().Millis() / 1000;
return {object_info, {}};
}
return {std::nullopt, outcome.GetError()};
}
else
{
/// By default we use `HeadObject` requests.
/// We cannot just use `GetObjectAttributes` requests always because some S3 providers (e.g. Minio)
/// don't support `GetObjectAttributes` requests.
ProfileEvents::increment(ProfileEvents::S3HeadObject);
if (for_disk_s3)
ProfileEvents::increment(ProfileEvents::DiskS3HeadObject);
Aws::S3::Model::HeadObjectRequest req;
req.SetBucket(bucket);
req.SetKey(key);
if (!version_id.empty())
req.SetVersionId(version_id);
auto outcome = client.HeadObject(req);
if (outcome.IsSuccess())
{
const auto & result = outcome.GetResult();
DB::S3::ObjectInfo object_info;
object_info.size = static_cast<size_t>(result.GetContentLength());
object_info.last_modification_time = result.GetLastModified().Millis() / 1000;
return {object_info, {}};
}
return {std::nullopt, outcome.GetError()};
}
}
}
@ -894,54 +986,33 @@ namespace S3
return error == Aws::S3::S3Errors::RESOURCE_NOT_FOUND || error == Aws::S3::S3Errors::NO_SUCH_KEY;
}
Aws::S3::Model::HeadObjectOutcome headObject(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3)
ObjectInfo getObjectInfo(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3, bool throw_on_error)
{
ProfileEvents::increment(ProfileEvents::S3HeadObject);
if (for_disk_s3)
ProfileEvents::increment(ProfileEvents::DiskS3HeadObject);
Aws::S3::Model::HeadObjectRequest req;
req.SetBucket(bucket);
req.SetKey(key);
if (!version_id.empty())
req.SetVersionId(version_id);
return client.HeadObject(req);
}
S3::ObjectInfo getObjectInfo(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool throw_on_error, bool for_disk_s3)
{
auto outcome = headObject(client, bucket, key, version_id, for_disk_s3);
if (outcome.IsSuccess())
auto [object_info, error] = tryGetObjectInfo(client, bucket, key, version_id, for_disk_s3);
if (object_info)
{
auto read_result = outcome.GetResultWithOwnership();
return {.size = static_cast<size_t>(read_result.GetContentLength()), .last_modification_time = read_result.GetLastModified().Millis() / 1000};
return *object_info;
}
else if (throw_on_error)
{
const auto & error = outcome.GetError();
throw DB::Exception(ErrorCodes::S3_ERROR,
"Failed to HEAD object: {}. HTTP response code: {}",
"Failed to get object attributes: {}. HTTP response code: {}",
error.GetMessage(), static_cast<size_t>(error.GetResponseCode()));
}
return {};
}
size_t getObjectSize(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool throw_on_error, bool for_disk_s3)
size_t getObjectSize(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3, bool throw_on_error)
{
return getObjectInfo(client, bucket, key, version_id, throw_on_error, for_disk_s3).size;
return getObjectInfo(client, bucket, key, version_id, for_disk_s3, throw_on_error).size;
}
bool objectExists(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3)
{
auto outcome = headObject(client, bucket, key, version_id, for_disk_s3);
if (outcome.IsSuccess())
auto [object_info, error] = tryGetObjectInfo(client, bucket, key, version_id, for_disk_s3);
if (object_info)
return true;
const auto & error = outcome.GetError();
if (isNotFoundError(error.GetErrorType()))
return false;
@ -949,6 +1020,48 @@ namespace S3
"Failed to check existence of key {} in bucket {}: {}",
key, bucket, error.GetMessage());
}
void checkObjectExists(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3, std::string_view description)
{
auto [object_info, error] = tryGetObjectInfo(client, bucket, key, version_id, for_disk_s3);
if (object_info)
return;
throw S3Exception(error.GetErrorType(), "{}Object {} in bucket {} suddenly disappeared: {}",
(description.empty() ? "" : (String(description) + ": ")), key, bucket, error.GetMessage());
}
std::map<String, String> getObjectMetadata(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3, bool throw_on_error)
{
ProfileEvents::increment(ProfileEvents::S3GetObjectMetadata);
if (for_disk_s3)
ProfileEvents::increment(ProfileEvents::DiskS3GetObjectMetadata);
/// We must not use the `HeadObject` request, see the comment about `HeadObjectRequest` in S3Common.h.
Aws::S3::Model::GetObjectRequest req;
req.SetBucket(bucket);
req.SetKey(key);
/// Only the first byte will be read.
/// We don't need that first byte but the range should be set otherwise the entire object will be read.
req.SetRange("bytes=0-0");
if (!version_id.empty())
req.SetVersionId(version_id);
auto outcome = client.GetObject(req);
if (outcome.IsSuccess())
return outcome.GetResult().GetMetadata();
if (!throw_on_error)
return {};
const auto & error = outcome.GetError();
throw S3Exception(error.GetErrorType(),
"Failed to get metadata of key {} in bucket {}: {}",
key, bucket, error.GetMessage());
}
}
}

View File

@ -11,15 +11,15 @@
#if USE_AWS_S3
#include <base/types.h>
#include <aws/core/Aws.h>
#include <aws/core/client/ClientConfiguration.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/S3Errors.h>
#include <Poco/URI.h>
#include <Common/Exception.h>
#include <Common/Throttler_fwd.h>
#include <Poco/URI.h>
#include <aws/core/Aws.h>
#include <aws/s3/S3Errors.h>
namespace Aws::S3 { class S3Client; }
namespace DB
{
@ -121,22 +121,29 @@ struct URI
static void validateBucket(const String & bucket, const Poco::URI & uri);
};
/// WARNING: Don't use `HeadObjectRequest`! Use the functions below instead.
/// For explanation see the comment about `HeadObject` request in the function tryGetObjectInfo().
struct ObjectInfo
{
size_t size = 0;
time_t last_modification_time = 0;
};
bool isNotFoundError(Aws::S3::S3Errors error);
ObjectInfo getObjectInfo(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id = "", bool for_disk_s3 = false, bool throw_on_error = true);
Aws::S3::Model::HeadObjectOutcome headObject(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id = "", bool for_disk_s3 = false);
S3::ObjectInfo getObjectInfo(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool throw_on_error, bool for_disk_s3);
size_t getObjectSize(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool throw_on_error, bool for_disk_s3);
size_t getObjectSize(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id = "", bool for_disk_s3 = false, bool throw_on_error = true);
bool objectExists(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id = "", bool for_disk_s3 = false);
/// Throws an exception if a specified object doesn't exist. `description` is used as a part of the error message.
void checkObjectExists(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id = "", bool for_disk_s3 = false, std::string_view description = {});
bool isNotFoundError(Aws::S3::S3Errors error);
/// Returns the object's metadata.
std::map<String, String> getObjectMetadata(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id = "", bool for_disk_s3 = false, bool throw_on_error = true);
}
#endif

View File

@ -182,12 +182,8 @@ void WriteBufferFromS3::finalizeImpl()
if (check_objects_after_upload)
{
LOG_TRACE(log, "Checking object {} exists after upload", key);
auto response = S3::headObject(*client_ptr, bucket, key, "", write_settings.for_object_storage);
if (!response.IsSuccess())
throw S3Exception(fmt::format("Object {} from bucket {} disappeared immediately after upload, it's a bug in S3 or S3 API.", key, bucket), response.GetError().GetErrorType());
else
LOG_TRACE(log, "Object {} exists after upload", key);
S3::checkObjectExists(*client_ptr, bucket, key, {}, /* for_disk_s3= */ write_settings.for_object_storage, "Immediately after upload");
LOG_TRACE(log, "Object {} exists after upload", key);
}
}

View File

@ -49,7 +49,9 @@ NamesAndTypesList TextLogElement::getNamesAndTypes()
{"revision", std::make_shared<DataTypeUInt32>()},
{"source_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"source_line", std::make_shared<DataTypeUInt64>()}
{"source_line", std::make_shared<DataTypeUInt64>()},
{"message_format_string", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
};
}
@ -74,6 +76,8 @@ void TextLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insert(source_file);
columns[i++]->insert(source_line);
columns[i++]->insert(message_format_string);
}
TextLog::TextLog(ContextPtr context_, const String & database_name_,

View File

@ -28,6 +28,8 @@ struct TextLogElement
String source_file;
UInt64 source_line{};
std::string_view message_format_string;
static std::string name() { return "TextLog"; }
static NamesAndTypesList getNamesAndTypes();
static NamesAndAliases getNamesAndAliases() { return {}; }

View File

@ -133,6 +133,8 @@ void OwnSplitChannel::logSplit(const Poco::Message & msg)
elem.source_file = msg.getSourceFile();
elem.source_line = msg.getSourceLine();
elem.message_format_string = msg.getFormatString();
std::shared_ptr<TextLog> text_log_locked{};
{
std::lock_guard<std::mutex> lock(text_log_mutex);

View File

@ -196,7 +196,7 @@ void PipelineExecutor::executeSingleThread(size_t thread_num)
#ifndef NDEBUG
auto & context = tasks.getThreadContext(thread_num);
LOG_TRACE(log,
LOG_TEST(log,
"Thread finished. Total time: {} sec. Execution time: {} sec. Processing time: {} sec. Wait time: {} sec.",
context.total_time_ns / 1e9,
context.execution_time_ns / 1e9,

View File

@ -304,7 +304,9 @@ AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(avro::Node
};
}
if (null_as_default)
/// If the Union is ['Null', Nested-Type], since the Nested-Type can not be inside
/// Nullable, so we will get Nested-Type, instead of Nullable type.
if (null_as_default || !target.isNullable())
{
auto nested_deserialize = this->createDeserializeFn(root_node->leafAt(non_null_union_index), target_type);
return [non_null_union_index, nested_deserialize](IColumn & column, avro::Decoder & decoder)
@ -1001,7 +1003,7 @@ DataTypePtr AvroSchemaReader::avroNodeToDataType(avro::NodePtr node)
case avro::Type::AVRO_STRING:
return std::make_shared<DataTypeString>();
case avro::Type::AVRO_BYTES:
return std::make_shared<DataTypeFloat32>();
return std::make_shared<DataTypeString>();
case avro::Type::AVRO_ENUM:
{
if (node->names() < 128)

View File

@ -402,14 +402,10 @@ void TCPHandler::runImpl()
{
auto callback = [this]()
{
{
std::lock_guard task_callback_lock(task_callback_mutex);
std::scoped_lock lock(task_callback_mutex, fatal_error_mutex);
if (isQueryCancelled())
return true;
}
std::lock_guard lock(fatal_error_mutex);
if (isQueryCancelled())
return true;
sendProgress();
sendSelectProfileEvents();
@ -424,6 +420,9 @@ void TCPHandler::runImpl()
}
state.io.onFinish();
std::lock_guard lock(task_callback_mutex);
/// Send final progress after calling onFinish(), since it will update the progress.
///
/// NOTE: we cannot send Progress for regular INSERT (with VALUES)
@ -446,8 +445,11 @@ void TCPHandler::runImpl()
if (state.is_connection_closed)
break;
sendLogs();
sendEndOfStream();
{
std::lock_guard lock(task_callback_mutex);
sendLogs();
sendEndOfStream();
}
/// QueryState should be cleared before QueryScope, since otherwise
/// the MemoryTracker will be wrong for possible deallocations.
@ -760,6 +762,9 @@ void TCPHandler::processOrdinaryQueryWithProcessors()
}
}
/// Defer locking to cover a part of the scope below and everything after it
std::unique_lock progress_lock(task_callback_mutex, std::defer_lock);
{
PullingAsyncPipelineExecutor executor(pipeline);
CurrentMetrics::Increment query_thread_metric_increment{CurrentMetrics::QueryThread};
@ -796,6 +801,11 @@ void TCPHandler::processOrdinaryQueryWithProcessors()
}
}
/// This lock wasn't acquired before and we make .lock() call here
/// so everything under this line is covered even together
/// with sendProgress() out of the scope
progress_lock.lock();
/** If data has run out, we will send the profiling data and total values to
* the last zero block to be able to use
* this information in the suffix output of stream.

View File

@ -59,6 +59,7 @@ namespace ErrorCodes
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
extern const int CANNOT_COMPILE_REGEXP;
}
namespace
{
@ -75,6 +76,9 @@ namespace
const size_t next_slash = suffix_with_globs.find('/', 1);
re2::RE2 matcher(makeRegexpPatternFromGlobs(suffix_with_globs.substr(0, next_slash)));
if (!matcher.ok())
throw Exception(ErrorCodes::CANNOT_COMPILE_REGEXP,
"Cannot compile regex from glob ({}): {}", for_match, matcher.error());
HDFSFileInfo ls;
ls.file_info = hdfsListDirectory(fs.get(), prefix_without_globs.data(), &ls.length);

View File

@ -651,7 +651,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchSelectedPart(
}
#endif
LOG_WARNING(log, fmt::runtime(e.message() + " Will retry fetching part without zero-copy."));
LOG_WARNING(log, "Will retry fetching part without zero-copy: {}", e.message());
/// It's important to release session from HTTP pool. Otherwise it's possible to get deadlock
/// on http pool.

View File

@ -109,10 +109,11 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
/// 2. We have some larger merged part which covers new_part_name (and therefore it covers source_part_name too)
/// 3. We have two intersecting parts, both cover source_part_name. It's logical error.
/// TODO Why 1 and 2 can happen? Do we need more assertions here or somewhere else?
constexpr const char * message = "Part {} is covered by {} but should be merged into {}. This shouldn't happen often.";
LOG_WARNING(log, fmt::runtime(message), source_part_name, source_part_or_covering->name, entry.new_part_name);
constexpr auto fmt_string = "Part {} is covered by {} but should be merged into {}. This shouldn't happen often.";
String message;
LOG_WARNING(LogToStr(message, log), fmt_string, source_part_name, source_part_or_covering->name, entry.new_part_name);
if (!source_part_or_covering->info.contains(MergeTreePartInfo::fromPartName(entry.new_part_name, storage.format_version)))
throw Exception(ErrorCodes::LOGICAL_ERROR, message, source_part_name, source_part_or_covering->name, entry.new_part_name);
throw Exception(ErrorCodes::LOGICAL_ERROR, message);
return PrepareResult{
.prepared_successfully = false,

View File

@ -3899,7 +3899,9 @@ void MergeTreeData::forcefullyMovePartToDetachedAndRemoveFromMemory(const MergeT
if (error)
{
LOG_ERROR(log, "The set of parts restored in place of {} looks incomplete. There might or might not be a data loss.{}", part->name, (error_parts.empty() ? "" : " Suspicious parts: " + error_parts));
LOG_WARNING(log, "The set of parts restored in place of {} looks incomplete. "
"SELECT queries may observe gaps in data until this replica is synchronized with other replicas.{}",
part->name, (error_parts.empty() ? "" : " Suspicious parts: " + error_parts));
}
}

View File

@ -373,9 +373,9 @@ CheckResult ReplicatedMergeTreePartCheckThread::checkPart(const String & part_na
throw;
tryLogCurrentException(log, __PRETTY_FUNCTION__);
String message = "Part " + part_name + " looks broken. Removing it and will try to fetch.";
LOG_ERROR(log, fmt::runtime(message));
constexpr auto fmt_string = "Part {} looks broken. Removing it and will try to fetch.";
String message = fmt::format(fmt_string, part_name);
LOG_ERROR(log, fmt_string, part_name);
/// Delete part locally.
storage.outdateBrokenPartAndCloneToDetached(part, "broken");
@ -392,9 +392,9 @@ CheckResult ReplicatedMergeTreePartCheckThread::checkPart(const String & part_na
/// Probably, someone just wrote down the part, and has not yet added to ZK.
/// Therefore, delete only if the part is old (not very reliable).
ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);
String message = "Unexpected part " + part_name + " in filesystem. Removing.";
LOG_ERROR(log, fmt::runtime(message));
constexpr auto fmt_string = "Unexpected part {} in filesystem. Removing.";
String message = fmt::format(fmt_string, part_name);
LOG_ERROR(log, fmt_string, part_name);
storage.outdateBrokenPartAndCloneToDetached(part, "unexpected");
return {part_name, false, message};
}

View File

@ -1191,12 +1191,10 @@ bool ReplicatedMergeTreeQueue::isCoveredByFuturePartsImpl(const LogEntry & entry
if (entry_for_same_part_it != future_parts.end())
{
const LogEntry & another_entry = *entry_for_same_part_it->second;
out_reason = fmt::format(
"Not executing log entry {} of type {} for part {} "
"because another log entry {} of type {} for the same part ({}) is being processed.",
entry.znode_name, entry.type, entry.new_part_name,
another_entry.znode_name, another_entry.type, another_entry.new_part_name);
LOG_INFO(log, fmt::runtime(out_reason));
constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} "
"because another log entry {} of type {} for the same part ({}) is being processed.";
LOG_INFO(LogToStr(out_reason, log), fmt_string, entry.znode_name, entry.type, entry.new_part_name,
another_entry.znode_name, another_entry.type, another_entry.new_part_name);
return true;
/** When the corresponding action is completed, then `isNotCoveredByFuturePart` next time, will succeed,
@ -1238,11 +1236,9 @@ bool ReplicatedMergeTreeQueue::isCoveredByFuturePartsImpl(const LogEntry & entry
{
if (entry.znode_name < future_part_elem.second->znode_name)
{
out_reason = fmt::format(
"Not executing log entry {} for part {} "
"because it is not disjoint with part {} that is currently executing and another entry {} is newer.",
entry.znode_name, new_part_name, future_part_elem.first, future_part_elem.second->znode_name);
LOG_TRACE(log, fmt::runtime(out_reason));
constexpr auto fmt_string = "Not executing log entry {} for part {} "
"because it is not disjoint with part {} that is currently executing and another entry {} is newer.";
LOG_TRACE(LogToStr(out_reason, log), fmt_string, entry.znode_name, new_part_name, future_part_elem.first, future_part_elem.second->znode_name);
return true;
}
@ -1250,11 +1246,9 @@ bool ReplicatedMergeTreeQueue::isCoveredByFuturePartsImpl(const LogEntry & entry
continue;
}
out_reason = fmt::format(
"Not executing log entry {} for part {} "
"because it is not disjoint with part {} that is currently executing.",
entry.znode_name, new_part_name, future_part_elem.first);
LOG_TRACE(log, fmt::runtime(out_reason));
constexpr auto fmt_string = "Not executing log entry {} for part {} "
"because it is not disjoint with part {} that is currently executing.";
LOG_TEST(LogToStr(out_reason, log), fmt_string, entry.znode_name, new_part_name, future_part_elem.first);
return true;
}
@ -1337,11 +1331,9 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
{
if (future_parts.contains(name))
{
out_postpone_reason = fmt::format(
"Not executing log entry {} of type {} for part {} "
"because part {} is not ready yet (log entry for that part is being processed).",
entry.znode_name, entry.typeToString(), entry.new_part_name, name);
LOG_TRACE(log, fmt::runtime(out_postpone_reason));
constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} "
"because part {} is not ready yet (log entry for that part is being processed).";
LOG_TRACE(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name, entry.typeToString(), entry.new_part_name, name);
return false;
}
@ -1357,10 +1349,8 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
if (merger_mutator.merges_blocker.isCancelled())
{
out_postpone_reason = fmt::format(
"Not executing log entry {} of type {} for part {} because merges and mutations are cancelled now.",
entry.znode_name, entry.typeToString(), entry.new_part_name);
LOG_DEBUG(log, fmt::runtime(out_postpone_reason));
constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} because merges and mutations are cancelled now.";
LOG_DEBUG(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name, entry.typeToString(), entry.new_part_name);
return false;
}
@ -1375,8 +1365,8 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
if (!disks.empty() && only_s3_storage && storage.checkZeroCopyLockExists(entry.new_part_name, disks[0]))
{
out_postpone_reason = "Not executing merge/mutation for the part " + entry.new_part_name
+ ", waiting other replica to execute it and will fetch after.";
constexpr auto fmt_string = "Not executing merge/mutation for the part {}, waiting other replica to execute it and will fetch after.";
out_postpone_reason = fmt::format(fmt_string, entry.new_part_name);
return false;
}
}
@ -1387,9 +1377,8 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
if (replica_to_execute_merge && !merge_strategy_picker.isMergeFinishedByReplica(replica_to_execute_merge.value(), entry))
{
String reason = "Not executing merge for the part " + entry.new_part_name
+ ", waiting for " + replica_to_execute_merge.value() + " to execute merge.";
out_postpone_reason = reason;
constexpr auto fmt_string = "Not executing merge for the part {}, waiting for {} to execute merge.";
out_postpone_reason = fmt::format(fmt_string, entry.new_part_name, replica_to_execute_merge.value());
return false;
}
}
@ -1411,20 +1400,16 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
{
if (merger_mutator.ttl_merges_blocker.isCancelled())
{
out_postpone_reason = fmt::format(
"Not executing log entry {} for part {} because merges with TTL are cancelled now.",
entry.znode_name, entry.new_part_name);
LOG_DEBUG(log, fmt::runtime(out_postpone_reason));
constexpr auto fmt_string = "Not executing log entry {} for part {} because merges with TTL are cancelled now.";
LOG_DEBUG(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name, entry.new_part_name);
return false;
}
size_t total_merges_with_ttl = data.getTotalMergesWithTTLInMergeList();
if (total_merges_with_ttl >= data_settings->max_number_of_merges_with_ttl_in_pool)
{
out_postpone_reason = fmt::format(
"Not executing log entry {} for part {} because {} merges with TTL already executing, maximum {}.",
entry.znode_name, entry.new_part_name, total_merges_with_ttl,
data_settings->max_number_of_merges_with_ttl_in_pool);
LOG_DEBUG(log, fmt::runtime(out_postpone_reason));
constexpr auto fmt_string = "Not executing log entry {} for part {} because {} merges with TTL already executing, maximum {}.";
LOG_DEBUG(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name, entry.new_part_name, total_merges_with_ttl,
data_settings->max_number_of_merges_with_ttl_in_pool);
return false;
}
}
@ -1432,12 +1417,10 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
if (!ignore_max_size && sum_parts_size_in_bytes > max_source_parts_size)
{
out_postpone_reason = fmt::format("Not executing log entry {} of type {} for part {}"
" because source parts size ({}) is greater than the current maximum ({}).",
entry.znode_name, entry.typeToString(), entry.new_part_name,
ReadableSize(sum_parts_size_in_bytes), ReadableSize(max_source_parts_size));
LOG_DEBUG(log, fmt::runtime(out_postpone_reason));
constexpr auto fmt_string = "Not executing log entry {} of type {} for part {}"
" because source parts size ({}) is greater than the current maximum ({}).";
LOG_DEBUG(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name, entry.typeToString(), entry.new_part_name,
ReadableSize(sum_parts_size_in_bytes), ReadableSize(max_source_parts_size));
return false;
}
@ -1450,10 +1433,8 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
if (!alter_sequence.canExecuteMetaAlter(entry.alter_version, state_lock))
{
int head_alter = alter_sequence.getHeadAlterVersion(state_lock);
out_postpone_reason = fmt::format(
"Cannot execute alter metadata {} with version {} because another alter {} must be executed before",
entry.znode_name, entry.alter_version, head_alter);
LOG_TRACE(log, fmt::runtime(out_postpone_reason));
constexpr auto fmt_string = "Cannot execute alter metadata {} with version {} because another alter {} must be executed before";
LOG_TRACE(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name, entry.alter_version, head_alter);
return false;
}
}
@ -1466,17 +1447,13 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
int head_alter = alter_sequence.getHeadAlterVersion(state_lock);
if (head_alter == entry.alter_version)
{
out_postpone_reason = fmt::format(
"Cannot execute alter data {} with version {} because metadata still not altered",
entry.znode_name, entry.alter_version);
LOG_TRACE(log, fmt::runtime(out_postpone_reason));
constexpr auto fmt_string = "Cannot execute alter data {} with version {} because metadata still not altered";
LOG_TRACE(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name, entry.alter_version);
}
else
{
out_postpone_reason = fmt::format(
"Cannot execute alter data {} with version {} because another alter {} must be executed before",
entry.znode_name, entry.alter_version, head_alter);
LOG_TRACE(log, fmt::runtime(out_postpone_reason));
constexpr auto fmt_string = "Cannot execute alter data {} with version {} because another alter {} must be executed before";
LOG_TRACE(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name, entry.alter_version, head_alter);
}
return false;
@ -1498,14 +1475,12 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
{
if (drop_range_info.isDisjoint(info))
continue;
out_postpone_reason = fmt::format(
"Not executing log entry {} of type {} for part {} "
"because another DROP_RANGE or REPLACE_RANGE entry with not disjoint range {} is currently executing.",
entry.znode_name,
entry.typeToString(),
entry.new_part_name,
info.getPartNameForLogs());
LOG_TRACE(log, fmt::runtime(out_postpone_reason));
constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} "
"because another DROP_RANGE or REPLACE_RANGE entry with not disjoint range {} is currently executing.";
LOG_TRACE(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name,
entry.typeToString(),
entry.new_part_name,
info.getPartNameForLogs());
return false;
}
}
@ -1531,11 +1506,10 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
auto new_part_info = MergeTreePartInfo::fromPartName(new_part_name, format_version);
if (!new_part_info.isDisjoint(drop_part_info))
{
out_postpone_reason = fmt::format(
"Not executing log entry {} of type {} for part {} "
"because it probably depends on {} (REPLACE_RANGE).",
entry.znode_name, entry.typeToString(), entry.new_part_name, replace_entry->znode_name);
LOG_TRACE(log, fmt::runtime(out_postpone_reason));
constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} "
"because it probably depends on {} (REPLACE_RANGE).";
LOG_TRACE(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name, entry.typeToString(),
entry.new_part_name, replace_entry->znode_name);
return false;
}
}

View File

@ -80,6 +80,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
extern const int CANNOT_APPEND_TO_FILE;
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
extern const int CANNOT_COMPILE_REGEXP;
}
namespace
@ -105,6 +106,9 @@ void listFilesWithRegexpMatchingImpl(
auto regexp = makeRegexpPatternFromGlobs(current_glob);
re2::RE2 matcher(regexp);
if (!matcher.ok())
throw Exception(ErrorCodes::CANNOT_COMPILE_REGEXP,
"Cannot compile regex from glob ({}): {}", for_match, matcher.error());
bool skip_regex = current_glob == "/*" ? true : false;
if (!recursive)

View File

@ -1303,10 +1303,10 @@ bool StorageMergeTree::optimize(
&disable_reason,
local_context->getSettingsRef().optimize_skip_merged_partitions))
{
constexpr const char * message = "Cannot OPTIMIZE table: {}";
constexpr auto message = "Cannot OPTIMIZE table: {}";
if (disable_reason.empty())
disable_reason = "unknown reason";
LOG_INFO(log, fmt::runtime(message), disable_reason);
LOG_INFO(log, message, disable_reason);
if (local_context->getSettingsRef().optimize_throw_if_noop)
throw Exception(ErrorCodes::CANNOT_ASSIGN_OPTIMIZE, message, disable_reason);
@ -1330,10 +1330,10 @@ bool StorageMergeTree::optimize(
&disable_reason,
local_context->getSettingsRef().optimize_skip_merged_partitions))
{
constexpr const char * message = "Cannot OPTIMIZE table: {}";
constexpr auto message = "Cannot OPTIMIZE table: {}";
if (disable_reason.empty())
disable_reason = "unknown reason";
LOG_INFO(log, fmt::runtime(message), disable_reason);
LOG_INFO(log, message, disable_reason);
if (local_context->getSettingsRef().optimize_throw_if_noop)
throw Exception(ErrorCodes::CANNOT_ASSIGN_OPTIMIZE, message, disable_reason);

View File

@ -1272,12 +1272,12 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
const auto storage_settings_ptr = getSettings();
bool insane = uncovered_unexpected_parts_rows > total_rows_on_filesystem * storage_settings_ptr->replicated_max_ratio_of_wrong_parts;
constexpr const char * sanity_report_fmt = "The local set of parts of table {} doesn't look like the set of parts in ZooKeeper: "
constexpr auto sanity_report_fmt = "The local set of parts of table {} doesn't look like the set of parts in ZooKeeper: "
"{} rows of {} total rows in filesystem are suspicious. "
"There are {} uncovered unexpected parts with {} rows ({} of them is not just-written with {} rows), "
"{} missing parts (with {} blocks), {} covered unexpected parts (with {} rows).";
constexpr const char * sanity_report_debug_fmt = "Uncovered unexpected parts: {}. Missing parts: {}. Covered unexpected parts: {}. Expected parts: {}.";
constexpr auto sanity_report_debug_fmt = "Uncovered unexpected parts: {}. Missing parts: {}. Covered unexpected parts: {}. Expected parts: {}.";
if (insane && !skip_sanity_checks)
{
@ -1293,7 +1293,7 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
{
LOG_DEBUG(log, sanity_report_debug_fmt, fmt::join(uncovered_unexpected_parts, ", "), fmt::join(parts_to_fetch, ", "),
fmt::join(covered_unexpected_parts, ", "), fmt::join(expected_parts, ", "));
LOG_WARNING(log, fmt::runtime(sanity_report_fmt), getStorageID().getNameForLogs(),
LOG_WARNING(log, sanity_report_fmt, getStorageID().getNameForLogs(),
formatReadableQuantity(uncovered_unexpected_parts_rows), formatReadableQuantity(total_rows_on_filesystem),
uncovered_unexpected_parts.size(), uncovered_unexpected_parts_rows, unexpected_parts_nonnew, unexpected_parts_nonnew_rows,
parts_to_fetch.size(), parts_to_fetch_blocks, covered_unexpected_parts.size(), unexpected_parts_rows - uncovered_unexpected_parts_rows);

View File

@ -109,6 +109,7 @@ namespace ErrorCodes
extern const int DATABASE_ACCESS_DENIED;
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
extern const int NOT_IMPLEMENTED;
extern const int CANNOT_COMPILE_REGEXP;
}
class IOutputFormat;
@ -174,6 +175,10 @@ public:
outcome_future = listObjectsAsync();
matcher = std::make_unique<re2::RE2>(makeRegexpPatternFromGlobs(globbed_uri.key));
if (!matcher->ok())
throw Exception(ErrorCodes::CANNOT_COMPILE_REGEXP,
"Cannot compile regex from glob ({}): {}", globbed_uri.key, matcher->error());
recursive = globbed_uri.key == "/**" ? true : false;
fillInternalBufferAssumeLocked();
}
@ -444,7 +449,7 @@ public:
/// (which means we eventually need this info anyway, so it should be ok to do it now)
if (object_infos_)
{
info = S3::getObjectInfo(client_, bucket, key, version_id_, true, false);
info = S3::getObjectInfo(client_, bucket, key, version_id_);
total_size += info->size;
String path = fs::path(bucket) / key;
@ -569,9 +574,7 @@ StorageS3Source::ReaderHolder StorageS3Source::createReader()
if (current_key.empty())
return {};
size_t object_size = info
? info->size
: S3::getObjectSize(*client, bucket, current_key, version_id, true, false);
size_t object_size = info ? info->size : S3::getObjectSize(*client, bucket, current_key, version_id);
int zstd_window_log_max = static_cast<int>(getContext()->getSettingsRef().zstd_window_log_max);
auto read_buf = wrapReadBufferWithCompressionMethod(
@ -1523,7 +1526,7 @@ std::optional<ColumnsDescription> StorageS3::tryGetColumnsFromCache(
/// Note that in case of exception in getObjectInfo returned info will be empty,
/// but schema cache will handle this case and won't return columns from cache
/// because we can't say that it's valid without last modification time.
info = S3::getObjectInfo(*s3_configuration.client, s3_configuration.uri.bucket, *it, s3_configuration.uri.version_id, false, false);
info = S3::getObjectInfo(*s3_configuration.client, s3_configuration.uri.bucket, *it, s3_configuration.uri.version_id, {}, /* throw_on_error= */ false);
if (object_infos)
(*object_infos)[path] = info;
}

View File

@ -176,6 +176,24 @@ private:
}
ReaderHolder() = default;
ReaderHolder(const ReaderHolder & other) = delete;
ReaderHolder & operator=(const ReaderHolder & other) = delete;
ReaderHolder(ReaderHolder && other) noexcept
{
*this = std::move(other);
}
ReaderHolder & operator=(ReaderHolder && other) noexcept
{
/// The order of destruction is important.
/// reader uses pipeline, pipeline uses read_buf.
reader = std::move(other.reader);
pipeline = std::move(other.pipeline);
read_buf = std::move(other.read_buf);
path = std::move(other.path);
return *this;
}
explicit operator bool() const { return reader != nullptr; }
PullingPipelineExecutor * operator->() { return reader.get(); }

View File

@ -345,9 +345,11 @@ namespace
// to check if Range header is supported, we need to send a request with it set
const bool supports_ranges = (res.has("Accept-Ranges") && res.get("Accept-Ranges") == "bytes")
|| (res.has("Content-Range") && res.get("Content-Range").starts_with("bytes"));
LOG_TRACE(
&Poco::Logger::get("StorageURLSource"),
fmt::runtime(supports_ranges ? "HTTP Range is supported" : "HTTP Range is not supported"));
if (supports_ranges)
LOG_TRACE(&Poco::Logger::get("StorageURLSource"), "HTTP Range is supported");
else
LOG_TRACE(&Poco::Logger::get("StorageURLSource"), "HTTP Range is not supported");
if (supports_ranges && res.getStatus() == Poco::Net::HTTPResponse::HTTP_PARTIAL_CONTENT

View File

@ -1817,6 +1817,56 @@ def reportCoverage(args):
"""
)
def reportLogStats(args):
query = """
WITH
120 AS mins,
(
SELECT (count(), sum(length(message)))
FROM system.text_log
WHERE (now() - toIntervalMinute(mins)) < event_time
) AS total
SELECT
count() AS count,
round(count / (total.1), 3) AS `count_%`,
formatReadableSize(sum(length(message))) AS size,
round(sum(length(message)) / (total.2), 3) AS `size_%`,
countDistinct(logger_name) AS uniq_loggers,
countDistinct(thread_id) AS uniq_threads,
groupArrayDistinct(toString(level)) AS levels,
round(sum(query_id = '') / count, 3) AS `background_%`,
message_format_string
FROM system.text_log
WHERE (now() - toIntervalMinute(mins)) < event_time
GROUP BY message_format_string
ORDER BY count DESC
LIMIT 100
FORMAT TSVWithNamesAndTypes
"""
value = clickhouse_execute(args, query).decode()
print("\nTop patterns of log messages:\n")
print(value)
print("\n")
query = """
WITH
120 AS mins
SELECT
count() AS count,
substr(replaceRegexpAll(message, '[^A-Za-z]+', ''), 1, 32) AS pattern,
substr(any(message), 1, 256) as runtime_message
FROM system.text_log
WHERE (now() - toIntervalMinute(mins)) < event_time AND message_format_string = ''
GROUP BY pattern
ORDER BY count DESC
LIMIT 50
FORMAT TSVWithNamesAndTypes
"""
value = clickhouse_execute(args, query).decode()
print("\nTop messages without format string (fmt::runtime):\n")
print(value)
print("\n")
def main(args):
global server_died
@ -1951,6 +2001,9 @@ def main(args):
else:
print("All tests have finished.")
if args.report_logs_stats:
reportLogStats(args)
if args.report_coverage and not reportCoverage(args):
exit_code.value = 1
@ -2242,6 +2295,12 @@ if __name__ == "__main__":
default=False,
help="Check what high-level server components were covered by tests",
)
parser.add_argument(
"--report-logs-stats",
action="store_true",
default=False,
help="Report statistics about log messages",
)
args = parser.parse_args()
if args.queries and not os.path.isdir(args.queries):

View File

@ -1,2 +1,3 @@
02177_CTE_GLOBAL_ON 5 500 11 0 5
02177_CTE_GLOBAL_OFF 1 100 5 0 1
02177_CTE_NEW_ANALYZER 2 200 3 0 2

View File

@ -18,6 +18,16 @@ SELECT '02177_CTE_GLOBAL_OFF', a5 FROM system.numbers LIMIT 100
FORMAT Null
SETTINGS enable_global_with_statement = 0;
WITH
( SELECT sleep(0.0001) FROM system.one ),
( SELECT sleep(0.0001) FROM system.one ),
( SELECT sleep(0.0001) FROM system.one ),
( SELECT sleep(0.0001) FROM system.one ),
( SELECT sleep(0.0001) FROM system.one ) as a5
SELECT '02177_CTE_NEW_ANALYZER', a5 FROM system.numbers LIMIT 100
FORMAT Null
SETTINGS allow_experimental_analyzer = 1;
SYSTEM FLUSH LOGS;
SELECT
'02177_CTE_GLOBAL_ON',
@ -46,3 +56,17 @@ WHERE
AND type = 'QueryFinish'
AND query LIKE '%02177_CTE_GLOBAL_OFF%'
AND event_date >= yesterday() AND event_time > now() - interval 10 minute;
SELECT
'02177_CTE_NEW_ANALYZER',
ProfileEvents['SleepFunctionCalls'] as sleep_calls,
ProfileEvents['SleepFunctionMicroseconds'] as sleep_microseconds,
ProfileEvents['ScalarSubqueriesGlobalCacheHit'] as scalar_cache_global_hit,
ProfileEvents['ScalarSubqueriesLocalCacheHit'] as scalar_cache_local_hit,
ProfileEvents['ScalarSubqueriesCacheMiss'] as scalar_cache_miss
FROM system.query_log
WHERE
current_database = currentDatabase()
AND type = 'QueryFinish'
AND query LIKE '%02177_CTE_NEW_ANALYZER%'
AND event_date >= yesterday() AND event_time > now() - interval 10 minute;

View File

@ -0,0 +1,64 @@
-- { echoOn }
EXPLAIN QUERY TREE SELECT avg(log(2) * number) FROM numbers(10);
QUERY id: 0
PROJECTION COLUMNS
avg(multiply(log(2), number)) Float64
PROJECTION
LIST id: 1, nodes: 1
FUNCTION id: 2, function_name: multiply, function_type: ordinary, result_type: Float64
ARGUMENTS
LIST id: 3, nodes: 2
CONSTANT id: 4, constant_value: Float64_0.6931471805599453, constant_value_type: Float64
EXPRESSION
FUNCTION id: 5, function_name: log, function_type: ordinary, result_type: Float64
ARGUMENTS
LIST id: 6, nodes: 1
CONSTANT id: 7, constant_value: UInt64_2, constant_value_type: UInt8
FUNCTION id: 8, function_name: avg, function_type: aggregate, result_type: Float64
ARGUMENTS
LIST id: 9, nodes: 1
COLUMN id: 10, column_name: number, result_type: UInt64, source_id: 11
JOIN TREE
TABLE_FUNCTION id: 11, table_function_name: numbers
ARGUMENTS
LIST id: 12, nodes: 1
CONSTANT id: 13, constant_value: UInt64_10, constant_value_type: UInt8
EXPLAIN QUERY TREE SELECT avg(number * log(2)) FROM numbers(10);
QUERY id: 0
PROJECTION COLUMNS
avg(multiply(number, log(2))) Float64
PROJECTION
LIST id: 1, nodes: 1
FUNCTION id: 2, function_name: multiply, function_type: ordinary, result_type: Float64
ARGUMENTS
LIST id: 3, nodes: 2
FUNCTION id: 4, function_name: avg, function_type: aggregate, result_type: Float64
ARGUMENTS
LIST id: 5, nodes: 1
COLUMN id: 6, column_name: number, result_type: UInt64, source_id: 7
CONSTANT id: 8, constant_value: Float64_0.6931471805599453, constant_value_type: Float64
EXPRESSION
FUNCTION id: 9, function_name: log, function_type: ordinary, result_type: Float64
ARGUMENTS
LIST id: 10, nodes: 1
CONSTANT id: 11, constant_value: UInt64_2, constant_value_type: UInt8
JOIN TREE
TABLE_FUNCTION id: 7, table_function_name: numbers
ARGUMENTS
LIST id: 12, nodes: 1
CONSTANT id: 13, constant_value: UInt64_10, constant_value_type: UInt8
SELECT round(avg(log(2) * number), 6) AS k FROM numbers(10000000) GROUP BY number % 3, number % 2;
3465734.516505
3465735.209653
3465735.9028
3465736.595947
3465735.209653
3465735.9028
SELECT round(avg(number * log(2)), 6) AS k FROM numbers(10000000) GROUP BY number % 3, number % 2;
3465734.516505
3465735.209653
3465735.9028
3465736.595947
3465735.209653
3465735.9028

View File

@ -0,0 +1,14 @@
SET allow_experimental_analyzer = 1;
SET optimize_arithmetic_operations_in_aggregate_functions = 1;
-- { echoOn }
EXPLAIN QUERY TREE SELECT avg(log(2) * number) FROM numbers(10);
EXPLAIN QUERY TREE SELECT avg(number * log(2)) FROM numbers(10);
SELECT round(avg(log(2) * number), 6) AS k FROM numbers(10000000) GROUP BY number % 3, number % 2;
SELECT round(avg(number * log(2)), 6) AS k FROM numbers(10000000) GROUP BY number % 3, number % 2;
-- { echoOff }

View File

@ -1,81 +0,0 @@
-- { echoOn }
EXPLAIN actions=1
(
SELECT round(avg(log(2) * number), 6) AS k
FROM numbers(10000000)
GROUP BY number % 3, number % 2
)
SETTINGS allow_experimental_analyzer=1;
Expression ((Project names + Projection))
Actions: INPUT : 0 -> avg(number_0) Float64 : 0
COLUMN Const(Float64) -> 0.6931471805599453_Float64 Float64 : 1
COLUMN Const(UInt8) -> 6_UInt8 UInt8 : 2
FUNCTION multiply(0.6931471805599453_Float64 :: 1, avg(number_0) :: 0) -> multiply(0.6931471805599453_Float64, avg(number_0)) Float64 : 3
FUNCTION round(multiply(0.6931471805599453_Float64, avg(number_0)) :: 3, 6_UInt8 :: 2) -> round(multiply(0.6931471805599453_Float64, avg(number_0)), 6_UInt8) Float64 : 0
ALIAS round(multiply(0.6931471805599453_Float64, avg(number_0)), 6_UInt8) :: 0 -> k Float64 : 2
Positions: 2
Aggregating
Keys: modulo(number_0, 3_UInt8), modulo(number_0, 2_UInt8)
Aggregates:
avg(number_0)
Function: avg(UInt64) → Float64
Arguments: number_0
Expression ((Before GROUP BY + Change column names to column identifiers))
Actions: INPUT : 0 -> number UInt64 : 0
COLUMN Const(UInt8) -> 3_UInt8 UInt8 : 1
COLUMN Const(UInt8) -> 2_UInt8 UInt8 : 2
ALIAS number :: 0 -> number_0 UInt64 : 3
FUNCTION modulo(number_0 : 3, 3_UInt8 :: 1) -> modulo(number_0, 3_UInt8) UInt8 : 0
FUNCTION modulo(number_0 : 3, 2_UInt8 :: 2) -> modulo(number_0, 2_UInt8) UInt8 : 1
Positions: 0 1 3
ReadFromStorage (SystemNumbers)
EXPLAIN actions=1
(
SELECT round(log(2) * avg(number), 6) AS k
FROM numbers(10000000)
GROUP BY number % 3, number % 2
)
SETTINGS allow_experimental_analyzer=1;
Expression ((Project names + Projection))
Actions: INPUT : 0 -> avg(number_0) Float64 : 0
COLUMN Const(Float64) -> 0.6931471805599453_Float64 Float64 : 1
COLUMN Const(UInt8) -> 6_UInt8 UInt8 : 2
FUNCTION multiply(0.6931471805599453_Float64 :: 1, avg(number_0) :: 0) -> multiply(0.6931471805599453_Float64, avg(number_0)) Float64 : 3
FUNCTION round(multiply(0.6931471805599453_Float64, avg(number_0)) :: 3, 6_UInt8 :: 2) -> round(multiply(0.6931471805599453_Float64, avg(number_0)), 6_UInt8) Float64 : 0
ALIAS round(multiply(0.6931471805599453_Float64, avg(number_0)), 6_UInt8) :: 0 -> k Float64 : 2
Positions: 2
Aggregating
Keys: modulo(number_0, 3_UInt8), modulo(number_0, 2_UInt8)
Aggregates:
avg(number_0)
Function: avg(UInt64) → Float64
Arguments: number_0
Expression ((Before GROUP BY + Change column names to column identifiers))
Actions: INPUT : 0 -> number UInt64 : 0
COLUMN Const(UInt8) -> 3_UInt8 UInt8 : 1
COLUMN Const(UInt8) -> 2_UInt8 UInt8 : 2
ALIAS number :: 0 -> number_0 UInt64 : 3
FUNCTION modulo(number_0 : 3, 3_UInt8 :: 1) -> modulo(number_0, 3_UInt8) UInt8 : 0
FUNCTION modulo(number_0 : 3, 2_UInt8 :: 2) -> modulo(number_0, 2_UInt8) UInt8 : 1
Positions: 0 1 3
ReadFromStorage (SystemNumbers)
SELECT round(avg(log(2) * number), 6) AS k
FROM numbers(10000000)
GROUP BY number % 3, number % 2
SETTINGS allow_experimental_analyzer=1;
3465734.516505
3465735.209653
3465735.9028
3465736.595947
3465735.209653
3465735.9028
SELECT round(log(2) * avg(number), 6) AS k
FROM numbers(10000000)
GROUP BY number % 3, number % 2
SETTINGS allow_experimental_analyzer=0;
3465734.516505
3465735.209653
3465735.9028
3465736.595947
3465735.209653
3465735.9028

View File

@ -1,26 +0,0 @@
-- { echoOn }
EXPLAIN actions=1
(
SELECT round(avg(log(2) * number), 6) AS k
FROM numbers(10000000)
GROUP BY number % 3, number % 2
)
SETTINGS allow_experimental_analyzer=1;
EXPLAIN actions=1
(
SELECT round(log(2) * avg(number), 6) AS k
FROM numbers(10000000)
GROUP BY number % 3, number % 2
)
SETTINGS allow_experimental_analyzer=1;
SELECT round(avg(log(2) * number), 6) AS k
FROM numbers(10000000)
GROUP BY number % 3, number % 2
SETTINGS allow_experimental_analyzer=1;
SELECT round(log(2) * avg(number), 6) AS k
FROM numbers(10000000)
GROUP BY number % 3, number % 2
SETTINGS allow_experimental_analyzer=0;

View File

@ -0,0 +1,15 @@
manifest_path String
manifest_length Int64
partition_spec_id Int32
added_snapshot_id Nullable(Int64)
added_data_files_count Nullable(Int32)
existing_data_files_count Nullable(Int32)
deleted_data_files_count Nullable(Int32)
partitions Array(Tuple(contains_null Bool, contains_nan Nullable(Bool), lower_bound Nullable(String), upper_bound Nullable(String)))
added_rows_count Nullable(Int64)
existing_rows_count Nullable(Int64)
deleted_rows_count Nullable(Int64)
file:/warehouse/nyc.db/taxis/metadata/f9e891e9-fbd3-4411-a5c6-0cc14a2f1392-m0.avro 6488 0 1793608066486471262 8 0 0 [(false,false,'\0\0\0\0\0\0\0','\b\0\0\0\0\0\0\0')] 12 0 0
file:/warehouse/nyc.db/taxis/metadata/a51dd31d-ea86-42dd-82d1-1981332a0f6d-m0.avro 6363 0 5735460159761889536 4 0 0 [(false,false,'\0\0\0\0\0\0\0','\b\0\0\0\0\0\0\0')] 4 0 0
file:/warehouse/nyc.db/taxis/metadata/7ae325bd-fe20-4a55-917c-36cb8f6a488c-m0.avro 6370 0 7171740521400098346 4 0 0 [(false,false,'\0\0\0\0\0\0\0','\0\0\0\0\0\0\0')] 4 0 0
file:/warehouse/nyc.db/taxis/metadata/5e3c62a9-1537-455f-98e5-0a067af5752a-m0.avro 6324 0 6850377589038341628 2 0 0 [(false,false,'\0\0\0\0\0\0\0','\0\0\0\0\0\0\0')] 4 0 0

View File

@ -0,0 +1,13 @@
#!/usr/bin/env bash
# Tags: no-parallel, no-fasttest
set -e
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
DATA_DIR=$CUR_DIR/data_avro
$CLICKHOUSE_LOCAL -q "desc file('$DATA_DIR/union_null_nested.avro')"
$CLICKHOUSE_LOCAL -q "select * from file('$DATA_DIR/union_null_nested.avro')"

View File

@ -0,0 +1,5 @@
status Int32
snapshot_id Nullable(Int64)
data_file Tuple(file_path String, file_format String, partition Tuple(vendor_id Nullable(Int64)), record_count Int64, file_size_in_bytes Int64, block_size_in_bytes Int64, column_sizes Array(Tuple(key Int32, value Int64)), value_counts Array(Tuple(key Int32, value Int64)), null_value_counts Array(Tuple(key Int32, value Int64)), nan_value_counts Array(Tuple(key Int32, value Int64)), lower_bounds Array(Tuple(key Int32, value String)), upper_bounds Array(Tuple(key Int32, value String)), key_metadata Nullable(String), split_offsets Array(Int64), sort_order_id Nullable(Int32))
1 6850377589038341628 ('file:/warehouse/nyc.db/taxis/data/vendor_id=1/00000-0-c070e655-dc44-43d2-a01a-484f107210cb-00001.parquet','PARQUET',(1),2,1565,67108864,[(1,87),(2,51),(3,51),(4,57),(5,51)],[(1,2),(2,2),(3,2),(4,2),(5,2)],[(1,0),(2,0),(3,0),(4,0),(5,0)],[(3,0),(4,0)],[(1,'\0\0\0\0\0\0\0'),(2,'³C\0\0\0\0\0'),(3,'ffæ?'),(4,'¤p=\n×£.@'),(5,'N')],[(1,'\0\0\0\0\0\0\0'),(2,'¶C\0\0\0\0\0'),(3,'ffA'),(4,'q=\n×£E@'),(5,'Y')],NULL,[4],0)
1 6850377589038341628 ('file:/warehouse/nyc.db/taxis/data/vendor_id=2/00000-0-c070e655-dc44-43d2-a01a-484f107210cb-00002.parquet','PARQUET',(2),2,1620,67108864,[(1,87),(2,51),(3,51),(4,57),(5,89)],[(1,2),(2,2),(3,2),(4,2),(5,2)],[(1,0),(2,0),(3,0),(4,0),(5,0)],[(3,0),(4,0)],[(1,'\0\0\0\0\0\0\0'),(2,'´C\0\0\0\0\0'),(3,'fff?'),(4,'…ëQ¸"@'),(5,'N')],[(1,'\0\0\0\0\0\0\0'),(2,'µC\0\0\0\0\0'),(3,'\0\0 @'),(4,'fffff&6@'),(5,'N')],NULL,[4],0)

View File

@ -0,0 +1,13 @@
#!/usr/bin/env bash
# Tags: no-parallel, no-fasttest
set -e
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
DATA_DIR=$CUR_DIR/data_avro
$CLICKHOUSE_LOCAL -q "desc file('$DATA_DIR/complicated_schema.avro')"
$CLICKHOUSE_LOCAL -q "select * from file('$DATA_DIR/complicated_schema.avro')"

View File

@ -0,0 +1,4 @@
1 2 3
4 5 6
7 8 9
0 0 0

View File

@ -0,0 +1,5 @@
-- Tags: no-fasttest
-- Tag no-fasttest: Depends on AWS
select * from s3('http://localhost:11111/test/a.tsv', CustomSeparated);

View File

@ -363,7 +363,7 @@ int decompressFiles(int input_fd, char * path, char * name, bool & have_compress
#if !defined(OS_DARWIN) && !defined(OS_FREEBSD)
uint32_t getInode(const char * self)
uint64_t getInode(const char * self)
{
std::ifstream maps("/proc/self/maps");
if (maps.fail())
@ -380,7 +380,7 @@ uint32_t getInode(const char * self)
{
std::stringstream ss(line); // STYLE_CHECK_ALLOW_STD_STRING_STREAM
std::string addr, mode, offset, id, path;
uint32_t inode = 0;
uint64_t inode = 0;
if (ss >> addr >> mode >> offset >> id >> inode >> path && path == self)
return inode;
}
@ -415,7 +415,7 @@ int main(int/* argc*/, char* argv[])
#if !defined(OS_DARWIN) && !defined(OS_FREEBSD)
/// get inode of this executable
uint32_t inode = getInode(self);
uint64_t inode = getInode(self);
if (inode == 0)
{
std::cerr << "Unable to obtain inode." << std::endl;
@ -447,6 +447,11 @@ int main(int/* argc*/, char* argv[])
return 1;
}
/// inconsistency in WSL1 Ubuntu - inode reported in /proc/self/maps is a 64bit to
/// 32bit conversion of input_info.st_ino
if (input_info.st_ino & 0xFFFFFFFF00000000 && !(inode & 0xFFFFFFFF00000000))
input_info.st_ino &= 0x00000000FFFFFFFF;
/// if decompression was performed by another process since this copy was started
/// then file referred by path "self" is already pointing to different inode
if (input_info.st_ino != inode)