Remove more streams.

This commit is contained in:
Nikolai Kochetov 2021-07-22 19:05:52 +03:00
parent d1eeb37cac
commit fd754430eb
21 changed files with 313 additions and 748 deletions

View File

@ -15,8 +15,8 @@
#include <DataTypes/DataTypeFactory.h> #include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeUUID.h> #include <DataTypes/DataTypeUUID.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <DataStreams/IBlockOutputStream.h> #include <Processors/Pipe.h>
#include <DataStreams/LimitBlockInputStream.h> #include <Processors/LimitTransform.h>
#include <Common/SipHash.h> #include <Common/SipHash.h>
#include <Common/UTF8Helpers.h> #include <Common/UTF8Helpers.h>
#include <Common/StringUtils/StringUtils.h> #include <Common/StringUtils/StringUtils.h>
@ -24,6 +24,10 @@
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
#include <Common/assert_cast.h> #include <Common/assert_cast.h>
#include <Formats/registerFormats.h> #include <Formats/registerFormats.h>
#include <Formats/FormatFactory.h>
#include <Processors/Formats/IInputFormat.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Core/Block.h> #include <Core/Block.h>
#include <common/StringRef.h> #include <common/StringRef.h>
#include <common/DateLUT.h> #include <common/DateLUT.h>
@ -1156,17 +1160,20 @@ try
if (!silent) if (!silent)
std::cerr << "Training models\n"; std::cerr << "Training models\n";
BlockInputStreamPtr input = context->getInputFormat(input_format, file_in, header, max_block_size); Pipe pipe(FormatFactory::instance().getInput(input_format, file_in, header, context, max_block_size));
input->readPrefix(); QueryPipeline pipeline;
while (Block block = input->read()) pipeline.init(std::move(pipe));
PullingPipelineExecutor executor(pipeline);
Block block;
while (executor.pull(block))
{ {
obfuscator.train(block.getColumns()); obfuscator.train(block.getColumns());
source_rows += block.rows(); source_rows += block.rows();
if (!silent) if (!silent)
std::cerr << "Processed " << source_rows << " rows\n"; std::cerr << "Processed " << source_rows << " rows\n";
} }
input->readSuffix();
} }
obfuscator.finalize(); obfuscator.finalize();
@ -1183,15 +1190,26 @@ try
file_in.seek(0, SEEK_SET); file_in.seek(0, SEEK_SET);
BlockInputStreamPtr input = context->getInputFormat(input_format, file_in, header, max_block_size); Pipe pipe(FormatFactory::instance().getInput(input_format, file_in, header, context, max_block_size));
BlockOutputStreamPtr output = context->getOutputStreamParallelIfPossible(output_format, file_out, header);
if (processed_rows + source_rows > limit) if (processed_rows + source_rows > limit)
input = std::make_shared<LimitBlockInputStream>(input, limit - processed_rows, 0); {
pipe.addSimpleTransform([&](const Block & cur_header)
{
return std::make_shared<LimitTransform>(cur_header, limit - processed_rows, 0);
});
}
QueryPipeline pipeline;
pipeline.init(std::move(pipe));
BlockOutputStreamPtr output = context->getOutputStreamParallelIfPossible(output_format, file_out, header);
PullingPipelineExecutor executor(pipeline);
input->readPrefix();
output->writePrefix(); output->writePrefix();
while (Block block = input->read()) Block block;
while (executor.pull(block))
{ {
Columns columns = obfuscator.generate(block.getColumns()); Columns columns = obfuscator.generate(block.getColumns());
output->write(header.cloneWithColumns(columns)); output->write(header.cloneWithColumns(columns));
@ -1200,7 +1218,6 @@ try
std::cerr << "Processed " << processed_rows << " rows\n"; std::cerr << "Processed " << processed_rows << " rows\n";
} }
output->writeSuffix(); output->writeSuffix();
input->readSuffix();
obfuscator.updateSeed(); obfuscator.updateSeed();
} }

View File

@ -45,6 +45,7 @@ SRCS(
SettingsProfilesCache.cpp SettingsProfilesCache.cpp
User.cpp User.cpp
UsersConfigAccessStorage.cpp UsersConfigAccessStorage.cpp
tests/gtest_access_rights_ops.cpp
) )

View File

@ -1,44 +0,0 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
namespace DB
{
/** A stream of blocks from which you can read the next block from an explicitly provided list.
* Also see OneBlockInputStream.
*/
class BlocksListBlockInputStream : public IBlockInputStream
{
public:
/// Acquires the ownership of the block list.
BlocksListBlockInputStream(BlocksList && list_)
: list(std::move(list_)), it(list.begin()), end(list.end()) {}
/// Uses a list of blocks lying somewhere else.
BlocksListBlockInputStream(BlocksList::iterator & begin_, BlocksList::iterator & end_)
: it(begin_), end(end_) {}
String getName() const override { return "BlocksList"; }
protected:
Block getHeader() const override { return list.empty() ? Block() : *list.begin(); }
Block readImpl() override
{
if (it == end)
return Block();
Block res = *it;
++it;
return res;
}
private:
BlocksList list;
BlocksList::iterator it;
const BlocksList::iterator end;
};
}

View File

