More efficient WriteBufferFromAzureBlobStorage

kssenii 2022-10-03 22:15:04 +02:00
parent c6d7cd5820
commit 2ab884359e
4 changed files with 78 additions and 26 deletions

View File

@@ -66,38 +66,60 @@ void WriteBufferFromAzureBlobStorage::finalizeImpl()
 {
     execWithRetry([this](){ next(); }, DEFAULT_RETRY_NUM);
 
+    if (tmp_buffer_write_offset > 0)
+        uploadBlock(tmp_buffer->data(), tmp_buffer_write_offset);
+
     auto block_blob_client = blob_container_client->GetBlockBlobClient(blob_path);
     execWithRetry([&](){ block_blob_client.CommitBlockList(block_ids); }, DEFAULT_RETRY_NUM);
 
     LOG_TRACE(log, "Committed {} blocks for blob `{}`", block_ids.size(), blob_path);
 }
 
+void WriteBufferFromAzureBlobStorage::uploadBlock(const char * data, size_t size)
+{
+    auto block_blob_client = blob_container_client->GetBlockBlobClient(blob_path);
+    const std::string & block_id = block_ids.emplace_back(getRandomASCIIString(64));
+
+    Azure::Core::IO::MemoryBodyStream memory_stream(reinterpret_cast<const uint8_t *>(data), size);
+    execWithRetry([&](){ block_blob_client.StageBlock(block_id, memory_stream); }, DEFAULT_RETRY_NUM);
+    tmp_buffer_write_offset = 0;
+
+    LOG_TRACE(log, "Staged block (id: {}) of size {} (blob path: {}).", block_id, size, blob_path);
+}
+
+WriteBufferFromAzureBlobStorage::MemoryBufferPtr WriteBufferFromAzureBlobStorage::allocateBuffer() const
+{
+    return std::make_unique<Memory<>>(max_single_part_upload_size);
+}
+
 void WriteBufferFromAzureBlobStorage::nextImpl()
 {
-    if (!offset())
+    size_t size_to_upload = offset();
+
+    if (size_to_upload == 0)
         return;
 
-    char * buffer_begin = working_buffer.begin();
-    size_t total_size = offset();
+    if (!tmp_buffer)
+        tmp_buffer = allocateBuffer();
 
-    auto block_blob_client = blob_container_client->GetBlockBlobClient(blob_path);
-
-    size_t current_size = 0;
-    while (current_size < total_size)
+    size_t uploaded_size = 0;
+    while (uploaded_size != size_to_upload)
     {
-        size_t part_len = std::min(total_size - current_size, max_single_part_upload_size);
-        const std::string & block_id = block_ids.emplace_back(getRandomASCIIString(64));
-
-        Azure::Core::IO::MemoryBodyStream tmp_buffer(reinterpret_cast<uint8_t *>(buffer_begin + current_size), part_len);
-        execWithRetry([&](){ block_blob_client.StageBlock(block_id, tmp_buffer); }, DEFAULT_RETRY_NUM);
-
-        current_size += part_len;
-        LOG_TRACE(log, "Staged block (id: {}) of size {} (written {}/{}, blob path: {}).", block_id, part_len, current_size, total_size, blob_path);
+        size_t memory_buffer_remaining_size = max_single_part_upload_size - tmp_buffer_write_offset;
+        if (memory_buffer_remaining_size == 0)
+            uploadBlock(tmp_buffer->data(), tmp_buffer->size());
+
+        size_t size = std::min(memory_buffer_remaining_size, size_to_upload - uploaded_size);
+        memcpy(tmp_buffer->data() + tmp_buffer_write_offset, working_buffer.begin() + uploaded_size, size);
+        uploaded_size += size;
+        tmp_buffer_write_offset += size;
     }
 
+    if (tmp_buffer_write_offset == max_single_part_upload_size)
+        uploadBlock(tmp_buffer->data(), tmp_buffer->size());
+
     if (write_settings.remote_throttler)
-        write_settings.remote_throttler->add(total_size);
+        write_settings.remote_throttler->add(size_to_upload);
 }
 
 }
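
In short, the old nextImpl() staged one or more Azure blocks on every flush of the write buffer, so block sizes depended on how the writes happened to be chopped up (and small flushes produced many tiny blocks, while a block blob allows at most 50,000 committed blocks). The new code accumulates incoming bytes in a fixed-size temporary buffer, stages a block only when that buffer holds exactly max_single_part_upload_size bytes, and lets finalizeImpl() stage the partial tail before committing the block list. The following is a rough Python sketch of that buffering scheme, for illustration only; the BlockBuffer class, its method names, and the stage/commit bookkeeping are inventions of this sketch, not ClickHouse or Azure SDK API.

# Rough Python sketch of the buffering scheme above; the real logic is the C++ diff.
class BlockBuffer:
    def __init__(self, max_block_size):
        self.max_block_size = max_block_size
        self.tmp = bytearray(max_block_size)   # tmp_buffer analogue
        self.offset = 0                        # tmp_buffer_write_offset analogue
        self.staged_block_sizes = []           # stands in for block_ids + StageBlock()

    def stage_block(self, size):
        # uploadBlock() analogue: "upload" the buffered bytes and reset the offset.
        self.staged_block_sizes.append(size)
        self.offset = 0

    def write(self, data):
        # nextImpl() analogue: copy incoming bytes into the fixed-size buffer and
        # stage a block whenever the buffer becomes completely full.
        uploaded = 0
        while uploaded != len(data):
            remaining = self.max_block_size - self.offset
            if remaining == 0:
                self.stage_block(self.max_block_size)
                remaining = self.max_block_size
            size = min(remaining, len(data) - uploaded)
            self.tmp[self.offset:self.offset + size] = data[uploaded:uploaded + size]
            uploaded += size
            self.offset += size
        if self.offset == self.max_block_size:
            self.stage_block(self.max_block_size)

    def finalize(self):
        # finalizeImpl() analogue: stage the partial tail, then "commit" the list.
        if self.offset > 0:
            self.stage_block(self.offset)
        return self.staged_block_sizes

buf = BlockBuffer(100000)
for chunk_size in (70000, 70000, 30000, 50000):   # arbitrary nextImpl() call sizes
    buf.write(b"x" * chunk_size)
# Every staged block except the last holds exactly max_block_size bytes,
# regardless of how the individual writes were sized.
assert buf.finalize() == [100000, 100000, 20000]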

View File

@@ -40,6 +40,7 @@ public:
 private:
     void finalizeImpl() override;
 
     void execWithRetry(std::function<void()> func, size_t num_tries);
+    void uploadBlock(const char * data, size_t size);
 
     Poco::Logger * log;
@@ -49,6 +50,12 @@ private:
     AzureClientPtr blob_container_client;
     std::vector<std::string> block_ids;
 
+    using MemoryBufferPtr = std::unique_ptr<Memory<>>;
+    MemoryBufferPtr tmp_buffer;
+    size_t tmp_buffer_write_offset = 0;
+
+    MemoryBufferPtr allocateBuffer() const;
+
 };
 
 }

View File

@@ -10,12 +10,8 @@
                 <!-- default credentials for Azurite storage account -->
                 <account_name>devstoreaccount1</account_name>
                 <account_key>Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==</account_key>
-                <max_single_part_upload_size>33554432</max_single_part_upload_size>
+                <max_single_part_upload_size>100000</max_single_part_upload_size>
             </blob_storage_disk>
-            <hdd>
-                <type>local</type>
-                <path>/</path>
-            </hdd>
         </disks>
         <policies>
             <blob_storage_policy>
@@ -23,9 +19,6 @@
                     <main>
                         <disk>blob_storage_disk</disk>
                     </main>
-                    <external>
-                        <disk>hdd</disk>
-                    </external>
                 </volumes>
             </blob_storage_policy>
         </policies>
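
The test configuration above lowers max_single_part_upload_size for the Azurite disk from 33554432 bytes (32 MiB) to 100000 bytes, so that even a modest test insert produces blobs whose committed block lists contain several blocks; it also drops the local hdd disk and the external volume that blob_storage_policy no longer uses. A small Python illustration of the block layout this setting implies (the helper expected_blocks is made up for this sketch):

max_single_part_upload_size = 100000  # value from the test config above

def expected_blocks(part_size_bytes):
    # Every staged block is exactly max_single_part_upload_size bytes,
    # except possibly the last one, which holds the remainder.
    full, tail = divmod(part_size_bytes, max_single_part_upload_size)
    return [max_single_part_upload_size] * full + ([tail] if tail else [])

print(expected_blocks(250000))  # [100000, 100000, 50000]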

View File

@@ -6,6 +6,7 @@ import pytest
 from helpers.cluster import ClickHouseCluster
 from helpers.utility import generate_values, replace_config, SafeThread
+from azure.storage.blob import BlobServiceClient
 
 SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
@@ -571,10 +572,39 @@ def test_restart_during_load(cluster):
 def test_big_insert(cluster):
     node = cluster.instances[NODE_NAME]
     create_table(node, TABLE_NAME)
+
+    check_query = "SELECT '2020-01-03', number, toString(number) FROM numbers(1000000)"
+
     azure_query(
         node,
-        f"INSERT INTO {TABLE_NAME} SELECT '2020-01-03', number, toString(number) FROM numbers(1000000)",
+        f"INSERT INTO {TABLE_NAME} {check_query}",
     )
     assert azure_query(node, f"SELECT * FROM {TABLE_NAME} ORDER BY id") == node.query(
-        "SELECT '2020-01-03', number, toString(number) FROM numbers(1000000)"
+        check_query
     )
+
+    blob_container_client = cluster.blob_service_client.get_container_client(
+        CONTAINER_NAME
+    )
+
+    blobs = blob_container_client.list_blobs()
+    max_single_part_upload_size = 100000
+
+    for blob in blobs:
+        blob_client = cluster.blob_service_client.get_blob_client(
+            CONTAINER_NAME, blob.name
+        )
+        committed, uncommited = blob_client.get_block_list()
+
+        blocks = committed
+        assert len(blocks) > 1
+        last_id = len(blocks)
+        id = 1
+
+        for block in blocks:
+            print(f"blob: {blob.name}, block size: {block.size}")
+            if id == last_id:
+                assert max_single_part_upload_size >= block.size
+            else:
+                assert max_single_part_upload_size == block.size
+            id += 1
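
The new assertions in test_big_insert rely on the block-list API of the azure-storage-blob package (imported above via BlobServiceClient). As a reference for what those calls do, here is a self-contained sketch that stages and commits blocks directly against a local Azurite instance and then reads the committed block list back; these are the Python counterparts of the StageBlock()/CommitBlockList() calls in the C++ diff. The Azurite endpoint, container name, and blob name are placeholders chosen for this sketch; the account name and key are Azurite's well-known defaults, the same ones used in the test config above.

from azure.storage.blob import BlobBlock, BlobServiceClient

connection_string = (
    "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;"
    "AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
    "BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;"
)

service = BlobServiceClient.from_connection_string(connection_string)
service.create_container("sandbox")                       # placeholder container name
blob_client = service.get_blob_client("sandbox", "demo_blob")

# Stage two full-size blocks and a smaller trailing one, as the write buffer would.
max_single_part_upload_size = 100000
payloads = [b"a" * 100000, b"b" * 100000, b"c" * 20000]
block_ids = []
for i, payload in enumerate(payloads):
    block_id = f"block-{i:06d}"
    blob_client.stage_block(block_id, payload)            # StageBlock() analogue
    block_ids.append(block_id)

# Nothing is visible until the block list is committed (CommitBlockList() analogue).
blob_client.commit_block_list([BlobBlock(block_id=bid) for bid in block_ids])

# get_block_list() is what the test uses to inspect the layout of each blob.
committed, uncommitted = blob_client.get_block_list("all")
assert [block.size for block in committed] == [100000, 100000, 20000]
assert not uncommitted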