Fix tests

This commit is contained in:
kssenii 2022-01-24 12:32:39 +03:00
parent 710bba895e
commit 898139acae
6 changed files with 23 additions and 19 deletions

View File

@ -126,10 +126,6 @@ private:
Poco::Logger * log;
bool startup_restore_finished = false;
/**
* Get list of file segments which intesect with `range`.
* If `key` is not in cache or there is not such range, return std::nullopt.
*/
FileSegments getImpl(
const Key & key, const FileSegment::Range & range,
[[maybe_unused]] std::lock_guard<std::mutex> & cache_lock);

View File

@ -4,6 +4,7 @@
#include <Common/FileCache.h>
#include <Common/CurrentThread.h>
#include <Common/filesystemHelpers.h>
#include <Common/tests/gtest_global_context.h>
#include <Common/SipHash.h>
#include <Common/hex.h>
#include <Interpreters/Context.h>
@ -95,10 +96,7 @@ TEST(LRUFileCache, get)
DB::ThreadStatus thread_status;
/// To work with cache need query_id and query context.
auto shared_context = DB::Context::createShared();
auto context = DB::Context::createGlobal(shared_context.get());
context->makeGlobalContext();
auto query_context = DB::Context::createCopy(context);
auto query_context = DB::Context::createCopy(getContext().context);
query_context->makeQueryContext();
query_context->setCurrentQueryId("query_id");
DB::CurrentThread::QueryScope query_scope_holder(query_context);
@ -343,7 +341,7 @@ TEST(LRUFileCache, get)
std::thread other_1([&]
{
DB::ThreadStatus thread_status_1;
auto query_context_1 = DB::Context::createCopy(context);
auto query_context_1 = DB::Context::createCopy(getContext().context);
query_context_1->makeQueryContext();
query_context_1->setCurrentQueryId("query_id_1");
DB::CurrentThread::QueryScope query_scope_holder_1(query_context_1);
@ -410,7 +408,7 @@ TEST(LRUFileCache, get)
std::thread other_1([&]
{
DB::ThreadStatus thread_status_1;
auto query_context_1 = DB::Context::createCopy(context);
auto query_context_1 = DB::Context::createCopy(getContext().context);
query_context_1->makeQueryContext();
query_context_1->setCurrentQueryId("query_id_1");
DB::CurrentThread::QueryScope query_scope_holder_1(query_context_1);

View File

@ -34,6 +34,7 @@ CacheableReadBufferFromRemoteFS::CacheableReadBufferFromRemoteFS(
, reader(reader_)
, settings(settings_)
, read_until_position(read_until_position_)
, use_external_buffer(settings_.remote_fs_method == RemoteFSReadMethod::threadpool)
{
}
@ -227,9 +228,17 @@ bool CacheableReadBufferFromRemoteFS::nextImpl()
assert(current_read_range.left <= file_offset_of_buffer_end);
assert(current_read_range.right >= file_offset_of_buffer_end);
assert(!internal_buffer.empty());
swap(*impl);
if (use_external_buffer)
{
assert(!internal_buffer.empty());
swap(*impl);
}
else
{
impl->position() = position();
assert(!impl->hasPendingData());
}
bool result;
auto & file_segment = *current_file_segment_it;
@ -292,7 +301,10 @@ bool CacheableReadBufferFromRemoteFS::nextImpl()
}
}
swap(*impl);
if (use_external_buffer)
swap(*impl);
else
BufferBase::set(impl->buffer().begin(), impl->buffer().size(), impl->offset());
if (file_offset_of_buffer_end > current_read_range.right)
completeFileSegmentAndGetNext();

View File

@ -52,6 +52,9 @@ private:
SeekableReadBufferPtr impl;
bool initialized = false;
/// Flag to identify usage of threadpool reads
bool use_external_buffer;
enum class ReadType
{
CACHED,

View File

@ -138,8 +138,6 @@ void ReadBufferFromRemoteFSGather::initialize()
bool ReadBufferFromRemoteFSGather::nextImpl()
{
assert(!internal_buffer.empty());
/// Find first available buffer that fits to given offset.
if (!current_buf)
initialize();

View File

@ -14,8 +14,7 @@ namespace ErrorCodes
ReadIndirectBufferFromRemoteFS::ReadIndirectBufferFromRemoteFS(
std::shared_ptr<ReadBufferFromRemoteFSGather> impl_)
: ReadBufferFromFileBase(DBMS_DEFAULT_BUFFER_SIZE, nullptr, 0)
, impl(std::move(impl_))
: impl(std::move(impl_))
{
}
@ -74,8 +73,6 @@ off_t ReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence)
bool ReadIndirectBufferFromRemoteFS::nextImpl()
{
assert(!internal_buffer.empty());
/// Transfer current position and working_buffer to actual ReadBuffer
swap(*impl);
/// Position and working_buffer will be updated in next() call