@ -1,158 +0,0 @@
#include <algorithm>
#include <DataStreams/LimitBlockInputStream.h>
namespace DB
{
/// gets pointers to all columns of block, which were used for ORDER BY
static ColumnRawPtrs extractSortColumns(const Block & block, const SortDescription & description)
{
size_t size = description.size();
ColumnRawPtrs res;
res.reserve(size);
for (size_t i = 0; i < size; ++i)
{
const IColumn * column = !description[i].column_name.empty()
? block.getByName(description[i].column_name).column.get()
: block.safeGetByPosition(description[i].column_number).column.get();
res.emplace_back(column);
}
return res;
}
LimitBlockInputStream::LimitBlockInputStream(
const BlockInputStreamPtr & input, UInt64 limit_, UInt64 offset_, bool always_read_till_end_,
bool use_limit_as_total_rows_approx, bool with_ties_, const SortDescription & description_)
: limit(limit_), offset(offset_), always_read_till_end(always_read_till_end_), with_ties(with_ties_)
, description(description_)
{
if (use_limit_as_total_rows_approx)
{
addTotalRowsApprox(static_cast<size_t>(limit));
}
children.push_back(input);
}
Block LimitBlockInputStream::readImpl()
{
Block res;
UInt64 rows = 0;
/// pos >= offset + limit and all rows in the end of previous block were equal
/// to row at 'limit' position. So we check current block.
if (!ties_row_ref.empty() && pos >= offset + limit)
{
res = children.back()->read();
rows = res.rows();
if (!res)
return res;
SharedBlockPtr ptr = new detail::SharedBlock(std::move(res));
ptr->sort_columns = extractSortColumns(*ptr, description);
UInt64 len;
for (len = 0; len < rows; ++len)
{
SharedBlockRowRef current_row;
current_row.set(ptr, &ptr->sort_columns, len);
if (current_row != ties_row_ref)
{
ties_row_ref.reset();
break;
}
}
if (len < rows)
{
for (size_t i = 0; i < ptr->columns(); ++i)
ptr->safeGetByPosition(i).column = ptr->safeGetByPosition(i).column->cut(0, len);
}
return *ptr;
}
if (pos >= offset + limit)
{
if (!always_read_till_end)
return res;
else
{
while (children.back()->read())
;
return res;
}
}
do
{
res = children.back()->read();
if (!res)
return res;
rows = res.rows();
pos += rows;
} while (pos <= offset);
SharedBlockPtr ptr = new detail::SharedBlock(std::move(res));
if (with_ties)
ptr->sort_columns = extractSortColumns(*ptr, description);
/// give away the whole block
if (pos >= offset + rows && pos <= offset + limit)
{
/// Save rowref for last row, because probalbly next block begins with the same row.
if (with_ties && pos == offset + limit)
ties_row_ref.set(ptr, &ptr->sort_columns, rows - 1);
return *ptr;
}
/// give away a piece of the block
UInt64 start = std::max(
static_cast<Int64>(0),
static_cast<Int64>(offset) - static_cast<Int64>(pos) + static_cast<Int64>(rows));
UInt64 length = std::min(
static_cast<Int64>(limit), std::min(
static_cast<Int64>(pos) - static_cast<Int64>(offset),
static_cast<Int64>(limit) + static_cast<Int64>(offset) - static_cast<Int64>(pos) + static_cast<Int64>(rows)));
/// check if other rows in current block equals to last one in limit
if (with_ties)
{
ties_row_ref.set(ptr, &ptr->sort_columns, start + length - 1);
for (size_t i = ties_row_ref.row_num + 1; i < rows; ++i)
{
SharedBlockRowRef current_row;
current_row.set(ptr, &ptr->sort_columns, i);
if (current_row == ties_row_ref)
++length;
else
{
ties_row_ref.reset();
break;
}
}
}
if (length == rows)
return *ptr;
for (size_t i = 0; i < ptr->columns(); ++i)
ptr->safeGetByPosition(i).column = ptr->safeGetByPosition(i).column->cut(start, length);
// TODO: we should provide feedback to child-block, so it will know how many rows are actually consumed.
// It's crucial for streaming engines like Kafka.
return *ptr;
}
}

View File

@ -1,47 +0,0 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Common/SharedBlockRowRef.h>
#include <Core/SortDescription.h>
namespace DB
{
/** Implements the LIMIT relational operation.
*/
class LimitBlockInputStream : public IBlockInputStream
{
public:
/** If always_read_till_end = false (by default), then after reading enough data,
* returns an empty block, and this causes the query to be canceled.
* If always_read_till_end = true - reads all the data to the end, but ignores them. This is necessary in rare cases:
* when otherwise, due to the cancellation of the request, we would not have received the data for GROUP BY WITH TOTALS from the remote server.
* If use_limit_as_total_rows_approx = true, then addTotalRowsApprox is called to use the limit in progress & stats
* with_ties = true, when query has WITH TIES modifier. If so, description should be provided
* description lets us know which row we should check for equality
*/
LimitBlockInputStream(
const BlockInputStreamPtr & input, UInt64 limit_, UInt64 offset_,
bool always_read_till_end_ = false, bool use_limit_as_total_rows_approx = false,
bool with_ties_ = false, const SortDescription & description_ = {});
String getName() const override { return "Limit"; }
Block getHeader() const override { return children.at(0)->getHeader(); }
protected:
Block readImpl() override;
private:
UInt64 limit;
UInt64 offset;
UInt64 pos = 0;
bool always_read_till_end;
bool with_ties;
const SortDescription description;
SharedBlockRowRef ties_row_ref;
};
}

View File

