Code cleanup addressing PR remarks

This commit is contained in:
Jakub Kuklis 2021-12-01 12:31:25 +00:00
parent 850c9a7e7a
commit 910db6ca57
14 changed files with 82 additions and 98 deletions

View File

@ -11,4 +11,4 @@ services:
- 10002
volumes:
data1-1:
data1-1:

View File

@ -2,6 +2,7 @@
#if USE_AZURE_BLOB_STORAGE
#include <Disks/RemoteDisksCommon.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/ReadIndirectBufferFromRemoteFS.h>
@ -17,46 +18,30 @@ namespace ErrorCodes
}
// TODO: abstract this function from DiskS3.cpp, from where it was copy-pasted
// NOTE: name suffixed with -Disk, getting errors with getRandomName defined also in WriteBuffer
/// Produces a string of `len` characters, each drawn uniformly from the inclusive
/// range [first, last] using `thread_local_rng`.
String getRandomNameDisk(char first = 'a', char last = 'z', size_t len = 64)
{
    std::uniform_int_distribution<int> char_picker(first, last);

    String name;
    name.reserve(len);
    /// One generator draw per output character, in order.
    for (size_t i = 0; i < len; ++i)
        name.push_back(static_cast<char>(char_picker(thread_local_rng)));

    return name;
}
DiskBlobStorageSettings::DiskBlobStorageSettings(
UInt64 max_single_part_upload_size_,
UInt64 min_bytes_for_seek_,
int max_single_read_retries_,
int max_single_download_retries_,
int thread_pool_size_,
int objects_chunk_size_to_delete_) :
int thread_pool_size_) :
max_single_part_upload_size(max_single_part_upload_size_),
min_bytes_for_seek(min_bytes_for_seek_),
max_single_read_retries(max_single_read_retries_),
max_single_download_retries(max_single_download_retries_),
thread_pool_size(thread_pool_size_),
objects_chunk_size_to_delete(objects_chunk_size_to_delete_) {}
thread_pool_size(thread_pool_size_) {}
/// Path keeper for Blob Storage: every path passed to addPath() is appended to `paths`.
class BlobStoragePathKeeper : public RemoteFSPathKeeper
{
public:
// NOTE : chunk_limit unused
BlobStoragePathKeeper(size_t chunk_limit_) : RemoteFSPathKeeper(chunk_limit_) {}
/// RemoteFSPathKeeper constructed with a placeholder argument for chunk_limit, it is unused in this class
BlobStoragePathKeeper() : RemoteFSPathKeeper(1000) {}
/// Appends `path` to the collected list; no grouping or deduplication is performed here.
void addPath(const String & path) override
{
paths.push_back(path);
}
// TODO: maybe introduce a getter?
// private:
/// Collected paths, in insertion order. Currently public (see the TODO above).
std::vector<String> paths;
};
@ -108,14 +93,14 @@ std::unique_ptr<WriteBufferFromFileBase> DiskBlobStorage::writeFile(
WriteMode mode)
{
auto metadata = readOrCreateMetaForWriting(path, mode);
auto blob_path = path + "_" + getRandomNameDisk('a', 'z', 8); // TODO: maybe use getRandomName() or modify the path (now it contains the tmp_* directory part)
auto blob_path = path + "_" + getRandomName(8); /// NOTE: path contains the tmp_* prefix in the blob name
LOG_TRACE(log, "{} to file by path: {}. Blob Storage path: {}",
mode == WriteMode::Rewrite ? "Write" : "Append", backQuote(metadata_disk->getPath() + path), remote_fs_root_path + blob_path);
mode == WriteMode::Rewrite ? "Write" : "Append", backQuote(metadata_disk->getPath() + path), blob_path);
auto buffer = std::make_unique<WriteBufferFromBlobStorage>(
blob_container_client,
metadata.remote_fs_root_path + blob_path,
blob_path,
current_settings.get()->max_single_part_upload_size,
buf_size);
@ -187,7 +172,7 @@ void DiskBlobStorage::removeFromRemoteFS(RemoteFSPathKeeperPtr fs_paths_keeper)
RemoteFSPathKeeperPtr DiskBlobStorage::createFSPathKeeper() const
{
return std::make_shared<BlobStoragePathKeeper>(current_settings.get()->objects_chunk_size_to_delete);
return std::make_shared<BlobStoragePathKeeper>();
}
// NOTE: applyNewSettings - direct copy-paste from DiskS3

