This commit is contained in:
Nikita Taranov 2024-05-28 15:08:21 +01:00
parent 97376119dd
commit d529ff911c
2 changed files with 19 additions and 20 deletions

View File

@ -3,6 +3,7 @@
#include <Columns/IColumn.h>
#include <Core/ColumnsWithTypeAndName.h>
#include <Core/NamesAndTypes.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Interpreters/ConcurrentHashJoin.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionActions.h>
@ -13,14 +14,13 @@
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/IAST_fwd.h>
#include <Parsers/parseQuery.h>
#include <Common/CurrentThread.h>
#include <Common/Exception.h>
#include <Common/WeakHash.h>
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Common/ThreadPool.h>
#include <memory>
#include <mutex>
#include <Common/WeakHash.h>
#include <Common/scope_guard_safe.h>
#include <Common/setThreadName.h>
#include <Common/typeid_cast.h>
namespace CurrentMetrics
{
@ -50,11 +50,11 @@ ConcurrentHashJoin::ConcurrentHashJoin(
: context(context_)
, table_join(table_join_)
, slots(toPowerOfTwo(std::min<UInt32>(static_cast<UInt32>(slots_), 256)))
, pool(
, pool(std::make_unique<ThreadPool>(
CurrentMetrics::ConcurrentHashJoinPoolThreads,
CurrentMetrics::ConcurrentHashJoinPoolThreadsActive,
CurrentMetrics::ConcurrentHashJoinPoolThreadsScheduled,
slots)
slots))
{
hash_joins.resize(slots);
@ -62,7 +62,7 @@ ConcurrentHashJoin::ConcurrentHashJoin(
{
for (size_t i = 0; i < slots; ++i)
{
pool.trySchedule(
pool->scheduleOrThrow(
[&, idx = i, thread_group = CurrentThread::getGroup()]()
{
SCOPE_EXIT_SAFE({
@ -72,11 +72,9 @@ ConcurrentHashJoin::ConcurrentHashJoin(
if (thread_group)
CurrentThread::attachToGroupIfDetached(thread_group);
setThreadName("ConcurrentJoin");
auto inner_hash_join = std::make_shared<InternalHashJoin>();
inner_hash_join->data = std::make_unique<HashJoin>(
table_join_, right_sample_block, any_take_last_row_, 0, fmt::format("concurrent{}", idx));
/// Non zero `max_joined_block_rows` allows to process block partially and return not processed part.
@ -85,13 +83,12 @@ ConcurrentHashJoin::ConcurrentHashJoin(
hash_joins[idx] = std::move(inner_hash_join);
});
}
pool.wait();
pool->wait();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
pool.wait();
pool->wait();
throw;
}
}
@ -102,7 +99,10 @@ ConcurrentHashJoin::~ConcurrentHashJoin()
{
for (size_t i = 0; i < slots; ++i)
{
pool.trySchedule(
// Hash tables destruction may be very time-consuming.
// Without the following code, they would be destroyed in the current thread (i.e. sequentially).
// `InternalHashJoin` is moved here and will be destroyed in the destructor of the lambda function.
pool->scheduleOrThrow(
[join = std::move(hash_joins[i]), thread_group = CurrentThread::getGroup()]()
{
SCOPE_EXIT_SAFE({
@ -112,17 +112,15 @@ ConcurrentHashJoin::~ConcurrentHashJoin()
if (thread_group)
CurrentThread::attachToGroupIfDetached(thread_group);
setThreadName("ConcurrentJoin");
});
}
pool.wait();
pool->wait();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
pool.wait();
pool->wait();
}
}

View File

@ -10,6 +10,7 @@
#include <base/defines.h>
#include <base/types.h>
#include <Common/Stopwatch.h>
#include <Common/ThreadPool_fwd.h>
namespace DB
{
@ -66,7 +67,7 @@ private:
ContextPtr context;
std::shared_ptr<TableJoin> table_join;
size_t slots;
ThreadPool pool;
std::unique_ptr<ThreadPool> pool;
std::vector<std::shared_ptr<InternalHashJoin>> hash_joins;
std::mutex totals_mutex;