@ -1,273 +0,0 @@
#include <queue>
#include <common/logger_useful.h>
#include <DataStreams/MergingSortedBlockInputStream.h>
#include <DataStreams/ColumnGathererStream.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
MergingSortedBlockInputStream::MergingSortedBlockInputStream(
const BlockInputStreams & inputs_, SortDescription description_,
size_t max_block_size_, UInt64 limit_, WriteBuffer * out_row_sources_buf_, bool quiet_)
: description(std::move(description_)), max_block_size(max_block_size_), limit(limit_), quiet(quiet_)
, source_blocks(inputs_.size())
, cursors(inputs_.size()), out_row_sources_buf(out_row_sources_buf_)
, log(&Poco::Logger::get("MergingSortedBlockInputStream"))
{
children.insert(children.end(), inputs_.begin(), inputs_.end());
header = children.at(0)->getHeader();
num_columns = header.columns();
}
void MergingSortedBlockInputStream::init(MutableColumns & merged_columns)
{
/// Read the first blocks, initialize the queue.
if (first)
{
first = false;
for (size_t i = 0; i < source_blocks.size(); ++i)
{
Block & block = source_blocks[i];
if (block)
continue;
block = children[i]->read();
const size_t rows = block.rows();
if (rows == 0)
continue;
if (expected_block_size < rows)
expected_block_size = std::min(rows, max_block_size);
cursors[i] = SortCursorImpl(block, description, i);
has_collation |= cursors[i].has_collation;
}
if (has_collation)
queue_with_collation = SortingHeap<SortCursorWithCollation>(cursors);
else
queue_without_collation = SortingHeap<SortCursor>(cursors);
}
/// Let's check that all source blocks have the same structure.
for (const auto & block : source_blocks)
{
if (!block)
continue;
assertBlocksHaveEqualStructure(block, header, getName());
}
merged_columns.resize(num_columns);
for (size_t i = 0; i < num_columns; ++i)
{
merged_columns[i] = header.safeGetByPosition(i).column->cloneEmpty();
merged_columns[i]->reserve(expected_block_size);
}
}
Block MergingSortedBlockInputStream::readImpl()
{
if (finished)
return {};
if (children.size() == 1)
return children[0]->read();
MutableColumns merged_columns;
init(merged_columns);
if (merged_columns.empty())
return {};
if (has_collation)
merge(merged_columns, queue_with_collation);
else
merge(merged_columns, queue_without_collation);
return header.cloneWithColumns(std::move(merged_columns));
}
template <typename TSortCursor>
void MergingSortedBlockInputStream::fetchNextBlock(const TSortCursor & current, SortingHeap<TSortCursor> & queue)
{
size_t order = current->order;
size_t size = cursors.size();
if (order >= size || &cursors[order] != current.impl)
throw Exception("Logical error in MergingSortedBlockInputStream", ErrorCodes::LOGICAL_ERROR);
while (true)
{
source_blocks[order] = children[order]->read();
if (!source_blocks[order])
{
queue.removeTop();
break;
}
if (source_blocks[order].rows())
{
cursors[order].reset(source_blocks[order]);
queue.replaceTop(&cursors[order]);
break;
}
}
}
template <typename TSortingHeap>
void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, TSortingHeap & queue)
{
size_t merged_rows = 0;
/** Increase row counters.
* Return true if it's time to finish generating the current data block.
*/
auto count_row_and_check_limit = [&, this]()
{
++total_merged_rows;
if (limit && total_merged_rows == limit)
{
// std::cerr << "Limit reached\n";
cancel(false);
finished = true;
return true;
}
++merged_rows;
return merged_rows >= max_block_size;
};
/// Take rows in required order and put them into `merged_columns`, while the number of rows are no more than `max_block_size`
while (queue.isValid())
{
auto current = queue.current();
/** And what if the block is totally less or equal than the rest for the current cursor?
* Or is there only one data source left in the queue? Then you can take the entire block on current cursor.
*/
if (current->isFirst()
&& (queue.size() == 1
|| (queue.size() >= 2 && current.totallyLessOrEquals(queue.nextChild()))))
{
// std::cerr << "current block is totally less or equals\n";
/// If there are already data in the current block, we first return it. We'll get here again the next time we call the merge function.
if (merged_rows != 0)
{
//std::cerr << "merged rows is non-zero\n";
return;
}
/// Actually, current->order stores source number (i.e. cursors[current->order] == current)
size_t source_num = current->order;
if (source_num >= cursors.size())
throw Exception("Logical error in MergingSortedBlockInputStream", ErrorCodes::LOGICAL_ERROR);
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i] = IColumn::mutate(std::move(source_blocks[source_num].getByPosition(i).column));
// std::cerr << "copied columns\n";
merged_rows = merged_columns.at(0)->size();
/// Limit output
if (limit && total_merged_rows + merged_rows > limit)
{
merged_rows = limit - total_merged_rows;
for (size_t i = 0; i < num_columns; ++i)
{
auto & column = merged_columns[i];
column = IColumn::mutate(column->cut(0, merged_rows));
}
cancel(false);
finished = true;
}
/// Write order of rows for other columns
/// this data will be used in grather stream
if (out_row_sources_buf)
{
RowSourcePart row_source(source_num);
for (size_t i = 0; i < merged_rows; ++i)
out_row_sources_buf->write(row_source.data);
}
//std::cerr << "fetching next block\n";
total_merged_rows += merged_rows;
fetchNextBlock(current, queue);
return;
}
// std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n";
// std::cerr << "Inserting row\n";
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*current->all_columns[i], current->getRow());
if (out_row_sources_buf)
{
/// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl)
RowSourcePart row_source(current->order);
out_row_sources_buf->write(row_source.data);
}
if (!current->isLast())
{
// std::cerr << "moving to next row\n";
queue.next();
}
else
{
/// We get the next block from the corresponding source, if there is one.
// std::cerr << "It was last row, fetching next block\n";
fetchNextBlock(current, queue);
}
if (count_row_and_check_limit())
return;
}
/// We have read all data. Ask children to cancel providing more data.
cancel(false);
finished = true;
}
void MergingSortedBlockInputStream::readSuffixImpl()
{
if (quiet)
return;
const BlockStreamProfileInfo & profile_info = getProfileInfo();
double seconds = profile_info.total_stopwatch.elapsedSeconds();
if (!seconds)
LOG_DEBUG(log, "Merge sorted {} blocks, {} rows in 0 sec.", profile_info.blocks, profile_info.rows);
else
LOG_DEBUG(log, "Merge sorted {} blocks, {} rows in {} sec., {} rows/sec., {}/sec",
profile_info.blocks, profile_info.rows, seconds,
profile_info.rows / seconds,
ReadableSize(profile_info.bytes / seconds));
}
}

View File

@ -1,87 +0,0 @@
#pragma once
#include <Core/SortDescription.h>
#include <Core/SortCursor.h>
#include <IO/WriteHelpers.h>
#include <DataStreams/IBlockInputStream.h>
namespace Poco { class Logger; }
namespace DB
{
/** Merges several sorted streams into one sorted stream.
*/
class MergingSortedBlockInputStream : public IBlockInputStream
{
public:
/** limit - if isn't 0, then we can produce only first limit rows in sorted order.
* out_row_sources - if isn't nullptr, then at the end of execution it should contain part numbers of each read row (and needed flag)
* quiet - don't log profiling info
*/
MergingSortedBlockInputStream(
const BlockInputStreams & inputs_, SortDescription description_, size_t max_block_size_,
UInt64 limit_ = 0, WriteBuffer * out_row_sources_buf_ = nullptr, bool quiet_ = false);
String getName() const override { return "MergingSorted"; }
Block getHeader() const override { return header; }
protected:
Block readImpl() override;
void readSuffixImpl() override;
/// Initializes the queue and the columns of next result block.
void init(MutableColumns & merged_columns);
/// Gets the next block from the source corresponding to the `current`.
template <typename TSortCursor>
void fetchNextBlock(const TSortCursor & current, SortingHeap<TSortCursor> & queue);
Block header;
const SortDescription description;
const size_t max_block_size;
UInt64 limit;
UInt64 total_merged_rows = 0;
bool first = true;
bool has_collation = false;
bool quiet = false;
/// May be smaller or equal to max_block_size. To do 'reserve' for columns.
size_t expected_block_size = 0;
/// Blocks currently being merged.
size_t num_columns = 0;
Blocks source_blocks;
SortCursorImpls cursors;
SortingHeap<SortCursor> queue_without_collation;
SortingHeap<SortCursorWithCollation> queue_with_collation;
/// Used in Vertical merge algorithm to gather non-PK/non-index columns (on next step)
/// If it is not nullptr then it should be populated during execution
WriteBuffer * out_row_sources_buf;
private:
/** We support two different cursors - with Collation and without.
* Templates are used instead of polymorphic SortCursor and calls to virtual functions.
*/
template <typename TSortingHeap>
void merge(MutableColumns & merged_columns, TSortingHeap & queue);
Poco::Logger * log;
/// Read is finished.
bool finished = false;
};
}

View File

