From 2ab884359ebbd2e4091252312096ef6f10fc9b90 Mon Sep 17 00:00:00 2001
From: kssenii
Date: Mon, 3 Oct 2022 22:15:04 +0200
Subject: [PATCH] More efficient WriteBufferFromAzureBlobStorage

---
 .../IO/WriteBufferFromAzureBlobStorage.cpp    | 54 +++++++++++++------
 .../IO/WriteBufferFromAzureBlobStorage.h      |  7 +++
 .../configs/config.d/storage_conf.xml         |  9 +---
 .../test.py                                   | 34 +++++++++++-
 4 files changed, 78 insertions(+), 26 deletions(-)

diff --git a/src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp b/src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp
index 3b5ca89d224..8a049725e3f 100644
--- a/src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp
+++ b/src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp
@@ -66,38 +66,60 @@ void WriteBufferFromAzureBlobStorage::finalizeImpl()
 {
     execWithRetry([this](){ next(); }, DEFAULT_RETRY_NUM);
 
+    if (tmp_buffer_write_offset > 0)
+        uploadBlock(tmp_buffer->data(), tmp_buffer_write_offset);
+
     auto block_blob_client = blob_container_client->GetBlockBlobClient(blob_path);
     execWithRetry([&](){ block_blob_client.CommitBlockList(block_ids); }, DEFAULT_RETRY_NUM);
+
     LOG_TRACE(log, "Committed {} blocks for blob `{}`", block_ids.size(), blob_path);
 }
 
+void WriteBufferFromAzureBlobStorage::uploadBlock(const char * data, size_t size)
+{
+    auto block_blob_client = blob_container_client->GetBlockBlobClient(blob_path);
+    const std::string & block_id = block_ids.emplace_back(getRandomASCIIString(64));
+
+    Azure::Core::IO::MemoryBodyStream memory_stream(reinterpret_cast<const uint8_t *>(data), size);
+    execWithRetry([&](){ block_blob_client.StageBlock(block_id, memory_stream); }, DEFAULT_RETRY_NUM);
+    tmp_buffer_write_offset = 0;
+
+    LOG_TRACE(log, "Staged block (id: {}) of size {} (blob path: {}).", block_id, size, blob_path);
+}
+
+WriteBufferFromAzureBlobStorage::MemoryBufferPtr WriteBufferFromAzureBlobStorage::allocateBuffer() const
+{
+    return std::make_unique<Memory<>>(max_single_part_upload_size);
+}
+
 void WriteBufferFromAzureBlobStorage::nextImpl()
 {
-    if (!offset())
+    size_t size_to_upload = offset();
+
+    if (size_to_upload == 0)
         return;
 
-    char * buffer_begin = working_buffer.begin();
-    size_t total_size = offset();
+    if (!tmp_buffer)
+        tmp_buffer = allocateBuffer();
 
-    auto block_blob_client = blob_container_client->GetBlockBlobClient(blob_path);
-
-    size_t current_size = 0;
-
-    while (current_size < total_size)
+    size_t uploaded_size = 0;
+    while (uploaded_size != size_to_upload)
     {
-        size_t part_len = std::min(total_size - current_size, max_single_part_upload_size);
-        const std::string & block_id = block_ids.emplace_back(getRandomASCIIString(64));
+        size_t memory_buffer_remaining_size = max_single_part_upload_size - tmp_buffer_write_offset;
+        if (memory_buffer_remaining_size == 0)
+            uploadBlock(tmp_buffer->data(), tmp_buffer->size());
 
-        Azure::Core::IO::MemoryBodyStream tmp_buffer(reinterpret_cast<uint8_t *>(buffer_begin + current_size), part_len);
-        execWithRetry([&](){ block_blob_client.StageBlock(block_id, tmp_buffer); }, DEFAULT_RETRY_NUM);
-
-        current_size += part_len;
-        LOG_TRACE(log, "Staged block (id: {}) of size {} (written {}/{}, blob path: {}).", block_id, part_len, current_size, total_size, blob_path);
+        size_t size = std::min(memory_buffer_remaining_size, size_to_upload - uploaded_size);
+        memcpy(tmp_buffer->data() + tmp_buffer_write_offset, working_buffer.begin() + uploaded_size, size);
+        uploaded_size += size;
+        tmp_buffer_write_offset += size;
     }
 
+    if (tmp_buffer_write_offset == max_single_part_upload_size)
+        uploadBlock(tmp_buffer->data(), tmp_buffer->size());
+
     if (write_settings.remote_throttler)
-        write_settings.remote_throttler->add(total_size);
+        write_settings.remote_throttler->add(size_to_upload);
 }
 
 }
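The change above replaces per-nextImpl() block staging with a reusable staging buffer: incoming writes are copied into tmp_buffer, and a block is staged only when the buffer holds a full max_single_part_upload_size bytes (finalizeImpl() flushes the trailing partial block before committing the block list). As a result, small writes no longer each produce a tiny Azure block. A minimal Python sketch of the same accumulate-and-flush logic; names and the stage_block callback are illustrative stand-ins, not the actual ClickHouse or Azure SDK API:

    # Illustrative sketch of nextImpl()/finalizeImpl() above, not the real API.
    class BufferedBlockUploader:
        def __init__(self, max_block_size, stage_block):
            self.max_block_size = max_block_size   # max_single_part_upload_size
            self.stage_block = stage_block         # stands in for uploadBlock()
            self.buf = bytearray(max_block_size)   # stands in for tmp_buffer
            self.offset = 0                        # tmp_buffer_write_offset

        def write(self, data: bytes):              # mirrors nextImpl()
            copied = 0
            while copied < len(data):
                if self.offset == self.max_block_size:   # buffer full: stage it
                    self.stage_block(bytes(self.buf))
                    self.offset = 0
                n = min(self.max_block_size - self.offset, len(data) - copied)
                self.buf[self.offset:self.offset + n] = data[copied:copied + n]
                self.offset += n
                copied += n
            if self.offset == self.max_block_size:       # post-loop flush, as in the C++
                self.stage_block(bytes(self.buf))
                self.offset = 0

        def finalize(self):                        # mirrors finalizeImpl()
            if self.offset > 0:                    # trailing partial block
                self.stage_block(bytes(self.buf[:self.offset]))
                self.offset = 0

    staged = []
    u = BufferedBlockUploader(100000, staged.append)
    u.write(b"x" * 250000)
    u.finalize()
    assert [len(b) for b in staged] == [100000, 100000, 50000]

One detail of the C++ loop: after uploadBlock() resets tmp_buffer_write_offset, the local memory_buffer_remaining_size is still 0, so that iteration copies nothing and the next iteration recomputes the remaining space; the sketch simply flushes before computing the copy size, which is behaviorally equivalent.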
diff --git a/src/Disks/IO/WriteBufferFromAzureBlobStorage.h b/src/Disks/IO/WriteBufferFromAzureBlobStorage.h
index e7eaef86fa0..ebefe19dade 100644
--- a/src/Disks/IO/WriteBufferFromAzureBlobStorage.h
+++ b/src/Disks/IO/WriteBufferFromAzureBlobStorage.h
@@ -40,6 +40,7 @@ public:
 private:
     void finalizeImpl() override;
     void execWithRetry(std::function<void()> func, size_t num_tries);
+    void uploadBlock(const char * data, size_t size);
 
     Poco::Logger * log;
 
@@ -49,6 +50,12 @@ private:
 
     AzureClientPtr blob_container_client;
     std::vector<std::string> block_ids;
+
+    using MemoryBufferPtr = std::unique_ptr<Memory<>>;
+    MemoryBufferPtr tmp_buffer;
+    size_t tmp_buffer_write_offset = 0;
+
+    MemoryBufferPtr allocateBuffer() const;
 };
 
 }
diff --git a/tests/integration/test_merge_tree_azure_blob_storage/configs/config.d/storage_conf.xml b/tests/integration/test_merge_tree_azure_blob_storage/configs/config.d/storage_conf.xml
index 09fa0d6c767..02945b619c2 100644
--- a/tests/integration/test_merge_tree_azure_blob_storage/configs/config.d/storage_conf.xml
+++ b/tests/integration/test_merge_tree_azure_blob_storage/configs/config.d/storage_conf.xml
@@ -10,12 +10,8 @@
                 <account_name>devstoreaccount1</account_name>
                 <account_key>Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==</account_key>
-                <max_single_part_upload_size>33554432</max_single_part_upload_size>
+                <max_single_part_upload_size>100000</max_single_part_upload_size>
             </blob_storage_disk>
-            <hdd>
-                <type>local</type>
-                <path>/</path>
-            </hdd>
         </disks>
         <policies>
             <blob_storage_policy>
@@ -23,9 +19,6 @@
                     <main>
                         <disk>blob_storage_disk</disk>
                     </main>
-                    <external>
-                        <disk>hdd</disk>
-                    </external>
                 </volumes>
             </blob_storage_policy>
         </policies>
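The test config drops max_single_part_upload_size from 33554432 (32 MiB) to 100000 bytes, so the 1M-row insert in the test below necessarily spans many blocks, and removes the local hdd disk and its external volume from the test policy. Under the buffering scheme above, every staged block of a blob except the last one is exactly the configured size. A small worked example of the expected layout (the helper function is hypothetical, for illustration only):

    # Hypothetical helper: expected committed-block sizes for a blob of
    # `total` bytes when blocks are flushed at a fixed block_size.
    def expected_block_sizes(total: int, block_size: int = 100000) -> list:
        full, rest = divmod(total, block_size)
        return [block_size] * full + ([rest] if rest else [])

    assert expected_block_sizes(250000) == [100000, 100000, 50000]
    assert expected_block_sizes(200000) == [100000, 100000]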
diff --git a/tests/integration/test_merge_tree_azure_blob_storage/test.py b/tests/integration/test_merge_tree_azure_blob_storage/test.py
index 68a783d2427..1f2290f0351 100644
--- a/tests/integration/test_merge_tree_azure_blob_storage/test.py
+++ b/tests/integration/test_merge_tree_azure_blob_storage/test.py
@@ -6,6 +6,7 @@ import pytest
 
 from helpers.cluster import ClickHouseCluster
 from helpers.utility import generate_values, replace_config, SafeThread
+from azure.storage.blob import BlobServiceClient
 
 SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
 
@@ -571,10 +572,39 @@ def test_restart_during_load(cluster):
 def test_big_insert(cluster):
     node = cluster.instances[NODE_NAME]
     create_table(node, TABLE_NAME)
+
+    check_query = "SELECT '2020-01-03', number, toString(number) FROM numbers(1000000)"
+
     azure_query(
         node,
-        f"INSERT INTO {TABLE_NAME} SELECT '2020-01-03', number, toString(number) FROM numbers(1000000)",
+        f"INSERT INTO {TABLE_NAME} {check_query}",
     )
     assert azure_query(node, f"SELECT * FROM {TABLE_NAME} ORDER BY id") == node.query(
-        "SELECT '2020-01-03', number, toString(number) FROM numbers(1000000)"
+        check_query
     )
+
+    blob_container_client = cluster.blob_service_client.get_container_client(
+        CONTAINER_NAME
+    )
+
+    blobs = blob_container_client.list_blobs()
+    max_single_part_upload_size = 100000
+
+    for blob in blobs:
+        blob_client = cluster.blob_service_client.get_blob_client(
+            CONTAINER_NAME, blob.name
+        )
+        committed, uncommited = blob_client.get_block_list()
+
+        blocks = committed
+        assert len(blocks) > 1
+        last_id = len(blocks)
+        id = 1
+
+        for block in blocks:
+            print(f"blob: {blob.name}, block size: {block.size}")
+            if id == last_id:
+                assert max_single_part_upload_size >= block.size
+            else:
+                assert max_single_part_upload_size == block.size
+            id += 1
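The new assertions walk every blob with get_block_list() and check that all committed blocks except the last equal max_single_part_upload_size. For debugging outside the test harness, roughly the same check can be run against a local Azurite instance. The endpoint, container name, and connection string below are assumptions to adjust for your setup; the account key is Azurite's well-known development key, the same one as in the config above:

    from azure.storage.blob import BlobServiceClient

    # Assumed local Azurite endpoint and container; adjust to your setup.
    CONNECTION_STRING = (
        "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;"
        "AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
        "BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;"
    )
    CONTAINER_NAME = "cont"  # assumption: the container used by the test
    MAX_SINGLE_PART_UPLOAD_SIZE = 100000

    service = BlobServiceClient.from_connection_string(CONNECTION_STRING)
    container = service.get_container_client(CONTAINER_NAME)

    for blob in container.list_blobs():
        blob_client = service.get_blob_client(CONTAINER_NAME, blob.name)
        committed, uncommitted = blob_client.get_block_list()
        for i, block in enumerate(committed, start=1):
            print(f"blob: {blob.name}, block size: {block.size}")
            # every committed block but the last must be exactly the buffer size
            if i < len(committed):
                assert block.size == MAX_SINGLE_PART_UPLOAD_SIZE
            else:
                assert block.size <= MAX_SINGLE_PART_UPLOAD_SIZE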