Resubmit prefetches

This commit is contained in:
kssenii 2023-02-07 18:50:31 +01:00
parent b43ffb98e8
commit b0b865c32e
78 changed files with 1731 additions and 215 deletions

View File

@ -20,17 +20,22 @@ struct ProfileEventTimeIncrement
explicit ProfileEventTimeIncrement<time>(ProfileEvents::Event event_) explicit ProfileEventTimeIncrement<time>(ProfileEvents::Event event_)
: event(event_), watch(CLOCK_MONOTONIC) {} : event(event_), watch(CLOCK_MONOTONIC) {}
/// Time accumulated by `watch` so far, expressed in the unit chosen by the `time`
/// template parameter. The conditions are mutually exclusive, so only the branch
/// matching `time` is instantiated.
UInt64 elapsed()
{
    if constexpr (time == Time::Seconds)
    {
        /// NOTE(review): presumably `watch` is a Stopwatch, whose elapsedSeconds() returns
        /// a double; the value is then implicitly converted to UInt64 here — confirm.
        return watch.elapsedSeconds();
    }
    else if constexpr (time == Time::Milliseconds)
    {
        return watch.elapsedMilliseconds();
    }
    else if constexpr (time == Time::Microseconds)
    {
        return watch.elapsedMicroseconds();
    }
    else if constexpr (time == Time::Nanoseconds)
    {
        return watch.elapsedNanoseconds();
    }
}
~ProfileEventTimeIncrement() ~ProfileEventTimeIncrement()
{ {
watch.stop(); watch.stop();
if constexpr (time == Time::Nanoseconds) ProfileEvents::increment(event, elapsed());
ProfileEvents::increment(event, watch.elapsedNanoseconds());
else if constexpr (time == Time::Microseconds)
ProfileEvents::increment(event, watch.elapsedMicroseconds());
else if constexpr (time == Time::Milliseconds)
ProfileEvents::increment(event, watch.elapsedMilliseconds());
else if constexpr (time == Time::Seconds)
ProfileEvents::increment(event, watch.elapsedSeconds());
} }
ProfileEvents::Event event; ProfileEvents::Event event;

View File

@ -348,6 +348,7 @@ The server successfully detected this situation and will download merged part fr
M(DiskS3GetObject, "Number of DiskS3 API GetObject calls.") \ M(DiskS3GetObject, "Number of DiskS3 API GetObject calls.") \
\ \
M(ReadBufferFromS3Microseconds, "Time spend in reading from S3.") \ M(ReadBufferFromS3Microseconds, "Time spend in reading from S3.") \
M(ReadBufferFromS3InitMicroseconds, "Time spend initializing connection to S3.") \
M(ReadBufferFromS3Bytes, "Bytes read from S3.") \ M(ReadBufferFromS3Bytes, "Bytes read from S3.") \
M(ReadBufferFromS3RequestsErrors, "Number of exceptions while reading from S3.") \ M(ReadBufferFromS3RequestsErrors, "Number of exceptions while reading from S3.") \
\ \
@ -375,6 +376,8 @@ The server successfully detected this situation and will download merged part fr
M(RemoteFSLazySeeks, "Number of lazy seeks") \ M(RemoteFSLazySeeks, "Number of lazy seeks") \
M(RemoteFSSeeksWithReset, "Number of seeks which lead to a new connection") \ M(RemoteFSSeeksWithReset, "Number of seeks which lead to a new connection") \
M(RemoteFSBuffers, "Number of buffers created for asynchronous reading from remote filesystem") \ M(RemoteFSBuffers, "Number of buffers created for asynchronous reading from remote filesystem") \
M(MergeTreePrefetchedReadPoolInit, "Time spent preparing tasks in MergeTreePrefetchedReadPool") \
M(WaitPrefetchTaskMicroseconds, "Time spend waiting for prefetched reader") \
\ \
M(ThreadpoolReaderTaskMicroseconds, "Time spent getting the data in asynchronous reading") \ M(ThreadpoolReaderTaskMicroseconds, "Time spent getting the data in asynchronous reading") \
M(ThreadpoolReaderReadBytes, "Bytes read from a threadpool task in asynchronous reading") \ M(ThreadpoolReaderReadBytes, "Bytes read from a threadpool task in asynchronous reading") \

View File

@ -55,7 +55,8 @@ public:
UInt64 elapsedMilliseconds() const { return elapsedNanoseconds() / 1000000UL; } UInt64 elapsedMilliseconds() const { return elapsedNanoseconds() / 1000000UL; }
double elapsedSeconds() const { return static_cast<double>(elapsedNanoseconds()) / 1000000000ULL; } double elapsedSeconds() const { return static_cast<double>(elapsedNanoseconds()) / 1000000000ULL; }
UInt64 getStart() { return start_ns; } UInt64 getStart() const { return start_ns; }
/// Returns the stored `stop_ns` value as-is.
/// NOTE(review): presumably the counterpart of getStart(), only meaningful once the stopwatch was stopped — confirm.
UInt64 getEnd() const { return stop_ns; }
private: private:
UInt64 start_ns = 0; UInt64 start_ns = 0;

View File

@ -10,6 +10,7 @@
#include <Interpreters/TextLog.h> #include <Interpreters/TextLog.h>
#include <Interpreters/TraceLog.h> #include <Interpreters/TraceLog.h>
#include <Interpreters/FilesystemCacheLog.h> #include <Interpreters/FilesystemCacheLog.h>
#include <Interpreters/FilesystemReadPrefetchesLog.h>
#include <Interpreters/ProcessorsProfileLog.h> #include <Interpreters/ProcessorsProfileLog.h>
#include <Interpreters/ZooKeeperLog.h> #include <Interpreters/ZooKeeperLog.h>
#include <Interpreters/TransactionsInfoLog.h> #include <Interpreters/TransactionsInfoLog.h>

View File

@ -28,6 +28,7 @@
M(ProcessorProfileLogElement) \ M(ProcessorProfileLogElement) \
M(TextLogElement) \ M(TextLogElement) \
M(FilesystemCacheLogElement) \ M(FilesystemCacheLogElement) \
M(FilesystemReadPrefetchesLogElement) \
M(AsynchronousInsertLogElement) M(AsynchronousInsertLogElement)
namespace Poco namespace Poco

View File

@ -28,10 +28,10 @@ void CachedCompressedReadBuffer::initInput()
} }
void CachedCompressedReadBuffer::prefetch() void CachedCompressedReadBuffer::prefetch(int64_t priority)
{ {
initInput(); initInput();
file_in->prefetch(); file_in->prefetch(priority);
} }

View File

@ -36,7 +36,7 @@ private:
bool nextImpl() override; bool nextImpl() override;
void prefetch() override; void prefetch(int64_t priority) override;
/// Passed into file_in. /// Passed into file_in.
ReadBufferFromFileBase::ProfileCallback profile_callback; ReadBufferFromFileBase::ProfileCallback profile_callback;

View File

@ -51,9 +51,9 @@ CompressedReadBufferFromFile::CompressedReadBufferFromFile(std::unique_ptr<ReadB
} }
void CompressedReadBufferFromFile::prefetch() void CompressedReadBufferFromFile::prefetch(int64_t priority)
{ {
file_in.prefetch(); file_in.prefetch(priority);
} }

View File

@ -43,7 +43,7 @@ private:
bool nextImpl() override; bool nextImpl() override;
void prefetch() override; void prefetch(int64_t priority) override;
public: public:
explicit CompressedReadBufferFromFile(std::unique_ptr<ReadBufferFromFileBase> buf, bool allow_different_codecs_ = false); explicit CompressedReadBufferFromFile(std::unique_ptr<ReadBufferFromFileBase> buf, bool allow_different_codecs_ = false);

View File

@ -629,6 +629,15 @@ class IColumn;
M(Bool, throw_on_error_from_cache_on_write_operations, false, "Ignore error from cache when caching on write operations (INSERT, merges)", 0) \ M(Bool, throw_on_error_from_cache_on_write_operations, false, "Ignore error from cache when caching on write operations (INSERT, merges)", 0) \
\ \
M(Bool, load_marks_asynchronously, false, "Load MergeTree marks asynchronously", 0) \ M(Bool, load_marks_asynchronously, false, "Load MergeTree marks asynchronously", 0) \
M(Bool, enable_filesystem_read_prefetches_log, false, "Log to system.filesystem prefetch_log during query. Should be used only for testing or debugging, not recommended to be turned on by default", 0) \
M(Bool, allow_prefetched_read_pool_for_remote_filesystem, false, "Prefer prefetched threadpool if all parts are on remote filesystem", 0) \
M(Bool, allow_prefetched_read_pool_for_local_filesystem, false, "Prefer prefetched threadpool if all parts are on local filesystem", 0) \
\
M(UInt64, filesystem_prefetch_step_bytes, 0, "Prefetch step in bytes. Zero means `auto` - approximately the best prefetch step will be auto deduced, but might not be 100% the best. The actual value might be different because of setting filesystem_prefetch_min_bytes_for_single_read_task", 0) \
M(UInt64, filesystem_prefetch_step_marks, 0, "Prefetch step in marks. Zero means `auto` - approximately the best prefetch step will be auto deduced, but might not be 100% the best. The actual value might be different because of setting filesystem_prefetch_min_bytes_for_single_read_task", 0) \
M(UInt64, filesystem_prefetch_min_bytes_for_single_read_task, "8Mi", "Do not parallelize within one file read less than this amount of bytes. E.g. one reader will not receive a read task of size less than this amount. This setting is recommended to avoid spikes of time for aws getObject requests to aws", 0) \
M(UInt64, filesystem_prefetch_max_memory_usage, "1Gi", "Maximum memory usage for prefetches. Zero means unlimited", 0) \
M(UInt64, filesystem_prefetches_limit, 0, "Maximum number of prefetches. Zero means unlimited. A setting `filesystem_prefetch_max_memory_usage` is more recommended if you want to limit the number of prefetches", 0) \
\ \
M(UInt64, use_structure_from_insertion_table_in_table_functions, 2, "Use structure from insertion table instead of schema inference from data. Possible values: 0 - disabled, 1 - enabled, 2 - auto", 0) \ M(UInt64, use_structure_from_insertion_table_in_table_functions, 2, "Use structure from insertion table instead of schema inference from data. Possible values: 0 - disabled, 1 - enabled, 2 - auto", 0) \
\ \

View File

@ -2,9 +2,13 @@
#include <Common/Stopwatch.h> #include <Common/Stopwatch.h>
#include <Common/logger_useful.h> #include <Common/logger_useful.h>
#include <Common/getRandomASCIIString.h>
#include <Common/ElapsedTimeProfileEventIncrement.h> #include <Common/ElapsedTimeProfileEventIncrement.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h> #include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/IO/ThreadPoolRemoteFSReader.h> #include <Disks/IO/ThreadPoolRemoteFSReader.h>
#include <Interpreters/FilesystemReadPrefetchesLog.h>
#include <Interpreters/Context.h>
#include <base/getThreadId.h>
namespace CurrentMetrics namespace CurrentMetrics
@ -47,10 +51,13 @@ AsynchronousReadIndirectBufferFromRemoteFS::AsynchronousReadIndirectBufferFromRe
: ReadBufferFromFileBase(settings_.remote_fs_buffer_size, nullptr, 0) : ReadBufferFromFileBase(settings_.remote_fs_buffer_size, nullptr, 0)
, read_settings(settings_) , read_settings(settings_)
, reader(reader_) , reader(reader_)
, priority(settings_.priority) , base_priority(settings_.priority)
, impl(impl_) , impl(impl_)
, prefetch_buffer(settings_.remote_fs_buffer_size) , prefetch_buffer(settings_.remote_fs_buffer_size)
, min_bytes_for_seek(min_bytes_for_seek_) , min_bytes_for_seek(min_bytes_for_seek_)
, query_id(CurrentThread::isInitialized() && CurrentThread::get().getQueryContext() != nullptr
? CurrentThread::getQueryId() : "")
, current_reader_id(getRandomASCIIString(8))
#ifndef NDEBUG #ifndef NDEBUG
, log(&Poco::Logger::get("AsynchronousBufferFromRemoteFS")) , log(&Poco::Logger::get("AsynchronousBufferFromRemoteFS"))
#else #else
@ -101,14 +108,14 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::hasPendingDataToRead()
} }
std::future<IAsynchronousReader::Result> AsynchronousReadIndirectBufferFromRemoteFS::asyncReadInto(char * data, size_t size) std::future<IAsynchronousReader::Result> AsynchronousReadIndirectBufferFromRemoteFS::asyncReadInto(char * data, size_t size, int64_t priority)
{ {
IAsynchronousReader::Request request; IAsynchronousReader::Request request;
request.descriptor = std::make_shared<RemoteFSFileDescriptor>(*impl); request.descriptor = std::make_shared<RemoteFSFileDescriptor>(*impl);
request.buf = data; request.buf = data;
request.size = size; request.size = size;
request.offset = file_offset_of_buffer_end; request.offset = file_offset_of_buffer_end;
request.priority = priority; request.priority = base_priority + priority;
if (bytes_to_ignore) if (bytes_to_ignore)
{ {
@ -119,7 +126,7 @@ std::future<IAsynchronousReader::Result> AsynchronousReadIndirectBufferFromRemot
} }
void AsynchronousReadIndirectBufferFromRemoteFS::prefetch() void AsynchronousReadIndirectBufferFromRemoteFS::prefetch(int64_t priority)
{ {
if (prefetch_future.valid()) if (prefetch_future.valid())
return; return;
@ -128,9 +135,12 @@ void AsynchronousReadIndirectBufferFromRemoteFS::prefetch()
if (!hasPendingDataToRead()) if (!hasPendingDataToRead())
return; return;
last_prefetch_info.submit_time = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
last_prefetch_info.priority = priority;
/// Prefetch even in case hasPendingData() == true. /// Prefetch even in case hasPendingData() == true.
chassert(prefetch_buffer.size() == read_settings.remote_fs_buffer_size); chassert(prefetch_buffer.size() == read_settings.remote_fs_buffer_size);
prefetch_future = asyncReadInto(prefetch_buffer.data(), prefetch_buffer.size()); prefetch_future = asyncReadInto(prefetch_buffer.data(), prefetch_buffer.size(), priority);
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetches); ProfileEvents::increment(ProfileEvents::RemoteFSPrefetches);
} }
@ -163,6 +173,29 @@ void AsynchronousReadIndirectBufferFromRemoteFS::setReadUntilEnd()
} }
/// Builds one FilesystemReadPrefetchesLogElement describing the last submitted prefetch
/// and hands it to the filesystem read prefetches log of the global context, if that log exists.
///
/// `state`           - what happened to the prefetch (used, cancelled, ...).
/// `size`            - number of bytes the prefetch produced; callers in this file pass -1 when it is unknown.
/// `execution_watch` - stopwatch of the background task; may be null, in which case no watch is recorded.
void AsynchronousReadIndirectBufferFromRemoteFS::appendToPrefetchLog(FilesystemPrefetchState state, int64_t size, const std::unique_ptr<Stopwatch> & execution_watch)
{
    const auto now = std::chrono::system_clock::now();

    FilesystemReadPrefetchesLogElement entry
    {
        .event_time = std::chrono::system_clock::to_time_t(now),
        .query_id = query_id,
        .path = impl->getCurrentObject().getMappedPath(),
        .offset = file_offset_of_buffer_end,
        .size = size,
        .prefetch_submit_time = last_prefetch_info.submit_time,
        .execution_watch = execution_watch ? std::optional<Stopwatch>(*execution_watch) : std::nullopt,
        .priority = last_prefetch_info.priority,
        .state = state,
        .thread_id = getThreadId(),
        .reader_id = current_reader_id,
    };

    /// Named differently from the `log` member (the Poco logger) to avoid shadowing it.
    auto prefetches_log = Context::getGlobalContextInstance()->getFilesystemReadPrefetchesLog();
    if (!prefetches_log)
        return;

    prefetches_log->add(entry);
}
bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl() bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
{ {
if (!hasPendingDataToRead()) if (!hasPendingDataToRead())
@ -176,10 +209,19 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::AsynchronousRemoteReadWaitMicroseconds); ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::AsynchronousRemoteReadWaitMicroseconds);
CurrentMetrics::Increment metric_increment{CurrentMetrics::AsynchronousReadWait}; CurrentMetrics::Increment metric_increment{CurrentMetrics::AsynchronousReadWait};
std::tie(size, offset) = prefetch_future.get(); auto result = prefetch_future.get();
size = result.size;
offset = result.offset;
prefetch_future = {}; prefetch_future = {};
prefetch_buffer.swap(memory); prefetch_buffer.swap(memory);
if (read_settings.enable_filesystem_read_prefetches_log)
{
appendToPrefetchLog(FilesystemPrefetchState::USED, size, result.execution_watch);
}
last_prefetch_info = {};
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetchedReads); ProfileEvents::increment(ProfileEvents::RemoteFSPrefetchedReads);
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetchedBytes, size); ProfileEvents::increment(ProfileEvents::RemoteFSPrefetchedBytes, size);
} }
@ -196,6 +238,7 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
} }
chassert(size >= offset); chassert(size >= offset);
size_t bytes_read = size - offset; size_t bytes_read = size - offset;
if (bytes_read) if (bytes_read)
{ {
@ -265,7 +308,13 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset, int whence)
/// Prefetch is cancelled because of seek. /// Prefetch is cancelled because of seek.
if (read_from_prefetch) if (read_from_prefetch)
{
ProfileEvents::increment(ProfileEvents::RemoteFSCancelledPrefetches); ProfileEvents::increment(ProfileEvents::RemoteFSCancelledPrefetches);
if (read_settings.enable_filesystem_read_prefetches_log)
{
appendToPrefetchLog(FilesystemPrefetchState::CANCELLED_WITH_SEEK, -1, nullptr);
}
}
break; break;
} }
@ -333,8 +382,9 @@ void AsynchronousReadIndirectBufferFromRemoteFS::resetPrefetch(FilesystemPrefetc
if (!prefetch_future.valid()) if (!prefetch_future.valid())
return; return;
auto [size, _] = prefetch_future.get(); auto [size, offset, _] = prefetch_future.get();
prefetch_future = {}; prefetch_future = {};
last_prefetch_info = {};
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetchedBytes, size); ProfileEvents::increment(ProfileEvents::RemoteFSPrefetchedBytes, size);

View File

@ -4,6 +4,7 @@
#include <IO/ReadBufferFromFile.h> #include <IO/ReadBufferFromFile.h>
#include <IO/AsynchronousReader.h> #include <IO/AsynchronousReader.h>
#include <IO/ReadSettings.h> #include <IO/ReadSettings.h>
#include <Interpreters/FilesystemReadPrefetchesLog.h>
#include <utility> #include <utility>
namespace Poco { class Logger; } namespace Poco { class Logger; }
@ -43,7 +44,7 @@ public:
String getFileName() const override; String getFileName() const override;
void prefetch() override; void prefetch(int64_t priority) override;
void setReadUntilPosition(size_t position) override; /// [..., position). void setReadUntilPosition(size_t position) override; /// [..., position).
@ -62,22 +63,17 @@ private:
bool hasPendingDataToRead(); bool hasPendingDataToRead();
std::future<IAsynchronousReader::Result> asyncReadInto(char * data, size_t size); void appendToPrefetchLog(FilesystemPrefetchState state, int64_t size, const std::unique_ptr<Stopwatch> & execution_watch);
std::future<IAsynchronousReader::Result> asyncReadInto(char * data, size_t size, int64_t priority);
enum class FilesystemPrefetchState
{
USED,
CANCELLED_WITH_SEEK,
CANCELLED_WITH_RANGE_CHANGE,
UNNEEDED,
};
void resetPrefetch(FilesystemPrefetchState state); void resetPrefetch(FilesystemPrefetchState state);
ReadSettings read_settings; ReadSettings read_settings;
IAsynchronousReader & reader; IAsynchronousReader & reader;
Int64 priority; int64_t base_priority;
std::shared_ptr<ReadBufferFromRemoteFSGather> impl; std::shared_ptr<ReadBufferFromRemoteFSGather> impl;
@ -89,11 +85,22 @@ private:
size_t min_bytes_for_seek; size_t min_bytes_for_seek;
std::string query_id;
std::string current_reader_id;
size_t bytes_to_ignore = 0; size_t bytes_to_ignore = 0;
std::optional<size_t> read_until_position; std::optional<size_t> read_until_position;
Poco::Logger * log; Poco::Logger * log;
/// Bookkeeping about the most recently submitted prefetch. It is filled in prefetch()
/// and reset to defaults after the prefetch result has been consumed or dropped.
struct LastPrefetchInfo
{
/// Submission moment: milliseconds since the system_clock epoch.
UInt64 submit_time = 0;
/// Priority that was passed to prefetch().
/// NOTE(review): prefetch() takes an int64_t priority while this field is size_t,
/// so a negative priority would wrap around when stored here — confirm whether negative values can occur.
size_t priority = 0;
};
LastPrefetchInfo last_prefetch_info;
}; };
} }

View File

@ -1172,7 +1172,7 @@ void CachedOnDiskReadBufferFromFile::assertCorrectness() const
{ {
if (FileCache::isReadOnly() if (FileCache::isReadOnly()
&& !settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache) && !settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache usage is not allowed"); throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache usage is not allowed (query_id: {})", query_id);
} }
String CachedOnDiskReadBufferFromFile::getInfoForLog() String CachedOnDiskReadBufferFromFile::getInfoForLog()

View File