@ -4,6 +4,9 @@
#include <DataStreams/NativeBlockInputStream.h> #include <DataStreams/NativeBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h> #include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/copyData.h> #include <DataStreams/copyData.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/ISource.h>
#include <Compression/CompressedReadBuffer.h> #include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h> #include <Compression/CompressedWriteBuffer.h>
#include <IO/ReadBufferFromFile.h> #include <IO/ReadBufferFromFile.h>
@ -32,32 +35,38 @@ struct TemporaryFileStream
{} {}
/// Flush data from input stream into file for future reading /// Flush data from input stream into file for future reading
static void write(const std::string & path, const Block & header, IBlockInputStream & input, static void write(const std::string & path, const Block & header, QueryPipeline pipeline, const std::string & codec)
std::atomic<bool> * is_cancelled, const std::string & codec)
{ {
WriteBufferFromFile file_buf(path); WriteBufferFromFile file_buf(path);
CompressedWriteBuffer compressed_buf(file_buf, CompressionCodecFactory::instance().get(codec, {})); CompressedWriteBuffer compressed_buf(file_buf, CompressionCodecFactory::instance().get(codec, {}));
NativeBlockOutputStream output(compressed_buf, 0, header); NativeBlockOutputStream output(compressed_buf, 0, header);
copyData(input, output, is_cancelled);
PullingPipelineExecutor executor(pipeline);
output.writePrefix();
Block block;
while (executor.pull(block))
output.write(block);
output.writeSuffix();
compressed_buf.finalize(); compressed_buf.finalize();
} }
}; };
class TemporaryFileLazyInputStream : public IBlockInputStream class TemporaryFileLazySource : public ISource
{ {
public: public:
TemporaryFileLazyInputStream(const std::string & path_, const Block & header_) TemporaryFileLazySource(const std::string & path_, const Block & header_)
: path(path_) : ISource(header_)
, header(header_) , path(path_)
, done(false) , done(false)
{} {}
String getName() const override { return "TemporaryFile"; } String getName() const override { return "TemporaryFileLazySource"; }
Block getHeader() const override { return header; }
void readSuffix() override {}
protected: protected:
Block readImpl() override Chunk generate() override
{ {
if (done) if (done)
return {}; return {};
@ -71,7 +80,7 @@ protected:
done = true; done = true;
stream.reset(); stream.reset();
} }
return block; return Chunk(block.getColumns(), block.rows());
} }
private: private:

View File

@ -1,11 +1,10 @@
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <Core/Block.h> #include <Core/Block.h>
#include <Columns/ColumnVector.h> #include <Columns/ColumnVector.h>
#include <DataStreams/BlocksListBlockInputStream.h> #include <Processors/Sources/BlocksListSource.h>
#include <DataTypes/DataTypesNumber.h> #include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnsNumber.h> #include <Columns/ColumnsNumber.h>
#include <Processors/Pipe.h> #include <Processors/Pipe.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Merges/MergingSortedTransform.h> #include <Processors/Merges/MergingSortedTransform.h>
#include <Processors/Executors/PipelineExecutingBlockInputStream.h> #include <Processors/Executors/PipelineExecutingBlockInputStream.h>
#include <Processors/QueryPipeline.h> #include <Processors/QueryPipeline.h>
@ -40,7 +39,7 @@ static Pipe getInputStreams(const std::vector<std::string> & column_names, const
size_t start = stride; size_t start = stride;
while (blocks_count--) while (blocks_count--)
blocks.push_back(getBlockWithSize(column_names, block_size_in_bytes, stride, start)); blocks.push_back(getBlockWithSize(column_names, block_size_in_bytes, stride, start));
pipes.emplace_back(std::make_shared<SourceFromInputStream>(std::make_shared<BlocksListBlockInputStream>(std::move(blocks)))); pipes.emplace_back(std::make_shared<BlocksListSource>(std::move(blocks)));
} }
return Pipe::unitePipes(std::move(pipes)); return Pipe::unitePipes(std::move(pipes));
@ -57,7 +56,7 @@ static Pipe getInputStreamsEqualStride(const std::vector<std::string> & column_n
size_t start = i; size_t start = i;
while (blocks_count--) while (blocks_count--)
blocks.push_back(getBlockWithSize(column_names, block_size_in_bytes, stride, start)); blocks.push_back(getBlockWithSize(column_names, block_size_in_bytes, stride, start));
pipes.emplace_back(std::make_shared<SourceFromInputStream>(std::make_shared<BlocksListBlockInputStream>(std::move(blocks)))); pipes.emplace_back(std::make_shared<BlocksListSource>(std::move(blocks)));
i++; i++;
} }
return Pipe::unitePipes(std::move(pipes)); return Pipe::unitePipes(std::move(pipes));

View File

