Some clean up before merging

This commit is contained in:
kssenii 2021-10-29 01:04:52 +03:00
parent 7e2ea97e3c
commit 4f3433b4e1
7 changed files with 54 additions and 76 deletions

View File

@ -523,11 +523,6 @@ class IColumn;
M(Int64, remote_fs_read_max_backoff_ms, 10000, "Max wait time when trying to read data for remote disk", 0) \
M(Int64, remote_fs_read_backoff_max_tries, 5, "Max attempts to read with backoff", 0) \
\
M(Int64, http_max_tries, 1, "Max attempts to read via http.", 0) \
M(Int64, http_retry_initial_backoff_ms, 100, "Min milliseconds for backoff, when retrying read via http", 0) \
M(Int64, http_retry_max_backoff_ms, 10000, "Max milliseconds for backoff, when retrying read via http", 0) \
M(Bool, http_retriable_read, true, "Allow to resume reading via http if some error occurred. Reading will continue starting from last read byte (with `range` header)", 0) \
\
M(Bool, force_remove_data_recursively_on_drop, false, "Recursively remove data on DROP query. Avoids 'Directory not empty' error, but may silently remove detached data", 0) \
\
/** Experimental functions */ \

View File

@ -101,6 +101,7 @@ void AsynchronousReadIndirectBufferFromRemoteFS::prefetch()
if (prefetch_future.valid())
return;
/// Check boundary, which was set in readUntilPosition().
if (!hasPendingDataToRead())
return;

View File

@ -22,6 +22,7 @@ namespace ErrorCodes
static constexpr size_t HTTP_MAX_TRIES = 10;
static constexpr size_t WAIT_INIT = 100;
ReadBufferFromWebServer::ReadBufferFromWebServer(
const String & url_,
@ -87,12 +88,8 @@ std::unique_ptr<ReadBuffer> ReadBufferFromWebServer::initialize()
void ReadBufferFromWebServer::initializeWithRetry()
{
/// Initialize impl with retry.
auto num_tries = std::max(read_settings.http_max_tries, HTTP_MAX_TRIES);
size_t milliseconds_to_wait = read_settings.http_retry_initial_backoff_ms;
bool initialized = false;
for (size_t i = 0; (i < num_tries) && !initialized; ++i)
{
while (milliseconds_to_wait < read_settings.http_retry_max_backoff_ms)
size_t milliseconds_to_wait = WAIT_INIT;
for (size_t i = 0; i < HTTP_MAX_TRIES; ++i)
{
try
{
@ -108,12 +105,11 @@ void ReadBufferFromWebServer::initializeWithRetry()
assert(!internal_buffer.empty());
}
initialized = true;
break;
}
catch (Poco::Exception & e)
{
if (i == num_tries - 1)
if (i == HTTP_MAX_TRIES - 1)
throw;
LOG_ERROR(&Poco::Logger::get("ReadBufferFromWeb"), "Error: {}, code: {}", e.what(), e.code());
@ -121,8 +117,6 @@ void ReadBufferFromWebServer::initializeWithRetry()
milliseconds_to_wait *= 2;
}
}
milliseconds_to_wait = read_settings.http_retry_initial_backoff_ms;
}
}

View File

@ -67,8 +67,9 @@ bool ReadBufferFromS3::nextImpl()
/**
* use_external_buffer -- means we read into the buffer which
* was passed to us from somewhere else. We do not check whether
* previously returned buffer was read or not, because this branch
* means we are prefetching data.
* previously returned buffer was read or not (no hasPendingData() check is needed),
* because this branch means we are prefetching data,
* each nextImpl() call we can fill a different buffer.
*/
impl->set(internal_buffer.begin(), internal_buffer.size());
assert(working_buffer.begin() != nullptr);
@ -77,11 +78,8 @@ bool ReadBufferFromS3::nextImpl()
else
{
/**
* use_external_buffer -- means we read into the buffer which
* was passed to us from somewhere else. We do not check whether
* previously returned buffer was read or not, because this branch
* means we are prefetching data, each nextImpl() call we can fill
* a different buffer.
* impl was initialized before, pass position() to it to make
* sure there is no pending data which was not read.
*/
impl->position() = position();
assert(!impl->hasPendingData());
@ -171,6 +169,10 @@ std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
req.SetBucket(bucket);
req.SetKey(key);
/**
* If remote_filesustem_method = 'read_threadpool', then for MergeTree family tables
* exact byte ranges to read are always passed here.
*/
if (read_until_position)
{
if (offset >= read_until_position)

View File

@ -83,11 +83,6 @@ struct ReadSettings
/// If reading is done without final position set, throw logical_error.
bool must_read_until_position = false;
bool http_retriable_read = true;
size_t http_max_tries = 1;
size_t http_retry_initial_backoff_ms = 100;
size_t http_retry_max_backoff_ms = 10000;
ReadSettings adjustBufferSize(size_t file_size) const
{
ReadSettings res = *this;

View File

@ -229,16 +229,14 @@ namespace detail
if (next_callback)
next_callback(count());
if (impl)
{
if (use_external_buffer)
{
/**
* use_external_buffer -- means we read into the buffer which
* was passed to us from somewhere else. We do not check whether
* previously returned buffer was read or not, because this branch
* means we are prefetching data, each nextImpl() call we can fill
* a different buffer.
* previously returned buffer was read or not (no hasPendingData() check is needed),
* because this branch means we are prefetching data,
* each nextImpl() call we can fill a different buffer.
*/
impl->set(internal_buffer.begin(), internal_buffer.size());
assert(working_buffer.begin() != nullptr);
@ -248,15 +246,13 @@ namespace detail
{
/**
* impl was initialized before, pass position() to it to make
* sure there is no pending data which was not read, because
* this branch means we read sequentially.
* sure there is no pending data which was not read.
*/
if (!working_buffer.empty())
impl->position() = position();
}
}
if (impl && !working_buffer.empty())
if (!working_buffer.empty())
impl->position() = position();
if (!impl->next())

View File

@ -3085,11 +3085,6 @@ ReadSettings Context::getReadSettings() const
res.mmap_threshold = settings.min_bytes_to_use_mmap_io;
res.priority = settings.read_priority;
res.http_retriable_read = settings.http_retriable_read;
res.http_max_tries = settings.http_max_tries;
res.http_retry_initial_backoff_ms = settings.http_retry_initial_backoff_ms;
res.http_retry_max_backoff_ms = settings.http_retry_max_backoff_ms;
res.mmap_cache = getMMappedFileCache().get();
return res;