@ -33,14 +33,16 @@ ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
if (blobs_to_read.empty()) if (blobs_to_read.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read zero number of objects"); throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read zero number of objects");
current_object = blobs_to_read.front();
with_cache = settings.remote_fs_cache with_cache = settings.remote_fs_cache
&& settings.enable_filesystem_cache && settings.enable_filesystem_cache
&& (!FileCache::isReadOnly() || settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache); && (!query_id.empty() || settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache);
} }
SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(const StoredObject & object) SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(const StoredObject & object)
{ {
if (current_object && !with_cache && enable_cache_log) if (current_buf != nullptr && !with_cache && enable_cache_log)
{ {
appendFilesystemCacheLog(); appendFilesystemCacheLog();
} }
@ -73,15 +75,13 @@ SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(c
void ReadBufferFromRemoteFSGather::appendFilesystemCacheLog() void ReadBufferFromRemoteFSGather::appendFilesystemCacheLog()
{ {
if (!current_object) chassert(!current_object.absolute_path.empty());
return;
FilesystemCacheLogElement elem FilesystemCacheLogElement elem
{ {
.event_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()), .event_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()),
.query_id = query_id, .query_id = query_id,
.source_file_path = current_object->absolute_path, .source_file_path = current_object.absolute_path,
.file_segment_range = { 0, current_object->bytes_size }, .file_segment_range = { 0, current_object.bytes_size },
.cache_type = FilesystemCacheLogElement::CacheType::READ_FROM_FS_BYPASSING_CACHE, .cache_type = FilesystemCacheLogElement::CacheType::READ_FROM_FS_BYPASSING_CACHE,
.file_segment_size = total_bytes_read_from_current_file, .file_segment_size = total_bytes_read_from_current_file,
.read_from_cache_attempted = false, .read_from_cache_attempted = false,
@ -107,9 +107,9 @@ IAsynchronousReader::Result ReadBufferFromRemoteFSGather::readInto(char * data,
auto result = nextImpl(); auto result = nextImpl();
if (result) if (result)
return {working_buffer.size(), BufferBase::offset()}; return { working_buffer.size(), BufferBase::offset(), nullptr };
return {0, 0}; return {0, 0, nullptr};
} }
void ReadBufferFromRemoteFSGather::initialize() void ReadBufferFromRemoteFSGather::initialize()
@ -198,6 +198,10 @@ bool ReadBufferFromRemoteFSGather::readImpl()
bytes_to_ignore = 0; bytes_to_ignore = 0;
} }
LOG_TEST(
log, "with cache: {}, query_id: {}, read-only: {}",
with_cache, query_id, settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache);
if (!result) if (!result)
result = current_buf->next(); result = current_buf->next();
@ -245,9 +249,7 @@ void ReadBufferFromRemoteFSGather::reset()
String ReadBufferFromRemoteFSGather::getFileName() const String ReadBufferFromRemoteFSGather::getFileName() const
{ {
if (current_object) return current_object.absolute_path;
return current_object->absolute_path;
return blobs_to_read[0].absolute_path;
} }
size_t ReadBufferFromRemoteFSGather::getFileSize() const size_t ReadBufferFromRemoteFSGather::getFileSize() const

View File

@ -47,6 +47,8 @@ public:
size_t getImplementationBufferOffset() const; size_t getImplementationBufferOffset() const;
/// Read-only access to `current_object`, which the constructor initializes to the first blob to read.
/// The reference points at a member, so it stays valid only as long as this buffer is alive.
const StoredObject & getCurrentObject() const { return current_object; }
private: private:
SeekableReadBufferPtr createImplementationBuffer(const StoredObject & object); SeekableReadBufferPtr createImplementationBuffer(const StoredObject & object);
@ -68,10 +70,7 @@ private:
size_t read_until_position = 0; size_t read_until_position = 0;
String current_file_path; StoredObject current_object;
size_t current_file_size = 0;
std::optional<StoredObject> current_object;
bool with_cache; bool with_cache;

View File

@ -95,7 +95,7 @@ bool ReadIndirectBufferFromRemoteFS::nextImpl()
chassert(internal_buffer.size() == read_settings.remote_fs_buffer_size); chassert(internal_buffer.size() == read_settings.remote_fs_buffer_size);
chassert(file_offset_of_buffer_end <= impl->getFileSize()); chassert(file_offset_of_buffer_end <= impl->getFileSize());
auto [size, offset] = impl->readInto(internal_buffer.begin(), internal_buffer.size(), file_offset_of_buffer_end, /* ignore */0); auto [size, offset, _] = impl->readInto(internal_buffer.begin(), internal_buffer.size(), file_offset_of_buffer_end, /* ignore */0);
chassert(offset <= size); chassert(offset <= size);
chassert(size <= internal_buffer.size()); chassert(size <= internal_buffer.size());

View File

@ -145,7 +145,7 @@ std::future<IAsynchronousReader::Result> ThreadPoolReader::submit(Request reques
if (!res) if (!res)
{ {
/// The file has ended. /// The file has ended.
promise.set_value({0, 0}); promise.set_value({0, 0, nullptr});
return future; return future;
} }
@ -190,7 +190,7 @@ std::future<IAsynchronousReader::Result> ThreadPoolReader::submit(Request reques
ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheHitBytes, bytes_read); ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheHitBytes, bytes_read);
ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadBytes, bytes_read); ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadBytes, bytes_read);
promise.set_value({bytes_read, request.ignore}); promise.set_value({bytes_read, request.ignore, nullptr});
return future; return future;
} }
} }

View File

