This commit is contained in:
Nikita Taranov 2024-10-14 14:22:17 +01:00
parent db0f6fd07b
commit 5cf92fe964
2 changed files with 26 additions and 29 deletions

View File

@ -164,8 +164,19 @@ ScatteredBlock HashJoinMethods<KIND, STRICTNESS, MapsTemplate>::joinBlockImpl(
if constexpr (join_features.need_replication)
{
std::unique_ptr<IColumn::Offsets> & offsets_to_replicate = added_columns.offsets_to_replicate;
block.replicate(*offsets_to_replicate, existing_columns, right_keys_to_replicate);
IColumn::Offsets & offsets = *added_columns.offsets_to_replicate;
chassert(block);
chassert(offsets.size() == block.rows());
auto && columns = block.getSourceBlock().getColumns();
for (size_t i = 0; i < existing_columns; ++i)
columns[i] = columns[i]->replicate(offsets);
for (size_t pos : right_keys_to_replicate)
columns[pos] = columns[pos]->replicate(offsets);
block.getSourceBlock().setColumns(columns);
block = ScatteredBlock(std::move(block).getSourceBlock());
}
return remaining_block;
}

View File

@ -1,18 +1,13 @@
#pragma once
#include <memory>
#include <Columns/ColumnsNumber.h>
#include <Columns/IColumn.h>
#include <Core/Block.h>
#include <base/defines.h>
#include <Common/PODArray.h>
#include <boost/math/distributions/fwd.hpp>
#include <boost/noncopyable.hpp>
#include <Poco/Logger.h>
#include <Common/logger_useful.h>
namespace DB
{
@ -24,6 +19,11 @@ extern const int LOGICAL_ERROR;
namespace detail
{
/// Previously ConcurrentHashJoin used IColumn::scatter method to split input blocks to sub-blocks by hash.
/// To avoid copying of columns, we introduce a new class ScatteredBlock that holds a block and a selector.
/// So now each threads get a copy of the source input block and a selector that tells which rows are meant for the given thread.
/// Selector can be seen as just a list of indexes or rows that belong to the given thread.
/// One optimization is to use a continuous range instead of explicit list of indexes when selector contains all indexes from [L, R).
class Selector
{
public:
@ -31,12 +31,11 @@ public:
using Indexes = ColumnUInt64;
using IndexesPtr = ColumnUInt64::MutablePtr;
Selector() : Selector(0, 0) { }
/// [begin, end)
Selector(size_t begin, size_t end) : data(Range{begin, end}) { }
Selector() : Selector(0, 0) { }
explicit Selector(size_t size) : Selector(0, size) { }
explicit Selector(IndexesPtr && selector_) : data(initializeFromSelector(std::move(selector_))) { }
class Iterator
@ -105,6 +104,7 @@ public:
}
}
/// First selector contains first `num_rows` rows, second selector contains the rest
std::pair<Selector, Selector> split(size_t num_rows)
{
chassert(num_rows <= size());
@ -179,6 +179,7 @@ private:
}
/// Source block + list of selected rows. See detail::Selector for more details.
struct ScatteredBlock : private boost::noncopyable
{
using Selector = detail::Selector;
@ -224,7 +225,7 @@ struct ScatteredBlock : private boost::noncopyable
/// Accounts only selected rows
size_t rows() const { return selector.size(); }
/// Whether block was scattered, i.e. has non-trivial selector
/// Whether `block` was scattered, i.e. `selector` != [0, block.rows())
bool wasScattered() const
{
chassert(block);
@ -248,7 +249,7 @@ struct ScatteredBlock : private boost::noncopyable
selector = Selector(std::move(new_selector));
}
/// Applies selector to block in place
/// Applies `selector` to the `block` in-place
void filterBySelector()
{
if (!block || !wasScattered())
@ -266,7 +267,7 @@ struct ScatteredBlock : private boost::noncopyable
return;
}
/// The general case when selector is non-trivial (likely the result of applying a filter)
/// The general case when `selector` is non-trivial (likely the result of applying a filter)
auto columns = block.getColumns();
for (auto & col : columns)
col = col->index(selector.getIndexes(), /*limit*/ 0);
@ -274,7 +275,7 @@ struct ScatteredBlock : private boost::noncopyable
selector = Selector(block.rows());
}
/// Cut first num_rows rows from block in place and returns block with remaining rows
/// Cut first `num_rows` rows from `block` in place and returns block with remaining rows
ScatteredBlock cut(size_t num_rows)
{
SCOPE_EXIT(filterBySelector());
@ -293,21 +294,6 @@ struct ScatteredBlock : private boost::noncopyable
return remaining;
}
void replicate(const IColumn::Offsets & offsets, size_t existing_columns, const std::vector<size_t> & right_keys_to_replicate)
{
chassert(block);
chassert(offsets.size() == rows());
auto && columns = block.getColumns();
for (size_t i = 0; i < existing_columns; ++i)
columns[i] = columns[i]->replicate(offsets);
for (size_t pos : right_keys_to_replicate)
columns[pos] = columns[pos]->replicate(offsets);
block.setColumns(columns);
selector = Selector(block.rows());
}
private:
Block block;
Selector selector;