Starting insertion pipeline

This commit is contained in:
Jakub Kuklis 2021-10-01 07:52:30 +00:00 committed by Jakub Kuklis
parent 6436f5b04d
commit fb4c1d6686

View File

@ -3,11 +3,26 @@
#if USE_AZURE_BLOB_STORAGE
#include <iostream>
#include <random>
#include <common/logger_useful.h>
namespace DB
{
namespace
{
String getRandomName()
{
std::uniform_int_distribution<int> distribution('a', 'z');
String res(32, ' '); /// The number of bits of entropy should be not less than 128.
for (auto & c : res)
c = distribution(thread_local_rng);
return res;
}
}
class BlobStoragePathKeeper : public RemoteFSPathKeeper
{
public:
@ -49,22 +64,16 @@ DiskBlobStorage::DiskBlobStorage() : IDiskRemote("blob_storage", "https://sadttm
DiskBlobStorage::DiskBlobStorage(
const String & name_,
const String & metadata_path_,
const String & endpoint_url,
std::shared_ptr<Azure::Identity::ManagedIdentityCredential>,
Azure::Storage::Blobs::BlobContainerClient blob_container_client_,
const String & /* endpoint_url */,
std::shared_ptr<Azure::Identity::ManagedIdentityCredential> /* managed_identity_credential_ */,
Azure::Storage::Blobs::BlobContainerClient /* blob_container_client_ */,
size_t thread_pool_size_) :
IDiskRemote(name_, endpoint_url /* or maybe "" ? */, metadata_path_, "DiskBlobStorage", thread_pool_size_)
IDiskRemote(name_, "" /* or maybe endpoint_url ?*/, metadata_path_, "DiskBlobStorage", thread_pool_size_)
{
// list blobs in the container
auto list_blobs = blob_container_client_.ListBlobs();
// print information about the container
std::cout << "Storage account: " << list_blobs.ServiceEndpoint
<< ", container: " << list_blobs.BlobContainerName << "\n\n\n";
}
std::unique_ptr<ReadBufferFromFileBase> DiskBlobStorage::readFile(
const String &,
size_t,
@ -85,18 +94,18 @@ std::unique_ptr<ReadBufferFromFileBase> DiskBlobStorage::readFile(
std::unique_ptr<WriteBufferFromFileBase> DiskBlobStorage::writeFile(
const String &,
const String & path,
size_t,
WriteMode)
WriteMode mode)
{
auto metadata = readOrCreateMetaForWriting(path, mode);
auto blob_path = getRandomName();
LOG_DEBUG(log, "{} to file by path: {}. Blob Storage path: {}",
mode == WriteMode::Rewrite ? "Write" : "Append", backQuote(metadata_path + path), remote_fs_root_path + blob_path);
auto buffer = std::make_unique<WriteBufferFromBlobStorage>();
std::string path = "/home/jkuklis/blob_storage/file";
std::string arg1 = "/home/jkuklis/blob_storage/file";
std::string arg2 = "/home/jkuklis/blob_storage/file";
std::string arg3 = "/home/jkuklis/blob_storage/file";
IDiskRemote::Metadata metadata(arg1, arg2, arg3, true);
return std::make_unique<WriteIndirectBufferFromRemoteFS<WriteBufferFromBlobStorage>>(std::move(buffer), std::move(metadata), path);
}