get rid of cloning single-row-chunk for last row

Signed-off-by: Duc Canh Le <duccanh.le@ahrefs.com>
This commit is contained in:
Duc Canh Le 2023-10-26 10:39:21 +00:00
parent 16687632da
commit f7c0914c2b
5 changed files with 83 additions and 93 deletions

View File

@ -39,9 +39,8 @@ ReplacingSortedAlgorithm::ReplacingSortedAlgorithm(
version_column_number = header_.getPositionByName(version_column);
}
detail::SharedChunkPtr ReplacingSortedAlgorithm::insertRow()
void ReplacingSortedAlgorithm::insertRow()
{
detail::SharedChunkPtr res;
if (out_row_sources_buf)
{
/// true flag value means "skip row"
@ -56,47 +55,35 @@ detail::SharedChunkPtr ReplacingSortedAlgorithm::insertRow()
/// We just record the position to be selected in the chunk
if (!selected_row.owned_chunk->replace_final_selection)
selected_row.owned_chunk->replace_final_selection = ColumnUInt64::create();
// fmt::print(stderr, "Adding row {} for chunk {}\n", selected_row.row_num, static_cast<void *>(selected_row.owned_chunk.get()));
selected_row.owned_chunk->replace_final_selection->insert(selected_row.row_num);
if (selected_row.current_cursor == nullptr) /// This is the "lonely" chunk w/o cursor, we keep and then emit it later
res = std::move(selected_row.owned_chunk);
/// This is the last row we can select from `selected_row.owned_chunk`, keep it to emit later
if (selected_row.current_cursor == nullptr)
to_be_emitted.push(std::move(selected_row.owned_chunk));
}
else
merged_data.insertRow(*selected_row.all_columns, selected_row.row_num, selected_row.owned_chunk->getNumRows());
selected_row.clear();
return res;
}
IMergingAlgorithm::Status ReplacingSortedAlgorithm::merge()
{
/// Skipping final: we've done processing some chunk and can emit them
if (!to_be_emitted.empty())
{
auto chunk = std::move(to_be_emitted.front());
to_be_emitted.pop();
return emitChunk(chunk);
}
/// Take the rows in needed order and put them into `merged_columns` until rows no more than `max_block_size`
while (queue.isValid())
{
SortCursor current = queue.current();
if (sources[current.impl->order].chunk->empty() || (current->isLast() && skipLastRowFor(current->order)))
if (current->isLast() && skipLastRowFor(current->order))
{
auto & chunk = sources[current.impl->order].chunk;
if (!chunk->empty() && use_skipping_final)
{
if (selected_row.owned_chunk.get() == chunk.get())
{
/// selected_row points to current source chunk but the chunk will be destroy soon, either in emitChunk() or queue.removeTop()
/// In the first case, we create a cloned chunk with only one row from `selected_row.row_num` and let selected_row point to it
/// In the second case, we mark selected_row.owned_chunk = nullptr
/// In either case, the chunk is "lonely" and if later selected_row is inserted to final result, the chunk will be emitted
/// immediately. This will create some blocks with only single row, but it's not a big problem.
if (chunk->replace_final_selection)
selected_row.set(selected_row.owned_chunk->cloneForSelectedRow(selected_row.row_num), 0);
else
selected_row.current_cursor = nullptr;
}
if (chunk->replace_final_selection)
return emitChunk(chunk);
}
saveChunkForSkippingFinalFromSource(current.impl->order);
/// Get the next block from the corresponding source, if there is one.
queue.removeTop();
return Status(current.impl->order);
@ -104,12 +91,11 @@ IMergingAlgorithm::Status ReplacingSortedAlgorithm::merge()
RowRef current_row;
setRowRef(current_row, current);
// fmt::print(stderr, "Current row owned chunk: {}, selected row owned chunk: {}\n", static_cast<void *>(current_row.owned_chunk.get()), static_cast<void *>(selected_row.owned_chunk.get()));
bool key_differs = selected_row.empty() || !current_row.hasEqualSortColumnsWith(selected_row);
if (key_differs)
{
/// if there are enough rows and the last one is calculated completely
/// If there are enough rows and the last one is calculated completely
if (merged_data.hasEnoughRows())
return Status(merged_data.pull());
@ -122,7 +108,7 @@ IMergingAlgorithm::Status ReplacingSortedAlgorithm::merge()
{
uint8_t value = assert_cast<const ColumnUInt8 &>(*(*selected_row.all_columns)[is_deleted_column_number]).getData()[selected_row.row_num];
if (!cleanup || !value)
chunk_to_emit = insertRow();
insertRow();
else if (cleanup && cleanedup_rows_count != nullptr)
{
*cleanedup_rows_count += current_row_sources.size();
@ -130,13 +116,14 @@ IMergingAlgorithm::Status ReplacingSortedAlgorithm::merge()
}
}
else
chunk_to_emit = insertRow();
insertRow();
/// It's possible that insertRow() wasn't called
saveChunkForSkippingFinalFromSelectedRow();
}
selected_row.clear();
if (chunk_to_emit)
return emitChunk(chunk_to_emit);
selected_row.clear();
}
/// Initially, skip all rows. Unskip last on insert.
@ -160,6 +147,7 @@ IMergingAlgorithm::Status ReplacingSortedAlgorithm::merge()
/* nan_direction_hint = */ 1) >= 0)
{
max_pos = current_pos;
saveChunkForSkippingFinalFromSelectedRow();
setRowRef(selected_row, current);
}
@ -169,26 +157,7 @@ IMergingAlgorithm::Status ReplacingSortedAlgorithm::merge()
}
else
{
auto & chunk = sources[current.impl->order].chunk;
if (use_skipping_final && !chunk->empty())
{
if (selected_row.owned_chunk.get() == chunk.get())
{
/// selected_row points to current source chunk but the chunk will be destroy soon, either in emitChunk() or queue.removeTop()
/// In the first case, we create a cloned chunk with only one row from `selected_row.row_num` and let selected_row point to it
/// In the second case, we mark selected_row.owned_chunk = nullptr
/// In either case, the chunk is "lonely" and if later selected_row is inserted to final result, the chunk will be emitted
/// immediately. This will create some blocks with only single row, but it's not a big problem.
if (chunk->replace_final_selection)
selected_row.set(selected_row.owned_chunk->cloneForSelectedRow(selected_row.row_num), 0);
else
selected_row.current_cursor = nullptr;
}
if (chunk->replace_final_selection)
return emitChunk(chunk);
}
saveChunkForSkippingFinalFromSource(current.impl->order);
/// We get the next block from the corresponding source, if there is one.
queue.removeTop();
return Status(current.impl->order);
@ -202,12 +171,11 @@ IMergingAlgorithm::Status ReplacingSortedAlgorithm::merge()
/// We will write the data for the last primary key.
if (!selected_row.empty())
{
detail::SharedChunkPtr chunk;
if (is_deleted_column_number != -1)
{
uint8_t value = assert_cast<const ColumnUInt8 &>(*(*selected_row.all_columns)[is_deleted_column_number]).getData()[selected_row.row_num];
if (!cleanup || !value)
chunk = insertRow();
insertRow();
else if (cleanup && cleanedup_rows_count != nullptr)
{
*cleanedup_rows_count += current_row_sources.size();
@ -215,15 +183,53 @@ IMergingAlgorithm::Status ReplacingSortedAlgorithm::merge()
}
}
else
chunk = insertRow();
insertRow();
if (chunk)
return emitChunk(chunk, true);
/// It's possible that insertRow() wasn't called
saveChunkForSkippingFinalFromSelectedRow();
}
/// Skipping final: emit the remaining chunks
if (!to_be_emitted.empty())
{
auto chunk = std::move(to_be_emitted.front());
to_be_emitted.pop();
return emitChunk(chunk, to_be_emitted.empty());
}
return Status(merged_data.pull(), true);
}
void ReplacingSortedAlgorithm::saveChunkForSkippingFinalFromSelectedRow()
{
if (selected_row.owned_chunk && selected_row.owned_chunk->replace_final_selection && selected_row.current_cursor == nullptr)
to_be_emitted.push(std::move(selected_row.owned_chunk));
}
void ReplacingSortedAlgorithm::saveChunkForSkippingFinalFromSource(size_t current_source_index)
{
if (use_skipping_final)
{
auto & chunk = sources[current_source_index].chunk;
if (selected_row.owned_chunk.get() == chunk.get())
{
/// selected_row is the last row (or the row before last row) of chunk, so we cannot emit the chunk now.
/// But after this function, queue.removeTop() will destroy the chunk's cursor, so we mark `selected_row.current_cursor` to `nullptr`
/// to indicate that `selected_row` is now the sole owner of the chunk
/// Later when we change the value of `selected_row`, if `selected_row` is the sole owner of its chunk and the chunk has selected rows,
/// we will emit it
selected_row.current_cursor = nullptr;
}
else
{
/// Otherwise, its safe to emit the chunk
if (chunk->replace_final_selection)
to_be_emitted.push(std::move(chunk));
}
}
}
IMergingAlgorithm::Status ReplacingSortedAlgorithm::emitChunk(detail::SharedChunkPtr & chunk, bool finished)
{
chunk->setChunkInfo(std::make_shared<ChunkSelectFinalIndices>(std::move(chunk->replace_final_selection)));

View File

@ -52,7 +52,9 @@ private:
ssize_t version_column_number = -1;
bool cleanup = false;
size_t * cleanedup_rows_count = nullptr;
bool use_skipping_final;
bool use_skipping_final; /// Either we use skipping final algorithm
std::queue<detail::SharedChunkPtr> to_be_emitted; /// To save chunks when using skipping final
using RowRef = detail::RowRefWithOwnedChunk;
static constexpr size_t max_row_refs = 2; /// last, current.
@ -62,7 +64,10 @@ private:
/// Sources of rows with the current primary key.
PODArray<RowSourcePart> current_row_sources;
detail::SharedChunkPtr insertRow();
void insertRow();
void saveChunkForSkippingFinalFromSource(size_t current_source_index);
void saveChunkForSkippingFinalFromSelectedRow();
static Status emitChunk(detail::SharedChunkPtr & chunk, bool finished = false);
};