@ -9,6 +9,9 @@
#include <Common/CurrentThread.h> #include <Common/CurrentThread.h>
#include <Common/ElapsedTimeProfileEventIncrement.h> #include <Common/ElapsedTimeProfileEventIncrement.h>
#include <IO/SeekableReadBuffer.h> #include <IO/SeekableReadBuffer.h>
#include <IO/AsyncReadCounters.h>
#include <Interpreters/Context.h>
#include <base/getThreadId.h>
#include <future> #include <future>
@ -27,6 +30,29 @@ namespace CurrentMetrics
namespace DB namespace DB
{ {
namespace
{
/// Scope guard around one asynchronous read task: the constructor registers the task in
/// the given counters (bumping the running count and, if exceeded, the recorded maximum),
/// the destructor unregisters it. Both updates are done under the counters' mutex.
///
/// NOTE(review): `counters_` is taken by value and then copied into the member; it could be
/// moved instead. The pointer is dereferenced without a null check, so callers must pass a valid one.
struct AsyncReadIncrement : boost::noncopyable
{
    explicit AsyncReadIncrement(std::shared_ptr<AsyncReadCounters> counters_)
        : counters(counters_)
    {
        std::lock_guard guard(counters->mutex);

        auto & running = counters->current_parallel_read_tasks;
        auto & peak = counters->max_parallel_read_tasks;

        ++running;
        if (peak < running)
            peak = running;
    }

    ~AsyncReadIncrement()
    {
        std::lock_guard guard(counters->mutex);
        counters->current_parallel_read_tasks -= 1;
    }

    std::shared_ptr<AsyncReadCounters> counters;
};
}
IAsynchronousReader::Result RemoteFSFileDescriptor::readInto(char * data, size_t size, size_t offset, size_t ignore) IAsynchronousReader::Result RemoteFSFileDescriptor::readInto(char * data, size_t size, size_t offset, size_t ignore)
{ {
return reader.readInto(data, size, offset, ignore); return reader.readInto(data, size, offset, ignore);
@ -45,18 +71,25 @@ std::future<IAsynchronousReader::Result> ThreadPoolRemoteFSReader::submit(Reques
return scheduleFromThreadPool<Result>([request]() -> Result return scheduleFromThreadPool<Result>([request]() -> Result
{ {
CurrentMetrics::Increment metric_increment{CurrentMetrics::RemoteRead}; CurrentMetrics::Increment metric_increment{CurrentMetrics::RemoteRead};
Stopwatch watch(CLOCK_MONOTONIC);
std::optional<AsyncReadIncrement> increment;
if (CurrentThread::isInitialized())
{
auto query_context = CurrentThread::get().getQueryContext();
if (query_context)
increment.emplace(query_context->getAsyncReadCounters());
}
auto * remote_fs_fd = assert_cast<RemoteFSFileDescriptor *>(request.descriptor.get()); auto * remote_fs_fd = assert_cast<RemoteFSFileDescriptor *>(request.descriptor.get());
auto watch = std::make_unique<Stopwatch>(CLOCK_MONOTONIC);
Result result = remote_fs_fd->readInto(request.buf, request.size, request.offset, request.ignore); Result result = remote_fs_fd->readInto(request.buf, request.size, request.offset, request.ignore);
watch->stop();
watch.stop(); ProfileEvents::increment(ProfileEvents::ThreadpoolReaderTaskMicroseconds, watch->elapsedMicroseconds());
ProfileEvents::increment(ProfileEvents::ThreadpoolReaderTaskMicroseconds, watch.elapsedMicroseconds());
ProfileEvents::increment(ProfileEvents::ThreadpoolReaderReadBytes, result.size); ProfileEvents::increment(ProfileEvents::ThreadpoolReaderReadBytes, result.size);
return Result{ .size = result.size, .offset = result.offset }; return Result{ .size = result.size, .offset = result.offset, .execution_watch = std::move(watch) };
}, pool, "VFSRead", request.priority); }, pool, "VFSRead", request.priority);
} }

View File

@ -42,7 +42,7 @@ StoredObject StoredObject::create(
bool object_bypasses_cache) bool object_bypasses_cache)
{ {
if (object_bypasses_cache) if (object_bypasses_cache)
return StoredObject(object_path, object_size, {}); return StoredObject(object_path, object_size, mapped_path, {});
PathKeyForCacheCreator path_key_for_cache_creator = [&object_storage](const std::string & path) -> std::string PathKeyForCacheCreator path_key_for_cache_creator = [&object_storage](const std::string & path) -> std::string
{ {

View File

@ -35,6 +35,8 @@ struct StoredObject
using PathKeyForCacheCreator = std::function<std::string(const std::string &)>; using PathKeyForCacheCreator = std::function<std::string(const std::string &)>;
PathKeyForCacheCreator path_key_for_cache_creator; PathKeyForCacheCreator path_key_for_cache_creator;
StoredObject() = default;
explicit StoredObject( explicit StoredObject(
const std::string & absolute_path_, const std::string & absolute_path_,
uint64_t bytes_size_ = 0, uint64_t bytes_size_ = 0,

View File

@ -0,0 +1,37 @@
#include <IO/AsyncReadCounters.h>
namespace DB
{
/// Writes the counters into a Map column: every non-zero counter among
/// max_parallel_read_tasks, max_parallel_prefetch_tasks and total_prefetch_tasks is appended
/// as a (name, value) pair, then one new offset covering the appended pairs is pushed.
/// A null `column` is ignored. The counters are read under `mutex`.
void AsyncReadCounters::dumpToMapColumn(IColumn * column) const
{
    if (!column)
        return;

    auto & map_column = typeid_cast<DB::ColumnMap &>(*column);

    auto & offsets = map_column.getNestedColumn().getOffsets();
    auto & entries = map_column.getNestedData();
    auto & names = entries.getColumn(0);
    auto & values = entries.getColumn(1);

    size_t inserted = 0;
    const auto append_if_nonzero = [&](const auto & name, const auto & counter)
    {
        /// Zero counters are skipped to keep the map compact.
        if (!counter)
            return;

        names.insert(name);
        values.insert(counter);
        ++inserted;
    };

    std::lock_guard lock(mutex);

    append_if_nonzero("max_parallel_read_tasks", max_parallel_read_tasks);
    append_if_nonzero("max_parallel_prefetch_tasks", max_parallel_prefetch_tasks);
    append_if_nonzero("total_prefetch_tasks", total_prefetch_tasks);

    offsets.push_back(offsets.back() + inserted);
}
}

View File

@ -0,0 +1,31 @@
#pragma once
#include <Core/Types.h>
#include <Columns/ColumnMap.h>
#include <mutex>
namespace DB
{
/// Metrics for the asynchronous reading feature.
/// Plain counters plus a mutex; dumpToMapColumn() exports the non-zero max/total values.
struct AsyncReadCounters
{
/// Current and maximum number of tasks in an asynchronous read pool.
/// The tasks are requests to read the data.
size_t max_parallel_read_tasks = 0;
size_t current_parallel_read_tasks = 0;
/// Current and maximum number of tasks in a reader prefetch read pool.
/// The tasks are calls to IMergeTreeReader::prefetch(), which does not do
/// any reading but creates a request for read. But as we need to wait for
/// marks to be loaded during this prefetch, we do it in a threadpool too.
size_t max_parallel_prefetch_tasks = 0;
size_t current_parallel_prefetch_tasks = 0;
/// NOTE(review): presumably the overall number of prefetch tasks ever scheduled — confirm against the code that increments it.
size_t total_prefetch_tasks = 0;
/// Locked by dumpToMapColumn() while reading the counters; `mutable` so that const method can lock it.
/// NOTE(review): presumably every writer of the counters is expected to hold it as well — confirm.
mutable std::mutex mutex;
AsyncReadCounters() = default;
/// Appends the non-zero max/total counters to the given Map column; see the definition for details.
void dumpToMapColumn(IColumn * column) const;
};
}

View File

@ -37,14 +37,14 @@ std::string AsynchronousReadBufferFromFileDescriptor::getFileName() const
} }
std::future<IAsynchronousReader::Result> AsynchronousReadBufferFromFileDescriptor::asyncReadInto(char * data, size_t size) std::future<IAsynchronousReader::Result> AsynchronousReadBufferFromFileDescriptor::asyncReadInto(char * data, size_t size, int64_t priority)
{ {
IAsynchronousReader::Request request; IAsynchronousReader::Request request;
request.descriptor = std::make_shared<IAsynchronousReader::LocalFileDescriptor>(fd); request.descriptor = std::make_shared<IAsynchronousReader::LocalFileDescriptor>(fd);
request.buf = data; request.buf = data;
request.size = size; request.size = size;
request.offset = file_offset_of_buffer_end; request.offset = file_offset_of_buffer_end;
request.priority = priority; request.priority = base_priority + priority;
request.ignore = bytes_to_ignore; request.ignore = bytes_to_ignore;
bytes_to_ignore = 0; bytes_to_ignore = 0;
@ -58,14 +58,14 @@ std::future<IAsynchronousReader::Result> AsynchronousReadBufferFromFileDescripto
} }
void AsynchronousReadBufferFromFileDescriptor::prefetch() void AsynchronousReadBufferFromFileDescriptor::prefetch(int64_t priority)
{ {
if (prefetch_future.valid()) if (prefetch_future.valid())
return; return;
/// Will request the same amount of data that is read in nextImpl. /// Will request the same amount of data that is read in nextImpl.
prefetch_buffer.resize(internal_buffer.size()); prefetch_buffer.resize(internal_buffer.size());
prefetch_future = asyncReadInto(prefetch_buffer.data(), prefetch_buffer.size()); prefetch_future = asyncReadInto(prefetch_buffer.data(), prefetch_buffer.size(), priority);
} }
@ -108,8 +108,9 @@ bool AsynchronousReadBufferFromFileDescriptor::nextImpl()
else else
{ {
/// No pending request. Do synchronous read. /// No pending request. Do synchronous read.
Stopwatch watch; Stopwatch watch;
auto [size, offset] = asyncReadInto(memory.data(), memory.size()).get(); auto [size, offset, _] = asyncReadInto(memory.data(), memory.size(), DEFAULT_PREFETCH_PRIORITY).get();
ProfileEvents::increment(ProfileEvents::AsynchronousReadWaitMicroseconds, watch.elapsedMicroseconds()); ProfileEvents::increment(ProfileEvents::AsynchronousReadWaitMicroseconds, watch.elapsedMicroseconds());
file_offset_of_buffer_end += size; file_offset_of_buffer_end += size;
@ -151,7 +152,7 @@ AsynchronousReadBufferFromFileDescriptor::AsynchronousReadBufferFromFileDescript
std::optional<size_t> file_size_) std::optional<size_t> file_size_)
: ReadBufferFromFileBase(buf_size, existing_memory, alignment, file_size_) : ReadBufferFromFileBase(buf_size, existing_memory, alignment, file_size_)
, reader(reader_) , reader(reader_)
, priority(priority_) , base_priority(priority_)
, required_alignment(alignment) , required_alignment(alignment)
, fd(fd_) , fd(fd_)
{ {

View File

@ -17,7 +17,7 @@ class AsynchronousReadBufferFromFileDescriptor : public ReadBufferFromFileBase
{ {
protected: protected:
IAsynchronousReader & reader; IAsynchronousReader & reader;
Int32 priority; int64_t base_priority;
Memory<> prefetch_buffer; Memory<> prefetch_buffer;
std::future<IAsynchronousReader::Result> prefetch_future; std::future<IAsynchronousReader::Result> prefetch_future;
@ -46,7 +46,7 @@ public:
~AsynchronousReadBufferFromFileDescriptor() override; ~AsynchronousReadBufferFromFileDescriptor() override;
void prefetch() override; void prefetch(int64_t priority) override;
int getFD() const int getFD() const
{ {
@ -67,7 +67,7 @@ public:
size_t getFileSize() override; size_t getFileSize() override;
private: private:
std::future<IAsynchronousReader::Result> asyncReadInto(char * data, size_t size); std::future<IAsynchronousReader::Result> asyncReadInto(char * data, size_t size, int64_t priority);
}; };
} }

View File

@ -5,6 +5,7 @@
#include <memory> #include <memory>
#include <future> #include <future>
#include <boost/noncopyable.hpp> #include <boost/noncopyable.hpp>
#include <Common/Stopwatch.h>
namespace DB namespace DB
@ -62,6 +63,8 @@ public:
/// Optional. Useful when implementation needs to do ignore(). /// Optional. Useful when implementation needs to do ignore().
size_t offset = 0; size_t offset = 0;
std::unique_ptr<Stopwatch> execution_watch;
operator std::tuple<size_t &, size_t &>() { return {size, offset}; } operator std::tuple<size_t &, size_t &>() { return {size, offset}; }
}; };

View File

@ -19,7 +19,7 @@ public:
const ReadBuffer & getWrappedReadBuffer() const { return *in; } const ReadBuffer & getWrappedReadBuffer() const { return *in; }
ReadBuffer & getWrappedReadBuffer() { return *in; } ReadBuffer & getWrappedReadBuffer() { return *in; }
void prefetch() override { in->prefetch(); } void prefetch(int64_t priority) override { in->prefetch(priority); }
protected: protected:
std::unique_ptr<ReadBuffer> in; std::unique_ptr<ReadBuffer> in;

View File

@ -20,7 +20,7 @@ public:
~PeekableReadBuffer() override; ~PeekableReadBuffer() override;
void prefetch() override { sub_buf->prefetch(); } void prefetch(int64_t priority) override { sub_buf->prefetch(priority); }
/// Sets checkpoint at current position /// Sets checkpoint at current position
ALWAYS_INLINE inline void setCheckpoint() ALWAYS_INLINE inline void setCheckpoint()

View File

@ -20,6 +20,8 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED; extern const int NOT_IMPLEMENTED;
} }
/// Default priority value for prefetch/read requests
/// (used e.g. for the synchronous read fallback in AsynchronousReadBufferFromFileDescriptor::nextImpl).
static constexpr auto DEFAULT_PREFETCH_PRIORITY = 0;
/** A simple abstract class for buffered data reading (char sequences) from somewhere. /** A simple abstract class for buffered data reading (char sequences) from somewhere.
* Unlike std::istream, it provides access to the internal buffer, * Unlike std::istream, it provides access to the internal buffer,
* and also allows you to manually manage the position inside the buffer. * and also allows you to manually manage the position inside the buffer.
@ -202,8 +204,10 @@ public:
/** Do something to allow faster subsequent call to 'nextImpl' if possible. /** Do something to allow faster subsequent call to 'nextImpl' if possible.
* It's used for asynchronous readers with double-buffering. * It's used for asynchronous readers with double-buffering.
* `priority` is the threadpool priority with which the prefetch task will be scheduled.
* A smaller value means a higher priority.
*/ */
virtual void prefetch() {} virtual void prefetch(int64_t /* priority */) {}
/** /**
* Set upper bound for read range [..., position). * Set upper bound for read range [..., position).

View File

@ -118,7 +118,7 @@ bool ReadBufferFromFileDescriptor::nextImpl()
} }
void ReadBufferFromFileDescriptor::prefetch() void ReadBufferFromFileDescriptor::prefetch(int64_t)
{ {
#if defined(POSIX_FADV_WILLNEED) #if defined(POSIX_FADV_WILLNEED)
/// For direct IO, loading data into page cache is pointless. /// For direct IO, loading data into page cache is pointless.

View File

@ -22,7 +22,7 @@ protected:
int fd; int fd;
bool nextImpl() override; bool nextImpl() override;
void prefetch() override; void prefetch(int64_t priority) override;
/// Name or some description of file. /// Name or some description of file.
std::string getFileName() const override; std::string getFileName() const override;

View File

@ -12,6 +12,7 @@
#include <Common/Stopwatch.h> #include <Common/Stopwatch.h>
#include <Common/Throttler.h> #include <Common/Throttler.h>
#include <Common/logger_useful.h> #include <Common/logger_useful.h>
#include <Common/ElapsedTimeProfileEventIncrement.h>
#include <base/sleep.h> #include <base/sleep.h>
#include <utility> #include <utility>
@ -20,6 +21,7 @@
namespace ProfileEvents namespace ProfileEvents
{ {
extern const Event ReadBufferFromS3Microseconds; extern const Event ReadBufferFromS3Microseconds;
extern const Event ReadBufferFromS3InitMicroseconds;
extern const Event ReadBufferFromS3Bytes; extern const Event ReadBufferFromS3Bytes;
extern const Event ReadBufferFromS3RequestsErrors; extern const Event ReadBufferFromS3RequestsErrors;
extern const Event ReadBufferSeekCancelConnection; extern const Event ReadBufferSeekCancelConnection;
@ -323,6 +325,7 @@ std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
if (read_settings.for_object_storage) if (read_settings.for_object_storage)
ProfileEvents::increment(ProfileEvents::DiskS3GetObject); ProfileEvents::increment(ProfileEvents::DiskS3GetObject);
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::ReadBufferFromS3InitMicroseconds);
Aws::S3::Model::GetObjectOutcome outcome = client_ptr->GetObject(req); Aws::S3::Model::GetObjectOutcome outcome = client_ptr->GetObject(req);
if (outcome.IsSuccess()) if (outcome.IsSuccess())

View File

@ -88,6 +88,8 @@ struct ReadSettings
size_t remote_fs_read_max_backoff_ms = 10000; size_t remote_fs_read_max_backoff_ms = 10000;
size_t remote_fs_read_backoff_max_tries = 4; size_t remote_fs_read_backoff_max_tries = 4;
bool enable_filesystem_read_prefetches_log = false;
bool enable_filesystem_cache = true; bool enable_filesystem_cache = true;
bool read_from_filesystem_cache_if_exists_otherwise_bypass_cache = false; bool read_from_filesystem_cache_if_exists_otherwise_bypass_cache = false;
bool enable_filesystem_cache_log = false; bool enable_filesystem_cache_log = false;

View File

@ -17,7 +17,7 @@ public:
off_t seek(off_t off, int whence) override; off_t seek(off_t off, int whence) override;
void prefetch() override { impl->prefetch(); } void prefetch(int64_t priority) override { impl->prefetch(priority); }
private: private:
UInt64 min_bytes_for_seek; /// Minimum positive seek offset which shall be executed using seek operation. UInt64 min_bytes_for_seek; /// Minimum positive seek offset which shall be executed using seek operation.

View File

@ -234,6 +234,7 @@ struct ContextSharedPart : boost::noncopyable
mutable UncompressedCachePtr uncompressed_cache; /// The cache of decompressed blocks. mutable UncompressedCachePtr uncompressed_cache; /// The cache of decompressed blocks.
mutable MarkCachePtr mark_cache; /// Cache of marks in compressed files. mutable MarkCachePtr mark_cache; /// Cache of marks in compressed files.
mutable std::unique_ptr<ThreadPool> load_marks_threadpool; /// Threadpool for loading marks cache. mutable std::unique_ptr<ThreadPool> load_marks_threadpool; /// Threadpool for loading marks cache.
mutable std::unique_ptr<ThreadPool> prefetch_threadpool; /// Threadpool for prefetch tasks (wait for marks to be loaded, then put a read task to the threadpool reader).
mutable UncompressedCachePtr index_uncompressed_cache; /// The cache of decompressed blocks for MergeTree indices. mutable UncompressedCachePtr index_uncompressed_cache; /// The cache of decompressed blocks for MergeTree indices.
mutable MarkCachePtr index_mark_cache; /// Cache of marks in compressed files of MergeTree indices. mutable MarkCachePtr index_mark_cache; /// Cache of marks in compressed files of MergeTree indices.
mutable QueryCachePtr query_cache; /// Cache of query results. mutable QueryCachePtr query_cache; /// Cache of query results.
@ -412,6 +413,20 @@ struct ContextSharedPart : boost::noncopyable
} }
} }
/// Shut down the prefetch threadpool if it was ever created: wait() on it, then release it.
/// Any exception is logged and swallowed so that the rest of the teardown still runs.
if (prefetch_threadpool)
{
    try
    {
        LOG_DEBUG(log, "Destructing prefetch threadpool");
        prefetch_threadpool->wait();
        prefetch_threadpool.reset();
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
}
try try
{ {
shutdown(); shutdown();
@ -1992,6 +2007,31 @@ ThreadPool & Context::getLoadMarksThreadpool() const
return *shared->load_marks_threadpool; return *shared->load_marks_threadpool;
} }
/// Reads the prefetch threadpool size from `config` (key `.prefetch_threadpool_pool_size`, default 100).
static size_t getPrefetchThreadpoolSizeFromConfig(const Poco::Util::AbstractConfiguration & config)
{
    const auto configured_pool_size = config.getUInt(".prefetch_threadpool_pool_size", 100);
    return configured_pool_size;
}
/// Returns the prefetch threadpool size as configured in the current server config.
size_t Context::getPrefetchThreadpoolSize() const
{
    return getPrefetchThreadpoolSizeFromConfig(getConfigRef());
}
/// Returns the shared prefetch threadpool, creating it on first use.
/// Pool size and queue size are read from the server config
/// (`.prefetch_threadpool_pool_size`, default 100; `.prefetch_threadpool_queue_size`, default 1000000).
ThreadPool & Context::getPrefetchThreadpool() const
{
    const auto & config = getConfigRef();

    auto lock = getLock();
    if (!shared->prefetch_threadpool)
    {
        /// Use the config reference obtained above rather than fetching the config a second time
        /// through getPrefetchThreadpoolSize().
        auto pool_size = getPrefetchThreadpoolSizeFromConfig(config);
        auto queue_size = config.getUInt(".prefetch_threadpool_queue_size", 1000000);
        shared->prefetch_threadpool = std::make_unique<ThreadPool>(pool_size, pool_size, queue_size);
    }
    return *shared->prefetch_threadpool;
}
void Context::setIndexUncompressedCache(size_t max_size_in_bytes) void Context::setIndexUncompressedCache(size_t max_size_in_bytes)
{ {
auto lock = getLock(); auto lock = getLock();
@ -2931,7 +2971,16 @@ std::shared_ptr<FilesystemCacheLog> Context::getFilesystemCacheLog() const
if (!shared->system_logs) if (!shared->system_logs)
return {}; return {};
return shared->system_logs->cache_log; return shared->system_logs->filesystem_cache_log;
}
std::shared_ptr<FilesystemReadPrefetchesLog> Context::getFilesystemReadPrefetchesLog() const
{
auto lock = getLock();
if (!shared->system_logs)
return {};
return shared->system_logs->filesystem_read_prefetches_log;
} }
std::shared_ptr<AsynchronousInsertLog> Context::getAsynchronousInsertLog() const std::shared_ptr<AsynchronousInsertLog> Context::getAsynchronousInsertLog() const
@ -3784,6 +3833,31 @@ OrdinaryBackgroundExecutorPtr Context::getCommonExecutor() const
return shared->common_executor; return shared->common_executor;
} }
/// Returns the pool size for the given filesystem reader type:
///  - remote asynchronous reader: `.threadpool_remote_fs_reader_pool_size` (default 250);
///  - local asynchronous reader: `.threadpool_local_fs_reader_pool_size` (default 100);
///  - synchronous local reader: the maximum value of size_t.
static size_t getThreadPoolReaderSizeFromConfig(Context::FilesystemReaderType type, const Poco::Util::AbstractConfiguration & config)
{
    using ReaderType = Context::FilesystemReaderType;

    switch (type)
    {
        case ReaderType::ASYNCHRONOUS_REMOTE_FS_READER:
            return config.getUInt(".threadpool_remote_fs_reader_pool_size", 250);
        case ReaderType::ASYNCHRONOUS_LOCAL_FS_READER:
            return config.getUInt(".threadpool_local_fs_reader_pool_size", 100);
        case ReaderType::SYNCHRONOUS_LOCAL_FS_READER:
            return std::numeric_limits<std::size_t>::max();
    }
}
/// Returns the pool size for the given filesystem reader type, based on the current server config.
size_t Context::getThreadPoolReaderSize(FilesystemReaderType type) const
{
    return getThreadPoolReaderSizeFromConfig(type, getConfigRef());
}
IAsynchronousReader & Context::getThreadPoolReader(FilesystemReaderType type) const IAsynchronousReader & Context::getThreadPoolReader(FilesystemReaderType type) const
{ {
const auto & config = getConfigRef(); const auto & config = getConfigRef();
@ -3796,9 +3870,8 @@ IAsynchronousReader & Context::getThreadPoolReader(FilesystemReaderType type) co
{ {
if (!shared->asynchronous_remote_fs_reader) if (!shared->asynchronous_remote_fs_reader)
{ {
auto pool_size = config.getUInt(".threadpool_remote_fs_reader_pool_size", 100); auto pool_size = getThreadPoolReaderSizeFromConfig(type, config);
auto queue_size = config.getUInt(".threadpool_remote_fs_reader_queue_size", 1000000); auto queue_size = config.getUInt(".threadpool_remote_fs_reader_queue_size", 1000000);
shared->asynchronous_remote_fs_reader = std::make_unique<ThreadPoolRemoteFSReader>(pool_size, queue_size); shared->asynchronous_remote_fs_reader = std::make_unique<ThreadPoolRemoteFSReader>(pool_size, queue_size);
} }
@ -3808,9 +3881,8 @@ IAsynchronousReader & Context::getThreadPoolReader(FilesystemReaderType type) co
{ {
if (!shared->asynchronous_local_fs_reader) if (!shared->asynchronous_local_fs_reader)
{ {
auto pool_size = config.getUInt(".threadpool_local_fs_reader_pool_size", 100); auto pool_size = getThreadPoolReaderSizeFromConfig(type, config);
auto queue_size = config.getUInt(".threadpool_local_fs_reader_queue_size", 1000000); auto queue_size = config.getUInt(".threadpool_local_fs_reader_queue_size", 1000000);
shared->asynchronous_local_fs_reader = std::make_unique<ThreadPoolReader>(pool_size, queue_size); shared->asynchronous_local_fs_reader = std::make_unique<ThreadPoolReader>(pool_size, queue_size);
} }
@ -3868,6 +3940,8 @@ ReadSettings Context::getReadSettings() const
res.load_marks_asynchronously = settings.load_marks_asynchronously; res.load_marks_asynchronously = settings.load_marks_asynchronously;
res.enable_filesystem_read_prefetches_log = settings.enable_filesystem_read_prefetches_log;
res.remote_fs_read_max_backoff_ms = settings.remote_fs_read_max_backoff_ms; res.remote_fs_read_max_backoff_ms = settings.remote_fs_read_max_backoff_ms;
res.remote_fs_read_backoff_max_tries = settings.remote_fs_read_backoff_max_tries; res.remote_fs_read_backoff_max_tries = settings.remote_fs_read_backoff_max_tries;
res.enable_filesystem_cache = settings.enable_filesystem_cache; res.enable_filesystem_cache = settings.enable_filesystem_cache;
@ -3920,6 +3994,14 @@ WriteSettings Context::getWriteSettings() const
return res; return res;
} }
/// Returns the AsyncReadCounters object of this context, creating it under the context lock on first access.
std::shared_ptr<AsyncReadCounters> Context::getAsyncReadCounters() const
{
    auto lock = getLock();

    if (async_read_counters)
        return async_read_counters;

    async_read_counters = std::make_shared<AsyncReadCounters>();
    return async_read_counters;
}
bool Context::canUseParallelReplicasOnInitiator() const bool Context::canUseParallelReplicasOnInitiator() const
{ {
const auto & settings = getSettingsRef(); const auto & settings = getSettingsRef();

View File

@ -10,6 +10,7 @@
#include <Core/NamesAndTypes.h> #include <Core/NamesAndTypes.h>
#include <Core/Settings.h> #include <Core/Settings.h>
#include <Core/UUID.h> #include <Core/UUID.h>
#include <IO/AsyncReadCounters.h>
#include <Interpreters/ClientInfo.h> #include <Interpreters/ClientInfo.h>
#include <Interpreters/Context_fwd.h> #include <Interpreters/Context_fwd.h>
#include <Interpreters/DatabaseCatalog.h> #include <Interpreters/DatabaseCatalog.h>
@ -95,6 +96,7 @@ class BackupsWorker;
class TransactionsInfoLog; class TransactionsInfoLog;
class ProcessorsProfileLog; class ProcessorsProfileLog;
class FilesystemCacheLog; class FilesystemCacheLog;
class FilesystemReadPrefetchesLog;
class AsynchronousInsertLog; class AsynchronousInsertLog;
class IAsynchronousReader; class IAsynchronousReader;
struct MergeTreeSettings; struct MergeTreeSettings;
@ -369,6 +371,8 @@ private:
/// Needs to be changed while having const context in factories methods /// Needs to be changed while having const context in factories methods
mutable QueryFactoriesInfo query_factories_info; mutable QueryFactoriesInfo query_factories_info;
/// Query metrics for reading data asynchronously with IAsynchronousReader.
mutable std::shared_ptr<AsyncReadCounters> async_read_counters;
/// TODO: maybe replace with temporary tables? /// TODO: maybe replace with temporary tables?
StoragePtr view_source; /// Temporary StorageValues used to generate alias columns for materialized views StoragePtr view_source; /// Temporary StorageValues used to generate alias columns for materialized views
@ -856,6 +860,13 @@ public:
void dropMarkCache() const; void dropMarkCache() const;
ThreadPool & getLoadMarksThreadpool() const; ThreadPool & getLoadMarksThreadpool() const;
ThreadPool & getPrefetchThreadpool() const;
/// Note: prefetchThreadpool differs from threadpoolReader in what its tasks do:
/// a prefetchThreadpool task waits for marks to be loaded
/// and then makes a prefetch by putting a read task into threadpoolReader.
size_t getPrefetchThreadpoolSize() const;
/// Create a cache of index uncompressed blocks of specified size. This can be done only once. /// Create a cache of index uncompressed blocks of specified size. This can be done only once.
void setIndexUncompressedCache(size_t max_size_in_bytes); void setIndexUncompressedCache(size_t max_size_in_bytes);
std::shared_ptr<UncompressedCache> getIndexUncompressedCache() const; std::shared_ptr<UncompressedCache> getIndexUncompressedCache() const;
@ -943,6 +954,7 @@ public:
std::shared_ptr<TransactionsInfoLog> getTransactionsInfoLog() const; std::shared_ptr<TransactionsInfoLog> getTransactionsInfoLog() const;
std::shared_ptr<ProcessorsProfileLog> getProcessorsProfileLog() const; std::shared_ptr<ProcessorsProfileLog> getProcessorsProfileLog() const;
std::shared_ptr<FilesystemCacheLog> getFilesystemCacheLog() const; std::shared_ptr<FilesystemCacheLog> getFilesystemCacheLog() const;
std::shared_ptr<FilesystemReadPrefetchesLog> getFilesystemReadPrefetchesLog() const;
std::shared_ptr<AsynchronousInsertLog> getAsynchronousInsertLog() const; std::shared_ptr<AsynchronousInsertLog> getAsynchronousInsertLog() const;
/// Returns an object used to log operations with parts if it possible. /// Returns an object used to log operations with parts if it possible.
@ -1081,6 +1093,10 @@ public:
IAsynchronousReader & getThreadPoolReader(FilesystemReaderType type) const; IAsynchronousReader & getThreadPoolReader(FilesystemReaderType type) const;
size_t getThreadPoolReaderSize(FilesystemReaderType type) const;
std::shared_ptr<AsyncReadCounters> getAsyncReadCounters() const;
ThreadPool & getThreadPoolWriter() const; ThreadPool & getThreadPoolWriter() const;
/** Get settings for reading from filesystem. */ /** Get settings for reading from filesystem. */

View File

@ -0,0 +1,61 @@
#include <Interpreters/FilesystemReadPrefetchesLog.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
namespace DB
{
/// Describes the columns of the filesystem read prefetches log table.
/// The order of the entries must stay in sync with appendToBlock(), which fills columns by position.
NamesAndTypesList FilesystemReadPrefetchesLogElement::getNamesAndTypes()
{
return {
{"event_date", std::make_shared<DataTypeDate>()},
{"event_time", std::make_shared<DataTypeDateTime>()},
{"query_id", std::make_shared<DataTypeString>()},
{"path", std::make_shared<DataTypeString>()},
{"offset", std::make_shared<DataTypeUInt64>()},
{"size", std::make_shared<DataTypeInt64>()},
{"prefetch_submit_time", std::make_shared<DataTypeDateTime64>(6)},
{"priority", std::make_shared<DataTypeUInt64>()},
/// The three execution columns below receive default values when no execution watch was recorded.
{"prefetch_execution_start_time", std::make_shared<DataTypeDateTime64>(6)},
{"prefetch_execution_end_time", std::make_shared<DataTypeDateTime64>(6)},
{"prefetch_execution_time_us", std::make_shared<DataTypeUInt64>()},
{"state", std::make_shared<DataTypeString>()}, /// Was this prefetch used or we downloaded it in vain?
{"thread_id", std::make_shared<DataTypeUInt64>()},
{"reader_id", std::make_shared<DataTypeString>()},
};
}
/// Appends this element as one row to `columns`.
/// Columns are filled strictly by position, in the order declared by getNamesAndTypes().
void FilesystemReadPrefetchesLogElement::appendToBlock(MutableColumns & columns) const
{
    size_t column_idx = 0;

    /// event_date is derived from event_time.
    columns[column_idx++]->insert(DateLUT::instance().toDayNum(event_time).toUnderType());
    columns[column_idx++]->insert(event_time);
    columns[column_idx++]->insert(query_id);
    columns[column_idx++]->insert(path);
    columns[column_idx++]->insert(offset);
    columns[column_idx++]->insert(size);
    columns[column_idx++]->insert(prefetch_submit_time);
    columns[column_idx++]->insert(priority);

    if (!execution_watch)
    {
        /// No execution watch: execution start time, end time and duration get default values.
        for (size_t k = 0; k < 3; ++k)
            columns[column_idx++]->insertDefault();
    }
    else
    {
        const auto & watch = *execution_watch;
        columns[column_idx++]->insert(watch.getStart());
        columns[column_idx++]->insert(watch.getEnd());
        columns[column_idx++]->insert(watch.elapsedMicroseconds());
    }

    /// The state is stored by its enumerator name.
    columns[column_idx++]->insert(magic_enum::enum_name(state));
    columns[column_idx++]->insert(thread_id);
    columns[column_idx++]->insert(reader_id);
}
}

View File

@ -0,0 +1,48 @@
#pragma once
#include <Core/NamesAndAliases.h>
#include <Core/NamesAndTypes.h>
#include <Interpreters/SystemLog.h>
#include <Common/Stopwatch.h>
namespace DB
{
/// State of a prefetch as recorded in FilesystemReadPrefetchesLogElement::state.
/// NOTE(review): the meanings below are read off the enumerator names only — confirm against the code that sets them.
enum class FilesystemPrefetchState
{
USED, /// presumably: the prefetched data was used by the reader
CANCELLED_WITH_SEEK, /// presumably: the prefetch was dropped because of a seek
CANCELLED_WITH_RANGE_CHANGE, /// presumably: the prefetch was dropped because the read range changed
UNNEEDED, /// presumably: the prefetch was never needed
};
/// One record of the filesystem read prefetches log.
/// All scalar members are value-initialized, so a default-constructed element never holds
/// indeterminate values (aggregate initialization that omits a member yields the same zero values as before).
struct FilesystemReadPrefetchesLogElement
{
    time_t event_time{};
    String query_id;
    String path;
    UInt64 offset{};
    Int64 size{}; /// -1 means unknown
    Decimal64 prefetch_submit_time{};
    /// When empty, the execution start/end/duration columns are written with default values.
    std::optional<Stopwatch> execution_watch;
    size_t priority{};
    FilesystemPrefetchState state{}; /// Value-initialized to the first enumerator (USED).
    UInt64 thread_id{};
    String reader_id;

    static std::string name() { return "FilesystemReadPrefetchesLog"; }

    /// Column names and types of the log table; must match the order used by appendToBlock().
    static NamesAndTypesList getNamesAndTypes();
    static NamesAndAliases getNamesAndAliases() { return {}; }

    /// Appends this element as one row to `columns`.
    void appendToBlock(MutableColumns & columns) const;
    static const char * getCustomColumnList() { return nullptr; }
};
/// System log whose rows are FilesystemReadPrefetchesLogElement records.
/// Adds nothing on top of SystemLog; it only inherits its constructors.
class FilesystemReadPrefetchesLog : public SystemLog<FilesystemReadPrefetchesLogElement>
{
public:
using SystemLog<FilesystemReadPrefetchesLogElement>::SystemLog;
};
}

View File

@ -126,6 +126,8 @@ NamesAndTypesList QueryLogElement::getNamesAndTypes()
{"used_row_policies", std::make_shared<DataTypeArray>(std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()))}, {"used_row_policies", std::make_shared<DataTypeArray>(std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()))},
{"transaction_id", getTransactionIDDataType()}, {"transaction_id", getTransactionIDDataType()},
{"AsyncReadCounters", std::make_shared<DataTypeMap>(std::make_shared<DataTypeString>(), std::make_shared<DataTypeUInt64>())},
}; };
} }
@ -271,6 +273,11 @@ void QueryLogElement::appendToBlock(MutableColumns & columns) const
} }
columns[i++]->insert(Tuple{tid.start_csn, tid.local_tid, tid.host_id}); columns[i++]->insert(Tuple{tid.start_csn, tid.local_tid, tid.host_id});
if (async_read_counters)
async_read_counters->dumpToMapColumn(columns[i++].get());
else
columns[i++]->insertDefault();
} }
void QueryLogElement::appendClientInfo(const ClientInfo & client_info, MutableColumns & columns, size_t & i) void QueryLogElement::appendClientInfo(const ClientInfo & client_info, MutableColumns & columns, size_t & i)

View File

