stash doesn't work

This commit is contained in:
Nikita Taranov 2024-08-09 22:07:29 +01:00
parent eb8af558f8
commit 662c67dcc0
6 changed files with 171 additions and 27 deletions

View File

@ -55,7 +55,7 @@ void updateStatistics(const auto & hash_joins, const DB::StatsCollectingParams &
DB::getHashTablesStatistics().update(sum_of_sizes, *median_size, params);
}
Block concatenateBlocks(const HashJoin::ScatteredBlocks & blocks)
Block concatenateBlocks(const ScatteredBlocks & blocks)
{
Blocks inner_blocks;
for (const auto & block : blocks)
@ -332,7 +332,7 @@ IColumn::Selector selectDispatchBlock(size_t num_shards, const Strings & key_col
return hashToSelector(hash, num_shards);
}
HashJoin::ScatteredBlocks ConcurrentHashJoin::dispatchBlock(const Strings & key_columns_names, const Block & from_block)
ScatteredBlocks ConcurrentHashJoin::dispatchBlock(const Strings & key_columns_names, const Block & from_block)
{
size_t num_shards = hash_joins.size();
IColumn::Selector selector = selectDispatchBlock(num_shards, key_columns_names, from_block);
@ -344,7 +344,7 @@ HashJoin::ScatteredBlocks ConcurrentHashJoin::dispatchBlock(const Strings & key_
const size_t shard = selector[i];
selectors[shard].push_back(i);
}
HashJoin::ScatteredBlocks result;
ScatteredBlocks result;
result.reserve(num_shards);
for (size_t i = 0; i < num_shards; ++i)
result.emplace_back(from_block, std::move(selectors[i]));

View File

@ -78,7 +78,7 @@ private:
std::mutex totals_mutex;
Block totals;
HashJoin::ScatteredBlocks dispatchBlock(const Strings & key_columns_names, const Block & from_block);
ScatteredBlocks dispatchBlock(const Strings & key_columns_names, const Block & from_block);
};
UInt64 calculateCacheKey(std::shared_ptr<TableJoin> & table_join, const QueryTreeNodePtr & right_table_expression);

View File

@ -4,7 +4,7 @@
namespace DB
{
JoinOnKeyColumns::JoinOnKeyColumns(
const HashJoin::ScatteredBlock & block_, const Names & key_names_, const String & cond_column_name, const Sizes & key_sizes_)
const ScatteredBlock & block_, const Names & key_names_, const String & cond_column_name, const Sizes & key_sizes_)
: block(block_)
, key_names(key_names_)
/// Rare case, when keys are constant or low cardinality. To avoid code bloat, simply materialize them.

View File

@ -14,7 +14,7 @@ using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
struct JoinOnKeyColumns
{
const HashJoin::ScatteredBlock & block;
const ScatteredBlock & block;
Names key_names;
@ -30,7 +30,7 @@ struct JoinOnKeyColumns
Sizes key_sizes;
JoinOnKeyColumns(
const HashJoin::ScatteredBlock & block, const Names & key_names_, const String & cond_column_name, const Sizes & key_sizes_);
const ScatteredBlock & block, const Names & key_names_, const String & cond_column_name, const Sizes & key_sizes_);
bool isRowFiltered(size_t i) const
{
@ -62,7 +62,7 @@ public:
};
AddedColumns(
const HashJoin::ScatteredBlock & left_block_,
const ScatteredBlock & left_block_,
const Block & block_with_columns_to_add,
const Block & saved_block_sample,
const HashJoin & join,
@ -142,7 +142,7 @@ public:
const IColumn & leftAsofKey() const { return *left_asof_key; }
const HashJoin::ScatteredBlock & src_block;
const ScatteredBlock & src_block;
Block left_block;
std::vector<JoinOnKeyColumns> join_on_keys;
ExpressionActionsPtr additional_filter_expression;

View File

@ -11,6 +11,7 @@
#include <Poco/Logger.h>
#include <Common/logger_useful.h>
#include "Interpreters/HashJoin/ScatteredBlock.h"
namespace DB
@ -77,7 +78,7 @@ public:
const ColumnRawPtrs & key_columns,
const Sizes & key_sizes,
Block * stored_block,
const IColumn::Selector & selector,
const ScatteredBlock::Selector & selector,
ConstNullMapPtr null_map,
UInt8ColumnDataPtr join_mask,
Arena & pool,
@ -113,7 +114,7 @@ public:
const MapsTemplateVector & maps_,
bool is_join_get = false)
{
HashJoin::ScatteredBlock scattered_block{block};
ScatteredBlock scattered_block{block};
auto ret = joinBlockImpl(join, scattered_block, block_with_columns_to_add, maps_, is_join_get);
ret.filterBySelector();
scattered_block.filterBySelector();
@ -121,9 +122,9 @@ public:
return ret.getSourceBlock();
}
static HashJoin::ScatteredBlock joinBlockImpl(
static ScatteredBlock joinBlockImpl(
const HashJoin & join,
HashJoin::ScatteredBlock & block,
ScatteredBlock & block,
const Block & block_with_columns_to_add,
const MapsTemplateVector & maps_,
bool is_join_get = false)
@ -247,7 +248,7 @@ private:
const ColumnRawPtrs & key_columns,
const Sizes & key_sizes,
Block * stored_block,
const IColumn::Selector & selector,
const ScatteredBlock::Selector & selector,
ConstNullMapPtr null_map,
UInt8ColumnDataPtr join_mask,
Arena & pool,

View File

@ -3,24 +3,161 @@
#include <Columns/IColumn.h>
#include <Core/Block.h>
#include <Common/PODArray.h>
#include "base/defines.h"
#include <boost/math/distributions/fwd.hpp>
#include <boost/noncopyable.hpp>
#include <Poco/Logger.h>
#include <Common/logger_useful.h>
namespace DB
{
namespace detail
{
/// Describes which rows of a block are selected.
///
/// The selection is stored either as a half-open range [begin, end) of consecutive row
/// indexes, or as an explicit IColumn::Selector listing the indexes one by one.
/// The range form keeps only two numbers instead of one entry per selected row.
class Selector
{
public:
    using Range = std::pair<size_t, size_t>;

    /// [begin, end)
    Selector(size_t begin, size_t end) : data(Range{begin, end})
    {
        /// size() computes `end - begin` on unsigned values, so a reversed range would underflow.
        chassert(begin <= end);
    }

    /// Empty selection.
    Selector() : Selector(0, 0) { }

    /// Takes ownership of an explicit list of row indexes. If the list turns out to be a run of
    /// consecutive indexes it is stored as a range instead.
    /// Left implicit so that an IColumn::Selector can be assigned to a Selector directly.
    Selector(IColumn::Selector && selector_) : data(initializeFromSelector(std::move(selector_))) { }

    /// Read-only iterator over the selected row indexes. Dereferencing yields the index by value.
    /// The iterator refers to the Selector it was created from and must not outlive it.
    class Iterator
    {
    public:
        using iterator_category = std::forward_iterator_tag;
        using value_type = size_t;
        using difference_type = std::ptrdiff_t;
        using pointer = size_t *;
        using reference = size_t &;

        /// The selector is kept by pointer (not by reference) so that the iterator stays copy-assignable.
        Iterator(const Selector & selector_, size_t idx_) : selector(&selector_), idx(idx_) { }

        /// Returns the row index at the current position.
        /// Throws LOGICAL_ERROR when the iterator is at or past the end.
        size_t operator*() const
        {
            chassert(idx < selector->size());
            if (idx >= selector->size())
                throw Exception(ErrorCodes::LOGICAL_ERROR, "Index {} out of range size {}", idx, selector->size());
            return (*selector)[idx];
        }

        /// Advances to the next position.
        /// Throws LOGICAL_ERROR when the iterator is already at the end.
        Iterator & operator++()
        {
            if (idx >= selector->size())
                throw Exception(ErrorCodes::LOGICAL_ERROR, "Index {} out of range size {}", idx, selector->size());
            ++idx;
            return *this;
        }

        /// Iterators are compared by position only; comparing iterators of different selectors is meaningless.
        bool operator==(const Iterator & other) const { return idx == other.idx; }
        bool operator!=(const Iterator & other) const { return idx != other.idx; }

    private:
        const Selector * selector;
        size_t idx;
    };

    Iterator begin() const { return Iterator(*this, 0); }

    Iterator end() const { return Iterator(*this, size()); }

    /// Returns the idx-th selected row index.
    /// Throws LOGICAL_ERROR when idx is not less than size().
    size_t operator[](size_t idx) const
    {
        chassert(idx < size());
        if (idx >= size())
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Index {} out of range size {}", idx, size());

        if (const auto * range = std::get_if<Range>(&data))
            return range->first + idx;

        return std::get<IColumn::Selector>(data)[idx];
    }

    /// Number of selected rows.
    size_t size() const
    {
        if (const auto * range = std::get_if<Range>(&data))
            return range->second - range->first;

        return std::get<IColumn::Selector>(data).size();
    }

    /// Splits the selection into its first `num_rows` entries and the remaining ones.
    /// This object is left unchanged; both parts are returned as new selectors.
    /// Throws LOGICAL_ERROR when num_rows is greater than size().
    std::pair<Selector, Selector> split(size_t num_rows) const
    {
        if (num_rows > size())
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Index {} out of range size {}", num_rows, size());

        if (const auto * range = std::get_if<Range>(&data))
        {
            if (num_rows == 0)
                return {Selector(), Selector{range->first, range->second}};

            if (num_rows == size())
                return {Selector{range->first, range->second}, Selector()};

            return {Selector(range->first, range->first + num_rows), Selector(range->first + num_rows, range->second)};
        }

        const auto & selector = std::get<IColumn::Selector>(data);
        return {
            Selector(IColumn::Selector(selector.begin(), selector.begin() + num_rows)),
            Selector(IColumn::Selector(selector.begin() + num_rows, selector.end()))};
    }

private:
    using Data = std::variant<Range, IColumn::Selector>;

    /// Chooses the storage form for an explicit list of indexes.
    /// Static because it is called while `data` is still being initialized and needs no object state.
    static Data initializeFromSelector(IColumn::Selector && selector)
    {
        if (selector.empty())
            return Range{0, 0};

        /// Matching endpoints are only a necessary condition for a continuous range:
        /// lists like {0, 5, 2} or {0, 2, 2} pass it too. Confirm that every element
        /// follows its predecessor before dropping the explicit list.
        if (selector.back() == selector.front() + selector.size() - 1)
        {
            bool consecutive = true;
            for (size_t i = 1; i < selector.size(); ++i)
            {
                if (selector[i] != selector[i - 1] + 1)
                {
                    consecutive = false;
                    break;
                }
            }

            if (consecutive)
                return Range{selector.front(), selector.front() + selector.size()};
        }

        return std::move(selector);
    }

    Data data;
};
}
struct ScatteredBlock : private boost::noncopyable
{
using Selector = detail::Selector;
ScatteredBlock() = default;
explicit ScatteredBlock(Block block_) : block(std::move(block_)), selector(createTrivialSelector(block.rows())) { }
ScatteredBlock(Block block_, IColumn::Selector && selector_) : block(std::move(block_)), selector(std::move(selector_)) { }
ScatteredBlock(Block block_, Selector selector_) : block(std::move(block_)), selector(std::move(selector_)) { }
ScatteredBlock(ScatteredBlock && other) noexcept : block(std::move(other.block)), selector(std::move(other.selector))
{
other.block.clear();
other.selector.clear();
other.selector = {};
}
ScatteredBlock & operator=(ScatteredBlock && other) noexcept
@ -31,7 +168,7 @@ struct ScatteredBlock : private boost::noncopyable
selector = std::move(other.selector);
other.block.clear();
other.selector.clear();
other.selector = {};
}
return *this;
}
@ -67,8 +204,10 @@ struct ScatteredBlock : private boost::noncopyable
void filter(const IColumn::Filter & filter)
{
chassert(block && block.rows() == filter.size());
auto * it = std::remove_if(selector.begin(), selector.end(), [&](size_t idx) { return !filter[idx]; });
selector.resize(std::distance(selector.begin(), it));
IColumn::Selector new_selector;
new_selector.reserve(selector.size());
std::copy_if(selector.begin(), selector.end(), std::back_inserter(new_selector), [&](size_t idx) { return filter[idx]; });
selector = std::move(new_selector);
}
/// Applies selector to block in place
@ -105,10 +244,19 @@ struct ScatteredBlock : private boost::noncopyable
chassert(block);
IColumn::Selector remaining_selector(selector.begin() + num_rows, selector.end());
LOG_DEBUG(&Poco::Logger::get("debug"), "selector=({})", fmt::join(selector, ","));
auto && [first_num_rows, remaining_selector] = selector.split(num_rows);
LOG_DEBUG(
&Poco::Logger::get("debug"),
"first_num_rows=({}), remaining_selector=({})",
fmt::join(first_num_rows, ","),
fmt::join(remaining_selector, ","));
auto remaining = ScatteredBlock{block, std::move(remaining_selector)};
selector.erase(selector.begin() + num_rows, selector.end());
selector = std::move(first_num_rows);
return remaining;
}
@ -135,15 +283,10 @@ struct ScatteredBlock : private boost::noncopyable
}
private:
IColumn::Selector createTrivialSelector(size_t size)
{
IColumn::Selector res(size);
std::iota(res.begin(), res.end(), 0);
return res;
}
/// Builds a selector that covers every row of a block with `size` rows.
/// Selector's two-argument constructor takes a half-open range [begin, end), so the end bound
/// is `size` itself; `size - 1` would drop the last row and underflow for an empty block.
Selector createTrivialSelector(size_t size) { return Selector(0, size); }
Block block;
IColumn::Selector selector;
Selector selector;
};
using ScatteredBlocks = std::vector<ScatteredBlock>;