View File

@ -35,9 +35,6 @@ struct SharedChunk : Chunk
using Chunk::Chunk;
using Chunk::operator=;
/// Create a new chunk from row `row_num`
boost::intrusive_ptr<detail::SharedChunk> cloneForSelectedRow(size_t row_num);
private:
int refcount = 0;
size_t position = 0;
@ -128,29 +125,6 @@ inline void intrusive_ptr_release(SharedChunk * ptr)
ptr->allocator->release(ptr);
}
inline boost::intrusive_ptr<detail::SharedChunk> SharedChunk::cloneForSelectedRow(size_t row_num)
{
auto columns = cloneEmptyColumns();
ColumnRawPtrs columns_raws;
ColumnRawPtrs sort_columns_raws;
sort_columns_raws.resize(sort_columns.size());
std::map<const IColumn *, size_t> all_previous_sorted_column;
for (size_t i = 0; i < sort_columns.size(); ++i)
all_previous_sorted_column[sort_columns[i]] = i;
for (size_t i = 0; i < all_columns.size(); ++i)
{
columns[i]->insertFrom(*all_columns[i], row_num);
columns_raws.push_back(columns[i].get());
if (auto it = all_previous_sorted_column.find(all_columns[i]); it != all_previous_sorted_column.end())
sort_columns_raws[it->second] = columns[i].get();
}
auto single_chunk = Chunk(std::move(columns), 1);
auto shared_single_chunk = allocator->alloc(single_chunk);
shared_single_chunk->all_columns = std::move(columns_raws);
shared_single_chunk->sort_columns = std::move(sort_columns_raws);
return shared_single_chunk;
}
/// This class represents a row in a chunk.
struct RowRef
{

View File

@ -211,8 +211,6 @@ public:
void applyFilters() override;
bool useSkippingFinal() const { return use_skipping_final; }
private:
static MergeTreeDataSelectAnalysisResultPtr selectRangesToReadImpl(
MergeTreeData::DataPartsVector parts,

View File

@ -3,6 +3,13 @@
#include <memory>
#include <Processors/ISimpleTransform.h>
#include <Processors/Merges/Algorithms/ReplacingSortedAlgorithm.h>
#include <Common/Exception.h>
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
namespace DB
{
@ -21,12 +28,12 @@ public:
size_t num_rows = chunk.getNumRows();
auto select_final_indices_info = std::dynamic_pointer_cast<const ChunkSelectFinalIndices>(chunk.getChunkInfo());
if (!select_final_indices_info)
return;
if (!select_final_indices_info || !select_final_indices_info->select_final_indices)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunk passed to SelectByIndicesTransform without indices column");
const auto & index_column = select_final_indices_info->select_final_indices;
if (index_column && index_column->size() != num_rows)
if (index_column->size() != num_rows)
{
auto columns = chunk.detachColumns();
for (auto & column : columns)