@ -2,8 +2,10 @@
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <Columns/ColumnsNumber.h> #include <Columns/ColumnsNumber.h>
#include <DataStreams/BlocksListBlockInputStream.h> #include <Processors/Sources/BlocksListSource.h>
#include <DataStreams/CheckSortedBlockInputStream.h> #include <Processors/Transforms/CheckSortedTransform.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/QueryPipeline.h>
#include <DataTypes/DataTypesNumber.h> #include <DataTypes/DataTypesNumber.h>
@ -89,14 +91,22 @@ TEST(CheckSortedBlockInputStream, CheckGoodCase)
for (size_t i = 0; i < 3; ++i) for (size_t i = 0; i < 3; ++i)
blocks.push_back(getSortedBlockWithSize(key_columns, 10, 1, i * 10)); blocks.push_back(getSortedBlockWithSize(key_columns, 10, 1, i * 10));
BlockInputStreamPtr stream = std::make_shared<BlocksListBlockInputStream>(std::move(blocks)); Pipe pipe(std::make_shared<BlocksListSource>(std::move(blocks)));
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<CheckSortedTransform>(header, sort_description);
});
CheckSortedBlockInputStream sorted(stream, sort_description); QueryPipeline pipeline;
pipeline.init(std::move(pipe));
EXPECT_NO_THROW(sorted.read()); PullingPipelineExecutor executor(pipeline);
EXPECT_NO_THROW(sorted.read());
EXPECT_NO_THROW(sorted.read()); Chunk chunk;
EXPECT_EQ(sorted.read(), Block()); EXPECT_NO_THROW(executor.pull(chunk));
EXPECT_NO_THROW(executor.pull(chunk));
EXPECT_NO_THROW(executor.pull(chunk));
EXPECT_TRUE(executor.pull(chunk));
} }
TEST(CheckSortedBlockInputStream, CheckBadLastRow) TEST(CheckSortedBlockInputStream, CheckBadLastRow)
@ -109,14 +119,21 @@ TEST(CheckSortedBlockInputStream, CheckBadLastRow)
blocks.push_back(getSortedBlockWithSize(key_columns, 100, 1, 0)); blocks.push_back(getSortedBlockWithSize(key_columns, 100, 1, 0));
blocks.push_back(getSortedBlockWithSize(key_columns, 100, 1, 300)); blocks.push_back(getSortedBlockWithSize(key_columns, 100, 1, 300));
BlockInputStreamPtr stream = std::make_shared<BlocksListBlockInputStream>(std::move(blocks)); Pipe pipe(std::make_shared<BlocksListSource>(std::move(blocks)));
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<CheckSortedTransform>(header, sort_description);
});
CheckSortedBlockInputStream sorted(stream, sort_description); QueryPipeline pipeline;
pipeline.init(std::move(pipe));
PullingPipelineExecutor executor(pipeline);
EXPECT_NO_THROW(sorted.read()); Chunk chunk;
EXPECT_NO_THROW(sorted.read()); EXPECT_NO_THROW(executor.pull(chunk));
EXPECT_THROW(sorted.read(), DB::Exception); EXPECT_NO_THROW(executor.pull(chunk));
EXPECT_THROW(executor.pull(chunk), DB::Exception);
} }
@ -127,11 +144,19 @@ TEST(CheckSortedBlockInputStream, CheckUnsortedBlock1)
BlocksList blocks; BlocksList blocks;
blocks.push_back(getUnSortedBlockWithSize(key_columns, 100, 1, 0, 5, 1, 77)); blocks.push_back(getUnSortedBlockWithSize(key_columns, 100, 1, 0, 5, 1, 77));
BlockInputStreamPtr stream = std::make_shared<BlocksListBlockInputStream>(std::move(blocks)); Pipe pipe(std::make_shared<BlocksListSource>(std::move(blocks)));
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<CheckSortedTransform>(header, sort_description);
});
CheckSortedBlockInputStream sorted(stream, sort_description); QueryPipeline pipeline;
pipeline.init(std::move(pipe));
EXPECT_THROW(sorted.read(), DB::Exception); PullingPipelineExecutor executor(pipeline);
Chunk chunk;
EXPECT_THROW(executor.pull(chunk), DB::Exception);
} }
TEST(CheckSortedBlockInputStream, CheckUnsortedBlock2) TEST(CheckSortedBlockInputStream, CheckUnsortedBlock2)
@ -141,11 +166,19 @@ TEST(CheckSortedBlockInputStream, CheckUnsortedBlock2)
BlocksList blocks; BlocksList blocks;
blocks.push_back(getUnSortedBlockWithSize(key_columns, 100, 1, 0, 99, 2, 77)); blocks.push_back(getUnSortedBlockWithSize(key_columns, 100, 1, 0, 99, 2, 77));
BlockInputStreamPtr stream = std::make_shared<BlocksListBlockInputStream>(std::move(blocks)); Pipe pipe(std::make_shared<BlocksListSource>(std::move(blocks)));
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<CheckSortedTransform>(header, sort_description);
});
CheckSortedBlockInputStream sorted(stream, sort_description); QueryPipeline pipeline;
pipeline.init(std::move(pipe));
EXPECT_THROW(sorted.read(), DB::Exception); PullingPipelineExecutor executor(pipeline);
Chunk chunk;
EXPECT_THROW(executor.pull(chunk), DB::Exception);
} }
TEST(CheckSortedBlockInputStream, CheckUnsortedBlock3) TEST(CheckSortedBlockInputStream, CheckUnsortedBlock3)
@ -155,11 +188,19 @@ TEST(CheckSortedBlockInputStream, CheckUnsortedBlock3)
BlocksList blocks; BlocksList blocks;
blocks.push_back(getUnSortedBlockWithSize(key_columns, 100, 1, 0, 50, 0, 77)); blocks.push_back(getUnSortedBlockWithSize(key_columns, 100, 1, 0, 50, 0, 77));
BlockInputStreamPtr stream = std::make_shared<BlocksListBlockInputStream>(std::move(blocks)); Pipe pipe(std::make_shared<BlocksListSource>(std::move(blocks)));
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<CheckSortedTransform>(header, sort_description);
});
CheckSortedBlockInputStream sorted(stream, sort_description); QueryPipeline pipeline;
pipeline.init(std::move(pipe));
EXPECT_THROW(sorted.read(), DB::Exception); PullingPipelineExecutor executor(pipeline);
Chunk chunk;
EXPECT_THROW(executor.pull(chunk), DB::Exception);
} }
TEST(CheckSortedBlockInputStream, CheckEqualBlock) TEST(CheckSortedBlockInputStream, CheckEqualBlock)
@ -171,11 +212,19 @@ TEST(CheckSortedBlockInputStream, CheckEqualBlock)
blocks.push_back(getEqualValuesBlockWithSize(key_columns, 10)); blocks.push_back(getEqualValuesBlockWithSize(key_columns, 10));
blocks.push_back(getEqualValuesBlockWithSize(key_columns, 1)); blocks.push_back(getEqualValuesBlockWithSize(key_columns, 1));
BlockInputStreamPtr stream = std::make_shared<BlocksListBlockInputStream>(std::move(blocks)); Pipe pipe(std::make_shared<BlocksListSource>(std::move(blocks)));
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<CheckSortedTransform>(header, sort_description);
});
CheckSortedBlockInputStream sorted(stream, sort_description); QueryPipeline pipeline;
pipeline.init(std::move(pipe));
EXPECT_NO_THROW(sorted.read()); PullingPipelineExecutor executor(pipeline);
EXPECT_NO_THROW(sorted.read());
EXPECT_NO_THROW(sorted.read()); Chunk chunk;
EXPECT_NO_THROW(executor.pull(chunk));
EXPECT_NO_THROW(executor.pull(chunk));
EXPECT_NO_THROW(executor.pull(chunk));
} }

View File