@ -6,6 +6,7 @@
#include <Interpreters/SystemLog.h> #include <Interpreters/SystemLog.h>
#include <Interpreters/ClientInfo.h> #include <Interpreters/ClientInfo.h>
#include <Interpreters/TransactionVersionMetadata.h> #include <Interpreters/TransactionVersionMetadata.h>
#include <IO/AsyncReadCounters.h>
#include <Parsers/IAST.h> #include <Parsers/IAST.h>
@ -88,6 +89,7 @@ struct QueryLogElement
std::vector<UInt64> thread_ids; std::vector<UInt64> thread_ids;
std::shared_ptr<ProfileEvents::Counters::Snapshot> profile_counters; std::shared_ptr<ProfileEvents::Counters::Snapshot> profile_counters;
std::shared_ptr<AsyncReadCounters> async_read_counters;
std::shared_ptr<Settings> query_settings; std::shared_ptr<Settings> query_settings;
TransactionID tid; TransactionID tid;

View File

@ -3,7 +3,6 @@
#include <Interpreters/AsynchronousMetricLog.h> #include <Interpreters/AsynchronousMetricLog.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Interpreters/CrashLog.h> #include <Interpreters/CrashLog.h>
#include <Interpreters/FilesystemCacheLog.h>
#include <Interpreters/InterpreterCreateQuery.h> #include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/InterpreterInsertQuery.h> #include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/InterpreterRenameQuery.h> #include <Interpreters/InterpreterRenameQuery.h>
@ -18,6 +17,8 @@
#include <Interpreters/TextLog.h> #include <Interpreters/TextLog.h>
#include <Interpreters/TraceLog.h> #include <Interpreters/TraceLog.h>
#include <Interpreters/TransactionsInfoLog.h> #include <Interpreters/TransactionsInfoLog.h>
#include <Interpreters/FilesystemCacheLog.h>
#include <Interpreters/FilesystemReadPrefetchesLog.h>
#include <Interpreters/ZooKeeperLog.h> #include <Interpreters/ZooKeeperLog.h>
#include <Parsers/ASTCreateQuery.h> #include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h> #include <Parsers/ASTFunction.h>
@ -120,6 +121,8 @@ std::shared_ptr<TSystemLog> createSystemLog(
return {}; return {};
} }
LOG_DEBUG(&Poco::Logger::get("SystemLog"),
"Creating {}.{} from {}", default_database_name, default_table_name, config_prefix);
String database = config.getString(config_prefix + ".database", default_database_name); String database = config.getString(config_prefix + ".database", default_database_name);
String table = config.getString(config_prefix + ".table", default_table_name); String table = config.getString(config_prefix + ".table", default_table_name);
@ -200,7 +203,9 @@ SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConf
crash_log = createSystemLog<CrashLog>(global_context, "system", "crash_log", config, "crash_log"); crash_log = createSystemLog<CrashLog>(global_context, "system", "crash_log", config, "crash_log");
text_log = createSystemLog<TextLog>(global_context, "system", "text_log", config, "text_log"); text_log = createSystemLog<TextLog>(global_context, "system", "text_log", config, "text_log");
metric_log = createSystemLog<MetricLog>(global_context, "system", "metric_log", config, "metric_log"); metric_log = createSystemLog<MetricLog>(global_context, "system", "metric_log", config, "metric_log");
cache_log = createSystemLog<FilesystemCacheLog>(global_context, "system", "filesystem_cache_log", config, "filesystem_cache_log"); filesystem_cache_log = createSystemLog<FilesystemCacheLog>(global_context, "system", "filesystem_cache_log", config, "filesystem_cache_log");
filesystem_read_prefetches_log = createSystemLog<FilesystemReadPrefetchesLog>(
global_context, "system", "filesystem_read_prefetches_log", config, "filesystem_read_prefetches_log");
asynchronous_metric_log = createSystemLog<AsynchronousMetricLog>( asynchronous_metric_log = createSystemLog<AsynchronousMetricLog>(
global_context, "system", "asynchronous_metric_log", config, global_context, "system", "asynchronous_metric_log", config,
"asynchronous_metric_log"); "asynchronous_metric_log");
@ -246,8 +251,10 @@ SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConf
logs.emplace_back(transactions_info_log.get()); logs.emplace_back(transactions_info_log.get());
if (processors_profile_log) if (processors_profile_log)
logs.emplace_back(processors_profile_log.get()); logs.emplace_back(processors_profile_log.get());
if (cache_log) if (filesystem_cache_log)
logs.emplace_back(cache_log.get()); logs.emplace_back(filesystem_cache_log.get());
if (filesystem_read_prefetches_log)
logs.emplace_back(filesystem_read_prefetches_log.get());
if (asynchronous_insert_log) if (asynchronous_insert_log)
logs.emplace_back(asynchronous_insert_log.get()); logs.emplace_back(asynchronous_insert_log.get());

View File

@ -47,6 +47,7 @@ class SessionLog;
class TransactionsInfoLog; class TransactionsInfoLog;
class ProcessorsProfileLog; class ProcessorsProfileLog;
class FilesystemCacheLog; class FilesystemCacheLog;
class FilesystemReadPrefetchesLog;
class AsynchronousInsertLog; class AsynchronousInsertLog;
/// System logs should be destroyed in destructor of the last Context and before tables, /// System logs should be destroyed in destructor of the last Context and before tables,
@ -65,7 +66,8 @@ struct SystemLogs
std::shared_ptr<CrashLog> crash_log; /// Used to log server crashes. std::shared_ptr<CrashLog> crash_log; /// Used to log server crashes.
std::shared_ptr<TextLog> text_log; /// Used to log all text messages. std::shared_ptr<TextLog> text_log; /// Used to log all text messages.
std::shared_ptr<MetricLog> metric_log; /// Used to log all metrics. std::shared_ptr<MetricLog> metric_log; /// Used to log all metrics.
std::shared_ptr<FilesystemCacheLog> cache_log; /// Used to log cache trace. std::shared_ptr<FilesystemCacheLog> filesystem_cache_log;
std::shared_ptr<FilesystemReadPrefetchesLog> filesystem_read_prefetches_log;
/// Metrics from system.asynchronous_metrics. /// Metrics from system.asynchronous_metrics.
std::shared_ptr<AsynchronousMetricLog> asynchronous_metric_log; std::shared_ptr<AsynchronousMetricLog> asynchronous_metric_log;
/// OpenTelemetry trace spans. /// OpenTelemetry trace spans.

View File

@ -903,6 +903,8 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
element.used_functions = factories_info.functions; element.used_functions = factories_info.functions;
element.used_storages = factories_info.storages; element.used_storages = factories_info.storages;
element.used_table_functions = factories_info.table_functions; element.used_table_functions = factories_info.table_functions;
element.async_read_counters = context_ptr->getAsyncReadCounters();
}; };
/// Also make possible for caller to log successful query finish and exception during execution. /// Also make possible for caller to log successful query finish and exception during execution.

View File

