mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-26 17:41:59 +00:00
Managed to read markers file in SELECT, getting error for CODEC version now
This commit is contained in:
parent
482a10e62e
commit
03f85dee41
@ -9,19 +9,19 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace
|
||||
{
|
||||
// namespace
|
||||
// {
|
||||
|
||||
String getRandomName()
|
||||
{
|
||||
std::uniform_int_distribution<int> distribution('a', 'z');
|
||||
String res(32, ' '); /// The number of bits of entropy should be not less than 128.
|
||||
for (auto & c : res)
|
||||
c = distribution(thread_local_rng);
|
||||
return res;
|
||||
}
|
||||
// String getRandomName()
|
||||
// {
|
||||
// std::uniform_int_distribution<int> distribution('a', 'z');
|
||||
// String res(32, ' '); /// The number of bits of entropy should be not less than 128.
|
||||
// for (auto & c : res)
|
||||
// c = distribution(thread_local_rng);
|
||||
// return res;
|
||||
// }
|
||||
|
||||
}
|
||||
// }
|
||||
|
||||
class BlobStoragePathKeeper : public RemoteFSPathKeeper
|
||||
{
|
||||
@ -38,14 +38,26 @@ public:
|
||||
class ReadIndirectBufferFromBlobStorage final : public ReadIndirectBufferFromRemoteFS<ReadBufferFromBlobStorage>
|
||||
{
|
||||
public:
|
||||
ReadIndirectBufferFromBlobStorage(IDiskRemote::Metadata metadata_) :
|
||||
ReadIndirectBufferFromRemoteFS<ReadBufferFromBlobStorage>(metadata_)
|
||||
ReadIndirectBufferFromBlobStorage(
|
||||
Azure::Storage::Blobs::BlobContainerClient blob_container_client_,
|
||||
const String & main_path_,
|
||||
IDiskRemote::Metadata metadata_,
|
||||
size_t buf_size_) :
|
||||
ReadIndirectBufferFromRemoteFS<ReadBufferFromBlobStorage>(metadata_),
|
||||
blob_container_client(blob_container_client_),
|
||||
main_path(main_path_),
|
||||
buf_size(buf_size_)
|
||||
{}
|
||||
|
||||
std::unique_ptr<ReadBufferFromBlobStorage> createReadBuffer(const String &) override
|
||||
std::unique_ptr<ReadBufferFromBlobStorage> createReadBuffer(const String & path) override
|
||||
{
|
||||
return std::make_unique<ReadBufferFromBlobStorage>();
|
||||
return std::make_unique<ReadBufferFromBlobStorage>(blob_container_client, main_path, metadata.remote_fs_root_path + path, buf_size);
|
||||
}
|
||||
|
||||
private:
|
||||
Azure::Storage::Blobs::BlobContainerClient blob_container_client;
|
||||
const String & main_path;
|
||||
size_t buf_size;
|
||||
};
|
||||
|
||||
|
||||
@ -76,20 +88,19 @@ DiskBlobStorage::DiskBlobStorage(
|
||||
|
||||
|
||||
std::unique_ptr<ReadBufferFromFileBase> DiskBlobStorage::readFile(
|
||||
const String &,
|
||||
size_t,
|
||||
const String & path,
|
||||
size_t buf_size,
|
||||
size_t,
|
||||
size_t,
|
||||
size_t,
|
||||
MMappedFileCache *) const
|
||||
{
|
||||
std::string arg1 = "/home/jkuklis/blob_storage/file";
|
||||
std::string arg2 = "/home/jkuklis/blob_storage/file";
|
||||
std::string arg3 = "/home/jkuklis/blob_storage/file";
|
||||
auto metadata = readMeta(path);
|
||||
|
||||
IDiskRemote::Metadata metadata(arg1, arg2, arg3, true);
|
||||
LOG_DEBUG(log, "Read from file by path: {}. Existing Blob Storage objects: {}",
|
||||
backQuote(metadata_path + path), metadata.remote_fs_objects.size());
|
||||
|
||||
auto reader = std::make_unique<ReadIndirectBufferFromBlobStorage>(metadata);
|
||||
auto reader = std::make_unique<ReadIndirectBufferFromBlobStorage>(blob_container_client, path, metadata, buf_size);
|
||||
return std::make_unique<SeekAvoidingReadBuffer>(std::move(reader), 32);
|
||||
}
|
||||
|
||||
@ -101,14 +112,14 @@ std::unique_ptr<WriteBufferFromFileBase> DiskBlobStorage::writeFile(
|
||||
{
|
||||
auto metadata = readOrCreateMetaForWriting(path, mode);
|
||||
|
||||
auto blob_path = getRandomName();
|
||||
auto blob_path = path; // getRandomName();
|
||||
|
||||
LOG_DEBUG(log, "{} to file by path: {}. Blob Storage path: {}",
|
||||
mode == WriteMode::Rewrite ? "Write" : "Append", backQuote(metadata_path + path), remote_fs_root_path + blob_path);
|
||||
|
||||
auto buffer = std::make_unique<WriteBufferFromBlobStorage>(blob_container_client, metadata.remote_fs_root_path + blob_path, buf_size);
|
||||
|
||||
return std::make_unique<WriteIndirectBufferFromRemoteFS<WriteBufferFromBlobStorage>>(std::move(buffer), std::move(metadata), path);
|
||||
return std::make_unique<WriteIndirectBufferFromRemoteFS<WriteBufferFromBlobStorage>>(std::move(buffer), std::move(metadata), blob_path);
|
||||
}
|
||||
|
||||
|
||||
@ -215,6 +226,48 @@ void blob_do_sth()
|
||||
// << ", size: " << blobList.Value.BlobSize << "\n";
|
||||
}
|
||||
|
||||
|
||||
void blob_do_sth_2()
|
||||
{
|
||||
using namespace Azure::Storage::Blobs;
|
||||
|
||||
auto managedIdentityCredential = std::make_shared<Azure::Identity::ManagedIdentityCredential>();
|
||||
|
||||
auto url = "https://sadttmpstgeus.blob.core.windows.net/data";
|
||||
|
||||
auto blobContainerClient = BlobContainerClient(url, managedIdentityCredential);
|
||||
|
||||
auto blobClient = blobContainerClient.GetBlobClient("hello");
|
||||
|
||||
int buf_size = 80;
|
||||
|
||||
std::vector<uint8_t> v(buf_size);
|
||||
|
||||
blobClient.DownloadTo(v.data(), buf_size);
|
||||
|
||||
std::cout << "v contents (" << v.size() << "): ";
|
||||
|
||||
for (auto i : v)
|
||||
{
|
||||
std::cout << static_cast<int>(i) << " ";
|
||||
}
|
||||
|
||||
std::cout << "\n";
|
||||
|
||||
auto downloaded_data = blobClient.Download();
|
||||
|
||||
auto f = downloaded_data.RawResponse->GetBody();
|
||||
|
||||
std::cout << "f contents (" << f.size() << "): ";
|
||||
|
||||
for (auto i : f)
|
||||
{
|
||||
std::cout << static_cast<int>(i) << " ";
|
||||
}
|
||||
|
||||
std::cout << "\n";
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#endif
|
||||
|
@ -0,0 +1,130 @@
|
||||
#if !defined(ARCADIA_BUILD)
|
||||
#include <Common/config.h>
|
||||
#endif
|
||||
|
||||
#if USE_AZURE_BLOB_STORAGE
|
||||
|
||||
#include <iostream>
|
||||
|
||||
#include <IO/ReadBufferFromBlobStorage.h>
|
||||
// #include <IO/ReadBufferFromIStream.h>
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int S3_ERROR;
|
||||
extern const int CANNOT_SEEK_THROUGH_FILE;
|
||||
extern const int SEEK_POSITION_OUT_OF_BOUND;
|
||||
}
|
||||
|
||||
|
||||
ReadBufferFromBlobStorage::ReadBufferFromBlobStorage(
|
||||
Azure::Storage::Blobs::BlobContainerClient blob_container_client_,
|
||||
const String & main_path_,
|
||||
const String & path_,
|
||||
size_t buf_size_) :
|
||||
SeekableReadBuffer(nullptr, 0),
|
||||
blob_container_client(blob_container_client_),
|
||||
main_path(main_path_),
|
||||
path(path_),
|
||||
buf_size(buf_size_) {}
|
||||
|
||||
|
||||
bool ReadBufferFromBlobStorage::nextImpl()
|
||||
{
|
||||
bool next_result = false;
|
||||
|
||||
if (impl)
|
||||
{
|
||||
/// `impl` has been initialized earlier and now we're at the end of the current portion of data.
|
||||
impl->position() = position();
|
||||
assert(!impl->hasPendingData());
|
||||
}
|
||||
else
|
||||
{
|
||||
/// `impl` is not initialized and we're about to read the first portion of data.
|
||||
impl = initialize();
|
||||
next_result = impl->hasPendingData();
|
||||
}
|
||||
|
||||
if (!next_result)
|
||||
{
|
||||
/// Try to read a next portion of data.
|
||||
next_result = impl->next();
|
||||
}
|
||||
|
||||
// std::cout << "\nReadBufferFromBlobStorage::nextImpl next_result: " << next_result << "\n";
|
||||
|
||||
if (!next_result)
|
||||
return false;
|
||||
|
||||
BufferBase::set(impl->buffer().begin(), impl->buffer().size(), impl->offset()); /// use the buffer returned by `impl`
|
||||
|
||||
offset += working_buffer.size();
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
off_t ReadBufferFromBlobStorage::seek(off_t offset_, int whence)
|
||||
{
|
||||
if (impl)
|
||||
throw Exception("Seek is allowed only before first read attempt from the buffer.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
|
||||
|
||||
if (whence != SEEK_SET)
|
||||
throw Exception("Only SEEK_SET mode is allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
|
||||
|
||||
if (offset_ < 0)
|
||||
throw Exception("Seek position is out of bounds. Offset: " + std::to_string(offset_), ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
|
||||
|
||||
offset = offset_;
|
||||
|
||||
return offset;
|
||||
}
|
||||
|
||||
|
||||
off_t ReadBufferFromBlobStorage::getPosition()
|
||||
{
|
||||
return offset - available();
|
||||
}
|
||||
|
||||
|
||||
std::unique_ptr<ReadBuffer> ReadBufferFromBlobStorage::initialize()
|
||||
{
|
||||
std::cout << "path: " << path << "\n";
|
||||
std::cout << "main_path: " << main_path << "\n";
|
||||
std::cout << "buf_size: " << buf_size << "\n";
|
||||
|
||||
std::cout << "blob_container_client.GetUrl(): " << blob_container_client.GetUrl() << "\n";
|
||||
|
||||
auto blob_client = blob_container_client.GetBlobClient(path);
|
||||
// auto blob_client = blob_container_client.GetBlobClient("cwtwuujiyxiqpylavpoziotxlygiaias");
|
||||
|
||||
std::vector<uint8_t> v(buf_size);
|
||||
|
||||
blob_client.DownloadTo(v.data(), buf_size);
|
||||
|
||||
// std::cout << "v contents (" << v.size() << "): ";
|
||||
|
||||
// for (auto i : v)
|
||||
// {
|
||||
// std::cout << static_cast<int>(i) << " ";
|
||||
// }
|
||||
|
||||
// std::cout << "\n";
|
||||
|
||||
// auto downloaded_data = blob_client.Download();
|
||||
|
||||
return std::make_unique<ReadBufferFromString>(v);
|
||||
|
||||
// return std::make_unique<ReadBufferFromString>(downloaded_data.RawResponse->GetBody());
|
||||
// return std::make_unique<ReadBufferFromIStream>(downloaded_data.RawResponse->ExtractBodyStream(), buf_size);
|
||||
// return std::make_unique<ReadBufferFromIStream>(downloaded_data.Value.BodyStream, buf_size);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#endif
|
@ -7,6 +7,7 @@
|
||||
#if USE_AZURE_BLOB_STORAGE
|
||||
|
||||
#include <IO/SeekableReadBuffer.h>
|
||||
#include <azure/storage/blobs.hpp>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -14,12 +15,29 @@ namespace DB
|
||||
class ReadBufferFromBlobStorage : public SeekableReadBuffer
|
||||
{
|
||||
public:
|
||||
explicit ReadBufferFromBlobStorage() :
|
||||
SeekableReadBuffer(nullptr, 0)
|
||||
{}
|
||||
explicit ReadBufferFromBlobStorage(
|
||||
Azure::Storage::Blobs::BlobContainerClient blob_container_client_,
|
||||
const String & main_path_,
|
||||
const String & path_,
|
||||
size_t buf_size_
|
||||
);
|
||||
|
||||
off_t seek(off_t off, int whence) override;
|
||||
off_t getPosition() override;
|
||||
|
||||
bool nextImpl() override;
|
||||
|
||||
private:
|
||||
std::unique_ptr<ReadBuffer> impl;
|
||||
off_t offset = 0;
|
||||
|
||||
Azure::Storage::Blobs::BlobContainerClient blob_container_client;
|
||||
const String & main_path;
|
||||
const String & path;
|
||||
size_t buf_size;
|
||||
|
||||
std::unique_ptr<ReadBuffer> initialize();
|
||||
|
||||
off_t seek(off_t, int) override { return 0; }
|
||||
off_t getPosition() override { return 0; }
|
||||
};
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user