Finally all fixed

This commit is contained in:
kssenii 2021-09-30 15:35:59 +03:00
parent 85aae03a26
commit cc9ce13bac
5 changed files with 53 additions and 13 deletions

View File

@ -27,7 +27,8 @@ namespace ErrorCodes
AsynchronousReadIndirectBufferFromRemoteFS::AsynchronousReadIndirectBufferFromRemoteFS( AsynchronousReadIndirectBufferFromRemoteFS::AsynchronousReadIndirectBufferFromRemoteFS(
AsynchronousReaderPtr reader_, Int32 priority_, std::shared_ptr<ReadBufferFromRemoteFS> impl_) AsynchronousReaderPtr reader_, Int32 priority_, std::shared_ptr<ReadBufferFromRemoteFS> impl_)
: reader(reader_), priority(priority_), impl(impl_) : ReadBufferFromFileBase(DBMS_DEFAULT_BUFFER_SIZE, nullptr, 0)
, reader(reader_), priority(priority_), impl(impl_), prefetch_buffer(DBMS_DEFAULT_BUFFER_SIZE)
{ {
} }
@ -48,6 +49,9 @@ void AsynchronousReadIndirectBufferFromRemoteFS::prefetch()
return; return;
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
assert(prefetch_buffer.data() != nullptr);
prefetch_buffer.resize(DBMS_DEFAULT_BUFFER_SIZE);
impl->set(prefetch_buffer.data(), prefetch_buffer.size());
prefetch_future = readNext(); prefetch_future = readNext();
} }
@ -65,9 +69,9 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
size = prefetch_future.get(); size = prefetch_future.get();
if (size) if (size)
{ {
set(impl->buffer().begin(), impl->buffer().size()); memory.swap(prefetch_buffer);
set(memory.data(), memory.size());
working_buffer.resize(size); working_buffer.resize(size);
impl->reset();
absolute_position += size; absolute_position += size;
} }
} }
@ -77,12 +81,15 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
else else
{ {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
impl->check = true;
impl->set(memory.data(), memory.size());
assert(memory.data() != nullptr);
assert(impl->buffer().begin() != nullptr);
size = readNext().get(); size = readNext().get();
if (size) if (size)
{ {
set(impl->buffer().begin(), impl->buffer().size()); set(memory.data(), memory.size());
working_buffer.resize(size); working_buffer.resize(size);
impl->reset();
absolute_position += size; absolute_position += size;
} }
} }

View File

@ -14,7 +14,17 @@
namespace DB namespace DB
{ {
/// Reads data from S3/HDFS/Web using stored paths in metadata. /**
* Reads data from S3/HDFS/Web using stored paths in metadata.
*
* Buffers chain for diskS3:
* AsynchronousIndirectReadBufferFromRemoteFS -> ReadBufferFromRemoteFS ->
* -> ReadBufferFromS3 -> ReadBufferFromIStream.
*
* Buffers chain for diskWeb:
* AsynchronousIndirectReadBufferFromRemoteFS -> ReadBufferFromRemoteFS ->
* -> ReadIndirectBufferFromWebServer -> ReadBufferFromHttp -> ReadBufferFromIStream.
*/
class AsynchronousReadIndirectBufferFromRemoteFS : public ReadBufferFromFileBase class AsynchronousReadIndirectBufferFromRemoteFS : public ReadBufferFromFileBase
{ {
public: public:
@ -31,6 +41,8 @@ public:
void prefetch() override; void prefetch() override;
bool check = false;
private: private:
bool nextImpl() override; bool nextImpl() override;
@ -45,6 +57,7 @@ private:
size_t absolute_position = 0; size_t absolute_position = 0;
std::mutex mutex; std::mutex mutex;
Memory<> prefetch_buffer;
}; };
} }

View File

@ -72,9 +72,16 @@ bool ReadBufferFromRemoteFS::nextImpl()
bool ReadBufferFromRemoteFS::read() bool ReadBufferFromRemoteFS::read()
{ {
if (check)
{
assert(!internal_buffer.empty());
assert(working_buffer.begin() != nullptr);
}
/// Transfer current position and working_buffer to actual ReadBuffer /// Transfer current position and working_buffer to actual ReadBuffer
swap(*current_buf); swap(*current_buf);
/// Position and working_buffer will be updated in next() call /// Position and working_buffer will be updated in next() call
if (check)
assert(current_buf->buffer().begin() != nullptr);
auto result = current_buf->next(); auto result = current_buf->next();
/// Assign result to current buffer. /// Assign result to current buffer.
swap(*current_buf); swap(*current_buf);
@ -111,7 +118,7 @@ void ReadBufferFromRemoteFS::reset(bool reset_inner_buf)
{ {
if (reset_inner_buf) if (reset_inner_buf)
current_buf.reset(); current_buf.reset();
BufferBase::set(nullptr, 0, 0); // BufferBase::set(nullptr, 0, 0);
} }
} }

View File

@ -22,6 +22,7 @@ public:
void reset(bool reset_inner_buf = false); void reset(bool reset_inner_buf = false);
bool check = false;
protected: protected:
size_t fetch(size_t offset); size_t fetch(size_t offset);

View File

@ -50,16 +50,12 @@ bool ReadBufferFromS3::nextImpl()
/// `impl` has been initialized earlier and now we're at the end of the current portion of data. /// `impl` has been initialized earlier and now we're at the end of the current portion of data.
if (impl) if (impl)
{ {
if (use_external_buffer) if (!use_external_buffer)
{
impl->set(working_buffer.begin(), working_buffer.size());
}
else
{ {
impl->position() = position(); impl->position() = position();
assert(!impl->hasPendingData());
} }
assert(!impl->hasPendingData());
} }
else else
{ {
@ -67,6 +63,14 @@ bool ReadBufferFromS3::nextImpl()
impl = initialize(); impl = initialize();
next_result = impl->hasPendingData(); next_result = impl->hasPendingData();
} }
if (use_external_buffer)
{
// assert(!impl->buffer().begin() || impl->buffer().begin() != working_buffer.begin());
impl->set(internal_buffer.begin(), internal_buffer.size());
assert(working_buffer.begin() != nullptr);
assert(!internal_buffer.empty());
// impl->BufferBase::set(nullptr, 0, 0);
}
auto sleep_time_with_backoff_milliseconds = std::chrono::milliseconds(100); auto sleep_time_with_backoff_milliseconds = std::chrono::milliseconds(100);
for (size_t attempt = 0; (attempt < max_single_read_retries) && !next_result; ++attempt) for (size_t attempt = 0; (attempt < max_single_read_retries) && !next_result; ++attempt)
@ -99,6 +103,14 @@ bool ReadBufferFromS3::nextImpl()
/// Try to reinitialize `impl`. /// Try to reinitialize `impl`.
impl.reset(); impl.reset();
impl = initialize(); impl = initialize();
if (use_external_buffer)
{
// assert(!impl->buffer().begin() || impl->buffer().begin() != working_buffer.begin());
impl->set(internal_buffer.begin(), internal_buffer.size());
assert(working_buffer.begin() != nullptr);
assert(!internal_buffer.empty());
// impl->BufferBase::set(nullptr, 0, 0);
}
next_result = impl->hasPendingData(); next_result = impl->hasPendingData();
} }
} }