@ -33,6 +33,7 @@
#include <Storages/MergeTree/MergeTreeInOrderSelectProcessor.h> #include <Storages/MergeTree/MergeTreeInOrderSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeReverseSelectProcessor.h> #include <Storages/MergeTree/MergeTreeReverseSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeThreadSelectProcessor.h> #include <Storages/MergeTree/MergeTreeThreadSelectProcessor.h>
#include <Storages/MergeTree/MergeTreePrefetchedReadPool.h>
#include <Storages/MergeTree/MergeTreeSource.h> #include <Storages/MergeTree/MergeTreeSource.h>
#include <Storages/MergeTree/RangesInDataPart.h> #include <Storages/MergeTree/RangesInDataPart.h>
#include <Storages/VirtualColumnUtils.h> #include <Storages/VirtualColumnUtils.h>
@ -81,6 +82,16 @@ static const PrewhereInfoPtr & getPrewhereInfoFromQueryInfo(const SelectQueryInf
: query_info.prewhere_info; : query_info.prewhere_info;
} }
static bool checkAllPartsOnRemoteFS(const RangesInDataParts & parts)
{
for (const auto & part : parts)
{
if (!part.data_part->isStoredOnRemoteDisk())
return false;
}
return true;
}
ReadFromMergeTree::ReadFromMergeTree( ReadFromMergeTree::ReadFromMergeTree(
MergeTreeData::DataPartsVector parts_, MergeTreeData::DataPartsVector parts_,
Names real_column_names_, Names real_column_names_,
@ -426,16 +437,6 @@ struct PartRangesReadInfo
bool use_uncompressed_cache = false; bool use_uncompressed_cache = false;
static bool checkAllPartsOnRemoteFS(const RangesInDataParts & parts)
{
for (const auto & part : parts)
{
if (!part.data_part->isStoredOnRemoteDisk())
return false;
}
return true;
}
PartRangesReadInfo( PartRangesReadInfo(
const RangesInDataParts & parts, const RangesInDataParts & parts,
const Settings & settings, const Settings & settings,
@ -443,6 +444,7 @@ struct PartRangesReadInfo
{ {
/// Count marks for each part. /// Count marks for each part.
sum_marks_in_parts.resize(parts.size()); sum_marks_in_parts.resize(parts.size());
for (size_t i = 0; i < parts.size(); ++i) for (size_t i = 0; i < parts.size(); ++i)
{ {
total_rows += parts[i].getRowsCount(); total_rows += parts[i].getRowsCount();
@ -463,14 +465,23 @@ struct PartRangesReadInfo
index_granularity_bytes); index_granularity_bytes);
auto all_parts_on_remote_disk = checkAllPartsOnRemoteFS(parts); auto all_parts_on_remote_disk = checkAllPartsOnRemoteFS(parts);
size_t min_rows_for_concurrent_read;
size_t min_bytes_for_concurrent_read;
if (all_parts_on_remote_disk)
{
min_rows_for_concurrent_read = settings.merge_tree_min_rows_for_concurrent_read_for_remote_filesystem;
min_bytes_for_concurrent_read = settings.merge_tree_min_bytes_for_concurrent_read_for_remote_filesystem;
}
else
{
min_rows_for_concurrent_read = settings.merge_tree_min_rows_for_concurrent_read;
min_bytes_for_concurrent_read = settings.merge_tree_min_bytes_for_concurrent_read;
}
min_marks_for_concurrent_read = MergeTreeDataSelectExecutor::minMarksForConcurrentRead( min_marks_for_concurrent_read = MergeTreeDataSelectExecutor::minMarksForConcurrentRead(
all_parts_on_remote_disk ? settings.merge_tree_min_rows_for_concurrent_read_for_remote_filesystem min_rows_for_concurrent_read, min_bytes_for_concurrent_read,
: settings.merge_tree_min_rows_for_concurrent_read, data_settings.index_granularity, index_granularity_bytes, sum_marks);
all_parts_on_remote_disk ? settings.merge_tree_min_bytes_for_concurrent_read_for_remote_filesystem
: settings.merge_tree_min_bytes_for_concurrent_read,
data_settings.index_granularity,
index_granularity_bytes,
sum_marks);
use_uncompressed_cache = settings.use_uncompressed_cache; use_uncompressed_cache = settings.use_uncompressed_cache;
if (sum_marks > max_marks_to_use_cache) if (sum_marks > max_marks_to_use_cache)

View File

@ -5,6 +5,7 @@
#include <Common/logger_useful.h> #include <Common/logger_useful.h>
#include <Storages/HDFS/ReadBufferFromHDFS.h> #include <Storages/HDFS/ReadBufferFromHDFS.h>
#include <Disks/IO/ThreadPoolRemoteFSReader.h> #include <Disks/IO/ThreadPoolRemoteFSReader.h>
#include <IO/AsynchronousReader.h>
namespace CurrentMetrics namespace CurrentMetrics
@ -38,7 +39,7 @@ AsynchronousReadBufferFromHDFS::AsynchronousReadBufferFromHDFS(
IAsynchronousReader & reader_, const ReadSettings & settings_, std::shared_ptr<ReadBufferFromHDFS> impl_) IAsynchronousReader & reader_, const ReadSettings & settings_, std::shared_ptr<ReadBufferFromHDFS> impl_)
: BufferWithOwnMemory<SeekableReadBuffer>(settings_.remote_fs_buffer_size) : BufferWithOwnMemory<SeekableReadBuffer>(settings_.remote_fs_buffer_size)
, reader(reader_) , reader(reader_)
, priority(settings_.priority) , base_priority(settings_.priority)
, impl(std::move(impl_)) , impl(std::move(impl_))
, prefetch_buffer(settings_.remote_fs_buffer_size) , prefetch_buffer(settings_.remote_fs_buffer_size)
, read_until_position(impl->getFileSize()) , read_until_position(impl->getFileSize())
@ -63,19 +64,19 @@ bool AsynchronousReadBufferFromHDFS::hasPendingDataToRead()
return true; return true;
} }
std::future<IAsynchronousReader::Result> AsynchronousReadBufferFromHDFS::asyncReadInto(char * data, size_t size) std::future<IAsynchronousReader::Result> AsynchronousReadBufferFromHDFS::asyncReadInto(char * data, size_t size, int64_t priority)
{ {
IAsynchronousReader::Request request; IAsynchronousReader::Request request;
request.descriptor = std::make_shared<RemoteFSFileDescriptor>(*impl); request.descriptor = std::make_shared<RemoteFSFileDescriptor>(*impl);
request.buf = data; request.buf = data;
request.size = size; request.size = size;
request.offset = file_offset_of_buffer_end; request.offset = file_offset_of_buffer_end;
request.priority = priority; request.priority = base_priority + priority;
request.ignore = 0; request.ignore = 0;
return reader.submit(request); return reader.submit(request);
} }
void AsynchronousReadBufferFromHDFS::prefetch() void AsynchronousReadBufferFromHDFS::prefetch(int64_t priority)
{ {
interval_watch.restart(); interval_watch.restart();
@ -85,7 +86,7 @@ void AsynchronousReadBufferFromHDFS::prefetch()
if (!hasPendingDataToRead()) if (!hasPendingDataToRead())
return; return;
prefetch_future = asyncReadInto(prefetch_buffer.data(), prefetch_buffer.size()); prefetch_future = asyncReadInto(prefetch_buffer.data(), prefetch_buffer.size(), priority);
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetches); ProfileEvents::increment(ProfileEvents::RemoteFSPrefetches);
} }
@ -146,7 +147,7 @@ bool AsynchronousReadBufferFromHDFS::nextImpl()
{ {
ProfileEvents::increment(ProfileEvents::RemoteFSUnprefetchedReads); ProfileEvents::increment(ProfileEvents::RemoteFSUnprefetchedReads);
auto result = asyncReadInto(memory.data(), memory.size()).get(); auto result = asyncReadInto(memory.data(), memory.size(), DEFAULT_PREFETCH_PRIORITY).get();
size = result.size; size = result.size;
auto offset = result.offset; auto offset = result.offset;
@ -166,8 +167,8 @@ bool AsynchronousReadBufferFromHDFS::nextImpl()
file_offset_of_buffer_end = impl->getFileOffsetOfBufferEnd(); file_offset_of_buffer_end = impl->getFileOffsetOfBufferEnd();
prefetch_future = {}; prefetch_future = {};
if (use_prefetch) if (use_prefetch && bytes_read)
prefetch(); prefetch(DEFAULT_PREFETCH_PRIORITY);
sum_duration += next_watch.elapsedMicroseconds(); sum_duration += next_watch.elapsedMicroseconds();
sum_wait += wait; sum_wait += wait;

View File

@ -12,7 +12,6 @@
#include <base/types.h> #include <base/types.h>
#include <IO/ReadBuffer.h> #include <IO/ReadBuffer.h>
#include <IO/BufferWithOwnMemory.h> #include <IO/BufferWithOwnMemory.h>
#include <IO/AsynchronousReader.h>
#include <IO/SeekableReadBuffer.h> #include <IO/SeekableReadBuffer.h>
#include <Storages/HDFS/ReadBufferFromHDFS.h> #include <Storages/HDFS/ReadBufferFromHDFS.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
@ -20,6 +19,8 @@
namespace DB namespace DB
{ {
class IAsynchronousReader;
class AsynchronousReadBufferFromHDFS : public BufferWithOwnMemory<SeekableReadBuffer>, public WithFileName, public WithFileSize class AsynchronousReadBufferFromHDFS : public BufferWithOwnMemory<SeekableReadBuffer>, public WithFileName, public WithFileSize
{ {
public: public:
@ -32,7 +33,7 @@ public:
off_t seek(off_t offset_, int whence) override; off_t seek(off_t offset_, int whence) override;
void prefetch() override; void prefetch(int64_t priority) override;
size_t getFileSize() override; size_t getFileSize() override;
@ -49,10 +50,10 @@ private:
bool hasPendingDataToRead(); bool hasPendingDataToRead();
std::future<IAsynchronousReader::Result> asyncReadInto(char * data, size_t size); std::future<IAsynchronousReader::Result> asyncReadInto(char * data, size_t size, int64_t priority);
IAsynchronousReader & reader; IAsynchronousReader & reader;
size_t priority; int64_t base_priority;
std::shared_ptr<ReadBufferFromHDFS> impl; std::shared_ptr<ReadBufferFromHDFS> impl;
std::future<IAsynchronousReader::Result> prefetch_future; std::future<IAsynchronousReader::Result> prefetch_future;
Memory<> prefetch_buffer; Memory<> prefetch_buffer;

View File

@ -217,13 +217,13 @@ IAsynchronousReader::Result ReadBufferFromHDFS::readInto(char * data, size_t siz
/// TODO: we don't need to copy if there is no pending data /// TODO: we don't need to copy if there is no pending data
seek(offset, SEEK_SET); seek(offset, SEEK_SET);
if (eof()) if (eof())
return {0, 0}; return {0, 0, nullptr};
/// Make sure returned size no greater than available bytes in working_buffer /// Make sure returned size no greater than available bytes in working_buffer
size_t count = std::min(size, available()); size_t count = std::min(size, available());
memcpy(data, position(), count); memcpy(data, position(), count);
position() += count; position() += count;
return {count, 0}; return {count, 0, nullptr};
} }
String ReadBufferFromHDFS::getFileName() const String ReadBufferFromHDFS::getFileName() const

View File

@ -5,7 +5,6 @@
#if USE_HDFS #if USE_HDFS
#include <IO/ReadBuffer.h> #include <IO/ReadBuffer.h>
#include <IO/BufferWithOwnMemory.h> #include <IO/BufferWithOwnMemory.h>
#include <IO/AsynchronousReader.h>
#include <string> #include <string>
#include <memory> #include <memory>
#include <hdfs/hdfs.h> #include <hdfs/hdfs.h>

View File

@ -236,7 +236,7 @@ public:
raw_read_buf = get_raw_read_buf(); raw_read_buf = get_raw_read_buf();
if (read_settings.remote_fs_prefetch) if (read_settings.remote_fs_prefetch)
raw_read_buf->prefetch(); raw_read_buf->prefetch(DEFAULT_PREFETCH_PRIORITY);
} }
catch (Exception & e) catch (Exception & e)
{ {

View File

@ -61,6 +61,8 @@ public:
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read; MergeTreeDataPartInfoForReaderPtr data_part_info_for_read;
/// Hook that receives a request priority; MergeTreePrefetchedReadPool calls it from its background
/// prefetch job right after creating a reader. The default implementation is a no-op.
/// NOTE(review): presumably readers able to prefetch override this - verify in the subclasses.
virtual void prefetchBeginOfRange(int64_t /* priority */) {}
protected: protected:
/// Returns actual column name in part, which can differ from table metadata. /// Returns actual column name in part, which can differ from table metadata.
String getColumnNameInPart(const NameAndTypePair & required_column) const; String getColumnNameInPart(const NameAndTypePair & required_column) const;

View File

@ -5,16 +5,20 @@
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h> #include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
#include <Storages/MergeTree/RequestResponse.h> #include <Storages/MergeTree/RequestResponse.h>
#include <Columns/FilterDescription.h> #include <Columns/FilterDescription.h>
#include <Common/ElapsedTimeProfileEventIncrement.h>
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
#include <DataTypes/DataTypeNothing.h> #include <DataTypes/DataTypeNothing.h>
#include <DataTypes/DataTypeNullable.h> #include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeUUID.h> #include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypeArray.h> #include <DataTypes/DataTypeArray.h>
#include <Processors/Transforms/AggregatingTransform.h> #include <Processors/Transforms/AggregatingTransform.h>
#include <city.h> #include <city.h>
namespace ProfileEvents
{
extern const Event WaitPrefetchTaskMicroseconds;
};
namespace DB namespace DB
{ {
@ -61,6 +65,8 @@ IMergeTreeSelectAlgorithm::IMergeTreeSelectAlgorithm(
, use_uncompressed_cache(use_uncompressed_cache_) , use_uncompressed_cache(use_uncompressed_cache_)
, virt_column_names(virt_column_names_) , virt_column_names(virt_column_names_)
, partition_value_type(storage.getPartitionValueType()) , partition_value_type(storage.getPartitionValueType())
, owned_uncompressed_cache(use_uncompressed_cache ? storage.getContext()->getUncompressedCache() : nullptr)
, owned_mark_cache(storage.getContext()->getMarkCache())
{ {
header_without_const_virtual_columns = applyPrewhereActions(std::move(header), prewhere_info); header_without_const_virtual_columns = applyPrewhereActions(std::move(header), prewhere_info);
size_t non_const_columns_offset = header_without_const_virtual_columns.columns(); size_t non_const_columns_offset = header_without_const_virtual_columns.columns();
@ -176,33 +182,89 @@ ChunkAndProgress IMergeTreeSelectAlgorithm::read()
return {Chunk(), num_read_rows, num_read_bytes}; return {Chunk(), num_read_rows, num_read_bytes};
} }
void IMergeTreeSelectAlgorithm::initializeMergeTreeReadersForPart( void IMergeTreeSelectAlgorithm::initializeMergeTreeReadersForCurrentTask(
MergeTreeData::DataPartPtr & data_part, const StorageMetadataPtr & metadata_snapshot,
const MergeTreeReadTaskColumns & task_columns, const StorageMetadataPtr & metadata_snapshot, const IMergeTreeReader::ValueSizeMap & value_size_map,
const MarkRanges & mark_ranges, const IMergeTreeReader::ValueSizeMap & value_size_map,
const ReadBufferFromFileBase::ProfileCallback & profile_callback) const ReadBufferFromFileBase::ProfileCallback & profile_callback)
{ {
reader = data_part->getReader(task_columns.columns, metadata_snapshot, mark_ranges, if (!task)
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings, throw Exception(ErrorCodes::LOGICAL_ERROR, "There is no task");
value_size_map, profile_callback);
if (task->reader.valid())
{
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::WaitPrefetchTaskMicroseconds);
reader = task->reader.get();
}
else
{
reader = task->data_part->getReader(
task->task_columns.columns, metadata_snapshot, task->mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(),
reader_settings, value_size_map, profile_callback);
}
if (!task->pre_reader_for_step.empty())
{
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::WaitPrefetchTaskMicroseconds);
pre_reader_for_step.clear();
for (auto & pre_reader : task->pre_reader_for_step)
pre_reader_for_step.push_back(pre_reader.get());
}
else
{
initializeMergeTreePreReadersForPart(
task->data_part, task->task_columns, metadata_snapshot,
task->mark_ranges, value_size_map, profile_callback);
}
}
/// Synchronously creates all readers needed to read `mark_ranges` of `data_part`:
/// first the main `reader` for `task_columns.columns`, then the pre-readers
/// (see initializeMergeTreePreReadersForPart). Unlike initializeMergeTreeReadersForCurrentTask,
/// this variant never consumes readers prepared in advance inside a task.
void IMergeTreeSelectAlgorithm::initializeMergeTreeReadersForPart(
MergeTreeData::DataPartPtr & data_part,
const MergeTreeReadTaskColumns & task_columns,
const StorageMetadataPtr & metadata_snapshot,
const MarkRanges & mark_ranges,
const IMergeTreeReader::ValueSizeMap & value_size_map,
const ReadBufferFromFileBase::ProfileCallback & profile_callback)
{
/// Main reader; uses the caches owned by this algorithm instance.
reader = data_part->getReader(
task_columns.columns, metadata_snapshot, mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(),
reader_settings, value_size_map, profile_callback);
/// Replaces the contents of pre_reader_for_step with readers for the same part and ranges.
initializeMergeTreePreReadersForPart(
data_part, task_columns, metadata_snapshot,
mark_ranges, value_size_map, profile_callback);
}
void IMergeTreeSelectAlgorithm::initializeMergeTreePreReadersForPart(
MergeTreeData::DataPartPtr & data_part,
const MergeTreeReadTaskColumns & task_columns,
const StorageMetadataPtr & metadata_snapshot,
const MarkRanges & mark_ranges,
const IMergeTreeReader::ValueSizeMap & value_size_map,
const ReadBufferFromFileBase::ProfileCallback & profile_callback)
{
pre_reader_for_step.clear(); pre_reader_for_step.clear();
/// Add lightweight delete filtering step /// Add lightweight delete filtering step
if (reader_settings.apply_deleted_mask && data_part->hasLightweightDelete()) if (reader_settings.apply_deleted_mask && data_part->hasLightweightDelete())
{ {
pre_reader_for_step.push_back(data_part->getReader({LightweightDeleteDescription::FILTER_COLUMN}, metadata_snapshot, mark_ranges, pre_reader_for_step.push_back(
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings, data_part->getReader(
value_size_map, profile_callback)); {LightweightDeleteDescription::FILTER_COLUMN}, metadata_snapshot,
mark_ranges, owned_uncompressed_cache.get(), owned_mark_cache.get(),
reader_settings, value_size_map, profile_callback));
} }
if (prewhere_info) if (prewhere_info)
{ {
for (const auto & pre_columns_per_step : task_columns.pre_columns) for (const auto & pre_columns_per_step : task_columns.pre_columns)
{ {
pre_reader_for_step.push_back(data_part->getReader(pre_columns_per_step, metadata_snapshot, mark_ranges, pre_reader_for_step.push_back(
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings, data_part->getReader(
value_size_map, profile_callback)); pre_columns_per_step, metadata_snapshot, mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(),
reader_settings, value_size_map, profile_callback));
} }
} }
} }

View File

@ -116,10 +116,17 @@ protected:
const Names & non_const_virtual_column_names); const Names & non_const_virtual_column_names);
/// Sets up data readers for each step of prewhere and where /// Sets up data readers for each step of prewhere and where
void initializeMergeTreeReadersForCurrentTask(
const StorageMetadataPtr & metadata_snapshot,
const IMergeTreeReader::ValueSizeMap & value_size_map,
const ReadBufferFromFileBase::ProfileCallback & profile_callback);
void initializeMergeTreeReadersForPart( void initializeMergeTreeReadersForPart(
MergeTreeData::DataPartPtr & data_part, MergeTreeData::DataPartPtr & data_part,
const MergeTreeReadTaskColumns & task_columns, const StorageMetadataPtr & metadata_snapshot, const MergeTreeReadTaskColumns & task_columns,
const MarkRanges & mark_ranges, const IMergeTreeReader::ValueSizeMap & value_size_map, const StorageMetadataPtr & metadata_snapshot,
const MarkRanges & mark_ranges,
const IMergeTreeReader::ValueSizeMap & value_size_map,
const ReadBufferFromFileBase::ProfileCallback & profile_callback); const ReadBufferFromFileBase::ProfileCallback & profile_callback);
/// Sets up range readers corresponding to data readers /// Sets up range readers corresponding to data readers
@ -153,8 +160,8 @@ protected:
/// A result of getHeader(). A chunk which this header is returned from read(). /// A result of getHeader(). A chunk which this header is returned from read().
Block result_header; Block result_header;
std::shared_ptr<UncompressedCache> owned_uncompressed_cache; UncompressedCachePtr owned_uncompressed_cache;
std::shared_ptr<MarkCache> owned_mark_cache; MarkCachePtr owned_mark_cache;
using MergeTreeReaderPtr = std::unique_ptr<IMergeTreeReader>; using MergeTreeReaderPtr = std::unique_ptr<IMergeTreeReader>;
MergeTreeReaderPtr reader; MergeTreeReaderPtr reader;
@ -181,6 +188,15 @@ private:
bool getNewTask(); bool getNewTask();
/// Initialize pre readers.
void initializeMergeTreePreReadersForPart(
MergeTreeData::DataPartPtr & data_part,
const MergeTreeReadTaskColumns & task_columns,
const StorageMetadataPtr & metadata_snapshot,
const MarkRanges & mark_ranges,
const IMergeTreeReader::ValueSizeMap & value_size_map,
const ReadBufferFromFileBase::ProfileCallback & profile_callback);
static Block applyPrewhereActions(Block block, const PrewhereInfoPtr & prewhere_info); static Block applyPrewhereActions(Block block, const PrewhereInfoPtr & prewhere_info);
}; };

View File

@ -141,14 +141,20 @@ MergeTreeReadTask::MergeTreeReadTask(
const NameSet & column_name_set_, const NameSet & column_name_set_,
const MergeTreeReadTaskColumns & task_columns_, const MergeTreeReadTaskColumns & task_columns_,
bool remove_prewhere_column_, bool remove_prewhere_column_,
MergeTreeBlockSizePredictorPtr && size_predictor_) MergeTreeBlockSizePredictorPtr size_predictor_,
int64_t priority_,
std::future<MergeTreeReaderPtr> reader_,
std::vector<std::future<MergeTreeReaderPtr>> && pre_reader_for_step_)
: data_part{data_part_} : data_part{data_part_}
, mark_ranges{mark_ranges_} , mark_ranges{mark_ranges_}
, part_index_in_query{part_index_in_query_} , part_index_in_query{part_index_in_query_}
, column_name_set{column_name_set_} , column_name_set{column_name_set_}
, task_columns{task_columns_} , task_columns{task_columns_}
, remove_prewhere_column{remove_prewhere_column_} , remove_prewhere_column{remove_prewhere_column_}
, size_predictor{std::move(size_predictor_)} , size_predictor{size_predictor_}
, reader(std::move(reader_))
, pre_reader_for_step(std::move(pre_reader_for_step_))
, priority(priority_)
{ {
} }

View File

@ -5,6 +5,7 @@
#include <Storages/StorageSnapshot.h> #include <Storages/StorageSnapshot.h>
#include <Storages/MergeTree/RangesInDataPart.h> #include <Storages/MergeTree/RangesInDataPart.h>
#include <Storages/MergeTree/MergeTreeRangeReader.h> #include <Storages/MergeTree/MergeTreeRangeReader.h>
#include <Storages/MergeTree/IMergeTreeReader.h>
namespace DB namespace DB
@ -67,6 +68,16 @@ struct MergeTreeReadTask
/// NOTE: we take references to elements and push_back new elements, that's why it is a deque but not a vector /// NOTE: we take references to elements and push_back new elements, that's why it is a deque but not a vector
std::deque<MergeTreeRangeReader> pre_range_readers; std::deque<MergeTreeRangeReader> pre_range_readers;
using MergeTreeReaderPtr = std::unique_ptr<IMergeTreeReader>;
std::future<MergeTreeReaderPtr> reader;
std::vector<std::future<MergeTreeReaderPtr>> pre_reader_for_step;
int64_t priority = 0; /// Priority of the task. Bigger value, bigger priority.
/// Orders tasks by `priority` alone (ascending); no other field takes part in the comparison.
bool operator <(const MergeTreeReadTask & rhs) const
{
return priority < rhs.priority;
}
bool isFinished() const { return mark_ranges.empty() && range_reader.isCurrentRangeFinished(); } bool isFinished() const { return mark_ranges.empty() && range_reader.isCurrentRangeFinished(); }
MergeTreeReadTask( MergeTreeReadTask(
@ -76,9 +87,13 @@ struct MergeTreeReadTask
const NameSet & column_name_set_, const NameSet & column_name_set_,
const MergeTreeReadTaskColumns & task_columns_, const MergeTreeReadTaskColumns & task_columns_,
bool remove_prewhere_column_, bool remove_prewhere_column_,
MergeTreeBlockSizePredictorPtr && size_predictor_); MergeTreeBlockSizePredictorPtr size_predictor_,
int64_t priority_ = 0,
std::future<MergeTreeReaderPtr> reader_ = {},
std::vector<std::future<MergeTreeReaderPtr>> && pre_reader_for_step_ = {});
}; };
MergeTreeReadTaskColumns getReadTaskColumns( MergeTreeReadTaskColumns getReadTaskColumns(
const IMergeTreeDataPartInfoForReader & data_part_info_for_reader, const IMergeTreeDataPartInfoForReader & data_part_info_for_reader,
const StorageSnapshotPtr & storage_snapshot, const StorageSnapshotPtr & storage_snapshot,

View File

@ -3,6 +3,8 @@
#include <Core/Settings.h> #include <Core/Settings.h>
#include <Storages/MergeTree/MergeTreeSettings.h> #include <Storages/MergeTree/MergeTreeSettings.h>
#include <IO/WriteSettings.h> #include <IO/WriteSettings.h>
#include <Compression/CompressionFactory.h>
#include <Compression/ICompressionCodec.h>
namespace DB namespace DB

View File

@ -0,0 +1,612 @@
#include <Storages/MergeTree/MergeTreePrefetchedReadPool.h>
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
#include <Storages/MergeTree/MergeTreeBaseSelectProcessor.h>
#include <Storages/MergeTree/RangesInDataPart.h>
#include <Storages/MergeTree/MarkRange.h>
#include <Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h>
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <Common/ElapsedTimeProfileEventIncrement.h>
#include <IO/Operators.h>
#include <base/getThreadId.h>
namespace ProfileEvents
{
extern const Event MergeTreePrefetchedReadPoolInit;
}
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/// Builds the pool: forwards the common arguments to IMergeTreeReadPool, remembers the caches and
/// settings needed later to create readers, then precomputes per-part info (getPartsInfos) and the
/// per-thread task lists (createThreadsTasks).
/// Prefetches themselves are not started here - getTask() starts them on its first call.
MergeTreePrefetchedReadPool::MergeTreePrefetchedReadPool(
size_t threads,
size_t sum_marks_,
size_t min_marks_for_concurrent_read_,
RangesInDataParts && parts_,
const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
const Names & column_names_,
const Names & virtual_column_names_,
size_t preferred_block_size_bytes_,
const MergeTreeReaderSettings & reader_settings_,
ContextPtr context_,
bool use_uncompressed_cache_,
bool is_remote_read_,
const MergeTreeSettings & storage_settings_)
: IMergeTreeReadPool(
storage_snapshot_,
column_names_,
virtual_column_names_,
min_marks_for_concurrent_read_,
prewhere_info_,
parts_,
(preferred_block_size_bytes_ > 0),
/*do_not_steal_tasks_*/false)
, WithContext(context_)
/// Logger name carries the storage id of the first part (empty when there are no parts).
, log(&Poco::Logger::get("MergeTreePrefetchedReadPool(" + (parts_.empty() ? "" : parts_.front().data_part->storage.getStorageID().getNameForLogs()) + ")"))
, header(storage_snapshot_->getSampleBlockForColumns(column_names_))
/// NOTE(review): the caches are kept as raw pointers taken via .get(); this assumes the global
/// context keeps them alive for the lifetime of the pool - confirm.
, mark_cache(context_->getGlobalContext()->getMarkCache().get())
, uncompressed_cache(use_uncompressed_cache_ ? context_->getGlobalContext()->getUncompressedCache().get() : nullptr)
, reader_settings(reader_settings_)
/// The callback captures `this` and forwards profile info to profileFeedback().
, profile_callback([this](ReadBufferFromFileBase::ProfileInfo info_) { profileFeedback(info_); })
, index_granularity_bytes(storage_settings_.index_granularity_bytes)
, fixed_index_granularity(storage_settings_.index_granularity)
, is_remote_read(is_remote_read_)
, prefetch_threadpool(getContext()->getPrefetchThreadpool())
{
/// Task creation might also create a lot of readers - check that they do not
/// do any time-consuming operations in their constructors.
/// The time spent below is added to the MergeTreePrefetchedReadPoolInit profile event (in milliseconds).
ProfileEventTimeIncrement<Milliseconds> watch(ProfileEvents::MergeTreePrefetchedReadPoolInit);
parts_infos = getPartsInfos(parts_, preferred_block_size_bytes_);
threads_tasks = createThreadsTasks(threads, sum_marks_, min_marks_for_concurrent_read_);
}
/// Per-part data precomputed once by getPartsInfos() and used when read tasks are created.
struct MergeTreePrefetchedReadPool::PartInfo
{
    /// The data part to read from.
    MergeTreeData::DataPartPtr data_part;
    /// Copied from RangesInDataPart::part_index_in_query.
    /// Initialized explicitly so that no scalar member is ever left indeterminate.
    size_t part_index_in_query = 0;
    /// Total number of marks in `ranges` (sum of `end - begin`).
    size_t sum_marks = 0;
    /// Mark ranges to read, sorted.
    MarkRanges ranges;

    /// Names of `task_columns.columns`; used to distinguish between PREWHERE and WHERE columns
    /// when applying the filter.
    NameSet column_name_set;
    /// Columns to read from this part, as computed by getReadTaskColumns().
    MergeTreeReadTaskColumns task_columns;
    /// Block size predictor; null when no preferred block size in bytes was requested.
    MergeTreeBlockSizePredictorPtr size_predictor;

    /// Result of getApproxSizeOfGranule() for this part.
    size_t approx_size_of_mark = 0;
    /// NOTE(review): not assigned by getPartsInfos(); presumably the number of marks covered by
    /// one prefetch, filled in during task creation - confirm.
    size_t prefetch_step_marks = 0;
};
/// Creates a reader for `columns` of `data_part` over `required_ranges` and schedules a job on the
/// prefetch thread pool which calls prefetchBeginOfRange(priority) on that reader.
/// Returns a future yielding the reader once the job has run.
std::future<MergeTreeReaderPtr> MergeTreePrefetchedReadPool::createPrefetchedReader(
    const IMergeTreeDataPart & data_part,
    const NamesAndTypesList & columns,
    const MarkRanges & required_ranges,
    int64_t priority) const
{
    /// The reader itself is constructed synchronously, on the calling thread.
    auto part_reader = data_part.getReader(
        columns,
        storage_snapshot->metadata,
        required_ranges,
        uncompressed_cache,
        mark_cache,
        reader_settings,
        IMergeTreeReader::ValueSizeMap{},
        profile_callback);

    /// A prefetch can only be issued once the marks are loaded, and the reader starts loading
    /// them in its constructor. Prefetching immediately after construction would therefore just
    /// block on the marks, and the pool must not block. So the prefetch is done in a background
    /// job and the resulting future is stored in the read task (MergeTreeReadTask); the thread
    /// which later obtains the task via getTask() waits on it only if it is not ready yet.
    ///
    /// NOTE(review): the job returns an rvalue reference to the reader it owns; this relies on the
    /// scheduler moving the value out while the closure is still alive - confirm.
    auto prefetch_job = [priority, owned_reader = std::move(part_reader), query_context = getContext()]() mutable -> MergeTreeReaderPtr &&
    {
        /// For async read metrics in system.query_log.
        PrefetchIncrement watch(query_context->getAsyncReadCounters());

        owned_reader->prefetchBeginOfRange(priority);
        return std::move(owned_reader);
    };

    return scheduleFromThreadPool<IMergeTreeDataPart::MergeTreeReaderPtr>(
        std::move(prefetch_job), prefetch_threadpool, "ReadPrepare", priority);
}
/// Schedules prefetched readers for `task`: the main reader first, then one pre-reader for the
/// lightweight-delete filter column (if applicable), then one pre-reader per PREWHERE step.
/// All of them use the task's mark ranges and priority.
/// Throws LOGICAL_ERROR if the task already holds a reader future.
void MergeTreePrefetchedReadPool::createPrefetchedReaderForTask(MergeTreeReadTask & task) const
{
    if (task.reader.valid())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Task already has a reader");

    const auto & part = *task.data_part;
    auto schedule_reader = [&](const NamesAndTypesList & columns_to_read)
    {
        return createPrefetchedReader(part, columns_to_read, task.mark_ranges, task.priority);
    };

    task.reader = schedule_reader(task.task_columns.columns);

    const bool need_deleted_mask_step = reader_settings.apply_deleted_mask && part.hasLightweightDelete();
    if (need_deleted_mask_step)
        task.pre_reader_for_step.push_back(schedule_reader({LightweightDeleteDescription::FILTER_COLUMN}));

    if (!prewhere_info)
        return;

    for (const auto & step_columns : task.task_columns.pre_columns)
        task.pre_reader_for_step.push_back(schedule_reader(step_columns));
}
/// Compares two holders by the `priority` of the tasks they point to (ascending).
/// Both `task` pointers are dereferenced unconditionally, so neither may be null.
bool MergeTreePrefetchedReadPool::TaskHolder::operator <(const TaskHolder & other) const
{
return task->priority < other.task->priority;
}
/// Schedules prefetched readers for every task referenced from `prefetch_queue`,
/// in the iteration order of the queue.
void MergeTreePrefetchedReadPool::startPrefetches() const
{
    for (const auto & holder : prefetch_queue)
        createPrefetchedReaderForTask(*holder.task);
}
/// Returns the next read task for `thread`, or nullptr when there is nothing left to hand out.
/// The whole call runs under `mutex`. On the first call which finds remaining tasks, prefetches
/// are started for the tasks in the prefetch queue.
/// If `thread` has no tasks of its own, it steals: preferably a single task which already has a
/// prefetched reader, otherwise the second half of the tasks of the thread with the most tasks.
MergeTreeReadTaskPtr MergeTreePrefetchedReadPool::getTask(size_t thread)
{
std::lock_guard lock(mutex);
if (threads_tasks.empty())
return nullptr;
/// Prefetches are started lazily, once.
if (!started_prefetches)
{
startPrefetches();
started_prefetches = true;
}
auto it = threads_tasks.find(thread);
if (it == threads_tasks.end())
{
/// This thread has no tasks of its own - try to steal from other threads.
ThreadsTasks::iterator non_prefetched_tasks_to_steal = threads_tasks.end();
ThreadsTasks::iterator prefetched_tasks_to_steal = threads_tasks.end();
/// Only a placeholder: the first prefetched candidate is always accepted because
/// prefetched_tasks_to_steal is still end() at that point.
int64_t best_prefetched_task_priority = -1;
/// There is no point in stealing in order (like in MergeTreeReadPool, where tasks can be stolen
/// only from the next thread). Stealing from the next thread a task which reads from the same
/// part as we just read may look attractive: we could reuse our own reader and avoid a seek
/// (a new request). But it does not work that way, because that task will most likely have its
/// own reader with a prefetch already in flight. (Not to mention that we cannot reuse our own
/// reader anyway if this range was not accounted for in the initial range request to the
/// object storage.)
for (auto thread_tasks_it = threads_tasks.begin(); thread_tasks_it != threads_tasks.end(); ++thread_tasks_it)
{
/// Prefer to steal tasks which have an initialized reader (with prefetched data). Thus we avoid
/// losing a prefetch by creating our own reader (or reusing our own reader if the part
/// is the same as last read by this thread).
auto & thread_tasks = thread_tasks_it->second;
auto task_it = std::find_if(
thread_tasks.begin(), thread_tasks.end(),
[](const auto & task) { return task->reader.valid(); });
if (task_it == thread_tasks.end())
{
/// The fallback to non-prefetched tasks should pick the thread which
/// has more tasks than the others.
if (non_prefetched_tasks_to_steal == threads_tasks.end()
|| non_prefetched_tasks_to_steal->second.size() < thread_tasks.size())
non_prefetched_tasks_to_steal = thread_tasks_it;
}
/// Try to steal task with the best (lowest) priority (because it will be executed faster).
else if (prefetched_tasks_to_steal == threads_tasks.end()
|| (*task_it)->priority < best_prefetched_task_priority)
{
best_prefetched_task_priority = (*task_it)->priority;
chassert(best_prefetched_task_priority >= 0);
prefetched_tasks_to_steal = thread_tasks_it;
}
}
if (prefetched_tasks_to_steal != threads_tasks.end())
{
const auto from_thread = prefetched_tasks_to_steal->first;
auto & thread_tasks = prefetched_tasks_to_steal->second;
assert(!thread_tasks.empty());
/// Locate again the first prefetched task of the chosen thread (the same one found in the loop above).
auto task_it = std::find_if(
thread_tasks.begin(), thread_tasks.end(),
[](const auto & task) { return task->reader.valid(); });
assert(task_it != thread_tasks.end());
auto task = std::move(*task_it);
thread_tasks.erase(task_it);
/// Threads without tasks are removed from the map, so an empty map means "no tasks left".
if (thread_tasks.empty())
threads_tasks.erase(prefetched_tasks_to_steal);
LOG_TRACE(
log, "Thread {} stole prefetched task (priority: {}) from {} ({})",
thread, task->priority, from_thread, toString(task->mark_ranges));
return task;
}
/// TODO: it also makes sense to first try to steal from the next thread if it has ranges
/// from the same part as current thread last read - to reuse the reader.
if (non_prefetched_tasks_to_steal != threads_tasks.end())
{
auto & thread_tasks = non_prefetched_tasks_to_steal->second;
auto from_thread = non_prefetched_tasks_to_steal->first;
assert(!thread_tasks.empty());
/// Get second half of the tasks.
/// With a single task `half` is 0, so that task is taken and the victim's list becomes empty.
const size_t total_tasks = thread_tasks.size();
const size_t half = total_tasks / 2;
auto half_it = thread_tasks.begin() + half;
assert(half_it != thread_tasks.end());
/// Give them to current thread, as current thread's tasks list is empty.
auto & current_thread_tasks = threads_tasks[thread];
current_thread_tasks.insert(
current_thread_tasks.end(), make_move_iterator(half_it), make_move_iterator(thread_tasks.end()));
/// Erase them from the thread from which we steal.
thread_tasks.resize(half);
if (thread_tasks.empty())
threads_tasks.erase(non_prefetched_tasks_to_steal);
LOG_TRACE(log, "Thread {} stole {} non-prefetched tasks from {}", thread, current_thread_tasks.size(), from_thread);
/// Return the first stolen task right away; the rest stays in this thread's own list.
auto task = std::move(current_thread_tasks.front());
current_thread_tasks.erase(current_thread_tasks.begin());
if (current_thread_tasks.empty())
threads_tasks.erase(thread);
return task;
}
LOG_TEST(log, "Thread {} returns with no task (current threads tasks: {})", thread, dumpTasks(threads_tasks));
return nullptr;
}
/// The thread has its own tasks: take the first one.
auto & thread_tasks = it->second;
assert(!thread_tasks.empty());
auto task = std::move(thread_tasks.front());
thread_tasks.pop_front();
/// Remember the size now: `thread_tasks` dangles once the map entry is erased below.
size_t remaining_tasks_num = thread_tasks.size();
if (thread_tasks.empty())
threads_tasks.erase(it);
LOG_TEST(
log,
"Thread {} returns with task (min_marks_for_concurrent_read: {}, reader: {}, remaining_tasks: {} ({}, all remainin: {}))",
thread, min_marks_for_concurrent_read, task->reader.valid(), remaining_tasks_num, toString(task->mark_ranges), dumpTasks(threads_tasks));
return task;
}
/// Estimates the size in bytes of one granule (mark) of `part`.
///
/// - If every column type has a maximum value size, the estimate is the sum over all columns of
///   (maximum in-memory value size * fixed index granularity).
/// - Otherwise it is the part's size on disk divided by its number of marks, rounded to nearest.
///
/// When `index_granularity_bytes` is non-zero the estimate is capped by it.
size_t MergeTreePrefetchedReadPool::getApproxSizeOfGranule(const IMergeTreeDataPart & part) const
{
    const auto & columns = part.getColumns();
    const bool all_columns_are_fixed_size = columns.end() == std::find_if(
        columns.begin(), columns.end(),
        [](const auto & col) { return !col.type->haveMaximumSizeOfValue(); });

    size_t approx_size = 0;
    if (all_columns_are_fixed_size)
    {
        for (const auto & col : columns)
            approx_size += col.type->getMaximumSizeOfValueInMemory() * fixed_index_granularity;
    }
    else
    {
        /// A part without marks would make the division below produce inf/NaN, and converting
        /// such a value to an integer is undefined behaviour. Treat it as a single granule.
        const size_t marks_count = part.getMarksCount();
        const double divisor = static_cast<double>(marks_count ? marks_count : 1);
        approx_size = static_cast<size_t>(std::round(static_cast<double>(part.getBytesOnDisk()) / divisor));
    }

    /// Zero means that the limit in bytes is not set.
    if (!index_granularity_bytes)
        return approx_size;

    return std::min(index_granularity_bytes, approx_size);
}
/// Builds one PartInfo per requested data part.
///
/// For every part this stores the data part and its index in the query, the sorted mark ranges,
/// the total number of marks in those ranges, the approximate size of a mark, the columns
/// to read and, if `preferred_block_size_bytes` is non-zero, a block size predictor.
MergeTreePrefetchedReadPool::PartsInfos MergeTreePrefetchedReadPool::getPartsInfos(
    const RangesInDataParts & parts, size_t preferred_block_size_bytes) const
{
    PartsInfos result;
    result.reserve(parts.size());

    Block sample_block = storage_snapshot->metadata->getSampleBlock();
    const bool predict_block_size_bytes = preferred_block_size_bytes > 0;

    for (const auto & part : parts)
    {
        /// PartsInfos holds shared pointers, so create the shared pointer directly.
        auto part_info = std::make_shared<PartInfo>();

        part_info->data_part = part.data_part;
        part_info->part_index_in_query = part.part_index_in_query;

        part_info->ranges = part.ranges;
        std::sort(part_info->ranges.begin(), part_info->ranges.end());

        /// Sum up total size of all mark ranges in a data part.
        for (const auto & range : part.ranges)
            part_info->sum_marks += range.end - range.begin;

        part_info->approx_size_of_mark = getApproxSizeOfGranule(*part_info->data_part);

        /// Intentionally not const: it is moved into `part_info` below,
        /// and std::move of a const object would silently make a copy.
        auto task_columns = getReadTaskColumns(
            LoadedMergeTreeDataPartInfoForReader(part.data_part),
            storage_snapshot,
            column_names,
            virtual_column_names,
            prewhere_info,
            /* with_subcolumns */true);

        part_info->size_predictor = !predict_block_size_bytes
            ? nullptr
            : IMergeTreeSelectAlgorithm::getSizePredictor(part.data_part, task_columns, sample_block);

        /// Will be used to distinguish between PREWHERE and WHERE columns when applying filter.
        const auto required_column_names = task_columns.columns.getNames();
        part_info->column_name_set = {required_column_names.begin(), required_column_names.end()};

        /// Must stay the last use of `task_columns`.
        part_info->task_columns = std::move(task_columns);

        result.push_back(std::move(part_info));
    }

    return result;
}
/// Splits the marks of all parts in `parts_infos` into read tasks and assigns them to threads.
///
/// First, a prefetch step (in marks) is chosen for every part from the settings
/// (`filesystem_prefetch_step_marks`, `filesystem_prefetch_step_bytes` or a derived default)
/// and raised to the minimum implied by `filesystem_prefetches_limit` and
/// `filesystem_prefetch_min_bytes_for_single_read_task`.
/// Then every thread receives about `ceil(sum_marks / threads)` marks, cut into tasks of about
/// one prefetch step each. Tasks which fit into the prefetches limit and into the approximate
/// memory limit are additionally put into `prefetch_queue`.
///
/// Note: modifies the PartInfo objects (`prefetch_step_marks`, `sum_marks`, `ranges`) and `prefetch_queue`.
///
/// Returns a map from thread index to its queue of tasks; empty if there are no parts.
MergeTreePrefetchedReadPool::ThreadsTasks MergeTreePrefetchedReadPool::createThreadsTasks(
    size_t threads, size_t sum_marks, size_t /* min_marks_for_concurrent_read */) const
{
    if (parts_infos.empty())
        return {};

    const auto & context = getContext();
    const auto & settings = context->getSettingsRef();

    size_t total_size_approx = 0;
    for (const auto & part : parts_infos)
    {
        total_size_approx += part->sum_marks * part->approx_size_of_mark;
    }

    size_t min_prefetch_step_marks = 0;
    if (settings.filesystem_prefetches_limit && settings.filesystem_prefetches_limit < sum_marks)
    {
        min_prefetch_step_marks = static_cast<size_t>(std::round(static_cast<double>(sum_marks) / settings.filesystem_prefetches_limit));
    }

    /// Upper bound for the number of tasks which may be put into the prefetch queue.
    size_t total_prefetches_approx = 0;
    for (const auto & part : parts_infos)
    {
        if (settings.filesystem_prefetch_step_marks)
        {
            part->prefetch_step_marks = settings.filesystem_prefetch_step_marks;
        }
        else if (settings.filesystem_prefetch_step_bytes && part->approx_size_of_mark)
        {
            part->prefetch_step_marks = std::max<size_t>(
                1, static_cast<size_t>(std::round(static_cast<double>(settings.filesystem_prefetch_step_bytes) / part->approx_size_of_mark)));
        }
        else
        {
            /// Experimentally derived ratio.
            part->prefetch_step_marks = static_cast<size_t>(
                std::round(std::pow(std::max<size_t>(1, static_cast<size_t>(std::round(sum_marks / 1000))), double(1.5))));
        }

        /// This limit is important to avoid spikes of slow aws getObject requests when parallelizing within one file.
        /// (The default is taken from here https://docs.aws.amazon.com/whitepapers/latest/s3-optimizing-performance-best-practices/use-byte-range-fetches.html).
        if (part->approx_size_of_mark
            && settings.filesystem_prefetch_min_bytes_for_single_read_task
            && part->approx_size_of_mark < settings.filesystem_prefetch_min_bytes_for_single_read_task)
        {
            const size_t new_min_prefetch_step_marks = static_cast<size_t>(
                std::ceil(static_cast<double>(settings.filesystem_prefetch_min_bytes_for_single_read_task) / part->approx_size_of_mark));

            if (min_prefetch_step_marks < new_min_prefetch_step_marks)
            {
                LOG_TEST(
                    log, "Increasing min prefetch step from {} to {}", min_prefetch_step_marks, new_min_prefetch_step_marks);

                min_prefetch_step_marks = new_min_prefetch_step_marks;
            }
        }

        if (part->prefetch_step_marks < min_prefetch_step_marks)
        {
            LOG_TEST(
                log, "Increasing prefetch step from {} to {} because of the prefetches limit {}",
                part->prefetch_step_marks, min_prefetch_step_marks, settings.filesystem_prefetches_limit);

            part->prefetch_step_marks = min_prefetch_step_marks;
        }

        /// One task is created per prefetch step, so the part contributes ceil(sum_marks / step) tasks.
        if (part->prefetch_step_marks)
            total_prefetches_approx += (part->sum_marks + part->prefetch_step_marks - 1) / part->prefetch_step_marks;

        LOG_TEST(log,
                 "Part: {}, sum_marks: {}, approx mark size: {}, prefetch_step_bytes: {}, prefetch_step_marks: {}, (ranges: {})",
                 part->data_part->name, part->sum_marks, part->approx_size_of_mark,
                 settings.filesystem_prefetch_step_bytes, part->prefetch_step_marks, toString(part->ranges));
    }

    const size_t min_marks_per_thread = (sum_marks - 1) / threads + 1;

    LOG_DEBUG(
        log,
        "Sum marks: {}, threads: {}, min_marks_per_thread: {}, min prefetch step marks: {}, prefetches limit: {}, total_size_approx: {}",
        sum_marks, threads, min_marks_per_thread, min_prefetch_step_marks, settings.filesystem_prefetches_limit, total_size_approx);

    size_t current_prefetches_count = 0;
    prefetch_queue.reserve(total_prefetches_approx);

    ThreadsTasks result_threads_tasks;
    size_t memory_usage_approx = 0;
    for (size_t i = 0, part_idx = 0; i < threads && part_idx < parts_infos.size(); ++i)
    {
        auto need_marks = min_marks_per_thread;

        /// Priority is given according to the prefetch number for each thread,
        /// e.g. the first task of each thread has the same priority and is bigger
        /// than second task of each thread, and so on.
        /// Add 1 to query read priority because higher priority should be given to
        /// reads from pool which are from reader.
        int64_t priority = reader_settings.read_settings.priority + 1;

        while (need_marks > 0 && part_idx < parts_infos.size())
        {
            auto & part = *parts_infos[part_idx];
            size_t & marks_in_part = part.sum_marks;

            if (marks_in_part == 0)
            {
                ++part_idx;
                continue;
            }

            MarkRanges ranges_to_get_from_part;
            size_t marks_to_get_from_part = std::min(need_marks, marks_in_part);

            /// Split by prefetch step even if !allow_prefetch below. Because it will allow
            /// to make a better distribution of tasks which did not fill into memory limit
            /// or prefetches limit through tasks stealing.
            if (part.prefetch_step_marks)
            {
                marks_to_get_from_part = std::min<size_t>(marks_to_get_from_part, part.prefetch_step_marks);
            }

            if (marks_in_part == marks_to_get_from_part)
            {
                ranges_to_get_from_part = part.ranges;
            }
            else
            {
                if (part.sum_marks < marks_to_get_from_part)
                {
                    throw Exception(
                        ErrorCodes::LOGICAL_ERROR,
                        "Requested {} marks from part {}, but part has only {} marks",
                        marks_to_get_from_part, part.data_part->name, part.sum_marks);
                }

                size_t get_marks_num = marks_to_get_from_part;
                while (get_marks_num > 0)
                {
                    MarkRange & range = part.ranges.front();
                    const size_t marks_in_range = range.end - range.begin;
                    const size_t marks_to_get_from_range = std::min(marks_in_range, get_marks_num);
                    get_marks_num -= marks_to_get_from_range;

                    ranges_to_get_from_part.emplace_back(range.begin, range.begin + marks_to_get_from_range);
                    range.begin += marks_to_get_from_range;

                    if (range.begin == range.end)
                    {
                        part.ranges.pop_front();
                    }
                    else if (!get_marks_num && part.prefetch_step_marks && range.end - range.begin < part.prefetch_step_marks)
                    {
                        /// We already have `get_marks_num` marks, but current mark range has
                        /// less than `prefetch_step_marks` marks, then add them too.
                        ranges_to_get_from_part.emplace_back(range.begin, range.end);
                        marks_to_get_from_part += range.end - range.begin;
                        part.ranges.pop_front();
                    }
                }
            }

            /// `marks_to_get_from_part` can exceed `need_marks` if the short tail of a range was
            /// appended above. Saturate at zero: an unsigned wrap-around would make this thread
            /// take all the remaining marks.
            need_marks -= std::min(need_marks, marks_to_get_from_part);
            sum_marks -= marks_to_get_from_part;
            marks_in_part -= marks_to_get_from_part;

            auto curr_task_size_predictor = !part.size_predictor ? nullptr
                : std::make_unique<MergeTreeBlockSizePredictor>(*part.size_predictor); /// make a copy

            auto read_task = std::make_unique<MergeTreeReadTask>(
                part.data_part, ranges_to_get_from_part, part.part_index_in_query,
                part.column_name_set, part.task_columns, prewhere_info && prewhere_info->remove_prewhere_column,
                std::move(curr_task_size_predictor));

            read_task->priority = priority;

            bool allow_prefetch = !settings.filesystem_prefetches_limit || current_prefetches_count + 1 <= settings.filesystem_prefetches_limit;
            if (allow_prefetch && settings.filesystem_prefetch_max_memory_usage)
            {
                /// One read buffer per reader: the main reader, optionally the reader
                /// for the lightweight delete mask, and the readers for PREWHERE steps.
                size_t num_readers = 1;
                if (reader_settings.apply_deleted_mask && part.data_part->hasLightweightDelete())
                    ++num_readers;
                if (prewhere_info)
                    num_readers += part.task_columns.pre_columns.size();
                memory_usage_approx += settings.max_read_buffer_size * num_readers;

                allow_prefetch = memory_usage_approx <= settings.filesystem_prefetch_max_memory_usage;
            }

            if (allow_prefetch)
            {
                /// The queue only references the task; the task itself is owned by `result_threads_tasks`.
                prefetch_queue.emplace(TaskHolder(read_task.get()));
                ++current_prefetches_count;
            }
            ++priority;

            result_threads_tasks[i].push_back(std::move(read_task));
        }
    }

    LOG_TEST(
        log, "Result tasks {} for {} threads: {}",
        result_threads_tasks.size(), threads, dumpTasks(result_threads_tasks));

    return result_threads_tasks;
}
/// Waits for all valid reader futures of the tasks which are still stored in `threads_tasks`:
/// the main reader of each task and each of its pre-readers.
MergeTreePrefetchedReadPool::~MergeTreePrefetchedReadPool()
{
    const auto wait_if_valid = [](const auto & future)
    {
        if (future.valid())
            future.wait();
    };

    for (const auto & thread_and_tasks : threads_tasks)
    {
        for (const auto & task : thread_and_tasks.second)
        {
            wait_if_valid(task->reader);

            for (const auto & pre_reader : task->pre_reader_for_step)
                wait_if_valid(pre_reader);
        }
    }
}
std::string MergeTreePrefetchedReadPool::dumpTasks(const ThreadsTasks & tasks)
{
WriteBufferFromOwnString result;
for (const auto & [thread_id, thread_tasks] : tasks)
{
result << "\tthread id: " << toString(thread_id) << ", tasks: " << toString(thread_tasks.size());
if (!thread_tasks.empty())
{
size_t no = 0;
for (const auto & task : thread_tasks)
{
result << '\t';
result << ++no << ": ";
result << "reader: " << task->reader.valid() << ", ";
result << "part: " << task->data_part->name << ", ";
result << "ranges: " << toString(task->mark_ranges);
}
}
}
return result.str();
}
/// Returns true if the local filesystem read method is `pread_threadpool` or `pread_fake_async`.
bool MergeTreePrefetchedReadPool::checkReadMethodAllowed(LocalFSReadMethod method)
{
    if (method == LocalFSReadMethod::pread_threadpool)
        return true;

    return method == LocalFSReadMethod::pread_fake_async;
}
/// Returns true if the remote filesystem read method is `threadpool`.
bool MergeTreePrefetchedReadPool::checkReadMethodAllowed(RemoteFSReadMethod method)
{
    const bool reads_via_threadpool = (method == RemoteFSReadMethod::threadpool);
    return reads_via_threadpool;
}
}

View File

@ -0,0 +1,128 @@
#pragma once
#include <Common/ThreadPool.h>
#include <Storages/MergeTree/MergeTreeReadPool.h>
#include <Storages/MergeTree/MergeTreeIOSettings.h>
#include <Core/BackgroundSchedulePool.h>
#include <IO/AsyncReadCounters.h>
#include <queue>
namespace DB
{
class IMergeTreeReader;
using MergeTreeReaderPtr = std::unique_ptr<IMergeTreeReader>;
/// A read pool which creates read tasks and hands them out to reading threads via getTask().
/// Tasks are created per thread (see createThreadsTasks()), and the tasks which are allowed
/// to be prefetched are additionally registered in `prefetch_queue`.
class MergeTreePrefetchedReadPool : public IMergeTreeReadPool, private WithContext
{
public:
MergeTreePrefetchedReadPool(
size_t threads,
size_t sum_marks_,
size_t min_marks_for_concurrent_read_,
RangesInDataParts && parts_,
const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
const Names & column_names_,
const Names & virtual_column_names_,
size_t preferred_block_size_bytes_,
const MergeTreeReaderSettings & reader_settings_,
ContextPtr context_,
bool use_uncompressed_cache_,
bool is_remote_read_,
const MergeTreeSettings & storage_settings_);
/// Waits for the valid reader futures of the tasks which are still stored in `threads_tasks`.
~MergeTreePrefetchedReadPool() override;
/// Returns the next task for the given thread, or nullptr if there is no task for it.
MergeTreeReadTaskPtr getTask(size_t thread) override;
/// Does nothing: the profile info passed by readers is ignored by this pool.
void profileFeedback(ReadBufferFromFileBase::ProfileInfo) override {}
Block getHeader() const override { return header; }
/// Allowed local methods: pread_threadpool, pread_fake_async.
static bool checkReadMethodAllowed(LocalFSReadMethod method);
/// Allowed remote method: threadpool.
static bool checkReadMethodAllowed(RemoteFSReadMethod method);
private:
/// Per-part information (only forward-declared in this header).
struct PartInfo;
using PartInfoPtr = std::shared_ptr<PartInfo>;
using PartsInfos = std::vector<PartInfoPtr>;
using MergeTreeReadTaskPtr = std::unique_ptr<MergeTreeReadTask>;
/// Queue of tasks of a single thread.
using ThreadTasks = std::deque<MergeTreeReadTaskPtr>;
/// Thread index -> tasks of this thread.
using ThreadsTasks = std::map<size_t, ThreadTasks>;
/// A smaller `priority` value means a higher priority.
std::future<MergeTreeReaderPtr> createPrefetchedReader(
const IMergeTreeDataPart & data_part,
const NamesAndTypesList & columns,
const MarkRanges & required_ranges,
int64_t priority) const;
void createPrefetchedReaderForTask(MergeTreeReadTask & task) const;
/// Approximate size in bytes of one granule (mark) of the part.
size_t getApproxSizeOfGranule(const IMergeTreeDataPart & part) const;
/// Creates one PartInfo per data part of the query.
PartsInfos getPartsInfos(const RangesInDataParts & parts, size_t preferred_block_size_bytes) const;
/// Splits the marks of `parts_infos` into tasks for each thread and fills `prefetch_queue`.
/// The last parameter is unnamed (unused) in the definition.
ThreadsTasks createThreadsTasks(
size_t threads,
size_t sum_marks,
size_t min_marks_for_concurrent_read) const;
void startPrefetches() const;
/// String representation of the tasks of all threads, used in log messages.
static std::string dumpTasks(const ThreadsTasks & tasks);
Poco::Logger * log;
Block header;
MarkCache * mark_cache;
UncompressedCache * uncompressed_cache;
MergeTreeReaderSettings reader_settings;
ReadBufferFromFileBase::ProfileCallback profile_callback;
/// Upper bound for the approximate granule size; zero means no bound (see getApproxSizeOfGranule()).
size_t index_granularity_bytes;
/// Multiplier of the maximum value size in getApproxSizeOfGranule().
size_t fixed_index_granularity;
[[ maybe_unused ]] const bool is_remote_read;
ThreadPool & prefetch_threadpool;
PartsInfos parts_infos;
ThreadsTasks threads_tasks;
/// Non-owning pointer to a task, stored in `prefetch_queue`.
/// The task itself is owned by the tasks queue of its thread.
struct TaskHolder
{
explicit TaskHolder(MergeTreeReadTask * task_) : task(task_) {}
MergeTreeReadTask * task;
bool operator <(const TaskHolder & other) const;
};
/// Tasks which are allowed to be prefetched. Mutable because it is filled in createThreadsTasks(), which is const.
mutable boost::heap::priority_queue<TaskHolder> prefetch_queue;
bool started_prefetches = false;
/// A struct which allows to track max number of tasks which were in the
/// threadpool simultaneously (similar to CurrentMetrics, but the result
/// will be put to QueryLog).
/// The constructor increments the total and the current number of prefetch tasks and updates
/// the maximum, the destructor decrements the current number. Both lock `counters->mutex`.
struct PrefetchIncrement : boost::noncopyable
{
explicit PrefetchIncrement(std::shared_ptr<AsyncReadCounters> counters_)
: counters(counters_)
{
std::lock_guard lock(counters->mutex);
++counters->total_prefetch_tasks;
if (++counters->current_parallel_prefetch_tasks > counters->max_parallel_prefetch_tasks)
counters->max_parallel_prefetch_tasks = counters->current_parallel_prefetch_tasks;
}
~PrefetchIncrement()
{
std::lock_guard lock(counters->mutex);
--counters->current_parallel_prefetch_tasks;
}
std::shared_ptr<AsyncReadCounters> counters;
};
};
}

View File

@ -154,10 +154,8 @@ MergeTreeReadTaskPtr MergeTreeReadPool::getTask(size_t thread)
/// Get whole part to read if it is small enough. /// Get whole part to read if it is small enough.
if (marks_in_part <= need_marks) if (marks_in_part <= need_marks)
{ {
const auto marks_to_get_from_range = marks_in_part;
ranges_to_get_from_part = thread_task.ranges; ranges_to_get_from_part = thread_task.ranges;
marks_in_part = 0;
marks_in_part -= marks_to_get_from_range;
thread_tasks.parts_and_ranges.pop_back(); thread_tasks.parts_and_ranges.pop_back();
thread_tasks.sum_marks_in_parts.pop_back(); thread_tasks.sum_marks_in_parts.pop_back();

View File

@ -14,6 +14,7 @@
namespace DB namespace DB
{ {
using MergeTreeReadTaskPtr = std::unique_ptr<MergeTreeReadTask>; using MergeTreeReadTaskPtr = std::unique_ptr<MergeTreeReadTask>;
@ -115,21 +116,6 @@ public:
BackoffSettings backoff_settings; BackoffSettings backoff_settings;
private:
/** State to track numbers of slow reads.
*/
struct BackoffState
{
size_t current_threads;
Stopwatch time_since_prev_event {CLOCK_MONOTONIC_COARSE};
size_t num_events = 0;
explicit BackoffState(size_t threads) : current_threads(threads) {}
};
BackoffState backoff_state;
public:
MergeTreeReadPool( MergeTreeReadPool(
size_t threads_, size_t threads_,
size_t sum_marks_, size_t sum_marks_,
@ -144,6 +130,7 @@ public:
bool do_not_steal_tasks_ = false); bool do_not_steal_tasks_ = false);
~MergeTreeReadPool() override = default; ~MergeTreeReadPool() override = default;
MergeTreeReadTaskPtr getTask(size_t thread) override; MergeTreeReadTaskPtr getTask(size_t thread) override;
/** Each worker could call this method and pass information about read performance. /** Each worker could call this method and pass information about read performance.
@ -160,6 +147,26 @@ private:
size_t threads, size_t sum_marks, std::vector<size_t> per_part_sum_marks, size_t threads, size_t sum_marks, std::vector<size_t> per_part_sum_marks,
const RangesInDataParts & parts); const RangesInDataParts & parts);
/// State to track numbers of slow reads.
struct BackoffState
{
size_t current_threads;
Stopwatch time_since_prev_event {CLOCK_MONOTONIC_COARSE};
size_t num_events = 0;
explicit BackoffState(size_t threads) : current_threads(threads) {}
};
BackoffState backoff_state;
struct Part
{
MergeTreeData::DataPartPtr data_part;
size_t part_index_in_query;
};
std::vector<Part> parts_with_idx;
struct ThreadTask struct ThreadTask
{ {
struct PartIndexAndRange struct PartIndexAndRange

View File

@ -58,10 +58,77 @@ MergeTreeReaderWide::MergeTreeReaderWide(
} }
} }
/// Starts a prefetch for all columns from the first mark of `all_mark_ranges` and remembers
/// this mark in `prefetched_from_mark`. On any exception other than MEMORY_LIMIT_EXCEEDED
/// the part is reported as broken; the exception is always rethrown.
void MergeTreeReaderWide::prefetchBeginOfRange(int64_t priority)
{
    prefetched_streams.clear();

    try
    {
        /// Current prefetch is done for read tasks before they can be picked by reading threads
        /// in IMergeTreeReadPool::getTask method.
        /// Explanation of the arguments passed to prefetchForAllColumns():
        /// 1. columns_to_read.size() == requested_columns.size() == readRows::res_columns.size().
        /// 2. from_mark is the begin of the first mark range of this reader.
        /// 3. current_task_last_mark argument in readRows() (which is used only for reading from remote fs
        ///    to make precise ranged read requests) is different from current reader's
        ///    IMergeTreeReader::all_mark_ranges.back().end because the same reader can be reused between
        ///    read tasks - if the new task mark ranges correspond to the same part we last read, so we cannot
        ///    rely on all_mark_ranges and pass actual current_task_last_mark. But here we can do prefetch
        ///    for begin of range only once so there is no such problem.
        /// 4. continue_reading == false, as we haven't read anything yet.
        const auto num_columns = columns_to_read.size();
        const auto first_mark = all_mark_ranges.front().begin;
        const auto last_mark = all_mark_ranges.back().end;

        prefetchForAllColumns(priority, num_columns, first_mark, last_mark, /* continue_reading */ false);
        prefetched_from_mark = first_mark;
    }
    catch (Exception & e)
    {
        const bool memory_limit_exceeded = e.code() == ErrorCodes::MEMORY_LIMIT_EXCEEDED;
        if (!memory_limit_exceeded)
            data_part_info_for_read->reportBroken();
        throw;
    }
    catch (...)
    {
        data_part_info_for_read->reportBroken();
        throw;
    }
}
/// Calls prefetchForColumn() for the first `num_columns` columns of `columns_to_read`.
/// Does nothing if prefetch is disabled in the read settings for the kind of disk
/// (remote or local) on which the part is stored.
/// An Exception thrown for a column is extended with the column name and rethrown.
void MergeTreeReaderWide::prefetchForAllColumns(
    int64_t priority, size_t num_columns, size_t from_mark, size_t current_task_last_mark, bool continue_reading)
{
    const bool is_remote = data_part_info_for_read->getDataPartStorage()->isStoredOnRemoteDisk();

    bool prefetch_enabled;
    if (is_remote)
        prefetch_enabled = settings.read_settings.remote_fs_prefetch;
    else
        prefetch_enabled = settings.read_settings.local_fs_prefetch;

    if (!prefetch_enabled)
        return;

    /// Request reading of data in advance,
    /// so if reading can be asynchronous, it will also be performed in parallel for all columns.
    size_t pos = 0;
    while (pos < num_columns)
    {
        try
        {
            const auto & column = columns_to_read[pos];
            auto & cache = caches[column.getNameInStorage()];

            prefetchForColumn(
                priority, column, serializations[pos], from_mark, continue_reading,
                current_task_last_mark, cache);
        }
        catch (Exception & e)
        {
            /// Better diagnostics.
            e.addMessage("(while reading column " + columns_to_read[pos].name + ")");
            throw;
        }

        ++pos;
    }
}
size_t MergeTreeReaderWide::readRows( size_t MergeTreeReaderWide::readRows(
size_t from_mark, size_t current_task_last_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns) size_t from_mark, size_t current_task_last_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns)
{ {
size_t read_rows = 0; size_t read_rows = 0;
if (prefetched_from_mark != -1 && static_cast<size_t>(prefetched_from_mark) != from_mark)
{
prefetched_streams.clear();
prefetched_from_mark = -1;
}
try try
{ {
size_t num_columns = res_columns.size(); size_t num_columns = res_columns.size();
@ -70,28 +137,7 @@ size_t MergeTreeReaderWide::readRows(
if (num_columns == 0) if (num_columns == 0)
return max_rows_to_read; return max_rows_to_read;
std::unordered_map<String, ISerialization::SubstreamsCache> caches; prefetchForAllColumns(/* priority */0, num_columns, from_mark, current_task_last_mark, continue_reading);
std::unordered_set<std::string> prefetched_streams;
if (data_part_info_for_read->getDataPartStorage()->isStoredOnRemoteDisk() ? settings.read_settings.remote_fs_prefetch : settings.read_settings.local_fs_prefetch)
{
/// Request reading of data in advance,
/// so if reading can be asynchronous, it will also be performed in parallel for all columns.
for (size_t pos = 0; pos < num_columns; ++pos)
{
try
{
auto & cache = caches[columns_to_read[pos].getNameInStorage()];
prefetch(columns_to_read[pos], serializations[pos], from_mark, continue_reading, current_task_last_mark, cache, prefetched_streams);
}
catch (Exception & e)
{
/// Better diagnostics.
e.addMessage("(while reading column " + columns_to_read[pos].name + ")");
throw;
}
}
}
for (size_t pos = 0; pos < num_columns; ++pos) for (size_t pos = 0; pos < num_columns; ++pos)
{ {
@ -129,6 +175,9 @@ size_t MergeTreeReaderWide::readRows(
res_columns[pos] = nullptr; res_columns[pos] = nullptr;
} }
prefetched_streams.clear();
caches.clear();
/// NOTE: positions for all streams must be kept in sync. /// NOTE: positions for all streams must be kept in sync.
/// In particular, even if for some streams there are no rows to be read, /// In particular, even if for some streams there are no rows to be read,
/// you must ensure that no seeks are skipped and at this point they all point to to_mark. /// you must ensure that no seeks are skipped and at this point they all point to to_mark.
@ -139,9 +188,11 @@ size_t MergeTreeReaderWide::readRows(
data_part_info_for_read->reportBroken(); data_part_info_for_read->reportBroken();
/// Better diagnostics. /// Better diagnostics.
e.addMessage("(while reading from part " + data_part_info_for_read->getDataPartStorage()->getFullPath() + " " e.addMessage(
"from mark " + toString(from_mark) + " " fmt::format(
"with max_rows_to_read = " + toString(max_rows_to_read) + ")"); "(while reading from part {} from mark {} with max_rows_to_read = {})",
data_part_info_for_read->getDataPartStorage()->getFullPath(),
toString(from_mark), toString(max_rows_to_read)));
throw; throw;
} }
catch (...) catch (...)
@ -253,14 +304,14 @@ void MergeTreeReaderWide::deserializePrefix(
} }
} }
void MergeTreeReaderWide::prefetch( void MergeTreeReaderWide::prefetchForColumn(
int64_t priority,
const NameAndTypePair & name_and_type, const NameAndTypePair & name_and_type,
const SerializationPtr & serialization, const SerializationPtr & serialization,
size_t from_mark, size_t from_mark,
bool continue_reading, bool continue_reading,
size_t current_task_last_mark, size_t current_task_last_mark,
ISerialization::SubstreamsCache & cache, ISerialization::SubstreamsCache & cache)
std::unordered_set<std::string> & prefetched_streams)
{ {
deserializePrefix(serialization, name_and_type, current_task_last_mark, cache); deserializePrefix(serialization, name_and_type, current_task_last_mark, cache);
@ -272,9 +323,10 @@ void MergeTreeReaderWide::prefetch(
{ {
bool seek_to_mark = !continue_reading; bool seek_to_mark = !continue_reading;
if (ReadBuffer * buf = getStream(false, substream_path, streams, name_and_type, from_mark, seek_to_mark, current_task_last_mark, cache)) if (ReadBuffer * buf = getStream(false, substream_path, streams, name_and_type, from_mark, seek_to_mark, current_task_last_mark, cache))
buf->prefetch(); {
buf->prefetch(priority);
prefetched_streams.insert(stream_name); prefetched_streams.insert(stream_name);
}
} }
}); });
} }

View File

@ -33,11 +33,15 @@ public:
bool canReadIncompleteGranules() const override { return true; } bool canReadIncompleteGranules() const override { return true; }
void prefetchBeginOfRange(int64_t priority) override;
using FileStreams = std::map<std::string, std::unique_ptr<MergeTreeReaderStream>>; using FileStreams = std::map<std::string, std::unique_ptr<MergeTreeReaderStream>>;
private: private:
FileStreams streams; FileStreams streams;
void prefetchForAllColumns(int64_t priority, size_t num_columns, size_t from_mark, size_t current_task_last_mark, bool continue_reading);
void addStreams( void addStreams(
const NameAndTypePair & name_and_type, const NameAndTypePair & name_and_type,
const SerializationPtr & serialization, const SerializationPtr & serialization,
@ -50,20 +54,24 @@ private:
ISerialization::SubstreamsCache & cache, bool was_prefetched); ISerialization::SubstreamsCache & cache, bool was_prefetched);
/// Make next readData more simple by calling 'prefetch' of all related ReadBuffers (column streams). /// Make next readData more simple by calling 'prefetch' of all related ReadBuffers (column streams).
void prefetch( void prefetchForColumn(
int64_t priority,
const NameAndTypePair & name_and_type, const NameAndTypePair & name_and_type,
const SerializationPtr & serialization, const SerializationPtr & serialization,
size_t from_mark, size_t from_mark,
bool continue_reading, bool continue_reading,
size_t current_task_last_mark, size_t current_task_last_mark,
ISerialization::SubstreamsCache & cache, ISerialization::SubstreamsCache & cache);
std::unordered_set<std::string> & prefetched_streams); /// if stream was already prefetched do nothing
void deserializePrefix( void deserializePrefix(
const SerializationPtr & serialization, const SerializationPtr & serialization,
const NameAndTypePair & name_and_type, const NameAndTypePair & name_and_type,
size_t current_task_last_mark, size_t current_task_last_mark,
ISerialization::SubstreamsCache & cache); ISerialization::SubstreamsCache & cache);
std::unordered_map<String, ISerialization::SubstreamsCache> caches;
std::unordered_set<std::string> prefetched_streams;
ssize_t prefetched_from_mark = -1;
}; };
} }

