Merge branch 'round-robin-merge-scheduler' of github.com:ClickHouse/ClickHouse into round-robin-merge-scheduler

This commit is contained in:
serxa 2023-02-11 16:45:55 +00:00
commit eb8cf6f816
82 changed files with 2012 additions and 411 deletions

2
contrib/zstd vendored

@ -1 +1 @@
Subproject commit b944db0c451ba1bc6bbd8e201d5f88f9041bf1f9
Subproject commit 945f27758c0fd67b636103a38dbf050266c6b90a

View File

@ -14,6 +14,7 @@ namespace DB
namespace ErrorCodes
{
extern const int ATTEMPT_TO_READ_AFTER_EOF;
extern const int DNS_ERROR;
extern const int NETWORK_ERROR;
extern const int SOCKET_TIMEOUT;
}
@ -89,7 +90,7 @@ void ConnectionEstablisher::run(ConnectionEstablisher::TryResult & result, std::
catch (const Exception & e)
{
if (e.code() != ErrorCodes::NETWORK_ERROR && e.code() != ErrorCodes::SOCKET_TIMEOUT
&& e.code() != ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF)
&& e.code() != ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF && e.code() != ErrorCodes::DNS_ERROR)
throw;
fail_message = getCurrentExceptionMessage(/* with_stacktrace = */ false);

View File

@ -20,17 +20,22 @@ struct ProfileEventTimeIncrement
explicit ProfileEventTimeIncrement<time>(ProfileEvents::Event event_)
: event(event_), watch(CLOCK_MONOTONIC) {}
UInt64 elapsed()
{
if constexpr (time == Time::Nanoseconds)
return watch.elapsedNanoseconds();
else if constexpr (time == Time::Microseconds)
return watch.elapsedMicroseconds();
else if constexpr (time == Time::Milliseconds)
return watch.elapsedMilliseconds();
else if constexpr (time == Time::Seconds)
return watch.elapsedSeconds();
}
~ProfileEventTimeIncrement()
{
watch.stop();
if constexpr (time == Time::Nanoseconds)
ProfileEvents::increment(event, watch.elapsedNanoseconds());
else if constexpr (time == Time::Microseconds)
ProfileEvents::increment(event, watch.elapsedMicroseconds());
else if constexpr (time == Time::Milliseconds)
ProfileEvents::increment(event, watch.elapsedMilliseconds());
else if constexpr (time == Time::Seconds)
ProfileEvents::increment(event, watch.elapsedSeconds());
ProfileEvents::increment(event, elapsed());
}
ProfileEvents::Event event;

View File

@ -348,6 +348,7 @@ The server successfully detected this situation and will download merged part fr
M(DiskS3GetObject, "Number of DiskS3 API GetObject calls.") \
\
M(ReadBufferFromS3Microseconds, "Time spend in reading from S3.") \
M(ReadBufferFromS3InitMicroseconds, "Time spend initializing connection to S3.") \
M(ReadBufferFromS3Bytes, "Bytes read from S3.") \
M(ReadBufferFromS3RequestsErrors, "Number of exceptions while reading from S3.") \
\
@ -375,6 +376,8 @@ The server successfully detected this situation and will download merged part fr
M(RemoteFSLazySeeks, "Number of lazy seeks") \
M(RemoteFSSeeksWithReset, "Number of seeks which lead to a new connection") \
M(RemoteFSBuffers, "Number of buffers created for asynchronous reading from remote filesystem") \
M(MergeTreePrefetchedReadPoolInit, "Time spent preparing tasks in MergeTreePrefetchedReadPool") \
M(WaitPrefetchTaskMicroseconds, "Time spend waiting for prefetched reader") \
\
M(ThreadpoolReaderTaskMicroseconds, "Time spent getting the data in asynchronous reading") \
M(ThreadpoolReaderReadBytes, "Bytes read from a threadpool task in asynchronous reading") \

View File

@ -55,7 +55,8 @@ public:
UInt64 elapsedMilliseconds() const { return elapsedNanoseconds() / 1000000UL; }
double elapsedSeconds() const { return static_cast<double>(elapsedNanoseconds()) / 1000000000ULL; }
UInt64 getStart() { return start_ns; }
UInt64 getStart() const { return start_ns; }
UInt64 getEnd() const { return stop_ns; }
private:
UInt64 start_ns = 0;

View File

@ -10,6 +10,7 @@
#include <Interpreters/TextLog.h>
#include <Interpreters/TraceLog.h>
#include <Interpreters/FilesystemCacheLog.h>
#include <Interpreters/FilesystemReadPrefetchesLog.h>
#include <Interpreters/ProcessorsProfileLog.h>
#include <Interpreters/ZooKeeperLog.h>
#include <Interpreters/TransactionsInfoLog.h>

View File

@ -28,6 +28,7 @@
M(ProcessorProfileLogElement) \
M(TextLogElement) \
M(FilesystemCacheLogElement) \
M(FilesystemReadPrefetchesLogElement) \
M(AsynchronousInsertLogElement)
namespace Poco

View File

@ -28,10 +28,10 @@ void CachedCompressedReadBuffer::initInput()
}
void CachedCompressedReadBuffer::prefetch()
void CachedCompressedReadBuffer::prefetch(int64_t priority)
{
initInput();
file_in->prefetch();
file_in->prefetch(priority);
}

View File

@ -36,7 +36,7 @@ private:
bool nextImpl() override;
void prefetch() override;
void prefetch(int64_t priority) override;
/// Passed into file_in.
ReadBufferFromFileBase::ProfileCallback profile_callback;

View File

@ -51,9 +51,9 @@ CompressedReadBufferFromFile::CompressedReadBufferFromFile(std::unique_ptr<ReadB
}
void CompressedReadBufferFromFile::prefetch()
void CompressedReadBufferFromFile::prefetch(int64_t priority)
{
file_in.prefetch();
file_in.prefetch(priority);
}

View File

@ -43,7 +43,7 @@ private:
bool nextImpl() override;
void prefetch() override;
void prefetch(int64_t priority) override;
public:
explicit CompressedReadBufferFromFile(std::unique_ptr<ReadBufferFromFileBase> buf, bool allow_different_codecs_ = false);

View File

@ -633,6 +633,15 @@ class IColumn;
M(Bool, throw_on_error_from_cache_on_write_operations, false, "Ignore error from cache when caching on write operations (INSERT, merges)", 0) \
\
M(Bool, load_marks_asynchronously, false, "Load MergeTree marks asynchronously", 0) \
M(Bool, enable_filesystem_read_prefetches_log, false, "Log to system.filesystem prefetch_log during query. Should be used only for testing or debugging, not recommended to be turned on by default", 0) \
M(Bool, allow_prefetched_read_pool_for_remote_filesystem, false, "Prefer prefethed threadpool if all parts are on remote filesystem", 0) \
M(Bool, allow_prefetched_read_pool_for_local_filesystem, false, "Prefer prefethed threadpool if all parts are on remote filesystem", 0) \
\
M(UInt64, filesystem_prefetch_step_bytes, 0, "Prefetch step in bytes. Zero means `auto` - approximately the best prefetch step will be auto deduced, but might not be 100% the best. The actual value might be different because of setting filesystem_prefetch_min_bytes_for_single_read_task", 0) \
M(UInt64, filesystem_prefetch_step_marks, 0, "Prefetch step in marks. Zero means `auto` - approximately the best prefetch step will be auto deduced, but might not be 100% the best. The actual value might be different because of setting filesystem_prefetch_min_bytes_for_single_read_task", 0) \
M(UInt64, filesystem_prefetch_min_bytes_for_single_read_task, "8Mi", "Do not parallelize within one file read less than this amount of bytes. E.g. one reader will not receive a read task of size less than this amount. This setting is recommended to avoid spikes of time for aws getObject requests to aws", 0) \
M(UInt64, filesystem_prefetch_max_memory_usage, "1Gi", "Maximum memory usage for prefetches. Zero means unlimited", 0) \
M(UInt64, filesystem_prefetches_limit, 0, "Maximum number of prefetches. Zero means unlimited. A setting `filesystem_prefetches_max_memory_usage` is more recommended if you want to limit the number of prefetches", 0) \
\
M(UInt64, use_structure_from_insertion_table_in_table_functions, 2, "Use structure from insertion table instead of schema inference from data. Possible values: 0 - disabled, 1 - enabled, 2 - auto", 0) \
\

View File

@ -2,9 +2,13 @@
#include <Common/Stopwatch.h>
#include <Common/logger_useful.h>
#include <Common/getRandomASCIIString.h>
#include <Common/ElapsedTimeProfileEventIncrement.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/IO/ThreadPoolRemoteFSReader.h>
#include <Interpreters/FilesystemReadPrefetchesLog.h>
#include <Interpreters/Context.h>
#include <base/getThreadId.h>
namespace CurrentMetrics
@ -47,10 +51,13 @@ AsynchronousReadIndirectBufferFromRemoteFS::AsynchronousReadIndirectBufferFromRe
: ReadBufferFromFileBase(settings_.remote_fs_buffer_size, nullptr, 0)
, read_settings(settings_)
, reader(reader_)
, priority(settings_.priority)
, base_priority(settings_.priority)
, impl(impl_)
, prefetch_buffer(settings_.remote_fs_buffer_size)
, min_bytes_for_seek(min_bytes_for_seek_)
, query_id(CurrentThread::isInitialized() && CurrentThread::get().getQueryContext() != nullptr
? CurrentThread::getQueryId() : "")
, current_reader_id(getRandomASCIIString(8))
#ifndef NDEBUG
, log(&Poco::Logger::get("AsynchronousBufferFromRemoteFS"))
#else
@ -101,14 +108,14 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::hasPendingDataToRead()
}
std::future<IAsynchronousReader::Result> AsynchronousReadIndirectBufferFromRemoteFS::asyncReadInto(char * data, size_t size)
std::future<IAsynchronousReader::Result> AsynchronousReadIndirectBufferFromRemoteFS::asyncReadInto(char * data, size_t size, int64_t priority)
{
IAsynchronousReader::Request request;
request.descriptor = std::make_shared<RemoteFSFileDescriptor>(*impl);
request.buf = data;
request.size = size;
request.offset = file_offset_of_buffer_end;
request.priority = priority;
request.priority = base_priority + priority;
if (bytes_to_ignore)
{
@ -119,7 +126,7 @@ std::future<IAsynchronousReader::Result> AsynchronousReadIndirectBufferFromRemot
}
void AsynchronousReadIndirectBufferFromRemoteFS::prefetch()
void AsynchronousReadIndirectBufferFromRemoteFS::prefetch(int64_t priority)
{
if (prefetch_future.valid())
return;
@ -128,9 +135,12 @@ void AsynchronousReadIndirectBufferFromRemoteFS::prefetch()
if (!hasPendingDataToRead())
return;
last_prefetch_info.submit_time = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
last_prefetch_info.priority = priority;
/// Prefetch even in case hasPendingData() == true.
chassert(prefetch_buffer.size() == read_settings.remote_fs_buffer_size);
prefetch_future = asyncReadInto(prefetch_buffer.data(), prefetch_buffer.size());
prefetch_future = asyncReadInto(prefetch_buffer.data(), prefetch_buffer.size(), priority);
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetches);
}
@ -163,6 +173,29 @@ void AsynchronousReadIndirectBufferFromRemoteFS::setReadUntilEnd()
}
void AsynchronousReadIndirectBufferFromRemoteFS::appendToPrefetchLog(FilesystemPrefetchState state, int64_t size, const std::unique_ptr<Stopwatch> & execution_watch)
{
const auto & object = impl->getCurrentObject();
FilesystemReadPrefetchesLogElement elem
{
.event_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()),
.query_id = query_id,
.path = object.getMappedPath(),
.offset = file_offset_of_buffer_end,
.size = size,
.prefetch_submit_time = last_prefetch_info.submit_time,
.execution_watch = execution_watch ? std::optional<Stopwatch>(*execution_watch) : std::nullopt,
.priority = last_prefetch_info.priority,
.state = state,
.thread_id = getThreadId(),
.reader_id = current_reader_id,
};
if (auto prefetch_log = Context::getGlobalContextInstance()->getFilesystemReadPrefetchesLog())
prefetch_log->add(elem);
}
bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
{
if (!hasPendingDataToRead())
@ -176,10 +209,19 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::AsynchronousRemoteReadWaitMicroseconds);
CurrentMetrics::Increment metric_increment{CurrentMetrics::AsynchronousReadWait};
std::tie(size, offset) = prefetch_future.get();
auto result = prefetch_future.get();
size = result.size;
offset = result.offset;
prefetch_future = {};
prefetch_buffer.swap(memory);
if (read_settings.enable_filesystem_read_prefetches_log)
{
appendToPrefetchLog(FilesystemPrefetchState::USED, size, result.execution_watch);
}
last_prefetch_info = {};
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetchedReads);
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetchedBytes, size);
}
@ -196,6 +238,7 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
}
chassert(size >= offset);
size_t bytes_read = size - offset;
if (bytes_read)
{
@ -265,7 +308,13 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset, int whence)
/// Prefetch is cancelled because of seek.
if (read_from_prefetch)
{
ProfileEvents::increment(ProfileEvents::RemoteFSCancelledPrefetches);
if (read_settings.enable_filesystem_read_prefetches_log)
{
appendToPrefetchLog(FilesystemPrefetchState::CANCELLED_WITH_SEEK, -1, nullptr);
}
}
break;
}
@ -333,8 +382,9 @@ void AsynchronousReadIndirectBufferFromRemoteFS::resetPrefetch(FilesystemPrefetc
if (!prefetch_future.valid())
return;
auto [size, _] = prefetch_future.get();
auto [size, offset, _] = prefetch_future.get();
prefetch_future = {};
last_prefetch_info = {};
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetchedBytes, size);

View File

@ -4,6 +4,7 @@
#include <IO/ReadBufferFromFile.h>
#include <IO/AsynchronousReader.h>
#include <IO/ReadSettings.h>
#include <Interpreters/FilesystemReadPrefetchesLog.h>
#include <utility>
namespace Poco { class Logger; }
@ -43,7 +44,7 @@ public:
String getFileName() const override;
void prefetch() override;
void prefetch(int64_t priority) override;
void setReadUntilPosition(size_t position) override; /// [..., position).
@ -62,22 +63,17 @@ private:
bool hasPendingDataToRead();
std::future<IAsynchronousReader::Result> asyncReadInto(char * data, size_t size);
void appendToPrefetchLog(FilesystemPrefetchState state, int64_t size, const std::unique_ptr<Stopwatch> & execution_watch);
std::future<IAsynchronousReader::Result> asyncReadInto(char * data, size_t size, int64_t priority);
enum class FilesystemPrefetchState
{
USED,
CANCELLED_WITH_SEEK,
CANCELLED_WITH_RANGE_CHANGE,
UNNEEDED,
};
void resetPrefetch(FilesystemPrefetchState state);
ReadSettings read_settings;
IAsynchronousReader & reader;
Int64 priority;
int64_t base_priority;
std::shared_ptr<ReadBufferFromRemoteFSGather> impl;
@ -89,11 +85,22 @@ private:
size_t min_bytes_for_seek;
std::string query_id;
std::string current_reader_id;
size_t bytes_to_ignore = 0;
std::optional<size_t> read_until_position;
Poco::Logger * log;
struct LastPrefetchInfo
{
UInt64 submit_time = 0;
size_t priority = 0;
};
LastPrefetchInfo last_prefetch_info;
};
}

View File

