2022-04-19 08:07:30 +00:00
|
|
|
#include <memory>
|
|
|
|
#include <mutex>
|
2022-06-13 12:22:15 +00:00
|
|
|
#include <Columns/ColumnSparse.h>
|
2022-04-19 08:53:24 +00:00
|
|
|
#include <Columns/FilterDescription.h>
|
|
|
|
#include <Columns/IColumn.h>
|
|
|
|
#include <Core/ColumnsWithTypeAndName.h>
|
|
|
|
#include <Core/NamesAndTypes.h>
|
|
|
|
#include <Interpreters/ConcurrentHashJoin.h>
|
2022-04-19 08:07:30 +00:00
|
|
|
#include <Interpreters/Context.h>
|
2022-04-19 08:53:24 +00:00
|
|
|
#include <Interpreters/ExpressionActions.h>
|
|
|
|
#include <Interpreters/PreparedSets.h>
|
|
|
|
#include <Interpreters/SubqueryForSet.h>
|
|
|
|
#include <Interpreters/TableJoin.h>
|
2022-04-21 08:59:30 +00:00
|
|
|
#include <Interpreters/createBlockSelector.h>
|
2022-04-19 08:53:24 +00:00
|
|
|
#include <Parsers/DumpASTNode.h>
|
|
|
|
#include <Parsers/ExpressionListParsers.h>
|
|
|
|
#include <Parsers/IAST_fwd.h>
|
|
|
|
#include <Parsers/parseQuery.h>
|
|
|
|
#include <Common/Exception.h>
|
2022-05-06 15:17:46 +00:00
|
|
|
#include <Common/WeakHash.h>
|
2022-04-21 08:59:30 +00:00
|
|
|
#include <Common/typeid_cast.h>
|
2022-05-06 15:17:46 +00:00
|
|
|
|
2022-04-19 08:07:30 +00:00
|
|
|
namespace DB
|
|
|
|
{
|
2022-05-06 15:17:46 +00:00
|
|
|
|
2022-04-19 08:07:30 +00:00
|
|
|
namespace ErrorCodes
|
|
|
|
{
|
|
|
|
extern const int LOGICAL_ERROR;
|
|
|
|
extern const int SET_SIZE_LIMIT_EXCEEDED;
|
|
|
|
extern const int BAD_ARGUMENTS;
|
|
|
|
}
|
2022-05-06 15:17:46 +00:00
|
|
|
|
2022-06-14 12:13:39 +00:00
|
|
|
/// Rounds `x` up to the nearest power of two; a value that already is a power of two is returned unchanged.
/// Both 0 and 1 map to 1, so the result is never zero for small inputs.
/// Precondition: x <= 2^31. For larger inputs the next power of two does not fit into UInt32
/// and the final `+ 1` wraps around to 0.
static UInt32 toPowerOfTwo(UInt32 x)
{
    /// Without this guard `x - 1` wraps around for x == 0, all bits get set and the function returns 0.
    if (x <= 1)
        return 1;

    /// Copy the highest set bit of (x - 1) into every lower position, then add one.
    x = x - 1;
    for (UInt32 i = 1; i < sizeof(UInt32) * 8; i <<= 1)
        x = x | x >> i;
    return x + 1;
}
|
|
|
|
|
2022-05-05 04:04:11 +00:00
|
|
|
/// Creates the sharded join: the requested number of slots is clamped to [1, 255],
/// rounded up to a power of two, and one independent HashJoin is built per slot.
ConcurrentHashJoin::ConcurrentHashJoin(ContextPtr context_, std::shared_ptr<TableJoin> table_join_, size_t slots_, const Block & right_sample_block, bool any_take_last_row_)
    : context(context_)
    , table_join(table_join_)
    , slots(slots_)
{
    /// hashToSelector() picks a shard with `hash & (num_shards - 1)`, hence the power-of-two shard count.
    slots = toPowerOfTwo(std::clamp<size_t>(slots, 1, 255));

    /// Every shard gets its own HashJoin built from the same table join description and sample block.
    for (size_t remaining = slots; remaining > 0; --remaining)
    {
        auto shard = std::make_shared<InternalHashJoin>();
        shard->data = std::make_unique<HashJoin>(table_join_, right_sample_block, any_take_last_row_);
        hash_joins.emplace_back(std::move(shard));
    }
}
|
|
|
|
|
2022-05-06 15:17:46 +00:00
|
|
|
bool ConcurrentHashJoin::addJoinedBlock(const Block & right_block, bool check_limits)
|
2022-04-19 08:07:30 +00:00
|
|
|
{
|
2022-05-06 15:17:46 +00:00
|
|
|
Blocks dispatched_blocks = dispatchBlock(table_join->getOnlyClause().key_names_right, right_block);
|
2022-04-21 05:19:33 +00:00
|
|
|
|
2022-05-10 12:09:49 +00:00
|
|
|
size_t blocks_left = 0;
|
2022-05-06 15:17:46 +00:00
|
|
|
for (const auto & block : dispatched_blocks)
|
|
|
|
{
|
|
|
|
if (block)
|
|
|
|
{
|
|
|
|
++blocks_left;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
while (blocks_left > 0)
|
2022-04-19 08:07:30 +00:00
|
|
|
{
|
2022-05-06 15:17:46 +00:00
|
|
|
/// insert blocks into corresponding HashJoin instances
|
|
|
|
for (size_t i = 0; i < dispatched_blocks.size(); ++i)
|
2022-04-21 05:19:33 +00:00
|
|
|
{
|
|
|
|
auto & hash_join = hash_joins[i];
|
|
|
|
auto & dispatched_block = dispatched_blocks[i];
|
2022-05-10 12:09:49 +00:00
|
|
|
|
|
|
|
if (dispatched_block)
|
2022-04-21 05:19:33 +00:00
|
|
|
{
|
2022-05-06 15:17:46 +00:00
|
|
|
/// if current hash_join is already processed by another thread, skip it and try later
|
|
|
|
std::unique_lock<std::mutex> lock(hash_join->mutex, std::try_to_lock);
|
|
|
|
if (!lock.owns_lock())
|
|
|
|
continue;
|
2022-04-21 04:14:36 +00:00
|
|
|
|
2022-05-06 15:17:46 +00:00
|
|
|
bool limit_exceeded = !hash_join->data->addJoinedBlock(dispatched_block, check_limits);
|
|
|
|
|
|
|
|
dispatched_block = {};
|
|
|
|
blocks_left--;
|
|
|
|
|
|
|
|
if (limit_exceeded)
|
|
|
|
return false;
|
2022-04-21 05:19:33 +00:00
|
|
|
}
|
|
|
|
}
|
2022-04-19 08:07:30 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if (check_limits)
|
2022-04-21 05:19:33 +00:00
|
|
|
return table_join->sizeLimits().check(getTotalRowCount(), getTotalByteCount(), "JOIN", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED);
|
2022-04-19 08:07:30 +00:00
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
2022-04-21 05:19:33 +00:00
|
|
|
/// Splits `block` by the left-side join keys, joins every part against the matching shard
/// and replaces `block` with the concatenation of the joined parts.
/// Throws LOGICAL_ERROR if a shard hands back a non-empty unprocessed remainder.
void ConcurrentHashJoin::joinBlock(Block & block, std::shared_ptr<ExtraBlock> & /*not_processed*/)
{
    Blocks shard_blocks = dispatchBlock(table_join->getOnlyClause().key_names_left, block);

    for (size_t shard = 0; shard < shard_blocks.size(); ++shard)
    {
        /// Fresh placeholder per shard; it is expected to stay empty after the call.
        std::shared_ptr<ExtraBlock> leftover;
        hash_joins[shard]->data->joinBlock(shard_blocks[shard], leftover);

        const bool has_leftover = leftover && !leftover->empty();
        if (has_leftover)
            throw Exception(ErrorCodes::LOGICAL_ERROR, "not_processed should be empty");
    }

    block = concatenateBlocks(shard_blocks);
}
|
|
|
|
|
|
|
|
void ConcurrentHashJoin::checkTypesOfKeys(const Block & block) const
|
|
|
|
{
|
|
|
|
hash_joins[0]->data->checkTypesOfKeys(block);
|
|
|
|
}
|
|
|
|
|
|
|
|
void ConcurrentHashJoin::setTotals(const Block & block)
|
|
|
|
{
|
|
|
|
if (block)
|
|
|
|
{
|
|
|
|
std::lock_guard lock(totals_mutex);
|
|
|
|
totals = block;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Returns a reference to the stored totals block.
/// NOTE(review): unlike setTotals(), this does not take `totals_mutex` - presumably it is only
/// called once no thread can still be setting totals; confirm against the callers.
const Block & ConcurrentHashJoin::getTotals() const
{
    return this->totals;
}
|
|
|
|
|
|
|
|
size_t ConcurrentHashJoin::getTotalRowCount() const
|
|
|
|
{
|
|
|
|
size_t res = 0;
|
|
|
|
for (const auto & hash_join : hash_joins)
|
|
|
|
{
|
2022-04-25 08:16:50 +00:00
|
|
|
std::lock_guard lock(hash_join->mutex);
|
2022-04-19 08:07:30 +00:00
|
|
|
res += hash_join->data->getTotalRowCount();
|
|
|
|
}
|
|
|
|
return res;
|
|
|
|
}
|
|
|
|
|
|
|
|
size_t ConcurrentHashJoin::getTotalByteCount() const
|
|
|
|
{
|
|
|
|
size_t res = 0;
|
|
|
|
for (const auto & hash_join : hash_joins)
|
|
|
|
{
|
2022-04-25 08:16:50 +00:00
|
|
|
std::lock_guard lock(hash_join->mutex);
|
2022-04-19 08:07:30 +00:00
|
|
|
res += hash_join->data->getTotalByteCount();
|
|
|
|
}
|
|
|
|
return res;
|
|
|
|
}
|
|
|
|
|
|
|
|
bool ConcurrentHashJoin::alwaysReturnsEmptySet() const
|
|
|
|
{
|
|
|
|
for (const auto & hash_join : hash_joins)
|
|
|
|
{
|
2022-04-25 08:16:50 +00:00
|
|
|
std::lock_guard lock(hash_join->mutex);
|
2022-04-21 05:19:33 +00:00
|
|
|
if (!hash_join->data->alwaysReturnsEmptySet())
|
2022-04-19 08:07:30 +00:00
|
|
|
return false;
|
|
|
|
}
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
std::shared_ptr<NotJoinedBlocks> ConcurrentHashJoin::getNonJoinedBlocks(
|
|
|
|
const Block & /*left_sample_block*/, const Block & /*result_sample_block*/, UInt64 /*max_block_size*/) const
|
|
|
|
{
|
|
|
|
if (table_join->strictness() == ASTTableJoin::Strictness::Asof ||
|
|
|
|
table_join->strictness() == ASTTableJoin::Strictness::Semi ||
|
|
|
|
!isRightOrFull(table_join->kind()))
|
|
|
|
{
|
|
|
|
return {};
|
|
|
|
}
|
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid join type. join kind: {}, strictness: {}", table_join->kind(), table_join->strictness());
|
|
|
|
}
|
|
|
|
|
2022-05-06 15:17:46 +00:00
|
|
|
/// Maps every row hash to a shard index in [0, num_shards).
/// `num_shards` must be a non-zero power of two (asserted) because the index is taken with a bit mask.
static IColumn::Selector hashToSelector(const WeakHash32 & hash, size_t num_shards)
{
    assert(num_shards > 0 && (num_shards & (num_shards - 1)) == 0);

    const auto & row_hashes = hash.getData();
    const size_t row_count = row_hashes.size();
    const size_t shard_mask = num_shards - 1;

    IColumn::Selector selector(row_count);
    for (size_t row = 0; row < row_count; ++row)
        selector[row] = row_hashes[row] & shard_mask;

    return selector;
}
|
2022-05-05 04:04:11 +00:00
|
|
|
|
2022-05-06 15:17:46 +00:00
|
|
|
/// Splits `from_block` into one block per shard (hash_joins.size() blocks in total).
/// A row's shard is derived from the combined weak hash of the columns named in `key_columns_names`.
/// Every returned block has the same structure as `from_block`.
Blocks ConcurrentHashJoin::dispatchBlock(const Strings & key_columns_names, const Block & from_block)
{
    const size_t num_shards = hash_joins.size();
    const size_t num_rows = from_block.rows();
    const size_t num_cols = from_block.columns();

    /// Fold every key column into one per-row hash; const, sparse and low cardinality wrappers are removed first.
    WeakHash32 hash(num_rows);
    for (const auto & key_name : key_columns_names)
    {
        const auto & full_key_col = from_block.getByName(key_name).column->convertToFullColumnIfConst();
        const auto & plain_key_col = recursiveRemoveLowCardinality(recursiveRemoveSparse(full_key_col));
        plain_key_col->updateWeakHash32(hash);
    }
    auto selector = hashToSelector(hash, num_shards);

    /// Start from empty blocks with the source structure, one per shard.
    Blocks result;
    while (result.size() < num_shards)
        result.emplace_back(from_block.cloneEmpty());

    /// Scatter column by column and hand each piece to the block of its shard.
    for (size_t col = 0; col < num_cols; ++col)
    {
        auto scattered = from_block.getByPosition(col).column->scatter(num_shards, selector);
        assert(result.size() == scattered.size());

        for (size_t shard = 0; shard < num_shards; ++shard)
            result[shard].getByPosition(col).column = std::move(scattered[shard]);
    }

    return result;
}
|
|
|
|
|
|
|
|
}
|