Merge pull request #43260 from ClickHouse/read-from-mt-in-io-pool

Read from MergeTree in I/O pool
Nikolai Kochetov 2022-11-29 12:09:03 +01:00 committed by GitHub
commit 51439e2c19
26 changed files with 645 additions and 120 deletions

src/Common/EventFD.cpp (new file, 63 lines)

@ -0,0 +1,63 @@
#if defined(OS_LINUX)
#include <Common/EventFD.h>
#include <Common/Exception.h>
#include <sys/eventfd.h>
#include <unistd.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_PIPE;
extern const int CANNOT_READ_FROM_SOCKET;
extern const int CANNOT_WRITE_TO_SOCKET;
}
EventFD::EventFD()
{
fd = eventfd(0 /* initval */, 0 /* flags */);
if (fd == -1)
throwFromErrno("Cannot create eventfd", ErrorCodes::CANNOT_PIPE);
}
uint64_t EventFD::read() const
{
uint64_t buf = 0;
while (-1 == ::read(fd, &buf, sizeof(buf)))
{
if (errno == EAGAIN)
break;
if (errno != EINTR)
throwFromErrno("Cannot read from eventfd", ErrorCodes::CANNOT_READ_FROM_SOCKET);
}
return buf;
}
bool EventFD::write(uint64_t increase) const
{
while (-1 == ::write(fd, &increase, sizeof(increase)))
{
if (errno == EAGAIN)
return false;
if (errno != EINTR)
throwFromErrno("Cannot write to eventfd", ErrorCodes::CANNOT_WRITE_TO_SOCKET);
}
return true;
}
EventFD::~EventFD()
{
if (fd != -1)
close(fd);
}
}
#endif

src/Common/EventFD.h (new file, 38 lines)

@ -0,0 +1,38 @@
#pragma once
#if defined(OS_LINUX)
#include <cstddef>
#include <cstdint>
namespace DB
{
struct EventFD
{
EventFD();
~EventFD();
/// Both read() and write() are blocking.
/// TODO: add non-blocking flag to ctor.
uint64_t read() const;
bool write(uint64_t increase = 1) const;
int fd = -1;
};
}
#else
namespace DB
{
struct EventFD
{
};
}
#endif
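The new wrapper follows the eventfd(2) contract: each write(n) adds n to a 64-bit in-kernel counter and wakes up pollers, and a read returns the accumulated value while resetting the counter to zero. A minimal sketch of that round trip (not part of the commit; assumes Linux, since EventFD is only compiled under OS_LINUX):

#include <Common/EventFD.h>
#include <cassert>

int main()
{
    DB::EventFD event;             // eventfd(0 /* initval */, 0 /* flags */)
    event.write();                 // counter becomes 1
    event.write(2);                // counter becomes 3
    assert(event.read() == 3);     // read() drains the counter back to 0
    return 0;
}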


@ -631,6 +631,8 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Bool, allow_unrestricted_reads_from_keeper, false, "Allow unrestricted (without condition on path) reads from system.zookeeper table, can be handy, but is not safe for zookeeper", 0) \
M(Bool, allow_deprecated_database_ordinary, false, "Allow to create databases with deprecated Ordinary engine", 0) \
M(Bool, allow_deprecated_syntax_for_merge_tree, false, "Allow to create *MergeTree tables with deprecated engine definition syntax", 0) \
M(Bool, allow_asynchronous_read_from_io_pool_for_merge_tree, false, "Use background I/O pool to read from MergeTree tables. This setting may increase performance for I/O bound queries", 0) \
M(UInt64, max_streams_for_merge_tree_reading, 0, "If it is not zero, limits the number of reading streams for MergeTree table.", 0) \
\
M(Bool, force_grouping_standard_compatibility, true, "Make GROUPING function to return 1 when argument is not used as an aggregation key", 0) \
\


@ -31,6 +31,7 @@
#include <Storages/MergeTree/MergeTreeReadPool.h>
#include <Storages/MergeTree/MergeTreeReverseSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeThreadSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeSource.h>
#include <Storages/VirtualColumnUtils.h>
#include <Common/logger_useful.h>
#include <base/sort.h>
@ -64,6 +65,8 @@ static MergeTreeReaderSettings getMergeTreeReaderSettings(
.checksum_on_read = settings.checksum_on_read,
.read_in_order = query_info.input_order_info != nullptr,
.apply_deleted_mask = context->applyDeletedMask(),
.use_asynchronous_read_from_pool = settings.allow_asynchronous_read_from_io_pool_for_merge_tree
&& (settings.max_streams_to_max_threads_ratio > 1 || settings.max_streams_for_merge_tree_reading > 1),
};
}
@ -88,7 +91,7 @@ ReadFromMergeTree::ReadFromMergeTree(
Poco::Logger * log_,
MergeTreeDataSelectAnalysisResultPtr analyzed_result_ptr_,
bool enable_parallel_reading)
: ISourceStep(DataStream{.header = MergeTreeBaseSelectProcessor::transformHeader(
: ISourceStep(DataStream{.header = IMergeTreeSelectAlgorithm::transformHeader(
storage_snapshot_->getSampleBlockForColumns(real_column_names_),
getPrewhereInfoFromQueryInfo(query_info_),
data_.getPartitionValueType(),
@ -124,6 +127,21 @@ ReadFromMergeTree::ReadFromMergeTree(
if (enable_parallel_reading)
read_task_callback = context->getMergeTreeReadTaskCallback();
const auto & settings = context->getSettingsRef();
if (settings.max_streams_for_merge_tree_reading)
{
if (settings.allow_asynchronous_read_from_io_pool_for_merge_tree)
{
/// When async reading is enabled, allow reading with more streams.
/// A resize to output_streams_limit is added later to reduce memory usage.
output_streams_limit = std::min<size_t>(requested_num_streams, settings.max_streams_for_merge_tree_reading);
requested_num_streams = std::max<size_t>(requested_num_streams, settings.max_streams_for_merge_tree_reading);
}
else
/// Just limit requested_num_streams otherwise.
requested_num_streams = std::min<size_t>(requested_num_streams, settings.max_streams_for_merge_tree_reading);
}
/// Add explicit description.
setStepDescription(data.getStorageID().getFullNameNotQuoted());
@ -210,12 +228,14 @@ Pipe ReadFromMergeTree::readFromPool(
};
}
auto source = std::make_shared<MergeTreeThreadSelectProcessor>(
auto algorithm = std::make_unique<MergeTreeThreadSelectAlgorithm>(
i, pool, min_marks_for_concurrent_read, max_block_size,
settings.preferred_block_size_bytes, settings.preferred_max_column_in_block_size_bytes,
data, storage_snapshot, use_uncompressed_cache,
prewhere_info, actions_settings, reader_settings, virt_column_names, std::move(extension));
auto source = std::make_shared<MergeTreeSource>(std::move(algorithm));
/// Set the approximate number of rows for the first source only
/// In case of parallel processing on replicas do not set approximate rows at all.
/// Because the value will be identical on every replica and will be accounted
@ -223,13 +243,17 @@ Pipe ReadFromMergeTree::readFromPool(
if (i == 0 && !client_info.collaborate_with_initiator)
source->addTotalRowsApprox(total_rows);
pipes.emplace_back(std::move(source));
}
return Pipe::unitePipes(std::move(pipes));
auto pipe = Pipe::unitePipes(std::move(pipes));
if (output_streams_limit && output_streams_limit < pipe.numOutputPorts())
pipe.resize(output_streams_limit);
return pipe;
}
template<typename TSource>
template<typename Algorithm>
ProcessorPtr ReadFromMergeTree::createSource(
const RangesInDataPart & part,
const Names & required_columns,
@ -260,13 +284,15 @@ ProcessorPtr ReadFromMergeTree::createSource(
/// because we don't know actual amount of read rows in case when limit is set.
bool set_rows_approx = !extension.has_value() && !reader_settings.read_in_order;
auto source = std::make_shared<TSource>(
auto algorithm = std::make_unique<Algorithm>(
data, storage_snapshot, part.data_part, max_block_size, preferred_block_size_bytes,
preferred_max_column_in_block_size_bytes, required_columns, part.ranges, use_uncompressed_cache, prewhere_info,
actions_settings, reader_settings, virt_column_names, part.part_index_in_query, has_limit_below_one_block, std::move(extension));
auto source = std::make_shared<MergeTreeSource>(std::move(algorithm));
if (set_rows_approx)
source -> addTotalRowsApprox(total_rows);
source->addTotalRowsApprox(total_rows);
return source;
}
@ -286,8 +312,8 @@ Pipe ReadFromMergeTree::readInOrder(
for (const auto & part : parts_with_range)
{
auto source = read_type == ReadType::InReverseOrder
? createSource<MergeTreeReverseSelectProcessor>(part, required_columns, use_uncompressed_cache, has_limit_below_one_block)
: createSource<MergeTreeInOrderSelectProcessor>(part, required_columns, use_uncompressed_cache, has_limit_below_one_block);
? createSource<MergeTreeReverseSelectAlgorithm>(part, required_columns, use_uncompressed_cache, has_limit_below_one_block)
: createSource<MergeTreeInOrderSelectAlgorithm>(part, required_columns, use_uncompressed_cache, has_limit_below_one_block);
pipes.emplace_back(std::move(source));
}
@ -1088,6 +1114,11 @@ void ReadFromMergeTree::requestReadingInOrder(size_t prefix_size, int direction,
reader_settings.read_in_order = true;
/// In case of read-in-order, don't create too many reading streams.
/// Almost always we are reading from a single stream at a time because of merge sort.
if (output_streams_limit)
requested_num_streams = output_streams_limit;
/// update sort info for output stream
SortDescription sort_description;
const Names & sorting_key_columns = storage_snapshot->getMetadataForQuery()->getSortingKeyColumns();
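To make the stream-limit logic in the ReadFromMergeTree constructor above concrete, here is how the two branches behave for assumed values requested_num_streams = 8 and max_streams_for_merge_tree_reading = 32 (an illustration, not code from the commit):

// Asynchronous reading enabled:
//   output_streams_limit  = std::min<size_t>(8, 32) = 8;   -> the pipe is resized back to 8 outputs
//   requested_num_streams = std::max<size_t>(8, 32) = 32;  -> 32 reading streams go to the I/O pool
// Asynchronous reading disabled:
//   requested_num_streams = std::min<size_t>(8, 32) = 8;   -> a plain cap, no resize needed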


@ -184,7 +184,8 @@ private:
ContextPtr context;
const size_t max_block_size;
const size_t requested_num_streams;
size_t requested_num_streams;
size_t output_streams_limit = 0;
const size_t preferred_block_size_bytes;
const size_t preferred_max_column_in_block_size_bytes;
const bool sample_factor_column_queried;


@ -25,8 +25,20 @@ namespace ErrorCodes
extern const int QUERY_WAS_CANCELLED;
}
static void injectNonConstVirtualColumns(
size_t rows,
Block & block,
const Names & virtual_columns);
MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor(
static void injectPartConstVirtualColumns(
size_t rows,
Block & block,
MergeTreeReadTask * task,
const DataTypePtr & partition_value_type,
const Names & virtual_columns);
IMergeTreeSelectAlgorithm::IMergeTreeSelectAlgorithm(
Block header,
const MergeTreeData & storage_,
const StorageSnapshotPtr & storage_snapshot_,
@ -39,8 +51,7 @@ MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor(
bool use_uncompressed_cache_,
const Names & virt_column_names_,
std::optional<ParallelReadingExtension> extension_)
: ISource(transformHeader(std::move(header), prewhere_info_, storage_.getPartitionValueType(), virt_column_names_))
, storage(storage_)
: storage(storage_)
, storage_snapshot(storage_snapshot_)
, prewhere_info(prewhere_info_)
, prewhere_actions(getPrewhereActions(prewhere_info, actions_settings))
@ -53,30 +64,20 @@ MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor(
, partition_value_type(storage.getPartitionValueType())
, extension(extension_)
{
header_without_virtual_columns = getPort().getHeader();
header_without_const_virtual_columns = applyPrewhereActions(std::move(header), prewhere_info);
size_t non_const_columns_offset = header_without_const_virtual_columns.columns();
injectNonConstVirtualColumns(0, header_without_const_virtual_columns, virt_column_names);
/// Reverse order is to minimize reallocations when removing columns from the block
for (auto it = virt_column_names.rbegin(); it != virt_column_names.rend(); ++it)
{
if (*it == "_part_offset")
{
non_const_virtual_column_names.emplace_back(*it);
}
else if (*it == LightweightDeleteDescription::FILTER_COLUMN.name)
{
non_const_virtual_column_names.emplace_back(*it);
}
else
{
/// Remove virtual columns that are going to be filled with const values
if (header_without_virtual_columns.has(*it))
header_without_virtual_columns.erase(*it);
}
}
for (size_t col_num = non_const_columns_offset; col_num < header_without_const_virtual_columns.columns(); ++col_num)
non_const_virtual_column_names.emplace_back(header_without_const_virtual_columns.getByPosition(col_num).name);
result_header = header_without_const_virtual_columns;
injectPartConstVirtualColumns(0, result_header, nullptr, partition_value_type, virt_column_names);
}
std::unique_ptr<PrewhereExprInfo> MergeTreeBaseSelectProcessor::getPrewhereActions(PrewhereInfoPtr prewhere_info, const ExpressionActionsSettings & actions_settings)
std::unique_ptr<PrewhereExprInfo> IMergeTreeSelectAlgorithm::getPrewhereActions(PrewhereInfoPtr prewhere_info, const ExpressionActionsSettings & actions_settings)
{
std::unique_ptr<PrewhereExprInfo> prewhere_actions;
if (prewhere_info)
@ -111,7 +112,7 @@ std::unique_ptr<PrewhereExprInfo> MergeTreeBaseSelectProcessor::getPrewhereActio
}
bool MergeTreeBaseSelectProcessor::getNewTask()
bool IMergeTreeSelectAlgorithm::getNewTask()
{
/// No parallel reading feature
if (!extension.has_value())
@ -127,7 +128,7 @@ bool MergeTreeBaseSelectProcessor::getNewTask()
}
bool MergeTreeBaseSelectProcessor::getNewTaskParallelReading()
bool IMergeTreeSelectAlgorithm::getNewTaskParallelReading()
{
if (getTaskFromBuffer())
return true;
@ -152,7 +153,7 @@ bool MergeTreeBaseSelectProcessor::getNewTaskParallelReading()
}
bool MergeTreeBaseSelectProcessor::getTaskFromBuffer()
bool IMergeTreeSelectAlgorithm::getTaskFromBuffer()
{
while (!buffered_ranges.empty())
{
@ -174,7 +175,7 @@ bool MergeTreeBaseSelectProcessor::getTaskFromBuffer()
}
bool MergeTreeBaseSelectProcessor::getDelayedTasks()
bool IMergeTreeSelectAlgorithm::getDelayedTasks()
{
while (!delayed_tasks.empty())
{
@ -197,20 +198,23 @@ bool MergeTreeBaseSelectProcessor::getDelayedTasks()
}
Chunk MergeTreeBaseSelectProcessor::generate()
ChunkAndProgress IMergeTreeSelectAlgorithm::read()
{
while (!isCancelled())
size_t num_read_rows = 0;
size_t num_read_bytes = 0;
while (!is_cancelled)
{
try
{
if ((!task || task->isFinished()) && !getNewTask())
return {};
break;
}
catch (const Exception & e)
{
/// See MergeTreeBaseSelectProcessor::getTaskFromBuffer()
if (e.code() == ErrorCodes::QUERY_WAS_CANCELLED)
return {};
break;
throw;
}
@ -220,24 +224,35 @@ Chunk MergeTreeBaseSelectProcessor::generate()
{
injectVirtualColumns(res.block, res.row_count, task.get(), partition_value_type, virt_column_names);
/// Reorder the columns according to output header
const auto & output_header = output.getHeader();
/// Reorder the columns according to result_header
Columns ordered_columns;
ordered_columns.reserve(output_header.columns());
for (size_t i = 0; i < output_header.columns(); ++i)
ordered_columns.reserve(result_header.columns());
for (size_t i = 0; i < result_header.columns(); ++i)
{
auto name = output_header.getByPosition(i).name;
auto name = result_header.getByPosition(i).name;
ordered_columns.push_back(res.block.getByName(name).column);
}
return Chunk(ordered_columns, res.row_count);
/// Account a progress from previous empty chunks.
res.num_read_rows += num_read_rows;
res.num_read_bytes += num_read_bytes;
return ChunkAndProgress{
.chunk = Chunk(ordered_columns, res.row_count),
.num_read_rows = res.num_read_rows,
.num_read_bytes = res.num_read_bytes};
}
else
{
num_read_rows += res.num_read_rows;
num_read_bytes += res.num_read_bytes;
}
}
return {};
return {Chunk(), num_read_rows, num_read_bytes};
}
void MergeTreeBaseSelectProcessor::initializeMergeTreeReadersForPart(
void IMergeTreeSelectAlgorithm::initializeMergeTreeReadersForPart(
MergeTreeData::DataPartPtr & data_part,
const MergeTreeReadTaskColumns & task_columns, const StorageMetadataPtr & metadata_snapshot,
const MarkRanges & mark_ranges, const IMergeTreeReader::ValueSizeMap & value_size_map,
@ -268,7 +283,7 @@ void MergeTreeBaseSelectProcessor::initializeMergeTreeReadersForPart(
}
}
void MergeTreeBaseSelectProcessor::initializeRangeReaders(MergeTreeReadTask & current_task)
void IMergeTreeSelectAlgorithm::initializeRangeReaders(MergeTreeReadTask & current_task)
{
return initializeRangeReadersImpl(
current_task.range_reader, current_task.pre_range_readers, prewhere_info, prewhere_actions.get(),
@ -276,7 +291,7 @@ void MergeTreeBaseSelectProcessor::initializeRangeReaders(MergeTreeReadTask & cu
pre_reader_for_step, lightweight_delete_filter_step, non_const_virtual_column_names);
}
void MergeTreeBaseSelectProcessor::initializeRangeReadersImpl(
void IMergeTreeSelectAlgorithm::initializeRangeReadersImpl(
MergeTreeRangeReader & range_reader, std::deque<MergeTreeRangeReader> & pre_range_readers,
PrewhereInfoPtr prewhere_info, const PrewhereExprInfo * prewhere_actions,
IMergeTreeReader * reader, bool has_lightweight_delete, const MergeTreeReaderSettings & reader_settings,
@ -368,7 +383,7 @@ static UInt64 estimateNumRows(const MergeTreeReadTask & current_task, UInt64 cur
}
MergeTreeBaseSelectProcessor::BlockAndRowCount MergeTreeBaseSelectProcessor::readFromPartImpl()
IMergeTreeSelectAlgorithm::BlockAndProgress IMergeTreeSelectAlgorithm::readFromPartImpl()
{
if (task->size_predictor)
task->size_predictor->startBlock();
@ -398,7 +413,8 @@ MergeTreeBaseSelectProcessor::BlockAndRowCount MergeTreeBaseSelectProcessor::rea
UInt64 num_filtered_rows = read_result.numReadRows() - read_result.num_rows;
progress(read_result.numReadRows(), read_result.numBytesRead());
size_t num_read_rows = read_result.numReadRows();
size_t num_read_bytes = read_result.numBytesRead();
if (task->size_predictor)
{
@ -408,16 +424,21 @@ MergeTreeBaseSelectProcessor::BlockAndRowCount MergeTreeBaseSelectProcessor::rea
task->size_predictor->update(sample_block, read_result.columns, read_result.num_rows);
}
if (read_result.num_rows == 0)
return {};
Block block;
if (read_result.num_rows != 0)
block = sample_block.cloneWithColumns(read_result.columns);
BlockAndRowCount res = { sample_block.cloneWithColumns(read_result.columns), read_result.num_rows };
BlockAndProgress res = {
.block = std::move(block),
.row_count = read_result.num_rows,
.num_read_rows = num_read_rows,
.num_read_bytes = num_read_bytes };
return res;
}
MergeTreeBaseSelectProcessor::BlockAndRowCount MergeTreeBaseSelectProcessor::readFromPart()
IMergeTreeSelectAlgorithm::BlockAndProgress IMergeTreeSelectAlgorithm::readFromPart()
{
if (!task->range_reader.isInitialized())
initializeRangeReaders(*task);
@ -474,9 +495,10 @@ namespace
/// Adds virtual columns that are not const for all rows
static void injectNonConstVirtualColumns(
size_t rows,
VirtualColumnsInserter & inserter,
Block & block,
const Names & virtual_columns)
{
VirtualColumnsInserter inserter(block);
for (const auto & virtual_column_name : virtual_columns)
{
if (virtual_column_name == "_part_offset")
@ -511,11 +533,12 @@ static void injectNonConstVirtualColumns(
/// Adds virtual columns that are const for the whole part
static void injectPartConstVirtualColumns(
size_t rows,
VirtualColumnsInserter & inserter,
Block & block,
MergeTreeReadTask * task,
const DataTypePtr & partition_value_type,
const Names & virtual_columns)
{
VirtualColumnsInserter inserter(block);
/// add virtual columns
/// Except _sample_factor, which is added from the outside.
if (!virtual_columns.empty())
@ -584,19 +607,16 @@ static void injectPartConstVirtualColumns(
}
}
void MergeTreeBaseSelectProcessor::injectVirtualColumns(
void IMergeTreeSelectAlgorithm::injectVirtualColumns(
Block & block, size_t row_count, MergeTreeReadTask * task, const DataTypePtr & partition_value_type, const Names & virtual_columns)
{
VirtualColumnsInserter inserter{block};
/// First add non-const columns that are filled by the range reader and then const columns that we will fill ourselves.
/// Note that the order is important: virtual columns filled by the range reader must go first
injectNonConstVirtualColumns(row_count, inserter, virtual_columns);
injectPartConstVirtualColumns(row_count, inserter, task, partition_value_type, virtual_columns);
injectNonConstVirtualColumns(row_count, block, virtual_columns);
injectPartConstVirtualColumns(row_count, block, task, partition_value_type, virtual_columns);
}
Block MergeTreeBaseSelectProcessor::transformHeader(
Block block, const PrewhereInfoPtr & prewhere_info, const DataTypePtr & partition_value_type, const Names & virtual_columns)
Block IMergeTreeSelectAlgorithm::applyPrewhereActions(Block block, const PrewhereInfoPtr & prewhere_info)
{
if (prewhere_info)
{
@ -638,11 +658,18 @@ Block MergeTreeBaseSelectProcessor::transformHeader(
}
}
injectVirtualColumns(block, 0, nullptr, partition_value_type, virtual_columns);
return block;
}
std::unique_ptr<MergeTreeBlockSizePredictor> MergeTreeBaseSelectProcessor::getSizePredictor(
Block IMergeTreeSelectAlgorithm::transformHeader(
Block block, const PrewhereInfoPtr & prewhere_info, const DataTypePtr & partition_value_type, const Names & virtual_columns)
{
auto transformed = applyPrewhereActions(std::move(block), prewhere_info);
injectVirtualColumns(transformed, 0, nullptr, partition_value_type, virtual_columns);
return transformed;
}
std::unique_ptr<MergeTreeBlockSizePredictor> IMergeTreeSelectAlgorithm::getSizePredictor(
const MergeTreeData::DataPartPtr & data_part,
const MergeTreeReadTaskColumns & task_columns,
const Block & sample_block)
@ -660,7 +687,7 @@ std::unique_ptr<MergeTreeBlockSizePredictor> MergeTreeBaseSelectProcessor::getSi
}
MergeTreeBaseSelectProcessor::Status MergeTreeBaseSelectProcessor::performRequestToCoordinator(MarkRanges requested_ranges, bool delayed)
IMergeTreeSelectAlgorithm::Status IMergeTreeSelectAlgorithm::performRequestToCoordinator(MarkRanges requested_ranges, bool delayed)
{
String partition_id = task->data_part->info.partition_id;
String part_name;
@ -732,7 +759,7 @@ MergeTreeBaseSelectProcessor::Status MergeTreeBaseSelectProcessor::performReques
}
size_t MergeTreeBaseSelectProcessor::estimateMaxBatchSizeForHugeRanges()
size_t IMergeTreeSelectAlgorithm::estimateMaxBatchSizeForHugeRanges()
{
/// This is an empirical number and it is so,
/// because we have an adaptive granularity by default.
@ -768,7 +795,7 @@ size_t MergeTreeBaseSelectProcessor::estimateMaxBatchSizeForHugeRanges()
return max_size_for_one_request / sum_average_marks_size;
}
void MergeTreeBaseSelectProcessor::splitCurrentTaskRangesAndFillBuffer()
void IMergeTreeSelectAlgorithm::splitCurrentTaskRangesAndFillBuffer()
{
const size_t max_batch_size = estimateMaxBatchSizeForHugeRanges();
@ -824,6 +851,6 @@ void MergeTreeBaseSelectProcessor::splitCurrentTaskRangesAndFillBuffer()
buffered_ranges.pop_back();
}
MergeTreeBaseSelectProcessor::~MergeTreeBaseSelectProcessor() = default;
IMergeTreeSelectAlgorithm::~IMergeTreeSelectAlgorithm() = default;
}
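Note the changed contract of read() above: progress must be reported even for chunks that carry no rows, because rows filtered out by PREWHERE still consume I/O. A hypothetical driver loop consuming this contract (algorithm is an IMergeTreeSelectAlgorithm; progress() and process() are stand-ins for the caller's accounting and consumption):

DB::ChunkAndProgress res;
do
{
    res = algorithm.read();
    /// Account I/O even when the chunk is empty (e.g. everything was filtered by PREWHERE).
    progress(res.num_read_rows, res.num_read_bytes);
    if (res.chunk.hasRows())
        process(std::move(res.chunk));
} while (res.chunk.hasRows() || res.num_read_rows || res.num_read_bytes);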


@ -1,12 +1,10 @@
#pragma once
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <Storages/MergeTree/RequestResponse.h>
#include <Processors/ISource.h>
#include <Processors/Chunk.h>
namespace DB
@ -17,6 +15,12 @@ class UncompressedCache;
class MarkCache;
struct PrewhereExprInfo;
struct ChunkAndProgress
{
Chunk chunk;
size_t num_read_rows = 0;
size_t num_read_bytes = 0;
};
struct ParallelReadingExtension
{
@ -29,11 +33,11 @@ struct ParallelReadingExtension
Names colums_to_read;
};
/// Base class for MergeTreeThreadSelectProcessor and MergeTreeSelectProcessor
class MergeTreeBaseSelectProcessor : public ISource
/// Base class for MergeTreeThreadSelectAlgorithm and MergeTreeSelectAlgorithm
class IMergeTreeSelectAlgorithm
{
public:
MergeTreeBaseSelectProcessor(
IMergeTreeSelectAlgorithm(
Block header,
const MergeTreeData & storage_,
const StorageSnapshotPtr & storage_snapshot_,
@ -47,7 +51,7 @@ public:
const Names & virt_column_names_ = {},
std::optional<ParallelReadingExtension> extension_ = {});
~MergeTreeBaseSelectProcessor() override;
virtual ~IMergeTreeSelectAlgorithm();
static Block transformHeader(
Block block, const PrewhereInfoPtr & prewhere_info, const DataTypePtr & partition_value_type, const Names & virtual_columns);
@ -57,16 +61,26 @@ public:
const MergeTreeReadTaskColumns & task_columns,
const Block & sample_block);
Block getHeader() const { return result_header; }
ChunkAndProgress read();
void cancel() { is_cancelled = true; }
const MergeTreeReaderSettings & getSettings() const { return reader_settings; }
virtual std::string getName() const = 0;
protected:
/// This struct allows returning a block with no columns but with a non-zero number of rows, similar to Chunk
struct BlockAndRowCount
struct BlockAndProgress
{
Block block;
size_t row_count = 0;
size_t num_read_rows = 0;
size_t num_read_bytes = 0;
};
Chunk generate() final;
/// Creates a new this->task and returns a flag whether it was successful or not
virtual bool getNewTaskImpl() = 0;
/// Creates new readers for a task if it is needed. These methods are separate, because
@ -81,9 +95,9 @@ protected:
/// Closes readers and unlock part locks
virtual void finish() = 0;
virtual BlockAndRowCount readFromPart();
virtual BlockAndProgress readFromPart();
BlockAndRowCount readFromPartImpl();
BlockAndProgress readFromPartImpl();
/// Used for filling header with no rows as well as block with data
static void
@ -137,7 +151,9 @@ protected:
DataTypePtr partition_value_type;
/// This header is used for chunks from readFromPart().
Block header_without_virtual_columns;
Block header_without_const_virtual_columns;
/// The result of getHeader(). A chunk with this header is returned from read().
Block result_header;
std::shared_ptr<UncompressedCache> owned_uncompressed_cache;
std::shared_ptr<MarkCache> owned_mark_cache;
@ -156,6 +172,8 @@ protected:
private:
Poco::Logger * log = &Poco::Logger::get("MergeTreeBaseSelectProcessor");
std::atomic<bool> is_cancelled{false};
enum class Status
{
Accepted,
@ -194,6 +212,9 @@ private:
Status performRequestToCoordinator(MarkRanges requested_ranges, bool delayed);
void splitCurrentTaskRangesAndFillBuffer();
static Block applyPrewhereActions(Block block, const PrewhereInfoPtr & prewhere_info);
};
using MergeTreeSelectAlgorithmPtr = std::unique_ptr<IMergeTreeSelectAlgorithm>;
}


@ -27,6 +27,8 @@ struct MergeTreeReaderSettings
bool read_in_order = false;
/// Deleted mask is applied to all reads except internal select from mutate some part columns.
bool apply_deleted_mask = true;
/// Put reading task in a common I/O pool, return Async state on prepare()
bool use_asynchronous_read_from_pool = false;
};
struct MergeTreeWriterSettings


@ -8,7 +8,7 @@ namespace ErrorCodes
extern const int MEMORY_LIMIT_EXCEEDED;
}
bool MergeTreeInOrderSelectProcessor::getNewTaskImpl()
bool MergeTreeInOrderSelectAlgorithm::getNewTaskImpl()
try
{
if (all_mark_ranges.empty())


@ -8,12 +8,12 @@ namespace DB
/// Used to read data from single part with select query in order of primary key.
/// Cares about PREWHERE, virtual columns, indexes etc.
/// To read data from multiple parts, Storage (MergeTree) creates multiple such objects.
class MergeTreeInOrderSelectProcessor final : public MergeTreeSelectProcessor
class MergeTreeInOrderSelectAlgorithm final : public MergeTreeSelectAlgorithm
{
public:
template <typename... Args>
explicit MergeTreeInOrderSelectProcessor(Args &&... args)
: MergeTreeSelectProcessor{std::forward<Args>(args)...}
explicit MergeTreeInOrderSelectAlgorithm(Args &&... args)
: MergeTreeSelectAlgorithm{std::forward<Args>(args)...}
{
LOG_TRACE(log, "Reading {} ranges in order from part {}, approx. {} rows starting from {}",
all_mark_ranges.size(), data_part->name, total_rows,


@ -217,7 +217,7 @@ std::vector<size_t> MergeTreeReadPool::fillPerPartInfo(const RangesInDataParts &
column_names, virtual_column_names, prewhere_info, /*with_subcolumns=*/ true);
auto size_predictor = !predict_block_size_bytes ? nullptr
: MergeTreeBaseSelectProcessor::getSizePredictor(part.data_part, task_columns, sample_block);
: IMergeTreeSelectAlgorithm::getSizePredictor(part.data_part, task_columns, sample_block);
auto & per_part = per_part_params.emplace_back();


@ -8,7 +8,7 @@ namespace ErrorCodes
extern const int MEMORY_LIMIT_EXCEEDED;
}
bool MergeTreeReverseSelectProcessor::getNewTaskImpl()
bool MergeTreeReverseSelectAlgorithm::getNewTaskImpl()
try
{
if (chunks.empty() && all_mark_ranges.empty())
@ -44,9 +44,9 @@ catch (...)
throw;
}
MergeTreeBaseSelectProcessor::BlockAndRowCount MergeTreeReverseSelectProcessor::readFromPart()
MergeTreeReverseSelectAlgorithm::BlockAndProgress MergeTreeReverseSelectAlgorithm::readFromPart()
{
BlockAndRowCount res;
BlockAndProgress res;
if (!chunks.empty())
{


@ -9,12 +9,12 @@ namespace DB
/// in reverse order of primary key.
/// Cares about PREWHERE, virtual columns, indexes etc.
/// To read data from multiple parts, Storage (MergeTree) creates multiple such objects.
class MergeTreeReverseSelectProcessor final : public MergeTreeSelectProcessor
class MergeTreeReverseSelectAlgorithm final : public MergeTreeSelectAlgorithm
{
public:
template <typename... Args>
explicit MergeTreeReverseSelectProcessor(Args &&... args)
: MergeTreeSelectProcessor{std::forward<Args>(args)...}
explicit MergeTreeReverseSelectAlgorithm(Args &&... args)
: MergeTreeSelectAlgorithm{std::forward<Args>(args)...}
{
LOG_TRACE(log, "Reading {} ranges in reverse order from part {}, approx. {} rows starting from {}",
all_mark_ranges.size(), data_part->name, total_rows,
@ -27,9 +27,9 @@ private:
bool getNewTaskImpl() override;
void finalizeNewTask() override {}
BlockAndRowCount readFromPart() override;
BlockAndProgress readFromPart() override;
std::vector<BlockAndRowCount> chunks;
std::vector<BlockAndProgress> chunks;
Poco::Logger * log = &Poco::Logger::get("MergeTreeReverseSelectProcessor");
};


@ -8,7 +8,7 @@
namespace DB
{
MergeTreeSelectProcessor::MergeTreeSelectProcessor(
MergeTreeSelectAlgorithm::MergeTreeSelectAlgorithm(
const MergeTreeData & storage_,
const StorageSnapshotPtr & storage_snapshot_,
const MergeTreeData::DataPartPtr & owned_data_part_,
@ -25,7 +25,7 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor(
size_t part_index_in_query_,
bool has_limit_below_one_block_,
std::optional<ParallelReadingExtension> extension_)
: MergeTreeBaseSelectProcessor{
: IMergeTreeSelectAlgorithm{
storage_snapshot_->getSampleBlockForColumns(required_columns_),
storage_, storage_snapshot_, prewhere_info_, std::move(actions_settings), max_block_size_rows_,
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_,
@ -38,10 +38,10 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor(
has_limit_below_one_block(has_limit_below_one_block_),
total_rows(data_part->index_granularity.getRowsCountInRanges(all_mark_ranges))
{
ordered_names = header_without_virtual_columns.getNames();
ordered_names = header_without_const_virtual_columns.getNames();
}
void MergeTreeSelectProcessor::initializeReaders()
void MergeTreeSelectAlgorithm::initializeReaders()
{
task_columns = getReadTaskColumns(
LoadedMergeTreeDataPartInfoForReader(data_part), storage_snapshot,
@ -61,7 +61,7 @@ void MergeTreeSelectProcessor::initializeReaders()
}
void MergeTreeSelectProcessor::finish()
void MergeTreeSelectAlgorithm::finish()
{
/** Close the files (before destroying the object).
* When many sources are created, but simultaneously reading only a few of them,
@ -72,6 +72,6 @@ void MergeTreeSelectProcessor::finish()
data_part.reset();
}
MergeTreeSelectProcessor::~MergeTreeSelectProcessor() = default;
MergeTreeSelectAlgorithm::~MergeTreeSelectAlgorithm() = default;
}


@ -13,10 +13,10 @@ namespace DB
/// Used to read data from single part with select query
/// Cares about PREWHERE, virtual columns, indexes etc.
/// To read data from multiple parts, Storage (MergeTree) creates multiple such objects.
class MergeTreeSelectProcessor : public MergeTreeBaseSelectProcessor
class MergeTreeSelectAlgorithm : public IMergeTreeSelectAlgorithm
{
public:
MergeTreeSelectProcessor(
MergeTreeSelectAlgorithm(
const MergeTreeData & storage,
const StorageSnapshotPtr & storage_snapshot_,
const MergeTreeData::DataPartPtr & owned_data_part,
@ -34,13 +34,13 @@ public:
bool has_limit_below_one_block_ = false,
std::optional<ParallelReadingExtension> extension_ = {});
~MergeTreeSelectProcessor() override;
~MergeTreeSelectAlgorithm() override;
protected:
/// Defer initialization from constructor, because it may be heavy
/// and it's better to do it lazily in `getNewTaskImpl`, which is executing in parallel.
void initializeReaders();
void finish() override final;
void finish() final;
/// Used by Task
Names required_columns;


@ -0,0 +1,224 @@
#include <Storages/MergeTree/MergeTreeSource.h>
#include <Storages/MergeTree/MergeTreeBaseSelectProcessor.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <IO/IOThreadPool.h>
#include <Common/EventFD.h>
namespace DB
{
MergeTreeSource::MergeTreeSource(MergeTreeSelectAlgorithmPtr algorithm_)
: ISource(algorithm_->getHeader())
, algorithm(std::move(algorithm_))
{
#if defined(OS_LINUX)
if (algorithm->getSettings().use_asynchronous_read_from_pool)
async_reading_state = std::make_unique<AsyncReadingState>();
#endif
}
MergeTreeSource::~MergeTreeSource() = default;
std::string MergeTreeSource::getName() const
{
return algorithm->getName();
}
void MergeTreeSource::onCancel()
{
algorithm->cancel();
}
#if defined(OS_LINUX)
struct MergeTreeSource::AsyncReadingState
{
/// NotStarted -> InProgress -> IsFinished -> NotStarted ...
enum class Stage
{
NotStarted,
InProgress,
IsFinished,
};
struct Control
{
/// setResult and setException are the only methods
/// which can be called from background thread.
/// Invariant:
/// * background thread changes status InProgress -> IsFinished
/// * (status == InProgress) => (MergeTreeBaseSelectProcessor is alive)
void setResult(ChunkAndProgress chunk_)
{
chassert(stage == Stage::InProgress);
chunk = std::move(chunk_);
finish();
}
void setException(std::exception_ptr exception_)
{
chassert(stage == Stage::InProgress);
exception = exception_;
finish();
}
private:
/// Executor requires file descriptor (which can be polled) to be returned for async execution.
/// We are using EventFD here.
/// Thread from background pool writes to fd when task is finished.
/// Working thread should read from fd when task is finished or canceled to wait for bg thread.
EventFD event;
std::atomic<Stage> stage = Stage::NotStarted;
ChunkAndProgress chunk;
std::exception_ptr exception;
void finish()
{
stage = Stage::IsFinished;
event.write();
}
ChunkAndProgress getResult()
{
chassert(stage == Stage::IsFinished);
event.read();
stage = Stage::NotStarted;
if (exception)
std::rethrow_exception(exception);
return std::move(chunk);
}
friend struct AsyncReadingState;
};
std::shared_ptr<Control> start()
{
chassert(control->stage == Stage::NotStarted);
control->stage = Stage::InProgress;
return control;
}
void schedule(ThreadPool::Job job)
{
callback_runner(std::move(job), 0);
}
ChunkAndProgress getResult()
{
return control->getResult();
}
Stage getStage() const { return control->stage; }
int getFD() const { return control->event.fd; }
AsyncReadingState()
{
control = std::make_shared<Control>();
callback_runner = threadPoolCallbackRunner<void>(IOThreadPool::get(), "MergeTreeRead");
}
~AsyncReadingState()
{
/// Here we wait for async task if needed.
/// ~AsyncReadingState and Control::finish can be run concurrently.
/// It's important to store std::shared_ptr<Control> into bg pool task.
/// Otherwise following is possible:
///
/// (executing thread) (bg pool thread)
/// Control::finish()
/// stage = Stage::IsFinished;
/// ~MergeTreeBaseSelectProcessor()
/// ~AsyncReadingState()
/// control->stage != Stage::InProgress
/// ~EventFD()
/// event.write()
if (control->stage == Stage::InProgress)
control->event.read();
}
private:
ThreadPoolCallbackRunner<void> callback_runner;
std::shared_ptr<Control> control;
};
#endif
ISource::Status MergeTreeSource::prepare()
{
#if defined(OS_LINUX)
if (!async_reading_state)
return ISource::prepare();
/// Check if query was cancelled before returning Async status. Otherwise it may lead to infinite loop.
if (isCancelled())
{
getPort().finish();
return ISource::Status::Finished;
}
if (async_reading_state && async_reading_state->getStage() == AsyncReadingState::Stage::InProgress)
return ISource::Status::Async;
#endif
return ISource::prepare();
}
std::optional<Chunk> MergeTreeSource::reportProgress(ChunkAndProgress chunk)
{
if (chunk.num_read_rows || chunk.num_read_bytes)
progress(chunk.num_read_rows, chunk.num_read_bytes);
if (chunk.chunk.hasRows())
return std::move(chunk.chunk);
return {};
}
std::optional<Chunk> MergeTreeSource::tryGenerate()
{
#if defined(OS_LINUX)
if (async_reading_state)
{
if (async_reading_state->getStage() == AsyncReadingState::Stage::IsFinished)
return reportProgress(async_reading_state->getResult());
chassert(async_reading_state->getStage() == AsyncReadingState::Stage::NotStarted);
/// It is important to store control into job.
/// Otherwise, race between job and ~MergeTreeBaseSelectProcessor is possible.
auto job = [this, control = async_reading_state->start()]() mutable
{
auto holder = std::move(control);
try
{
holder->setResult(algorithm->read());
}
catch (...)
{
holder->setException(std::current_exception());
}
};
async_reading_state->schedule(std::move(job));
return Chunk();
}
#endif
return reportProgress(algorithm->read());
}
#if defined(OS_LINUX)
int MergeTreeSource::schedule()
{
return async_reading_state->getFD();
}
#endif
}
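Taken together, the asynchronous path cycles through the state machine once per chunk (a simplified trace of the code above, not additional source):

/// 1. tryGenerate(): stage == NotStarted -> start() flips it to InProgress, the read job is
///    scheduled on IOThreadPool, and an empty Chunk is returned to keep the source alive.
/// 2. prepare(): stage == InProgress -> returns Status::Async; the executor then polls the
///    eventfd exposed via schedule().
/// 3. Background thread: holder->setResult(algorithm->read()) -> Control::finish() sets
///    stage = IsFinished and event.write() wakes the poller.
/// 4. tryGenerate(): stage == IsFinished -> getResult() does event.read(), resets the stage
///    to NotStarted and returns the chunk (or rethrows a stored exception).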


@ -0,0 +1,42 @@
#pragma once
#include <Processors/ISource.h>
namespace DB
{
class IMergeTreeSelectAlgorithm;
using MergeTreeSelectAlgorithmPtr = std::unique_ptr<IMergeTreeSelectAlgorithm>;
struct ChunkAndProgress;
class MergeTreeSource final : public ISource
{
public:
explicit MergeTreeSource(MergeTreeSelectAlgorithmPtr algorithm_);
~MergeTreeSource() override;
std::string getName() const override;
Status prepare() override;
#if defined(OS_LINUX)
int schedule() override;
#endif
protected:
std::optional<Chunk> tryGenerate() override;
void onCancel() override;
private:
MergeTreeSelectAlgorithmPtr algorithm;
#if defined(OS_LINUX)
struct AsyncReadingState;
std::unique_ptr<AsyncReadingState> async_reading_state;
#endif
std::optional<Chunk> reportProgress(ChunkAndProgress chunk);
};
}


@ -12,7 +12,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
MergeTreeThreadSelectProcessor::MergeTreeThreadSelectProcessor(
MergeTreeThreadSelectAlgorithm::MergeTreeThreadSelectAlgorithm(
size_t thread_,
const MergeTreeReadPoolPtr & pool_,
size_t min_marks_to_read_,
@ -28,7 +28,7 @@ MergeTreeThreadSelectProcessor::MergeTreeThreadSelectProcessor(
const Names & virt_column_names_,
std::optional<ParallelReadingExtension> extension_)
:
MergeTreeBaseSelectProcessor{
IMergeTreeSelectAlgorithm{
pool_->getHeader(), storage_, storage_snapshot_, prewhere_info_, std::move(actions_settings), max_block_size_rows_,
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_,
reader_settings_, use_uncompressed_cache_, virt_column_names_, extension_},
@ -86,18 +86,18 @@ MergeTreeThreadSelectProcessor::MergeTreeThreadSelectProcessor(
}
ordered_names = getPort().getHeader().getNames();
ordered_names = getHeader().getNames();
}
/// Requests read task from MergeTreeReadPool and signals whether it got one
bool MergeTreeThreadSelectProcessor::getNewTaskImpl()
bool MergeTreeThreadSelectAlgorithm::getNewTaskImpl()
{
task = pool->getTask(min_marks_to_read, thread, ordered_names);
return static_cast<bool>(task);
}
void MergeTreeThreadSelectProcessor::finalizeNewTask()
void MergeTreeThreadSelectAlgorithm::finalizeNewTask()
{
const std::string part_name = task->data_part->isProjectionPart() ? task->data_part->getParentPart()->name : task->data_part->name;
@ -129,13 +129,13 @@ void MergeTreeThreadSelectProcessor::finalizeNewTask()
}
void MergeTreeThreadSelectProcessor::finish()
void MergeTreeThreadSelectAlgorithm::finish()
{
reader.reset();
pre_reader_for_step.clear();
}
MergeTreeThreadSelectProcessor::~MergeTreeThreadSelectProcessor() = default;
MergeTreeThreadSelectAlgorithm::~MergeTreeThreadSelectAlgorithm() = default;
}


@ -11,10 +11,10 @@ class MergeTreeReadPool;
/** Used in conjunction with MergeTreeReadPool, asking it for more work to do and performing whatever reads it is asked
* to perform.
*/
class MergeTreeThreadSelectProcessor final : public MergeTreeBaseSelectProcessor
class MergeTreeThreadSelectAlgorithm final : public IMergeTreeSelectAlgorithm
{
public:
MergeTreeThreadSelectProcessor(
MergeTreeThreadSelectAlgorithm(
size_t thread_,
const std::shared_ptr<MergeTreeReadPool> & pool_,
size_t min_marks_to_read_,
@ -32,7 +32,7 @@ public:
String getName() const override { return "MergeTreeThread"; }
~MergeTreeThreadSelectProcessor() override;
~MergeTreeThreadSelectAlgorithm() override;
protected:
/// Requests read task from MergeTreeReadPool and signals whether it got one


@ -11,4 +11,8 @@
)
SETTINGS max_threads = 2, max_distributed_connections = 2
</query>
<!--<query>select sum(length(URL)) from hits_100m_single settings max_threads=8, max_streams_to_max_threads_ratio=2, allow_asynchronous_read_from_io_pool_for_merge_tree=0</query>-->
<query>select sum(length(URL)) from hits_10m_single settings max_threads=2, max_streams_to_max_threads_ratio=16, allow_asynchronous_read_from_io_pool_for_merge_tree=1</query>
<query>select sum(length(URL)) from hits_10m_single settings max_threads=2, max_streams_for_merge_tree_reading=32, allow_asynchronous_read_from_io_pool_for_merge_tree=1</query>
</test>


@ -1,5 +1,6 @@
DROP TABLE IF EXISTS data_01283;
set allow_asynchronous_read_from_io_pool_for_merge_tree = 0;
set remote_filesystem_read_method = 'read';
set local_filesystem_read_method = 'pread';
set load_marks_asynchronously = 0;


@ -3,6 +3,7 @@ drop table if exists table_01323_many_parts;
set remote_filesystem_read_method = 'read';
set local_filesystem_read_method = 'pread';
set load_marks_asynchronously = 0;
set allow_asynchronous_read_from_io_pool_for_merge_tree = 0;
create table table_01323_many_parts (x UInt64) engine = MergeTree order by x partition by x % 100;
set max_partitions_per_insert_block = 100;


@ -1,5 +1,6 @@
DROP TABLE IF EXISTS select_final;
SET allow_asynchronous_read_from_io_pool_for_merge_tree = 0;
SET do_not_merge_across_partitions_select_final = 1;
SET max_threads = 16;


@ -19,16 +19,16 @@ $CLICKHOUSE_CLIENT -q "create table ${name}_n_x engine=MergeTree order by (n, x)
$CLICKHOUSE_CLIENT -q "optimize table ${name}_n final"
$CLICKHOUSE_CLIENT -q "optimize table ${name}_n_x final"
$CLICKHOUSE_CLIENT -q "select n, sum(x) OVER (ORDER BY n, x ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) from ${name}_n SETTINGS optimize_read_in_order=0, optimize_read_in_window_order=0, max_memory_usage=$max_memory_usage, max_threads=1 format Null" 2>&1 | grep -F -q "MEMORY_LIMIT_EXCEEDED" && echo 'OK' || echo 'FAIL'
$CLICKHOUSE_CLIENT --allow_asynchronous_read_from_io_pool_for_merge_tree=0 -q "select n, sum(x) OVER (ORDER BY n, x ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) from ${name}_n SETTINGS optimize_read_in_order=0, optimize_read_in_window_order=0, max_memory_usage=$max_memory_usage, max_threads=1 format Null" 2>&1 | grep -F -q "MEMORY_LIMIT_EXCEEDED" && echo 'OK' || echo 'FAIL'
$CLICKHOUSE_CLIENT -q "select n, sum(x) OVER (ORDER BY n, x ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) from ${name}_n SETTINGS optimize_read_in_order=1, max_memory_usage=$max_memory_usage, max_threads=1 format Null"
$CLICKHOUSE_CLIENT -q "select n, sum(x) OVER (ORDER BY n, x ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) from ${name}_n_x SETTINGS optimize_read_in_order=0, optimize_read_in_window_order=0, max_memory_usage=$max_memory_usage, max_threads=1 format Null" 2>&1 | grep -F -q "MEMORY_LIMIT_EXCEEDED" && echo 'OK' || echo 'FAIL'
$CLICKHOUSE_CLIENT --allow_asynchronous_read_from_io_pool_for_merge_tree=0 -q "select n, sum(x) OVER (ORDER BY n, x ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) from ${name}_n_x SETTINGS optimize_read_in_order=0, optimize_read_in_window_order=0, max_memory_usage=$max_memory_usage, max_threads=1 format Null" 2>&1 | grep -F -q "MEMORY_LIMIT_EXCEEDED" && echo 'OK' || echo 'FAIL'
$CLICKHOUSE_CLIENT -q "select n, sum(x) OVER (ORDER BY n, x ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) from ${name}_n_x SETTINGS optimize_read_in_order=1, max_memory_usage=$max_memory_usage, max_threads=1 format Null"
$CLICKHOUSE_CLIENT -q "select n, sum(x) OVER (PARTITION BY n ORDER BY x ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) from ${name}_n_x SETTINGS optimize_read_in_order=0, optimize_read_in_window_order=0, max_memory_usage=$max_memory_usage, max_threads=1 format Null" 2>&1 | grep -F -q "MEMORY_LIMIT_EXCEEDED" && echo 'OK' || echo 'FAIL'
$CLICKHOUSE_CLIENT --allow_asynchronous_read_from_io_pool_for_merge_tree=0 -q "select n, sum(x) OVER (PARTITION BY n ORDER BY x ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) from ${name}_n_x SETTINGS optimize_read_in_order=0, optimize_read_in_window_order=0, max_memory_usage=$max_memory_usage, max_threads=1 format Null" 2>&1 | grep -F -q "MEMORY_LIMIT_EXCEEDED" && echo 'OK' || echo 'FAIL'
$CLICKHOUSE_CLIENT -q "select n, sum(x) OVER (PARTITION BY n ORDER BY x ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) from ${name}_n_x SETTINGS optimize_read_in_order=1, max_memory_usage=$max_memory_usage, max_threads=1 format Null"
$CLICKHOUSE_CLIENT -q "select n, sum(x) OVER (PARTITION BY n+x%2 ORDER BY n, x ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) from ${name}_n_x SETTINGS optimize_read_in_order=1, max_memory_usage=$max_memory_usage, max_threads=1 format Null" 2>&1 | grep -F -q "MEMORY_LIMIT_EXCEEDED" && echo 'OK' || echo 'FAIL'
$CLICKHOUSE_CLIENT --allow_asynchronous_read_from_io_pool_for_merge_tree=0 -q "select n, sum(x) OVER (PARTITION BY n+x%2 ORDER BY n, x ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) from ${name}_n_x SETTINGS optimize_read_in_order=1, max_memory_usage=$max_memory_usage, max_threads=1 format Null" 2>&1 | grep -F -q "MEMORY_LIMIT_EXCEEDED" && echo 'OK' || echo 'FAIL'
$CLICKHOUSE_CLIENT -q "drop table ${name}"
$CLICKHOUSE_CLIENT -q "drop table ${name}_n"


@ -0,0 +1,41 @@
-- { echo }
-- The number of output streams is limited by max_streams_for_merge_tree_reading
select sum(x) from t settings max_threads=32, max_streams_for_merge_tree_reading=16, allow_asynchronous_read_from_io_pool_for_merge_tree=0;
49999995000000
select * from (explain pipeline select sum(x) from t settings max_threads=32, max_streams_for_merge_tree_reading=16, allow_asynchronous_read_from_io_pool_for_merge_tree=0) where explain like '%Resize%' or explain like '%MergeTreeThread%';
Resize 16 → 32
StrictResize 16 → 16
MergeTreeThread × 16 0 → 1
-- Without asynchronous_read, max_streams_for_merge_tree_reading limits max_streams * max_streams_to_max_threads_ratio
select sum(x) from t settings max_threads=4, max_streams_for_merge_tree_reading=16, allow_asynchronous_read_from_io_pool_for_merge_tree=0, max_streams_to_max_threads_ratio=8;
49999995000000
select * from (explain pipeline select sum(x) from t settings max_threads=4, max_streams_for_merge_tree_reading=16, allow_asynchronous_read_from_io_pool_for_merge_tree=0, max_streams_to_max_threads_ratio=8) where explain like '%Resize%' or explain like '%MergeTreeThread%';
Resize 16 → 4
StrictResize 16 → 16
MergeTreeThread × 16 0 → 1
-- With asynchronous_read, read in max_streams_for_merge_tree_reading async streams and resize to max_threads
select sum(x) from t settings max_threads=4, max_streams_for_merge_tree_reading=16, allow_asynchronous_read_from_io_pool_for_merge_tree=1;
49999995000000
select * from (explain pipeline select sum(x) from t settings max_threads=4, max_streams_for_merge_tree_reading=16, allow_asynchronous_read_from_io_pool_for_merge_tree=1) where explain like '%Resize%' or explain like '%MergeTreeThread%';
Resize 4 → 4
StrictResize 4 → 4
Resize 16 → 4
MergeTreeThread × 16 0 → 1
-- With asynchronous_read, read using max_streams * max_streams_to_max_threads_ratio async streams, resize to max_streams_for_merge_tree_reading output streams, resize to max_threads after aggregation
select sum(x) from t settings max_threads=4, max_streams_for_merge_tree_reading=16, allow_asynchronous_read_from_io_pool_for_merge_tree=1, max_streams_to_max_threads_ratio=8;
49999995000000
select * from (explain pipeline select sum(x) from t settings max_threads=4, max_streams_for_merge_tree_reading=16, allow_asynchronous_read_from_io_pool_for_merge_tree=1, max_streams_to_max_threads_ratio=8) where explain like '%Resize%' or explain like '%MergeTreeThread%';
Resize 16 → 4
StrictResize 16 → 16
Resize 32 → 16
MergeTreeThread × 32 0 → 1
-- For read-in-order, disable everything
select sum(x) from (select x from t order by x) settings max_threads=4, max_streams_for_merge_tree_reading=16, allow_asynchronous_read_from_io_pool_for_merge_tree=1, optimize_read_in_order=1, query_plan_read_in_order=1;
49999995000000
select * from (explain pipeline select sum(x) from (select x from t order by x) settings max_threads=4, max_streams_for_merge_tree_reading=16, allow_asynchronous_read_from_io_pool_for_merge_tree=1, optimize_read_in_order=1, query_plan_read_in_order=1) where explain like '%Resize%';
Resize 1 → 4
select sum(x) from (select x from t order by x) settings max_threads=4, max_streams_for_merge_tree_reading=16, allow_asynchronous_read_from_io_pool_for_merge_tree=1, max_streams_to_max_threads_ratio=8, optimize_read_in_order=1, query_plan_read_in_order=1;
49999995000000
select * from (explain pipeline select sum(x) from (select x from t order by x) settings max_threads=4, max_streams_for_merge_tree_reading=16, allow_asynchronous_read_from_io_pool_for_merge_tree=1, max_streams_to_max_threads_ratio=8, optimize_read_in_order=1, query_plan_read_in_order=1) where explain like '%Resize%';
Resize 1 → 4
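For reference, the stream counts in the combined async + ratio example above follow directly from the settings: max_threads = 4 with max_streams_to_max_threads_ratio = 8 yields 4 × 8 = 32 reading streams, which are resized down to the max_streams_for_merge_tree_reading limit of 16 before aggregation and to max_threads = 4 after it.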


@ -0,0 +1,26 @@
create table t (x UInt64) engine = MergeTree order by x;
insert into t select number from numbers_mt(10000000) settings max_insert_threads=8;
-- { echo }
-- The number of output streams is limited by max_streams_for_merge_tree_reading
select sum(x) from t settings max_threads=32, max_streams_for_merge_tree_reading=16, allow_asynchronous_read_from_io_pool_for_merge_tree=0;
select * from (explain pipeline select sum(x) from t settings max_threads=32, max_streams_for_merge_tree_reading=16, allow_asynchronous_read_from_io_pool_for_merge_tree=0) where explain like '%Resize%' or explain like '%MergeTreeThread%';
-- Without asynchronous_read, max_streams_for_merge_tree_reading limits max_streams * max_streams_to_max_threads_ratio
select sum(x) from t settings max_threads=4, max_streams_for_merge_tree_reading=16, allow_asynchronous_read_from_io_pool_for_merge_tree=0, max_streams_to_max_threads_ratio=8;
select * from (explain pipeline select sum(x) from t settings max_threads=4, max_streams_for_merge_tree_reading=16, allow_asynchronous_read_from_io_pool_for_merge_tree=0, max_streams_to_max_threads_ratio=8) where explain like '%Resize%' or explain like '%MergeTreeThread%';
-- With asynchronous_read, read in max_streams_for_merge_tree_reading async streams and resize to max_threads
select sum(x) from t settings max_threads=4, max_streams_for_merge_tree_reading=16, allow_asynchronous_read_from_io_pool_for_merge_tree=1;
select * from (explain pipeline select sum(x) from t settings max_threads=4, max_streams_for_merge_tree_reading=16, allow_asynchronous_read_from_io_pool_for_merge_tree=1) where explain like '%Resize%' or explain like '%MergeTreeThread%';
-- With asynchronous_read, read using max_streams * max_streams_to_max_threads_ratio async streams, resize to max_streams_for_merge_tree_reading output streams, resize to max_threads after aggregation
select sum(x) from t settings max_threads=4, max_streams_for_merge_tree_reading=16, allow_asynchronous_read_from_io_pool_for_merge_tree=1, max_streams_to_max_threads_ratio=8;
select * from (explain pipeline select sum(x) from t settings max_threads=4, max_streams_for_merge_tree_reading=16, allow_asynchronous_read_from_io_pool_for_merge_tree=1, max_streams_to_max_threads_ratio=8) where explain like '%Resize%' or explain like '%MergeTreeThread%';
-- For read-in-order, disable everything
select sum(x) from (select x from t order by x) settings max_threads=4, max_streams_for_merge_tree_reading=16, allow_asynchronous_read_from_io_pool_for_merge_tree=1, optimize_read_in_order=1, query_plan_read_in_order=1;
select * from (explain pipeline select sum(x) from (select x from t order by x) settings max_threads=4, max_streams_for_merge_tree_reading=16, allow_asynchronous_read_from_io_pool_for_merge_tree=1, optimize_read_in_order=1, query_plan_read_in_order=1) where explain like '%Resize%';
select sum(x) from (select x from t order by x) settings max_threads=4, max_streams_for_merge_tree_reading=16, allow_asynchronous_read_from_io_pool_for_merge_tree=1, max_streams_to_max_threads_ratio=8, optimize_read_in_order=1, query_plan_read_in_order=1;
select * from (explain pipeline select sum(x) from (select x from t order by x) settings max_threads=4, max_streams_for_merge_tree_reading=16, allow_asynchronous_read_from_io_pool_for_merge_tree=1, max_streams_to_max_threads_ratio=8, optimize_read_in_order=1, query_plan_read_in_order=1) where explain like '%Resize%';