@ -1172,7 +1172,7 @@ void CachedOnDiskReadBufferFromFile::assertCorrectness() const
{
if (FileCache::isReadOnly()
&& !settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache usage is not allowed");
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache usage is not allowed (query_id: {})", query_id);
}
String CachedOnDiskReadBufferFromFile::getInfoForLog()

View File

@ -33,14 +33,16 @@ ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
if (blobs_to_read.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read zero number of objects");
current_object = blobs_to_read.front();
with_cache = settings.remote_fs_cache
&& settings.enable_filesystem_cache
&& (!FileCache::isReadOnly() || settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache);
&& (!query_id.empty() || settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache);
}
SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(const StoredObject & object)
{
if (current_object && !with_cache && enable_cache_log)
if (current_buf != nullptr && !with_cache && enable_cache_log)
{
appendFilesystemCacheLog();
}
@ -73,15 +75,13 @@ SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(c
void ReadBufferFromRemoteFSGather::appendFilesystemCacheLog()
{
if (!current_object)
return;
chassert(!current_object.absolute_path.empty());
FilesystemCacheLogElement elem
{
.event_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()),
.query_id = query_id,
.source_file_path = current_object->absolute_path,
.file_segment_range = { 0, current_object->bytes_size },
.source_file_path = current_object.absolute_path,
.file_segment_range = { 0, current_object.bytes_size },
.cache_type = FilesystemCacheLogElement::CacheType::READ_FROM_FS_BYPASSING_CACHE,
.file_segment_size = total_bytes_read_from_current_file,
.read_from_cache_attempted = false,
@ -107,9 +107,9 @@ IAsynchronousReader::Result ReadBufferFromRemoteFSGather::readInto(char * data,
auto result = nextImpl();
if (result)
return {working_buffer.size(), BufferBase::offset()};
return { working_buffer.size(), BufferBase::offset(), nullptr };
return {0, 0};
return {0, 0, nullptr};
}
void ReadBufferFromRemoteFSGather::initialize()
@ -245,9 +245,7 @@ void ReadBufferFromRemoteFSGather::reset()
String ReadBufferFromRemoteFSGather::getFileName() const
{
if (current_object)
return current_object->absolute_path;
return blobs_to_read[0].absolute_path;
return current_object.absolute_path;
}
size_t ReadBufferFromRemoteFSGather::getFileSize() const

View File

@ -47,6 +47,8 @@ public:
size_t getImplementationBufferOffset() const;
const StoredObject & getCurrentObject() const { return current_object; }
private:
SeekableReadBufferPtr createImplementationBuffer(const StoredObject & object);
@ -68,10 +70,7 @@ private:
size_t read_until_position = 0;
String current_file_path;
size_t current_file_size = 0;
std::optional<StoredObject> current_object;
StoredObject current_object;
bool with_cache;

View File

@ -95,7 +95,7 @@ bool ReadIndirectBufferFromRemoteFS::nextImpl()
chassert(internal_buffer.size() == read_settings.remote_fs_buffer_size);
chassert(file_offset_of_buffer_end <= impl->getFileSize());
auto [size, offset] = impl->readInto(internal_buffer.begin(), internal_buffer.size(), file_offset_of_buffer_end, /* ignore */0);
auto [size, offset, _] = impl->readInto(internal_buffer.begin(), internal_buffer.size(), file_offset_of_buffer_end, /* ignore */0);
chassert(offset <= size);
chassert(size <= internal_buffer.size());

View File

@ -145,7 +145,7 @@ std::future<IAsynchronousReader::Result> ThreadPoolReader::submit(Request reques
if (!res)
{
/// The file has ended.
promise.set_value({0, 0});
promise.set_value({0, 0, nullptr});
return future;
}
@ -190,7 +190,7 @@ std::future<IAsynchronousReader::Result> ThreadPoolReader::submit(Request reques
ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheHitBytes, bytes_read);
ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadBytes, bytes_read);
promise.set_value({bytes_read, request.ignore});
promise.set_value({bytes_read, request.ignore, nullptr});
return future;
}
}

View File

@ -9,6 +9,9 @@
#include <Common/CurrentThread.h>
#include <Common/ElapsedTimeProfileEventIncrement.h>
#include <IO/SeekableReadBuffer.h>
#include <IO/AsyncReadCounters.h>
#include <Interpreters/Context.h>
#include <base/getThreadId.h>
#include <future>
@ -27,6 +30,29 @@ namespace CurrentMetrics
namespace DB
{
namespace
{
struct AsyncReadIncrement : boost::noncopyable
{
explicit AsyncReadIncrement(std::shared_ptr<AsyncReadCounters> counters_)
: counters(counters_)
{
std::lock_guard lock(counters->mutex);
if (++counters->current_parallel_read_tasks > counters->max_parallel_read_tasks)
counters->max_parallel_read_tasks = counters->current_parallel_read_tasks;
}
~AsyncReadIncrement()
{
std::lock_guard lock(counters->mutex);
--counters->current_parallel_read_tasks;
}
std::shared_ptr<AsyncReadCounters> counters;
};
}
IAsynchronousReader::Result RemoteFSFileDescriptor::readInto(char * data, size_t size, size_t offset, size_t ignore)
{
return reader.readInto(data, size, offset, ignore);
@ -45,18 +71,25 @@ std::future<IAsynchronousReader::Result> ThreadPoolRemoteFSReader::submit(Reques
return scheduleFromThreadPool<Result>([request]() -> Result
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::RemoteRead};
Stopwatch watch(CLOCK_MONOTONIC);
std::optional<AsyncReadIncrement> increment;
if (CurrentThread::isInitialized())
{
auto query_context = CurrentThread::get().getQueryContext();
if (query_context)
increment.emplace(query_context->getAsyncReadCounters());
}
auto * remote_fs_fd = assert_cast<RemoteFSFileDescriptor *>(request.descriptor.get());
auto watch = std::make_unique<Stopwatch>(CLOCK_MONOTONIC);
Result result = remote_fs_fd->readInto(request.buf, request.size, request.offset, request.ignore);
watch->stop();
watch.stop();
ProfileEvents::increment(ProfileEvents::ThreadpoolReaderTaskMicroseconds, watch.elapsedMicroseconds());
ProfileEvents::increment(ProfileEvents::ThreadpoolReaderTaskMicroseconds, watch->elapsedMicroseconds());
ProfileEvents::increment(ProfileEvents::ThreadpoolReaderReadBytes, result.size);
return Result{ .size = result.size, .offset = result.offset };
return Result{ .size = result.size, .offset = result.offset, .execution_watch = std::move(watch) };
}, pool, "VFSRead", request.priority);
}

View File

@ -42,7 +42,7 @@ StoredObject StoredObject::create(
bool object_bypasses_cache)
{
if (object_bypasses_cache)
return StoredObject(object_path, object_size, {});
return StoredObject(object_path, object_size, mapped_path, {});
PathKeyForCacheCreator path_key_for_cache_creator = [&object_storage](const std::string & path) -> std::string
{

View File

@ -35,6 +35,8 @@ struct StoredObject
using PathKeyForCacheCreator = std::function<std::string(const std::string &)>;
PathKeyForCacheCreator path_key_for_cache_creator;
StoredObject() = default;
explicit StoredObject(
const std::string & absolute_path_,
uint64_t bytes_size_ = 0,

View File

@ -0,0 +1,37 @@
#include <IO/AsyncReadCounters.h>
namespace DB
{
void AsyncReadCounters::dumpToMapColumn(IColumn * column) const
{
auto * column_map = column ? &typeid_cast<DB::ColumnMap &>(*column) : nullptr;
if (!column_map)
return;
auto & offsets = column_map->getNestedColumn().getOffsets();
auto & tuple_column = column_map->getNestedData();
auto & key_column = tuple_column.getColumn(0);
auto & value_column = tuple_column.getColumn(1);
size_t size = 0;
auto load_if_not_empty = [&](const auto & key, const auto & value)
{
if (value)
{
key_column.insert(key);
value_column.insert(value);
++size;
}
};
std::lock_guard lock(mutex);
load_if_not_empty("max_parallel_read_tasks", max_parallel_read_tasks);
load_if_not_empty("max_parallel_prefetch_tasks", max_parallel_prefetch_tasks);
load_if_not_empty("total_prefetch_tasks", total_prefetch_tasks);
offsets.push_back(offsets.back() + size);
}
}

View File

@ -0,0 +1,31 @@
#pragma once
#include <Core/Types.h>
#include <Columns/ColumnMap.h>
namespace DB
{
/// Metrics for asynchronous reading feature.
struct AsyncReadCounters
{
/// Count current and max number of tasks in a asynchronous read pool.
/// The tasks are requests to read the data.
size_t max_parallel_read_tasks = 0;
size_t current_parallel_read_tasks = 0;
/// Count current and max number of tasks in a reader prefetch read pool.
/// The tasks are calls to IMergeTreeReader::prefetch(), which does not do
/// any reading but creates a request for read. But as we need to wait for
/// marks to be loaded during this prefetch, we do it in a threadpool too.
size_t max_parallel_prefetch_tasks = 0;
size_t current_parallel_prefetch_tasks = 0;
size_t total_prefetch_tasks = 0;
mutable std::mutex mutex;
AsyncReadCounters() = default;
void dumpToMapColumn(IColumn * column) const;
};
}

View File

@ -37,14 +37,14 @@ std::string AsynchronousReadBufferFromFileDescriptor::getFileName() const
}
std::future<IAsynchronousReader::Result> AsynchronousReadBufferFromFileDescriptor::asyncReadInto(char * data, size_t size)
std::future<IAsynchronousReader::Result> AsynchronousReadBufferFromFileDescriptor::asyncReadInto(char * data, size_t size, int64_t priority)
{
IAsynchronousReader::Request request;
request.descriptor = std::make_shared<IAsynchronousReader::LocalFileDescriptor>(fd);
request.buf = data;
request.size = size;
request.offset = file_offset_of_buffer_end;
request.priority = priority;
request.priority = base_priority + priority;
request.ignore = bytes_to_ignore;
bytes_to_ignore = 0;
@ -58,14 +58,14 @@ std::future<IAsynchronousReader::Result> AsynchronousReadBufferFromFileDescripto
}
void AsynchronousReadBufferFromFileDescriptor::prefetch()
void AsynchronousReadBufferFromFileDescriptor::prefetch(int64_t priority)
{
if (prefetch_future.valid())
return;
/// Will request the same amount of data that is read in nextImpl.
prefetch_buffer.resize(internal_buffer.size());
prefetch_future = asyncReadInto(prefetch_buffer.data(), prefetch_buffer.size());
prefetch_future = asyncReadInto(prefetch_buffer.data(), prefetch_buffer.size(), priority);
}
@ -108,8 +108,9 @@ bool AsynchronousReadBufferFromFileDescriptor::nextImpl()
else
{
/// No pending request. Do synchronous read.
Stopwatch watch;
auto [size, offset] = asyncReadInto(memory.data(), memory.size()).get();
auto [size, offset, _] = asyncReadInto(memory.data(), memory.size(), DEFAULT_PREFETCH_PRIORITY).get();
ProfileEvents::increment(ProfileEvents::AsynchronousReadWaitMicroseconds, watch.elapsedMicroseconds());
file_offset_of_buffer_end += size;
@ -151,7 +152,7 @@ AsynchronousReadBufferFromFileDescriptor::AsynchronousReadBufferFromFileDescript
std::optional<size_t> file_size_)
: ReadBufferFromFileBase(buf_size, existing_memory, alignment, file_size_)
, reader(reader_)
, priority(priority_)
, base_priority(priority_)
, required_alignment(alignment)
, fd(fd_)
{

View File

@ -17,7 +17,7 @@ class AsynchronousReadBufferFromFileDescriptor : public ReadBufferFromFileBase
{
protected:
IAsynchronousReader & reader;
Int32 priority;
int64_t base_priority;
Memory<> prefetch_buffer;
std::future<IAsynchronousReader::Result> prefetch_future;
@ -46,7 +46,7 @@ public:
~AsynchronousReadBufferFromFileDescriptor() override;
void prefetch() override;
void prefetch(int64_t priority) override;
int getFD() const
{
@ -67,7 +67,7 @@ public:
size_t getFileSize() override;
private:
std::future<IAsynchronousReader::Result> asyncReadInto(char * data, size_t size);
std::future<IAsynchronousReader::Result> asyncReadInto(char * data, size_t size, int64_t priority);
};
}

View File

@ -5,6 +5,7 @@
#include <memory>
#include <future>
#include <boost/noncopyable.hpp>
#include <Common/Stopwatch.h>
namespace DB
@ -62,6 +63,8 @@ public:
/// Optional. Useful when implementation needs to do ignore().
size_t offset = 0;
std::unique_ptr<Stopwatch> execution_watch;
operator std::tuple<size_t &, size_t &>() { return {size, offset}; }
};

View File

@ -19,7 +19,7 @@ public:
const ReadBuffer & getWrappedReadBuffer() const { return *in; }
ReadBuffer & getWrappedReadBuffer() { return *in; }
void prefetch() override { in->prefetch(); }
void prefetch(int64_t priority) override { in->prefetch(priority); }
protected:
std::unique_ptr<ReadBuffer> in;

View File

@ -20,7 +20,7 @@ public:
~PeekableReadBuffer() override;
void prefetch() override { sub_buf->prefetch(); }
void prefetch(int64_t priority) override { sub_buf->prefetch(priority); }
/// Sets checkpoint at current position
ALWAYS_INLINE inline void setCheckpoint()

View File

@ -20,6 +20,8 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED;
}
static constexpr auto DEFAULT_PREFETCH_PRIORITY = 0;
/** A simple abstract class for buffered data reading (char sequences) from somewhere.
* Unlike std::istream, it provides access to the internal buffer,
* and also allows you to manually manage the position inside the buffer.
@ -202,8 +204,10 @@ public:
/** Do something to allow faster subsequent call to 'nextImpl' if possible.
* It's used for asynchronous readers with double-buffering.
* `priority` is the Threadpool priority, with which the prefetch task will be schedules.
* Smaller is more priority.
*/
virtual void prefetch() {}
virtual void prefetch(int64_t /* priority */) {}
/**
* Set upper bound for read range [..., position).

View File

@ -118,7 +118,7 @@ bool ReadBufferFromFileDescriptor::nextImpl()
}
void ReadBufferFromFileDescriptor::prefetch()
void ReadBufferFromFileDescriptor::prefetch(int64_t)
{
#if defined(POSIX_FADV_WILLNEED)
/// For direct IO, loading data into page cache is pointless.

View File

@ -22,7 +22,7 @@ protected:
int fd;
bool nextImpl() override;
void prefetch() override;
void prefetch(int64_t priority) override;
/// Name or some description of file.
std::string getFileName() const override;

View File

@ -12,6 +12,7 @@
#include <Common/Stopwatch.h>
#include <Common/Throttler.h>
#include <Common/logger_useful.h>
#include <Common/ElapsedTimeProfileEventIncrement.h>
#include <base/sleep.h>
#include <utility>
@ -20,6 +21,7 @@
namespace ProfileEvents
{
extern const Event ReadBufferFromS3Microseconds;
extern const Event ReadBufferFromS3InitMicroseconds;
extern const Event ReadBufferFromS3Bytes;
extern const Event ReadBufferFromS3RequestsErrors;
extern const Event ReadBufferSeekCancelConnection;
@ -323,6 +325,8 @@ std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
if (read_settings.for_object_storage)
ProfileEvents::increment(ProfileEvents::DiskS3GetObject);
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::ReadBufferFromS3InitMicroseconds);
// We do not know in advance how many bytes we are going to consume, to avoid blocking estimated it from below
constexpr ResourceCost estimated_cost = 1;
ResourceGuard rlock(read_settings.resource_link, estimated_cost);

View File

@ -89,6 +89,8 @@ struct ReadSettings
size_t remote_fs_read_max_backoff_ms = 10000;
size_t remote_fs_read_backoff_max_tries = 4;
bool enable_filesystem_read_prefetches_log = false;
bool enable_filesystem_cache = true;
bool read_from_filesystem_cache_if_exists_otherwise_bypass_cache = false;
bool enable_filesystem_cache_log = false;

View File

@ -17,7 +17,7 @@ public:
off_t seek(off_t off, int whence) override;
void prefetch() override { impl->prefetch(); }
void prefetch(int64_t priority) override { impl->prefetch(priority); }
private:
UInt64 min_bytes_for_seek; /// Minimum positive seek offset which shall be executed using seek operation.

View File

@ -235,6 +235,7 @@ struct ContextSharedPart : boost::noncopyable
mutable UncompressedCachePtr uncompressed_cache; /// The cache of decompressed blocks.
mutable MarkCachePtr mark_cache; /// Cache of marks in compressed files.
mutable std::unique_ptr<ThreadPool> load_marks_threadpool; /// Threadpool for loading marks cache.
mutable std::unique_ptr<ThreadPool> prefetch_threadpool; /// Threadpool for loading marks cache.
mutable UncompressedCachePtr index_uncompressed_cache; /// The cache of decompressed blocks for MergeTree indices.
mutable MarkCachePtr index_mark_cache; /// Cache of marks in compressed files of MergeTree indices.
mutable QueryCachePtr query_cache; /// Cache of query results.
@ -413,6 +414,20 @@ struct ContextSharedPart : boost::noncopyable
}
}
if (prefetch_threadpool)
{
try
{
LOG_DEBUG(log, "Desctructing prefetch threadpool");
prefetch_threadpool->wait();
prefetch_threadpool.reset();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
try
{
shutdown();
@ -1994,6 +2009,31 @@ ThreadPool & Context::getLoadMarksThreadpool() const
return *shared->load_marks_threadpool;
}
static size_t getPrefetchThreadpoolSizeFromConfig(const Poco::Util::AbstractConfiguration & config)
{
return config.getUInt(".prefetch_threadpool_pool_size", 100);
}
size_t Context::getPrefetchThreadpoolSize() const
{
const auto & config = getConfigRef();
return getPrefetchThreadpoolSizeFromConfig(config);
}
ThreadPool & Context::getPrefetchThreadpool() const
{
const auto & config = getConfigRef();
auto lock = getLock();
if (!shared->prefetch_threadpool)
{
auto pool_size = getPrefetchThreadpoolSize();
auto queue_size = config.getUInt(".prefetch_threadpool_queue_size", 1000000);
shared->prefetch_threadpool = std::make_unique<ThreadPool>(pool_size, pool_size, queue_size);
}
return *shared->prefetch_threadpool;
}
void Context::setIndexUncompressedCache(size_t max_size_in_bytes)
{
auto lock = getLock();
@ -2933,7 +2973,16 @@ std::shared_ptr<FilesystemCacheLog> Context::getFilesystemCacheLog() const
if (!shared->system_logs)
return {};
return shared->system_logs->cache_log;
return shared->system_logs->filesystem_cache_log;
}
std::shared_ptr<FilesystemReadPrefetchesLog> Context::getFilesystemReadPrefetchesLog() const
{
auto lock = getLock();
if (!shared->system_logs)
return {};
return shared->system_logs->filesystem_read_prefetches_log;
}
std::shared_ptr<AsynchronousInsertLog> Context::getAsynchronousInsertLog() const
@ -3838,6 +3887,31 @@ OrdinaryBackgroundExecutorPtr Context::getCommonExecutor() const
return shared->common_executor;
}
static size_t getThreadPoolReaderSizeFromConfig(Context::FilesystemReaderType type, const Poco::Util::AbstractConfiguration & config)
{
switch (type)
{
case Context::FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER:
{
return config.getUInt(".threadpool_remote_fs_reader_pool_size", 250);
}
case Context::FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER:
{
return config.getUInt(".threadpool_local_fs_reader_pool_size", 100);
}
case Context::FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER:
{
return std::numeric_limits<std::size_t>::max();
}
}
}
size_t Context::getThreadPoolReaderSize(FilesystemReaderType type) const
{
const auto & config = getConfigRef();
return getThreadPoolReaderSizeFromConfig(type, config);
}
IAsynchronousReader & Context::getThreadPoolReader(FilesystemReaderType type) const
{
const auto & config = getConfigRef();
@ -3850,9 +3924,8 @@ IAsynchronousReader & Context::getThreadPoolReader(FilesystemReaderType type) co
{
if (!shared->asynchronous_remote_fs_reader)
{
auto pool_size = config.getUInt(".threadpool_remote_fs_reader_pool_size", 100);
auto pool_size = getThreadPoolReaderSizeFromConfig(type, config);
auto queue_size = config.getUInt(".threadpool_remote_fs_reader_queue_size", 1000000);
shared->asynchronous_remote_fs_reader = std::make_unique<ThreadPoolRemoteFSReader>(pool_size, queue_size);
}
@ -3862,9 +3935,8 @@ IAsynchronousReader & Context::getThreadPoolReader(FilesystemReaderType type) co
{
if (!shared->asynchronous_local_fs_reader)
{
auto pool_size = config.getUInt(".threadpool_local_fs_reader_pool_size", 100);
auto pool_size = getThreadPoolReaderSizeFromConfig(type, config);
auto queue_size = config.getUInt(".threadpool_local_fs_reader_queue_size", 1000000);
shared->asynchronous_local_fs_reader = std::make_unique<ThreadPoolReader>(pool_size, queue_size);
}
@ -3922,6 +3994,8 @@ ReadSettings Context::getReadSettings() const
res.load_marks_asynchronously = settings.load_marks_asynchronously;
res.enable_filesystem_read_prefetches_log = settings.enable_filesystem_read_prefetches_log;
res.remote_fs_read_max_backoff_ms = settings.remote_fs_read_max_backoff_ms;
res.remote_fs_read_backoff_max_tries = settings.remote_fs_read_backoff_max_tries;
res.enable_filesystem_cache = settings.enable_filesystem_cache;
@ -3974,6 +4048,14 @@ WriteSettings Context::getWriteSettings() const
return res;
}
std::shared_ptr<AsyncReadCounters> Context::getAsyncReadCounters() const
{
auto lock = getLock();
if (!async_read_counters)
async_read_counters = std::make_shared<AsyncReadCounters>();
return async_read_counters;
}
bool Context::canUseParallelReplicasOnInitiator() const
{
const auto & settings = getSettingsRef();

View File

@ -10,6 +10,7 @@
#include <Core/NamesAndTypes.h>
#include <Core/Settings.h>
#include <Core/UUID.h>
#include <IO/AsyncReadCounters.h>
#include <Interpreters/ClientInfo.h>
#include <Interpreters/Context_fwd.h>
#include <Interpreters/DatabaseCatalog.h>
@ -95,6 +96,7 @@ class BackupsWorker;
class TransactionsInfoLog;
class ProcessorsProfileLog;
class FilesystemCacheLog;
class FilesystemReadPrefetchesLog;
class AsynchronousInsertLog;
class IAsynchronousReader;
struct MergeTreeSettings;
@ -375,6 +377,8 @@ private:
/// Needs to be changed while having const context in factories methods
mutable QueryFactoriesInfo query_factories_info;
/// Query metrics for reading data asynchronously with IAsynchronousReader.
mutable std::shared_ptr<AsyncReadCounters> async_read_counters;
/// TODO: maybe replace with temporary tables?
StoragePtr view_source; /// Temporary StorageValues used to generate alias columns for materialized views
@ -862,6 +866,13 @@ public:
void dropMarkCache() const;
ThreadPool & getLoadMarksThreadpool() const;
ThreadPool & getPrefetchThreadpool() const;
/// Note: prefetchThreadpool is different from threadpoolReader
/// in the way that its tasks are - wait for marks to be loaded
/// and make a prefetch by putting a read task to threadpoolReader.
size_t getPrefetchThreadpoolSize() const;
/// Create a cache of index uncompressed blocks of specified size. This can be done only once.
void setIndexUncompressedCache(size_t max_size_in_bytes);
std::shared_ptr<UncompressedCache> getIndexUncompressedCache() const;
@ -949,6 +960,7 @@ public:
std::shared_ptr<TransactionsInfoLog> getTransactionsInfoLog() const;
std::shared_ptr<ProcessorsProfileLog> getProcessorsProfileLog() const;
std::shared_ptr<FilesystemCacheLog> getFilesystemCacheLog() const;
std::shared_ptr<FilesystemReadPrefetchesLog> getFilesystemReadPrefetchesLog() const;
std::shared_ptr<AsynchronousInsertLog> getAsynchronousInsertLog() const;
/// Returns an object used to log operations with parts if it possible.
@ -1090,6 +1102,10 @@ public:
IAsynchronousReader & getThreadPoolReader(FilesystemReaderType type) const;
size_t getThreadPoolReaderSize(FilesystemReaderType type) const;
std::shared_ptr<AsyncReadCounters> getAsyncReadCounters() const;
ThreadPool & getThreadPoolWriter() const;
/** Get settings for reading from filesystem. */

View File

@ -0,0 +1,61 @@
#include <Interpreters/FilesystemReadPrefetchesLog.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
namespace DB
{
NamesAndTypesList FilesystemReadPrefetchesLogElement::getNamesAndTypes()
{
return {
{"event_date", std::make_shared<DataTypeDate>()},
{"event_time", std::make_shared<DataTypeDateTime>()},
{"query_id", std::make_shared<DataTypeString>()},
{"path", std::make_shared<DataTypeString>()},
{"offset", std::make_shared<DataTypeUInt64>()},
{"size", std::make_shared<DataTypeInt64>()},
{"prefetch_submit_time", std::make_shared<DataTypeDateTime64>(6)},
{"priority", std::make_shared<DataTypeUInt64>()},
{"prefetch_execution_start_time", std::make_shared<DataTypeDateTime64>(6)},
{"prefetch_execution_end_time", std::make_shared<DataTypeDateTime64>(6)},
{"prefetch_execution_time_us", std::make_shared<DataTypeUInt64>()},
{"state", std::make_shared<DataTypeString>()}, /// Was this prefetch used or we downloaded it in vain?
{"thread_id", std::make_shared<DataTypeUInt64>()},
{"reader_id", std::make_shared<DataTypeString>()},
};
}
void FilesystemReadPrefetchesLogElement::appendToBlock(MutableColumns & columns) const
{
size_t i = 0;
columns[i++]->insert(DateLUT::instance().toDayNum(event_time).toUnderType());
columns[i++]->insert(event_time);
columns[i++]->insert(query_id);
columns[i++]->insert(path);
columns[i++]->insert(offset);
columns[i++]->insert(size);
columns[i++]->insert(prefetch_submit_time);
columns[i++]->insert(priority);
if (execution_watch)
{
columns[i++]->insert(execution_watch->getStart());
columns[i++]->insert(execution_watch->getEnd());
columns[i++]->insert(execution_watch->elapsedMicroseconds());
}
else
{
columns[i++]->insertDefault();
columns[i++]->insertDefault();
columns[i++]->insertDefault();
}
columns[i++]->insert(magic_enum::enum_name(state));
columns[i++]->insert(thread_id);
columns[i++]->insert(reader_id);
}
}

View File

@ -0,0 +1,48 @@
#pragma once
#include <Core/NamesAndAliases.h>
#include <Core/NamesAndTypes.h>
#include <Interpreters/SystemLog.h>
#include <Common/Stopwatch.h>
namespace DB
{
enum class FilesystemPrefetchState
{
USED,
CANCELLED_WITH_SEEK,
CANCELLED_WITH_RANGE_CHANGE,
UNNEEDED,
};
struct FilesystemReadPrefetchesLogElement
{
time_t event_time{};
String query_id;
String path;
UInt64 offset;
Int64 size; /// -1 means unknown
Decimal64 prefetch_submit_time{};
std::optional<Stopwatch> execution_watch;
size_t priority;
FilesystemPrefetchState state;
UInt64 thread_id;
String reader_id;
static std::string name() { return "FilesystemReadPrefetchesLog"; }
static NamesAndTypesList getNamesAndTypes();
static NamesAndAliases getNamesAndAliases() { return {}; }
void appendToBlock(MutableColumns & columns) const;
static const char * getCustomColumnList() { return nullptr; }
};
class FilesystemReadPrefetchesLog : public SystemLog<FilesystemReadPrefetchesLogElement>
{
public:
using SystemLog<FilesystemReadPrefetchesLogElement>::SystemLog;
};
}

View File

@ -126,6 +126,8 @@ NamesAndTypesList QueryLogElement::getNamesAndTypes()
{"used_row_policies", std::make_shared<DataTypeArray>(std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()))},
{"transaction_id", getTransactionIDDataType()},
{"AsyncReadCounters", std::make_shared<DataTypeMap>(std::make_shared<DataTypeString>(), std::make_shared<DataTypeUInt64>())},
};
}
@ -271,6 +273,11 @@ void QueryLogElement::appendToBlock(MutableColumns & columns) const
}
columns[i++]->insert(Tuple{tid.start_csn, tid.local_tid, tid.host_id});
if (async_read_counters)
async_read_counters->dumpToMapColumn(columns[i++].get());
else
columns[i++]->insertDefault();
}
void QueryLogElement::appendClientInfo(const ClientInfo & client_info, MutableColumns & columns, size_t & i)

