Adding multipart upload to WriteBuffer, retries in ReadBuffer, additional settings for buffers

This commit is contained in:
Jakub Kuklis 2021-10-08 14:34:40 +00:00
parent 02060791e3
commit 63ec8d4244
7 changed files with 123 additions and 26 deletions

View File

@ -6,24 +6,24 @@
#include <random>
#include <common/logger_useful.h>
namespace DB
{
// // TODO: abstract this function from DiskS3.cpp, from where it was copy-pasted
// String getRandomName()
// {
// std::uniform_int_distribution<int> distribution('a', 'z');
// String res(32, ' '); /// The number of bits of entropy should be not less than 128.
// for (auto & c : res)
// c = distribution(thread_local_rng);
// return res;
// }
/// Constructor of DiskBlobStorageSettings: copies every argument into the member of the same name.
/// NOTE(review): this listing looks like a rendered diff in which removed and added lines are both
/// shown. The two lines right after the opening line (single-argument parameter list and its
/// initializer) are presumably the pre-change version, and the six-argument form that follows is
/// the post-change version — confirm against the real file.
DiskBlobStorageSettings::DiskBlobStorageSettings(
int thread_pool_size_) :
thread_pool_size(thread_pool_size_) {}
UInt64 max_single_read_retries_,
UInt64 min_upload_part_size_,
UInt64 max_single_part_upload_size_,
UInt64 min_bytes_for_seek_,
int thread_pool_size_,
int objects_chunk_size_to_delete_) :
max_single_read_retries(max_single_read_retries_),
min_upload_part_size(min_upload_part_size_),
max_single_part_upload_size(max_single_part_upload_size_),
min_bytes_for_seek(min_bytes_for_seek_),
thread_pool_size(thread_pool_size_),
objects_chunk_size_to_delete(objects_chunk_size_to_delete_) {}
class BlobStoragePathKeeper : public RemoteFSPathKeeper
@ -44,19 +44,22 @@ public:
/// Forwards `metadata_` to the ReadIndirectBufferFromRemoteFS base and keeps the container
/// client, the retry limit and the buffer size as members.
/// The constructor body is empty; it only stores its arguments.
ReadIndirectBufferFromBlobStorage(
Azure::Storage::Blobs::BlobContainerClient blob_container_client_,
IDiskRemote::Metadata metadata_,
UInt64 max_single_read_retries_,
size_t buf_size_) :
ReadIndirectBufferFromRemoteFS<ReadBufferFromBlobStorage>(metadata_),
blob_container_client(blob_container_client_),
max_single_read_retries(max_single_read_retries_),
buf_size(buf_size_)
{}
/// Builds a ReadBufferFromBlobStorage for `path`, prefixed with `metadata.remote_fs_root_path`.
/// The path argument is a temporary string created by the concatenation below.
/// NOTE(review): two return statements follow. This listing looks like a rendered diff, so the
/// first one (without max_single_read_retries) is presumably the removed line and the second one
/// the added line — confirm against the real file.
std::unique_ptr<ReadBufferFromBlobStorage> createReadBuffer(const String & path) override
{
return std::make_unique<ReadBufferFromBlobStorage>(blob_container_client, metadata.remote_fs_root_path + path, buf_size);
return std::make_unique<ReadBufferFromBlobStorage>(blob_container_client, metadata.remote_fs_root_path + path, max_single_read_retries, buf_size);
}
private:
Azure::Storage::Blobs::BlobContainerClient blob_container_client;
UInt64 max_single_read_retries;
size_t buf_size;
};
@ -85,9 +88,10 @@ std::unique_ptr<ReadBufferFromFileBase> DiskBlobStorage::readFile(
LOG_DEBUG(log, "Read from file by path: {}", backQuote(metadata_path + path));
auto reader = std::make_unique<ReadIndirectBufferFromBlobStorage>(blob_container_client, metadata, buf_size);
auto reader = std::make_unique<ReadIndirectBufferFromBlobStorage>(
blob_container_client, metadata, current_settings.get()->max_single_read_retries, buf_size);
return std::make_unique<SeekAvoidingReadBuffer>(std::move(reader), buf_size); // TODO: last one is the min bytes read, to change
return std::make_unique<SeekAvoidingReadBuffer>(std::move(reader), current_settings.get()->min_bytes_for_seek);
}
@ -102,7 +106,12 @@ std::unique_ptr<WriteBufferFromFileBase> DiskBlobStorage::writeFile(
LOG_DEBUG(log, "{} to file by path: {}. Blob Storage path: {}",
mode == WriteMode::Rewrite ? "Write" : "Append", backQuote(metadata_path + path), remote_fs_root_path + blob_path);
auto buffer = std::make_unique<WriteBufferFromBlobStorage>(blob_container_client, metadata.remote_fs_root_path + blob_path, buf_size);
auto buffer = std::make_unique<WriteBufferFromBlobStorage>(
blob_container_client,
metadata.remote_fs_root_path + blob_path,
current_settings.get()->min_upload_part_size,
current_settings.get()->max_single_part_upload_size,
buf_size);
return std::make_unique<WriteIndirectBufferFromRemoteFS<WriteBufferFromBlobStorage>>(std::move(buffer), std::move(metadata), blob_path);
}
@ -134,10 +143,10 @@ void DiskBlobStorage::removeFromRemoteFS(RemoteFSPathKeeperPtr)
/// Returns a new BlobStoragePathKeeper wrapped in a shared pointer.
/// NOTE(review): two return statements follow. This listing looks like a rendered diff, so the
/// first one (hard-coded 1024) is presumably the removed line and the second one, which reads
/// `objects_chunk_size_to_delete` from the current settings, the added line — confirm.
RemoteFSPathKeeperPtr DiskBlobStorage::createFSPathKeeper() const
{
return std::make_shared<BlobStoragePathKeeper>(1024);
return std::make_shared<BlobStoragePathKeeper>(current_settings.get()->objects_chunk_size_to_delete);
}
// TODO: applyNewSettings - copy-paste from DiskS3
// NOTE: applyNewSettings - direct copy-paste from DiskS3
void DiskBlobStorage::applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String &, const DisksMap &)
{
auto new_settings = settings_getter(config, "storage_configuration.disks." + name, context);

View File

@ -27,9 +27,19 @@ void blob_storage_demo();
/// Plain settings holder for the Blob Storage disk; the constructor takes one argument per field.
/// NOTE(review): the constructor declaration below contains both a single-argument line and a
/// six-argument list. This listing looks like a rendered diff, so the single-argument line is
/// presumably the removed version — confirm against the real file.
struct DiskBlobStorageSettings final
{
DiskBlobStorageSettings(
int thread_pool_size_);
UInt64 max_single_read_retries_,
UInt64 min_upload_part_size_,
UInt64 max_single_part_upload_size_,
UInt64 min_bytes_for_seek_,
int thread_pool_size_,
int objects_chunk_size_to_delete_);
// Passed to the read buffers, which use it as the upper bound on read attempts.
UInt64 max_single_read_retries;
// Passed to the write buffer, which currently ignores it.
UInt64 min_upload_part_size;
// Writes longer than this are split into staged blocks of at most this many bytes.
UInt64 max_single_part_upload_size;
// Passed to SeekAvoidingReadBuffer when a file is opened for reading.
UInt64 min_bytes_for_seek;
// Not used in the code visible here — presumably sizes a worker pool; confirm.
int thread_pool_size;
// Passed to BlobStoragePathKeeper when a path keeper is created.
int objects_chunk_size_to_delete;
};