View File

@ -57,8 +57,8 @@ void MergeTreeSelectAlgorithm::initializeReaders()
owned_mark_cache = storage.getContext()->getMarkCache(); owned_mark_cache = storage.getContext()->getMarkCache();
initializeMergeTreeReadersForPart(data_part, task_columns, storage_snapshot->getMetadataForQuery(), initializeMergeTreeReadersForPart(
all_mark_ranges, {}, {}); data_part, task_columns, storage_snapshot->getMetadataForQuery(), all_mark_ranges, {}, {});
} }

View File

@ -50,22 +50,18 @@ void MergeTreeThreadSelectAlgorithm::finalizeNewTask()
IMergeTreeReader::ValueSizeMap value_size_map; IMergeTreeReader::ValueSizeMap value_size_map;
if (!reader) if (reader && part_name != last_read_part_name)
{
if (use_uncompressed_cache)
owned_uncompressed_cache = storage.getContext()->getUncompressedCache();
owned_mark_cache = storage.getContext()->getMarkCache();
}
else if (part_name != last_read_part_name)
{ {
value_size_map = reader->getAvgValueSizeHints(); value_size_map = reader->getAvgValueSizeHints();
} }
const bool init_new_readers = !reader || part_name != last_read_part_name; /// task->reader.valid() means there is a prefetched reader in this test, use it.
const bool init_new_readers = !reader || task->reader.valid() || part_name != last_read_part_name;
if (init_new_readers) if (init_new_readers)
{ {
initializeMergeTreeReadersForPart(task->data_part, task->task_columns, metadata_snapshot, initializeMergeTreeReadersForPart(
task->mark_ranges, value_size_map, profile_callback); task->data_part, task->task_columns, metadata_snapshot, task->mark_ranges, value_size_map, profile_callback);
initializeMergeTreeReadersForCurrentTask(metadata_snapshot, value_size_map, profile_callback);
} }
last_read_part_name = part_name; last_read_part_name = part_name;