View File

@ -6,6 +6,7 @@
#include <Interpreters/SystemLog.h>
#include <Interpreters/ClientInfo.h>
#include <Interpreters/TransactionVersionMetadata.h>
#include <IO/AsyncReadCounters.h>
#include <Parsers/IAST.h>
@ -88,6 +89,7 @@ struct QueryLogElement
std::vector<UInt64> thread_ids;
std::shared_ptr<ProfileEvents::Counters::Snapshot> profile_counters;
std::shared_ptr<AsyncReadCounters> async_read_counters;
std::shared_ptr<Settings> query_settings;
TransactionID tid;

View File

@ -3,7 +3,6 @@
#include <Interpreters/AsynchronousMetricLog.h>
#include <Interpreters/Context.h>
#include <Interpreters/CrashLog.h>
#include <Interpreters/FilesystemCacheLog.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/InterpreterRenameQuery.h>
@ -18,6 +17,8 @@
#include <Interpreters/TextLog.h>
#include <Interpreters/TraceLog.h>
#include <Interpreters/TransactionsInfoLog.h>
#include <Interpreters/FilesystemCacheLog.h>
#include <Interpreters/FilesystemReadPrefetchesLog.h>
#include <Interpreters/ZooKeeperLog.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
@ -120,6 +121,8 @@ std::shared_ptr<TSystemLog> createSystemLog(
return {};
}
LOG_DEBUG(&Poco::Logger::get("SystemLog"),
"Creating {}.{} from {}", default_database_name, default_table_name, config_prefix);
String database = config.getString(config_prefix + ".database", default_database_name);
String table = config.getString(config_prefix + ".table", default_table_name);
@ -200,7 +203,9 @@ SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConf
crash_log = createSystemLog<CrashLog>(global_context, "system", "crash_log", config, "crash_log");
text_log = createSystemLog<TextLog>(global_context, "system", "text_log", config, "text_log");
metric_log = createSystemLog<MetricLog>(global_context, "system", "metric_log", config, "metric_log");
cache_log = createSystemLog<FilesystemCacheLog>(global_context, "system", "filesystem_cache_log", config, "filesystem_cache_log");
filesystem_cache_log = createSystemLog<FilesystemCacheLog>(global_context, "system", "filesystem_cache_log", config, "filesystem_cache_log");
filesystem_read_prefetches_log = createSystemLog<FilesystemReadPrefetchesLog>(
global_context, "system", "filesystem_read_prefetches_log", config, "filesystem_read_prefetches_log");
asynchronous_metric_log = createSystemLog<AsynchronousMetricLog>(
global_context, "system", "asynchronous_metric_log", config,
"asynchronous_metric_log");
@ -246,8 +251,10 @@ SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConf
logs.emplace_back(transactions_info_log.get());
if (processors_profile_log)
logs.emplace_back(processors_profile_log.get());
if (cache_log)
logs.emplace_back(cache_log.get());
if (filesystem_cache_log)
logs.emplace_back(filesystem_cache_log.get());
if (filesystem_read_prefetches_log)
logs.emplace_back(filesystem_read_prefetches_log.get());
if (asynchronous_insert_log)
logs.emplace_back(asynchronous_insert_log.get());

View File

@ -47,6 +47,7 @@ class SessionLog;
class TransactionsInfoLog;
class ProcessorsProfileLog;
class FilesystemCacheLog;
class FilesystemReadPrefetchesLog;
class AsynchronousInsertLog;
/// System logs should be destroyed in destructor of the last Context and before tables,
@ -65,7 +66,8 @@ struct SystemLogs
std::shared_ptr<CrashLog> crash_log; /// Used to log server crashes.
std::shared_ptr<TextLog> text_log; /// Used to log all text messages.
std::shared_ptr<MetricLog> metric_log; /// Used to log all metrics.
std::shared_ptr<FilesystemCacheLog> cache_log; /// Used to log cache trace.
std::shared_ptr<FilesystemCacheLog> filesystem_cache_log;
std::shared_ptr<FilesystemReadPrefetchesLog> filesystem_read_prefetches_log;
/// Metrics from system.asynchronous_metrics.
std::shared_ptr<AsynchronousMetricLog> asynchronous_metric_log;
/// OpenTelemetry trace spans.

View File

@ -903,6 +903,8 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
element.used_functions = factories_info.functions;
element.used_storages = factories_info.storages;
element.used_table_functions = factories_info.table_functions;
element.async_read_counters = context_ptr->getAsyncReadCounters();
};
/// Also make possible for caller to log successful query finish and exception during execution.

View File

