Copy and paste

This commit is contained in:
alesapin 2023-06-06 13:01:21 +02:00
parent 3d99abee43
commit d497562a07
4 changed files with 89 additions and 6 deletions

View File

@ -35,6 +35,7 @@ ReadBufferFromAzureBlobStorage::ReadBufferFromAzureBlobStorage(
size_t max_single_read_retries_,
size_t max_single_download_retries_,
bool use_external_buffer_,
bool restricted_seek_,
size_t read_until_position_)
: ReadBufferFromFileBase(use_external_buffer_ ? 0 : read_settings_.remote_fs_buffer_size, nullptr, 0)
, blob_container_client(blob_container_client_)
@ -44,6 +45,7 @@ ReadBufferFromAzureBlobStorage::ReadBufferFromAzureBlobStorage(
, read_settings(read_settings_)
, tmp_buffer_size(read_settings.remote_fs_buffer_size)
, use_external_buffer(use_external_buffer_)
, restricted_seek(restricted_seek_)
, read_until_position(read_until_position_)
{
if (!use_external_buffer)
@ -118,8 +120,17 @@ bool ReadBufferFromAzureBlobStorage::nextImpl()
off_t ReadBufferFromAzureBlobStorage::seek(off_t offset_, int whence)
{
if (initialized)
throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Seek is allowed only before first read attempt from the buffer.");
if (offset_ == getPosition() && whence == SEEK_SET)
return offset_;
if (initialized && restricted_seek)
{
throw Exception(
ErrorCodes::CANNOT_SEEK_THROUGH_FILE,
"Seek is allowed only before first read attempt from the buffer (current offset: "
"{}, new offset: {}, reading until position: {}, available: {})",
getPosition(), offset_, read_until_position, available());
}
if (whence != SEEK_SET)
throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Only SEEK_SET mode is allowed.");
@ -127,8 +138,36 @@ off_t ReadBufferFromAzureBlobStorage::seek(off_t offset_, int whence)
if (offset_ < 0)
throw Exception(ErrorCodes::SEEK_POSITION_OUT_OF_BOUND, "Seek position is out of bounds. Offset: {}", offset_);
offset = offset_;
if (!restricted_seek)
{
if (!working_buffer.empty()
&& static_cast<size_t>(offset_) >= offset - working_buffer.size()
&& offset_ < offset)
{
pos = working_buffer.end() - (offset - offset_);
assert(pos >= working_buffer.begin());
assert(pos < working_buffer.end());
return getPosition();
}
off_t position = getPosition();
if (initialized && offset_ > position)
{
size_t diff = offset_ - position;
if (diff < read_settings.remote_read_min_bytes_for_seek)
{
ignore(diff);
return offset_;
}
}
resetWorkingBuffer();
if (initialized)
initialized = false;
}
offset = offset_;
return offset;
}
@ -152,7 +191,8 @@ void ReadBufferFromAzureBlobStorage::initialize()
download_options.Range = {static_cast<int64_t>(offset), length};
blob_client = std::make_unique<Azure::Storage::Blobs::BlobClient>(blob_container_client->GetBlobClient(path));
if (!blob_client)
blob_client = std::make_unique<Azure::Storage::Blobs::BlobClient>(blob_container_client->GetBlobClient(path));
size_t sleep_time_with_backoff_milliseconds = 100;
for (size_t i = 0; i < max_single_download_retries; ++i)
@ -182,6 +222,18 @@ void ReadBufferFromAzureBlobStorage::initialize()
initialized = true;
}
/// Returns the size of the blob in bytes, caching it in `file_size`.
/// The size is fetched at most once; subsequent calls return the cached value
/// without any network round-trip or client construction.
size_t ReadBufferFromAzureBlobStorage::getFileSize()
{
    /// Fast path: if the size is already cached, do not construct a client
    /// (the original checked blob_client first, paying for a client even on a cache hit).
    if (file_size.has_value())
        return *file_size;

    /// Lazily create the blob client only when we actually need to query properties.
    if (!blob_client)
        blob_client = std::make_unique<Azure::Storage::Blobs::BlobClient>(blob_container_client->GetBlobClient(path));

    file_size = blob_client->GetProperties().Value.BlobSize;
    return *file_size;
}
}
#endif

View File

@ -24,6 +24,7 @@ public:
size_t max_single_read_retries_,
size_t max_single_download_retries_,
bool use_external_buffer_ = false,
bool restricted_seek_ = false,
size_t read_until_position_ = 0);
off_t seek(off_t off, int whence) override;
@ -40,6 +41,8 @@ public:
bool supportsRightBoundedReads() const override { return true; }
size_t getFileSize() override;
private:
void initialize();
@ -55,6 +58,12 @@ private:
std::vector<char> tmp_buffer;
size_t tmp_buffer_size;
bool use_external_buffer;
/// There is a different seek policy for disk seeks and for non-disk seeks
/// (non-disk seek is applied for seekable input formats: ORC, Arrow, Parquet).
bool restricted_seek;
off_t read_until_position = 0;
off_t offset = 0;

View File

@ -453,7 +453,7 @@ void StorageAzure::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextP
for (const auto & key : configuration.blobs_paths)
objects.emplace_back(key);
object_storage->removeObjects(objects);
object_storage->removeObjectsIfExist(objects);
}
namespace

View File

@ -199,5 +199,27 @@ def test_simple_read_write(cluster):
azure_query(node, "INSERT INTO test_simple_read_write VALUES (1, 'a')")
assert get_azure_file_content("test_simple_read_write.csv") == '1,"a"\n'
print(azure_query(node, "SELECT * FROM test_simple_read_write"))
assert azure_query(node, "SELECT * FROM test_simple_read_write") == "1\ta\n"
def test_create_new_files_on_insert(cluster):
    # Verify that `azure_create_new_file_on_insert=1` makes each INSERT write a
    # fresh blob instead of failing/overwriting, so rows from all inserts survive.
    node = cluster.instances["node"]

    azure_query(
        node,
        "create table test_multiple_inserts(a Int32, b String) ENGINE = Azure(azure_conf2, container='cont', blob_path='test_parquet', format='Parquet')",
    )
    azure_query(node, "truncate table test_multiple_inserts")

    # First insert truncates the target; the next two each create a new file.
    inserts = (
        (10, "azure_truncate_on_insert"),
        (20, "azure_create_new_file_on_insert"),
        (30, "azure_create_new_file_on_insert"),
    )
    for row_count, setting in inserts:
        azure_query(
            node,
            f"insert into test_multiple_inserts select number, randomString(100) from numbers({row_count}) settings {setting}=1",
        )

    # 10 + 20 + 30 rows must all be readable back through the table engine.
    result = azure_query(node, "select count() from test_multiple_inserts")
    assert int(result) == 60

    azure_query(node, "drop table test_multiple_inserts")