View File

@ -0,0 +1,55 @@
diff a/src/Storages/MergeTree/MergeTreeThreadSelectProcessor.cpp b/src/Storages/MergeTree/MergeTreeThreadSelectProcessor.cpp (rejected hunks)
@@ -27,8 +27,7 @@ MergeTreeThreadSelectAlgorithm::MergeTreeThreadSelectAlgorithm(
const MergeTreeReaderSettings & reader_settings_,
const Names & virt_column_names_,
std::optional<ParallelReadingExtension> extension_)
- :
- IMergeTreeSelectAlgorithm{
+ : IMergeTreeSelectAlgorithm{
pool_->getHeader(), storage_, storage_snapshot_, prewhere_info_, std::move(actions_settings), max_block_size_rows_,
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_,
reader_settings_, use_uncompressed_cache_, virt_column_names_, extension_},
@@ -84,15 +83,12 @@ MergeTreeThreadSelectAlgorithm::MergeTreeThreadSelectAlgorithm(
{
min_marks_to_read = min_marks_to_read_;
}
-
-
- ordered_names = getHeader().getNames();
}
/// Requests read task from MergeTreeReadPool and signals whether it got one
bool MergeTreeThreadSelectAlgorithm::getNewTaskImpl()
{
- task = pool->getTask(min_marks_to_read, thread, ordered_names);
+ task = pool->getTask(min_marks_to_read, thread);
return static_cast<bool>(task);
}
@@ -107,22 +103,16 @@ void MergeTreeThreadSelectAlgorithm::finalizeNewTask()
IMergeTreeReader::ValueSizeMap value_size_map;
- if (!reader)
- {
- if (use_uncompressed_cache)
- owned_uncompressed_cache = storage.getContext()->getUncompressedCache();
- owned_mark_cache = storage.getContext()->getMarkCache();
- }
- else if (part_name != last_readed_part_name)
+ if (reader && part_name != last_readed_part_name)
{
value_size_map = reader->getAvgValueSizeHints();
}
- const bool init_new_readers = !reader || part_name != last_readed_part_name;
+ /// task->reader.valid() means there is a prefetched reader in this test, use it.
+ const bool init_new_readers = !reader || task->reader.valid() || part_name != last_readed_part_name;
if (init_new_readers)
{
- initializeMergeTreeReadersForPart(task->data_part, task->task_columns, metadata_snapshot,
- task->mark_ranges, value_size_map, profile_callback);
+ initializeMergeTreeReadersForCurrentTask(metadata_snapshot, value_size_map, profile_callback);
}
last_readed_part_name = part_name;

View File

@ -683,7 +683,7 @@ std::unique_ptr<ReadBuffer> StorageS3Source::createAsyncS3ReadBuffer(
async_reader->setReadUntilEnd(); async_reader->setReadUntilEnd();
if (read_settings.remote_fs_prefetch) if (read_settings.remote_fs_prefetch)
async_reader->prefetch(); async_reader->prefetch(DEFAULT_PREFETCH_PRIORITY);
return async_reader; return async_reader;
} }

View File

