Merge remote-tracking branch 'origin/master' into pr-plan-rewrite

Igor Nikonov 2024-05-22 12:38:44 +00:00
commit b47eb39d56
40 changed files with 646 additions and 76 deletions

View File

@ -68,8 +68,9 @@ if (ENABLE_CHECK_HEAVY_BUILDS)
set (RLIMIT_AS 20000000000)
endif()
# For some files currently building RISCV64 might be too slow. TODO: Improve compilation times per file
if (ARCH_RISCV64)
# For some files currently building RISCV64/LOONGARCH64 might be too slow.
# TODO: Improve compilation times per file
if (ARCH_RISCV64 OR ARCH_LOONGARCH64)
set (RLIMIT_CPU 1800)
endif()

View File

@ -998,17 +998,170 @@ SELECT version()
Returns the build ID generated by a compiler for the running ClickHouse server binary.
If executed in the context of a distributed table, this function generates a normal column with values relevant to each shard. Otherwise it produces a constant value.
## blockNumber()
## blockNumber
Returns the sequence number of the data block where the row is located.
Returns a monotonically increasing sequence number of the [block](../../development/architecture.md#block) containing the row.
The returned block number is updated on a best-effort basis, i.e. it may not be fully accurate.
## rowNumberInBlock() {#rowNumberInBlock}
**Syntax**
```sql
blockNumber()
```
**Returned value**
- Sequence number of the data block where the row is located. [UInt64](../data-types/int-uint.md).
**Example**
Query:
```sql
SELECT blockNumber()
FROM
(
SELECT *
FROM system.numbers
LIMIT 10
) SETTINGS max_block_size = 2
```
Result:
```response
┌─blockNumber()─┐
│ 7 │
│ 7 │
└───────────────┘
┌─blockNumber()─┐
│ 8 │
│ 8 │
└───────────────┘
┌─blockNumber()─┐
│ 9 │
│ 9 │
└───────────────┘
┌─blockNumber()─┐
│ 10 │
│ 10 │
└───────────────┘
┌─blockNumber()─┐
│ 11 │
│ 11 │
└───────────────┘
```
## rowNumberInBlock {#rowNumberInBlock}
For each [block](../../development/architecture.md#block) processed by `rowNumberInBlock`, returns the number of the current row.
The returned number starts at 0 for each block.
**Syntax**
```sql
rowNumberInBlock()
```
**Returned value**
- Ordinal number of the row in the data block starting from 0. [UInt64](../data-types/int-uint.md).
**Example**
Query:
```sql
SELECT rowNumberInBlock()
FROM
(
SELECT *
FROM system.numbers_mt
LIMIT 10
) SETTINGS max_block_size = 2
```
Result:
```response
┌─rowNumberInBlock()─┐
│ 0 │
│ 1 │
└────────────────────┘
┌─rowNumberInBlock()─┐
│ 0 │
│ 1 │
└────────────────────┘
┌─rowNumberInBlock()─┐
│ 0 │
│ 1 │
└────────────────────┘
┌─rowNumberInBlock()─┐
│ 0 │
│ 1 │
└────────────────────┘
┌─rowNumberInBlock()─┐
│ 0 │
│ 1 │
└────────────────────┘
```
## rowNumberInAllBlocks
Returns a unique row number for each row processed by `rowNumberInAllBlocks`. The returned numbers start at 0.
**Syntax**
```sql
rowNumberInAllBlocks()
```
**Returned value**
- Unique row number for each processed row, starting from 0. [UInt64](../data-types/int-uint.md).
**Example**
Query:
```sql
SELECT rowNumberInAllBlocks()
FROM
(
SELECT *
FROM system.numbers_mt
LIMIT 10
)
SETTINGS max_block_size = 2
```
Result:
```response
┌─rowNumberInAllBlocks()─┐
│ 0 │
│ 1 │
└────────────────────────┘
┌─rowNumberInAllBlocks()─┐
│ 4 │
│ 5 │
└────────────────────────┘
┌─rowNumberInAllBlocks()─┐
│ 2 │
│ 3 │
└────────────────────────┘
┌─rowNumberInAllBlocks()─┐
│ 6 │
│ 7 │
└────────────────────────┘
┌─rowNumberInAllBlocks()─┐
│ 8 │
│ 9 │
└────────────────────────┘
```
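Note that the blocks are not returned in sequential order because `system.numbers_mt` is read by multiple threads. As an illustrative query (not part of the original documentation), sorting by the returned value restores a deterministic order:
```sql
SELECT rowNumberInAllBlocks() AS row_number
FROM
(
    SELECT *
    FROM system.numbers_mt
    LIMIT 10
)
ORDER BY row_number
SETTINGS max_block_size = 2
```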
Returns the ordinal number of the row in the data block. Different data blocks are always recalculated.
## rowNumberInAllBlocks()
Returns the ordinal number of the row in the data block. This function only considers the affected data blocks.
## neighbor

View File

@ -14,7 +14,7 @@ struct Settings;
namespace ErrorCodes
{
extern const int CORRUPTED_DATA;
extern const int INCORRECT_DATA;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int LOGICAL_ERROR;
}
@ -198,7 +198,7 @@ public:
this->data(place).value().read(buf, *serialization_val, arena);
if (unlikely(this->data(place).value().has() != this->data(place).result().has()))
throw Exception(
ErrorCodes::CORRUPTED_DATA,
ErrorCodes::INCORRECT_DATA,
"Invalid state of the aggregate function {}: has_value ({}) != has_result ({})",
getName(),
this->data(place).value().has(),

View File

@ -42,7 +42,7 @@ private:
return;
const auto & storage = table_node ? table_node->getStorage() : table_function_node->getStorage();
bool is_final_supported = storage && storage->supportsFinal();
bool is_final_supported = storage && !storage->isRemote() && storage->supportsFinal();
if (!is_final_supported)
return;

View File

@ -192,7 +192,7 @@ void QueryTreePassManager::run(QueryTreeNodePtr query_tree_node)
void QueryTreePassManager::runOnlyResolve(QueryTreeNodePtr query_tree_node)
{
// Run only QueryAnalysisPass and GroupingFunctionsResolvePass passes.
run(query_tree_node, 2);
run(query_tree_node, 3);
}
void QueryTreePassManager::run(QueryTreeNodePtr query_tree_node, size_t up_to_pass_index)
@ -249,6 +249,7 @@ void addQueryTreePasses(QueryTreePassManager & manager, bool only_analyze)
{
manager.addPass(std::make_unique<QueryAnalysisPass>(only_analyze));
manager.addPass(std::make_unique<GroupingFunctionsResolvePass>());
manager.addPass(std::make_unique<AutoFinalOnQueryPass>());
manager.addPass(std::make_unique<RemoveUnusedProjectionColumnsPass>());
manager.addPass(std::make_unique<FunctionToSubcolumnsPass>());
@ -294,7 +295,6 @@ void addQueryTreePasses(QueryTreePassManager & manager, bool only_analyze)
manager.addPass(std::make_unique<LogicalExpressionOptimizerPass>());
manager.addPass(std::make_unique<AutoFinalOnQueryPass>());
manager.addPass(std::make_unique<CrossToInnerJoinPass>());
manager.addPass(std::make_unique<ShardNumColumnToFunctionPass>());

View File

@ -40,6 +40,7 @@ static struct InitFiu
REGULAR(use_delayed_remote_source) \
REGULAR(cluster_discovery_faults) \
REGULAR(replicated_sends_failpoint) \
REGULAR(stripe_log_sink_write_fallpoint) \
ONCE(smt_commit_merge_mutate_zk_fail_after_op) \
ONCE(smt_commit_merge_mutate_zk_fail_before_op) \
ONCE(smt_commit_write_zk_fail_after_op) \
@ -58,6 +59,7 @@ static struct InitFiu
ONCE(execute_query_calling_empty_set_result_func_on_exception) \
ONCE(receive_timeout_on_table_status_response)
namespace FailPoints
{
#define M(NAME) extern const char(NAME)[] = #NAME "";

View File

@ -80,6 +80,7 @@ class IColumn;
M(UInt64, connections_with_failover_max_tries, 3, "The maximum number of attempts to connect to replicas.", 0) \
M(UInt64, s3_strict_upload_part_size, 0, "The exact size of part to upload during multipart upload to S3 (some implementations do not support variable size parts).", 0) \
M(UInt64, azure_strict_upload_part_size, 0, "The exact size of part to upload during multipart upload to Azure blob storage.", 0) \
M(UInt64, azure_max_blocks_in_multipart_upload, 50000, "Maximum number of blocks in multipart upload for Azure.", 0) \
M(UInt64, s3_min_upload_part_size, 16*1024*1024, "The minimum size of part to upload during multipart upload to S3.", 0) \
M(UInt64, s3_max_upload_part_size, 5ull*1024*1024*1024, "The maximum size of part to upload during multipart upload to S3.", 0) \
M(UInt64, azure_min_upload_part_size, 16*1024*1024, "The minimum size of part to upload during multipart upload to Azure blob storage.", 0) \

View File

@ -93,6 +93,7 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
{"http_max_chunk_size", 0, 0, "Internal limitation"},
{"prefer_external_sort_block_bytes", 0, DEFAULT_BLOCK_SIZE * 256, "Prefer maximum block bytes for external sort, reduce the memory usage during merging."},
{"input_format_force_null_for_omitted_fields", false, false, "Disable type-defaults for omitted fields when needed"},
{"azure_max_blocks_in_multipart_upload", 50000, 50000, "Maximum number of blocks in multipart upload for Azure."},
}},
{"24.4", {{"input_format_json_throw_on_bad_escape_sequence", true, true, "Allow to save JSON strings with bad escape sequences"},
{"max_parsing_threads", 0, 0, "Add a separate setting to control number of threads in parallel parsing from files"},

View File

@ -721,11 +721,10 @@ public:
if (!block.checkCheckSum())
{
std::string calculated_check_sum = std::to_string(block.calculateCheckSum());
std::string check_sum = std::to_string(block.getCheckSum());
std::string expected_check_sum = std::to_string(block.getCheckSum());
throw Exception(ErrorCodes::CORRUPTED_DATA,
"Cache data corrupted. Checksum validation failed. Calculated {} in block {}",
calculated_check_sum,
check_sum);
"Cache data corrupted. Checksum validation failed. Calculated {} expected in block {}, in file {}",
calculated_check_sum, expected_check_sum, file_path);
}
func(blocks_to_fetch[block_to_fetch_index], block.getBlockData());

View File

@ -244,6 +244,13 @@ public:
return delegate_transaction->writeFile(wrapped_path, buf_size, mode, settings);
}
/// Truncate file to the target size.
void truncateFile(const std::string & src_path, size_t target_size) override
{
auto wrapped_path = wrappedPath(src_path);
delegate_transaction->truncateFile(wrapped_path, target_size);
}
private:

View File

@ -2,10 +2,16 @@
#include <Disks/IDiskTransaction.h>
#include <IO/WriteBufferFromFileBase.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
/// Fake disk transaction implementation.
/// Just execute all operations immediately, commit is noop operation.
/// No support for atomicity and rollback.
@ -134,6 +140,11 @@ public:
disk.createHardLink(src_path, dst_path);
}
void truncateFile(const std::string & /* src_path */, size_t /* target_size */) override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Operation `truncateFile` is not implemented");
}
private:
IDisk & disk;
};

View File

@ -128,6 +128,9 @@ public:
/// Create hardlink from `src_path` to `dst_path`.
virtual void createHardLink(const std::string & src_path, const std::string & dst_path) = 0;
/// Truncate file to the target size.
virtual void truncateFile(const std::string & src_path, size_t target_size) = 0;
};
using DiskTransactionPtr = std::shared_ptr<IDiskTransaction>;

View File

@ -257,6 +257,7 @@ std::unique_ptr<AzureObjectStorageSettings> getAzureBlobStorageSettings(const Po
settings->max_upload_part_size = config.getUInt64(config_prefix + ".max_upload_part_size", context->getSettings().azure_max_upload_part_size);
settings->max_single_part_copy_size = config.getUInt64(config_prefix + ".max_single_part_copy_size", context->getSettings().azure_max_single_part_copy_size);
settings->use_native_copy = config.getBool(config_prefix + ".use_native_copy", false);
settings->max_blocks_in_multipart_upload = config.getUInt64(config_prefix + ".max_blocks_in_multipart_upload", 50000);
settings->max_unexpected_write_error_retries = config.getUInt64(config_prefix + ".max_unexpected_write_error_retries", context->getSettings().azure_max_unexpected_write_error_retries);
settings->max_inflight_parts_for_one_file = config.getUInt64(config_prefix + ".max_inflight_parts_for_one_file", context->getSettings().azure_max_inflight_parts_for_one_file);
settings->strict_upload_part_size = config.getUInt64(config_prefix + ".strict_upload_part_size", context->getSettings().azure_strict_upload_part_size);

View File

@ -63,6 +63,7 @@ struct AzureObjectStorageSettings
bool use_native_copy = false;
size_t max_unexpected_write_error_retries = 4;
size_t max_inflight_parts_for_one_file = 20;
size_t max_blocks_in_multipart_upload = 50000;
size_t strict_upload_part_size = 0;
size_t upload_part_size_multiply_factor = 2;
size_t upload_part_size_multiply_parts_count_threshold = 500;

View File

@ -133,6 +133,14 @@ void DiskObjectStorage::moveFile(const String & from_path, const String & to_pat
transaction->commit();
}
void DiskObjectStorage::truncateFile(const String & path, size_t size)
{
LOG_TEST(log, "Truncate file operation {} to size {}", path, size);
auto transaction = createObjectStorageTransaction();
transaction->truncateFile(path, size);
transaction->commit();
}
void DiskObjectStorage::copyFile( /// NOLINT
const String & from_file_path,
IDisk & to_disk,

View File

@ -84,6 +84,8 @@ public:
void removeSharedFiles(const RemoveBatchRequest & files, bool keep_all_batch_data, const NameSet & file_names_remove_metadata_only) override;
void truncateFile(const String & path, size_t size) override;
MetadataStoragePtr getMetadataStorage() override { return metadata_storage; }
UInt32 getRefCount(const String & path) const override;

View File

@ -15,6 +15,7 @@ namespace DB
namespace ErrorCodes
{
extern const int UNKNOWN_FORMAT;
extern const int LOGICAL_ERROR;
}
void DiskObjectStorageMetadata::deserialize(ReadBuffer & buf)
@ -207,6 +208,18 @@ void DiskObjectStorageMetadata::addObject(ObjectStorageKey key, size_t size)
keys_with_meta.emplace_back(std::move(key), ObjectMetadata{size, {}, {}});
}
ObjectKeyWithMetadata DiskObjectStorageMetadata::popLastObject()
{
if (keys_with_meta.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't pop last object from metadata {}. Metadata already empty", metadata_file_path);
ObjectKeyWithMetadata object = std::move(keys_with_meta.back());
keys_with_meta.pop_back();
total_size -= object.metadata.size_bytes;
return object;
}
bool DiskObjectStorageMetadata::getWriteFullObjectKeySetting()
{
#ifndef CLICKHOUSE_KEEPER_STANDALONE_BUILD

View File

@ -52,6 +52,7 @@ public:
void addObject(ObjectStorageKey key, size_t size);
ObjectKeyWithMetadata popLastObject();
void deserialize(ReadBuffer & buf);
void deserializeFromString(const std::string & data);

View File

@ -559,6 +559,54 @@ struct CopyFileObjectStorageOperation final : public IDiskObjectStorageOperation
}
};
struct TruncateFileObjectStorageOperation final : public IDiskObjectStorageOperation
{
std::string path;
size_t size;
TruncateFileOperationOutcomePtr truncate_outcome;
TruncateFileObjectStorageOperation(
IObjectStorage & object_storage_,
IMetadataStorage & metadata_storage_,
const std::string & path_,
size_t size_)
: IDiskObjectStorageOperation(object_storage_, metadata_storage_)
, path(path_)
, size(size_)
{}
std::string getInfoForLog() const override
{
return fmt::format("TruncateFileObjectStorageOperation (path: {}, size: {})", path, size);
}
void execute(MetadataTransactionPtr tx) override
{
if (metadata_storage.exists(path))
{
if (!metadata_storage.isFile(path))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Path {} is not a file", path);
truncate_outcome = tx->truncateFile(path, size);
}
}
void undo() override
{
}
void finalize() override
{
if (!truncate_outcome)
return;
if (!truncate_outcome->objects_to_remove.empty())
object_storage.removeObjectsIfExist(truncate_outcome->objects_to_remove);
}
};
}
void DiskObjectStorageTransaction::createDirectory(const std::string & path)
@ -598,6 +646,13 @@ void DiskObjectStorageTransaction::moveFile(const String & from_path, const Stri
}));
}
void DiskObjectStorageTransaction::truncateFile(const String & path, size_t size)
{
operations_to_execute.emplace_back(
std::make_unique<TruncateFileObjectStorageOperation>(object_storage, metadata_storage, path, size)
);
}
void DiskObjectStorageTransaction::replaceFile(const std::string & from_path, const std::string & to_path)
{
auto operation = std::make_unique<ReplaceFileObjectStorageOperation>(object_storage, metadata_storage, from_path, to_path);

View File

@ -92,6 +92,8 @@ public:
void createFile(const String & path) override;
void truncateFile(const String & path, size_t size) override;
void copyFile(const std::string & from_file_path, const std::string & to_file_path, const ReadSettings & read_settings, const WriteSettings &) override;
/// writeFile is a difficult function for transactions.

View File

@ -31,7 +31,15 @@ struct UnlinkMetadataFileOperationOutcome
UInt32 num_hardlinks = std::numeric_limits<UInt32>::max();
};
struct TruncateFileOperationOutcome
{
StoredObjects objects_to_remove;
};
using UnlinkMetadataFileOperationOutcomePtr = std::shared_ptr<UnlinkMetadataFileOperationOutcome>;
using TruncateFileOperationOutcomePtr = std::shared_ptr<TruncateFileOperationOutcome>;
/// Tries to provide some "transactions" interface, which allow
/// to execute (commit) operations simultaneously. We don't provide
@ -143,6 +151,11 @@ public:
return nullptr;
}
virtual TruncateFileOperationOutcomePtr truncateFile(const std::string & /* path */, size_t /* size */)
{
throwNotImplemented();
}
virtual ~IMetadataTransaction() = default;
protected:

View File

@ -259,4 +259,12 @@ UnlinkMetadataFileOperationOutcomePtr MetadataStorageFromDiskTransaction::unlink
return result;
}
TruncateFileOperationOutcomePtr MetadataStorageFromDiskTransaction::truncateFile(const std::string & path, size_t target_size)
{
auto operation = std::make_unique<TruncateMetadataFileOperation>(path, target_size, metadata_storage, *metadata_storage.getDisk());
auto result = operation->outcome;
addOperation(std::move(operation));
return result;
}
}

View File

@ -129,6 +129,8 @@ public:
UnlinkMetadataFileOperationOutcomePtr unlinkMetadata(const std::string & path) override;
TruncateFileOperationOutcomePtr truncateFile(const std::string & src_path, size_t target_size) override;
};

View File

@ -4,9 +4,12 @@
#include <Common/getRandomASCIIString.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <Common/Exception.h>
#include <Common/logger_useful.h>
#include <optional>
#include <ranges>
#include <filesystem>
#include <utility>
namespace fs = std::filesystem;
@ -14,6 +17,11 @@ namespace fs = std::filesystem;
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
static std::string getTempFileName(const std::string & dir)
{
return fs::path(dir) / getRandomASCIIString(32);
@ -341,6 +349,35 @@ void UnlinkMetadataFileOperation::undo(std::unique_lock<SharedMutex> & lock)
outcome->num_hardlinks++;
}
void TruncateMetadataFileOperation::execute(std::unique_lock<SharedMutex> & metadata_lock)
{
if (metadata_storage.exists(path))
{
auto metadata = metadata_storage.readMetadataUnlocked(path, metadata_lock);
while (metadata->getTotalSizeBytes() > target_size)
{
auto object_key_with_metadata = metadata->popLastObject();
outcome->objects_to_remove.emplace_back(object_key_with_metadata.key.serialize(), path, object_key_with_metadata.metadata.size_bytes);
}
if (metadata->getTotalSizeBytes() != target_size)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "File {} can't be truncated to size {}", path, target_size);
}
LOG_TEST(getLogger("TruncateMetadataFileOperation"), "Going to remove {} blobs.", outcome->objects_to_remove.size());
write_operation = std::make_unique<WriteFileOperation>(path, disk, metadata->serializeToString());
write_operation->execute(metadata_lock);
}
}
void TruncateMetadataFileOperation::undo(std::unique_lock<SharedMutex> & lock)
{
if (write_operation)
write_operation->undo(lock);
}
void SetReadonlyFileOperation::execute(std::unique_lock<SharedMutex> & metadata_lock)
{
auto metadata = metadata_storage.readMetadataUnlocked(path, metadata_lock);

View File

@ -282,4 +282,34 @@ private:
std::unique_ptr<WriteFileOperation> write_operation;
};
struct TruncateMetadataFileOperation final : public IMetadataOperation
{
const TruncateFileOperationOutcomePtr outcome = std::make_shared<TruncateFileOperationOutcome>();
TruncateMetadataFileOperation(
const std::string & path_,
size_t target_size_,
const MetadataStorageFromDisk & metadata_storage_,
IDisk & disk_)
: path(path_)
, target_size(target_size_)
, metadata_storage(metadata_storage_)
, disk(disk_)
{
}
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
void undo(std::unique_lock<SharedMutex> & lock) override;
private:
std::string path;
size_t target_size;
const MetadataStorageFromDisk & metadata_storage;
IDisk & disk;
std::unique_ptr<WriteFileOperation> write_operation;
};
}
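To make the boundary condition in `TruncateMetadataFileOperation::execute` concrete: blobs are popped from the tail of the metadata until the total size is no longer greater than the target, and the operation succeeds only if the remaining total equals the target exactly, i.e. the truncation point must fall on a blob boundary. A minimal standalone sketch of that rule (illustrative code, not the ClickHouse API):

```cpp
#include <cstddef>
#include <numeric>
#include <stdexcept>
#include <vector>

/// Drop blob sizes from the tail until the remaining total equals target_size.
/// Returns the sizes of the dropped blobs (the "objects to remove").
/// Throws if target_size does not fall on a blob boundary.
std::vector<size_t> truncateToBlobBoundary(std::vector<size_t> & blob_sizes, size_t target_size)
{
    size_t total = std::accumulate(blob_sizes.begin(), blob_sizes.end(), size_t{0});

    std::vector<size_t> removed;
    while (total > target_size)
    {
        removed.push_back(blob_sizes.back());
        total -= blob_sizes.back();
        blob_sizes.pop_back();
    }

    if (total != target_size)
        throw std::logic_error("File can't be truncated to the requested size");

    return removed;
}

/// Example: blobs of sizes {100, 100, 40} can be truncated to 200 (removing the
/// 40-byte blob) or to 100, but truncating to 150 throws, because 150 is not a
/// blob boundary.
```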

View File

@ -31,7 +31,7 @@ extract_into_parent_list(clickhouse_functions_headers dbms_headers
add_library(clickhouse_functions_obj OBJECT ${clickhouse_functions_headers} ${clickhouse_functions_sources})
if (OMIT_HEAVY_DEBUG_SYMBOLS)
target_compile_options(clickhouse_functions_obj PRIVATE "-g0")
set_source_files_properties(${DBMS_FUNCTIONS} PROPERTIES COMPILE_FLAGS "-g0")
set_source_files_properties(${DBMS_FUNCTIONS} DIRECTORY .. PROPERTIES COMPILE_FLAGS "-g0")
endif()
list (APPEND OBJECT_LIBS $<TARGET_OBJECTS:clickhouse_functions_obj>)

View File

@ -30,6 +30,7 @@ namespace ErrorCodes
{
extern const int INVALID_CONFIG_PARAMETER;
extern const int AZURE_BLOB_STORAGE_ERROR;
extern const int LOGICAL_ERROR;
}
namespace
@ -94,11 +95,56 @@ namespace
void calculatePartSize()
{
auto max_upload_part_size = settings->max_upload_part_size;
if (!max_upload_part_size)
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "max_upload_part_size must not be 0");
if (!total_size)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Chosen multipart upload for an empty file. This must not happen");
auto max_part_number = settings->max_blocks_in_multipart_upload;
const auto min_upload_part_size = settings->min_upload_part_size;
const auto max_upload_part_size = settings->max_upload_part_size;
if (!max_part_number)
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "max_blocks_in_multipart_upload must not be 0");
else if (!min_upload_part_size)
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "min_upload_part_size must not be 0");
else if (max_upload_part_size < min_upload_part_size)
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "max_upload_part_size must not be less than min_upload_part_size");
size_t part_size = min_upload_part_size;
auto num_parts = (total_size + part_size - 1) / part_size;
if (num_parts > max_part_number)
{
part_size = (total_size + max_part_number - 1) / max_part_number;
num_parts = (total_size + part_size - 1) / part_size;
}
if (part_size > max_upload_part_size)
{
part_size = max_upload_part_size;
num_parts = (total_size + part_size - 1) / part_size;
}
String error;
if (num_parts < 1)
error = "Number of parts is zero";
else if (num_parts > max_part_number)
error = fmt::format("Number of parts exceeds {}/{}", num_parts, max_part_number);
else if (part_size < min_upload_part_size)
error = fmt::format("Size of a part is less than {}/{}", part_size, min_upload_part_size);
else if (part_size > max_upload_part_size)
error = fmt::format("Size of a part exceeds {}/{}", part_size, max_upload_part_size);
if (!error.empty())
{
throw Exception(
ErrorCodes::INVALID_CONFIG_PARAMETER,
"{} while writing {} bytes to Azure. Check max_part_number = {}, "
"min_upload_part_size = {}, max_upload_part_size = {}",
error, total_size, max_part_number, min_upload_part_size, max_upload_part_size);
}
/// We've calculated the size of a normal part (the final part can be smaller).
normal_part_size = max_upload_part_size;
normal_part_size = part_size;
}
public:
@ -219,21 +265,22 @@ namespace
auto block_blob_client = client->GetBlockBlobClient(dest_blob);
auto read_buffer = std::make_unique<LimitSeekableReadBuffer>(create_read_buffer(), task.part_offset, task.part_size);
while (!read_buffer->eof())
{
auto size = read_buffer->available();
if (size > 0)
{
auto block_id = getRandomASCIIString(64);
Azure::Core::IO::MemoryBodyStream memory(reinterpret_cast<const uint8_t *>(read_buffer->position()), size);
block_blob_client.StageBlock(block_id, memory);
task.block_ids.emplace_back(block_id);
read_buffer->ignore(size);
LOG_TRACE(log, "Writing part. Container: {}, Blob: {}, block_id: {}", dest_container_for_logging, dest_blob, block_id);
}
}
std::lock_guard lock(bg_tasks_mutex); /// Protect bg_tasks from race
LOG_TRACE(log, "Writing part finished. Container: {}, Blob: {}, Parts: {}", dest_container_for_logging, dest_blob, bg_tasks.size());
/// task.part_size is already normalized according to min_upload_part_size and max_upload_part_size.
size_t size_to_stage = task.part_size;
PODArray<char> memory;
memory.resize(size_to_stage);
WriteBufferFromVector<PODArray<char>> wb(memory);
copyData(*read_buffer, wb, size_to_stage);
Azure::Core::IO::MemoryBodyStream stream(reinterpret_cast<const uint8_t *>(memory.data()), size_to_stage);
const auto & block_id = task.block_ids.emplace_back(getRandomASCIIString(64));
block_blob_client.StageBlock(block_id, stream);
LOG_TRACE(log, "Writing part. Container: {}, Blob: {}, block_id: {}, size: {}",
dest_container_for_logging, dest_blob, block_id, size_to_stage);
}
@ -333,8 +380,8 @@ void copyAzureBlobStorageFile(
LOG_TRACE(&Poco::Logger::get("copyAzureBlobStorageFile"), "Reading from Container: {}, Blob: {}", src_container_for_logging, src_blob);
auto create_read_buffer = [&]
{
return std::make_unique<ReadBufferFromAzureBlobStorage>(src_client, src_blob, read_settings, settings->max_single_read_retries,
settings->max_single_download_retries);
return std::make_unique<ReadBufferFromAzureBlobStorage>(
src_client, src_blob, read_settings, settings->max_single_read_retries, settings->max_single_download_retries);
};
UploadHelper helper{create_read_buffer, dest_client, offset, size, dest_container_for_logging, dest_blob, settings, schedule, &Poco::Logger::get("copyAzureBlobStorageFile")};
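The part-size selection in `calculatePartSize` above (and in its S3 counterpart in the next file) follows a simple rule: start from `min_upload_part_size`, enlarge the part if the blob would otherwise need more than the maximum allowed number of parts, cap the part at `max_upload_part_size`, and fail if the constraints cannot all be satisfied. A standalone sketch of the same arithmetic (a hypothetical helper, not the library API):

```cpp
#include <cstddef>
#include <stdexcept>

/// Pick the size of a "normal" part for a multipart upload of total_size bytes,
/// given the allowed part-size range and the maximum number of parts.
size_t choosePartSize(size_t total_size, size_t min_part, size_t max_part, size_t max_parts)
{
    size_t part_size = min_part;
    size_t num_parts = (total_size + part_size - 1) / part_size;

    /// Too many parts: grow the part size so everything fits into max_parts parts.
    if (num_parts > max_parts)
    {
        part_size = (total_size + max_parts - 1) / max_parts;
        num_parts = (total_size + part_size - 1) / part_size;
    }

    /// Part grew beyond the limit: cap it and recompute the number of parts.
    if (part_size > max_part)
    {
        part_size = max_part;
        num_parts = (total_size + part_size - 1) / part_size;
    }

    if (num_parts < 1 || num_parts > max_parts || part_size < min_part || part_size > max_part)
        throw std::invalid_argument("total_size cannot be split under the given constraints");

    return part_size;
}

/// Example: total_size = 10000, min_part = 42, max_part = 52, max_parts = 1000:
/// 10000 bytes need ceil(10000 / 42) = 239 parts, which fits, so part_size stays 42.
/// With max_parts = 100 the part would have to grow to 100 > max_part; after capping
/// at 52 there are still ceil(10000 / 52) = 193 > 100 parts, so the function throws.
```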

View File

@ -316,23 +316,23 @@ namespace
num_parts = (total_size + part_size - 1) / part_size;
}
if (num_parts < 1 || num_parts > max_part_number || part_size < min_upload_part_size || part_size > max_upload_part_size)
{
String msg;
if (num_parts < 1)
msg = "Number of parts is zero";
else if (num_parts > max_part_number)
msg = fmt::format("Number of parts exceeds {}", num_parts, max_part_number);
else if (part_size < min_upload_part_size)
msg = fmt::format("Size of a part is less than {}", part_size, min_upload_part_size);
else
msg = fmt::format("Size of a part exceeds {}", part_size, max_upload_part_size);
String error;
if (num_parts < 1)
error = "Number of parts is zero";
else if (num_parts > max_part_number)
error = fmt::format("Number of parts exceeds {}/{}", num_parts, max_part_number);
else if (part_size < min_upload_part_size)
error = fmt::format("Size of a part is less than {}/{}", part_size, min_upload_part_size);
else if (part_size > max_upload_part_size)
error = fmt::format("Size of a part exceeds {}/{}", part_size, max_upload_part_size);
if (!error.empty())
{
throw Exception(
ErrorCodes::INVALID_CONFIG_PARAMETER,
"{} while writing {} bytes to S3. Check max_part_number = {}, "
"min_upload_part_size = {}, max_upload_part_size = {}",
msg, total_size, max_part_number, min_upload_part_size, max_upload_part_size);
error, total_size, max_part_number, min_upload_part_size, max_upload_part_size);
}
/// We've calculated the size of a normal part (the final part can be smaller).

View File

@ -1393,7 +1393,16 @@ void executeQuery(
const char * begin;
const char * end;
istr.nextIfAtEnd();
try
{
istr.nextIfAtEnd();
}
catch (...)
{
/// If the buffer contains invalid data and we failed to decompress it, we still want to have some information about the query in the log.
logQuery("<cannot parse>", context, /* internal = */ false, QueryProcessingStage::Complete);
throw;
}
size_t max_query_size = context->getSettingsRef().max_query_size;

View File

@ -707,11 +707,11 @@ void HTTPHandler::processQuery(
/// The data can also be compressed using incompatible internal algorithm. This is indicated by
/// 'decompress' query parameter.
std::unique_ptr<ReadBuffer> in_post_maybe_compressed;
bool in_post_compressed = false;
bool is_in_post_compressed = false;
if (params.getParsed<bool>("decompress", false))
{
in_post_maybe_compressed = std::make_unique<CompressedReadBuffer>(*in_post);
in_post_compressed = true;
in_post_maybe_compressed = std::make_unique<CompressedReadBuffer>(*in_post, /* allow_different_codecs_ = */ false, /* external_data_ = */ true);
is_in_post_compressed = true;
}
else
in_post_maybe_compressed = std::move(in_post);
@ -845,7 +845,7 @@ void HTTPHandler::processQuery(
/// If 'http_native_compression_disable_checksumming_on_decompress' setting is turned on,
/// checksums of client data compressed with internal algorithm are not checked.
if (in_post_compressed && settings.http_native_compression_disable_checksumming_on_decompress)
if (is_in_post_compressed && settings.http_native_compression_disable_checksumming_on_decompress)
static_cast<CompressedReadBuffer &>(*in_post_maybe_compressed).disableChecksumming();
/// Add CORS header if 'add_http_cors_header' setting is turned on send * in Access-Control-Allow-Origin

View File

@ -254,6 +254,10 @@ AzureObjectStorage::SettingsPtr StorageAzureBlob::createSettings(const ContextPt
auto settings_ptr = std::make_unique<AzureObjectStorageSettings>();
settings_ptr->max_single_part_upload_size = context_settings.azure_max_single_part_upload_size;
settings_ptr->max_single_read_retries = context_settings.azure_max_single_read_retries;
settings_ptr->strict_upload_part_size = context_settings.azure_strict_upload_part_size;
settings_ptr->max_upload_part_size = context_settings.azure_max_upload_part_size;
settings_ptr->max_blocks_in_multipart_upload = context_settings.azure_max_blocks_in_multipart_upload;
settings_ptr->min_upload_part_size = context_settings.azure_min_upload_part_size;
settings_ptr->list_object_keys_size = static_cast<int32_t>(context_settings.azure_list_object_keys_size);
return settings_ptr;

View File

@ -5,6 +5,7 @@
#include <Common/escapeForFileName.h>
#include <Common/Exception.h>
#include <Common/FailPoint.h>
#include <IO/WriteBufferFromFileBase.h>
#include <Compression/CompressedReadBuffer.h>
@ -53,8 +54,13 @@ namespace ErrorCodes
extern const int TIMEOUT_EXCEEDED;
extern const int CANNOT_RESTORE_TABLE;
extern const int NOT_IMPLEMENTED;
extern const int FAULT_INJECTED;
}
namespace FailPoints
{
extern const char stripe_log_sink_write_fallpoint[];
}
/// NOTE: The lock `StorageStripeLog::rwlock` is NOT kept locked while reading,
/// because we read ranges of data that do not change.
@ -234,6 +240,11 @@ public:
/// Save the new indices.
storage.saveIndices(lock);
// An exception might occur while saving the file sizes, e.g. S3::TooManyRequests.
fiu_do_on(FailPoints::stripe_log_sink_write_fallpoint,
{
throw Exception(ErrorCodes::FAULT_INJECTED, "Injecting fault for inserting into StripeLog table");
});
/// Save the new file sizes.
storage.saveFileSizes(lock);

View File

@ -281,7 +281,10 @@ def test_backup_restore_on_merge_tree(cluster):
node = cluster.instances["node"]
azure_query(
node,
f"CREATE TABLE test_simple_merge_tree(key UInt64, data String) Engine = MergeTree() ORDER BY tuple() SETTINGS storage_policy='blob_storage_policy'",
f"""
DROP TABLE IF EXISTS test_simple_merge_tree;
CREATE TABLE test_simple_merge_tree(key UInt64, data String) Engine = MergeTree() ORDER BY tuple() SETTINGS storage_policy='blob_storage_policy'
""",
)
azure_query(node, f"INSERT INTO test_simple_merge_tree VALUES (1, 'a')")
@ -299,3 +302,85 @@ def test_backup_restore_on_merge_tree(cluster):
)
azure_query(node, f"DROP TABLE test_simple_merge_tree")
azure_query(node, f"DROP TABLE test_simple_merge_tree_restored")
def test_backup_restore_correct_block_ids(cluster):
node = cluster.instances["node"]
azure_query(
node,
f"""
DROP TABLE IF EXISTS test_simple_merge_tree;
CREATE TABLE test_simple_merge_tree(key UInt64, data String)
Engine = MergeTree()
ORDER BY tuple()
SETTINGS storage_policy='blob_storage_policy'""",
)
data_query = "SELECT number, repeat('a', 100) FROM numbers(1000)"
azure_query(
node,
f"INSERT INTO test_simple_merge_tree {data_query}",
)
for min_upload_size, max_upload_size, max_blocks, expected_block_size in [
(42, 100, 1000, 42),
(42, 52, 86, 52),
]:
data_path = f"test_backup_correct_block_ids_{max_blocks}"
backup_destination = f"AzureBlobStorage('{cluster.env_variables['AZURITE_CONNECTION_STRING']}', 'cont', '{data_path}')"
azure_query(
node,
f"""
SET azure_min_upload_part_size = {min_upload_size};
SET azure_max_upload_part_size = {max_upload_size};
SET azure_max_blocks_in_multipart_upload = {max_blocks};
BACKUP TABLE test_simple_merge_tree TO {backup_destination} SETTINGS allow_azure_native_copy = 0;
""",
)
port = cluster.azurite_port
connection_string = (
f"DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;"
f"AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
f"BlobEndpoint=http://127.0.0.1:{port}/devstoreaccount1;"
)
container_name = "cont"
blob_service_client = BlobServiceClient.from_connection_string(
connection_string
)
container_client = blob_service_client.get_container_client(container_name)
blobs = container_client.list_blobs()
data_blob = (
f"{data_path}/data/default/test_simple_merge_tree/all_1_1_0/data.bin"
)
found = False
for blob in blobs:
if data_blob == blob.get("name"):
found = True
break
assert found
blob_client = blob_service_client.get_blob_client(
blob=data_blob, container=container_name
)
blocks_num = len(blob_client.get_block_list()[0])
assert blocks_num > 50
count = 0
for block in blob_client.get_block_list()[0]:
count += 1
if count < blocks_num:
assert block.get("size") == expected_block_size
else:
assert block.get("size") < expected_block_size
azure_query(
node,
f"RESTORE TABLE test_simple_merge_tree AS test_simple_merge_tree_restored_{max_blocks} FROM {backup_destination};",
)
assert azure_query(
node,
f"SELECT * from test_simple_merge_tree_restored_{max_blocks} ORDER BY key",
) == node.query(data_query)

View File

@ -1,12 +0,0 @@
<clickhouse>
<storage_configuration>
<disks>
<s3>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3>
</disks>
</storage_configuration>
</clickhouse>

View File

@ -0,0 +1,34 @@
<clickhouse>
<storage_configuration>
<disks>
<s3>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3>
<s3_no_retries>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<!-- ClickHouse starts earlier than the custom S3 endpoint. Skip the access check to avoid failing on start-up -->
<skip_access_check>true</skip_access_check>
<!-- Avoid extra retries to speed up tests -->
<retry_attempts>1</retry_attempts>
<s3_use_adaptive_timeouts>0</s3_use_adaptive_timeouts>
<s3_max_single_read_retries>1</s3_max_single_read_retries>
<connect_timeout_ms>20000</connect_timeout_ms>
</s3_no_retries>
</disks>
<policies>
<s3_no_retries>
<volumes>
<main>
<disk>s3_no_retries</disk>
</main>
</volumes>
</s3_no_retries>
</policies>
</storage_configuration>
</clickhouse>

View File

@ -11,7 +11,7 @@ def cluster():
cluster = ClickHouseCluster(__file__)
cluster.add_instance(
"node",
main_configs=["configs/minio.xml", "configs/ssl.xml"],
main_configs=["configs/storage_configuration.xml", "configs/ssl.xml"],
with_minio=True,
)
logging.info("Starting cluster...")
@ -84,3 +84,39 @@ def test_log_family_s3(cluster, log_engine, files_overhead, files_overhead_per_i
assert_objects_count(cluster, 0)
finally:
node.query("DROP TABLE s3_test")
# Imitate the case when an error occurs while inserting into a table.
# For example, S3::TooManyRequests.
# In that case the data file may have been updated, but not the sizes file.
# So, because of the exception, we should truncate the data file to undo the insert query.
# See FileChecker::repair().
def test_stripe_log_truncate(cluster):
node = cluster.instances["node"]
node.query(
"""
CREATE TABLE stripe_table (
a int
) ENGINE = StripeLog()
SETTINGS storage_policy='s3_no_retries'
"""
)
node.query("SYSTEM ENABLE FAILPOINT stripe_log_sink_write_fallpoint")
node.query(
"""
INSERT INTO stripe_table SELECT number FROM numbers(10)
""",
ignore_error=True,
)
node.query("SYSTEM DISABLE FAILPOINT stripe_log_sink_write_fallpoint")
node.query("SELECT count(*) FROM stripe_table") == "0\n"
node.query("INSERT INTO stripe_table SELECT number FROM numbers(10)")
node.query("SELECT count(*) FROM stripe_table") == "10\n"
# Make sure that everything is okey with the table after restart.
node.query("DETACH TABLE stripe_table")
node.query("ATTACH TABLE stripe_table")
assert node.query("DROP TABLE stripe_table") == ""

View File

@ -132,3 +132,7 @@ SELECT * FROM merge_table ORDER BY id, val;
2 a
2 b
3 c
select sum(number) from numbers(10) settings final=1;
45
select sum(number) from remote('127.0.0.{1,2}', numbers(10)) settings final=1;
90

View File

@ -102,3 +102,6 @@ insert into table_to_merge_c values (3,'c');
-- expected output:
-- 1 c, 2 a, 2 b, 3 c
SELECT * FROM merge_table ORDER BY id, val;
select sum(number) from numbers(10) settings final=1;
select sum(number) from remote('127.0.0.{1,2}', numbers(10)) settings final=1;

View File

@ -103,11 +103,11 @@ SELECT '2^30-1', maxMerge(x) from (select CAST(unhex('ffffff3f') || randomString
SELECT '1M without 0', length(maxMerge(x)) from (select CAST(unhex('00001000') || randomString(0x00100000 - 1) || 'x', 'AggregateFunction(max, String)') as x);
SELECT '1M with 0', length(maxMerge(x)) from (select CAST(unhex('00001000') || randomString(0x00100000 - 1) || '\0', 'AggregateFunction(max, String)') as x);
SELECT 'fuzz1', finalizeAggregation(CAST(unhex('3000000\0303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353600010000000000000000'), 'AggregateFunction(argMax, String, UInt64)')); -- { serverError CORRUPTED_DATA }
SELECT 'fuzz1', finalizeAggregation(CAST(unhex('3000000\0303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353600010000000000000000'), 'AggregateFunction(argMax, String, UInt64)')); -- { serverError INCORRECT_DATA }
SELECT 'fuzz2', finalizeAggregation(CAST(unhex('04000000' || '30313233' || '01' || 'ffffffffffffffff'), 'AggregateFunction(argMax, String, UInt64)')) as x, length(x);
SELECT 'fuzz3', finalizeAggregation(CAST(unhex('04000000' || '30313233' || '00' || 'ffffffffffffffff'), 'AggregateFunction(argMax, String, UInt64)')) as x, length(x); -- { serverError CORRUPTED_DATA }
SELECT 'fuzz4', finalizeAggregation(CAST(unhex('04000000' || '30313233' || '00'), 'AggregateFunction(argMax, String, UInt64)')) as x, length(x); -- { serverError CORRUPTED_DATA }
SELECT 'fuzz5', finalizeAggregation(CAST(unhex('0100000000000000000FFFFFFFF0'), 'AggregateFunction(argMax, UInt64, String)')); -- { serverError CORRUPTED_DATA }
SELECT 'fuzz3', finalizeAggregation(CAST(unhex('04000000' || '30313233' || '00' || 'ffffffffffffffff'), 'AggregateFunction(argMax, String, UInt64)')) as x, length(x); -- { serverError INCORRECT_DATA }
SELECT 'fuzz4', finalizeAggregation(CAST(unhex('04000000' || '30313233' || '00'), 'AggregateFunction(argMax, String, UInt64)')) as x, length(x); -- { serverError INCORRECT_DATA }
SELECT 'fuzz5', finalizeAggregation(CAST(unhex('0100000000000000000FFFFFFFF0'), 'AggregateFunction(argMax, UInt64, String)')); -- { serverError INCORRECT_DATA }
drop table if exists aggr;

View File

@ -7,8 +7,6 @@ export LC_ALL=C # The "total" should be printed without localization
TU_EXCLUDES=(
AggregateFunctionUniq
Aggregator
# FIXME: Exclude for now
FunctionsConversion
)
if find $1 -name '*.o' | xargs wc -c | grep --regexp='\.o$' | sort -rn | awk '{ if ($1 > 50000000) print }' \