@ -19,7 +19,6 @@ SRCS(
BlockIO.cpp BlockIO.cpp
BlockStreamProfileInfo.cpp BlockStreamProfileInfo.cpp
CheckConstraintsBlockOutputStream.cpp CheckConstraintsBlockOutputStream.cpp
CheckSortedBlockInputStream.cpp
ColumnGathererStream.cpp ColumnGathererStream.cpp
ConvertingBlockInputStream.cpp ConvertingBlockInputStream.cpp
CountingBlockOutputStream.cpp CountingBlockOutputStream.cpp
@ -30,9 +29,7 @@ SRCS(
ITTLAlgorithm.cpp ITTLAlgorithm.cpp
InputStreamFromASTInsertQuery.cpp InputStreamFromASTInsertQuery.cpp
InternalTextLogsRowOutputStream.cpp InternalTextLogsRowOutputStream.cpp
LimitBlockInputStream.cpp
MaterializingBlockInputStream.cpp MaterializingBlockInputStream.cpp
MergingSortedBlockInputStream.cpp
MongoDBBlockInputStream.cpp MongoDBBlockInputStream.cpp
NativeBlockInputStream.cpp NativeBlockInputStream.cpp
NativeBlockOutputStream.cpp NativeBlockOutputStream.cpp

View File

@ -9,11 +9,10 @@
#include <Interpreters/join_common.h> #include <Interpreters/join_common.h>
#include <DataStreams/materializeBlock.h> #include <DataStreams/materializeBlock.h>
#include <DataStreams/TemporaryFileStream.h> #include <DataStreams/TemporaryFileStream.h>
#include <Processors/Sources/SourceFromInputStream.h> #include <Processors/Sources/BlocksListSource.h>
#include <Processors/QueryPipeline.h> #include <Processors/QueryPipeline.h>
#include <Processors/Transforms/MergeSortingTransform.h> #include <Processors/Transforms/MergeSortingTransform.h>
#include <Processors/Executors/PipelineExecutingBlockInputStream.h> #include <Processors/Executors/PipelineExecutingBlockInputStream.h>
#include <DataStreams/BlocksListBlockInputStream.h>
namespace DB namespace DB
@ -518,8 +517,7 @@ void MergeJoin::mergeInMemoryRightBlocks()
if (right_blocks.empty()) if (right_blocks.empty())
return; return;
auto stream = std::make_shared<BlocksListBlockInputStream>(std::move(right_blocks.blocks)); Pipe source(std::make_shared<BlocksListSource>(std::move(right_blocks.blocks)));
Pipe source(std::make_shared<SourceFromInputStream>(std::move(stream)));
right_blocks.clear(); right_blocks.clear();
QueryPipeline pipeline; QueryPipeline pipeline;

View File

@ -17,7 +17,7 @@
#include <Processors/QueryPlan/FilterStep.h> #include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h> #include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Processors/Executors/PipelineExecutingBlockInputStream.h> #include <Processors/Executors/PipelineExecutingBlockInputStream.h>
#include <DataStreams/CheckSortedBlockInputStream.h> #include <Processors/Transforms/CheckSortedTransform.h>
#include <Parsers/ASTIdentifier.h> #include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTFunction.h> #include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h> #include <Parsers/ASTLiteral.h>
@ -901,12 +901,18 @@ BlockInputStreamPtr MutationsInterpreter::execute()
select_interpreter->buildQueryPlan(plan); select_interpreter->buildQueryPlan(plan);
auto pipeline = addStreamsForLaterStages(stages, plan); auto pipeline = addStreamsForLaterStages(stages, plan);
BlockInputStreamPtr result_stream = std::make_shared<PipelineExecutingBlockInputStream>(std::move(*pipeline));
/// Sometimes we update just part of columns (for example UPDATE mutation) /// Sometimes we update just part of columns (for example UPDATE mutation)
/// in this case we don't read sorting key, so just we don't check anything. /// in this case we don't read sorting key, so just we don't check anything.
if (auto sort_desc = getStorageSortDescriptionIfPossible(result_stream->getHeader())) if (auto sort_desc = getStorageSortDescriptionIfPossible(pipeline->getHeader()))
result_stream = std::make_shared<CheckSortedBlockInputStream>(result_stream, *sort_desc); {
pipeline->addSimpleTransform([&](const Block & header)
{
return std::make_shared<CheckSortedTransform>(header, *sort_desc);
});
}
BlockInputStreamPtr result_stream = std::make_shared<PipelineExecutingBlockInputStream>(std::move(*pipeline));
if (!updated_header) if (!updated_header)
updated_header = std::make_unique<Block>(result_stream->getHeader()); updated_header = std::make_unique<Block>(result_stream->getHeader());

View File

@ -1,7 +1,9 @@
#include <Core/SortCursor.h> #include <Core/SortCursor.h>
#include <Interpreters/SortedBlocksWriter.h> #include <Interpreters/SortedBlocksWriter.h>
#include <DataStreams/MergingSortedBlockInputStream.h> #include <Processors/QueryPipeline.h>
#include <DataStreams/OneBlockInputStream.h> #include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/Merges/MergingSortedTransform.h>
#include <DataStreams/TemporaryFileStream.h> #include <DataStreams/TemporaryFileStream.h>
#include <DataStreams/materializeBlock.h> #include <DataStreams/materializeBlock.h>
#include <Disks/IVolume.h> #include <Disks/IVolume.h>
@ -18,32 +20,33 @@ namespace ErrorCodes
namespace namespace
{ {
std::unique_ptr<TemporaryFile> flushToFile(const String & tmp_path, const Block & header, IBlockInputStream & stream, const String & codec) std::unique_ptr<TemporaryFile> flushToFile(const String & tmp_path, const Block & header, QueryPipeline pipeline, const String & codec)
{ {
auto tmp_file = createTemporaryFile(tmp_path); auto tmp_file = createTemporaryFile(tmp_path);
std::atomic<bool> is_cancelled{false}; TemporaryFileStream::write(tmp_file->path(), header, std::move(pipeline), codec);
TemporaryFileStream::write(tmp_file->path(), header, stream, &is_cancelled, codec);
if (is_cancelled)
throw Exception("Cannot flush MergeJoin data on disk. No space at " + tmp_path, ErrorCodes::NOT_ENOUGH_SPACE);
return tmp_file; return tmp_file;
} }
SortedBlocksWriter::SortedFiles flushToManyFiles(const String & tmp_path, const Block & header, IBlockInputStream & stream, SortedBlocksWriter::SortedFiles flushToManyFiles(const String & tmp_path, const Block & header, QueryPipeline pipeline,
const String & codec, std::function<void(const Block &)> callback = [](const Block &){}) const String & codec, std::function<void(const Block &)> callback = [](const Block &){})
{ {
std::vector<std::unique_ptr<TemporaryFile>> files; std::vector<std::unique_ptr<TemporaryFile>> files;
PullingPipelineExecutor executor(pipeline);
while (Block block = stream.read()) Block block;
while (executor.pull(block))
{ {
if (!block.rows()) if (!block.rows())
continue; continue;
callback(block); callback(block);
OneBlockInputStream block_stream(block); QueryPipeline one_block_pipeline;
auto tmp_file = flushToFile(tmp_path, header, block_stream, codec); Chunk chunk(block.getColumns(), block.rows());
one_block_pipeline.init(Pipe(std::make_shared<SourceFromSingleChunk>(block.cloneEmpty(), std::move(chunk))));
auto tmp_file = flushToFile(tmp_path, header, std::move(one_block_pipeline), codec);
files.emplace_back(std::move(tmp_file)); files.emplace_back(std::move(tmp_file));
} }
@ -119,23 +122,30 @@ SortedBlocksWriter::TmpFilePtr SortedBlocksWriter::flush(const BlocksList & bloc
{ {
const std::string path = getPath(); const std::string path = getPath();
if (blocks.empty()) Pipes pipes;
pipes.reserve(blocks.size());
for (const auto & block : blocks)
if (auto num_rows = block.rows())
pipes.emplace_back(std::make_shared<SourceFromSingleChunk>(block.cloneEmpty(), Chunk(block.getColumns(), num_rows)));
if (pipes.empty())
return {}; return {};
if (blocks.size() == 1) QueryPipeline pipeline;
pipeline.init(Pipe::unitePipes(std::move(pipes)));
if (pipeline.getNumStreams() > 1)
{ {
OneBlockInputStream sorted_input(blocks.front()); auto transform = std::make_shared<MergingSortedTransform>(
return flushToFile(path, sample_block, sorted_input, codec); pipeline.getHeader(),
pipeline.getNumStreams(),
sort_description,
rows_in_block);
pipeline.addTransform(std::move(transform));
} }
BlockInputStreams inputs; return flushToFile(path, sample_block, std::move(pipeline), codec);
inputs.reserve(blocks.size());
for (const auto & block : blocks)
if (block.rows())
inputs.push_back(std::make_shared<OneBlockInputStream>(block));
MergingSortedBlockInputStream sorted_input(inputs, sort_description, rows_in_block);
return flushToFile(path, sample_block, sorted_input, codec);
} }
SortedBlocksWriter::PremergedFiles SortedBlocksWriter::premerge() SortedBlocksWriter::PremergedFiles SortedBlocksWriter::premerge()
@ -158,8 +168,8 @@ SortedBlocksWriter::PremergedFiles SortedBlocksWriter::premerge()
if (!blocks.empty()) if (!blocks.empty())
files.emplace_back(flush(blocks)); files.emplace_back(flush(blocks));
BlockInputStreams inputs; Pipes pipes;
inputs.reserve(num_files_for_merge); pipes.reserve(num_files_for_merge);
/// Merge by parts to save memory. It's possible to exchange disk I/O and memory by num_files_for_merge. /// Merge by parts to save memory. It's possible to exchange disk I/O and memory by num_files_for_merge.
{ {
@ -170,13 +180,26 @@ SortedBlocksWriter::PremergedFiles SortedBlocksWriter::premerge()
{ {
for (const auto & file : files) for (const auto & file : files)
{ {
inputs.emplace_back(streamFromFile(file)); pipes.emplace_back(streamFromFile(file));
if (inputs.size() == num_files_for_merge || &file == &files.back()) if (pipes.size() == num_files_for_merge || &file == &files.back())
{ {
MergingSortedBlockInputStream sorted_input(inputs, sort_description, rows_in_block); QueryPipeline pipeline;
new_files.emplace_back(flushToFile(getPath(), sample_block, sorted_input, codec)); pipeline.init(Pipe::unitePipes(std::move(pipes)));
inputs.clear(); pipes = Pipes();
if (pipeline.getNumStreams() > 1)
{
auto transform = std::make_shared<MergingSortedTransform>(
pipeline.getHeader(),
pipeline.getNumStreams(),
sort_description,
rows_in_block);
pipeline.addTransform(std::move(transform));
}
new_files.emplace_back(flushToFile(getPath(), sample_block, std::move(pipeline), codec));
} }
} }
@ -185,22 +208,35 @@ SortedBlocksWriter::PremergedFiles SortedBlocksWriter::premerge()
} }
for (const auto & file : files) for (const auto & file : files)
inputs.emplace_back(streamFromFile(file)); pipes.emplace_back(streamFromFile(file));
} }
return PremergedFiles{std::move(files), std::move(inputs)}; return PremergedFiles{std::move(files), Pipe::unitePipes(std::move(pipes))};
} }
SortedBlocksWriter::SortedFiles SortedBlocksWriter::finishMerge(std::function<void(const Block &)> callback) SortedBlocksWriter::SortedFiles SortedBlocksWriter::finishMerge(std::function<void(const Block &)> callback)
{ {
PremergedFiles files = premerge(); PremergedFiles files = premerge();
MergingSortedBlockInputStream sorted_input(files.streams, sort_description, rows_in_block); QueryPipeline pipeline;
return flushToManyFiles(getPath(), sample_block, sorted_input, codec, callback); pipeline.init(std::move(files.pipe));
if (pipeline.getNumStreams() > 1)
{
auto transform = std::make_shared<MergingSortedTransform>(
pipeline.getHeader(),
pipeline.getNumStreams(),
sort_description,
rows_in_block);
pipeline.addTransform(std::move(transform));
}
return flushToManyFiles(getPath(), sample_block, std::move(pipeline), codec, callback);
} }
BlockInputStreamPtr SortedBlocksWriter::streamFromFile(const TmpFilePtr & file) const Pipe SortedBlocksWriter::streamFromFile(const TmpFilePtr & file) const
{ {
return std::make_shared<TemporaryFileLazyInputStream>(file->path(), materializeBlock(sample_block)); return Pipe(std::make_shared<TemporaryFileLazySource>(file->path(), materializeBlock(sample_block)));
} }
String SortedBlocksWriter::getPath() const String SortedBlocksWriter::getPath() const
@ -250,18 +286,35 @@ Block SortedBlocksBuffer::mergeBlocks(Blocks && blocks) const
size_t num_rows = 0; size_t num_rows = 0;
{ /// Merge sort blocks { /// Merge sort blocks
BlockInputStreams inputs; Pipes pipes;
inputs.reserve(blocks.size()); pipes.reserve(blocks.size());
for (auto & block : blocks) for (auto & block : blocks)
{ {
num_rows += block.rows(); num_rows += block.rows();
inputs.emplace_back(std::make_shared<OneBlockInputStream>(block)); Chunk chunk(block.getColumns(), block.rows());
pipes.emplace_back(std::make_shared<SourceFromSingleChunk>(block.cloneEmpty(), std::move(chunk)));
} }
Blocks tmp_blocks; Blocks tmp_blocks;
MergingSortedBlockInputStream stream(inputs, sort_description, num_rows);
while (const auto & block = stream.read()) QueryPipeline pipeline;
pipeline.init(Pipe::unitePipes(std::move(pipes)));
if (pipeline.getNumStreams() > 1)
{
auto transform = std::make_shared<MergingSortedTransform>(
pipeline.getHeader(),
pipeline.getNumStreams(),
sort_description,
num_rows);
pipeline.addTransform(std::move(transform));
}
PullingPipelineExecutor executor(pipeline);
Block block;
while (executor.pull(block))
tmp_blocks.emplace_back(block); tmp_blocks.emplace_back(block);
blocks.swap(tmp_blocks); blocks.swap(tmp_blocks);

View File

@ -6,6 +6,7 @@
#include <Common/filesystemHelpers.h> #include <Common/filesystemHelpers.h>
#include <Core/Block.h> #include <Core/Block.h>
#include <Core/SortDescription.h> #include <Core/SortDescription.h>
#include <Processors/Pipe.h>
#include <DataStreams/SizeLimits.h> #include <DataStreams/SizeLimits.h>
#include <DataStreams/IBlockStream_fwd.h> #include <DataStreams/IBlockStream_fwd.h>
@ -17,6 +18,8 @@ class TableJoin;
class MergeJoinCursor; class MergeJoinCursor;
struct MergeJoinEqualRange; struct MergeJoinEqualRange;
class Pipe;
class IVolume; class IVolume;
using VolumePtr = std::shared_ptr<IVolume>; using VolumePtr = std::shared_ptr<IVolume>;
@ -56,7 +59,7 @@ struct SortedBlocksWriter
struct PremergedFiles struct PremergedFiles
{ {
SortedFiles files; SortedFiles files;
BlockInputStreams streams; Pipe pipe;
}; };
static constexpr const size_t num_streams = 2; static constexpr const size_t num_streams = 2;
@ -94,7 +97,7 @@ struct SortedBlocksWriter
} }
String getPath() const; String getPath() const;
BlockInputStreamPtr streamFromFile(const TmpFilePtr & file) const; Pipe streamFromFile(const TmpFilePtr & file) const;
void insert(Block && block); void insert(Block && block);
TmpFilePtr flush(const BlocksList & blocks) const; TmpFilePtr flush(const BlocksList & blocks) const;

View File

@ -0,0 +1,47 @@
#pragma once
#include <Processors/Sources/SourceWithProgress.h>
namespace DB
{
/** A stream of blocks from which you can read the next block from an explicitly provided list.
* Also see OneBlockInputStream.
*/
class BlocksListSource : public SourceWithProgress
{
public:
/// Acquires the ownership of the block list.
explicit BlocksListSource(BlocksList && list_)
: SourceWithProgress(list_.empty() ? Block() : list_.front().cloneEmpty())
, list(std::move(list_)), it(list.begin()), end(list.end()) {}
/// Uses a list of blocks lying somewhere else.
BlocksListSource(BlocksList::iterator & begin_, BlocksList::iterator & end_)
: SourceWithProgress(begin_ == end_ ? Block() : begin_->cloneEmpty())
, it(begin_), end(end_) {}
String getName() const override { return "BlocksListSource"; }
protected:
Chunk generate() override
{
if (it == end)
return {};
Block res = *it;
++it;
size_t num_rows = res.rows();
return Chunk(res.getColumns(), num_rows);
}
private:
BlocksList list;
BlocksList::iterator it;
const BlocksList::iterator end;
};
}

View File

@ -1,4 +1,4 @@
#include <DataStreams/CheckSortedBlockInputStream.h> #include <Processors/Transforms/CheckSortedTransform.h>
#include <Common/FieldVisitorDump.h> #include <Common/FieldVisitorDump.h>
#include <Common/quoteString.h> #include <Common/quoteString.h>
#include <Core/SortDescription.h> #include <Core/SortDescription.h>
@ -12,20 +12,20 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
} }
CheckSortedBlockInputStream::CheckSortedBlockInputStream( CheckSortedTransform::CheckSortedTransform(
const BlockInputStreamPtr & input_, const Block & header_,
const SortDescription & sort_description_) const SortDescription & sort_description_)
: header(input_->getHeader()) : ISimpleTransform(header_, header_, false)
, sort_description_map(addPositionsToSortDescriptions(sort_description_)) , sort_description_map(addPositionsToSortDescriptions(sort_description_))
{ {
children.push_back(input_);
} }
SortDescriptionsWithPositions SortDescriptionsWithPositions
CheckSortedBlockInputStream::addPositionsToSortDescriptions(const SortDescription & sort_description) CheckSortedTransform::addPositionsToSortDescriptions(const SortDescription & sort_description)
{ {
SortDescriptionsWithPositions result; SortDescriptionsWithPositions result;
result.reserve(sort_description.size()); result.reserve(sort_description.size());
const auto & header = getInputPort().getHeader();
for (SortColumnDescription description_copy : sort_description) for (SortColumnDescription description_copy : sort_description)
{ {
@ -39,11 +39,11 @@ CheckSortedBlockInputStream::addPositionsToSortDescriptions(const SortDescriptio
} }
Block CheckSortedBlockInputStream::readImpl() void CheckSortedTransform::transform(Chunk & chunk)
{ {
Block block = children.back()->read(); size_t num_rows = chunk.getNumRows();
if (!block || block.rows() == 0) if (num_rows == 0)
return block; return;
auto check = [this](const Columns & left, size_t left_index, const Columns & right, size_t right_index) auto check = [this](const Columns & left, size_t left_index, const Columns & right, size_t right_index)
{ {
@ -70,23 +70,20 @@ Block CheckSortedBlockInputStream::readImpl()
} }
}; };
auto block_columns = block.getColumns(); const auto & chunk_columns = chunk.getColumns();
if (!last_row.empty()) if (!last_row.empty())
check(last_row, 0, block_columns, 0); check(last_row, 0, chunk_columns, 0);
size_t rows = block.rows(); for (size_t i = 1; i < num_rows; ++i)
for (size_t i = 1; i < rows; ++i) check(chunk_columns, i - 1, chunk_columns, i);
check(block_columns, i - 1, block_columns, i);
last_row.clear(); last_row.clear();
for (size_t i = 0; i < block.columns(); ++i) for (const auto & chunk_column : chunk_columns)
{ {
auto column = block_columns[i]->cloneEmpty(); auto column = chunk_column->cloneEmpty();
column->insertFrom(*block_columns[i], rows - 1); column->insertFrom(*chunk_column, num_rows - 1);
last_row.emplace_back(std::move(column)); last_row.emplace_back(std::move(column));
} }
return block;
} }
} }

View File

@ -1,5 +1,5 @@
#pragma once #pragma once
#include <DataStreams/IBlockInputStream.h> #include <Processors/ISimpleTransform.h>
#include <Core/SortDescription.h> #include <Core/SortDescription.h>
#include <Columns/IColumn.h> #include <Columns/IColumn.h>
@ -9,26 +9,23 @@ using SortDescriptionsWithPositions = std::vector<SortColumnDescription>;
/// Streams checks that flow of blocks is sorted in the sort_description order /// Streams checks that flow of blocks is sorted in the sort_description order
/// Othrewise throws exception in readImpl function. /// Othrewise throws exception in readImpl function.
class CheckSortedBlockInputStream : public IBlockInputStream class CheckSortedTransform : public ISimpleTransform
{ {
public: public:
CheckSortedBlockInputStream( CheckSortedTransform(
const BlockInputStreamPtr & input_, const Block & header_,
const SortDescription & sort_description_); const SortDescription & sort_description_);
String getName() const override { return "CheckingSorted"; } String getName() const override { return "CheckSortedTransform"; }
Block getHeader() const override { return header; }
protected: protected:
Block readImpl() override; void transform(Chunk & chunk) override;
private: private:
Block header;
SortDescriptionsWithPositions sort_description_map; SortDescriptionsWithPositions sort_description_map;
Columns last_row; Columns last_row;
private:
/// Just checks, that all sort_descriptions has column_number /// Just checks, that all sort_descriptions has column_number
SortDescriptionsWithPositions addPositionsToSortDescriptions(const SortDescription & sort_description); SortDescriptionsWithPositions addPositionsToSortDescriptions(const SortDescription & sort_description);
}; };

View File

@ -143,6 +143,7 @@ SRCS(
Transforms/AggregatingInOrderTransform.cpp Transforms/AggregatingInOrderTransform.cpp
Transforms/AggregatingTransform.cpp Transforms/AggregatingTransform.cpp
Transforms/ArrayJoinTransform.cpp Transforms/ArrayJoinTransform.cpp
Transforms/CheckSortedTransform.cpp
Transforms/CopyTransform.cpp Transforms/CopyTransform.cpp
Transforms/CreatingSetsTransform.cpp Transforms/CreatingSetsTransform.cpp
Transforms/CubeTransform.cpp Transforms/CubeTransform.cpp

View File

@ -16,7 +16,7 @@ limitations under the License. */
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Interpreters/InterpreterSelectQuery.h> #include <Interpreters/InterpreterSelectQuery.h>
#include <DataStreams/IBlockOutputStream.h> #include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/BlocksSource.h> #include <Processors/Sources/BlocksSource.h>
#include <DataStreams/MaterializingBlockInputStream.h> #include <DataStreams/MaterializingBlockInputStream.h>
#include <DataStreams/SquashingBlockInputStream.h> #include <DataStreams/SquashingBlockInputStream.h>
#include <DataStreams/copyData.h> #include <DataStreams/copyData.h>