@ -33,6 +33,7 @@
#include <Storages/MergeTree/MergeTreeInOrderSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeReverseSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeThreadSelectProcessor.h>
#include <Storages/MergeTree/MergeTreePrefetchedReadPool.h>
#include <Storages/MergeTree/MergeTreeSource.h>
#include <Storages/MergeTree/RangesInDataPart.h>
#include <Storages/VirtualColumnUtils.h>
@ -81,6 +82,16 @@ static const PrewhereInfoPtr & getPrewhereInfoFromQueryInfo(const SelectQueryInf
: query_info.prewhere_info;
}
static bool checkAllPartsOnRemoteFS(const RangesInDataParts & parts)
{
for (const auto & part : parts)
{
if (!part.data_part->isStoredOnRemoteDisk())
return false;
}
return true;
}
ReadFromMergeTree::ReadFromMergeTree(
MergeTreeData::DataPartsVector parts_,
Names real_column_names_,
@ -426,16 +437,6 @@ struct PartRangesReadInfo
bool use_uncompressed_cache = false;
static bool checkAllPartsOnRemoteFS(const RangesInDataParts & parts)
{
for (const auto & part : parts)
{
if (!part.data_part->isStoredOnRemoteDisk())
return false;
}
return true;
}
PartRangesReadInfo(
const RangesInDataParts & parts,
const Settings & settings,
@ -443,6 +444,7 @@ struct PartRangesReadInfo
{
/// Count marks for each part.
sum_marks_in_parts.resize(parts.size());
for (size_t i = 0; i < parts.size(); ++i)
{
total_rows += parts[i].getRowsCount();
@ -463,14 +465,23 @@ struct PartRangesReadInfo
index_granularity_bytes);
auto all_parts_on_remote_disk = checkAllPartsOnRemoteFS(parts);
size_t min_rows_for_concurrent_read;
size_t min_bytes_for_concurrent_read;
if (all_parts_on_remote_disk)
{
min_rows_for_concurrent_read = settings.merge_tree_min_rows_for_concurrent_read_for_remote_filesystem;
min_bytes_for_concurrent_read = settings.merge_tree_min_bytes_for_concurrent_read_for_remote_filesystem;
}
else
{
min_rows_for_concurrent_read = settings.merge_tree_min_rows_for_concurrent_read;
min_bytes_for_concurrent_read = settings.merge_tree_min_bytes_for_concurrent_read;
}
min_marks_for_concurrent_read = MergeTreeDataSelectExecutor::minMarksForConcurrentRead(
all_parts_on_remote_disk ? settings.merge_tree_min_rows_for_concurrent_read_for_remote_filesystem
: settings.merge_tree_min_rows_for_concurrent_read,
all_parts_on_remote_disk ? settings.merge_tree_min_bytes_for_concurrent_read_for_remote_filesystem
: settings.merge_tree_min_bytes_for_concurrent_read,
data_settings.index_granularity,
index_granularity_bytes,
sum_marks);
min_rows_for_concurrent_read, min_bytes_for_concurrent_read,
data_settings.index_granularity, index_granularity_bytes, sum_marks);
use_uncompressed_cache = settings.use_uncompressed_cache;
if (sum_marks > max_marks_to_use_cache)

View File

@ -5,6 +5,7 @@
#include <Common/logger_useful.h>
#include <Storages/HDFS/ReadBufferFromHDFS.h>
#include <Disks/IO/ThreadPoolRemoteFSReader.h>
#include <IO/AsynchronousReader.h>
namespace CurrentMetrics
@ -38,7 +39,7 @@ AsynchronousReadBufferFromHDFS::AsynchronousReadBufferFromHDFS(
IAsynchronousReader & reader_, const ReadSettings & settings_, std::shared_ptr<ReadBufferFromHDFS> impl_)
: BufferWithOwnMemory<SeekableReadBuffer>(settings_.remote_fs_buffer_size)
, reader(reader_)
, priority(settings_.priority)
, base_priority(settings_.priority)
, impl(std::move(impl_))
, prefetch_buffer(settings_.remote_fs_buffer_size)
, read_until_position(impl->getFileSize())
@ -63,19 +64,19 @@ bool AsynchronousReadBufferFromHDFS::hasPendingDataToRead()
return true;
}
std::future<IAsynchronousReader::Result> AsynchronousReadBufferFromHDFS::asyncReadInto(char * data, size_t size)
std::future<IAsynchronousReader::Result> AsynchronousReadBufferFromHDFS::asyncReadInto(char * data, size_t size, int64_t priority)
{
IAsynchronousReader::Request request;
request.descriptor = std::make_shared<RemoteFSFileDescriptor>(*impl);
request.buf = data;
request.size = size;
request.offset = file_offset_of_buffer_end;
request.priority = priority;
request.priority = base_priority + priority;
request.ignore = 0;
return reader.submit(request);
}
void AsynchronousReadBufferFromHDFS::prefetch()
void AsynchronousReadBufferFromHDFS::prefetch(int64_t priority)
{
interval_watch.restart();
@ -85,7 +86,7 @@ void AsynchronousReadBufferFromHDFS::prefetch()
if (!hasPendingDataToRead())
return;
prefetch_future = asyncReadInto(prefetch_buffer.data(), prefetch_buffer.size());
prefetch_future = asyncReadInto(prefetch_buffer.data(), prefetch_buffer.size(), priority);
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetches);
}
@ -146,7 +147,7 @@ bool AsynchronousReadBufferFromHDFS::nextImpl()
{
ProfileEvents::increment(ProfileEvents::RemoteFSUnprefetchedReads);
auto result = asyncReadInto(memory.data(), memory.size()).get();
auto result = asyncReadInto(memory.data(), memory.size(), DEFAULT_PREFETCH_PRIORITY).get();
size = result.size;
auto offset = result.offset;
@ -166,8 +167,8 @@ bool AsynchronousReadBufferFromHDFS::nextImpl()
file_offset_of_buffer_end = impl->getFileOffsetOfBufferEnd();
prefetch_future = {};
if (use_prefetch)
prefetch();
if (use_prefetch && bytes_read)
prefetch(DEFAULT_PREFETCH_PRIORITY);
sum_duration += next_watch.elapsedMicroseconds();
sum_wait += wait;

View File

@ -12,7 +12,6 @@
#include <base/types.h>
#include <IO/ReadBuffer.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/AsynchronousReader.h>
#include <IO/SeekableReadBuffer.h>
#include <Storages/HDFS/ReadBufferFromHDFS.h>
#include <Interpreters/Context.h>
@ -20,6 +19,8 @@
namespace DB
{
class IAsynchronousReader;
class AsynchronousReadBufferFromHDFS : public BufferWithOwnMemory<SeekableReadBuffer>, public WithFileName, public WithFileSize
{
public:
@ -32,7 +33,7 @@ public:
off_t seek(off_t offset_, int whence) override;
void prefetch() override;
void prefetch(int64_t priority) override;
size_t getFileSize() override;
@ -49,10 +50,10 @@ private:
bool hasPendingDataToRead();
std::future<IAsynchronousReader::Result> asyncReadInto(char * data, size_t size);
std::future<IAsynchronousReader::Result> asyncReadInto(char * data, size_t size, int64_t priority);
IAsynchronousReader & reader;
size_t priority;
int64_t base_priority;
std::shared_ptr<ReadBufferFromHDFS> impl;
std::future<IAsynchronousReader::Result> prefetch_future;
Memory<> prefetch_buffer;

View File

@ -234,13 +234,13 @@ IAsynchronousReader::Result ReadBufferFromHDFS::readInto(char * data, size_t siz
/// TODO: we don't need to copy if there is no pending data
seek(offset, SEEK_SET);
if (eof())
return {0, 0};
return {0, 0, nullptr};
/// Make sure returned size no greater than available bytes in working_buffer
size_t count = std::min(size, available());
memcpy(data, position(), count);
position() += count;
return {count, 0};
return {count, 0, nullptr};
}
String ReadBufferFromHDFS::getFileName() const

View File

@ -5,7 +5,6 @@
#if USE_HDFS
#include <IO/ReadBuffer.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/AsynchronousReader.h>
#include <string>
#include <memory>
#include <hdfs/hdfs.h>

View File

@ -236,7 +236,7 @@ public:
raw_read_buf = get_raw_read_buf();
if (read_settings.remote_fs_prefetch)
raw_read_buf->prefetch();
raw_read_buf->prefetch(DEFAULT_PREFETCH_PRIORITY);
}
catch (Exception & e)
{

View File

@ -61,6 +61,8 @@ public:
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read;
virtual void prefetchBeginOfRange(int64_t /* priority */) {}
protected:
/// Returns actual column name in part, which can differ from table metadata.
String getColumnNameInPart(const NameAndTypePair & required_column) const;

View File

@ -5,16 +5,20 @@
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
#include <Storages/MergeTree/RequestResponse.h>
#include <Columns/FilterDescription.h>
#include <Common/ElapsedTimeProfileEventIncrement.h>
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypeNothing.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypeArray.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <city.h>
namespace ProfileEvents
{
extern const Event WaitPrefetchTaskMicroseconds;
};
namespace DB
{
@ -61,6 +65,8 @@ IMergeTreeSelectAlgorithm::IMergeTreeSelectAlgorithm(
, use_uncompressed_cache(use_uncompressed_cache_)
, virt_column_names(virt_column_names_)
, partition_value_type(storage.getPartitionValueType())
, owned_uncompressed_cache(use_uncompressed_cache ? storage.getContext()->getUncompressedCache() : nullptr)
, owned_mark_cache(storage.getContext()->getMarkCache())
{
header_without_const_virtual_columns = applyPrewhereActions(std::move(header), prewhere_info);
size_t non_const_columns_offset = header_without_const_virtual_columns.columns();
@ -176,33 +182,89 @@ ChunkAndProgress IMergeTreeSelectAlgorithm::read()
return {Chunk(), num_read_rows, num_read_bytes};
}
void IMergeTreeSelectAlgorithm::initializeMergeTreeReadersForPart(
MergeTreeData::DataPartPtr & data_part,
const MergeTreeReadTaskColumns & task_columns, const StorageMetadataPtr & metadata_snapshot,
const MarkRanges & mark_ranges, const IMergeTreeReader::ValueSizeMap & value_size_map,
void IMergeTreeSelectAlgorithm::initializeMergeTreeReadersForCurrentTask(
const StorageMetadataPtr & metadata_snapshot,
const IMergeTreeReader::ValueSizeMap & value_size_map,
const ReadBufferFromFileBase::ProfileCallback & profile_callback)
{
reader = data_part->getReader(task_columns.columns, metadata_snapshot, mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
value_size_map, profile_callback);
if (!task)
throw Exception(ErrorCodes::LOGICAL_ERROR, "There is no task");
if (task->reader.valid())
{
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::WaitPrefetchTaskMicroseconds);
reader = task->reader.get();
}
else
{
reader = task->data_part->getReader(
task->task_columns.columns, metadata_snapshot, task->mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(),
reader_settings, value_size_map, profile_callback);
}
if (!task->pre_reader_for_step.empty())
{
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::WaitPrefetchTaskMicroseconds);
pre_reader_for_step.clear();
for (auto & pre_reader : task->pre_reader_for_step)
pre_reader_for_step.push_back(pre_reader.get());
}
else
{
initializeMergeTreePreReadersForPart(
task->data_part, task->task_columns, metadata_snapshot,
task->mark_ranges, value_size_map, profile_callback);
}
}
void IMergeTreeSelectAlgorithm::initializeMergeTreeReadersForPart(
MergeTreeData::DataPartPtr & data_part,
const MergeTreeReadTaskColumns & task_columns,
const StorageMetadataPtr & metadata_snapshot,
const MarkRanges & mark_ranges,
const IMergeTreeReader::ValueSizeMap & value_size_map,
const ReadBufferFromFileBase::ProfileCallback & profile_callback)
{
reader = data_part->getReader(
task_columns.columns, metadata_snapshot, mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(),
reader_settings, value_size_map, profile_callback);
initializeMergeTreePreReadersForPart(
data_part, task_columns, metadata_snapshot,
mark_ranges, value_size_map, profile_callback);
}
void IMergeTreeSelectAlgorithm::initializeMergeTreePreReadersForPart(
MergeTreeData::DataPartPtr & data_part,
const MergeTreeReadTaskColumns & task_columns,
const StorageMetadataPtr & metadata_snapshot,
const MarkRanges & mark_ranges,
const IMergeTreeReader::ValueSizeMap & value_size_map,
const ReadBufferFromFileBase::ProfileCallback & profile_callback)
{
pre_reader_for_step.clear();
/// Add lightweight delete filtering step
if (reader_settings.apply_deleted_mask && data_part->hasLightweightDelete())
{
pre_reader_for_step.push_back(data_part->getReader({LightweightDeleteDescription::FILTER_COLUMN}, metadata_snapshot, mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
value_size_map, profile_callback));
pre_reader_for_step.push_back(
data_part->getReader(
{LightweightDeleteDescription::FILTER_COLUMN}, metadata_snapshot,
mark_ranges, owned_uncompressed_cache.get(), owned_mark_cache.get(),
reader_settings, value_size_map, profile_callback));
}
if (prewhere_info)
{
for (const auto & pre_columns_per_step : task_columns.pre_columns)
{
pre_reader_for_step.push_back(data_part->getReader(pre_columns_per_step, metadata_snapshot, mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
value_size_map, profile_callback));
pre_reader_for_step.push_back(
data_part->getReader(
pre_columns_per_step, metadata_snapshot, mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(),
reader_settings, value_size_map, profile_callback));
}
}
}

View File

@ -116,10 +116,17 @@ protected:
const Names & non_const_virtual_column_names);
/// Sets up data readers for each step of prewhere and where
void initializeMergeTreeReadersForCurrentTask(
const StorageMetadataPtr & metadata_snapshot,
const IMergeTreeReader::ValueSizeMap & value_size_map,
const ReadBufferFromFileBase::ProfileCallback & profile_callback);
void initializeMergeTreeReadersForPart(
MergeTreeData::DataPartPtr & data_part,
const MergeTreeReadTaskColumns & task_columns, const StorageMetadataPtr & metadata_snapshot,
const MarkRanges & mark_ranges, const IMergeTreeReader::ValueSizeMap & value_size_map,
const MergeTreeReadTaskColumns & task_columns,
const StorageMetadataPtr & metadata_snapshot,
const MarkRanges & mark_ranges,
const IMergeTreeReader::ValueSizeMap & value_size_map,
const ReadBufferFromFileBase::ProfileCallback & profile_callback);
/// Sets up range readers corresponding to data readers
@ -153,8 +160,8 @@ protected:
/// A result of getHeader(). A chunk which this header is returned from read().
Block result_header;
std::shared_ptr<UncompressedCache> owned_uncompressed_cache;
std::shared_ptr<MarkCache> owned_mark_cache;
UncompressedCachePtr owned_uncompressed_cache;
MarkCachePtr owned_mark_cache;
using MergeTreeReaderPtr = std::unique_ptr<IMergeTreeReader>;
MergeTreeReaderPtr reader;
@ -181,6 +188,15 @@ private:
bool getNewTask();
/// Initialize pre readers.
void initializeMergeTreePreReadersForPart(
MergeTreeData::DataPartPtr & data_part,
const MergeTreeReadTaskColumns & task_columns,
const StorageMetadataPtr & metadata_snapshot,
const MarkRanges & mark_ranges,
const IMergeTreeReader::ValueSizeMap & value_size_map,
const ReadBufferFromFileBase::ProfileCallback & profile_callback);
static Block applyPrewhereActions(Block block, const PrewhereInfoPtr & prewhere_info);
};

View File

@ -141,14 +141,20 @@ MergeTreeReadTask::MergeTreeReadTask(
const NameSet & column_name_set_,
const MergeTreeReadTaskColumns & task_columns_,
bool remove_prewhere_column_,
MergeTreeBlockSizePredictorPtr && size_predictor_)
MergeTreeBlockSizePredictorPtr size_predictor_,
int64_t priority_,
std::future<MergeTreeReaderPtr> reader_,
std::vector<std::future<MergeTreeReaderPtr>> && pre_reader_for_step_)
: data_part{data_part_}
, mark_ranges{mark_ranges_}
, part_index_in_query{part_index_in_query_}
, column_name_set{column_name_set_}
, task_columns{task_columns_}
, remove_prewhere_column{remove_prewhere_column_}
, size_predictor{std::move(size_predictor_)}
, size_predictor{size_predictor_}
, reader(std::move(reader_))
, pre_reader_for_step(std::move(pre_reader_for_step_))
, priority(priority_)
{
}

View File

@ -5,6 +5,7 @@
#include <Storages/StorageSnapshot.h>
#include <Storages/MergeTree/RangesInDataPart.h>
#include <Storages/MergeTree/MergeTreeRangeReader.h>
#include <Storages/MergeTree/IMergeTreeReader.h>
namespace DB
@ -67,6 +68,16 @@ struct MergeTreeReadTask
/// NOTE: we take references to elements and push_back new elements, that's why it is a deque but noit a vector
std::deque<MergeTreeRangeReader> pre_range_readers;
using MergeTreeReaderPtr = std::unique_ptr<IMergeTreeReader>;
std::future<MergeTreeReaderPtr> reader;
std::vector<std::future<MergeTreeReaderPtr>> pre_reader_for_step;
int64_t priority = 0; /// Priority of the task. Bigger value, bigger priority.
bool operator <(const MergeTreeReadTask & rhs) const
{
return priority < rhs.priority;
}
bool isFinished() const { return mark_ranges.empty() && range_reader.isCurrentRangeFinished(); }
MergeTreeReadTask(
@ -76,9 +87,13 @@ struct MergeTreeReadTask
const NameSet & column_name_set_,
const MergeTreeReadTaskColumns & task_columns_,
bool remove_prewhere_column_,
MergeTreeBlockSizePredictorPtr && size_predictor_);
MergeTreeBlockSizePredictorPtr size_predictor_,
int64_t priority_ = 0,
std::future<MergeTreeReaderPtr> reader_ = {},
std::vector<std::future<MergeTreeReaderPtr>> && pre_reader_for_step_ = {});
};
MergeTreeReadTaskColumns getReadTaskColumns(
const IMergeTreeDataPartInfoForReader & data_part_info_for_reader,
const StorageSnapshotPtr & storage_snapshot,

View File

@ -3,6 +3,8 @@
#include <Core/Settings.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <IO/WriteSettings.h>
#include <Compression/CompressionFactory.h>
#include <Compression/ICompressionCodec.h>
namespace DB

View File

@ -0,0 +1,597 @@
#include <Storages/MergeTree/MergeTreePrefetchedReadPool.h>
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
#include <Storages/MergeTree/MergeTreeBaseSelectProcessor.h>
#include <Storages/MergeTree/RangesInDataPart.h>
#include <Storages/MergeTree/MarkRange.h>
#include <Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h>
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <Common/ElapsedTimeProfileEventIncrement.h>
#include <IO/Operators.h>
#include <base/getThreadId.h>
namespace ProfileEvents
{
extern const Event MergeTreePrefetchedReadPoolInit;
}
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
MergeTreePrefetchedReadPool::MergeTreePrefetchedReadPool(
size_t threads,
size_t sum_marks_,
size_t min_marks_for_concurrent_read_,
RangesInDataParts && parts_,
const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
const Names & column_names_,
const Names & virtual_column_names_,
size_t preferred_block_size_bytes_,
const MergeTreeReaderSettings & reader_settings_,
ContextPtr context_,
bool use_uncompressed_cache_,
bool is_remote_read_,
const MergeTreeSettings & storage_settings_)
: IMergeTreeReadPool(
storage_snapshot_,
column_names_,
virtual_column_names_,
min_marks_for_concurrent_read_,
prewhere_info_,
parts_,
(preferred_block_size_bytes_ > 0),
/*do_not_steal_tasks_*/false)
, WithContext(context_)
, log(&Poco::Logger::get("MergeTreePrefetchedReadPool(" + (parts_.empty() ? "" : parts_.front().data_part->storage.getStorageID().getNameForLogs()) + ")"))
, header(storage_snapshot_->getSampleBlockForColumns(column_names_))
, mark_cache(context_->getGlobalContext()->getMarkCache().get())
, uncompressed_cache(use_uncompressed_cache_ ? context_->getGlobalContext()->getUncompressedCache().get() : nullptr)
, reader_settings(reader_settings_)
, profile_callback([this](ReadBufferFromFileBase::ProfileInfo info_) { profileFeedback(info_); })
, index_granularity_bytes(storage_settings_.index_granularity_bytes)
, fixed_index_granularity(storage_settings_.index_granularity)
, is_remote_read(is_remote_read_)
, prefetch_threadpool(getContext()->getPrefetchThreadpool())
{
/// Tasks creation might also create a lost of readers - check they do not
/// do any time consuming operations in ctor.
ProfileEventTimeIncrement<Milliseconds> watch(ProfileEvents::MergeTreePrefetchedReadPoolInit);
parts_infos = getPartsInfos(parts_, preferred_block_size_bytes_);
threads_tasks = createThreadsTasks(threads, sum_marks_, min_marks_for_concurrent_read_);
}
struct MergeTreePrefetchedReadPool::PartInfo
{
MergeTreeData::DataPartPtr data_part;
size_t part_index_in_query;
size_t sum_marks = 0;
MarkRanges ranges;
NameSet column_name_set;
MergeTreeReadTaskColumns task_columns;
MergeTreeBlockSizePredictorPtr size_predictor;
size_t approx_size_of_mark = 0;
size_t prefetch_step_marks = 0;
};
std::future<MergeTreeReaderPtr> MergeTreePrefetchedReadPool::createPrefetchedReader(
const IMergeTreeDataPart & data_part,
const NamesAndTypesList & columns,
const MarkRanges & required_ranges,
int64_t priority) const
{
auto reader = data_part.getReader(
columns, storage_snapshot->metadata, required_ranges,
uncompressed_cache, mark_cache, reader_settings,
IMergeTreeReader::ValueSizeMap{}, profile_callback);
/// In order to make a prefetch we need to wait for marks to be loaded. But we just created
/// a reader (which starts loading marks in its constructor), then if we do prefetch right
/// after creating a reader, it will be very inefficient. We can do prefetch for all parts
/// only inside this MergeTreePrefetchedReadPool, where read tasks are created and distributed,
/// and we cannot block either, therefore make prefetch inside the pool and put the future
/// into the read task (MergeTreeReadTask). When a thread calls getTask(), it will wait for
/// it (if not yet ready) after getting the task.
auto task = [=, reader = std::move(reader), context = getContext()]() mutable -> MergeTreeReaderPtr &&
{
/// For async read metrics in system.query_log.
PrefetchIncrement watch(context->getAsyncReadCounters());
reader->prefetchBeginOfRange(priority);
return std::move(reader);
};
return scheduleFromThreadPool<IMergeTreeDataPart::MergeTreeReaderPtr>(std::move(task), prefetch_threadpool, "ReadPrepare", priority);
}
void MergeTreePrefetchedReadPool::createPrefetchedReaderForTask(MergeTreeReadTask & task) const
{
if (task.reader.valid())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Task already has a reader");
task.reader = createPrefetchedReader(*task.data_part, task.task_columns.columns, task.mark_ranges, task.priority);
if (reader_settings.apply_deleted_mask && task.data_part->hasLightweightDelete())
{
auto pre_reader = createPrefetchedReader(*task.data_part, {LightweightDeleteDescription::FILTER_COLUMN}, task.mark_ranges, task.priority);
task.pre_reader_for_step.push_back(std::move(pre_reader));
}
if (prewhere_info)
{
for (const auto & pre_columns_per_step : task.task_columns.pre_columns)
{
auto pre_reader = createPrefetchedReader(*task.data_part, pre_columns_per_step, task.mark_ranges, task.priority);
task.pre_reader_for_step.push_back(std::move(pre_reader));
}
}
}
bool MergeTreePrefetchedReadPool::TaskHolder::operator <(const TaskHolder & other) const
{
return task->priority < other.task->priority;
}
void MergeTreePrefetchedReadPool::startPrefetches() const
{
for (const auto & task : prefetch_queue)
{
createPrefetchedReaderForTask(*task.task);
}
}
MergeTreeReadTaskPtr MergeTreePrefetchedReadPool::getTask(size_t thread)
{
std::lock_guard lock(mutex);
if (threads_tasks.empty())
return nullptr;
if (!started_prefetches)
{
startPrefetches();
started_prefetches = true;
}
auto it = threads_tasks.find(thread);
if (it == threads_tasks.end())
{
ThreadsTasks::iterator non_prefetched_tasks_to_steal = threads_tasks.end();
ThreadsTasks::iterator prefetched_tasks_to_steal = threads_tasks.end();
int64_t best_prefetched_task_priority = -1;
/// There is no point stealing in order (like in MergeTreeReadPool, where tasks can be stolen
/// only from the next thread). Even if we steal task from the next thread, which reads from
/// the same part as we just read, it might seem that we can reuse our own reader, do some
/// seek avoiding and it will have a good result as we avoided seek (new request). But it is
/// not so, because this next task will most likely have its own reader a prefetch already on
/// the fly. (Not to mention that in fact we cannot reuse our own reader if initially we did
/// not accounted this range into range request to object storage).
for (auto thread_tasks_it = threads_tasks.begin(); thread_tasks_it != threads_tasks.end(); ++thread_tasks_it)
{
/// Prefer to steal tasks which have an initialized reader (with prefetched data). Thus we avoid
/// losing a prefetch by creating our own reader (or resusing our own reader if the part
/// is the same as last read by this thread).
auto & thread_tasks = thread_tasks_it->second;
auto task_it = std::find_if(
thread_tasks.begin(), thread_tasks.end(),
[](const auto & task) { return task->reader.valid(); });
if (task_it == thread_tasks.end())
{
/// The follow back to non-prefetched task should lie on the thread which
/// has more tasks than others.
if (non_prefetched_tasks_to_steal == threads_tasks.end()
|| non_prefetched_tasks_to_steal->second.size() < thread_tasks.size())
non_prefetched_tasks_to_steal = thread_tasks_it;
}
/// Try to steal task with the best (lowest) priority (because it will be executed faster).
else if (prefetched_tasks_to_steal == threads_tasks.end()
|| (*task_it)->priority < best_prefetched_task_priority)
{
best_prefetched_task_priority = (*task_it)->priority;
chassert(best_prefetched_task_priority >= 0);
prefetched_tasks_to_steal = thread_tasks_it;
}
}
if (prefetched_tasks_to_steal != threads_tasks.end())
{
auto & thread_tasks = prefetched_tasks_to_steal->second;
assert(!thread_tasks.empty());
auto task_it = std::find_if(
thread_tasks.begin(), thread_tasks.end(),
[](const auto & task) { return task->reader.valid(); });
assert(task_it != thread_tasks.end());
auto task = std::move(*task_it);
thread_tasks.erase(task_it);
if (thread_tasks.empty())
threads_tasks.erase(prefetched_tasks_to_steal);
return task;
}
/// TODO: it also makes sense to first try to steal from the next thread if it has ranges
/// from the same part as current thread last read - to reuse the reader.
if (non_prefetched_tasks_to_steal != threads_tasks.end())
{
auto & thread_tasks = non_prefetched_tasks_to_steal->second;
assert(!thread_tasks.empty());
/// Get second half of the tasks.
const size_t total_tasks = thread_tasks.size();
const size_t half = total_tasks / 2;
auto half_it = thread_tasks.begin() + half;
assert(half_it != thread_tasks.end());
/// Give them to current thread, as current thread's tasks list is empty.
auto & current_thread_tasks = threads_tasks[thread];
current_thread_tasks.insert(
current_thread_tasks.end(), make_move_iterator(half_it), make_move_iterator(thread_tasks.end()));
/// Erase them from the thread from which we steal.
thread_tasks.resize(half);
if (thread_tasks.empty())
threads_tasks.erase(non_prefetched_tasks_to_steal);
auto task = std::move(current_thread_tasks.front());
current_thread_tasks.erase(current_thread_tasks.begin());
if (current_thread_tasks.empty())
threads_tasks.erase(thread);
return task;
}
return nullptr;
}
auto & thread_tasks = it->second;
assert(!thread_tasks.empty());
auto task = std::move(thread_tasks.front());
thread_tasks.pop_front();
if (thread_tasks.empty())
threads_tasks.erase(it);
return task;
}
size_t MergeTreePrefetchedReadPool::getApproxSizeOfGranule(const IMergeTreeDataPart & part) const
{
const auto & columns = part.getColumns();
auto all_columns_are_fixed_size = columns.end() == std::find_if(
columns.begin(), columns.end(),
[](const auto & col){ return col.type->haveMaximumSizeOfValue() == false; });
if (all_columns_are_fixed_size)
{
size_t approx_size = 0;
for (const auto & col : columns)
approx_size += col.type->getMaximumSizeOfValueInMemory() * fixed_index_granularity;
if (!index_granularity_bytes)
return approx_size;
return std::min(index_granularity_bytes, approx_size);
}
const size_t approx_size = static_cast<size_t>(std::round(static_cast<double>(part.getBytesOnDisk()) / part.getMarksCount()));
if (!index_granularity_bytes)
return approx_size;
return std::min(index_granularity_bytes, approx_size);
}
MergeTreePrefetchedReadPool::PartsInfos MergeTreePrefetchedReadPool::getPartsInfos(
const RangesInDataParts & parts, size_t preferred_block_size_bytes) const
{
PartsInfos result;
Block sample_block = storage_snapshot->metadata->getSampleBlock();
const bool predict_block_size_bytes = preferred_block_size_bytes > 0;
for (const auto & part : parts)
{
auto part_info = std::make_unique<PartInfo>();
part_info->data_part = part.data_part;
part_info->part_index_in_query = part.part_index_in_query;
part_info->ranges = part.ranges;
std::sort(part_info->ranges.begin(), part_info->ranges.end());
/// Sum up total size of all mark ranges in a data part.
for (const auto & range : part.ranges)
{
part_info->sum_marks += range.end - range.begin;
}
part_info->approx_size_of_mark = getApproxSizeOfGranule(*part_info->data_part);
const auto task_columns = getReadTaskColumns(
LoadedMergeTreeDataPartInfoForReader(part.data_part),
storage_snapshot,
column_names,
virtual_column_names,
prewhere_info,
/* with_subcolumns */true);
part_info->size_predictor = !predict_block_size_bytes
? nullptr
: IMergeTreeSelectAlgorithm::getSizePredictor(part.data_part, task_columns, sample_block);
/// Will be used to distinguish between PREWHERE and WHERE columns when applying filter.
const auto & required_column_names = task_columns.columns.getNames();
part_info->column_name_set = {required_column_names.begin(), required_column_names.end()};
part_info->task_columns = task_columns;
result.push_back(std::move(part_info));
}
return result;
}
MergeTreePrefetchedReadPool::ThreadsTasks MergeTreePrefetchedReadPool::createThreadsTasks(
size_t threads, size_t sum_marks, size_t /* min_marks_for_concurrent_read */) const
{
if (parts_infos.empty())
return {};
const auto & context = getContext();
const auto & settings = context->getSettingsRef();
size_t total_size_approx = 0;
for (const auto & part : parts_infos)
{
total_size_approx += part->sum_marks * part->approx_size_of_mark;
}
size_t min_prefetch_step_marks = 0;
if (settings.filesystem_prefetches_limit && settings.filesystem_prefetches_limit < sum_marks)
{
min_prefetch_step_marks = static_cast<size_t>(std::round(static_cast<double>(sum_marks) / settings.filesystem_prefetches_limit));
}
size_t total_prefetches_approx = 0;
for (const auto & part : parts_infos)
{
if (settings.filesystem_prefetch_step_marks)
{
part->prefetch_step_marks = settings.filesystem_prefetch_step_marks;
}
else if (settings.filesystem_prefetch_step_bytes && part->approx_size_of_mark)
{
part->prefetch_step_marks = std::max<size_t>(
1, static_cast<size_t>(std::round(static_cast<double>(settings.filesystem_prefetch_step_bytes) / part->approx_size_of_mark)));
}
else
{
/// Experimentally derived ratio.
part->prefetch_step_marks = static_cast<size_t>(
std::round(std::pow(std::max<size_t>(1, static_cast<size_t>(std::round(sum_marks / 1000))), double(1.5))));
}
/// This limit is important to avoid spikes of slow aws getObject requests when parallelizing within one file.
/// (The default is taken from here https://docs.aws.amazon.com/whitepapers/latest/s3-optimizing-performance-best-practices/use-byte-range-fetches.html).
if (part->approx_size_of_mark
&& settings.filesystem_prefetch_min_bytes_for_single_read_task
&& part->approx_size_of_mark < settings.filesystem_prefetch_min_bytes_for_single_read_task)
{
const size_t new_min_prefetch_step_marks = static_cast<size_t>(
std::ceil(static_cast<double>(settings.filesystem_prefetch_min_bytes_for_single_read_task) / part->approx_size_of_mark));
if (min_prefetch_step_marks < new_min_prefetch_step_marks)
{
LOG_TEST(
log, "Increasing min prefetch step from {} to {}", min_prefetch_step_marks, new_min_prefetch_step_marks);
min_prefetch_step_marks = new_min_prefetch_step_marks;
}
}
if (part->prefetch_step_marks < min_prefetch_step_marks)
{
LOG_TEST(
log, "Increasing prefetch step from {} to {} because of the prefetches limit {}",
part->prefetch_step_marks, min_prefetch_step_marks, settings.filesystem_prefetches_limit);
part->prefetch_step_marks = min_prefetch_step_marks;
}
LOG_TEST(log,
"Part: {}, sum_marks: {}, approx mark size: {}, prefetch_step_bytes: {}, prefetch_step_marks: {}, (ranges: {})",
part->data_part->name, part->sum_marks, part->approx_size_of_mark,
settings.filesystem_prefetch_step_bytes, part->prefetch_step_marks, toString(part->ranges));
}
const size_t min_marks_per_thread = (sum_marks - 1) / threads + 1;
LOG_DEBUG(
log,
"Sum marks: {}, threads: {}, min_marks_per_thread: {}, result prefetch step marks: {}, prefetches limit: {}, total_size_approx: {}",
sum_marks, threads, min_marks_per_thread, settings.filesystem_prefetch_step_bytes, settings.filesystem_prefetches_limit, total_size_approx);
size_t current_prefetches_count = 0;
prefetch_queue.reserve(total_prefetches_approx);
ThreadsTasks result_threads_tasks;
size_t memory_usage_approx = 0;
for (size_t i = 0, part_idx = 0; i < threads && part_idx < parts_infos.size(); ++i)
{
auto need_marks = min_marks_per_thread;
/// Priority is given according to the prefetch number for each thread,
/// e.g. the first task of each thread has the same priority and is bigger
/// than second task of each thread, and so on.
/// Add 1 to query read priority because higher priority should be given to
/// reads from pool which are from reader.
int64_t priority = reader_settings.read_settings.priority + 1;
while (need_marks > 0 && part_idx < parts_infos.size())
{
auto & part = *parts_infos[part_idx];
size_t & marks_in_part = part.sum_marks;
if (marks_in_part == 0)
{
++part_idx;
continue;
}
MarkRanges ranges_to_get_from_part;
size_t marks_to_get_from_part = std::min(need_marks, marks_in_part);
/// Split by prefetch step even if !allow_prefetch below. Because it will allow
/// to make a better distribution of tasks which did not fill into memory limit
/// or prefetches limit through tasks stealing.
if (part.prefetch_step_marks)
{
marks_to_get_from_part = std::min<size_t>(marks_to_get_from_part, part.prefetch_step_marks);
}
if (marks_in_part == marks_to_get_from_part)
{
ranges_to_get_from_part = part.ranges;
}
else
{
if (part.sum_marks < marks_to_get_from_part)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Requested {} marks from part {}, but part has only {} marks",
marks_to_get_from_part, part.data_part->name, part.sum_marks);
}
size_t get_marks_num = marks_to_get_from_part;
while (get_marks_num > 0)
{
MarkRange & range = part.ranges.front();
const size_t marks_in_range = range.end - range.begin;
const size_t marks_to_get_from_range = std::min(marks_in_range, get_marks_num);
get_marks_num -= marks_to_get_from_range;
ranges_to_get_from_part.emplace_back(range.begin, range.begin + marks_to_get_from_range);
range.begin += marks_to_get_from_range;
if (range.begin == range.end)
{
part.ranges.pop_front();
}
else if (!get_marks_num && part.prefetch_step_marks && range.end - range.begin < part.prefetch_step_marks)
{
/// We already have `get_marks_num` marks, but current mark range has
/// less than `prefetch_step_marks` marks, then add them too.
ranges_to_get_from_part.emplace_back(range.begin, range.end);
marks_to_get_from_part += range.end - range.begin;
part.ranges.pop_front();
}
}
}
need_marks -= marks_to_get_from_part;
sum_marks -= marks_to_get_from_part;
marks_in_part -= marks_to_get_from_part;
auto curr_task_size_predictor = !part.size_predictor ? nullptr
: std::make_unique<MergeTreeBlockSizePredictor>(*part.size_predictor); /// make a copy
auto read_task = std::make_unique<MergeTreeReadTask>(
part.data_part, ranges_to_get_from_part, part.part_index_in_query,
part.column_name_set, part.task_columns, prewhere_info && prewhere_info->remove_prewhere_column,
std::move(curr_task_size_predictor));
read_task->priority = priority;
bool allow_prefetch = !settings.filesystem_prefetches_limit || current_prefetches_count + 1 <= settings.filesystem_prefetches_limit;
if (allow_prefetch && settings.filesystem_prefetch_max_memory_usage)
{
size_t num_readers = 1;
if (reader_settings.apply_deleted_mask && part.data_part->hasLightweightDelete())
++num_readers;
if (prewhere_info)
num_readers += part.task_columns.pre_columns.size();
memory_usage_approx += settings.max_read_buffer_size * num_readers;
allow_prefetch = memory_usage_approx <= settings.filesystem_prefetch_max_memory_usage;
}
if (allow_prefetch)
{
prefetch_queue.emplace(TaskHolder(read_task.get()));
++current_prefetches_count;
}
++priority;
result_threads_tasks[i].push_back(std::move(read_task));
}
}
LOG_TEST(
log, "Result tasks {} for {} threads: {}",
result_threads_tasks.size(), threads, dumpTasks(result_threads_tasks));
return result_threads_tasks;
}
MergeTreePrefetchedReadPool::~MergeTreePrefetchedReadPool()
{
for (const auto & [_, thread_tasks] : threads_tasks)
{
for (const auto & task : thread_tasks)
{
if (task->reader.valid())
task->reader.wait();
for (const auto & pre_reader : task->pre_reader_for_step)
{
if (pre_reader.valid())
pre_reader.wait();
}
}
}
}
std::string MergeTreePrefetchedReadPool::dumpTasks(const ThreadsTasks & tasks)
{
WriteBufferFromOwnString result;
for (const auto & [thread_id, thread_tasks] : tasks)
{
result << "\tthread id: " << toString(thread_id) << ", tasks: " << toString(thread_tasks.size());
if (!thread_tasks.empty())
{
size_t no = 0;
for (const auto & task : thread_tasks)
{
result << '\t';
result << ++no << ": ";
result << "reader: " << task->reader.valid() << ", ";
result << "part: " << task->data_part->name << ", ";
result << "ranges: " << toString(task->mark_ranges);
}
}
}
return result.str();
}
bool MergeTreePrefetchedReadPool::checkReadMethodAllowed(LocalFSReadMethod method)
{
return method == LocalFSReadMethod::pread_threadpool || method == LocalFSReadMethod::pread_fake_async;
}
bool MergeTreePrefetchedReadPool::checkReadMethodAllowed(RemoteFSReadMethod method)
{
return method == RemoteFSReadMethod::threadpool;
}
}

