Add processors to StorageMergeTree [WIP].

This commit is contained in:
Nikolai Kochetov 2019-09-13 18:41:09 +03:00
parent 1f5e62d741
commit 3c53dfd227
9 changed files with 112 additions and 65 deletions

View File

@ -19,7 +19,8 @@ namespace ErrorCodes
}
MergeTreeBaseSelectBlockInputStream::MergeTreeBaseSelectBlockInputStream(
MergeTreeBaseSelectBlockInputProcessor::MergeTreeBaseSelectBlockInputProcessor(
Block header,
const MergeTreeData & storage_,
const PrewhereInfoPtr & prewhere_info_,
UInt64 max_block_size_rows_,
@ -31,6 +32,7 @@ MergeTreeBaseSelectBlockInputStream::MergeTreeBaseSelectBlockInputStream(
bool save_marks_in_cache_,
const Names & virt_column_names_)
:
ISource(getHeader(std::move(header), prewhere_info_, virt_column_names_)),
storage(storage_),
prewhere_info(prewhere_info_),
max_block_size_rows(max_block_size_rows_),
@ -45,26 +47,27 @@ MergeTreeBaseSelectBlockInputStream::MergeTreeBaseSelectBlockInputStream(
}
Block MergeTreeBaseSelectBlockInputStream::readImpl()
Chunk MergeTreeBaseSelectBlockInputProcessor::generate()
{
Block res;
while (!res && !isCancelled())
while (!isCancelled())
{
if ((!task || task->isFinished()) && !getNewTask())
break;
return {};
res = readFromPart();
auto res = readFromPart();
if (res)
injectVirtualColumns(res);
if (!res.hasNoRows())
{
injectVirtualColumns(res, task.get(), virt_column_names);
return res;
}
}
return res;
return {};
}
void MergeTreeBaseSelectBlockInputStream::initializeRangeReaders(MergeTreeReadTask & current_task)
void MergeTreeBaseSelectBlockInputProcessor::initializeRangeReaders(MergeTreeReadTask & current_task)
{
if (prewhere_info)
{
@ -103,7 +106,7 @@ void MergeTreeBaseSelectBlockInputStream::initializeRangeReaders(MergeTreeReadTa
}
Block MergeTreeBaseSelectBlockInputStream::readFromPartImpl()
Chunk MergeTreeBaseSelectBlockInputProcessor::readFromPartImpl()
{
if (task->size_predictor)
task->size_predictor->startBlock();
@ -160,7 +163,8 @@ Block MergeTreeBaseSelectBlockInputStream::readFromPartImpl()
UInt64 num_filtered_rows = read_result.numReadRows() - read_result.block.rows();
progressImpl({ read_result.numReadRows(), read_result.numBytesRead() });
/// TODO
/// progressImpl({ read_result.numReadRows(), read_result.numBytesRead() });
if (task->size_predictor)
{
@ -177,13 +181,14 @@ Block MergeTreeBaseSelectBlockInputStream::readFromPartImpl()
column.column = column.column->convertToFullColumnIfConst();
}
read_result.block.checkNumberOfRows();
UInt64 num_rows = read_result.columns.empty() ? 0
: read_result.columns[0]->size();
return read_result.block;
return Chunk(std::move(read_result.columns), num_rows);
}
Block MergeTreeBaseSelectBlockInputStream::readFromPart()
Chunk MergeTreeBaseSelectBlockInputProcessor::readFromPart()
{
if (!task->range_reader.isInitialized())
initializeRangeReaders(*task);
@ -192,15 +197,18 @@ Block MergeTreeBaseSelectBlockInputStream::readFromPart()
}
void MergeTreeBaseSelectBlockInputStream::injectVirtualColumns(Block & block) const
template <typename InsertCallback>
static void injectVirtualColumnsImpl(size_t rows, InsertCallback & callback, MergeTreeReadTask * task, const Names & virtual_columns)
{
/// add virtual columns
/// Except _sample_factor, which is added from the outside.
if (!virt_column_names.empty())
if (!virtual_columns.empty())
{
const auto rows = block.rows();
if (unlikely(rows && !task))
throw Exception("Cannot insert virtual columns to non-empty chunk without specified task.",
ErrorCodes::LOGICAL_ERROR);
for (const auto & virt_column_name : virt_column_names)
for (const auto & virt_column_name : virtual_columns)
{
if (virt_column_name == "_part")
{
@ -210,7 +218,7 @@ void MergeTreeBaseSelectBlockInputStream::injectVirtualColumns(Block & block) co
else
column = DataTypeString().createColumn();
block.insert({ column, std::make_shared<DataTypeString>(), virt_column_name});
callback.template insert<DataTypeString>(column, virt_column_name);
}
else if (virt_column_name == "_part_index")
{
@ -220,7 +228,7 @@ void MergeTreeBaseSelectBlockInputStream::injectVirtualColumns(Block & block) co
else
column = DataTypeUInt64().createColumn();
block.insert({ column, std::make_shared<DataTypeUInt64>(), virt_column_name});
callback.template insert<DataTypeUInt64>(column, virt_column_name);
}
else if (virt_column_name == "_partition_id")
{
@ -230,14 +238,55 @@ void MergeTreeBaseSelectBlockInputStream::injectVirtualColumns(Block & block) co
else
column = DataTypeString().createColumn();
block.insert({ column, std::make_shared<DataTypeString>(), virt_column_name});
callback.template insert<DataTypeString>(column, virt_column_name);
}
}
}
}
/// File-local insertion policies. Instances are passed as the `callback` argument of
/// injectVirtualColumnsImpl, which calls `callback.template insert<DataType>(column, name)`
/// once per virtual column it produces.
namespace
{
/// Inserts every produced column into the referenced Block, together with a newly
/// created instance of `DataType` and the column name.
struct InsertIntoBlockCallback
{
template <typename DataType>
void insert(const ColumnPtr & column, const String & name)
{
block.insert({column, std::make_shared<DataType>(), name});
}
void MergeTreeBaseSelectBlockInputStream::executePrewhereActions(Block & block, const PrewhereInfoPtr & prewhere_info)
Block & block;
};
/// Appends every produced column to the referenced Columns vector.
/// The data type (template argument) and the column name are ignored.
struct InsertIntoColumnsCallback
{
template <typename>
void insert(const ColumnPtr & column, const String &)
{
columns.push_back(column);
}
Columns & columns;
};
}
/// Block flavour of virtual column injection: every column produced for `virtual_columns`
/// is inserted into `block` together with its data type and name.
/// The current number of rows in `block` is forwarded to the shared implementation.
void MergeTreeBaseSelectBlockInputProcessor::injectVirtualColumns(Block & block, MergeTreeReadTask * task, const Names & virtual_columns)
{
    const auto num_rows = block.rows();
    InsertIntoBlockCallback inserter{block};
    injectVirtualColumnsImpl(num_rows, inserter, task, virtual_columns);
}
/// Chunk flavour of virtual column injection: the columns produced for `virtual_columns`
/// are appended after the columns already held by `chunk`; the row count is left unchanged.
/// The shared implementation throws LOGICAL_ERROR when `virtual_columns` is not empty,
/// the chunk has rows and `task` is null.
void MergeTreeBaseSelectBlockInputProcessor::injectVirtualColumns(Chunk & chunk, MergeTreeReadTask * task, const Names & virtual_columns)
{
    UInt64 num_rows = chunk.getNumRows();

    /// Take the columns out of the chunk so that new ones can be appended to the vector.
    auto columns = chunk.detachColumns();

    InsertIntoColumnsCallback callback { columns };
    injectVirtualColumnsImpl(num_rows, callback, task, virtual_columns);

    /// `columns` is not used after this point, so hand it over instead of copying the vector.
    chunk.setColumns(std::move(columns), num_rows);
}
void MergeTreeBaseSelectBlockInputProcessor::executePrewhereActions(Block & block, const PrewhereInfoPtr & prewhere_info)
{
if (prewhere_info)
{
@ -253,7 +302,15 @@ void MergeTreeBaseSelectBlockInputStream::executePrewhereActions(Block & block,
}
}
/// Derives the header of this source from `block`: PREWHERE actions are applied first,
/// then the requested virtual columns are added. The modified block is returned.
Block MergeTreeBaseSelectBlockInputProcessor::getHeader(
    Block block, const PrewhereInfoPtr & prewhere_info, const Names & virtual_columns)
{
    /// No read task exists at this point. Injection with a null task throws if the block
    /// has rows, so `block` is presumably an empty header here - verify against callers.
    MergeTreeReadTask * const no_task = nullptr;

    executePrewhereActions(block, prewhere_info);
    injectVirtualColumns(block, no_task, virtual_columns);
    return block;
}
MergeTreeBaseSelectBlockInputStream::~MergeTreeBaseSelectBlockInputStream() = default;
MergeTreeBaseSelectBlockInputProcessor::~MergeTreeBaseSelectBlockInputProcessor() = default;
}

View File

@ -5,6 +5,8 @@
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/SelectQueryInfo.h>
#include <Processors/ISource.h>
namespace DB
{
@ -14,10 +16,11 @@ class MarkCache;
/// Base class for MergeTreeThreadSelectBlockInputStream and MergeTreeSelectBlockInputStream
class MergeTreeBaseSelectBlockInputStream : public IBlockInputStream
class MergeTreeBaseSelectBlockInputProcessor : public ISource
{
public:
MergeTreeBaseSelectBlockInputStream(
MergeTreeBaseSelectBlockInputProcessor(
Block header,
const MergeTreeData & storage_,
const PrewhereInfoPtr & prewhere_info_,
UInt64 max_block_size_rows_,
@ -29,24 +32,23 @@ public:
bool save_marks_in_cache_ = true,
const Names & virt_column_names_ = {});
~MergeTreeBaseSelectBlockInputStream() override;
~MergeTreeBaseSelectBlockInputProcessor() override;
static void executePrewhereActions(Block & block, const PrewhereInfoPtr & prewhere_info);
protected:
Block readImpl() final;
Chunk generate() final;
/// Creates new this->task, and initializes readers
virtual bool getNewTask() = 0;
/// We will call progressImpl manually.
void progress(const Progress &) override {}
virtual Chunk readFromPart();
virtual Block readFromPart();
Chunk readFromPartImpl();
Block readFromPartImpl();
void injectVirtualColumns(Block & block) const;
static void injectVirtualColumns(Block & block, MergeTreeReadTask * task, const Names & virtual_columns);
static void injectVirtualColumns(Chunk & chunk, MergeTreeReadTask * task, const Names & virtual_columns);
static Block getHeader(Block block, const PrewhereInfoPtr & prewhere_info, const Names & virtual_columns);
void initializeRangeReaders(MergeTreeReadTask & task);

View File

@ -141,7 +141,7 @@ static RelativeSize convertAbsoluteSampleSizeToRelative(const ASTPtr & node, siz
}
BlockInputStreams MergeTreeDataSelectExecutor::read(
Pipes MergeTreeDataSelectExecutor::read(
const Names & column_names_to_return,
const SelectQueryInfo & query_info,
const Context & context,
@ -154,7 +154,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
max_block_size, num_streams, max_block_numbers_to_read);
}
BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
Pipes MergeTreeDataSelectExecutor::readFromParts(
MergeTreeData::DataPartsVector parts,
const Names & column_names_to_return,
const SelectQueryInfo & query_info,
@ -565,7 +565,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
ProfileEvents::increment(ProfileEvents::SelectedRanges, sum_ranges);
ProfileEvents::increment(ProfileEvents::SelectedMarks, sum_marks);
BlockInputStreams res;
Pipes res;
if (select.final())
{
@ -658,7 +658,7 @@ size_t roundRowsOrBytesToMarks(
}
BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
RangesInDataParts && parts,
size_t num_streams,
const Names & column_names,
@ -707,7 +707,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
if (sum_marks > max_marks_to_use_cache)
use_uncompressed_cache = false;
BlockInputStreams res;
Pipes res;
if (sum_marks > 0 && settings.merge_tree_uniform_read_distribution == 1)
{
@ -817,7 +817,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
return res;
}
BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
RangesInDataParts && parts,
size_t num_streams,
const Names & column_names,
@ -1026,7 +1026,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithO
}
BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
RangesInDataParts && parts,
const Names & column_names,
UInt64 max_block_size,

View File

@ -24,7 +24,7 @@ public:
*/
using PartitionIdToMaxBlock = std::unordered_map<String, Int64>;
BlockInputStreams read(
Pipes read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
@ -32,7 +32,7 @@ public:
unsigned num_streams,
const PartitionIdToMaxBlock * max_block_numbers_to_read = nullptr) const;
BlockInputStreams readFromParts(
Pipes readFromParts(
MergeTreeData::DataPartsVector parts,
const Names & column_names,
const SelectQueryInfo & query_info,
@ -46,7 +46,7 @@ private:
Logger * log;
BlockInputStreams spreadMarkRangesAmongStreams(
Pipes spreadMarkRangesAmongStreams(
RangesInDataParts && parts,
size_t num_streams,
const Names & column_names,
@ -56,7 +56,7 @@ private:
const Names & virt_columns,
const Settings & settings) const;
BlockInputStreams spreadMarkRangesAmongStreamsWithOrder(
Pipes spreadMarkRangesAmongStreamsWithOrder(
RangesInDataParts && parts,
size_t num_streams,
const Names & column_names,
@ -67,7 +67,7 @@ private:
const Names & virt_columns,
const Settings & settings) const;
BlockInputStreams spreadMarkRangesAmongStreamsFinal(
Pipes spreadMarkRangesAmongStreamsFinal(
RangesInDataParts && parts,
const Names & column_names,
UInt64 max_block_size,

View File

@ -157,7 +157,7 @@ public:
void addNumBytesRead(size_t count) { num_bytes_read += count; }
Block block;
Columns columns;
private:
RangesInfo started_ranges;

View File

@ -20,7 +20,7 @@ MergeTreeThreadSelectBlockInputStream::MergeTreeThreadSelectBlockInputStream(
const Settings & settings,
const Names & virt_column_names_)
:
MergeTreeBaseSelectBlockInputStream{storage_, prewhere_info_, max_block_size_rows_,
MergeTreeBaseSelectBlockInputProcessor{pool->getHeader(), storage_, prewhere_info_, max_block_size_rows_,
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_, settings.min_bytes_to_use_direct_io,
settings.max_read_buffer_size, use_uncompressed_cache_, true, virt_column_names_},
thread{thread_},
@ -38,19 +38,9 @@ MergeTreeThreadSelectBlockInputStream::MergeTreeThreadSelectBlockInputStream(
else
min_marks_to_read = min_marks_to_read_;
ordered_names = getHeader().getNames();
ordered_names = getPort().getHeader().getNames();
}
Block MergeTreeThreadSelectBlockInputStream::getHeader() const
{
auto res = pool->getHeader();
executePrewhereActions(res, prewhere_info);
injectVirtualColumns(res);
return res;
}
/// Requests read task from MergeTreeReadPool and signals whether it got one
bool MergeTreeThreadSelectBlockInputStream::getNewTask()
{

View File

@ -11,7 +11,7 @@ class MergeTreeReadPool;
/** Used in conjunction with MergeTreeReadPool, asking it for more work to do and performing whatever reads it is asked
* to perform.
*/
class MergeTreeThreadSelectBlockInputStream : public MergeTreeBaseSelectBlockInputStream
class MergeTreeThreadSelectBlockInputStream : public MergeTreeBaseSelectBlockInputProcessor
{
public:
MergeTreeThreadSelectBlockInputStream(
@ -31,8 +31,6 @@ public:
~MergeTreeThreadSelectBlockInputStream() override;
Block getHeader() const override;
protected:
/// Requests read task from MergeTreeReadPool and signals whether it got one
bool getNewTask() override;

View File

@ -121,7 +121,7 @@ StorageMergeTree::~StorageMergeTree()
shutdown();
}
BlockInputStreams StorageMergeTree::read(
Pipes StorageMergeTree::readWithProcessors(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,

View File

@ -35,7 +35,7 @@ public:
bool supportsIndexForIn() const override { return true; }
BlockInputStreams read(
Pipes readWithProcessors(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,