@ -0,0 +1,9 @@
diff a/tests/clickhouse-test b/tests/clickhouse-test (rejected hunks)
@@ -601,7 +601,6 @@ class SettingsRandomizer:
"local_filesystem_read_method": lambda: random.choice(
["read", "pread", "mmap", "pread_threadpool"]
),
- "remote_filesystem_read_method": lambda: random.choice(["read", "threadpool"]),
"local_filesystem_read_prefetch": lambda: random.randint(0, 1),
"remote_filesystem_read_prefetch": lambda: random.randint(0, 1),
"compile_expressions": lambda: random.randint(0, 1),

View File

@ -73,6 +73,7 @@ ln -sf $SRC_PATH/users.d/filelog.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/enable_blobs_check.xml $DEST_SERVER_PATH/users.d/ ln -sf $SRC_PATH/users.d/enable_blobs_check.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/marks.xml $DEST_SERVER_PATH/users.d/ ln -sf $SRC_PATH/users.d/marks.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/insert_keeper_retries.xml $DEST_SERVER_PATH/users.d/ ln -sf $SRC_PATH/users.d/insert_keeper_retries.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/prefetch_settings.xml $DEST_SERVER_PATH/users.d/
# FIXME DataPartsExchange may hang for http_send_timeout seconds # FIXME DataPartsExchange may hang for http_send_timeout seconds
# when nobody is going to read from the other side of socket (due to "Fetching of part was cancelled"), # when nobody is going to read from the other side of socket (due to "Fetching of part was cancelled"),

View File

@ -0,0 +1,9 @@
diff a/tests/config/install.sh b/tests/config/install.sh (rejected hunks)
@@ -75,6 +75,7 @@ ln -sf $SRC_PATH/users.d/enable_blobs_check.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/marks.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/insert_keeper_retries.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/allow_shared_merge_tree.xml $DEST_SERVER_PATH/users.d/
+ln -sf $SRC_PATH/users.d/prefetch_settings.xml $DEST_SERVER_PATH/users.d/
# FIXME DataPartsExchange may hang for http_send_timeout seconds
# when nobody is going to read from the other side of socket (due to "Fetching of part was cancelled"),

View File

@ -0,0 +1,9 @@
<clickhouse>
<profiles>
<default>
<allow_prefetched_read_pool_for_remote_filesystem>1</allow_prefetched_read_pool_for_remote_filesystem>
<allow_prefetched_read_pool_for_local_filesystem>0</allow_prefetched_read_pool_for_local_filesystem>
<filesystem_prefetch_max_memory_usage>1Gi</filesystem_prefetch_max_memory_usage>
</default>
</profiles>
</clickhouse>

View File

@ -34,6 +34,7 @@ def cluster():
init_list = { init_list = {
"ReadBufferFromS3Bytes": 0, "ReadBufferFromS3Bytes": 0,
"ReadBufferFromS3Microseconds": 0, "ReadBufferFromS3Microseconds": 0,
"ReadBufferFromS3InitMicroseconds": 0,
"ReadBufferFromS3RequestsErrors": 0, "ReadBufferFromS3RequestsErrors": 0,
"WriteBufferFromS3Bytes": 0, "WriteBufferFromS3Bytes": 0,
"S3ReadMicroseconds": 0, "S3ReadMicroseconds": 0,

View File

@ -35,7 +35,13 @@ def test_move_and_s3_memory_usage(started_single_node_cluster):
) )
# After this, we should have 5 columns per 10 * 100 * 400000 ~ 400 MB; total ~2G data in partition # After this, we should have 5 columns per 10 * 100 * 400000 ~ 400 MB; total ~2G data in partition
small_node.query("optimize table s3_test_with_ttl final") small_node.query(
"optimize table s3_test_with_ttl final",
settings={
"send_logs_level": "error",
"allow_prefetched_read_pool_for_remote_filesystem": 0,
},
)
small_node.query("system flush logs") small_node.query("system flush logs")
# Will take memory usage from metric_log. # Will take memory usage from metric_log.
@ -44,19 +50,26 @@ def test_move_and_s3_memory_usage(started_single_node_cluster):
small_node.query( small_node.query(
"alter table s3_test_with_ttl move partition 0 to volume 'external'", "alter table s3_test_with_ttl move partition 0 to volume 'external'",
settings={"send_logs_level": "error"}, settings={
"send_logs_level": "error",
"allow_prefetched_read_pool_for_remote_filesystem": 0,
},
) )
small_node.query("system flush logs") small_node.query("system flush logs")
max_usage = small_node.query( max_usage = small_node.query(
"""select max(m.val - am.val * 4096) from """
(select toStartOfMinute(event_time) as time, max(CurrentMetric_MemoryTracking) as val from system.metric_log group by time) as m join select max(m.val - am.val * 4096) from
(select toStartOfMinute(event_time) as time, min(value) as val from system.asynchronous_metric_log where metric='jemalloc.arenas.all.pdirty' group by time) as am using time""" (select toStartOfMinute(event_time) as time, max(CurrentMetric_MemoryTracking) as val from system.metric_log group by time) as m join
(select toStartOfMinute(event_time) as time, min(value) as val from system.asynchronous_metric_log where metric='jemalloc.arenas.all.pdirty' group by time) as am using time;"""
) )
# 3G limit is a big one. However, we can hit it anyway with parallel s3 writes enabled. # 3G limit is a big one. However, we can hit it anyway with parallel s3 writes enabled.
# Also actual value can be bigger because of memory drift. # Also actual value can be bigger because of memory drift.
# Increase it a little bit if test fails. # Increase it a little bit if test fails.
assert int(max_usage) < 3e9 assert int(max_usage) < 3e9
res = small_node.query( res = small_node.query(
"select * from system.errors where last_error_message like '%Memory limit%' limit 1" "select * from system.errors where last_error_message like '%Memory limit%' limit 1",
settings={
"allow_prefetched_read_pool_for_remote_filesystem": 0,
},
) )
assert res == "" assert res == ""

View File

@ -17,7 +17,7 @@ cached_query="SELECT count() FROM small_table where n > 0;"
$CLICKHOUSE_CLIENT --use_uncompressed_cache=1 --query="$cached_query" &> /dev/null $CLICKHOUSE_CLIENT --use_uncompressed_cache=1 --query="$cached_query" &> /dev/null
$CLICKHOUSE_CLIENT --use_uncompressed_cache=1 --query_id="test-query-uncompressed-cache" --query="$cached_query" &> /dev/null $CLICKHOUSE_CLIENT --use_uncompressed_cache=1 --allow_prefetched_read_pool_for_remote_filesystem=0 --allow_prefetched_read_pool_for_local_filesystem=0 --query_id="test-query-uncompressed-cache" --query="$cached_query" &> /dev/null
$CLICKHOUSE_CLIENT --query="SYSTEM FLUSH LOGS" $CLICKHOUSE_CLIENT --query="SYSTEM FLUSH LOGS"
@ -25,4 +25,3 @@ $CLICKHOUSE_CLIENT --query="SYSTEM FLUSH LOGS"
$CLICKHOUSE_CLIENT --query="SELECT ProfileEvents['Seek'], ProfileEvents['ReadCompressedBytes'], ProfileEvents['UncompressedCacheHits'] AS hit FROM system.query_log WHERE (query_id = 'test-query-uncompressed-cache') and current_database = currentDatabase() AND (type = 2) AND event_date >= yesterday() ORDER BY event_time DESC LIMIT 1" $CLICKHOUSE_CLIENT --query="SELECT ProfileEvents['Seek'], ProfileEvents['ReadCompressedBytes'], ProfileEvents['UncompressedCacheHits'] AS hit FROM system.query_log WHERE (query_id = 'test-query-uncompressed-cache') and current_database = currentDatabase() AND (type = 2) AND event_date >= yesterday() ORDER BY event_time DESC LIMIT 1"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS small_table" $CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS small_table"

View File

@ -24,7 +24,7 @@ $CLICKHOUSE_CLIENT -q "SELECT a FROM m ORDER BY a LIMIT 5"
$CLICKHOUSE_CLIENT -q "SELECT a, s FROM m ORDER BY a, s LIMIT 10" $CLICKHOUSE_CLIENT -q "SELECT a, s FROM m ORDER BY a, s LIMIT 10"
# Not a single .sql test with max_rows_to_read because it doesn't work with Merge storage # Not a single .sql test with max_rows_to_read because it doesn't work with Merge storage
rows_read=$($CLICKHOUSE_CLIENT -q "SELECT a FROM m ORDER BY a LIMIT 10 FORMAT JSON" --max_threads=1 --max_block_size=20 --optimize_read_in_order=1 | grep "rows_read" | sed 's/[^0-9]*//g') rows_read=$($CLICKHOUSE_CLIENT -q "SELECT a FROM m ORDER BY a LIMIT 10 FORMAT JSON" --max_threads=1 --max_block_size=20 --optimize_read_in_order=1 --allow_prefetched_read_pool_for_remote_filesystem=0 --allow_prefetched_read_pool_for_local_filesystem=0 | grep "rows_read" | sed 's/[^0-9]*//g')
# Expected number of read rows with a bit margin # Expected number of read rows with a bit margin
if [[ $rows_read -lt 500 ]] if [[ $rows_read -lt 500 ]]
@ -36,7 +36,7 @@ fi
$CLICKHOUSE_CLIENT -q "SELECT '---StorageBuffer---'" $CLICKHOUSE_CLIENT -q "SELECT '---StorageBuffer---'"
$CLICKHOUSE_CLIENT -q "CREATE TABLE buf (a UInt32, s String) engine = Buffer('$CLICKHOUSE_DATABASE', s2, 16, 10, 100, 10000, 1000000, 10000000, 100000000)" $CLICKHOUSE_CLIENT -q "CREATE TABLE buf (a UInt32, s String) engine = Buffer('$CLICKHOUSE_DATABASE', s2, 16, 10, 100, 10000, 1000000, 10000000, 100000000)"
$CLICKHOUSE_CLIENT -q "SELECT a, s FROM buf ORDER BY a, s LIMIT 10" $CLICKHOUSE_CLIENT -q "SELECT a, s FROM buf ORDER BY a, s LIMIT 10"
rows_read=$($CLICKHOUSE_CLIENT -q "SELECT a FROM buf ORDER BY a LIMIT 10 FORMAT JSON" --max_threads=1 --max_block_size=20 --optimize_read_in_order=1 | grep "rows_read" | sed 's/[^0-9]*//g') rows_read=$($CLICKHOUSE_CLIENT -q "SELECT a FROM buf ORDER BY a LIMIT 10 FORMAT JSON" --max_threads=1 --max_block_size=20 --optimize_read_in_order=1 --allow_prefetched_read_pool_for_remote_filesystem=0 --allow_prefetched_read_pool_for_local_filesystem=0 | grep "rows_read" | sed 's/[^0-9]*//g')
# Expected number of read rows with a bit margin # Expected number of read rows with a bit margin
if [[ $rows_read -lt 500 ]] if [[ $rows_read -lt 500 ]]

View File

@ -0,0 +1,10 @@
diff a/tests/queries/0_stateless/01045_order_by_pk_special_storages.sh b/tests/queries/0_stateless/01045_order_by_pk_special_storages.sh (rejected hunks)
@@ -48,7 +48,7 @@ fi
$CLICKHOUSE_CLIENT -q "SELECT '---MaterializedView---'"
$CLICKHOUSE_CLIENT -q "CREATE MATERIALIZED VIEW mv (a UInt32, s String) engine = MergeTree ORDER BY s SETTINGS min_bytes_for_wide_part = 0 POPULATE AS SELECT a, s FROM s1 WHERE a % 7 = 0"
$CLICKHOUSE_CLIENT -q "SELECT a, s FROM mv ORDER BY s LIMIT 10"
-rows_read=$($CLICKHOUSE_CLIENT -q "SELECT a, s FROM mv ORDER BY s LIMIT 10 FORMAT JSON" --max_threads=1 --max_block_size=20 --optimize_read_in_order=1 | grep "rows_read" | sed 's/[^0-9]*//g')
+rows_read=$($CLICKHOUSE_CLIENT -q "SELECT a, s FROM mv ORDER BY s LIMIT 10 FORMAT JSON" --max_threads=1 --max_block_size=20 --optimize_read_in_order=1 --allow_prefetched_read_pool_for_remote_filesystem=0 --allow_prefetched_read_pool_for_local_filesystem=0 | grep "rows_read" | sed 's/[^0-9]*//g')
if [[ $rows_read -lt 500 ]]
then echo "OK"

View File

@ -5,3 +5,4 @@ connect_timeout_with_failover_secure_ms Milliseconds 3000
external_storage_connect_timeout_sec UInt64 10 external_storage_connect_timeout_sec UInt64 10
max_untracked_memory UInt64 1048576 max_untracked_memory UInt64 1048576
memory_profiler_step UInt64 1048576 memory_profiler_step UInt64 1048576
filesystem_prefetch_max_memory_usage UInt64 1073741824

View File

@ -13,6 +13,6 @@ $CLICKHOUSE_CLIENT --xyzgarbage 2>&1 | grep -q "UNRECOGNIZED_ARGUMENTS" && echo
cat /etc/passwd | sed 's/:/\t/g' | $CLICKHOUSE_CLIENT --query="SELECT shell, count() AS c FROM passwd GROUP BY shell ORDER BY c DESC" --external --file=- --name=passwd --structure='login String, unused String, uid UInt16, gid UInt16, comment String, home String, shell String' xyzgarbage 2>&1 | grep -q "BAD_ARGUMENTS" && echo 'OK' || echo 'FAIL' cat /etc/passwd | sed 's/:/\t/g' | $CLICKHOUSE_CLIENT --query="SELECT shell, count() AS c FROM passwd GROUP BY shell ORDER BY c DESC" --external --file=- --name=passwd --structure='login String, unused String, uid UInt16, gid UInt16, comment String, home String, shell String' xyzgarbage 2>&1 | grep -q "BAD_ARGUMENTS" && echo 'OK' || echo 'FAIL'
cat /etc/passwd | sed 's/:/\t/g' | $CLICKHOUSE_CLIENT --query="SELECT shell, count() AS c FROM passwd GROUP BY shell ORDER BY c DESC" --external -xyzgarbage --file=- --name=passwd --structure='login String, unused String, uid UInt16, gid UInt16, comment String, home String, shell String' 2>&1 | grep -q "UNRECOGNIZED_ARGUMENTS" && echo 'OK' || echo 'FAIL' cat /etc/passwd | sed 's/:/\t/g' | $CLICKHOUSE_CLIENT --query="SELECT shell, count() AS c FROM passwd GROUP BY shell ORDER BY c DESC" --external -xyzgarbage --file=- --name=passwd --structure='login String, unused String, uid UInt16, gid UInt16, comment String, home String, shell String' 2>&1 | grep -q "Bad arguments" && echo 'OK' || echo 'FAIL'
cat /etc/passwd | sed 's/:/\t/g' | $CLICKHOUSE_CLIENT --query="SELECT shell, count() AS c FROM passwd GROUP BY shell ORDER BY c DESC" --external --xyzgarbage --file=- --name=passwd --structure='login String, unused String, uid UInt16, gid UInt16, comment String, home String, shell String' 2>&1 | grep -q "UNRECOGNIZED_ARGUMENTS" && echo 'OK' || echo 'FAIL' cat /etc/passwd | sed 's/:/\t/g' | $CLICKHOUSE_CLIENT --query="SELECT shell, count() AS c FROM passwd GROUP BY shell ORDER BY c DESC" --external --xyzgarbage --file=- --name=passwd --structure='login String, unused String, uid UInt16, gid UInt16, comment String, home String, shell String' 2>&1 | grep -q "Bad arguments" && echo 'OK' || echo 'FAIL'

View File

@ -0,0 +1,8 @@
diff a/tests/queries/0_stateless/01605_adaptive_granularity_block_borders.sql b/tests/queries/0_stateless/01605_adaptive_granularity_block_borders.sql (rejected hunks)
@@ -1,4 +1,6 @@
SET use_uncompressed_cache = 0;
+SET allow_prefetched_read_pool_for_remote_filesystem=0;
+SET allow_prefetched_read_pool_for_local_filesystem=0;
DROP TABLE IF EXISTS adaptive_table;

View File

@ -0,0 +1,6 @@
-- Writes 100000 rows into a MergeTree table that uses the 's3_cache_7' storage
-- policy, reads them back and drops the table again.
-- NOTE(review): 's3_cache_7' is presumably declared in the test server's storage configuration; confirm.

-- IF EXISTS has to precede the table name: DROP TABLE [IF EXISTS] name.
drop table if exists ttt;
create table ttt (id Int32, value String) engine=MergeTree() order by tuple() settings storage_policy='s3_cache_7';
-- The insert is run with throw_on_error_from_cache_on_write_operations = 1.
insert into ttt settings throw_on_error_from_cache_on_write_operations = 1 select number, toString(number) from numbers(100000);
-- Full read of the table with the Null output format, followed by a row count.
select * from ttt format Null;
select count() from ttt;
drop table ttt no delay;

View File

@ -0,0 +1,58 @@
#!/usr/bin/env bash
#
# Runs every query listed in a file through clickhouse-client and prints, for
# each of them, timing figures looked up in system.query_log.
#
# Usage: <this script> QUERIES_FILE [DEBUG_MODE]
#   QUERIES_FILE  file containing one query per line
#   DEBUG_MODE    pass 1 to additionally print the extra counters and profile
#                 events queried in the debug section below (default: 0)

QUERIES_FILE=$1
# Default to 0 so that the arithmetic test in execute() stays well-formed when
# the second argument is omitted.
DEBUG_MODE=${2:-0}

if [[ -z "$QUERIES_FILE" ]]; then
    echo "Usage: $0 QUERIES_FILE [DEBUG_MODE]" >&2
    exit 1
fi

# Prints one random string of ASCII letters; $1 is its length (default: 8).
function random {
    LC_ALL=C tr -dc 'a-zA-Z' < /dev/urandom | fold -w "${1:-8}" | head -n 1
}

# Prints a "label: value" line; $1 is the label, $2 the value.
function print()
{
    echo " $1: $2"
}

# Evaluates the expression $1 over the 'QueryFinish' entries of
# system.query_log that belong to the query id $2 and prints the result.
function query_log_value()
{
    clickhouse-client -q "select $1 from system.query_log where type='QueryFinish' and query_id = '$2'"
}

# Runs the query $1 under a freshly generated query id and prints its statistics.
# Returns non-zero (after reporting the client output on stderr) when the query
# itself fails, because no statistics can be looked up in that case.
function execute()
{
    local query="$1"
    local query_id output

    query_id=$(random)
    echo "query id: $query_id"

    # The client output (result plus the --time line) is captured so that it
    # does not get mixed into the report; it is only shown when the query fails.
    if ! output=$(clickhouse-client --time --query_id "$query_id" -q "$query" 2>&1); then
        echo " query failed: $output" >&2
        return 1
    fi

    # The statistics below are read from system.query_log, so flush it first.
    clickhouse-client -q "system flush logs"

    local time_executing time_reading_from_prefetch time_reading_without_prefetch
    time_executing=$(query_log_value "query_duration_ms / 1000" "$query_id")
    time_reading_from_prefetch=$(query_log_value "ProfileEvents['AsynchronousRemoteReadWaitMicroseconds'] / 1000 / 1000" "$query_id")
    time_reading_without_prefetch=$(query_log_value "ProfileEvents['SynchronousRemoteReadWaitMicroseconds'] / 1000 / 1000" "$query_id")

    # Values are quoted so that a multi-word value is printed in full.
    print "time executing query" "$time_executing"
    print "time reading data with prefetch" "$time_reading_from_prefetch"
    print "time reading data without prefetch" "$time_reading_without_prefetch"

    if (( DEBUG_MODE == 1 )); then
        local remote_profiles threadpool_profiles s3_profiles
        local max_parallel_read_tasks max_parallel_prefetch_tasks total_prefetch_tasks
        local init wait_prefetch_task

        remote_profiles=$(query_log_value "mapExtractKeyLike(ProfileEvents, '%RemoteFS%')" "$query_id")
        threadpool_profiles=$(query_log_value "mapExtractKeyLike(ProfileEvents, '%ThreadpoolReader%')" "$query_id")
        s3_profiles=$(query_log_value "mapExtractKeyLike(ProfileEvents, '%S3%')" "$query_id")
        max_parallel_read_tasks=$(query_log_value "AsyncReadCounters['max_parallel_read_tasks']" "$query_id")
        max_parallel_prefetch_tasks=$(query_log_value "AsyncReadCounters['max_parallel_prefetch_tasks']" "$query_id")
        total_prefetch_tasks=$(query_log_value "AsyncReadCounters['total_prefetch_tasks']" "$query_id")
        # NOTE(review): the event name suggests milliseconds; confirm the unit.
        init=$(query_log_value "ProfileEvents['PrefetchedReadBufferInitMS'] / 1000" "$query_id")
        wait_prefetch_task=$(query_log_value "ProfileEvents['WaitPrefetchTaskMicroseconds'] / 1000 / 1000" "$query_id")

        print "max parallel read tasks" "$max_parallel_read_tasks"
        print "max parallel prefetch tasks" "$max_parallel_prefetch_tasks"
        print "total prefetch tasks" "$total_prefetch_tasks"
        print "init tasks time" "$init"
        print "wait prefetch task" "$wait_prefetch_task"
        print "remote reader profile events" "$remote_profiles"
        print "threadpool profile events" "$threadpool_profiles"
        print "s3 profile events" "$s3_profiles"
    fi
}

# -r keeps backslashes inside queries intact; the extra -n test also processes
# a last line that is not terminated by a newline.
while IFS= read -r line || [[ -n "$line" ]]; do
    # Skip blank lines instead of sending an empty query.
    if [[ -z "${line//[[:space:]]/}" ]]; then
        continue
    fi

    echo "Query: $line"
    execute "$line"
done < "$QUERIES_FILE"