View File

@ -62,7 +62,12 @@ void checkRemoveAccess(IDisk & disk) {
/// Reads the disk settings stored under `config_prefix` and returns them as a
/// DiskBlobStorageSettings. Each key falls back to the literal default given in the call.
/// The context argument is currently unused (its name is commented out).
/// NOTE(review): the first `thread_pool_size` line has no trailing comma and is repeated further
/// down. This listing looks like a rendered diff, so that first line is presumably the removed
/// version — confirm against the real file.
std::unique_ptr<DiskBlobStorageSettings> getSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr /* context */)
{
return std::make_unique<DiskBlobStorageSettings>(
config.getInt(config_prefix + ".thread_pool_size", 16)
config.getUInt64(config_prefix + ".max_single_read_retries", 3),
// NOTE(review): a default of 32 for both upload sizes looks like a placeholder; the unit is
// not stated here (the write buffer compares it against a byte count) — confirm.
config.getUInt64(config_prefix + ".min_upload_part_size", 32),
config.getUInt64(config_prefix + ".max_single_part_upload_size", 32),
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
config.getInt(config_prefix + ".thread_pool_size", 16),
config.getInt(config_prefix + ".objects_chunk_size_to_delete", 1000)
// TODO: use context for global settings from Settings.h
);
}

View File

