SmitaRKulkarni 2024-03-21 21:17:57 +00:00 committed by GitHub
commit 3822e41f7d
3 changed files with 72 additions and 5 deletions

View File

@@ -8,7 +8,9 @@
#include <Common/Throttler.h>
#include <base/sleep.h>
#include <Common/ProfileEvents.h>
#include <IO/SeekableReadBuffer.h>
#include <sstream>
namespace ProfileEvents
{
@@ -27,7 +29,6 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
ReadBufferFromAzureBlobStorage::ReadBufferFromAzureBlobStorage(
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> blob_container_client_,
const String & path_,
@@ -56,7 +57,6 @@ ReadBufferFromAzureBlobStorage::ReadBufferFromAzureBlobStorage(
}
}
void ReadBufferFromAzureBlobStorage::setReadUntilEnd()
{
if (read_until_position)
@@ -139,7 +139,6 @@ bool ReadBufferFromAzureBlobStorage::nextImpl()
return true;
}
off_t ReadBufferFromAzureBlobStorage::seek(off_t offset_, int whence)
{
if (offset_ == getPosition() && whence == SEEK_SET)
@@ -193,13 +192,11 @@ off_t ReadBufferFromAzureBlobStorage::seek(off_t offset_, int whence)
return offset;
}
off_t ReadBufferFromAzureBlobStorage::getPosition()
{
return offset - available();
}
void ReadBufferFromAzureBlobStorage::initialize()
{
if (initialized)
@@ -262,6 +259,49 @@ size_t ReadBufferFromAzureBlobStorage::getFileSize()
return *file_size;
}
size_t ReadBufferFromAzureBlobStorage::readBigAt(char * to, size_t n, size_t range_begin, const std::function<bool(size_t)> & progress_callback) const
{
size_t initial_n = n;
size_t sleep_time_with_backoff_milliseconds = 100;
for (size_t i = 0; i < max_single_download_retries && n > 0; ++i)
{
size_t bytes_copied = 0;
try
{
Azure::Storage::Blobs::DownloadBlobOptions download_options;
download_options.Range = {static_cast<int64_t>(range_begin), n};
auto download_response = blob_client->Download(download_options);
std::unique_ptr<Azure::Core::IO::BodyStream> body_stream = std::move(download_response.Value.BodyStream);
/// Read the blob body into a temporary buffer, then copy it out with progress reporting.
String buffer(static_cast<size_t>(body_stream->Length()), '\0');
auto bytes = body_stream->ReadToCount(reinterpret_cast<uint8_t *>(buffer.data()), buffer.size());
buffer.resize(static_cast<size_t>(bytes));
std::istringstream string_stream(buffer); // STYLE_CHECK_ALLOW_STD_STRING_STREAM
copyFromIStreamWithProgressCallback(string_stream, to, n, progress_callback, &bytes_copied);
LOG_INFO(log, "AzureBlobStorage readBigAt read bytes {}", bytes_copied);
if (read_settings.remote_throttler)
read_settings.remote_throttler->add(bytes_copied, ProfileEvents::RemoteReadThrottlerBytes, ProfileEvents::RemoteReadThrottlerSleepMicroseconds);
}
catch (const Azure::Core::RequestFailedException & e)
{
LOG_INFO(log, "Exception caught during Azure Download for file {} at offset {} at attempt {}/{}: {}", path, offset, i + 1, max_single_download_retries, e.Message);
if (i + 1 == max_single_download_retries)
throw;
sleepForMilliseconds(sleep_time_with_backoff_milliseconds);
sleep_time_with_backoff_milliseconds *= 2;
}
range_begin += bytes_copied;
to += bytes_copied;
n -= bytes_copied;
}
/// Report the bytes actually copied; on a clean exit n has reached zero.
return initial_n - n;
}
}
#endif
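For context, the retry logic in readBigAt above follows a common shape: shrink the remaining range as bytes arrive, advance the destination pointer, and sleep with doubling delays between failed attempts. Below is a minimal standalone sketch of that loop, not the committed code; the `fetch` callback and `readWithBackoff` name are hypothetical stand-ins for the Azure Download-and-copy step.

#include <chrono>
#include <cstddef>
#include <cstring>
#include <functional>
#include <iostream>
#include <string>
#include <thread>

/// Retry loop mirroring readBigAt: each attempt may copy only part of the
/// requested range; transient failures back off exponentially.
size_t readWithBackoff(
    char * to, size_t n, size_t range_begin, size_t max_retries,
    const std::function<size_t(char *, size_t, size_t)> & fetch)
{
    const size_t initial_n = n;
    size_t sleep_ms = 100;
    for (size_t attempt = 0; attempt < max_retries && n > 0; ++attempt)
    {
        size_t bytes_copied = 0;
        try
        {
            bytes_copied = fetch(to, n, range_begin);
        }
        catch (...)
        {
            if (attempt + 1 == max_retries)
                throw; /// out of retries: propagate the last error
            std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
            sleep_ms *= 2; /// exponential backoff, as in readBigAt
        }
        range_begin += bytes_copied;
        to += bytes_copied;
        n -= bytes_copied;
    }
    return initial_n - n; /// bytes actually delivered
}

int main()
{
    std::string blob = "0123456789abcdef";
    char out[8] = {};
    /// Copy 8 bytes starting at offset 4, "downloading" from an in-memory blob.
    size_t got = readWithBackoff(out, sizeof(out), 4, 3,
        [&](char * dst, size_t len, size_t begin)
        {
            std::memcpy(dst, blob.data() + begin, len);
            return len;
        });
    std::cout << got << " bytes: " << std::string(out, got) << '\n';
}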

View File

@@ -44,6 +44,10 @@ public:
size_t getFileSize() override;
size_t readBigAt(char * to, size_t n, size_t range_begin, const std::function<bool(size_t)> & progress_callback) const override;
bool supportsReadAt() override { return true; }
private:
void initialize();
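Declaring supportsReadAt() to return true tells callers that readBigAt() is a stateless positional read: disjoint ranges can be fetched from several threads without sharing a cursor. The sketch below illustrates that contract in isolation, using POSIX ::pread as a stand-in for the blob read; the file, names, and sizes are assumptions for the demonstration, not ClickHouse code.

#include <cstdio>
#include <cstdlib>
#include <fcntl.h>
#include <string>
#include <thread>
#include <unistd.h>
#include <vector>

/// Stand-in for readBigAt: ::pread is likewise a positional read that needs no
/// seek and no shared cursor, so concurrent calls on one descriptor are safe.
static size_t readAt(int fd, char * to, size_t n, size_t range_begin)
{
    ssize_t r = ::pread(fd, to, n, static_cast<off_t>(range_begin));
    return r > 0 ? static_cast<size_t>(r) : 0;
}

int main()
{
    /// Prepare a small file so the demonstration is self-contained.
    std::string payload(64, 'x');
    char path[] = "/tmp/readat_demo_XXXXXX";
    int fd = ::mkstemp(path);
    if (fd < 0)
        return 1;
    ::pwrite(fd, payload.data(), payload.size(), 0);

    const size_t parts = 4, chunk = payload.size() / parts;
    std::vector<char> buf(payload.size());
    std::vector<std::thread> workers;

    /// Each worker reads its own disjoint range with no coordination,
    /// which is exactly what supportsReadAt() == true promises callers.
    for (size_t p = 0; p < parts; ++p)
        workers.emplace_back([&, p] { readAt(fd, buf.data() + p * chunk, chunk, p * chunk); });
    for (auto & w : workers)
        w.join();

    std::printf("read %zu bytes in %zu parallel parts\n", buf.size(), parts);
    ::close(fd);
    ::unlink(path);
}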

View File

@@ -16,6 +16,7 @@ from helpers.cluster import ClickHouseCluster, ClickHouseInstance
from helpers.network import PartitionManager
from helpers.mock_servers import start_mock_servers
from helpers.test_tools import exec_query_with_retry
from helpers.test_tools import assert_logs_contain_with_retry
@pytest.fixture(scope="module")
@@ -1320,3 +1321,25 @@ def test_format_detection(cluster):
)
assert result == expected_result
def test_parallel_read(cluster):
node = cluster.instances["node"]
connection_string = cluster.env_variables["AZURITE_CONNECTION_STRING"]
storage_account_url = cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]
account_name = "devstoreaccount1"
account_key = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_parallel_read.parquet', '{account_name}', '{account_key}') "
f"select * from numbers(10000) settings azure_truncate_on_insert=1",
)
time.sleep(1)  # short pause before reading the blob back
res = azure_query(
node,
f"select count() from azureBlobStorage('{connection_string}', 'cont', 'test_parallel_read.parquet')",
)
assert int(res) == 10000
assert_logs_contain_with_retry(node, "AzureBlobStorage readBigAt read bytes")
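The final assertion leans on log polling: the SELECT can return before the server flushes the "AzureBlobStorage readBigAt read bytes" line, so the helper retries rather than inspecting the log once. A sketch of that polling idea in isolation follows; the function name, log path, and timeout are illustrative assumptions, not the test framework's implementation.

#include <chrono>
#include <fstream>
#include <iostream>
#include <string>
#include <thread>

/// Poll a log file until a marker substring appears or a deadline passes.
/// A single immediate check would be flaky for the reason noted above.
static bool waitForLogLine(const std::string & log_path, const std::string & marker,
                           std::chrono::seconds timeout)
{
    const auto deadline = std::chrono::steady_clock::now() + timeout;
    while (std::chrono::steady_clock::now() < deadline)
    {
        std::ifstream log(log_path);
        for (std::string line; std::getline(log, line);)
            if (line.find(marker) != std::string::npos)
                return true;
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
    }
    return false;
}

int main()
{
    /// Hypothetical path and marker, for illustration only.
    std::cout << waitForLogLine("/var/log/clickhouse-server/clickhouse-server.log",
                                "AzureBlobStorage readBigAt read bytes",
                                std::chrono::seconds(10))
              << '\n';
}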