2ca47a6eb6
BackgroundSchedulePool is used for some periodic jobs that do not run from a query context, e.g. the flush of a Buffer table. For such jobs there cannot be any query context, and more importantly it would not work correctly, since that query_context will eventually expire. This is the reason for the failures in [1]. [1]: https://s3.amazonaws.com/clickhouse-test-reports/46668/015991bc5e20c787851050c2eaa13f0fef3aac00/stateless_tests_flaky_check__asan_.html Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
73 lines
2.6 KiB
C++
#pragma once

#include <condition_variable>
#include <memory>
#include <mutex>
#include <optional>
#include <vector>
#include <Functions/FunctionsLogical.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/HashJoin.h>
#include <Interpreters/IJoin.h>
#include <base/defines.h>
#include <base/types.h>
#include <Common/Stopwatch.h>

namespace DB
{

/**
 * Can run addJoinedBlock() in parallel to speed up the join process. In tests it shows
 * almost linear speedup with the degree of parallelism.
 *
 * The default HashJoin is not thread-safe for inserting the right table's rows, so it has
 * to run in a single thread. When the right table is large, the join process is too slow.
 *
 * We create multiple HashJoin instances here. In addJoinedBlock(), one input block is split
 * into multiple blocks corresponding to the HashJoin instances by hashing every row on the
 * join keys, which guarantees that every HashJoin instance is written by only one thread.
 *
 * When it comes to matching the left table, the blocks from the left table are also split
 * into the different HashJoin instances in the same way.
 *
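 * A minimal sketch of the dispatch step (illustrative pseudocode only;
 * hashRowOnJoinKeys is a hypothetical helper, not a member of this class):
 *
 *     IColumn::Selector selector(block.rows());
 *     for (size_t row = 0; row < block.rows(); ++row)
 *         selector[row] = hashRowOnJoinKeys(block, key_columns_names, row) % slots;
 *
 * Rows with equal join keys hash to the same slot, so matching rows from the left
 * and right tables always meet in the same HashJoin instance.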
 */
class ConcurrentHashJoin : public IJoin
{

public:
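    /// slots_ is the number of inner HashJoin instances; incoming blocks are
    /// dispatched among them by a hash of the join key columns.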
    explicit ConcurrentHashJoin(ContextPtr context_, std::shared_ptr<TableJoin> table_join_, size_t slots_, const Block & right_sample_block, bool any_take_last_row_ = false);
    ~ConcurrentHashJoin() override = default;

    const TableJoin & getTableJoin() const override { return *table_join; }
    bool addJoinedBlock(const Block & block, bool check_limits) override;
    void checkTypesOfKeys(const Block & block) const override;
    void joinBlock(Block & block, std::shared_ptr<ExtraBlock> & not_processed) override;
    void setTotals(const Block & block) override;
    const Block & getTotals() const override;
    size_t getTotalRowCount() const override;
    size_t getTotalByteCount() const override;
    bool alwaysReturnsEmptySet() const override;
    bool supportParallelJoin() const override { return true; }
    IBlocksStreamPtr
    getNonJoinedBlocks(const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const override;

private:
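    /// One shard of the join: a HashJoin instance together with the mutex that
    /// guards access to it.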
    struct InternalHashJoin
    {
        std::mutex mutex;
        std::unique_ptr<HashJoin> data;
    };

    ContextPtr context;
    std::shared_ptr<TableJoin> table_join;
    size_t slots;
    std::vector<std::shared_ptr<InternalHashJoin>> hash_joins;

    std::mutex totals_mutex;
    Block totals;

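    /// Hash every row of from_block on the join key columns and return a selector
    /// mapping each row to one of the `slots` HashJoin instances.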
    IColumn::Selector selectDispatchBlock(const Strings & key_columns_names, const Block & from_block);
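    /// Split from_block into `slots` blocks according to selectDispatchBlock(),
    /// so rows with equal join keys always land in the same HashJoin instance.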
    Blocks dispatchBlock(const Strings & key_columns_names, const Block & from_block);
};

}