View File

@ -25,15 +25,13 @@ struct DiskBlobStorageSettings final
UInt64 min_bytes_for_seek_,
int max_single_read_retries,
int max_single_download_retries,
int thread_pool_size_,
int objects_chunk_size_to_delete_);
int thread_pool_size_);
size_t max_single_part_upload_size; // NOTE: on 32-bit machines it will be at most 4GB, but size_t is also used in BufferBase for offset
size_t max_single_part_upload_size; /// NOTE: on 32-bit machines it will be at most 4GB, but size_t is also used in BufferBase for offset
UInt64 min_bytes_for_seek;
size_t max_single_read_retries;
size_t max_single_download_retries;
size_t thread_pool_size;
size_t objects_chunk_size_to_delete;
};

View File

@ -9,6 +9,7 @@
#include <Disks/BlobStorage/DiskBlobStorage.h>
#include <Disks/DiskRestartProxy.h>
#include <Disks/DiskCacheWrapper.h>
#include <Disks/RemoteDisksCommon.h>
#include <azure/identity/managed_identity_credential.hpp>
#include <re2/re2.h>
@ -46,7 +47,6 @@ void checkReadAccess(IDisk & disk)
}
// TODO: make it a unit test ?
void checkReadWithOffset(IDisk & disk)
{
auto file = disk.readFile(test_file);
@ -62,14 +62,7 @@ void checkReadWithOffset(IDisk & disk)
/// Removes `test_file` from `disk`. The existence checks before and after the removal
/// are currently commented out, so only the removeFile() call is executed.
void checkRemoveAccess(IDisk & disk)
{
// TODO: remove these checks if the names of blobs will be changed
// if (!disk.checkUniqueId(test_file))
// throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "Expected the file to exist, but did not find it: {}", test_file);
disk.removeFile(test_file);
// if (disk.checkUniqueId(test_file))
// throw Exception(ErrorCodes::FILE_ALREADY_EXISTS, "Expected the file not to exist, but found it: {}", test_file);
}
@ -85,17 +78,16 @@ void validate_endpoint_url(const String & endpoint_url)
}
std::unique_ptr<DiskBlobStorageSettings> getSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr /* context */)
std::unique_ptr<DiskBlobStorageSettings> getSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr /*context*/)
{
return std::make_unique<DiskBlobStorageSettings>(
config.getUInt64(config_prefix + ".max_single_part_upload_size", 100 * 1024 * 1024),
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
config.getInt(config_prefix + ".max_single_read_retries", 3),
config.getInt(config_prefix + ".max_single_download_retries", 3),
config.getInt(config_prefix + ".thread_pool_size", 16),
config.getInt(config_prefix + ".objects_chunk_size_to_delete", 1000)
// TODO: use context for global settings from Settings.h
);
config.getInt(config_prefix + ".thread_pool_size", 16)
// TODO: maybe use context for global settings from Settings.h
);
}
@ -127,7 +119,6 @@ void registerDiskBlobStorage(DiskFactory & factory)
getSettings
);
// NOTE: test - almost direct copy-paste from registerDiskS3
if (!config.getBool(config_prefix + ".skip_access_check", false))
{
checkWriteAccess(*blob_storage_disk);
@ -136,23 +127,10 @@ void registerDiskBlobStorage(DiskFactory & factory)
checkRemoveAccess(*blob_storage_disk);
}
// NOTE: cache - direct copy-paste from registerDiskS3
if (config.getBool(config_prefix + ".cache_enabled", true))
{
String cache_path = config.getString(config_prefix + ".cache_path", context->getPath() + "disks/" + name + "/cache/");
if (metadata_path == cache_path)
throw Exception("Metadata and cache path should be different: " + metadata_path, ErrorCodes::BAD_ARGUMENTS);
auto cache_disk = std::make_shared<DiskLocal>("blob-storage-cache", cache_path, 0);
auto cache_file_predicate = [] (const String & path)
{
return path.ends_with("idx") // index files.
|| path.ends_with("mrk") || path.ends_with("mrk2") || path.ends_with("mrk3") /// mark files.
|| path.ends_with("txt") || path.ends_with("dat");
};
blob_storage_disk = std::make_shared<DiskCacheWrapper>(blob_storage_disk, cache_disk, cache_file_predicate);
blob_storage_disk = wrapWithCache(blob_storage_disk, "blob-storage-cache", cache_path, metadata_path);
}
return std::make_shared<DiskRestartProxy>(blob_storage_disk);

