get rid of SharedChunk

This commit is contained in:
Alexander Tokmakov 2020-03-02 15:56:47 +03:00
parent 286c65e89e
commit 053094be9c
7 changed files with 44 additions and 80 deletions

View File

@ -23,16 +23,17 @@ LimitTransform::LimitTransform(
}
}
SharedChunkPtr LimitTransform::makeChunkWithPreviousRow(const Chunk & chunk, size_t row) const
Chunk LimitTransform::makeChunkWithPreviousRow(const Chunk & chunk, size_t row) const
{
assert(row < chunk.getNumRows());
auto last_row_columns = chunk.cloneEmptyColumns();
const auto & current_columns = chunk.getColumns();
ColumnRawPtrs current_columns = extractSortColumns(chunk.getColumns());
MutableColumns last_row_sort_columns;
for (size_t i = 0; i < current_columns.size(); ++i)
last_row_columns[i]->insertFrom(*current_columns[i], row);
SharedChunkPtr shared_chunk = std::make_shared<detail::SharedChunk>(Chunk(std::move(last_row_columns), 1));
shared_chunk->sort_columns = extractSortColumns(shared_chunk->getColumns());
return shared_chunk;
{
last_row_sort_columns.emplace_back(current_columns[i]->cloneEmpty());
last_row_sort_columns[i]->insertFrom(*current_columns[i], row);
}
return Chunk(std::move(last_row_sort_columns), 1);
}
@ -158,11 +159,9 @@ LimitTransform::Status LimitTransform::prepare()
void LimitTransform::work()
{
SharedChunkPtr shared_chunk = std::make_shared<detail::SharedChunk>(std::move(current_chunk));
shared_chunk->sort_columns = extractSortColumns(shared_chunk->getColumns());
size_t num_rows = shared_chunk->getNumRows();
size_t num_columns = shared_chunk->getNumColumns();
auto current_chunk_sort_columns = extractSortColumns(current_chunk.getColumns());
size_t num_rows = current_chunk.getNumRows();
size_t num_columns = current_chunk.getNumColumns();
if (previous_row_chunk && rows_read >= offset + limit)
{
@ -170,11 +169,11 @@ void LimitTransform::work()
size_t current_row_num = 0;
for (; current_row_num < num_rows; ++current_row_num)
{
if (!shared_chunk->sortColumnsEqualAt(current_row_num, 0, *previous_row_chunk))
if (!sortColumnsEqualAt(current_chunk_sort_columns, current_row_num))
break;
}
auto columns = shared_chunk->detachColumns();
auto columns = current_chunk.detachColumns();
if (current_row_num < num_rows)
{
@ -202,11 +201,11 @@ void LimitTransform::work()
if (with_ties && length)
{
size_t current_row_num = start + length;
previous_row_chunk = makeChunkWithPreviousRow(*shared_chunk, current_row_num - 1);
previous_row_chunk = makeChunkWithPreviousRow(current_chunk, current_row_num - 1);
for (; current_row_num < num_rows; ++current_row_num)
{
if (!shared_chunk->sortColumnsEqualAt(current_row_num, 0, *previous_row_chunk))
if (!sortColumnsEqualAt(current_chunk_sort_columns, current_row_num))
{
previous_row_chunk = {};
break;
@ -218,12 +217,11 @@ void LimitTransform::work()
if (length == num_rows)
{
current_chunk = std::move(*shared_chunk);
block_processed = true;
return;
}
auto columns = shared_chunk->detachColumns();
auto columns = current_chunk.detachColumns();
for (size_t i = 0; i < num_columns; ++i)
columns[i] = columns[i]->cut(start, length);
@ -243,5 +241,16 @@ ColumnRawPtrs LimitTransform::extractSortColumns(const Columns & columns) const
return res;
}
/// Checks whether row `current_chunk_row_num` of the given sort columns matches row 0 of
/// `previous_row_chunk` in every column.
/// `current_chunk_sort_columns` must hold as many columns as `previous_row_chunk` (asserted).
/// Returns false at the first column whose compareAt() result is non-zero, true otherwise.
bool LimitTransform::sortColumnsEqualAt(const ColumnRawPtrs & current_chunk_sort_columns, size_t current_chunk_row_num) const
{
    assert(current_chunk_sort_columns.size() == previous_row_chunk.getNumColumns());

    const size_t num_sort_columns = current_chunk_sort_columns.size();
    const auto & saved_row_columns = previous_row_chunk.getColumns();

    for (size_t pos = 0; pos != num_sort_columns; ++pos)
    {
        const auto & saved_column = *saved_row_columns[pos];

        /// The trailing 1 is forwarded to compareAt() unchanged
        /// (presumably the NaN direction hint - confirm against IColumn).
        const auto order = current_chunk_sort_columns[pos]->compareAt(current_chunk_row_num, 0, saved_column, 1);
        if (order != 0)
            return false;
    }

    return true;
}
}

View File

@ -1,7 +1,6 @@
#pragma once
#include <Processors/IProcessor.h>
#include <Processors/SharedChunk.h>
#include <Core/SortDescription.h>
namespace DB
@ -28,11 +27,12 @@ private:
bool with_ties;
const SortDescription description;
SharedChunkPtr previous_row_chunk;
Chunk previous_row_chunk; /// for WITH TIES, contains only sort columns
std::vector<size_t> sort_column_positions;
SharedChunkPtr makeChunkWithPreviousRow(const Chunk & current_chunk, size_t row_num) const;
Chunk makeChunkWithPreviousRow(const Chunk & current_chunk, size_t row_num) const;
ColumnRawPtrs extractSortColumns(const Columns & columns) const;
bool sortColumnsEqualAt(const ColumnRawPtrs & current_chunk_sort_columns, size_t current_chunk_row_num) const;
public:
LimitTransform(

View File

@ -1,45 +0,0 @@
#pragma once
#include <algorithm>
#include <Processors/Chunk.h>
#include <Columns/IColumn.h>
#include <boost/smart_ptr/intrusive_ptr.hpp>
namespace DB
{
/// Allows you to refer to a row in the block while holding the block ownership,
/// and thus avoids creating a temporary row object.
/// Do not use std::shared_ptr, since there is no need for a place for `weak_count` and `deleter`;
/// do not use Poco::SharedPtr, since you need to allocate a block and `refcount` in one piece;
/// do not use Poco::AutoPtr, since it does not have a `move` constructor and there are extra checks for nullptr;
/// The reference counter is not atomic, since it is used from one thread.
namespace detail
{
/// A Chunk extended with a reference counter field and two lists of raw column pointers.
/// The pointer lists are not filled in here; whoever creates the object assigns them.
struct SharedChunk : Chunk
{
    int refcount = 0;

    ColumnRawPtrs all_columns;
    ColumnRawPtrs sort_columns;

    /// Takes over the contents of `chunk`. Intentionally left implicit, as in the original.
    SharedChunk(Chunk && chunk)
        : Chunk(std::move(chunk))
    {
    }

    /// Compares row `row_num` of this object's `sort_columns` with row `other_row_num`
    /// of `other.sort_columns`, column by column.
    /// Returns true only if compareAt() yields 0 for every sort column.
    /// Assumes `other.sort_columns` has at least as many entries as `sort_columns` (not checked here).
    bool sortColumnsEqualAt(size_t row_num, size_t other_row_num, const detail::SharedChunk & other) const
    {
        const size_t num_sort_columns = sort_columns.size();

        for (size_t pos = 0; pos != num_sort_columns; ++pos)
        {
            const auto & other_column = *other.sort_columns[pos];

            /// The trailing 1 is forwarded to compareAt() unchanged.
            const auto order = sort_columns[pos]->compareAt(row_num, other_row_num, other_column, 1);
            if (order != 0)
                return false;
        }

        return true;
    }
};
}
using SharedChunkPtr = std::shared_ptr<detail::SharedChunk>;
}

View File

@ -308,21 +308,22 @@ void MergingSortedTransform::insertFromChunk(size_t source_num)
//std::cerr << "copied columns\n";
auto num_rows = source_chunks[source_num]->getNumRows();
auto num_rows = source_chunks[source_num].getNumRows();
UInt64 total_merged_rows_after_insertion = merged_data.mergedRows() + num_rows;
if (limit && total_merged_rows_after_insertion > limit)
{
num_rows = total_merged_rows_after_insertion - limit;
merged_data.insertFromChunk(std::move(*source_chunks[source_num]), num_rows);
merged_data.insertFromChunk(std::move(source_chunks[source_num]), num_rows);
is_finished = true;
}
else
{
merged_data.insertFromChunk(std::move(*source_chunks[source_num]), 0);
merged_data.insertFromChunk(std::move(source_chunks[source_num]), 0);
need_data = true;
next_input_to_read = source_num;
}
source_chunks[source_num] = Chunk();
if (out_row_sources_buf)
{

View File

@ -3,7 +3,6 @@
#include <Processors/IProcessor.h>
#include <Core/SortDescription.h>
#include <Core/SortCursor.h>
#include <Processors/SharedChunk.h>
namespace DB
@ -113,7 +112,7 @@ protected:
WriteBuffer * out_row_sources_buf = nullptr;
/// Chunks currently being merged.
std::vector<SharedChunkPtr> source_chunks;
std::vector<Chunk> source_chunks;
SortCursorImpls cursors;
@ -142,22 +141,19 @@ private:
chunk.setColumns(std::move(columns), num_rows);
auto & shared_chunk_ptr = source_chunks[source_num];
auto & source_chunk = source_chunks[source_num];
if (!shared_chunk_ptr)
if (source_chunk.empty())
{
shared_chunk_ptr = std::make_shared<detail::SharedChunk>(std::move(chunk));
cursors[source_num] = SortCursorImpl(shared_chunk_ptr->getColumns(), description, source_num);
source_chunk = std::move(chunk);
cursors[source_num] = SortCursorImpl(source_chunk.getColumns(), description, source_num);
has_collation |= cursors[source_num].has_collation;
}
else
{
*shared_chunk_ptr = std::move(chunk);
cursors[source_num].reset(shared_chunk_ptr->getColumns(), {});
source_chunk = std::move(chunk);
cursors[source_num].reset(source_chunk.getColumns(), {});
}
shared_chunk_ptr->all_columns = cursors[source_num].all_columns;
shared_chunk_ptr->sort_columns = cursors[source_num].sort_columns;
}
};

View File

@ -52,3 +52,4 @@
*
100
100
100

View File

@ -32,5 +32,7 @@ SELECT '*';
select count() from (select number > 100 from numbers(2000) order by number > 100 limit 1, 7 with ties); --TODO replace "number > 100" with "number > 100 as n"
select count() from (select number, number < 100 from numbers(2000) order by number < 100 desc limit 10 with ties);
SET max_block_size = 5;
select count() from (select number < 100, number from numbers(2000) order by number < 100 desc limit 10 with ties);
DROP TABLE ties;