Merge remote-tracking branch 'origin/master' into igor/remove_redundant_order_by

Commit 1bed5064e7
@@ -448,12 +448,7 @@ else()
link_libraries(global-group)
endif ()

if (NOT (OS_LINUX OR OS_DARWIN))
# Using system libs can cause a lot of warnings in includes (on macro expansion).
option(WERROR "Enable -Werror compiler option" OFF)
else ()
option(WERROR "Enable -Werror compiler option" ON)
endif ()
option(WERROR "Enable -Werror compiler option" ON)

if (WERROR)
# Don't pollute CMAKE_CXX_FLAGS with -Werror as it will break some CMake checks.
@@ -153,7 +153,7 @@ restart:

/* Restore old terminal settings and signals. */
if (memcmp(&term, &oterm, sizeof(term)) != 0) {
const int sigttou = signo[SIGTTOU];
const int sigttou = (int)signo[SIGTTOU];

/* Ignore SIGTTOU generated when we are not the fg pgrp. */
while (tcsetattr(input, TCSAFLUSH|TCSASOFT, &oterm) == -1 &&
@@ -6,6 +6,26 @@ sidebar_label: Float32, Float64

# Float32, Float64

:::warning
If you need accurate calculations, in particular if you work with financial or business data requiring high precision, you should consider using Decimal instead. Floats might lead to inaccurate results, as illustrated below:

```sql
CREATE TABLE IF NOT EXISTS float_vs_decimal
(
   my_float Float64,
   my_decimal Decimal64(3)
) Engine=MergeTree ORDER BY tuple();

INSERT INTO float_vs_decimal SELECT round(canonicalRand(), 3) AS res, res FROM system.numbers LIMIT 1000000; # Generate 1 000 000 random numbers with 3 decimal places and store them as a float and as a decimal

SELECT sum(my_float), sum(my_decimal) FROM float_vs_decimal;
> 500279.56300000014   500279.563

SELECT sumKahan(my_float), sumKahan(my_decimal) FROM float_vs_decimal;
> 500279.563   500279.563
```
:::

[Floating point numbers](https://en.wikipedia.org/wiki/IEEE_754).

Types are equivalent to types of C:

@@ -13,8 +33,6 @@ Types are equivalent to types of C:

- `Float32` — `float`.
- `Float64` — `double`.

We recommend that you store data in integer form whenever possible. For example, convert fixed-precision numbers to integer values, such as monetary amounts or page load times in milliseconds.
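For example, a minimal sketch of that recommendation (the `page_timings` table and its columns are illustrative, not part of this documentation page):

```sql
-- Store page load time as integer milliseconds rather than a fractional-second Float.
CREATE TABLE IF NOT EXISTS page_timings
(
    url String,
    load_time_ms UInt32
)
ENGINE = MergeTree ORDER BY url;

INSERT INTO page_timings VALUES ('/index', 1734);

-- Convert to seconds only when presenting the value.
SELECT url, load_time_ms / 1000 AS load_time_s FROM page_timings;
```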
Aliases:

- `Float32` — `FLOAT`.
@@ -41,7 +41,7 @@ ORDER BY (postcode1, postcode2, addr1, addr2);

We will use the `url` function to stream the data into ClickHouse. Some of the incoming data needs to be preprocessed first, which includes the following steps (a SQL sketch of these steps follows the list):

- Splitting `postcode` into two separate columns, `postcode1` and `postcode2`, since this is better for storage and queries
- Converting the `time` field to a date, as it only contains a 00:00 time
- Converting the `time` field to a date, because it only contains a 00:00 time
- Ignoring the [UUid](/docs/zh/sql-reference/data-types/uuid.md) field, because we do not need it for analysis
- Using the [transform](/docs/zh/sql-reference/functions/other-functions.md#transform) function to convert the `Enum` fields `type` and `duration` into more readable `Enum` fields
- Converting the `is_new` field from a single-character string (`Y`/`N`) to a [UInt8](/docs/zh/sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-uint256-int8-int16-int32-int64-int128-int256) field with the value 0 or 1
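Below is a minimal SQL sketch of the preprocessing steps above. It is illustrative only: the guide itself streams the source file with the `url` function, and the source table name and the exact `transform` mappings shown here are assumptions, not taken from this page.

```sql
SELECT
    splitByChar(' ', postcode)[1] AS postcode1,  -- split the postcode into two columns
    splitByChar(' ', postcode)[2] AS postcode2,
    toDate(time) AS date,                        -- the time component is always 00:00
    transform(type,
              ['T', 'S', 'D', 'F', 'O'],
              ['terraced', 'semi-detached', 'detached', 'flat', 'other']) AS type,
    is_new = 'Y' AS is_new                       -- 'Y'/'N' string becomes UInt8 0 or 1
FROM raw_uk_price_paid;                          -- placeholder for the url(...) source
```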
@@ -7,6 +7,11 @@

namespace DB
{

namespace ErrorCodes
{
extern const int UNSUPPORTED_METHOD;
}

class IFunctionOverloadResolver;
using FunctionOverloadResolverPtr = std::shared_ptr<IFunctionOverloadResolver>;

@@ -189,6 +194,11 @@ public:

DataTypePtr getResultType() const override
{
if (!result_type)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
"Function node with name '{}' is not resolved",
function_name);

return result_type;
}
@ -3983,7 +3983,7 @@ ProjectionNames QueryAnalyzer::resolveFunction(QueryTreeNodePtr & node, Identifi
|
||||
const auto * constant_node = parameter_node->as<ConstantNode>();
|
||||
if (!constant_node)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS,
|
||||
"Parameter for function {} expected to have constant value. Actual {}. In scope {}",
|
||||
"Parameter for function '{}' expected to have constant value. Actual {}. In scope {}",
|
||||
function_name,
|
||||
parameter_node->formatASTForErrorMessage(),
|
||||
scope.scope_node->formatASTForErrorMessage());
|
||||
@ -4079,7 +4079,7 @@ ProjectionNames QueryAnalyzer::resolveFunction(QueryTreeNodePtr & node, Identifi
|
||||
{
|
||||
auto & function_in_arguments_nodes = function_node.getArguments().getNodes();
|
||||
if (function_in_arguments_nodes.size() != 2)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Function {} expects 2 arguments", function_name);
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Function '{}' expects 2 arguments", function_name);
|
||||
|
||||
auto & in_second_argument = function_in_arguments_nodes[1];
|
||||
auto * table_node = in_second_argument->as<TableNode>();
|
||||
@ -4169,8 +4169,8 @@ ProjectionNames QueryAnalyzer::resolveFunction(QueryTreeNodePtr & node, Identifi
|
||||
|
||||
if (!argument_column.type)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"Function {} argument is not resolved. In scope {}",
|
||||
function_node.getFunctionName(),
|
||||
"Function '{}' argument is not resolved. In scope {}",
|
||||
function_name,
|
||||
scope.scope_node->formatASTForErrorMessage());
|
||||
|
||||
const auto * constant_node = function_argument->as<ConstantNode>();
|
||||
@ -4220,7 +4220,7 @@ ProjectionNames QueryAnalyzer::resolveFunction(QueryTreeNodePtr & node, Identifi
|
||||
auto * lambda_expression = lambda_expression_untyped->as<LambdaNode>();
|
||||
if (!lambda_expression)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"Function identifier {} must be resolved as lambda. Actual {}. In scope {}",
|
||||
"Function identifier '{}' must be resolved as lambda. Actual {}. In scope {}",
|
||||
function_node.getFunctionName(),
|
||||
lambda_expression_untyped->formatASTForErrorMessage(),
|
||||
scope.scope_node->formatASTForErrorMessage());
|
||||
@ -4253,7 +4253,7 @@ ProjectionNames QueryAnalyzer::resolveFunction(QueryTreeNodePtr & node, Identifi
|
||||
const auto * tuple_data_type = typeid_cast<const DataTypeTuple *>(result_type.get());
|
||||
if (!tuple_data_type)
|
||||
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
|
||||
"Function untuple argument must be have compound type. Actual type {}. In scope {}",
|
||||
"Function 'untuple' argument must have compound type. Actual type {}. In scope {}",
|
||||
result_type->getName(),
|
||||
scope.scope_node->formatASTForErrorMessage());
|
||||
|
||||
@ -4311,7 +4311,7 @@ ProjectionNames QueryAnalyzer::resolveFunction(QueryTreeNodePtr & node, Identifi
|
||||
{
|
||||
if (!AggregateFunctionFactory::instance().isAggregateFunctionName(function_name))
|
||||
{
|
||||
std::string error_message = fmt::format("Aggregate function with name {} does not exists. In scope {}",
|
||||
std::string error_message = fmt::format("Aggregate function with name '{}' does not exists. In scope {}",
|
||||
function_name,
|
||||
scope.scope_node->formatASTForErrorMessage());
|
||||
|
||||
@ -4319,6 +4319,11 @@ ProjectionNames QueryAnalyzer::resolveFunction(QueryTreeNodePtr & node, Identifi
|
||||
throw Exception(ErrorCodes::UNKNOWN_AGGREGATE_FUNCTION, error_message);
|
||||
}
|
||||
|
||||
if (!function_lambda_arguments_indexes.empty())
|
||||
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
|
||||
"Window function '{}' does not support lambda arguments",
|
||||
function_name);
|
||||
|
||||
AggregateFunctionProperties properties;
|
||||
auto aggregate_function = AggregateFunctionFactory::instance().get(function_name, argument_types, parameters, properties);
|
||||
|
||||
@ -4368,12 +4373,17 @@ ProjectionNames QueryAnalyzer::resolveFunction(QueryTreeNodePtr & node, Identifi
|
||||
auto hints = name_prompter.getHints(function_name, possible_function_names);
|
||||
|
||||
throw Exception(ErrorCodes::UNKNOWN_FUNCTION,
|
||||
"Function with name {} does not exists. In scope {}{}",
|
||||
"Function with name '{}' does not exists. In scope {}{}",
|
||||
function_name,
|
||||
scope.scope_node->formatASTForErrorMessage(),
|
||||
getHintsErrorMessageSuffix(hints));
|
||||
}
|
||||
|
||||
if (!function_lambda_arguments_indexes.empty())
|
||||
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
|
||||
"Aggregate function '{}' does not support lambda arguments",
|
||||
function_name);
|
||||
|
||||
AggregateFunctionProperties properties;
|
||||
auto aggregate_function = AggregateFunctionFactory::instance().get(function_name, argument_types, parameters, properties);
|
||||
function_node.resolveAsAggregateFunction(aggregate_function, aggregate_function->getReturnType());
|
||||
@ -4404,7 +4414,7 @@ ProjectionNames QueryAnalyzer::resolveFunction(QueryTreeNodePtr & node, Identifi
|
||||
const auto * function_data_type = typeid_cast<const DataTypeFunction *>(argument_types[function_lambda_argument_index].get());
|
||||
if (!function_data_type)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"Function {} expected function data type for lambda argument with index {}. Actual {}. In scope {}",
|
||||
"Function '{}' expected function data type for lambda argument with index {}. Actual {}. In scope {}",
|
||||
function_name,
|
||||
function_lambda_argument_index,
|
||||
argument_types[function_lambda_argument_index]->getName(),
|
||||
@ -4414,7 +4424,7 @@ ProjectionNames QueryAnalyzer::resolveFunction(QueryTreeNodePtr & node, Identifi
|
||||
size_t function_data_type_arguments_size = function_data_type_argument_types.size();
|
||||
if (function_data_type_arguments_size != lambda_arguments_size)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"Function {} function data type for lambda argument with index {} arguments size mismatch. Actual {}. Expected {}. In scope {}",
|
||||
"Function '{}' function data type for lambda argument with index {} arguments size mismatch. Actual {}. Expected {}. In scope {}",
|
||||
function_name,
|
||||
function_data_type_arguments_size,
|
||||
lambda_arguments_size,
|
||||
|
@ -23,6 +23,7 @@ namespace DB
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int S3_ERROR;
|
||||
extern const int INVALID_CONFIG_PARAMETER;
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
@ -222,8 +223,21 @@ void BackupWriterS3::copyObjectMultipartImpl(
|
||||
|
||||
for (size_t part_number = 1; position < size; ++part_number)
|
||||
{
|
||||
/// Check that part number is not too big.
|
||||
if (part_number > request_settings.max_part_number)
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::INVALID_CONFIG_PARAMETER,
|
||||
"Part number exceeded {} while writing {} bytes to S3. Check min_upload_part_size = {}, max_upload_part_size = {}, "
|
||||
"upload_part_size_multiply_factor = {}, upload_part_size_multiply_parts_count_threshold = {}, max_single_operation_copy_size = {}",
|
||||
request_settings.max_part_number, size, request_settings.min_upload_part_size, request_settings.max_upload_part_size,
|
||||
request_settings.upload_part_size_multiply_factor, request_settings.upload_part_size_multiply_parts_count_threshold,
|
||||
request_settings.max_single_operation_copy_size);
|
||||
}
|
||||
|
||||
size_t next_position = std::min(position + upload_part_size, size);
|
||||
|
||||
/// Make a copy request to copy a part.
|
||||
Aws::S3::Model::UploadPartCopyRequest part_request;
|
||||
part_request.SetCopySource(src_bucket + "/" + src_key);
|
||||
part_request.SetBucket(dst_bucket);
|
||||
@ -250,6 +264,7 @@ void BackupWriterS3::copyObjectMultipartImpl(
|
||||
|
||||
position = next_position;
|
||||
|
||||
/// Maybe increase `upload_part_size` (we need to increase it sometimes to keep `part_number` less or equal than `max_part_number`).
|
||||
if (part_number % request_settings.upload_part_size_multiply_parts_count_threshold == 0)
|
||||
{
|
||||
upload_part_size *= request_settings.upload_part_size_multiply_factor;
|
||||
|
@ -22,17 +22,29 @@ struct StringKey24
|
||||
inline StringRef ALWAYS_INLINE toStringRef(const StringKey8 & n)
|
||||
{
|
||||
assert(n != 0);
|
||||
#if __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
|
||||
return {reinterpret_cast<const char *>(&n), 8ul - (std::countr_zero(n) >> 3)};
|
||||
#else
|
||||
return {reinterpret_cast<const char *>(&n), 8ul - (std::countl_zero(n) >> 3)};
|
||||
#endif
|
||||
}
|
||||
inline StringRef ALWAYS_INLINE toStringRef(const StringKey16 & n)
|
||||
{
|
||||
assert(n.items[1] != 0);
|
||||
#if __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
|
||||
return {reinterpret_cast<const char *>(&n), 16ul - (std::countr_zero(n.items[1]) >> 3)};
|
||||
#else
|
||||
return {reinterpret_cast<const char *>(&n), 16ul - (std::countl_zero(n.items[1]) >> 3)};
|
||||
#endif
|
||||
}
|
||||
inline StringRef ALWAYS_INLINE toStringRef(const StringKey24 & n)
|
||||
{
|
||||
assert(n.c != 0);
|
||||
#if __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
|
||||
return {reinterpret_cast<const char *>(&n), 24ul - (std::countr_zero(n.c) >> 3)};
|
||||
#else
|
||||
return {reinterpret_cast<const char *>(&n), 24ul - (std::countl_zero(n.c) >> 3)};
|
||||
#endif
|
||||
}
|
||||
|
||||
struct StringHashTableHash
|
||||
@ -238,7 +250,6 @@ public:
|
||||
// 2. Use switch case extension to generate fast dispatching table
|
||||
// 3. Funcs are named callables that can be force_inlined
|
||||
//
|
||||
// NOTE: It relies on Little Endianness
|
||||
//
|
||||
// NOTE: It requires padded to 8 bytes keys (IOW you cannot pass
|
||||
// std::string here, but you can pass i.e. ColumnString::getDataAt()),
|
||||
@ -280,13 +291,19 @@ public:
|
||||
if ((reinterpret_cast<uintptr_t>(p) & 2048) == 0)
|
||||
{
|
||||
memcpy(&n[0], p, 8);
|
||||
if constexpr (std::endian::native == std::endian::little)
|
||||
n[0] &= -1ULL >> s;
|
||||
else
|
||||
n[0] &= -1ULL << s;
|
||||
}
|
||||
else
|
||||
{
|
||||
const char * lp = x.data + x.size - 8;
|
||||
memcpy(&n[0], lp, 8);
|
||||
if constexpr (std::endian::native == std::endian::little)
|
||||
n[0] >>= s;
|
||||
else
|
||||
n[0] <<= s;
|
||||
}
|
||||
keyHolderDiscardKey(key_holder);
|
||||
return func(self.m1, k8, hash(k8));
|
||||
@ -296,7 +313,10 @@ public:
|
||||
memcpy(&n[0], p, 8);
|
||||
const char * lp = x.data + x.size - 8;
|
||||
memcpy(&n[1], lp, 8);
|
||||
if constexpr (std::endian::native == std::endian::little)
|
||||
n[1] >>= s;
|
||||
else
|
||||
n[1] <<= s;
|
||||
keyHolderDiscardKey(key_holder);
|
||||
return func(self.m2, k16, hash(k16));
|
||||
}
|
||||
@ -305,7 +325,10 @@ public:
|
||||
memcpy(&n[0], p, 16);
|
||||
const char * lp = x.data + x.size - 8;
|
||||
memcpy(&n[2], lp, 8);
|
||||
if constexpr (std::endian::native == std::endian::little)
|
||||
n[2] >>= s;
|
||||
else
|
||||
n[2] <<= s;
|
||||
keyHolderDiscardKey(key_holder);
|
||||
return func(self.m3, k24, hash(k24));
|
||||
}
|
||||
|
@@ -44,7 +44,7 @@ private:

int fd;
#endif
#if defined(OS_FREEBSD)
int pagesize;
size_t pagesize;
pid_t self;
#endif
};
@@ -136,7 +136,7 @@ void KeeperSnapshotManagerS3::uploadSnapshotImpl(const std::string & snapshot_pa

return;

S3Settings::RequestSettings request_settings_1;
request_settings_1.upload_part_size_multiply_parts_count_threshold = 10000;
request_settings_1.setEmptyFieldsByDefault();

const auto create_writer = [&](const auto & key)
{
@ -37,8 +37,10 @@ std::unique_ptr<S3ObjectStorageSettings> getSettings(const Poco::Util::AbstractC
|
||||
S3Settings::RequestSettings request_settings;
|
||||
request_settings.max_single_read_retries = config.getUInt64(config_prefix + ".s3_max_single_read_retries", settings.s3_max_single_read_retries);
|
||||
request_settings.min_upload_part_size = config.getUInt64(config_prefix + ".s3_min_upload_part_size", settings.s3_min_upload_part_size);
|
||||
request_settings.max_upload_part_size = config.getUInt64(config_prefix + ".s3_max_upload_part_size", S3Settings::RequestSettings::DEFAULT_MAX_UPLOAD_PART_SIZE);
|
||||
request_settings.upload_part_size_multiply_factor = config.getUInt64(config_prefix + ".s3_upload_part_size_multiply_factor", settings.s3_upload_part_size_multiply_factor);
|
||||
request_settings.upload_part_size_multiply_parts_count_threshold = config.getUInt64(config_prefix + ".s3_upload_part_size_multiply_parts_count_threshold", settings.s3_upload_part_size_multiply_parts_count_threshold);
|
||||
request_settings.max_part_number = config.getUInt64(config_prefix + ".s3_max_part_number", S3Settings::RequestSettings::DEFAULT_MAX_PART_NUMBER);
|
||||
request_settings.max_single_part_upload_size = config.getUInt64(config_prefix + ".s3_max_single_part_upload_size", settings.s3_max_single_part_upload_size);
|
||||
request_settings.check_objects_after_upload = config.getUInt64(config_prefix + ".s3_check_objects_after_upload", settings.s3_check_objects_after_upload);
|
||||
request_settings.max_unexpected_write_error_retries = config.getUInt64(config_prefix + ".s3_max_unexpected_write_error_retries", settings.s3_max_unexpected_write_error_retries);
|
||||
|
@@ -124,12 +124,12 @@ int io_submit(int ctx, long nr, struct iocb * iocbpp[])

}
}

return nr;
return static_cast<int>(nr);
}

int io_getevents(int ctx, long, long max_nr, struct kevent * events, struct timespec * timeout)
{
return kevent(ctx, nullptr, 0, events, max_nr, timeout);
return kevent(ctx, nullptr, 0, events, static_cast<int>(max_nr), timeout);
}
@@ -250,7 +250,7 @@ size_t ReadBufferFromS3::getFileSize()

if (file_size)
return *file_size;

auto object_size = S3::getObjectSize(client_ptr, bucket, key, version_id, true, read_settings.for_object_storage);
auto object_size = S3::getObjectSize(*client_ptr, bucket, key, version_id, true, read_settings.for_object_storage);

file_size = object_size;
return *file_size;
@ -852,7 +852,7 @@ namespace S3
|
||||
}
|
||||
|
||||
|
||||
S3::ObjectInfo getObjectInfo(std::shared_ptr<const Aws::S3::S3Client> client_ptr, const String & bucket, const String & key, const String & version_id, bool throw_on_error, bool for_disk_s3)
|
||||
S3::ObjectInfo getObjectInfo(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool throw_on_error, bool for_disk_s3)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::S3HeadObject);
|
||||
if (for_disk_s3)
|
||||
@ -865,7 +865,7 @@ namespace S3
|
||||
if (!version_id.empty())
|
||||
req.SetVersionId(version_id);
|
||||
|
||||
Aws::S3::Model::HeadObjectOutcome outcome = client_ptr->HeadObject(req);
|
||||
Aws::S3::Model::HeadObjectOutcome outcome = client.HeadObject(req);
|
||||
|
||||
if (outcome.IsSuccess())
|
||||
{
|
||||
@ -879,9 +879,9 @@ namespace S3
|
||||
return {};
|
||||
}
|
||||
|
||||
size_t getObjectSize(std::shared_ptr<const Aws::S3::S3Client> client_ptr, const String & bucket, const String & key, const String & version_id, bool throw_on_error, bool for_disk_s3)
|
||||
size_t getObjectSize(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool throw_on_error, bool for_disk_s3)
|
||||
{
|
||||
return getObjectInfo(client_ptr, bucket, key, version_id, throw_on_error, for_disk_s3).size;
|
||||
return getObjectInfo(client, bucket, key, version_id, throw_on_error, for_disk_s3).size;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -130,9 +130,9 @@ struct ObjectInfo
|
||||
time_t last_modification_time = 0;
|
||||
};
|
||||
|
||||
S3::ObjectInfo getObjectInfo(std::shared_ptr<const Aws::S3::S3Client> client_ptr, const String & bucket, const String & key, const String & version_id, bool throw_on_error, bool for_disk_s3);
|
||||
S3::ObjectInfo getObjectInfo(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool throw_on_error, bool for_disk_s3);
|
||||
|
||||
size_t getObjectSize(std::shared_ptr<const Aws::S3::S3Client> client_ptr, const String & bucket, const String & key, const String & version_id, bool throw_on_error, bool for_disk_s3);
|
||||
size_t getObjectSize(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool throw_on_error, bool for_disk_s3);
|
||||
|
||||
}
|
||||
#endif
|
||||
|
@ -50,6 +50,7 @@ const int S3_WARN_MAX_PARTS = 10000;
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int S3_ERROR;
|
||||
extern const int INVALID_CONFIG_PARAMETER;
|
||||
}
|
||||
|
||||
struct WriteBufferFromS3::UploadPartTask
|
||||
@ -122,12 +123,6 @@ void WriteBufferFromS3::nextImpl()
|
||||
|
||||
void WriteBufferFromS3::allocateBuffer()
|
||||
{
|
||||
if (total_parts_uploaded != 0 && total_parts_uploaded % request_settings.upload_part_size_multiply_parts_count_threshold == 0)
|
||||
{
|
||||
upload_part_size *= request_settings.upload_part_size_multiply_factor;
|
||||
upload_part_size = std::min(upload_part_size, request_settings.max_upload_part_size);
|
||||
}
|
||||
|
||||
temporary_buffer = Aws::MakeShared<Aws::StringStream>("temporary buffer");
|
||||
temporary_buffer->exceptions(std::ios::badbit);
|
||||
last_part_size = 0;
|
||||
@ -257,13 +252,10 @@ void WriteBufferFromS3::writePart()
|
||||
{
|
||||
UploadPartTask * task = nullptr;
|
||||
|
||||
int part_number;
|
||||
{
|
||||
std::lock_guard lock(bg_tasks_mutex);
|
||||
|
||||
task = &upload_object_tasks.emplace_back();
|
||||
++num_added_bg_tasks;
|
||||
part_number = num_added_bg_tasks;
|
||||
}
|
||||
|
||||
/// Notify waiting thread when task finished
|
||||
@ -281,7 +273,7 @@ void WriteBufferFromS3::writePart()
|
||||
|
||||
try
|
||||
{
|
||||
fillUploadRequest(task->req, part_number);
|
||||
fillUploadRequest(task->req);
|
||||
|
||||
schedule([this, task, task_finish_notify]()
|
||||
{
|
||||
@ -308,23 +300,44 @@ void WriteBufferFromS3::writePart()
|
||||
UploadPartTask task;
|
||||
auto & tags = TSA_SUPPRESS_WARNING_FOR_WRITE(part_tags); /// Suppress warning because schedule == false.
|
||||
|
||||
fillUploadRequest(task.req, static_cast<int>(tags.size() + 1));
|
||||
fillUploadRequest(task.req);
|
||||
processUploadRequest(task);
|
||||
tags.push_back(task.tag);
|
||||
}
|
||||
}
|
||||
|
||||
void WriteBufferFromS3::fillUploadRequest(Aws::S3::Model::UploadPartRequest & req, int part_number)
|
||||
void WriteBufferFromS3::fillUploadRequest(Aws::S3::Model::UploadPartRequest & req)
|
||||
{
|
||||
/// Increase part number.
|
||||
++part_number;
|
||||
if (!multipart_upload_id.empty() && (part_number > request_settings.max_part_number))
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::INVALID_CONFIG_PARAMETER,
|
||||
"Part number exceeded {} while writing {} bytes to S3. Check min_upload_part_size = {}, max_upload_part_size = {}, "
|
||||
"upload_part_size_multiply_factor = {}, upload_part_size_multiply_parts_count_threshold = {}, max_single_part_upload_size = {}",
|
||||
request_settings.max_part_number, count(), request_settings.min_upload_part_size, request_settings.max_upload_part_size,
|
||||
request_settings.upload_part_size_multiply_factor, request_settings.upload_part_size_multiply_parts_count_threshold,
|
||||
request_settings.max_single_part_upload_size);
|
||||
}
|
||||
|
||||
/// Setup request.
|
||||
req.SetBucket(bucket);
|
||||
req.SetKey(key);
|
||||
req.SetPartNumber(part_number);
|
||||
req.SetPartNumber(static_cast<int>(part_number));
|
||||
req.SetUploadId(multipart_upload_id);
|
||||
req.SetContentLength(temporary_buffer->tellp());
|
||||
req.SetBody(temporary_buffer);
|
||||
|
||||
/// If we don't do it, AWS SDK can mistakenly set it to application/xml, see https://github.com/aws/aws-sdk-cpp/issues/1840
|
||||
req.SetContentType("binary/octet-stream");
|
||||
|
||||
/// Maybe increase `upload_part_size` (we need to increase it sometimes to keep `part_number` less or equal than `max_part_number`).
|
||||
if (!multipart_upload_id.empty() && (part_number % request_settings.upload_part_size_multiply_parts_count_threshold == 0))
|
||||
{
|
||||
upload_part_size *= request_settings.upload_part_size_multiply_factor;
|
||||
upload_part_size = std::min(upload_part_size, request_settings.max_upload_part_size);
|
||||
}
|
||||
}
|
||||
|
||||
void WriteBufferFromS3::processUploadRequest(UploadPartTask & task)
|
||||
@ -343,8 +356,6 @@ void WriteBufferFromS3::processUploadRequest(UploadPartTask & task)
|
||||
}
|
||||
else
|
||||
throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType());
|
||||
|
||||
total_parts_uploaded++;
|
||||
}
|
||||
|
||||
void WriteBufferFromS3::completeMultipartUpload()
|
||||
|
@ -75,7 +75,7 @@ private:
|
||||
void finalizeImpl() override;
|
||||
|
||||
struct UploadPartTask;
|
||||
void fillUploadRequest(Aws::S3::Model::UploadPartRequest & req, int part_number);
|
||||
void fillUploadRequest(Aws::S3::Model::UploadPartRequest & req);
|
||||
void processUploadRequest(UploadPartTask & task);
|
||||
|
||||
struct PutObjectTask;
|
||||
@ -95,7 +95,7 @@ private:
|
||||
size_t upload_part_size = 0;
|
||||
std::shared_ptr<Aws::StringStream> temporary_buffer; /// Buffer to accumulate data.
|
||||
size_t last_part_size = 0;
|
||||
std::atomic<size_t> total_parts_uploaded = 0;
|
||||
size_t part_number = 0;
|
||||
|
||||
/// Upload in S3 is made in parts.
|
||||
/// We initiate upload, then upload each part and get ETag as a response, and then finalizeImpl() upload with listing all our parts.
|
||||
|
@@ -203,7 +203,7 @@ void ClientInfo::setInitialQuery()

void ClientInfo::fillOSUserHostNameAndVersionInfo()
{
os_user.resize(256, '\0');
if (0 == getlogin_r(os_user.data(), os_user.size() - 1))
if (0 == getlogin_r(os_user.data(), static_cast<int>(os_user.size() - 1)))
os_user.resize(strlen(os_user.c_str()));
else
os_user.clear(); /// Don't mind if we cannot determine user login.
@@ -3,7 +3,7 @@
#include <Core/Names.h>
#include <Interpreters/Context_fwd.h>
#include <Columns/IColumn.h>
#include <QueryPipeline/PipelineResourcesHolder.h>
#include <QueryPipeline/QueryPlanResourceHolder.h>

#include <list>
#include <memory>

@@ -2,7 +2,7 @@
#include <Interpreters/Context_fwd.h>
#include <Processors/IProcessor.h>
#include <QueryPipeline/PipelineResourcesHolder.h>
#include <QueryPipeline/QueryPlanResourceHolder.h>

namespace DB
{

@@ -1,7 +1,7 @@
#pragma once

#include <Processors/IProcessor.h>
#include <QueryPipeline/PipelineResourcesHolder.h>
#include <QueryPipeline/QueryPlanResourceHolder.h>
#include <QueryPipeline/Chain.h>
#include <QueryPipeline/SizeLimits.h>

@@ -1,5 +1,5 @@
#pragma once
#include <QueryPipeline/PipelineResourcesHolder.h>
#include <QueryPipeline/QueryPlanResourceHolder.h>
#include <QueryPipeline/SizeLimits.h>
#include <QueryPipeline/StreamLocalLimits.h>
#include <functional>

@@ -1,4 +1,4 @@
#include <QueryPipeline/PipelineResourcesHolder.h>
#include <QueryPipeline/QueryPlanResourceHolder.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/QueryIdHolder.h>
@@ -21,16 +21,12 @@ limitations under the License. */

namespace DB
{

using Time = std::chrono::time_point<std::chrono::system_clock>;
using Seconds = std::chrono::seconds;
using MilliSeconds = std::chrono::milliseconds;

struct BlocksMetadata
{
String hash;
UInt64 version;
Time time;
std::chrono::time_point<std::chrono::system_clock> time;
};

struct MergeableBlocks

@@ -54,6 +50,10 @@ friend class LiveViewSource;
friend class LiveViewEventsSource;
friend class LiveViewSink;

using Time = std::chrono::time_point<std::chrono::system_clock>;
using Seconds = std::chrono::seconds;
using MilliSeconds = std::chrono::milliseconds;

public:
StorageLiveView(
const StorageID & table_id_,
@@ -4525,6 +4525,7 @@ void MergeTreeData::restorePartFromBackup(std::shared_ptr<RestoredPartsHolder> r

auto read_buffer = backup_entry->getReadBuffer();
auto write_buffer = disk->writeFile(temp_part_dir / filename);
copyData(*read_buffer, *write_buffer);
write_buffer->finalize();
reservation->update(reservation->getSize() - backup_entry->getSize());
}
src/Storages/ReadFromStorageProgress.cpp (new file, 53 lines)
@@ -0,0 +1,53 @@
#include <Storages/ReadFromStorageProgress.h>
#include <Processors/ISource.h>
#include <QueryPipeline/StreamLocalLimits.h>

namespace DB
{

void updateRowsProgressApprox(
    ISource & source,
    const Chunk & chunk,
    UInt64 total_result_size,
    UInt64 & total_rows_approx_accumulated,
    size_t & total_rows_count_times,
    UInt64 & total_rows_approx_max)
{
    if (!total_result_size)
        return;

    const size_t num_rows = chunk.getNumRows();

    if (!num_rows)
        return;

    const auto progress = source.getReadProgress();
    if (progress && !progress->limits.empty())
    {
        for (const auto & limit : progress->limits)
        {
            if (limit.leaf_limits.max_rows || limit.leaf_limits.max_bytes
                || limit.local_limits.size_limits.max_rows || limit.local_limits.size_limits.max_bytes)
                return;
        }
    }

    const auto bytes_per_row = std::ceil(static_cast<double>(chunk.bytes()) / num_rows);
    size_t total_rows_approx = static_cast<size_t>(std::ceil(static_cast<double>(total_result_size) / bytes_per_row));
    total_rows_approx_accumulated += total_rows_approx;
    ++total_rows_count_times;
    total_rows_approx = total_rows_approx_accumulated / total_rows_count_times;

    /// We need to add diff, because total_rows_approx is incremental value.
    /// It would be more correct to send total_rows_approx as is (not a diff),
    /// but incrementation of total_rows_to_read does not allow that.
    /// A new counter can be introduced for that to be sent to client, but it does not worth it.
    if (total_rows_approx > total_rows_approx_max)
    {
        size_t diff = total_rows_approx - total_rows_approx_max;
        source.addTotalRowsApprox(diff);
        total_rows_approx_max = total_rows_approx;
    }
}

}
src/Storages/ReadFromStorageProgress.h (new file, 18 lines)
@@ -0,0 +1,18 @@
#pragma once
#include <Core/Types.h>

namespace DB
{

class ISource;
class Chunk;

void updateRowsProgressApprox(
    ISource & source,
    const Chunk & chunk,
    UInt64 total_result_size,
    UInt64 & total_rows_approx_accumulated,
    size_t & total_rows_count_times,
    UInt64 & total_rows_approx_max);

}
@@ -5,6 +5,7 @@
#include <Storages/PartitionedSink.h>
#include <Storages/Distributed/DirectoryMonitor.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <Storages/ReadFromStorageProgress.h>

#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>

@@ -592,22 +593,8 @@ public:

if (num_rows)
{
auto bytes_per_row = std::ceil(static_cast<double>(chunk.bytes()) / num_rows);
size_t total_rows_approx = static_cast<size_t>(std::ceil(static_cast<double>(files_info->total_bytes_to_read) / bytes_per_row));
total_rows_approx_accumulated += total_rows_approx;
++total_rows_count_times;
total_rows_approx = total_rows_approx_accumulated / total_rows_count_times;

/// We need to add diff, because total_rows_approx is incremental value.
/// It would be more correct to send total_rows_approx as is (not a diff),
/// but incrementation of total_rows_to_read does not allow that.
/// A new field can be introduces for that to be sent to client, but it does not worth it.
if (total_rows_approx > total_rows_approx_prev)
{
size_t diff = total_rows_approx - total_rows_approx_prev;
addTotalRowsApprox(diff);
total_rows_approx_prev = total_rows_approx;
}
updateRowsProgressApprox(
*this, chunk, files_info->total_bytes_to_read, total_rows_approx_accumulated, total_rows_count_times, total_rows_approx_max);
}
return chunk;
}

@@ -648,7 +635,7 @@ private:

UInt64 total_rows_approx_accumulated = 0;
size_t total_rows_count_times = 0;
UInt64 total_rows_approx_prev = 0;
UInt64 total_rows_approx_max = 0;
};
@@ -1032,6 +1032,7 @@ void StorageLog::restoreDataImpl(const BackupPtr & backup, const String & data_p

auto in = backup_entry->getReadBuffer();
auto out = disk->writeFile(data_file.path, max_compress_block_size, WriteMode::Append);
copyData(*in, *out);
out->finalize();
}

if (use_marks_file)
@ -28,6 +28,7 @@
|
||||
#include <Storages/getVirtualsForStorage.h>
|
||||
#include <Storages/checkAndGetLiteralArgument.h>
|
||||
#include <Storages/StorageURL.h>
|
||||
#include <Storages/ReadFromStorageProgress.h>
|
||||
|
||||
#include <IO/ReadBufferFromS3.h>
|
||||
#include <IO/WriteBufferFromS3.h>
|
||||
@ -153,6 +154,11 @@ public:
|
||||
return nextAssumeLocked();
|
||||
}
|
||||
|
||||
size_t getTotalSize() const
|
||||
{
|
||||
return total_size;
|
||||
}
|
||||
|
||||
private:
|
||||
|
||||
String nextAssumeLocked()
|
||||
@ -198,19 +204,27 @@ private:
|
||||
if (block.has("_file"))
|
||||
file_column = block.getByName("_file").column->assumeMutable();
|
||||
|
||||
for (const auto & row : result_batch)
|
||||
std::unordered_map<String, S3::ObjectInfo> all_object_infos;
|
||||
for (const auto & key_info : result_batch)
|
||||
{
|
||||
const String & key = row.GetKey();
|
||||
const String & key = key_info.GetKey();
|
||||
if (recursive || re2::RE2::FullMatch(key, *matcher))
|
||||
{
|
||||
String path = fs::path(globbed_uri.bucket) / key;
|
||||
if (object_infos)
|
||||
(*object_infos)[path] = {.size = size_t(row.GetSize()), .last_modification_time = row.GetLastModified().Millis() / 1000};
|
||||
String file = path.substr(path.find_last_of('/') + 1);
|
||||
const size_t key_size = key_info.GetSize();
|
||||
|
||||
all_object_infos.emplace(path, S3::ObjectInfo{.size = key_size, .last_modification_time = key_info.GetLastModified().Millis() / 1000});
|
||||
|
||||
if (path_column)
|
||||
{
|
||||
path_column->insert(path);
|
||||
}
|
||||
if (file_column)
|
||||
{
|
||||
String file = path.substr(path.find_last_of('/') + 1);
|
||||
file_column->insert(file);
|
||||
}
|
||||
|
||||
key_column->insert(key);
|
||||
}
|
||||
}
|
||||
@ -220,18 +234,37 @@ private:
|
||||
size_t rows = block.rows();
|
||||
buffer.reserve(rows);
|
||||
for (size_t i = 0; i < rows; ++i)
|
||||
buffer.emplace_back(keys.getDataAt(i).toString());
|
||||
{
|
||||
auto key = keys.getDataAt(i).toString();
|
||||
std::string path = fs::path(globbed_uri.bucket) / key;
|
||||
|
||||
const auto & object_info = all_object_infos.at(path);
|
||||
total_size += object_info.size;
|
||||
if (object_infos)
|
||||
object_infos->emplace(path, object_info);
|
||||
|
||||
buffer.emplace_back(key);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
buffer.reserve(result_batch.size());
|
||||
for (const auto & row : result_batch)
|
||||
for (const auto & key_info : result_batch)
|
||||
{
|
||||
String key = row.GetKey();
|
||||
String key = key_info.GetKey();
|
||||
if (recursive || re2::RE2::FullMatch(key, *matcher))
|
||||
{
|
||||
const size_t key_size = key_info.GetSize();
|
||||
total_size += key_size;
|
||||
if (object_infos)
|
||||
{
|
||||
const std::string path = fs::path(globbed_uri.bucket) / key;
|
||||
(*object_infos)[path] = {.size = key_size, .last_modification_time = key_info.GetLastModified().Millis() / 1000};
|
||||
}
|
||||
buffer.emplace_back(std::move(key));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Set iterator only after the whole batch is processed
|
||||
buffer_iter = buffer.begin();
|
||||
@ -261,6 +294,7 @@ private:
|
||||
std::unordered_map<String, S3::ObjectInfo> * object_infos;
|
||||
Strings * read_keys;
|
||||
S3Settings::RequestSettings request_settings;
|
||||
size_t total_size = 0;
|
||||
};
|
||||
|
||||
StorageS3Source::DisclosedGlobIterator::DisclosedGlobIterator(
|
||||
@ -281,12 +315,28 @@ String StorageS3Source::DisclosedGlobIterator::next()
|
||||
return pimpl->next();
|
||||
}
|
||||
|
||||
size_t StorageS3Source::DisclosedGlobIterator::getTotalSize() const
|
||||
{
|
||||
return pimpl->getTotalSize();
|
||||
}
|
||||
|
||||
class StorageS3Source::KeysIterator::Impl : WithContext
|
||||
{
|
||||
public:
|
||||
explicit Impl(
|
||||
const std::vector<String> & keys_, const String & bucket_, ASTPtr query_, const Block & virtual_header_, ContextPtr context_)
|
||||
: WithContext(context_), keys(keys_), bucket(bucket_), query(query_), virtual_header(virtual_header_)
|
||||
const Aws::S3::S3Client & client_,
|
||||
const std::string & version_id_,
|
||||
const std::vector<String> & keys_,
|
||||
const String & bucket_,
|
||||
ASTPtr query_,
|
||||
const Block & virtual_header_,
|
||||
ContextPtr context_,
|
||||
std::unordered_map<String, S3::ObjectInfo> * object_infos_)
|
||||
: WithContext(context_)
|
||||
, keys(keys_)
|
||||
, bucket(bucket_)
|
||||
, query(query_)
|
||||
, virtual_header(virtual_header_)
|
||||
{
|
||||
/// Create a virtual block with one row to construct filter
|
||||
if (query && virtual_header)
|
||||
@ -316,14 +366,28 @@ public:
|
||||
if (block.has("_file"))
|
||||
file_column = block.getByName("_file").column->assumeMutable();
|
||||
|
||||
std::unordered_map<String, S3::ObjectInfo> all_object_infos;
|
||||
for (const auto & key : keys)
|
||||
{
|
||||
String path = fs::path(bucket) / key;
|
||||
String file = path.substr(path.find_last_of('/') + 1);
|
||||
const String path = fs::path(bucket) / key;
|
||||
|
||||
/// To avoid extra requests update total_size only if object_infos != nullptr
|
||||
/// (which means we eventually need this info anyway, so it should be ok to do it now).
|
||||
if (object_infos_)
|
||||
{
|
||||
auto key_info = S3::getObjectInfo(client_, bucket, key, version_id_, true, false);
|
||||
all_object_infos.emplace(path, S3::ObjectInfo{.size = key_info.size, .last_modification_time = key_info.last_modification_time});
|
||||
}
|
||||
|
||||
if (path_column)
|
||||
{
|
||||
path_column->insert(path);
|
||||
}
|
||||
if (file_column)
|
||||
{
|
||||
const String file = path.substr(path.find_last_of('/') + 1);
|
||||
file_column->insert(file);
|
||||
}
|
||||
key_column->insert(key);
|
||||
}
|
||||
|
||||
@ -333,7 +397,19 @@ public:
|
||||
Strings filtered_keys;
|
||||
filtered_keys.reserve(rows);
|
||||
for (size_t i = 0; i < rows; ++i)
|
||||
filtered_keys.emplace_back(keys_col.getDataAt(i).toString());
|
||||
{
|
||||
auto key = keys_col.getDataAt(i).toString();
|
||||
|
||||
if (object_infos_)
|
||||
{
|
||||
std::string path = fs::path(bucket) / key;
|
||||
const auto & object_info = all_object_infos.at(path);
|
||||
total_size += object_info.size;
|
||||
object_infos_->emplace(path, object_info);
|
||||
}
|
||||
|
||||
filtered_keys.emplace_back(key);
|
||||
}
|
||||
|
||||
keys = std::move(filtered_keys);
|
||||
}
|
||||
@ -348,6 +424,11 @@ public:
|
||||
return keys[current_index];
|
||||
}
|
||||
|
||||
size_t getTotalSize() const
|
||||
{
|
||||
return total_size;
|
||||
}
|
||||
|
||||
private:
|
||||
Strings keys;
|
||||
std::atomic_size_t index = 0;
|
||||
@ -355,11 +436,21 @@ private:
|
||||
String bucket;
|
||||
ASTPtr query;
|
||||
Block virtual_header;
|
||||
|
||||
size_t total_size = 0;
|
||||
};
|
||||
|
||||
StorageS3Source::KeysIterator::KeysIterator(
|
||||
const std::vector<String> & keys_, const String & bucket_, ASTPtr query, const Block & virtual_header, ContextPtr context)
|
||||
: pimpl(std::make_shared<StorageS3Source::KeysIterator::Impl>(keys_, bucket_, query, virtual_header, context))
|
||||
const Aws::S3::S3Client & client_,
|
||||
const std::string & version_id_,
|
||||
const std::vector<String> & keys_,
|
||||
const String & bucket_,
|
||||
ASTPtr query,
|
||||
const Block & virtual_header,
|
||||
ContextPtr context,
|
||||
std::unordered_map<String, S3::ObjectInfo> * object_infos_)
|
||||
: pimpl(std::make_shared<StorageS3Source::KeysIterator::Impl>(
|
||||
client_, version_id_, keys_, bucket_, query, virtual_header, context, object_infos_))
|
||||
{
|
||||
}
|
||||
|
||||
@ -368,6 +459,11 @@ String StorageS3Source::KeysIterator::next()
|
||||
return pimpl->next();
|
||||
}
|
||||
|
||||
size_t StorageS3Source::KeysIterator::getTotalSize() const
|
||||
{
|
||||
return pimpl->getTotalSize();
|
||||
}
|
||||
|
||||
Block StorageS3Source::getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns)
|
||||
{
|
||||
for (const auto & virtual_column : requested_virtual_columns)
|
||||
@ -390,7 +486,7 @@ StorageS3Source::StorageS3Source(
|
||||
const std::shared_ptr<const Aws::S3::S3Client> & client_,
|
||||
const String & bucket_,
|
||||
const String & version_id_,
|
||||
std::shared_ptr<IteratorWrapper> file_iterator_,
|
||||
std::shared_ptr<IIterator> file_iterator_,
|
||||
const size_t download_thread_num_,
|
||||
const std::unordered_map<String, S3::ObjectInfo> & object_infos_)
|
||||
: ISource(getHeader(sample_block_, requested_virtual_columns_))
|
||||
@ -459,7 +555,7 @@ std::unique_ptr<ReadBuffer> StorageS3Source::createS3ReadBuffer(const String & k
|
||||
if (it != object_infos.end())
|
||||
object_size = it->second.size;
|
||||
else
|
||||
object_size = DB::S3::getObjectSize(client, bucket, key, version_id, false, false);
|
||||
object_size = DB::S3::getObjectSize(*client, bucket, key, version_id, false, false);
|
||||
|
||||
auto download_buffer_size = getContext()->getSettings().max_download_buffer_size;
|
||||
const bool use_parallel_download = download_buffer_size > 0 && download_thread_num > 1;
|
||||
@ -503,6 +599,13 @@ Chunk StorageS3Source::generate()
|
||||
{
|
||||
UInt64 num_rows = chunk.getNumRows();
|
||||
|
||||
auto it = object_infos.find(file_path);
|
||||
if (num_rows && it != object_infos.end())
|
||||
{
|
||||
updateRowsProgressApprox(
|
||||
*this, chunk, file_iterator->getTotalSize(), total_rows_approx_accumulated, total_rows_count_times, total_rows_approx_max);
|
||||
}
|
||||
|
||||
for (const auto & virtual_column : requested_virtual_columns)
|
||||
{
|
||||
if (virtual_column.name == "_path")
|
||||
@ -797,7 +900,7 @@ StorageS3::StorageS3(
|
||||
virtual_block.insert({column.type->createColumn(), column.type, column.name});
|
||||
}
|
||||
|
||||
std::shared_ptr<StorageS3Source::IteratorWrapper> StorageS3::createFileIterator(
|
||||
std::shared_ptr<StorageS3Source::IIterator> StorageS3::createFileIterator(
|
||||
const S3Configuration & s3_configuration,
|
||||
const std::vector<String> & keys,
|
||||
bool is_key_with_globs,
|
||||
@ -810,25 +913,22 @@ std::shared_ptr<StorageS3Source::IteratorWrapper> StorageS3::createFileIterator(
|
||||
{
|
||||
if (distributed_processing)
|
||||
{
|
||||
return std::make_shared<StorageS3Source::IteratorWrapper>(
|
||||
[callback = local_context->getReadTaskCallback()]() -> String {
|
||||
return callback();
|
||||
});
|
||||
return std::make_shared<StorageS3Source::ReadTaskIterator>(local_context->getReadTaskCallback());
|
||||
}
|
||||
else if (is_key_with_globs)
|
||||
{
|
||||
/// Iterate through disclosed globs and make a source for each file
|
||||
auto glob_iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(
|
||||
*s3_configuration.client, s3_configuration.uri, query, virtual_block, local_context, object_infos, read_keys, s3_configuration.request_settings);
|
||||
return std::make_shared<StorageS3Source::IteratorWrapper>([glob_iterator]() { return glob_iterator->next(); });
|
||||
return std::make_shared<StorageS3Source::DisclosedGlobIterator>(
|
||||
*s3_configuration.client, s3_configuration.uri, query, virtual_block,
|
||||
local_context, object_infos, read_keys, s3_configuration.request_settings);
|
||||
}
|
||||
else
|
||||
{
|
||||
auto keys_iterator
|
||||
= std::make_shared<StorageS3Source::KeysIterator>(keys, s3_configuration.uri.bucket, query, virtual_block, local_context);
|
||||
if (read_keys)
|
||||
*read_keys = keys;
|
||||
return std::make_shared<StorageS3Source::IteratorWrapper>([keys_iterator]() { return keys_iterator->next(); });
|
||||
|
||||
return std::make_shared<StorageS3Source::KeysIterator>(
|
||||
*s3_configuration.client, s3_configuration.uri.version_id, keys, s3_configuration.uri.bucket, query, virtual_block, local_context, object_infos);
|
||||
}
|
||||
}
|
||||
|
||||
@ -869,7 +969,7 @@ Pipe StorageS3::read(
|
||||
requested_virtual_columns.push_back(virtual_column);
|
||||
}
|
||||
|
||||
std::shared_ptr<StorageS3Source::IteratorWrapper> iterator_wrapper = createFileIterator(
|
||||
std::shared_ptr<StorageS3Source::IIterator> iterator_wrapper = createFileIterator(
|
||||
s3_configuration,
|
||||
keys,
|
||||
is_key_with_globs,
|
||||
@ -1369,7 +1469,7 @@ std::optional<ColumnsDescription> StorageS3::tryGetColumnsFromCache(
|
||||
/// Note that in case of exception in getObjectInfo returned info will be empty,
|
||||
/// but schema cache will handle this case and won't return columns from cache
|
||||
/// because we can't say that it's valid without last modification time.
|
||||
info = S3::getObjectInfo(s3_configuration.client, s3_configuration.uri.bucket, *it, s3_configuration.uri.version_id, false, false);
|
||||
info = S3::getObjectInfo(*s3_configuration.client, s3_configuration.uri.bucket, *it, s3_configuration.uri.version_id, false, false);
|
||||
if (object_infos)
|
||||
(*object_infos)[path] = info;
|
||||
}
|
||||
|
@ -33,7 +33,17 @@ class StorageS3SequentialSource;
|
||||
class StorageS3Source : public ISource, WithContext
|
||||
{
|
||||
public:
|
||||
class DisclosedGlobIterator
|
||||
class IIterator
|
||||
{
|
||||
public:
|
||||
virtual ~IIterator() = default;
|
||||
virtual String next() = 0;
|
||||
virtual size_t getTotalSize() const = 0;
|
||||
|
||||
String operator ()() { return next(); }
|
||||
};
|
||||
|
||||
class DisclosedGlobIterator : public IIterator
|
||||
{
|
||||
public:
|
||||
DisclosedGlobIterator(
|
||||
@ -46,7 +56,9 @@ public:
|
||||
Strings * read_keys_ = nullptr,
|
||||
const S3Settings::RequestSettings & request_settings_ = {});
|
||||
|
||||
String next();
|
||||
String next() override;
|
||||
|
||||
size_t getTotalSize() const override;
|
||||
|
||||
private:
|
||||
class Impl;
|
||||
@ -54,12 +66,22 @@ public:
|
||||
std::shared_ptr<Impl> pimpl;
|
||||
};
|
||||
|
||||
class KeysIterator
|
||||
class KeysIterator : public IIterator
|
||||
{
|
||||
public:
|
||||
explicit KeysIterator(
|
||||
const std::vector<String> & keys_, const String & bucket_, ASTPtr query, const Block & virtual_header, ContextPtr context);
|
||||
String next();
|
||||
const Aws::S3::S3Client & client_,
|
||||
const std::string & version_id_,
|
||||
const std::vector<String> & keys_,
|
||||
const String & bucket_,
|
||||
ASTPtr query,
|
||||
const Block & virtual_header,
|
||||
ContextPtr context,
|
||||
std::unordered_map<String, S3::ObjectInfo> * object_infos = nullptr);
|
||||
|
||||
String next() override;
|
||||
|
||||
size_t getTotalSize() const override;
|
||||
|
||||
private:
|
||||
class Impl;
|
||||
@ -67,7 +89,18 @@ public:
|
||||
std::shared_ptr<Impl> pimpl;
|
||||
};
|
||||
|
||||
using IteratorWrapper = std::function<String()>;
|
||||
class ReadTaskIterator : public IIterator
|
||||
{
|
||||
public:
|
||||
explicit ReadTaskIterator(const ReadTaskCallback & callback_) : callback(callback_) {}
|
||||
|
||||
String next() override { return callback(); }
|
||||
|
||||
size_t getTotalSize() const override { return 0; }
|
||||
|
||||
private:
|
||||
ReadTaskCallback callback;
|
||||
};
|
||||
|
||||
static Block getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns);
|
||||
|
||||
@ -85,7 +118,7 @@ public:
|
||||
const std::shared_ptr<const Aws::S3::S3Client> & client_,
|
||||
const String & bucket,
|
||||
const String & version_id,
|
||||
std::shared_ptr<IteratorWrapper> file_iterator_,
|
||||
std::shared_ptr<IIterator> file_iterator_,
|
||||
size_t download_thread_num,
|
||||
const std::unordered_map<String, S3::ObjectInfo> & object_infos_);
|
||||
|
||||
@ -116,11 +149,15 @@ private:
|
||||
/// onCancel and generate can be called concurrently
|
||||
std::mutex reader_mutex;
|
||||
std::vector<NameAndTypePair> requested_virtual_columns;
|
||||
std::shared_ptr<IteratorWrapper> file_iterator;
|
||||
std::shared_ptr<IIterator> file_iterator;
|
||||
size_t download_thread_num = 1;
|
||||
|
||||
Poco::Logger * log = &Poco::Logger::get("StorageS3Source");
|
||||
|
||||
UInt64 total_rows_approx_max = 0;
|
||||
size_t total_rows_count_times = 0;
|
||||
UInt64 total_rows_approx_accumulated = 0;
|
||||
|
||||
std::unordered_map<String, S3::ObjectInfo> object_infos;
|
||||
|
||||
/// Recreate ReadBuffer and Pipeline for each file.
|
||||
@ -233,7 +270,7 @@ private:
|
||||
|
||||
static void updateS3Configuration(ContextPtr, S3Configuration &);
|
||||
|
||||
static std::shared_ptr<StorageS3Source::IteratorWrapper> createFileIterator(
|
||||
static std::shared_ptr<StorageS3Source::IIterator> createFileIterator(
|
||||
const S3Configuration & s3_configuration,
|
||||
const std::vector<String> & keys,
|
||||
bool is_key_with_globs,
|
||||
|
@@ -102,7 +102,7 @@ Pipe StorageS3Cluster::read(

auto iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(
*s3_configuration.client, s3_configuration.uri, query_info.query, virtual_block, context);
auto callback = std::make_shared<StorageS3Source::IteratorWrapper>([iterator]() mutable -> String { return iterator->next(); });
auto callback = std::make_shared<std::function<String()>>([iterator]() mutable -> String { return iterator->next(); });

/// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*)
auto interpreter = InterpreterSelectQuery(query_info.query, context, SelectQueryOptions(processed_stage).analyze());
@ -6,23 +6,12 @@
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/Throttler.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <base/unit.h>
|
||||
#include <boost/algorithm/string/predicate.hpp>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace
|
||||
{
|
||||
/// An object up to 5 GB can be copied in a single atomic operation.
|
||||
constexpr UInt64 DEFAULT_MAX_SINGLE_OPERATION_COPY_SIZE = 5_GiB;
|
||||
|
||||
/// The maximum size of an uploaded part.
|
||||
constexpr UInt64 DEFAULT_MAX_UPLOAD_PART_SIZE = 5_GiB;
|
||||
}
|
||||
|
||||
|
||||
void StorageS3Settings::loadFromConfig(const String & config_elem, const Poco::Util::AbstractConfiguration & config, const Settings & settings)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
@ -61,11 +50,12 @@ void StorageS3Settings::loadFromConfig(const String & config_elem, const Poco::U
|
||||
S3Settings::RequestSettings request_settings;
|
||||
request_settings.max_single_read_retries = get_uint_for_key(key, "max_single_read_retries", true, settings.s3_max_single_read_retries);
|
||||
request_settings.min_upload_part_size = get_uint_for_key(key, "min_upload_part_size", true, settings.s3_min_upload_part_size);
|
||||
request_settings.max_upload_part_size = get_uint_for_key(key, "max_upload_part_size", true, DEFAULT_MAX_UPLOAD_PART_SIZE);
|
||||
request_settings.max_upload_part_size = get_uint_for_key(key, "max_upload_part_size", true, S3Settings::RequestSettings::DEFAULT_MAX_UPLOAD_PART_SIZE);
|
||||
request_settings.upload_part_size_multiply_factor = get_uint_for_key(key, "upload_part_size_multiply_factor", true, settings.s3_upload_part_size_multiply_factor);
|
||||
request_settings.upload_part_size_multiply_parts_count_threshold = get_uint_for_key(key, "upload_part_size_multiply_parts_count_threshold", true, settings.s3_upload_part_size_multiply_parts_count_threshold);
|
||||
request_settings.max_part_number = get_uint_for_key(key, "max_part_number", true, S3Settings::RequestSettings::DEFAULT_MAX_PART_NUMBER);
|
||||
request_settings.max_single_part_upload_size = get_uint_for_key(key, "max_single_part_upload_size", true, settings.s3_max_single_part_upload_size);
|
||||
request_settings.max_single_operation_copy_size = get_uint_for_key(key, "max_single_operation_copy_size", true, DEFAULT_MAX_SINGLE_OPERATION_COPY_SIZE);
|
||||
request_settings.max_single_operation_copy_size = get_uint_for_key(key, "max_single_operation_copy_size", true, S3Settings::RequestSettings::DEFAULT_MAX_SINGLE_OPERATION_COPY_SIZE);
|
||||
request_settings.max_connections = get_uint_for_key(key, "max_connections", true, settings.s3_max_connections);
|
||||
request_settings.check_objects_after_upload = get_bool_for_key(key, "check_objects_after_upload", true, false);
|
||||
|
||||
@ -128,6 +118,8 @@ void S3Settings::RequestSettings::updateFromSettingsIfEmpty(const Settings & set
|
||||
upload_part_size_multiply_factor = settings.s3_upload_part_size_multiply_factor;
|
||||
if (!upload_part_size_multiply_parts_count_threshold)
|
||||
upload_part_size_multiply_parts_count_threshold = settings.s3_upload_part_size_multiply_parts_count_threshold;
|
||||
if (!max_part_number)
|
||||
max_part_number = DEFAULT_MAX_PART_NUMBER;
|
||||
if (!max_single_part_upload_size)
|
||||
max_single_part_upload_size = settings.s3_max_single_part_upload_size;
|
||||
if (!max_single_operation_copy_size)
|
||||
|
@ -31,6 +31,7 @@ struct S3Settings
|
||||
size_t max_upload_part_size = 0;
|
||||
size_t upload_part_size_multiply_factor = 0;
|
||||
size_t upload_part_size_multiply_parts_count_threshold = 0;
|
||||
size_t max_part_number = 0;
|
||||
size_t max_single_part_upload_size = 0;
|
||||
size_t max_single_operation_copy_size = 0;
|
||||
size_t max_connections = 0;
|
||||
@ -49,6 +50,7 @@ struct S3Settings
|
||||
&& max_upload_part_size == other.max_upload_part_size
|
||||
&& upload_part_size_multiply_factor == other.upload_part_size_multiply_factor
|
||||
&& upload_part_size_multiply_parts_count_threshold == other.upload_part_size_multiply_parts_count_threshold
|
||||
&& max_part_number == other.max_part_number
|
||||
&& max_single_part_upload_size == other.max_single_part_upload_size
|
||||
&& max_single_operation_copy_size == other.max_single_operation_copy_size
|
||||
&& max_connections == other.max_connections
|
||||
@ -58,6 +60,18 @@ struct S3Settings
|
||||
&& put_request_throttler == other.put_request_throttler;
|
||||
}
|
||||
|
||||
static const constexpr UInt64 DEFAULT_SINGLE_READ_RETRIES = 4;
|
||||
static const constexpr UInt64 DEFAULT_MIN_UPLOAD_PART_SIZE = 16 * 1024 * 1024;
|
||||
static const constexpr UInt64 DEFAULT_MAX_UPLOAD_PART_SIZE = 5ULL * 1024 * 1024 * 1024;
|
||||
static const constexpr UInt64 DEFAULT_UPLOAD_PART_SIZE_MULTIPLY_FACTOR = 2;
|
||||
static const constexpr UInt64 DEFAULT_UPLOAD_PART_SIZE_MULTIPLY_PARTS_COUNT_THRESHOLD = 500;
|
||||
static const constexpr UInt64 DEFAULT_MAX_PART_NUMBER = 10000;
|
||||
static const constexpr UInt64 DEFAULT_MAX_SINGLE_PART_UPLOAD_SIZE = 32 * 1024 * 1024;
|
||||
static const constexpr UInt64 DEFAULT_MAX_SINGLE_OPERATION_COPY_SIZE = 5ULL * 1024 * 1024 * 1024;
|
||||
static const constexpr UInt64 DEFAULT_MAX_CONNECTIONS = 1024;
|
||||
static const constexpr UInt64 DEFAULT_MAX_UNEXPECTED_WRITE_ERRORS_RETRIES = 4;
|
||||
|
||||
void setEmptyFieldsByDefault();
|
||||
void updateFromSettingsIfEmpty(const Settings & settings);
|
||||
};
|
||||
|
||||
@ -83,4 +97,28 @@ private:
|
||||
std::map<const String, const S3Settings> s3_settings;
|
||||
};
|
||||
|
||||
inline void S3Settings::RequestSettings::setEmptyFieldsByDefault()
|
||||
{
|
||||
if (!max_single_read_retries)
|
||||
max_single_read_retries = DEFAULT_SINGLE_READ_RETRIES;
|
||||
if (!min_upload_part_size)
|
||||
min_upload_part_size = DEFAULT_MIN_UPLOAD_PART_SIZE;
|
||||
if (!max_upload_part_size)
|
||||
max_upload_part_size = DEFAULT_MAX_UPLOAD_PART_SIZE;
|
||||
if (!upload_part_size_multiply_factor)
|
||||
upload_part_size_multiply_factor = DEFAULT_UPLOAD_PART_SIZE_MULTIPLY_FACTOR;
|
||||
if (!upload_part_size_multiply_parts_count_threshold)
|
||||
upload_part_size_multiply_parts_count_threshold = DEFAULT_UPLOAD_PART_SIZE_MULTIPLY_PARTS_COUNT_THRESHOLD;
|
||||
if (!max_part_number)
|
||||
max_part_number = DEFAULT_MAX_PART_NUMBER;
|
||||
if (!max_single_part_upload_size)
|
||||
max_single_part_upload_size = DEFAULT_MAX_SINGLE_PART_UPLOAD_SIZE;
|
||||
if (!max_single_operation_copy_size)
|
||||
max_single_operation_copy_size = DEFAULT_MAX_SINGLE_OPERATION_COPY_SIZE;
|
||||
if (!max_connections)
|
||||
max_connections = DEFAULT_MAX_CONNECTIONS;
|
||||
if (!max_unexpected_write_error_retries)
|
||||
max_unexpected_write_error_retries = DEFAULT_MAX_UNEXPECTED_WRITE_ERRORS_RETRIES;
|
||||
}
|
||||
|
||||
}
|
||||
|
@@ -626,6 +626,7 @@ void StorageStripeLog::restoreDataImpl(const BackupPtr & backup, const String &

auto in = backup_entry->getReadBuffer();
auto out = disk->writeFile(data_file_path, max_compress_block_size, WriteMode::Append);
copyData(*in, *out);
out->finalize();
}

/// Append the index.
@@ -0,0 +1,4 @@
SET allow_experimental_analyzer = 1;

SELECT count((t, x_0, x_1) -> ((key_2, x_0, x_1) IN (NULL, NULL, '0.3'))) FROM numbers(10); -- { serverError 1 }
SELECT count((t, x_0, x_1) -> ((key_2, x_0, x_1) IN (NULL, NULL, '0.3'))) OVER (PARTITION BY id) FROM numbers(10); -- { serverError 1 }