View File

@ -0,0 +1,128 @@
#pragma once
#include <Common/ThreadPool.h>
#include <Storages/MergeTree/MergeTreeReadPool.h>
#include <Storages/MergeTree/MergeTreeIOSettings.h>
#include <Core/BackgroundSchedulePool.h>
#include <IO/AsyncReadCounters.h>
#include <queue>
namespace DB
{
class IMergeTreeReader;
using MergeTreeReaderPtr = std::unique_ptr<IMergeTreeReader>;
/// A class which is responsible for creating read tasks
/// which are later taken by readers via getTask method.
/// Does prefetching for the read tasks it creates.
class MergeTreePrefetchedReadPool : public IMergeTreeReadPool, private WithContext
{
public:
MergeTreePrefetchedReadPool(
size_t threads,
size_t sum_marks_,
size_t min_marks_for_concurrent_read_,
RangesInDataParts && parts_,
const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
const Names & column_names_,
const Names & virtual_column_names_,
size_t preferred_block_size_bytes_,
const MergeTreeReaderSettings & reader_settings_,
ContextPtr context_,
bool use_uncompressed_cache_,
bool is_remote_read_,
const MergeTreeSettings & storage_settings_);
~MergeTreePrefetchedReadPool() override;
MergeTreeReadTaskPtr getTask(size_t thread) override;
void profileFeedback(ReadBufferFromFileBase::ProfileInfo) override {}
Block getHeader() const override { return header; }
static bool checkReadMethodAllowed(LocalFSReadMethod method);
static bool checkReadMethodAllowed(RemoteFSReadMethod method);
private:
struct PartInfo;
using PartInfoPtr = std::shared_ptr<PartInfo>;
using PartsInfos = std::vector<PartInfoPtr>;
using MergeTreeReadTaskPtr = std::unique_ptr<MergeTreeReadTask>;
using ThreadTasks = std::deque<MergeTreeReadTaskPtr>;
using ThreadsTasks = std::map<size_t, ThreadTasks>;
/// smaller `priority` means more priority
std::future<MergeTreeReaderPtr> createPrefetchedReader(
const IMergeTreeDataPart & data_part,
const NamesAndTypesList & columns,
const MarkRanges & required_ranges,
int64_t priority) const;
void createPrefetchedReaderForTask(MergeTreeReadTask & task) const;
size_t getApproxSizeOfGranule(const IMergeTreeDataPart & part) const;
PartsInfos getPartsInfos(const RangesInDataParts & parts, size_t preferred_block_size_bytes) const;
ThreadsTasks createThreadsTasks(
size_t threads,
size_t sum_marks,
size_t min_marks_for_concurrent_read) const;
void startPrefetches() const;
static std::string dumpTasks(const ThreadsTasks & tasks);
Poco::Logger * log;
Block header;
MarkCache * mark_cache;
UncompressedCache * uncompressed_cache;
MergeTreeReaderSettings reader_settings;
ReadBufferFromFileBase::ProfileCallback profile_callback;
size_t index_granularity_bytes;
size_t fixed_index_granularity;
[[ maybe_unused ]] const bool is_remote_read;
ThreadPool & prefetch_threadpool;
PartsInfos parts_infos;
ThreadsTasks threads_tasks;
struct TaskHolder
{
explicit TaskHolder(MergeTreeReadTask * task_) : task(task_) {}
MergeTreeReadTask * task;
bool operator <(const TaskHolder & other) const;
};
mutable boost::heap::priority_queue<TaskHolder> prefetch_queue;
bool started_prefetches = false;
/// A struct which allows to track max number of tasks which were in the
/// threadpool simultaneously (similar to CurrentMetrics, but the result
/// will be put to QueryLog).
struct PrefetchIncrement : boost::noncopyable
{
explicit PrefetchIncrement(std::shared_ptr<AsyncReadCounters> counters_)
: counters(counters_)
{
std::lock_guard lock(counters->mutex);
++counters->total_prefetch_tasks;
if (++counters->current_parallel_prefetch_tasks > counters->max_parallel_prefetch_tasks)
counters->max_parallel_prefetch_tasks = counters->current_parallel_prefetch_tasks;
}
~PrefetchIncrement()
{
std::lock_guard lock(counters->mutex);
--counters->current_parallel_prefetch_tasks;
}
std::shared_ptr<AsyncReadCounters> counters;
};
};
}

View File