@ -8,6 +8,8 @@
#include <IO/ReadBufferFromBlobStorage.h>
#include <IO/ReadBufferFromString.h>
#include <common/logger_useful.h>
namespace DB
{
@ -22,9 +24,11 @@ namespace ErrorCodes
/// Stores the container client, the retry limit and the blob path.
/// The SeekableReadBuffer base is constructed with a null pointer and size 0, and `impl` is not
/// created here. The buffer-size argument is accepted but unused (its name is commented out).
/// NOTE(review): if the `path` member is declared as a reference, it binds to `path_` rather than
/// copying it — confirm that the caller's string outlives this object.
ReadBufferFromBlobStorage::ReadBufferFromBlobStorage(
Azure::Storage::Blobs::BlobContainerClient blob_container_client_,
const String & path_,
UInt64 max_single_read_retries_,
size_t /* buf_size_ */) :
SeekableReadBuffer(nullptr, 0),
blob_container_client(blob_container_client_),
max_single_read_retries(max_single_read_retries_),
path(path_) {}
@ -45,10 +49,33 @@ bool ReadBufferFromBlobStorage::nextImpl()
next_result = impl->hasPendingData();
}
if (!next_result)
auto sleep_time_with_backoff_milliseconds = std::chrono::milliseconds(100);
for (size_t attempt = 0; (attempt < max_single_read_retries) && !next_result; ++attempt)
{
/// Try to read a next portion of data.
next_result = impl->next();
try
{
/// Try to read a next portion of data.
next_result = impl->next();
break;
}
catch (const Exception & e)
{
// TODO: can't get this to compile, getting "error: reference to overloaded function could not be resolved; did you mean to call it?"
// LOG_DEBUG(log, "Caught exception while reading Blob Storage object. Object: {}, Offset: {}, Attempt: {}, Message: {}",
// path, getPosition(), attempt, e.message());
std::cout << "Caught exception while reading Blob Storage object. Object: " << path << ", Offset: "
<< getPosition() << ", Attempt: " << attempt << ", Message: " << e.message() << "\n";
/// Pause before next attempt.
std::this_thread::sleep_for(sleep_time_with_backoff_milliseconds);
sleep_time_with_backoff_milliseconds *= 2;
/// Try to reinitialize `impl`.
impl.reset();
impl = initialize();
next_result = impl->hasPendingData();
}
}
if (!next_result)

View File

@ -19,6 +19,7 @@ public:
explicit ReadBufferFromBlobStorage(
Azure::Storage::Blobs::BlobContainerClient blob_container_client_,
const String & path_,
UInt64 max_single_read_retries_,
size_t buf_size_
);
@ -34,6 +35,7 @@ private:
Azure::Storage::Blobs::BlobContainerClient blob_container_client;
std::unique_ptr<ReadBuffer> impl;
std::vector<uint8_t> tmp_buffer;
/// Upper bound on the number of read attempts made by one nextImpl() call.
UInt64 max_single_read_retries;
/// Path of the blob inside the container. Stored by value on purpose: the constructor receives
/// `const String &`, and ReadIndirectBufferFromBlobStorage::createReadBuffer passes a temporary
/// (`metadata.remote_fs_root_path + path`), so a reference member would dangle as soon as the
/// constructor returns. Declaration order is kept, so the constructor's initializer list still
/// matches it.
const String path;
off_t offset = 0;

