Correct merge

This commit is contained in:
kssenii 2021-10-10 20:24:36 +03:00
parent fde6f0507d
commit e3d2942aa6
6 changed files with 61 additions and 38 deletions

View File

@ -168,9 +168,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskWebServer::readFile(const String & p
bool threadpool_read = read_settings.remote_fs_method == RemoteFSReadMethod::read_threadpool;
auto web_impl = std::make_unique<ReadBufferFromWebServerGather>(url, meta, getContext(),
read_settings,
threadpool_read);
auto web_impl = std::make_unique<ReadBufferFromWebServerGather>(url, meta, getContext(), threadpool_read, read_settings);
if (threadpool_read)
{

View File

@ -36,7 +36,7 @@ SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBuffer(const S
SeekableReadBufferPtr ReadBufferFromWebServerGather::createImplementationBuffer(const String & path) const
{
return std::make_unique<ReadBufferFromWebServer>(fs::path(uri) / path, context, buf_size, backoff_threshold, max_tries, threadpool_read);
return std::make_unique<ReadBufferFromWebServer>(fs::path(uri) / path, context, settings, threadpool_read);
}

View File

@ -6,6 +6,7 @@
#include <Disks/IDiskRemote.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadSettings.h>
namespace Aws
{
@ -92,17 +93,13 @@ public:
const String & uri_,
RemoteMetadata metadata_,
ContextPtr context_,
size_t buf_size_,
size_t backoff_threshold_,
size_t max_tries_,
size_t threadpool_read_)
size_t threadpool_read_,
const ReadSettings & settings_)
: ReadBufferFromRemoteFSGather(metadata_)
, uri(uri_)
, context(context_)
, buf_size(buf_size_)
, backoff_threshold(backoff_threshold_)
, max_tries(max_tries_)
, threadpool_read(threadpool_read_)
, settings(settings_)
{
}
@ -111,10 +108,8 @@ public:
private:
String uri;
ContextPtr context;
size_t buf_size;
size_t backoff_threshold;
size_t max_tries;
bool threadpool_read;
ReadSettings settings;
};

View File

@ -20,6 +20,8 @@ namespace ErrorCodes
}
static constexpr size_t HTTP_MAX_TRIES = 10;
ReadBufferFromWebServer::ReadBufferFromWebServer(
const String & url_, ContextPtr context_, const ReadSettings & settings_, bool use_external_buffer_)
: SeekableReadBuffer(nullptr, 0)
@ -93,7 +95,22 @@ bool ReadBufferFromWebServer::nextImpl()
}
else
{
impl = initialize();
/// Initialize impl with retry.
/// initialize() may throw Poco::Exception; it is attempted up to num_tries times,
/// where num_tries is never lower than HTTP_MAX_TRIES.
auto num_tries = std::max(read_settings.http_max_tries, HTTP_MAX_TRIES);
for (size_t i = 0; i < num_tries; ++i)
{
    try
    {
        impl = initialize();
        /// Success: stop retrying. Without this break initialize() would be
        /// called again on every remaining iteration, replacing a working impl.
        break;
    }
    catch (Poco::Exception &)
    {
        /// The final attempt failed as well: propagate the error to the caller.
        if (i == num_tries - 1)
            throw;
        /// Otherwise log the failure and go for another attempt.
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
}
}
auto result = impl->next();

View File

@ -41,9 +41,9 @@ private:
off_t offset = 0;
bool use_external_buffer;
ReadSettings read_settings;
bool use_external_buffer;
};
}

View File

@ -239,6 +239,16 @@ namespace detail
try
{
impl = std::make_unique<ReadBufferFromIStream>(*istr, buffer_size);
if (use_external_buffer)
{
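/**
* Reading into a buffer supplied from outside: point impl at it right away.
* The reasoning is spelled out in the longer use_external_buffer comment further down.
*/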
impl->set(internal_buffer.begin(), internal_buffer.size());
assert(working_buffer.begin() != nullptr);
assert(!internal_buffer.empty());
}
}
catch (const Poco::Exception & e)
{
@ -255,28 +265,31 @@ namespace detail
if (next_callback)
next_callback(count());
if (use_external_buffer)
if (impl)
{
/**
* use_external_buffer -- means we read into the buffer which
* was passed to us from somewhere else. We do not check whether
* previously returned buffer was read or not, because this branch
* means we are prefetching data, each nextImpl() call we can fill
* a different buffer.
*/
impl->set(internal_buffer.begin(), internal_buffer.size());
assert(working_buffer.begin() != nullptr);
assert(!internal_buffer.empty());
}
else
{
/**
* impl was initialized before, pass position() to it to make
* sure there is no pending data which was not read, because
* this branch means we read sequentially.
*/
if (!working_buffer.empty())
impl->position() = position();
if (use_external_buffer)
{
/**
* use_external_buffer -- means we read into the buffer which
* was passed to us from somewhere else. We do not check whether
* previously returned buffer was read or not, because this branch
* means we are prefetching data, each nextImpl() call we can fill
* a different buffer.
*/
impl->set(internal_buffer.begin(), internal_buffer.size());
assert(working_buffer.begin() != nullptr);
assert(!internal_buffer.empty());
}
else
{
/**
* impl was initialized before, pass position() to it to make
* sure there is no pending data which was not read, because
* this branch means we read sequentially.
*/
if (!working_buffer.empty())
impl->position() = position();
}
}
if (total_bytes_to_read && bytes_read == total_bytes_to_read.value())