Fix prefetch mistake

This commit is contained in:
kssenii 2021-09-21 15:40:48 +03:00
parent 40ee75fe97
commit e301457e91
5 changed files with 17 additions and 9 deletions

View File

@ -38,8 +38,6 @@ std::future<IAsynchronousReader::Result> AsynchronousReadIndirectBufferFromRemot
auto remote_fd = std::make_shared<ThreadPoolRemoteFSReader::RemoteFSFileDescriptor>();
remote_fd->impl = impl;
impl->position() = position();
assert(!impl->hasPendingData());
request.descriptor = std::move(remote_fd);
request.priority = priority;
@ -52,6 +50,12 @@ void AsynchronousReadIndirectBufferFromRemoteFS::prefetch()
{
if (prefetch_future.valid())
return;
if (impl->initialized())
{
impl->position() = impl->buffer().end(); /// May be should try to do this differently.
assert(!impl->hasPendingData());
}
prefetch_future = readNext();
}
@ -62,7 +66,6 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
if (prefetch_future.valid())
{
std::cerr << "Having prefetched data\n";
CurrentMetrics::Increment metric_increment{CurrentMetrics::AsynchronousReadWait};
Stopwatch watch;
size = prefetch_future.get();
@ -71,7 +74,11 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
}
else
{
std::cerr << "No prefetched data\n";
if (impl->initialized())
{
impl->position() = position();
assert(!impl->hasPendingData());
}
size = readNext().get();
}

View File

@ -20,6 +20,8 @@ public:
RestartAwareReadBuffer(const DiskRestartProxy & disk, std::unique_ptr<ReadBufferFromFileBase> impl_)
: ReadBufferFromFileDecorator(std::move(impl_)), lock(disk.mutex) { }
void prefetch() override { impl->prefetch(); }
private:
ReadLock lock;
};

View File

@ -25,6 +25,8 @@ public:
void reset();
bool initialized() const { return current_buf != nullptr; }
protected:
virtual SeekableReadBufferPtr createReadBuffer(const String & path) const = 0;
RemoteMetadata metadata;

View File

@ -20,7 +20,7 @@ public:
std::future<Result> submit(Request request) override;
struct RemoteFSFileDescriptor : IFileDescriptor
struct RemoteFSFileDescriptor : public IFileDescriptor
{
ReadBufferPtr impl;
};

View File

@ -161,9 +161,6 @@ def drop_table(cluster, node_name):
for obj in list(minio.list_objects(cluster.minio_bucket, 'data/')):
minio.remove_object(cluster.minio_bucket, obj.object_name)
def assert_with_read_method(node, query, read_method, expected):
assert node.query("SET remote_filesystem_read_method = '{}';{}".format(read_method, query)) == expected
@pytest.mark.parametrize(
"min_rows_for_wide_part,files_per_part,node_name",
@ -189,7 +186,7 @@ def test_simple_insert_select(cluster, min_rows_for_wide_part, files_per_part, n
assert node.query("SELECT * FROM s3_test ORDER BY dt, id FORMAT Values") == values1 + "," + values2
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD + files_per_part * 2
assert node.query("SELECT count(*) FROM s3_test where id = 1 FORMAT Values") == "(2)"
assert node.query("SELECT count(*) FROM s3_test where id = 1 FORMAT Values") == "(2)"
@pytest.mark.parametrize(