Something

This commit is contained in:
Alexey Milovidov 2024-10-21 02:09:15 +02:00
parent bb3bfa536a
commit 0f3f15338d
4 changed files with 20 additions and 16 deletions

View File

@ -122,7 +122,7 @@ public:
void scheduleOrThrowOnError(Job job, Priority priority = {});
/// Similar to scheduleOrThrowOnError(...). Wait for specified amount of time and schedule a job or return false.
bool trySchedule(Job job, Priority priority = {}, uint64_t wait_microseconds = 0) noexcept;
[[nodiscard]] bool trySchedule(Job job, Priority priority = {}, uint64_t wait_microseconds = 0) noexcept;
/// Similar to scheduleOrThrowOnError(...). Wait for specified amount of time and schedule a job or throw an exception.
void scheduleOrThrow(Job job, Priority priority = {}, uint64_t wait_microseconds = 0, bool propagate_opentelemetry_tracing_context = true);
@ -142,7 +142,7 @@ public:
/// Returns true if the pool already terminated
/// (and any further scheduling will produce CANNOT_SCHEDULE_TASK exception)
bool finished() const;
[[nodiscard]] bool finished() const;
void setMaxThreads(size_t value);
void setMaxFreeThreads(size_t value);

View File

@ -46,7 +46,7 @@ void ParallelCompressedWriteBuffer::nextImpl()
current_buffer->sequence_num = current_sequence_num;
++current_sequence_num;
current_buffer->uncompressed_size = offset();
pool.trySchedule([this, my_current_buffer = current_buffer, thread_group = CurrentThread::getGroup()]
pool.scheduleOrThrowOnError([this, my_current_buffer = current_buffer, thread_group = CurrentThread::getGroup()]
{
SCOPE_EXIT_SAFE(
if (thread_group)

View File

@ -334,22 +334,26 @@ HashedDictionary<dictionary_key_type, sparse, sharded>::~HashedDictionary()
if (container.empty())
return;
pool.trySchedule([&container, thread_group = CurrentThread::getGroup()]
{
SCOPE_EXIT_SAFE(
if (!pool.trySchedule([&container, thread_group = CurrentThread::getGroup()]
{
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachFromGroupIfNotDetached();
);
/// Do not account memory that was occupied by the dictionaries for the query/user context.
MemoryTrackerBlockerInThread memory_blocker;
if (thread_group)
CurrentThread::detachFromGroupIfNotDetached();
);
CurrentThread::attachToGroupIfDetached(thread_group);
setThreadName("HashedDictDtor");
/// Do not account memory that was occupied by the dictionaries for the query/user context.
clearContainer(container);
}))
{
MemoryTrackerBlockerInThread memory_blocker;
if (thread_group)
CurrentThread::attachToGroupIfDetached(thread_group);
setThreadName("HashedDictDtor");
clearContainer(container);
});
}
++hash_tables_count;
};

View File

@ -107,7 +107,7 @@ struct ManyAggregatedData
if (variant->aggregator)
{
// variant is moved here and will be destroyed in the destructor of the lambda function.
pool->trySchedule(
pool->scheduleOrThrowOnError(
[my_variant = std::move(variant), thread_group = CurrentThread::getGroup()]()
{
SCOPE_EXIT_SAFE(