View File

@ -37,8 +37,8 @@ SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBuffer(const S
#if USE_AZURE_BLOB_STORAGE
SeekableReadBufferPtr ReadBufferFromBlobStorageGather::createImplementationBuffer(const String & path, size_t read_until_position_) const
{
return std::make_unique<ReadBufferFromBlobStorage>(blob_container_client, fs::path(metadata.remote_fs_root_path) / path,
max_single_read_retries, max_single_download_retries, settings.remote_fs_buffer_size, threadpool_read, read_until_position_);
return std::make_unique<ReadBufferFromBlobStorage>(blob_container_client, path, max_single_read_retries,
max_single_download_retries, settings.remote_fs_buffer_size, threadpool_read, read_until_position_);
}
#endif

View File

@ -105,7 +105,7 @@ private:
#if USE_AZURE_BLOB_STORAGE
/// Reads data from Blob Storage using stored paths in metadata.
/// Reads data from Blob Storage using paths stored in metadata.
class ReadBufferFromBlobStorageGather final : public ReadBufferFromRemoteFSGather
{
public:

View File

@ -0,0 +1,39 @@
#include <Disks/RemoteDisksCommon.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
/// Builds a string of `len` characters, each drawn uniformly from the inclusive
/// range [first, last] using `thread_local_rng`.
String getRandomName(size_t len, char first, char last)
{
    std::uniform_int_distribution<int> symbol_dist(first, last);

    String name(len, ' ');
    /// Overwrite the placeholder characters one by one, one generator draw per position.
    for (size_t pos = 0; pos != name.size(); ++pos)
        name[pos] = static_cast<char>(symbol_dist(thread_local_rng));

    return name;
}
/// Wraps `disk` into a DiskCacheWrapper whose cache is a DiskLocal named `cache_name`
/// located at `cache_path`.
/// Throws an Exception with code BAD_ARGUMENTS when `metadata_path` equals `cache_path`.
std::shared_ptr<DiskCacheWrapper> wrapWithCache(
    std::shared_ptr<IDisk> disk, String cache_name, String cache_path, String metadata_path)
{
    const bool paths_collide = (metadata_path == cache_path);
    if (paths_collide)
        throw Exception("Metadata and cache paths should be different: " + metadata_path, ErrorCodes::BAD_ARGUMENTS);

    /// Predicate handed to the wrapper: true for paths ending with one of the listed suffixes
    /// (index files, mark files, and "txt" / "dat" files).
    auto should_cache = [] (const String & path)
    {
        const char * const cached_suffixes[] = {"idx", "mrk", "mrk2", "mrk3", "txt", "dat"};
        for (const char * suffix : cached_suffixes)
        {
            if (path.ends_with(suffix))
                return true;
        }
        return false;
    };

    auto local_cache = std::make_shared<DiskLocal>(cache_name, cache_path, 0);
    return std::make_shared<DiskCacheWrapper>(disk, local_cache, should_cache);
}
}

View File

@ -0,0 +1,15 @@
#include <random>
#include <Core/Types.h>
#include <Common/thread_local_rng.h>
#include <Disks/DiskCacheWrapper.h>
namespace DB
{
/// Returns a string of `len` characters, each drawn uniformly from the inclusive range [first, last].
String getRandomName(size_t len = 32, char first = 'a', char last = 'z');
/// Wraps `disk` into a DiskCacheWrapper backed by a DiskLocal named `cache_name` at `cache_path`.
/// Throws BAD_ARGUMENTS if `metadata_path` is equal to `cache_path`.
std::shared_ptr<DiskCacheWrapper> wrapWithCache(
std::shared_ptr<IDisk> disk, String cache_name, String cache_path, String metadata_path);
}

View File

@ -25,6 +25,7 @@
#include <IO/WriteBufferFromS3.h>
#include <IO/WriteHelpers.h>
#include <Disks/RemoteDisksCommon.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/ReadIndirectBufferFromRemoteFS.h>
@ -100,15 +101,6 @@ private:
Chunks chunks;
};
/// Returns a 32-character name whose characters are drawn uniformly from 'a'..'z'
/// using `thread_local_rng`.
String getRandomName()
{
    /// The number of bits of entropy should be not less than 128.
    String name(32, ' ');
    std::uniform_int_distribution<int> letter('a', 'z');

    for (auto it = name.begin(); it != name.end(); ++it)
        *it = letter(thread_local_rng);

    return name;
}
template <typename Result, typename Error>
void throwIfError(Aws::Utils::Outcome<Result, Error> & response)
{

View File

@ -18,6 +18,7 @@
#include "ProxyResolverConfiguration.h"
#include "Disks/DiskRestartProxy.h"
#include "Disks/DiskLocal.h"
#include "Disks/RemoteDisksCommon.h"
namespace DB
{
@ -204,19 +205,7 @@ void registerDiskS3(DiskFactory & factory)
if (cache_enabled)
{
String cache_path = config.getString(config_prefix + ".cache_path", context->getPath() + "disks/" + name + "/cache/");
if (metadata_path == cache_path)
throw Exception("Metadata and cache path should be different: " + metadata_path, ErrorCodes::BAD_ARGUMENTS);
auto cache_disk = std::make_shared<DiskLocal>("s3-cache", cache_path, 0);
auto cache_file_predicate = [] (const String & path)
{
return path.ends_with("idx") // index files.
|| path.ends_with("mrk") || path.ends_with("mrk2") || path.ends_with("mrk3") // mark files.
|| path.ends_with("txt") || path.ends_with("dat");
};
s3disk = std::make_shared<DiskCacheWrapper>(s3disk, cache_disk, cache_file_predicate);
s3disk = wrapWithCache(s3disk, "s3-cache", cache_path, metadata_path);
}
return std::make_shared<DiskRestartProxy>(s3disk);

View File

@ -51,8 +51,6 @@ ReadBufferFromBlobStorage::ReadBufferFromBlobStorage(
bool ReadBufferFromBlobStorage::nextImpl()
{
// TODO: is this "stream" approach better than a single DownloadTo approach (last commit 90fc230c4dfacc1a9d50d2d65b91363150caa784) ?
if (read_until_position)
{
if (read_until_position == offset)

View File

@ -5,22 +5,12 @@
#if USE_AZURE_BLOB_STORAGE
#include <IO/WriteBufferFromBlobStorage.h>
#include <Disks/RemoteDisksCommon.h>
namespace DB
{
// TODO: abstract this function from DiskS3.cpp, from where it was copy-pasted
/// Generates a string of `len` characters, each picked uniformly from the inclusive
/// range [first, last] using `thread_local_rng`.
String getRandomName(char first = 'a', char last = 'z', size_t len = 64)
{
    std::uniform_int_distribution<int> pick(first, last);

    String generated;
    generated.reserve(len);
    /// Append characters until the requested length is reached, one draw per character.
    while (generated.size() < len)
        generated += static_cast<char>(pick(thread_local_rng));

    return generated;
}
WriteBufferFromBlobStorage::WriteBufferFromBlobStorage(
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> blob_container_client_,
const String & blob_path_,
@ -52,7 +42,7 @@ void WriteBufferFromBlobStorage::nextImpl()
{
auto part_len = std::min(len - read, max_single_part_upload_size);
auto block_id = getRandomName();
auto block_id = getRandomName(64);
block_ids.push_back(block_id);
Azure::Core::IO::MemoryBodyStream tmp_buffer(reinterpret_cast<uint8_t *>(pos + read), part_len);

View File

@ -3,7 +3,7 @@
<disks>
<blob_storage_disk>
<type>blob_storage</type>
<endpoint>?</endpoint>
<endpoint>http://azurite1:10000/</endpoint>
<skip_access_check>true</skip_access_check>
</blob_storage_disk>
</disks>

View File

@ -61,4 +61,4 @@ def create_table(node, table_name, **additional_settings):
)
def test_simple(cluster, node_name):
node = cluster.instances[node_name]
create_table(node, "blob_storage_test")
create_table(node, "blob_storage_test")