View File

@ -10,12 +10,27 @@
namespace DB
{
// TODO: abstract this function from DiskS3.cpp, from where it was copy-pasted
/// Returns a string of `len` characters, each drawn uniformly at random from the inclusive
/// character range [first, last] using `thread_local_rng`.
///
/// std::uniform_int_distribution requires its lower bound not to exceed its upper bound, so the
/// two bounds are ordered before use. An inverted range therefore yields the same character set
/// instead of undefined behaviour. `len == 0` yields an empty string.
String getRandomName(char first = 'a', char last = 'z', size_t len = 64)
{
    /// Order the bounds; for the usual `first <= last` call this changes nothing.
    const int low = first < last ? first : last;
    const int high = first < last ? last : first;
    std::uniform_int_distribution<int> distribution(low, high);

    String res(len, ' ');
    for (auto & c : res)
        c = static_cast<char>(distribution(thread_local_rng)); /// value is within [low, high], so it fits in char
    return res;
}
/// Passes `buf_size_` to the BufferWithOwnMemory<WriteBuffer> base and stores the container
/// client, the single-part upload limit and the blob path.
/// The minimum upload part size is accepted for interface compatibility but currently ignored:
/// its parameter name and its initializer are both commented out.
WriteBufferFromBlobStorage::WriteBufferFromBlobStorage(
Azure::Storage::Blobs::BlobContainerClient blob_container_client_,
const String & blob_path_,
UInt64 /* min_upload_part_size_ */,
UInt64 max_single_part_upload_size_,
size_t buf_size_) :
BufferWithOwnMemory<WriteBuffer>(buf_size_, nullptr, 0),
blob_container_client(blob_container_client_),
// min_upload_part_size(min_upload_part_size_),
max_single_part_upload_size(max_single_part_upload_size_),
blob_path(blob_path_) {}
@ -38,9 +53,33 @@ void WriteBufferFromBlobStorage::nextImpl() {
std::cout << "\n";
#endif
Azure::Core::IO::MemoryBodyStream tmp_buffer(reinterpret_cast<uint8_t *>(pos), len);
auto block_blob_client = blob_container_client.GetBlockBlobClient(blob_path);
blob_container_client.UploadBlob(blob_path, tmp_buffer);
if (len <= max_single_part_upload_size)
{
Azure::Core::IO::MemoryBodyStream tmp_buffer(reinterpret_cast<uint8_t *>(pos), len);
blob_container_client.UploadBlob(blob_path, tmp_buffer);
}
else
{
size_t read = 0;
while (read < len)
{
auto part_len = std::min(len - read, max_single_part_upload_size);
auto block_id = getRandomName();
block_ids.push_back(block_id);
Azure::Core::IO::MemoryBodyStream tmp_buffer(reinterpret_cast<uint8_t *>(pos + read), part_len);
block_blob_client.StageBlock(block_ids.back(), tmp_buffer);
read += part_len;
}
block_blob_client.CommitBlockList(block_ids);
}
}
}

View File

@ -24,13 +24,18 @@ public:
explicit WriteBufferFromBlobStorage(
Azure::Storage::Blobs::BlobContainerClient blob_container_client_,
const String & blob_path_,
UInt64 min_upload_part_size_,
UInt64 max_single_part_upload_size_,
size_t buf_size_);
void nextImpl() override;
private:
std::vector<std::string> block_ids;
Azure::Storage::Blobs::BlobContainerClient blob_container_client;
// UInt64 min_upload_part_size; // TODO: currently not used
UInt64 max_single_part_upload_size;
const String blob_path;
};