@ -154,10 +154,8 @@ MergeTreeReadTaskPtr MergeTreeReadPool::getTask(size_t thread)
/// Get whole part to read if it is small enough.
if (marks_in_part <= need_marks)
{
const auto marks_to_get_from_range = marks_in_part;
ranges_to_get_from_part = thread_task.ranges;
marks_in_part -= marks_to_get_from_range;
marks_in_part = 0;
thread_tasks.parts_and_ranges.pop_back();
thread_tasks.sum_marks_in_parts.pop_back();

View File

@ -14,6 +14,7 @@
namespace DB
{
using MergeTreeReadTaskPtr = std::unique_ptr<MergeTreeReadTask>;
@ -115,21 +116,6 @@ public:
BackoffSettings backoff_settings;
private:
/** State to track numbers of slow reads.
*/
struct BackoffState
{
size_t current_threads;
Stopwatch time_since_prev_event {CLOCK_MONOTONIC_COARSE};
size_t num_events = 0;
explicit BackoffState(size_t threads) : current_threads(threads) {}
};
BackoffState backoff_state;
public:
MergeTreeReadPool(
size_t threads_,
size_t sum_marks_,
@ -144,6 +130,7 @@ public:
bool do_not_steal_tasks_ = false);
~MergeTreeReadPool() override = default;
MergeTreeReadTaskPtr getTask(size_t thread) override;
/** Each worker could call this method and pass information about read performance.
@ -160,6 +147,26 @@ private:
size_t threads, size_t sum_marks, std::vector<size_t> per_part_sum_marks,
const RangesInDataParts & parts);
/// State to track numbers of slow reads.
struct BackoffState
{
size_t current_threads;
Stopwatch time_since_prev_event {CLOCK_MONOTONIC_COARSE};
size_t num_events = 0;
explicit BackoffState(size_t threads) : current_threads(threads) {}
};
BackoffState backoff_state;
struct Part
{
MergeTreeData::DataPartPtr data_part;
size_t part_index_in_query;
};
std::vector<Part> parts_with_idx;
struct ThreadTask
{
struct PartIndexAndRange

View File

@ -58,10 +58,77 @@ MergeTreeReaderWide::MergeTreeReaderWide(
}
}
void MergeTreeReaderWide::prefetchBeginOfRange(int64_t priority)
{
prefetched_streams.clear();
try
{
prefetchForAllColumns(priority, columns_to_read.size(), all_mark_ranges.front().begin, all_mark_ranges.back().end, false);
prefetched_from_mark = all_mark_ranges.front().begin;
/// Arguments explanation:
/// Current prefetch is done for read tasks before they can be picked by reading threads in IMergeTreeReadPool::getTask method.
/// 1. columns_to_read.size() == requested_columns.size() == readRows::res_columns.size().
/// 3. current_task_last_mark argument in readRows() (which is used only for reading from remote fs to make precise
/// ranged read requests) is different from current reader's IMergeTreeReader::all_mark_ranges.back().end because
/// the same reader can be reused between read tasks - if the new task mark ranges correspond to the same part we last
/// read, so we cannot rely on all_mark_ranges and pass actual current_task_last_mark. But here we can do prefetch for begin
/// of range only once so there is no such problem.
/// 4. continue_reading == false, as we haven't read anything yet.
}
catch (Exception & e)
{
if (e.code() != ErrorCodes::MEMORY_LIMIT_EXCEEDED)
data_part_info_for_read->reportBroken();
throw;
}
catch (...)
{
data_part_info_for_read->reportBroken();
throw;
}
}
void MergeTreeReaderWide::prefetchForAllColumns(
int64_t priority, size_t num_columns, size_t from_mark, size_t current_task_last_mark, bool continue_reading)
{
bool do_prefetch = data_part_info_for_read->getDataPartStorage()->isStoredOnRemoteDisk()
? settings.read_settings.remote_fs_prefetch
: settings.read_settings.local_fs_prefetch;
if (!do_prefetch)
return;
/// Request reading of data in advance,
/// so if reading can be asynchronous, it will also be performed in parallel for all columns.
for (size_t pos = 0; pos < num_columns; ++pos)
{
try
{
auto & cache = caches[columns_to_read[pos].getNameInStorage()];
prefetchForColumn(
priority, columns_to_read[pos], serializations[pos], from_mark, continue_reading,
current_task_last_mark, cache);
}
catch (Exception & e)
{
/// Better diagnostics.
e.addMessage("(while reading column " + columns_to_read[pos].name + ")");
throw;
}
}
}
size_t MergeTreeReaderWide::readRows(
size_t from_mark, size_t current_task_last_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns)
{
size_t read_rows = 0;
if (prefetched_from_mark != -1 && static_cast<size_t>(prefetched_from_mark) != from_mark)
{
prefetched_streams.clear();
prefetched_from_mark = -1;
}
try
{
size_t num_columns = res_columns.size();
@ -70,28 +137,7 @@ size_t MergeTreeReaderWide::readRows(
if (num_columns == 0)
return max_rows_to_read;
std::unordered_map<String, ISerialization::SubstreamsCache> caches;
std::unordered_set<std::string> prefetched_streams;
if (data_part_info_for_read->getDataPartStorage()->isStoredOnRemoteDisk() ? settings.read_settings.remote_fs_prefetch : settings.read_settings.local_fs_prefetch)
{
/// Request reading of data in advance,
/// so if reading can be asynchronous, it will also be performed in parallel for all columns.
for (size_t pos = 0; pos < num_columns; ++pos)
{
try
{
auto & cache = caches[columns_to_read[pos].getNameInStorage()];
prefetch(columns_to_read[pos], serializations[pos], from_mark, continue_reading, current_task_last_mark, cache, prefetched_streams);
}
catch (Exception & e)
{
/// Better diagnostics.
e.addMessage("(while reading column " + columns_to_read[pos].name + ")");
throw;
}
}
}
prefetchForAllColumns(/* priority */0, num_columns, from_mark, current_task_last_mark, continue_reading);
for (size_t pos = 0; pos < num_columns; ++pos)
{
@ -129,6 +175,9 @@ size_t MergeTreeReaderWide::readRows(
res_columns[pos] = nullptr;
}
prefetched_streams.clear();
caches.clear();
/// NOTE: positions for all streams must be kept in sync.
/// In particular, even if for some streams there are no rows to be read,
/// you must ensure that no seeks are skipped and at this point they all point to to_mark.
@ -139,9 +188,11 @@ size_t MergeTreeReaderWide::readRows(
data_part_info_for_read->reportBroken();
/// Better diagnostics.
e.addMessage("(while reading from part " + data_part_info_for_read->getDataPartStorage()->getFullPath() + " "
"from mark " + toString(from_mark) + " "
"with max_rows_to_read = " + toString(max_rows_to_read) + ")");
e.addMessage(
fmt::format(
"(while reading from part {} from mark {} with max_rows_to_read = {})",
data_part_info_for_read->getDataPartStorage()->getFullPath(),
toString(from_mark), toString(max_rows_to_read)));
throw;
}
catch (...)
@ -253,14 +304,14 @@ void MergeTreeReaderWide::deserializePrefix(
}
}
void MergeTreeReaderWide::prefetch(
void MergeTreeReaderWide::prefetchForColumn(
int64_t priority,
const NameAndTypePair & name_and_type,
const SerializationPtr & serialization,
size_t from_mark,
bool continue_reading,
size_t current_task_last_mark,
ISerialization::SubstreamsCache & cache,
std::unordered_set<std::string> & prefetched_streams)
ISerialization::SubstreamsCache & cache)
{
deserializePrefix(serialization, name_and_type, current_task_last_mark, cache);
@ -272,9 +323,10 @@ void MergeTreeReaderWide::prefetch(
{
bool seek_to_mark = !continue_reading;
if (ReadBuffer * buf = getStream(false, substream_path, streams, name_and_type, from_mark, seek_to_mark, current_task_last_mark, cache))
buf->prefetch();
prefetched_streams.insert(stream_name);
{
buf->prefetch(priority);
prefetched_streams.insert(stream_name);
}
}
});
}

View File

@ -33,11 +33,15 @@ public:
bool canReadIncompleteGranules() const override { return true; }
void prefetchBeginOfRange(int64_t priority) override;
using FileStreams = std::map<std::string, std::unique_ptr<MergeTreeReaderStream>>;
private:
FileStreams streams;
void prefetchForAllColumns(int64_t priority, size_t num_columns, size_t from_mark, size_t current_task_last_mark, bool continue_reading);
void addStreams(
const NameAndTypePair & name_and_type,
const SerializationPtr & serialization,
@ -50,20 +54,24 @@ private:
ISerialization::SubstreamsCache & cache, bool was_prefetched);
/// Make next readData more simple by calling 'prefetch' of all related ReadBuffers (column streams).
void prefetch(
void prefetchForColumn(
int64_t priority,
const NameAndTypePair & name_and_type,
const SerializationPtr & serialization,
size_t from_mark,
bool continue_reading,
size_t current_task_last_mark,
ISerialization::SubstreamsCache & cache,
std::unordered_set<std::string> & prefetched_streams); /// if stream was already prefetched do nothing
ISerialization::SubstreamsCache & cache);
void deserializePrefix(
const SerializationPtr & serialization,
const NameAndTypePair & name_and_type,
size_t current_task_last_mark,
ISerialization::SubstreamsCache & cache);
std::unordered_map<String, ISerialization::SubstreamsCache> caches;
std::unordered_set<std::string> prefetched_streams;
ssize_t prefetched_from_mark = -1;
};
}

View File

@ -57,8 +57,8 @@ void MergeTreeSelectAlgorithm::initializeReaders()
owned_mark_cache = storage.getContext()->getMarkCache();
initializeMergeTreeReadersForPart(data_part, task_columns, storage_snapshot->getMetadataForQuery(),
all_mark_ranges, {}, {});
initializeMergeTreeReadersForPart(
data_part, task_columns, storage_snapshot->getMetadataForQuery(), all_mark_ranges, {}, {});
}

View File

@ -50,22 +50,18 @@ void MergeTreeThreadSelectAlgorithm::finalizeNewTask()
IMergeTreeReader::ValueSizeMap value_size_map;
if (!reader)
{
if (use_uncompressed_cache)
owned_uncompressed_cache = storage.getContext()->getUncompressedCache();
owned_mark_cache = storage.getContext()->getMarkCache();
}
else if (part_name != last_read_part_name)
if (reader && part_name != last_read_part_name)
{
value_size_map = reader->getAvgValueSizeHints();
}
const bool init_new_readers = !reader || part_name != last_read_part_name;
/// task->reader.valid() means there is a prefetched reader in this test, use it.
const bool init_new_readers = !reader || task->reader.valid() || part_name != last_read_part_name;
if (init_new_readers)
{
initializeMergeTreeReadersForPart(task->data_part, task->task_columns, metadata_snapshot,
task->mark_ranges, value_size_map, profile_callback);
initializeMergeTreeReadersForPart(
task->data_part, task->task_columns, metadata_snapshot, task->mark_ranges, value_size_map, profile_callback);
initializeMergeTreeReadersForCurrentTask(metadata_snapshot, value_size_map, profile_callback);
}
last_read_part_name = part_name;

View File

