mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-12 01:12:12 +00:00
78 lines
2.6 KiB
C++
78 lines
2.6 KiB
C++
#pragma once
|
|
|
|
#include <condition_variable>
|
|
#include <memory>
|
|
#include <optional>
|
|
#include <Interpreters/Context.h>
|
|
#include <Interpreters/ExpressionActions.h>
|
|
#include <Interpreters/HashJoin.h>
|
|
#include <Interpreters/IJoin.h>
|
|
#include <base/defines.h>
|
|
#include <base/types.h>
|
|
#include <Common/Stopwatch.h>
|
|
|
|
namespace DB
|
|
{
|
|
|
|
/**
|
|
* Can run addBlockToJoin() parallelly to speedup the join process. On test, it almose linear speedup by
|
|
* the degree of parallelism.
|
|
*
|
|
* The default HashJoin is not thread safe for inserting right table's rows and run it in a single thread. When
|
|
* the right table is large, the join process is too slow.
|
|
*
|
|
* We create multiple HashJoin instances here. In addBlockToJoin(), one input block is split into multiple blocks
|
|
* corresponding to the HashJoin instances by hashing every row on the join keys. And make a guarantee that every HashJoin
|
|
* instance is written by only one thread.
|
|
*
|
|
* When come to the left table matching, the blocks from left table are alse split into different HashJoin instances.
|
|
*
|
|
*/
|
|
class ConcurrentHashJoin : public IJoin
|
|
{
|
|
|
|
public:
|
|
explicit ConcurrentHashJoin(
|
|
ContextPtr context_,
|
|
std::shared_ptr<TableJoin> table_join_,
|
|
size_t slots_,
|
|
const Block & right_sample_block,
|
|
bool any_take_last_row_ = false);
|
|
|
|
~ConcurrentHashJoin() override = default;
|
|
|
|
std::string getName() const override { return "ConcurrentHashJoin"; }
|
|
const TableJoin & getTableJoin() const override { return *table_join; }
|
|
bool addBlockToJoin(const Block & block, bool check_limits) override;
|
|
void checkTypesOfKeys(const Block & block) const override;
|
|
void joinBlock(Block & block, std::shared_ptr<ExtraBlock> & not_processed) override;
|
|
void setTotals(const Block & block) override;
|
|
const Block & getTotals() const override;
|
|
size_t getTotalRowCount() const override;
|
|
size_t getTotalByteCount() const override;
|
|
bool alwaysReturnsEmptySet() const override;
|
|
bool supportParallelJoin() const override { return true; }
|
|
IBlocksStreamPtr
|
|
getNonJoinedBlocks(const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const override;
|
|
|
|
private:
|
|
struct InternalHashJoin
|
|
{
|
|
std::mutex mutex;
|
|
std::unique_ptr<HashJoin> data;
|
|
};
|
|
|
|
ContextPtr context;
|
|
std::shared_ptr<TableJoin> table_join;
|
|
size_t slots;
|
|
std::vector<std::shared_ptr<InternalHashJoin>> hash_joins;
|
|
|
|
std::mutex totals_mutex;
|
|
Block totals;
|
|
|
|
IColumn::Selector selectDispatchBlock(const Strings & key_columns_names, const Block & from_block);
|
|
Blocks dispatchBlock(const Strings & key_columns_names, const Block & from_block);
|
|
};
|
|
|
|
}
|