Merge pull request #3693 from yandex/simplier_merge_read_logic

Simplify merge stream read logic
This commit is contained in:
alexey-milovidov 2018-12-02 20:59:43 +03:00 committed by GitHub
commit af110b8202
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 288 additions and 78 deletions

View File

@ -1,4 +1,4 @@
#include <Storages/MergeTree/MergeTreeBaseBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeBaseSelectBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeRangeReader.h>
#include <Storages/MergeTree/MergeTreeReader.h>
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
@ -19,7 +19,7 @@ namespace ErrorCodes
}
MergeTreeBaseBlockInputStream::MergeTreeBaseBlockInputStream(
MergeTreeBaseSelectBlockInputStream::MergeTreeBaseSelectBlockInputStream(
const MergeTreeData & storage,
const PrewhereInfoPtr & prewhere_info,
UInt64 max_block_size_rows,
@ -46,7 +46,7 @@ MergeTreeBaseBlockInputStream::MergeTreeBaseBlockInputStream(
}
Block MergeTreeBaseBlockInputStream::readImpl()
Block MergeTreeBaseSelectBlockInputStream::readImpl()
{
Block res;
@ -68,7 +68,7 @@ Block MergeTreeBaseBlockInputStream::readImpl()
}
Block MergeTreeBaseBlockInputStream::readFromPart()
Block MergeTreeBaseSelectBlockInputStream::readFromPart()
{
if (task->size_predictor)
task->size_predictor->startBlock();
@ -181,14 +181,14 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
}
void MergeTreeBaseBlockInputStream::injectVirtualColumns(Block & block) const
void MergeTreeBaseSelectBlockInputStream::injectVirtualColumns(Block & block) const
{
const auto rows = block.rows();
/// add virtual columns
/// Except _sample_factor, which is added from the outside.
if (!virt_column_names.empty())
{
const auto rows = block.rows();
for (const auto & virt_column_name : virt_column_names)
{
if (virt_column_name == "_part")
@ -226,7 +226,7 @@ void MergeTreeBaseBlockInputStream::injectVirtualColumns(Block & block) const
}
void MergeTreeBaseBlockInputStream::executePrewhereActions(Block & block, const PrewhereInfoPtr & prewhere_info)
void MergeTreeBaseSelectBlockInputStream::executePrewhereActions(Block & block, const PrewhereInfoPtr & prewhere_info)
{
if (prewhere_info)
{
@ -243,6 +243,6 @@ void MergeTreeBaseBlockInputStream::executePrewhereActions(Block & block, const
}
MergeTreeBaseBlockInputStream::~MergeTreeBaseBlockInputStream() = default;
MergeTreeBaseSelectBlockInputStream::~MergeTreeBaseSelectBlockInputStream() = default;
}

View File

@ -13,11 +13,11 @@ class UncompressedCache;
class MarkCache;
/// Base class for MergeTreeThreadBlockInputStream and MergeTreeBlockInputStream
class MergeTreeBaseBlockInputStream : public IProfilingBlockInputStream
/// Base class for MergeTreeThreadSelectBlockInputStream and MergeTreeSelectBlockInputStream
class MergeTreeBaseSelectBlockInputStream : public IProfilingBlockInputStream
{
public:
MergeTreeBaseBlockInputStream(
MergeTreeBaseSelectBlockInputStream(
const MergeTreeData & storage,
const PrewhereInfoPtr & prewhere_info,
UInt64 max_block_size_rows,
@ -29,7 +29,7 @@ public:
bool save_marks_in_cache = true,
const Names & virt_column_names = {});
~MergeTreeBaseBlockInputStream() override;
~MergeTreeBaseSelectBlockInputStream() override;
static void executePrewhereActions(Block & block, const PrewhereInfoPtr & prewhere_info);

View File

@ -25,13 +25,13 @@ using MergeTreeBlockSizePredictorPtr = std::unique_ptr<MergeTreeBlockSizePredict
NameSet injectRequiredColumns(const MergeTreeData & storage, const MergeTreeData::DataPartPtr & part, Names & columns);
/// A batch of work for MergeTreeThreadBlockInputStream
/// A batch of work for MergeTreeThreadSelectBlockInputStream
struct MergeTreeReadTask
{
/// data part which should be read while performing this task
MergeTreeData::DataPartPtr data_part;
/** Ranges to read from `data_part`.
* Specified in reverse order for MergeTreeThreadBlockInputStream's convenience of calling .pop_back(). */
* Specified in reverse order for MergeTreeThreadSelectBlockInputStream's convenience of calling .pop_back(). */
MarkRanges mark_ranges;
/// for virtual `part_index` virtual column
size_t part_index_in_query;

View File

@ -1,7 +1,7 @@
#include <Storages/MergeTree/MergeTreeData.h>
#include <Interpreters/SyntaxAnalyzer.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Storages/MergeTree/MergeTreeBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeSequentialBlockInputStream.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/checkDataPart.h>
#include <Storages/StorageMergeTree.h>
@ -1223,10 +1223,8 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
/// Apply the expression and write the result to temporary files.
if (expression)
{
MarkRanges ranges{MarkRange(0, part->marks_count)};
BlockInputStreamPtr part_in = std::make_shared<MergeTreeBlockInputStream>(
*this, part, DEFAULT_MERGE_BLOCK_SIZE, 0, 0, expression->getRequiredColumns(), ranges,
false, nullptr, false, 0, DBMS_DEFAULT_BUFFER_SIZE, false);
BlockInputStreamPtr part_in = std::make_shared<MergeTreeSequentialBlockInputStream>(
*this, part, expression->getRequiredColumns(), false, /* take_column_types_from_storage = */ false);
auto compression_settings = this->context.chooseCompressionSettings(
part->bytes_on_disk,

View File

@ -1,5 +1,5 @@
#include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
#include <Storages/MergeTree/MergeTreeBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeSequentialBlockInputStream.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/DiskSpaceMonitor.h>
#include <Storages/MergeTree/SimpleMergeSelector.h>
@ -606,11 +606,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
BlockInputStreams src_streams;
UInt64 watch_prev_elapsed = 0;
/// Note: this is dirty hack. MergeTreeBlockInputStream expects minimal amount of bytes after which it will
/// use DIRECT_IO for every piece of data it reads.
/// When we send `min_bytes_when_use_direct_io = 1 (byte)`, it will use O_DIRECT in any case
/// because stream can't read less than a single byte
size_t min_bytes_when_use_direct_io = 0;
/// We count total amount of bytes in parts
/// and use direct_io + aio if there is more than min_merge_bytes_to_use_direct_io
bool read_with_direct_io = false;
if (data.settings.min_merge_bytes_to_use_direct_io != 0)
{
size_t total_size = 0;
@ -620,7 +618,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
if (total_size >= data.settings.min_merge_bytes_to_use_direct_io)
{
LOG_DEBUG(log, "Will merge parts reading files in O_DIRECT");
min_bytes_when_use_direct_io = 1;
read_with_direct_io = true;
break;
}
}
@ -629,9 +628,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
for (const auto & part : parts)
{
auto input = std::make_unique<MergeTreeBlockInputStream>(
data, part, DEFAULT_MERGE_BLOCK_SIZE, 0, 0, merging_column_names, MarkRanges(1, MarkRange(0, part->marks_count)),
false, nullptr, true, min_bytes_when_use_direct_io, DBMS_DEFAULT_BUFFER_SIZE, false);
auto input = std::make_unique<MergeTreeSequentialBlockInputStream>(
data, part, merging_column_names, read_with_direct_io, true);
input->setProgressCallback(MergeProgressCallback(
merge_entry, sum_input_rows_upper_bound, column_sizes, watch_prev_elapsed, merge_alg));
@ -703,7 +701,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
merged_stream = std::make_shared<DistinctSortedBlockInputStream>(merged_stream, SizeLimits(), 0 /*limit_hint*/, Names());
MergedBlockOutputStream to{
data, new_part_tmp_path, merging_columns, compression_settings, merged_column_to_size, min_bytes_when_use_direct_io};
data, new_part_tmp_path, merging_columns, compression_settings, merged_column_to_size, data.settings.min_merge_bytes_to_use_direct_io};
merged_stream->readPrefix();
to.writePrefix();
@ -776,9 +774,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
for (size_t part_num = 0; part_num < parts.size(); ++part_num)
{
auto column_part_stream = std::make_shared<MergeTreeBlockInputStream>(
data, parts[part_num], DEFAULT_MERGE_BLOCK_SIZE, 0, 0, column_names, MarkRanges{MarkRange(0, parts[part_num]->marks_count)},
false, nullptr, true, min_bytes_when_use_direct_io, DBMS_DEFAULT_BUFFER_SIZE, false, Names{}, 0, true);
auto column_part_stream = std::make_shared<MergeTreeSequentialBlockInputStream>(
data, parts[part_num], column_names, read_with_direct_io, true);
column_part_stream->setProgressCallback(MergeProgressCallbackVerticalStep(
merge_entry, sum_input_rows_exact, column_sizes, column_name, watch_prev_elapsed));

View File

@ -3,9 +3,9 @@
#include <Common/FieldVisitors.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <Storages/MergeTree/MergeTreeBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeSelectBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeReadPool.h>
#include <Storages/MergeTree/MergeTreeThreadBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeThreadSelectBlockInputStream.h>
#include <Storages/MergeTree/KeyCondition.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
@ -654,7 +654,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
for (size_t i = 0; i < num_streams; ++i)
{
res.emplace_back(std::make_shared<MergeTreeThreadBlockInputStream>(
res.emplace_back(std::make_shared<MergeTreeThreadSelectBlockInputStream>(
i, pool, min_marks_for_concurrent_read, max_block_size, settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes, data, use_uncompressed_cache,
prewhere_info, settings, virt_columns));
@ -730,7 +730,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
parts.emplace_back(part);
}
BlockInputStreamPtr source_stream = std::make_shared<MergeTreeBlockInputStream>(
BlockInputStreamPtr source_stream = std::make_shared<MergeTreeSelectBlockInputStream>(
data, part.data_part, max_block_size, settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes, column_names, ranges_to_get_from_part,
use_uncompressed_cache, prewhere_info, true, settings.min_bytes_to_use_direct_io,
@ -775,7 +775,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal
{
RangesInDataPart & part = parts[part_index];
BlockInputStreamPtr source_stream = std::make_shared<MergeTreeBlockInputStream>(
BlockInputStreamPtr source_stream = std::make_shared<MergeTreeSelectBlockInputStream>(
data, part.data_part, max_block_size, settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes, column_names, part.ranges, use_uncompressed_cache,
prewhere_info, true, settings.min_bytes_to_use_direct_io, settings.max_read_buffer_size, true,

View File

@ -1,6 +1,6 @@
#include <Storages/MergeTree/MergeTreeReadPool.h>
#include <ext/range.h>
#include <Storages/MergeTree/MergeTreeBaseBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeBaseSelectBlockInputStream.h>
namespace ProfileEvents
@ -114,7 +114,7 @@ MergeTreeReadTaskPtr MergeTreeReadPool::getTask(const size_t min_marks_to_read,
need_marks -= marks_to_get_from_range;
}
/** Change order to right-to-left, for MergeTreeThreadBlockInputStream to get ranges with .pop_back()
/** Change order to right-to-left, for MergeTreeThreadSelectBlockInputStream to get ranges with .pop_back()
* (order was changed to left-to-right due to .pop_back() above).
*/
std::reverse(std::begin(ranges_to_get_from_part), std::end(ranges_to_get_from_part));

View File

@ -12,7 +12,7 @@ namespace DB
using MergeTreeReadTaskPtr = std::unique_ptr<MergeTreeReadTask>;
/** Provides read tasks for MergeTreeThreadBlockInputStream`s in fine-grained batches, allowing for more
/** Provides read tasks for MergeTreeThreadSelectBlockInputStream`s in fine-grained batches, allowing for more
* uniform distribution of work amongst multiple threads. All parts and their ranges are divided into `threads`
* workloads with at most `sum_marks / threads` marks. Then, threads are performing reads from these workloads
* in "sequential" manner, requesting work in small batches. As soon as some thread has exhausted

View File

@ -53,6 +53,10 @@ public:
const NamesAndTypesList & getColumns() const { return columns; }
/// Return the number of rows has been read or zero if there is no columns to read.
/// If continue_reading is true, continue reading from last state, otherwise seek to from_mark
size_t readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Block & res);
private:
class Stream
{
@ -125,9 +129,6 @@ private:
size_t from_mark, bool continue_reading, size_t max_rows_to_read,
bool read_offsets = true);
/// Return the number of rows has been read or zero if there is no columns to read.
/// If continue_reading is true, continue reading from last state, otherwise seek to from_mark
size_t readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Block & res);
friend class MergeTreeRangeReader::DelayedStream;
};

View File

@ -1,5 +1,5 @@
#include <Storages/MergeTree/MergeTreeBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeBaseBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeSelectBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeBaseSelectBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeReader.h>
#include <Core/Defines.h>
@ -9,13 +9,11 @@ namespace DB
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER;
extern const int MEMORY_LIMIT_EXCEEDED;
extern const int NOT_IMPLEMENTED;
}
MergeTreeBlockInputStream::MergeTreeBlockInputStream(
MergeTreeSelectBlockInputStream::MergeTreeSelectBlockInputStream(
const MergeTreeData & storage_,
const MergeTreeData::DataPartPtr & owned_data_part_,
size_t max_block_size_rows_,
@ -33,7 +31,7 @@ MergeTreeBlockInputStream::MergeTreeBlockInputStream(
size_t part_index_in_query_,
bool quiet)
:
MergeTreeBaseBlockInputStream{storage_, prewhere_info, max_block_size_rows_,
MergeTreeBaseSelectBlockInputStream{storage_, prewhere_info, max_block_size_rows_,
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_, min_bytes_to_use_direct_io_,
max_read_buffer_size_, use_uncompressed_cache_, save_marks_in_cache_, virt_column_names},
required_columns{column_names},
@ -84,13 +82,13 @@ MergeTreeBlockInputStream::MergeTreeBlockInputStream(
}
Block MergeTreeBlockInputStream::getHeader() const
Block MergeTreeSelectBlockInputStream::getHeader() const
{
return header;
}
bool MergeTreeBlockInputStream::getNewTask()
bool MergeTreeSelectBlockInputStream::getNewTask()
try
{
/// Produce no more than one task
@ -196,7 +194,7 @@ catch (...)
}
void MergeTreeBlockInputStream::finish()
void MergeTreeSelectBlockInputStream::finish()
{
/** Close the files (before destroying the object).
* When many sources are created, but simultaneously reading only a few of them,
@ -209,7 +207,7 @@ void MergeTreeBlockInputStream::finish()
}
MergeTreeBlockInputStream::~MergeTreeBlockInputStream() = default;
MergeTreeSelectBlockInputStream::~MergeTreeSelectBlockInputStream() = default;
}

View File

@ -1,6 +1,6 @@
#pragma once
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeThreadBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeThreadSelectBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MarkRange.h>
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
@ -10,13 +10,13 @@ namespace DB
{
/// Used to read data from single part.
/// To read data from multiple parts, a Storage creates multiple such objects.
/// TODO: Make special lightweight version of the reader for merges and other utilities, remove this from SelectExecutor.
class MergeTreeBlockInputStream : public MergeTreeBaseBlockInputStream
/// Used to read data from single part with select query
/// Cares about PREWHERE, virtual columns, indexes etc.
/// To read data from multiple parts, Storage (MergeTree) creates multiple such objects.
class MergeTreeSelectBlockInputStream : public MergeTreeBaseSelectBlockInputStream
{
public:
MergeTreeBlockInputStream(
MergeTreeSelectBlockInputStream(
const MergeTreeData & storage,
const MergeTreeData::DataPartPtr & owned_data_part,
size_t max_block_size_rows,
@ -34,7 +34,7 @@ public:
size_t part_index_in_query = 0,
bool quiet = false);
~MergeTreeBlockInputStream() override;
~MergeTreeSelectBlockInputStream() override;
String getName() const override { return "MergeTree"; }
@ -74,7 +74,7 @@ private:
String path;
bool is_first_task = true;
Logger * log = &Logger::get("MergeTreeBlockInputStream");
Logger * log = &Logger::get("MergeTreeSelectBlockInputStream");
};
}

View File

@ -0,0 +1,137 @@
#include <Storages/MergeTree/MergeTreeSequentialBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
namespace DB
{
namespace ErrorCodes
{
/// Re-declared here so readImpl() can distinguish an out-of-memory failure
/// (not the part's fault) from a suspected broken part.
extern const int MEMORY_LIMIT_EXCEEDED;
}
/// Reads all columns listed in columns_to_read_ from a single data part,
/// sequentially from its first mark. Unlike the select streams it does no
/// PREWHERE / virtual-column / mark-range handling, which makes it suitable
/// for merges and ALTERs.
///
/// take_column_types_from_storage: take column types from the table definition
///     instead of the part (they may differ during ALTER).
/// quiet: suppress the trace log message on construction.
MergeTreeSequentialBlockInputStream::MergeTreeSequentialBlockInputStream(
    const MergeTreeData & storage_,
    const MergeTreeData::DataPartPtr & data_part_,
    Names columns_to_read_,
    bool read_with_direct_io_,
    bool take_column_types_from_storage,
    bool quiet)
    : storage(storage_)
    , data_part(data_part_)
    , part_columns_lock(data_part->columns_lock)
    , columns_to_read(columns_to_read_)
    , read_with_direct_io(read_with_direct_io_)
    , mark_cache(storage.context.getMarkCache())
{
    /// Fix typo in the log message: "totaly" -> "totally".
    if (!quiet)
        LOG_TRACE(log, "Reading " << data_part->marks_count << " marks from part " << data_part->name
            << ", totally " << data_part->rows_count
            << " rows starting from the beginning of the part");

    addTotalRowsApprox(data_part->rows_count);

    header = storage.getSampleBlockForColumns(columns_to_read);
    /// Types in the part may differ from storage types during ALTER.
    fixHeader(header);

    /// Add columns because we don't want to read empty blocks
    injectRequiredColumns(storage, data_part, columns_to_read);

    NamesAndTypesList columns_for_reader;
    if (take_column_types_from_storage)
    {
        const NamesAndTypesList & physical_columns = storage.getColumns().getAllPhysical();
        columns_for_reader = physical_columns.addTypes(columns_to_read);
    }
    else
    {
        /// take columns from data_part
        columns_for_reader = data_part->columns.addTypes(columns_to_read);
    }

    reader = std::make_unique<MergeTreeReader>(
        data_part->getFullPath(), data_part, columns_for_reader, /* uncompressed_cache = */ nullptr,
        mark_cache.get(), /* save_marks_in_cache = */ false, storage,
        MarkRanges{MarkRange(0, data_part->marks_count)},
        /// The reader treats this value as "min bytes to use direct IO": 1 byte
        /// forces O_DIRECT for every read, SIZE_MAX disables it (this is a hack).
        read_with_direct_io ? 1UL : std::numeric_limits<size_t>::max(),
        DBMS_DEFAULT_BUFFER_SIZE);
}
/// Make the header's column types match the types stored in the data part.
/// During ALTER (when this stream is used to perform the ALTER) the part may
/// still hold columns whose types differ from the current storage types.
void MergeTreeSequentialBlockInputStream::fixHeader(Block & header_block) const
{
    for (const auto & part_column : data_part->columns)
    {
        if (!header_block.has(part_column.name))
            continue;

        auto & header_column = header_block.getByName(part_column.name);
        if (header_column.type->equals(*part_column.type))
            continue;

        /// Replace both the type and the (empty) column so they stay consistent.
        header_column.type = part_column.type;
        header_column.column = header_column.type->createColumn();
    }
}
/// Returns the header prepared in the constructor (storage sample block for the
/// requested columns, with types adjusted to the part's types by fixHeader).
Block MergeTreeSequentialBlockInputStream::getHeader() const
{
return header;
}
/// Reads the next portion of rows (up to one index granule) from the part.
/// Returns an empty block when cancelled or when the whole part has been read.
Block MergeTreeSequentialBlockInputStream::readImpl()
try
{
    Block block;

    if (isCancelled() || current_row >= data_part->rows_count)
    {
        /// Reading is finished (or cancelled): release the reader and the part early.
        finish();
        return block;
    }

    /// After the first portion we continue from the reader's saved position
    /// instead of seeking to a mark.
    const bool continue_reading = (current_mark != 0);
    const size_t rows_read = reader->readRows(current_mark, continue_reading, storage.index_granularity, block);

    if (block)
    {
        block.checkNumberOfRows();

        current_row += rows_read;
        current_mark += rows_read / storage.index_granularity;

        bool should_reorder = false;
        bool should_evaluate_missing_defaults = false;
        reader->fillMissingColumns(block, should_reorder, should_evaluate_missing_defaults, block.rows());

        if (should_evaluate_missing_defaults)
            reader->evaluateMissingDefaults(block);

        if (should_reorder)
            reader->reorderColumns(block, header.getNames(), nullptr);
    }

    return block;
}
catch (...)
{
    /// Suspicion of a broken part. Report it so the part is queued for verification
    /// (unless the failure was just a memory limit, which is not the part's fault).
    if (getCurrentExceptionCode() != ErrorCodes::MEMORY_LIMIT_EXCEEDED)
        storage.reportBrokenPart(data_part->name);

    throw;
}
/// Close the files before the object is destroyed: when many sources are created
/// but only a few are read simultaneously, their buffers shouldn't waste memory.
/// The order matters: drop the reader first, then release the columns lock, and
/// only then let go of the part itself.
void MergeTreeSequentialBlockInputStream::finish()
{
    reader = nullptr;
    part_columns_lock.unlock();
    data_part = nullptr;
}
/// NOTE(review): defaulted out-of-line rather than in the header — presumably to
/// keep destruction of the reader/part members in this translation unit; confirm.
MergeTreeSequentialBlockInputStream::~MergeTreeSequentialBlockInputStream() = default;
}

View File

@ -0,0 +1,75 @@
#pragma once
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeReader.h>
#include <Storages/MergeTree/MarkRange.h>
#include <memory>
namespace DB
{
/// Lightweight (in terms of logic) stream for reading a single part from MergeTree,
/// sequentially from the beginning: no PREWHERE, virtual columns or mark ranges.
/// Used by merges and ALTERs instead of the heavier select streams.
class MergeTreeSequentialBlockInputStream : public IProfilingBlockInputStream
{
public:
/// take_column_types_from_storage: take column types from the table definition
///     instead of the part (they can differ during ALTER).
/// quiet: suppress the trace log message on construction.
MergeTreeSequentialBlockInputStream(
const MergeTreeData & storage_,
const MergeTreeData::DataPartPtr & data_part_,
Names columns_to_read_,
bool read_with_direct_io_,
bool take_column_types_from_storage,
bool quiet = false
);
~MergeTreeSequentialBlockInputStream() override;
String getName() const override { return "MergeTreeSequentialBlockInputStream"; }
Block getHeader() const override;
/// Closes the reader and unlocks the part (also called automatically when
/// reading is exhausted, so buffers don't waste memory).
void finish();
size_t getCurrentMark() const { return current_mark; }
size_t getCurrentRow() const { return current_row; }
protected:
Block readImpl() override;
private:
const MergeTreeData & storage;
/// Columns and types this stream returns (part types win over storage types).
Block header;
/// Data part will not be removed if the pointer owns it
MergeTreeData::DataPartPtr data_part;
/// Forbids to change columns list of the part during reading
std::shared_lock<std::shared_mutex> part_columns_lock;
/// Columns we have to read (each Block from read will contain them)
Names columns_to_read;
/// Should read using direct IO
bool read_with_direct_io;
Logger * log = &Logger::get("MergeTreeSequentialBlockInputStream");
std::shared_ptr<MarkCache> mark_cache;
using MergeTreeReaderPtr = std::unique_ptr<MergeTreeReader>;
MergeTreeReaderPtr reader;
/// Mark from which the next read proceeds (advanced as rows are read).
size_t current_mark = 0;
/// Number of rows read so far; also the row at which the next read starts.
size_t current_row = 0;
private:
/// Adjust column types in the header to the types stored in the data part
/// (they can differ during ALTER).
void fixHeader(Block & header_block) const;
};
}

View File

@ -1,13 +1,13 @@
#include <Storages/MergeTree/MergeTreeReader.h>
#include <Storages/MergeTree/MergeTreeReadPool.h>
#include <Storages/MergeTree/MergeTreeThreadBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeThreadSelectBlockInputStream.h>
namespace DB
{
MergeTreeThreadBlockInputStream::MergeTreeThreadBlockInputStream(
MergeTreeThreadSelectBlockInputStream::MergeTreeThreadSelectBlockInputStream(
const size_t thread,
const MergeTreeReadPoolPtr & pool,
const size_t min_marks_to_read_,
@ -20,7 +20,7 @@ MergeTreeThreadBlockInputStream::MergeTreeThreadBlockInputStream(
const Settings & settings,
const Names & virt_column_names)
:
MergeTreeBaseBlockInputStream{storage, prewhere_info, max_block_size_rows,
MergeTreeBaseSelectBlockInputStream{storage, prewhere_info, max_block_size_rows,
preferred_block_size_bytes, preferred_max_column_in_block_size_bytes, settings.min_bytes_to_use_direct_io,
settings.max_read_buffer_size, use_uncompressed_cache, true, virt_column_names},
thread{thread},
@ -39,7 +39,7 @@ MergeTreeThreadBlockInputStream::MergeTreeThreadBlockInputStream(
}
Block MergeTreeThreadBlockInputStream::getHeader() const
Block MergeTreeThreadSelectBlockInputStream::getHeader() const
{
auto res = pool->getHeader();
executePrewhereActions(res, prewhere_info);
@ -49,7 +49,7 @@ Block MergeTreeThreadBlockInputStream::getHeader() const
/// Requests read task from MergeTreeReadPool and signals whether it got one
bool MergeTreeThreadBlockInputStream::getNewTask()
bool MergeTreeThreadSelectBlockInputStream::getNewTask()
{
task = pool->getTask(min_marks_to_read, thread, ordered_names);
@ -112,6 +112,6 @@ bool MergeTreeThreadBlockInputStream::getNewTask()
}
MergeTreeThreadBlockInputStream::~MergeTreeThreadBlockInputStream() = default;
MergeTreeThreadSelectBlockInputStream::~MergeTreeThreadSelectBlockInputStream() = default;
}

View File

@ -1,5 +1,5 @@
#pragma once
#include <Storages/MergeTree/MergeTreeBaseBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeBaseSelectBlockInputStream.h>
namespace DB
@ -11,10 +11,10 @@ class MergeTreeReadPool;
/** Used in conjunction with MergeTreeReadPool, asking it for more work to do and performing whatever reads it is asked
* to perform.
*/
class MergeTreeThreadBlockInputStream : public MergeTreeBaseBlockInputStream
class MergeTreeThreadSelectBlockInputStream : public MergeTreeBaseSelectBlockInputStream
{
public:
MergeTreeThreadBlockInputStream(
MergeTreeThreadSelectBlockInputStream(
const size_t thread,
const std::shared_ptr<MergeTreeReadPool> & pool,
const size_t min_marks_to_read,
@ -29,7 +29,7 @@ public:
String getName() const override { return "MergeTreeThread"; }
~MergeTreeThreadBlockInputStream() override;
~MergeTreeThreadSelectBlockInputStream() override;
Block getHeader() const override;

View File

@ -256,17 +256,21 @@ MergedBlockOutputStream::MergedBlockOutputStream(
columns_list(columns_list_), part_path(part_path_)
{
init();
for (const auto & it : columns_list)
/// If the total size is more than the threshold then we will use AIO
size_t total_size = 0;
if (aio_threshold > 0)
{
size_t estimated_size = 0;
if (aio_threshold > 0)
for (const auto & it : columns_list)
{
auto it2 = merged_column_to_size_.find(it.name);
if (it2 != merged_column_to_size_.end())
estimated_size = it2->second;
total_size += it2->second;
}
addStreams(part_path, it.name, *it.type, estimated_size, false);
}
for (const auto & it : columns_list)
addStreams(part_path, it.name, *it.type, total_size, false);
}
std::string MergedBlockOutputStream::getPartPath() const