@ -1,6 +1,7 @@
#include <Storages/MessageQueueSink.h>
#include <Formats/FormatFactory.h>
#include <Processors/Formats/IRowOutputFormat.h>
#include <Common/logger_useful.h>
namespace DB
{
@ -18,6 +19,10 @@ MessageQueueSink::MessageQueueSink(
void MessageQueueSink::onStart()
{
LOG_TEST(
&Poco::Logger::get("MessageQueueSink"),
"Executing startup for MessageQueueSink");
initialize();
producer->start(context);

View File

@ -46,6 +46,8 @@ RabbitMQProducer::RabbitMQProducer(
void RabbitMQProducer::initialize()
{
LOG_TRACE(log, "Initializing producer");
if (connection.connect())
setupChannel();
else
@ -74,11 +76,10 @@ void RabbitMQProducer::finishImpl()
void RabbitMQProducer::produce(const String & message, size_t, const Columns &, size_t)
{
LOG_DEBUG(&Poco::Logger::get("RabbitMQProducer"), "push {}", message);
Payload payload;
payload.message = message;
payload.id = ++payload_counter;
LOG_TEST(log, "Pushing message with id {}", payload.id);
if (!payloads.push(std::move(payload)))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not push to payloads queue");
}
@ -86,6 +87,7 @@ void RabbitMQProducer::produce(const String & message, size_t, const Columns &,
void RabbitMQProducer::setupChannel()
{
producer_channel = connection.createChannel();
LOG_TRACE(log, "Created a producer channel");
producer_channel->onError([&](const char * message)
{
@ -226,6 +228,8 @@ void RabbitMQProducer::publish(Payloads & messages, bool republishing)
void RabbitMQProducer::startProducingTaskLoop()
{
LOG_TRACE(log, "Starting producer loop");
while ((!payloads.isFinishedAndEmpty() || !returned.empty() || !delivery_record.empty()) && !shutdown_called.load())
{
/// If onReady callback is not received, producer->usable() will anyway return true,

View File

@ -683,7 +683,7 @@ std::unique_ptr<ReadBuffer> StorageS3Source::createAsyncS3ReadBuffer(
async_reader->setReadUntilEnd();
if (read_settings.remote_fs_prefetch)
async_reader->prefetch();
async_reader->prefetch(DEFAULT_PREFETCH_PRIORITY);
return async_reader;
}

View File

@ -75,6 +75,7 @@ ln -sf $SRC_PATH/users.d/filelog.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/enable_blobs_check.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/marks.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/insert_keeper_retries.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/prefetch_settings.xml $DEST_SERVER_PATH/users.d/
# FIXME DataPartsExchange may hang for http_send_timeout seconds
# when nobody is going to read from the other side of socket (due to "Fetching of part was cancelled"),

View File

@ -0,0 +1,9 @@
<clickhouse>
<profiles>
<default>
<allow_prefetched_read_pool_for_remote_filesystem>1</allow_prefetched_read_pool_for_remote_filesystem>
<allow_prefetched_read_pool_for_local_filesystem>0</allow_prefetched_read_pool_for_local_filesystem>
<filesystem_prefetch_max_memory_usage>1Gi</filesystem_prefetch_max_memory_usage>
</default>
</profiles>
</clickhouse>

View File

@ -301,7 +301,10 @@ def test_host_is_drop_from_cache_after_consecutive_failures(
"Cannot resolve host \\(InvalidHostThatDoesNotExist\\), error 0: Host not found."
)
assert node4.wait_for_log_line(
"Cached hosts not found:.*InvalidHostThatDoesNotExist**", repetitions=6
"Cached hosts not found:.*InvalidHostThatDoesNotExist**",
repetitions=6,
timeout=60,
look_behind_lines=500,
)
assert node4.wait_for_log_line(
"Cached hosts dropped:.*InvalidHostThatDoesNotExist.*"

View File

@ -0,0 +1,32 @@
<clickhouse>
<remote_servers>
<cluster_missing_replica>
<shard>
<replica>
<!-- note: this one doesn't actually exist -->
<host>node1</host>
<port>9000</port>
<default_database>r0</default_database>
</replica>
<replica>
<host>node2</host>
<port>9000</port>
<default_database>r0</default_database>
</replica>
</shard>
<shard>
<replica>
<host>node3</host>
<port>9000</port>
<default_database>r0</default_database>
</replica>
<replica>
<host>node3</host>
<port>9000</port>
<default_database>r0</default_database>
</replica>
</shard>
</cluster_missing_replica>
</remote_servers>
</clickhouse>

View File

@ -0,0 +1,81 @@
"""
This test makes sure interserver cluster queries handle invalid DNS
records for replicas.
"""
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
from contextlib import contextmanager
import multiprocessing.dummy
def bootstrap(cluster: ClickHouseCluster):
node: ClickHouseInstance
for node in cluster.instances.values():
node_number = int(node.name[-1])
# getaddrinfo(...) may hang for a log time without these options.
node.exec_in_container(
[
"bash",
"-c",
'echo -e "options timeout:1\noptions attempts:1" >> /etc/resolv.conf',
],
privileged=True,
user="root",
)
node.query(f"CREATE DATABASE IF NOT EXISTS r0")
node.query(f"CREATE TABLE r0.test_data(v UInt64) ENGINE = Memory()")
node.query(
f"INSERT INTO r0.test_data SELECT number + {node_number} * 10 FROM numbers(10)"
)
node.query(
f"""CREATE TABLE default.test AS r0.test_data ENGINE = Distributed(cluster_missing_replica, 'r0', test_data, rand())"""
)
@contextmanager
def start_cluster():
cluster = ClickHouseCluster(__file__)
# node1 is missing on purpose to test DNS resolution errors.
# It exists in configs/remote_servers.xml to create the failure condition.
for node in ["node2", "node3", "node4"]:
cluster.add_instance(node, main_configs=["configs/remote_servers.xml"])
try:
cluster.start()
bootstrap(cluster)
yield cluster
finally:
cluster.shutdown()
def test_query():
with start_cluster() as cluster:
n_queries = 16
# thread-based pool
p = multiprocessing.dummy.Pool(n_queries)
def send_query(x):
try:
# queries start at operational shard 2, and will hit either the
# 'normal' node2 or the missing node1 on shard 1.
node = (
cluster.instances["node3"]
if (x % 2 == 0)
else cluster.instances["node4"]
)
# numbers between 0 and 19 are on the first ("broken") shard.
# we need to make sure we're querying them successfully
assert node.query(
"SELECT count() FROM default.test where v < (rand64() % 20)"
)
return 1
except QueryRuntimeException as e:
# DNS_ERROR because node1 doesn't exist.
assert 198 == e.returncode
# We shouldn't be getting here due to interserver retries.
raise
p.map(send_query, range(n_queries))

View File

@ -34,6 +34,7 @@ def cluster():
init_list = {
"ReadBufferFromS3Bytes": 0,
"ReadBufferFromS3Microseconds": 0,
"ReadBufferFromS3InitMicroseconds": 0,
"ReadBufferFromS3RequestsErrors": 0,
"WriteBufferFromS3Bytes": 0,
"S3ReadMicroseconds": 0,

View File

@ -35,7 +35,13 @@ def test_move_and_s3_memory_usage(started_single_node_cluster):
)
# After this, we should have 5 columns per 10 * 100 * 400000 ~ 400 MB; total ~2G data in partition
small_node.query("optimize table s3_test_with_ttl final")
small_node.query(
"optimize table s3_test_with_ttl final",
settings={
"send_logs_level": "error",
"allow_prefetched_read_pool_for_remote_filesystem": 0,
},
)
small_node.query("system flush logs")
# Will take memory usage from metric_log.
@ -44,19 +50,26 @@ def test_move_and_s3_memory_usage(started_single_node_cluster):
small_node.query(
"alter table s3_test_with_ttl move partition 0 to volume 'external'",
settings={"send_logs_level": "error"},
settings={
"send_logs_level": "error",
"allow_prefetched_read_pool_for_remote_filesystem": 0,
},
)
small_node.query("system flush logs")
max_usage = small_node.query(
"""select max(m.val - am.val * 4096) from
(select toStartOfMinute(event_time) as time, max(CurrentMetric_MemoryTracking) as val from system.metric_log group by time) as m join
(select toStartOfMinute(event_time) as time, min(value) as val from system.asynchronous_metric_log where metric='jemalloc.arenas.all.pdirty' group by time) as am using time"""
"""
select max(m.val - am.val * 4096) from
(select toStartOfMinute(event_time) as time, max(CurrentMetric_MemoryTracking) as val from system.metric_log group by time) as m join
(select toStartOfMinute(event_time) as time, min(value) as val from system.asynchronous_metric_log where metric='jemalloc.arenas.all.pdirty' group by time) as am using time;"""
)
# 3G limit is a big one. However, we can hit it anyway with parallel s3 writes enabled.
# Also actual value can be bigger because of memory drift.
# Increase it a little bit if test fails.
assert int(max_usage) < 3e9
res = small_node.query(
"select * from system.errors where last_error_message like '%Memory limit%' limit 1"
"select * from system.errors where last_error_message like '%Memory limit%' limit 1",
settings={
"allow_prefetched_read_pool_for_remote_filesystem": 0,
},
)
assert res == ""

View File

@ -17,7 +17,7 @@ cached_query="SELECT count() FROM small_table where n > 0;"
$CLICKHOUSE_CLIENT --use_uncompressed_cache=1 --query="$cached_query" &> /dev/null
$CLICKHOUSE_CLIENT --use_uncompressed_cache=1 --query_id="test-query-uncompressed-cache" --query="$cached_query" &> /dev/null
$CLICKHOUSE_CLIENT --use_uncompressed_cache=1 --allow_prefetched_read_pool_for_remote_filesystem=0 --allow_prefetched_read_pool_for_local_filesystem=0 --query_id="test-query-uncompressed-cache" --query="$cached_query" &> /dev/null
$CLICKHOUSE_CLIENT --query="SYSTEM FLUSH LOGS"
@ -25,4 +25,3 @@ $CLICKHOUSE_CLIENT --query="SYSTEM FLUSH LOGS"
$CLICKHOUSE_CLIENT --query="SELECT ProfileEvents['Seek'], ProfileEvents['ReadCompressedBytes'], ProfileEvents['UncompressedCacheHits'] AS hit FROM system.query_log WHERE (query_id = 'test-query-uncompressed-cache') and current_database = currentDatabase() AND (type = 2) AND event_date >= yesterday() ORDER BY event_time DESC LIMIT 1"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS small_table"

View File

@ -2,7 +2,7 @@
SET prefer_localhost_replica = 1;
SELECT count() FROM remote('127.0.0.1,localhos', system.one); -- { serverError 198 }
SELECT count() FROM remote('127.0.0.1,localhos', system.one); -- { serverError 279 }
SELECT count() FROM remote('127.0.0.1|localhos', system.one);
-- Clear cache to avoid future errors in the logs

View File

@ -24,7 +24,7 @@ $CLICKHOUSE_CLIENT -q "SELECT a FROM m ORDER BY a LIMIT 5"
$CLICKHOUSE_CLIENT -q "SELECT a, s FROM m ORDER BY a, s LIMIT 10"
# Not a single .sql test with max_rows_to_read because it doesn't work with Merge storage
rows_read=$($CLICKHOUSE_CLIENT -q "SELECT a FROM m ORDER BY a LIMIT 10 FORMAT JSON" --max_threads=1 --max_block_size=20 --optimize_read_in_order=1 | grep "rows_read" | sed 's/[^0-9]*//g')
rows_read=$($CLICKHOUSE_CLIENT -q "SELECT a FROM m ORDER BY a LIMIT 10 FORMAT JSON" --max_threads=1 --max_block_size=20 --optimize_read_in_order=1 --allow_prefetched_read_pool_for_remote_filesystem=0 --allow_prefetched_read_pool_for_local_filesystem=0 | grep "rows_read" | sed 's/[^0-9]*//g')
# Expected number of read rows with a bit margin
if [[ $rows_read -lt 500 ]]
@ -36,7 +36,7 @@ fi
$CLICKHOUSE_CLIENT -q "SELECT '---StorageBuffer---'"
$CLICKHOUSE_CLIENT -q "CREATE TABLE buf (a UInt32, s String) engine = Buffer('$CLICKHOUSE_DATABASE', s2, 16, 10, 100, 10000, 1000000, 10000000, 100000000)"
$CLICKHOUSE_CLIENT -q "SELECT a, s FROM buf ORDER BY a, s LIMIT 10"
rows_read=$($CLICKHOUSE_CLIENT -q "SELECT a FROM buf ORDER BY a LIMIT 10 FORMAT JSON" --max_threads=1 --max_block_size=20 --optimize_read_in_order=1 | grep "rows_read" | sed 's/[^0-9]*//g')
rows_read=$($CLICKHOUSE_CLIENT -q "SELECT a FROM buf ORDER BY a LIMIT 10 FORMAT JSON" --max_threads=1 --max_block_size=20 --optimize_read_in_order=1 --allow_prefetched_read_pool_for_remote_filesystem=0 --allow_prefetched_read_pool_for_local_filesystem=0 | grep "rows_read" | sed 's/[^0-9]*//g')
# Expected number of read rows with a bit margin
if [[ $rows_read -lt 500 ]]

View File

@ -5,3 +5,4 @@ connect_timeout_with_failover_secure_ms Milliseconds 3000
external_storage_connect_timeout_sec UInt64 10
max_untracked_memory UInt64 1048576
memory_profiler_step UInt64 1048576
filesystem_prefetch_max_memory_usage UInt64 1073741824

View File

@ -13,6 +13,6 @@ $CLICKHOUSE_CLIENT --xyzgarbage 2>&1 | grep -q "UNRECOGNIZED_ARGUMENTS" && echo
cat /etc/passwd | sed 's/:/\t/g' | $CLICKHOUSE_CLIENT --query="SELECT shell, count() AS c FROM passwd GROUP BY shell ORDER BY c DESC" --external --file=- --name=passwd --structure='login String, unused String, uid UInt16, gid UInt16, comment String, home String, shell String' xyzgarbage 2>&1 | grep -q "BAD_ARGUMENTS" && echo 'OK' || echo 'FAIL'
cat /etc/passwd | sed 's/:/\t/g' | $CLICKHOUSE_CLIENT --query="SELECT shell, count() AS c FROM passwd GROUP BY shell ORDER BY c DESC" --external -xyzgarbage --file=- --name=passwd --structure='login String, unused String, uid UInt16, gid UInt16, comment String, home String, shell String' 2>&1 | grep -q "UNRECOGNIZED_ARGUMENTS" && echo 'OK' || echo 'FAIL'
cat /etc/passwd | sed 's/:/\t/g' | $CLICKHOUSE_CLIENT --query="SELECT shell, count() AS c FROM passwd GROUP BY shell ORDER BY c DESC" --external -xyzgarbage --file=- --name=passwd --structure='login String, unused String, uid UInt16, gid UInt16, comment String, home String, shell String' 2>&1 | grep -q "Bad arguments" && echo 'OK' || echo 'FAIL'
cat /etc/passwd | sed 's/:/\t/g' | $CLICKHOUSE_CLIENT --query="SELECT shell, count() AS c FROM passwd GROUP BY shell ORDER BY c DESC" --external --xyzgarbage --file=- --name=passwd --structure='login String, unused String, uid UInt16, gid UInt16, comment String, home String, shell String' 2>&1 | grep -q "UNRECOGNIZED_ARGUMENTS" && echo 'OK' || echo 'FAIL'
cat /etc/passwd | sed 's/:/\t/g' | $CLICKHOUSE_CLIENT --query="SELECT shell, count() AS c FROM passwd GROUP BY shell ORDER BY c DESC" --external --xyzgarbage --file=- --name=passwd --structure='login String, unused String, uid UInt16, gid UInt16, comment String, home String, shell String' 2>&1 | grep -q "Bad arguments" && echo 'OK' || echo 'FAIL'

View File

@ -1,261 +1,335 @@
SET log_queries = 1;
SET allow_experimental_inverted_index = 1;
SET log_queries = 1;
-- create table for inverted(2)
DROP TABLE IF EXISTS simple1;
CREATE TABLE simple1(k UInt64,s String,INDEX af (s) TYPE inverted(2) GRANULARITY 1)
----------------------------------------------------
-- Test inverted(2)
DROP TABLE IF EXISTS tab;
CREATE TABLE tab(k UInt64, s String, INDEX af(s) TYPE inverted(2))
ENGINE = MergeTree() ORDER BY k
SETTINGS index_granularity = 2;
-- insert test data into table
INSERT INTO simple1 VALUES (101, 'Alick a01'), (102, 'Blick a02'), (103, 'Click a03'),(104, 'Dlick a04'),(105, 'Elick a05'),(106, 'Alick a06'),(107, 'Blick a07'),(108, 'Click a08'),(109, 'Dlick a09'),(110, 'Elick a10'),(111, 'Alick b01'),(112, 'Blick b02'),(113, 'Click b03'),(114, 'Dlick b04'),(115, 'Elick b05'),(116, 'Alick b06'),(117, 'Blick b07'),(118, 'Click b08'),(119, 'Dlick b09'),(120, 'Elick b10');
INSERT INTO tab VALUES (101, 'Alick a01'), (102, 'Blick a02'), (103, 'Click a03'), (104, 'Dlick a04'), (105, 'Elick a05'), (106, 'Alick a06'), (107, 'Blick a07'), (108, 'Click a08'), (109, 'Dlick a09'), (110, 'Elick a10'), (111, 'Alick b01'), (112, 'Blick b02'), (113, 'Click b03'), (114, 'Dlick b04'), (115, 'Elick b05'), (116, 'Alick b06'), (117, 'Blick b07'), (118, 'Click b08'), (119, 'Dlick b09'), (120, 'Elick b10');
-- check inverted index was created
SELECT name, type FROM system.data_skipping_indices where (table =='simple1') limit 1;
SELECT name, type FROM system.data_skipping_indices WHERE table =='tab' AND database = currentDatabase() LIMIT 1;
-- search inverted index with ==
SELECT * FROM simple1 WHERE s == 'Alick a01';
SYSTEM FLUSH LOGS;
SELECT * FROM tab WHERE s == 'Alick a01';
-- check the query only read 1 granules (2 rows total; each granule has 2 rows)
SYSTEM FLUSH LOGS;
SELECT read_rows==2 from system.query_log
where query_kind ='Select'
and current_database = currentDatabase()
and endsWith(trimRight(query), 'SELECT * FROM simple1 WHERE s == \'Alick a01\';')
and type='QueryFinish'
and result_rows==1
limit 1;
WHERE query_kind ='Select'
AND current_database = currentDatabase()
AND endsWith(trimRight(query), 'SELECT * FROM tab WHERE s == \'Alick a01\';')
AND type='QueryFinish'
AND result_rows==1
LIMIT 1;
-- search inverted index with LIKE
SELECT * FROM simple1 WHERE s LIKE '%01%' ORDER BY k;
SYSTEM FLUSH LOGS;
SELECT * FROM tab WHERE s LIKE '%01%' ORDER BY k;
-- check the query only read 2 granules (4 rows total; each granule has 2 rows)
SYSTEM FLUSH LOGS;
SELECT read_rows==4 from system.query_log
where query_kind ='Select'
and current_database = currentDatabase()
and endsWith(trimRight(query), 'SELECT * FROM simple1 WHERE s LIKE \'%01%\' ORDER BY k;')
and type='QueryFinish'
and result_rows==2
limit 1;
WHERE query_kind ='Select'
AND current_database = currentDatabase()
AND endsWith(trimRight(query), 'SELECT * FROM tab WHERE s LIKE \'%01%\' ORDER BY k;')
AND type='QueryFinish'
AND result_rows==2
LIMIT 1;
-- search inverted index with hasToken
SELECT * FROM simple1 WHERE hasToken(s, 'Click') ORDER BY k;
SYSTEM FLUSH LOGS;
-- check the query only read 4 granules (8 rows total; each granule has 2 rows)
SELECT read_rows==8 from system.query_log
where query_kind ='Select'
and current_database = currentDatabase()
and endsWith(trimRight(query), 'SELECT * FROM simple1 WHERE hasToken(s, \'Click\') ORDER BY k;')
and type='QueryFinish'
and result_rows==4 limit 1;
SELECT * FROM tab WHERE hasToken(s, 'Click') ORDER BY k;
-- create table for inverted()
DROP TABLE IF EXISTS simple2;
CREATE TABLE simple2(k UInt64,s String,INDEX af (s) TYPE inverted() GRANULARITY 1)
-- check the query only read 4 granules (8 rows total; each granule has 2 rows)
SYSTEM FLUSH LOGS;
SELECT read_rows==8 from system.query_log
WHERE query_kind ='Select'
AND current_database = currentDatabase()
AND endsWith(trimRight(query), 'SELECT * FROM tab WHERE hasToken(s, \'Click\') ORDER BY k;')
AND type='QueryFinish'
AND result_rows==4
LIMIT 1;
----------------------------------------------------
-- Test inverted()
DROP TABLE IF EXISTS tab_x;
CREATE TABLE tab_x(k UInt64, s String, INDEX af(s) TYPE inverted())
ENGINE = MergeTree() ORDER BY k
SETTINGS index_granularity = 2;
-- insert test data into table
INSERT INTO simple2 VALUES (101, 'Alick a01'), (102, 'Blick a02'), (103, 'Click a03'),(104, 'Dlick a04'),(105, 'Elick a05'),(106, 'Alick a06'),(107, 'Blick a07'),(108, 'Click a08'),(109, 'Dlick a09'),(110, 'Elick a10'),(111, 'Alick b01'),(112, 'Blick b02'),(113, 'Click b03'),(114, 'Dlick b04'),(115, 'Elick b05'),(116, 'Alick b06'),(117, 'Blick b07'),(118, 'Click b08'),(119, 'Dlick b09'),(120, 'Elick b10');
INSERT INTO tab_x VALUES (101, 'Alick a01'), (102, 'Blick a02'), (103, 'Click a03'), (104, 'Dlick a04'), (105, 'Elick a05'), (106, 'Alick a06'), (107, 'Blick a07'), (108, 'Click a08'), (109, 'Dlick a09'), (110, 'Elick a10'), (111, 'Alick b01'), (112, 'Blick b02'), (113, 'Click b03'), (114, 'Dlick b04'), (115, 'Elick b05'), (116, 'Alick b06'), (117, 'Blick b07'), (118, 'Click b08'), (119, 'Dlick b09'), (120, 'Elick b10');
-- check inverted index was created
SELECT name, type FROM system.data_skipping_indices where (table =='simple2') limit 1;
SELECT name, type FROM system.data_skipping_indices WHERE table == 'tab_x' AND database = currentDatabase() LIMIT 1;
-- search inverted index with hasToken
SELECT * FROM simple2 WHERE hasToken(s, 'Alick') order by k;
SYSTEM FLUSH LOGS;
SELECT * FROM tab_x WHERE hasToken(s, 'Alick') ORDER BY k;
-- check the query only read 4 granules (8 rows total; each granule has 2 rows)
SYSTEM FLUSH LOGS;
SELECT read_rows==8 from system.query_log
where query_kind ='Select'
and current_database = currentDatabase()
and endsWith(trimRight(query), 'SELECT * FROM simple2 WHERE hasToken(s, \'Alick\');')
and type='QueryFinish'
and result_rows==4 limit 1;
WHERE query_kind ='Select'
AND current_database = currentDatabase()
AND endsWith(trimRight(query), 'SELECT * FROM tab_x WHERE hasToken(s, \'Alick\');')
AND type='QueryFinish'
AND result_rows==4
LIMIT 1;
-- search inverted index with IN operator
SELECT * FROM simple2 WHERE s IN ('Alick a01', 'Alick a06') ORDER BY k;
SYSTEM FLUSH LOGS;
SELECT * FROM tab_x WHERE s IN ('Alick a01', 'Alick a06') ORDER BY k;
-- check the query only read 2 granules (4 rows total; each granule has 2 rows)
SYSTEM FLUSH LOGS;
SELECT read_rows==4 from system.query_log
where query_kind ='Select'
and current_database = currentDatabase()
and endsWith(trimRight(query), 'SELECT * FROM simple2 WHERE s IN (\'Alick a01\', \'Alick a06\') ORDER BY k;')
and type='QueryFinish'
and result_rows==2 limit 1;
WHERE query_kind ='Select'
AND current_database = currentDatabase()
AND endsWith(trimRight(query), 'SELECT * FROM tab_x WHERE s IN (\'Alick a01\', \'Alick a06\') ORDER BY k;')
AND type='QueryFinish'
AND result_rows==2
LIMIT 1;
-- search inverted index with multiSearch
SELECT * FROM simple2 WHERE multiSearchAny(s, ['a01', 'b01']) ORDER BY k;
SYSTEM FLUSH LOGS;
SELECT * FROM tab_x WHERE multiSearchAny(s, ['a01', 'b01']) ORDER BY k;
-- check the query only read 2 granules (4 rows total; each granule has 2 rows)
SYSTEM FLUSH LOGS;
SELECT read_rows==4 from system.query_log
where query_kind ='Select'
and current_database = currentDatabase()
and endsWith(trimRight(query), 'SELECT * FROM simple2 WHERE multiSearchAny(s, [\'a01\', \'b01\']) ORDER BY k;')
and type='QueryFinish'
and result_rows==2 limit 1;
WHERE query_kind ='Select'
AND current_database = currentDatabase()
AND endsWith(trimRight(query), 'SELECT * FROM tab_x WHERE multiSearchAny(s, [\'a01\', \'b01\']) ORDER BY k;')
AND type='QueryFinish'
AND result_rows==2
LIMIT 1;
-- create table with an array column
DROP TABLE IF EXISTS simple_array;
create table simple_array (k UInt64, s Array(String), INDEX af (s) TYPE inverted(2) GRANULARITY 1)
----------------------------------------------------
-- Test on array columns
DROP TABLE IF EXISTS tab;
create table tab (k UInt64, s Array(String), INDEX af(s) TYPE inverted(2))
ENGINE = MergeTree() ORDER BY k
SETTINGS index_granularity = 2;
INSERT INTO simple_array SELECT rowNumberInBlock(), groupArray(s) FROM simple2 GROUP BY k%10;
INSERT INTO tab SELECT rowNumberInBlock(), groupArray(s) FROM tab_x GROUP BY k%10;
-- check inverted index was created
SELECT name, type FROM system.data_skipping_indices where (table =='simple_array') limit 1;
SELECT name, type FROM system.data_skipping_indices WHERE table == 'tab' AND database = currentDatabase() LIMIT 1;
-- search inverted index with has
SELECT * FROM simple_array WHERE has(s, 'Click a03') ORDER BY k;
SYSTEM FLUSH LOGS;
-- check the query must read all 10 granules (20 rows total; each granule has 2 rows)
SELECT read_rows==2 from system.query_log
where query_kind ='Select'
and current_database = currentDatabase()
and endsWith(trimRight(query), 'SELECT * FROM simple_array WHERE has(s, \'Click a03\') ORDER BY k;')
and type='QueryFinish'
and result_rows==1 limit 1;
SELECT * FROM tab WHERE has(s, 'Click a03') ORDER BY k;
-- create table with a map column
DROP TABLE IF EXISTS simple_map;
CREATE TABLE simple_map (k UInt64, s Map(String,String), INDEX af (mapKeys(s)) TYPE inverted(2) GRANULARITY 1)
-- check the query must read all 10 granules (20 rows total; each granule has 2 rows)
SYSTEM FLUSH LOGS;
SELECT read_rows==2 from system.query_log
WHERE query_kind ='Select'
AND current_database = currentDatabase()
AND endsWith(trimRight(query), 'SELECT * FROM tab WHERE has(s, \'Click a03\') ORDER BY k;')
AND type='QueryFinish'
AND result_rows==1
LIMIT 1;
----------------------------------------------------
-- Test on map columns
DROP TABLE IF EXISTS tab;
CREATE TABLE tab (k UInt64, s Map(String,String), INDEX af(mapKeys(s)) TYPE inverted(2))
ENGINE = MergeTree() ORDER BY k
SETTINGS index_granularity = 2;
INSERT INTO simple_map VALUES (101, {'Alick':'Alick a01'}), (102, {'Blick':'Blick a02'}), (103, {'Click':'Click a03'}),(104, {'Dlick':'Dlick a04'}),(105, {'Elick':'Elick a05'}),(106, {'Alick':'Alick a06'}),(107, {'Blick':'Blick a07'}),(108, {'Click':'Click a08'}),(109, {'Dlick':'Dlick a09'}),(110, {'Elick':'Elick a10'}),(111, {'Alick':'Alick b01'}),(112, {'Blick':'Blick b02'}),(113, {'Click':'Click b03'}),(114, {'Dlick':'Dlick b04'}),(115, {'Elick':'Elick b05'}),(116, {'Alick':'Alick b06'}),(117, {'Blick':'Blick b07'}),(118, {'Click':'Click b08'}),(119, {'Dlick':'Dlick b09'}),(120, {'Elick':'Elick b10'});
INSERT INTO tab VALUES (101, {'Alick':'Alick a01'}), (102, {'Blick':'Blick a02'}), (103, {'Click':'Click a03'}), (104, {'Dlick':'Dlick a04'}), (105, {'Elick':'Elick a05'}), (106, {'Alick':'Alick a06'}), (107, {'Blick':'Blick a07'}), (108, {'Click':'Click a08'}), (109, {'Dlick':'Dlick a09'}), (110, {'Elick':'Elick a10'}), (111, {'Alick':'Alick b01'}), (112, {'Blick':'Blick b02'}), (113, {'Click':'Click b03'}), (114, {'Dlick':'Dlick b04'}), (115, {'Elick':'Elick b05'}), (116, {'Alick':'Alick b06'}), (117, {'Blick':'Blick b07'}), (118, {'Click':'Click b08'}), (119, {'Dlick':'Dlick b09'}), (120, {'Elick':'Elick b10'});
-- check inverted index was created
SELECT name, type FROM system.data_skipping_indices where (table =='simple_map') limit 1;
SELECT name, type FROM system.data_skipping_indices WHERE table == 'tab' AND database = currentDatabase() LIMIT 1;
-- search inverted index with mapContains
SELECT * FROM simple_map WHERE mapContains(s, 'Click') ORDER BY k;
SYSTEM FLUSH LOGS;
SELECT * FROM tab WHERE mapContains(s, 'Click') ORDER BY k;
-- check the query must read all 4 granules (8 rows total; each granule has 2 rows)
SYSTEM FLUSH LOGS;
SELECT read_rows==8 from system.query_log
where query_kind ='Select'
and current_database = currentDatabase()
and endsWith(trimRight(query), 'SELECT * FROM simple_map WHERE mapContains(s, \'Click\') ORDER BY k;')
and type='QueryFinish'
and result_rows==4 limit 1;
WHERE query_kind ='Select'
AND current_database = currentDatabase()
AND endsWith(trimRight(query), 'SELECT * FROM tab WHERE mapContains(s, \'Click\') ORDER BY k;')
AND type='QueryFinish'
AND result_rows==4
LIMIT 1;
-- search inverted index with map key
SELECT * FROM simple_map WHERE s['Click'] = 'Click a03';
SYSTEM FLUSH LOGS;
-- check the query must read all 4 granules (8 rows total; each granule has 2 rows)
SELECT read_rows==8 from system.query_log
where query_kind ='Select'
and current_database = currentDatabase()
and endsWith(trimRight(query), 'SELECT * FROM simple_map WHERE s[\'Click\'] = \'Click a03\';')
and type='QueryFinish'
and result_rows==1 limit 1;
SELECT * FROM tab WHERE s['Click'] = 'Click a03';
-- create table for inverted(2) with two parts
DROP TABLE IF EXISTS simple3;
CREATE TABLE simple3(k UInt64,s String,INDEX af (s) TYPE inverted(2) GRANULARITY 1)
-- check the query must read all 4 granules (8 rows total; each granule has 2 rows)
SYSTEM FLUSH LOGS;
SELECT read_rows==8 from system.query_log
WHERE query_kind ='Select'
AND current_database = currentDatabase()
AND endsWith(trimRight(query), 'SELECT * FROM tab WHERE s[\'Click\'] = \'Click a03\';')
AND type='QueryFinish'
AND result_rows==1
LIMIT 1;
----------------------------------------------------
-- Test inverted(2) on a column with two parts
DROP TABLE IF EXISTS tab;
CREATE TABLE tab(k UInt64, s String, INDEX af(s) TYPE inverted(2))
ENGINE = MergeTree() ORDER BY k
SETTINGS index_granularity = 2;
-- insert test data into table
INSERT INTO simple3 VALUES (101, 'Alick a01'), (102, 'Blick a02'), (103, 'Click a03'),(104, 'Dlick a04'),(105, 'Elick a05'),(106, 'Alick a06'),(107, 'Blick a07'),(108, 'Click a08'),(109, 'Dlick a09'),(110, 'Elick b10'),(111, 'Alick b01'),(112, 'Blick b02'),(113, 'Click b03'),(114, 'Dlick b04'),(115, 'Elick b05'),(116, 'Alick b06'),(117, 'Blick b07'),(118, 'Click b08'),(119, 'Dlick b09'),(120, 'Elick b10');
INSERT INTO simple3 VALUES (201, 'rick c01'), (202, 'mick c02'),(203, 'nick c03');
INSERT INTO tab VALUES (101, 'Alick a01'), (102, 'Blick a02'), (103, 'Click a03'), (104, 'Dlick a04'), (105, 'Elick a05'), (106, 'Alick a06'), (107, 'Blick a07'), (108, 'Click a08'), (109, 'Dlick a09'), (110, 'Elick b10'), (111, 'Alick b01'), (112, 'Blick b02'), (113, 'Click b03'), (114, 'Dlick b04'), (115, 'Elick b05'), (116, 'Alick b06'), (117, 'Blick b07'), (118, 'Click b08'), (119, 'Dlick b09'), (120, 'Elick b10');
INSERT INTO tab VALUES (201, 'rick c01'), (202, 'mick c02'), (203, 'nick c03');
-- check inverted index was created
SELECT name, type FROM system.data_skipping_indices where (table =='simple3') limit 1;
SELECT name, type FROM system.data_skipping_indices WHERE table == 'tab' AND database = currentDatabase() LIMIT 1;
-- search inverted index
SELECT * FROM simple3 WHERE s LIKE '%01%' order by k;
SYSTEM FLUSH LOGS;
SELECT * FROM tab WHERE s LIKE '%01%' ORDER BY k;
-- check the query only read 3 granules (6 rows total; each granule has 2 rows)
SYSTEM FLUSH LOGS;
SELECT read_rows==6 from system.query_log
where query_kind ='Select'
and current_database = currentDatabase()
and endsWith(trimRight(query), 'SELECT * FROM simple3 WHERE s LIKE \'%01%\' order by k;')
and type='QueryFinish'
and result_rows==3 limit 1;
WHERE query_kind ='Select'
AND current_database = currentDatabase()
AND endsWith(trimRight(query), 'SELECT * FROM tab WHERE s LIKE \'%01%\' ORDER BY k;')
AND type='QueryFinish'
AND result_rows==3
LIMIT 1;
----------------------------------------------------
-- Test inverted(2) on UTF-8 data
DROP TABLE IF EXISTS tab;
CREATE TABLE tab(k UInt64, s String, INDEX af(s) TYPE inverted(2))
ENGINE = MergeTree()
ORDER BY k
SETTINGS index_granularity = 2;
INSERT INTO tab VALUES (101, 'Alick 好'), (102, 'clickhouse你好'), (103, 'Click 你'), (104, 'Dlick 你a好'), (105, 'Elick 好好你你'), (106, 'Alick 好a好a你a你');
-- create table for inverted(2) for utf8 string test
DROP TABLE IF EXISTS simple4;
CREATE TABLE simple4(k UInt64,s String,INDEX af (s) TYPE inverted(2) GRANULARITY 1) ENGINE = MergeTree() ORDER BY k
SETTINGS index_granularity = 2;
-- insert test data into table
INSERT INTO simple4 VALUES (101, 'Alick 好'),(102, 'clickhouse你好'), (103, 'Click 你'),(104, 'Dlick 你a好'),(105, 'Elick 好好你你'),(106, 'Alick 好a好a你a你');
-- check inverted index was created
SELECT name, type FROM system.data_skipping_indices where (table =='simple4') limit 1;
SELECT name, type FROM system.data_skipping_indices WHERE table == 'tab' AND database = currentDatabase() LIMIT 1;
-- search inverted index
SELECT * FROM simple4 WHERE s LIKE '%你好%' order by k;
SYSTEM FLUSH LOGS;
SELECT * FROM tab WHERE s LIKE '%你好%' ORDER BY k;
-- check the query only read 1 granule (2 rows total; each granule has 2 rows)
SYSTEM FLUSH LOGS;
SELECT read_rows==2 from system.query_log
where query_kind ='Select'
and current_database = currentDatabase()
and endsWith(trimRight(query), 'SELECT * FROM simple4 WHERE s LIKE \'%%\' order by k;')
and type='QueryFinish'
and result_rows==1 limit 1;
WHERE query_kind ='Select'
AND current_database = currentDatabase()
AND endsWith(trimRight(query), 'SELECT * FROM tab WHERE s LIKE \'%%\' ORDER BY k;')
AND type='QueryFinish'
AND result_rows==1
LIMIT 1;
-- create table for max_digestion_size_per_segment test
DROP TABLE IF EXISTS simple5;
CREATE TABLE simple5(k UInt64,s String,INDEX af(s) TYPE inverted(0) GRANULARITY 1)
Engine=MergeTree
ORDER BY (k)
SETTINGS max_digestion_size_per_segment = 1024, index_granularity = 256
AS
SELECT
number,
format('{},{},{},{}', hex(12345678), hex(87654321), hex(number/17 + 5), hex(13579012)) as s
FROM numbers(10240);
----------------------------------------------------
-- Test max_digestion_size_per_segment
DROP TABLE IF EXISTS tab;
CREATE TABLE tab(k UInt64, s String, INDEX af(s) TYPE inverted(0))
Engine=MergeTree
ORDER BY (k)
SETTINGS max_digestion_size_per_segment = 1024, index_granularity = 256
AS
SELECT
number,
format('{},{},{},{}', hex(12345678), hex(87654321), hex(number/17 + 5), hex(13579012)) as s
FROM numbers(10240);
-- check inverted index was created
SELECT name, type FROM system.data_skipping_indices where (table =='simple5') limit 1;
-- search inverted index
SELECT s FROM simple5 WHERE hasToken(s, '6969696969898240');
SYSTEM FLUSH LOGS;
-- check the query only read 1 granule (1 row total; each granule has 256 rows)
SELECT read_rows==256 from system.query_log
where query_kind ='Select'
and current_database = currentDatabase()
and endsWith(trimRight(query), 'SELECT s FROM simple5 WHERE hasToken(s, \'6969696969898240\');')
and type='QueryFinish'
and result_rows==1 limit 1;
SELECT name, type FROM system.data_skipping_indices WHERE table == 'tab' AND database = currentDatabase() LIMIT 1;
DROP TABLE IF EXISTS simple6;
-- create inverted index with density==1
CREATE TABLE simple6(k UInt64,s String,INDEX af(s) TYPE inverted(0, 1.0) GRANULARITY 1)
-- search inverted index
SELECT s FROM tab WHERE hasToken(s, '6969696969898240');
-- check the query only read 1 granule (1 row total; each granule has 256 rows)
SYSTEM FLUSH LOGS;
SELECT read_rows==256 from system.query_log
WHERE query_kind ='Select'
AND current_database = currentDatabase()
AND endsWith(trimRight(query), 'SELECT s FROM tab WHERE hasToken(s, \'6969696969898240\');')
AND type='QueryFinish'
AND result_rows==1
LIMIT 1;
----------------------------------------------------
-- Test density==1
DROP TABLE IF EXISTS tab;
CREATE TABLE tab(k UInt64, s String, INDEX af(s) TYPE inverted(0, 1.0))
Engine=MergeTree
ORDER BY (k)
SETTINGS max_digestion_size_per_segment = 1, index_granularity = 512
AS
ORDER BY (k)
SETTINGS max_digestion_size_per_segment = 1, index_granularity = 512
AS
SELECT number, if(number%2, format('happy {}', hex(number)), format('birthday {}', hex(number)))
FROM numbers(1024);
-- check inverted index was created
SELECT name, type FROM system.data_skipping_indices where (table =='simple6') limit 1;
-- search inverted index, no row has 'happy birthday'
SELECT count()==0 FROM simple6 WHERE s=='happy birthday';
SYSTEM FLUSH LOGS;
-- check the query only skip all granules (0 row total; each granule has 512 rows)
SELECT read_rows==0 from system.query_log
where query_kind ='Select'
and current_database = currentDatabase()
and endsWith(trimRight(query), 'SELECT count()==0 FROM simple6 WHERE s==\'happy birthday\';')
and type='QueryFinish'
and result_rows==1 limit 1;
DROP TABLE IF EXISTS simple7;
-- create inverted index with density==0.1
CREATE TABLE simple7(k UInt64,s String,INDEX af(s) TYPE inverted(0, 0.1) GRANULARITY 1)
-- check inverted index was created
SELECT name, type FROM system.data_skipping_indices WHERE table == 'tab' AND database = currentDatabase() LIMIT 1;
-- search inverted index, no row has 'happy birthday'
SELECT count() == 0 FROM tab WHERE s =='happy birthday';
-- check the query only skip all granules (0 row total; each granule has 512 rows)
SYSTEM FLUSH LOGS;
SELECT read_rows==0 from system.query_log
WHERE query_kind ='Select'
AND current_database = currentDatabase()
AND endsWith(trimRight(query), 'SELECT count() == 0 FROM tab WHERE s ==\'happy birthday\';')
AND type='QueryFinish'
AND result_rows==1
LIMIT 1;
----------------------------------------------------
-- Test density==0.1
DROP TABLE IF EXISTS tab;
CREATE TABLE tab(k UInt64, s String, INDEX af(s) TYPE inverted(0, 0.1))
Engine=MergeTree
ORDER BY (k)
SETTINGS max_digestion_size_per_segment = 1, index_granularity = 512
AS
ORDER BY (k)
SETTINGS max_digestion_size_per_segment = 1, index_granularity = 512
AS
SELECT number, if(number==1023, 'happy new year', if(number%2, format('happy {}', hex(number)), format('birthday {}', hex(number))))
FROM numbers(1024);
-- check inverted index was created
SELECT name, type FROM system.data_skipping_indices where (table =='simple7') limit 1;
-- search inverted index, no row has 'happy birthday'
SELECT count()==0 FROM simple7 WHERE s=='happy birthday';
SYSTEM FLUSH LOGS;
-- check the query does not skip any of the 2 granules(1024 rows total; each granule has 512 rows)
SELECT read_rows==1024 from system.query_log
where query_kind ='Select'
and current_database = currentDatabase()
and endsWith(trimRight(query), 'SELECT count()==0 FROM simple7 WHERE s==\'happy birthday\';')
and type='QueryFinish'
and result_rows==1 limit 1;
-- search inverted index, no row has 'happy new year'
SELECT count()==1 FROM simple7 WHERE s=='happy new year';
SYSTEM FLUSH LOGS;
-- check the query only read 1 granule because of density (1024 rows total; each granule has 512 rows)
SELECT read_rows==512 from system.query_log
where query_kind ='Select'
and current_database = currentDatabase()
and endsWith(trimRight(query), 'SELECT count()==1 FROM simple7 WHERE s==\'happy new year\';')
and type='QueryFinish'
and result_rows==1 limit 1;
-- check inverted index was created
SELECT name, type FROM system.data_skipping_indices WHERE table == 'tab' AND database = currentDatabase() LIMIT 1;
-- search inverted index, no row has 'happy birthday'
SELECT count() == 0 FROM tab WHERE s == 'happy birthday';
-- check the query does not skip any of the 2 granules(1024 rows total; each granule has 512 rows)
SYSTEM FLUSH LOGS;
SELECT read_rows==1024 from system.query_log
WHERE query_kind ='Select'
AND current_database = currentDatabase()
AND endsWith(trimRight(query), 'SELECT count() == 0 FROM tab WHERE s == \'happy birthday\';')
AND type='QueryFinish'
AND result_rows==1
LIMIT 1;
-- search inverted index, no row has 'happy new year'
SELECT count() == 1 FROM tab WHERE s == 'happy new year';
-- check the query only read 1 granule because of density (1024 rows total; each granule has 512 rows)
SYSTEM FLUSH LOGS;
SELECT read_rows==512 from system.query_log
WHERE query_kind ='Select'
AND current_database = currentDatabase()
AND endsWith(trimRight(query), 'SELECT count() == 1 FROM tab WHERE s == \'happy new year\';')
AND type='QueryFinish'
AND result_rows==1
LIMIT 1;

View File

@ -0,0 +1,58 @@
#!/usr/bin/env bash
QUERIES_FILE=$1
DEBUG_MODE=$2
function random {
cat /dev/urandom | LC_ALL=C tr -dc 'a-zA-Z' | fold -w ${1:-8} | head -n 1
}
while read line; do
query="$line"
echo "Query: $query"
function execute()
{
query_id=$(random)
echo "query id: $query_id"
result=($(clickhouse-client --time --query_id "$query_id" -q "$query" 2>&1))
query_result=${result[0]}
total_time=${result[1]}
clickhouse-client -q "system flush logs"
time_executing=$(clickhouse-client -q "select query_duration_ms / 1000 from system.query_log where type='QueryFinish' and query_id = '$query_id'")
time_reading_from_prefetch=$(clickhouse-client -q "select ProfileEvents['AsynchronousRemoteReadWaitMicroseconds'] / 1000 / 1000 from system.query_log where type='QueryFinish' and query_id = '$query_id'")
time_reading_without_prefetch=$(clickhouse-client -q "select ProfileEvents['SynchronousRemoteReadWaitMicroseconds'] / 1000 / 1000 from system.query_log where type='QueryFinish' and query_id = '$query_id'")
function print()
{
echo " $1: $2"
}
print "time executing query" $time_executing
print "time reading data with prefetch" $time_reading_from_prefetch
print "time reading data without prefetch" $time_reading_without_prefetch
if (( $DEBUG_MODE == 1 )); then
remote_profiles=$(clickhouse-client -q "select mapExtractKeyLike(ProfileEvents, '%RemoteFS%') from system.query_log where type='QueryFinish' and query_id = '$query_id'")
threadpool_profiles=$(clickhouse-client -q "select mapExtractKeyLike(ProfileEvents, '%ThreadpoolReader%') from system.query_log where type='QueryFinish' and query_id = '$query_id'")
s3_profiles=$(clickhouse-client -q "select mapExtractKeyLike(ProfileEvents, '%S3%') from system.query_log where type='QueryFinish' and query_id = '$query_id'")
max_parallel_read_tasks=$(clickhouse-client -q "select AsyncReadCounters['max_parallel_read_tasks'] from system.query_log where type='QueryFinish' and query_id = '$query_id'")
max_parallel_prefetch_tasks=$(clickhouse-client -q "select AsyncReadCounters['max_parallel_prefetch_tasks'] from system.query_log where type='QueryFinish' and query_id = '$query_id'")
total_prefetch_tasks=$(clickhouse-client -q "select AsyncReadCounters['total_prefetch_tasks'] from system.query_log where type='QueryFinish' and query_id = '$query_id'")
init=$(clickhouse-client -q "select ProfileEvents['PrefetchedReadBufferInitMS'] / 1000 from system.query_log where type='QueryFinish' and query_id = '$query_id'")
wait_prefetch_task=$(clickhouse-client -q "select ProfileEvents['WaitPrefetchTaskMicroseconds'] / 1000 / 1000 from system.query_log where type='QueryFinish' and query_id = '$query_id'")
print "max parallel read tasks" $max_parallel_read_tasks
print "max parallel prefetch tasks" $max_parallel_prefetch_tasks
print "total prefetch tasks" $total_prefetch_tasks
print "init tasks time" $init
print "wait prefetch task" $wait_prefetch_task
print "remote reader profile events" $remote_profiles
print "threadpool profile events" $threadpool_profiles
print "s3 profile events" $s3_profiles
fi
}
execute
done < $QUERIES_FILE