Fixing race.

This commit is contained in:
Nikolai Kochetov 2022-09-22 17:57:04 +00:00
parent 446453bdf5
commit 00965ce17a
2 changed files with 15 additions and 5 deletions

View File

@ -228,6 +228,11 @@ Chain InterpreterInsertQuery::buildChainImpl(
ThreadStatusesHolderPtr thread_status_holder, ThreadStatusesHolderPtr thread_status_holder,
std::atomic_uint64_t * elapsed_counter_ms) std::atomic_uint64_t * elapsed_counter_ms)
{ {
ThreadStatus * thread_status = current_thread;
if (!thread_status_holder)
thread_status = nullptr;
auto context_ptr = getContext(); auto context_ptr = getContext();
const ASTInsertQuery * query = nullptr; const ASTInsertQuery * query = nullptr;
if (query_ptr) if (query_ptr)
@ -247,7 +252,7 @@ Chain InterpreterInsertQuery::buildChainImpl(
if (table->noPushingToViews() && !no_destination) if (table->noPushingToViews() && !no_destination)
{ {
auto sink = table->write(query_ptr, metadata_snapshot, context_ptr); auto sink = table->write(query_ptr, metadata_snapshot, context_ptr);
sink->setRuntimeData(current_thread, elapsed_counter_ms); sink->setRuntimeData(thread_status, elapsed_counter_ms);
out.addSource(std::move(sink)); out.addSource(std::move(sink));
} }
else else
@ -290,7 +295,7 @@ Chain InterpreterInsertQuery::buildChainImpl(
table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0)); table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0));
} }
auto counting = std::make_shared<CountingTransform>(out.getInputHeader(), current_thread, getContext()->getQuota()); auto counting = std::make_shared<CountingTransform>(out.getInputHeader(), thread_status, getContext()->getQuota());
counting->setProcessListElement(context_ptr->getProcessListElement()); counting->setProcessListElement(context_ptr->getProcessListElement());
out.addSource(std::move(counting)); out.addSource(std::move(counting));

View File

@ -199,8 +199,13 @@ Chain buildPushingToViewsChain(
checkStackSize(); checkStackSize();
Chain result_chain; Chain result_chain;
ThreadStatus * thread_status = current_thread;
if (!thread_status_holder) if (!thread_status_holder)
{
thread_status_holder = std::make_shared<ThreadStatusesHolder>(); thread_status_holder = std::make_shared<ThreadStatusesHolder>();
thread_status = nullptr;
}
/// If we don't write directly to the destination /// If we don't write directly to the destination
/// then expect that we're inserting with precalculated virtual columns /// then expect that we're inserting with precalculated virtual columns
@ -409,13 +414,13 @@ Chain buildPushingToViewsChain(
if (auto * live_view = dynamic_cast<StorageLiveView *>(storage.get())) if (auto * live_view = dynamic_cast<StorageLiveView *>(storage.get()))
{ {
auto sink = std::make_shared<PushingToLiveViewSink>(live_view_header, *live_view, storage, context); auto sink = std::make_shared<PushingToLiveViewSink>(live_view_header, *live_view, storage, context);
sink->setRuntimeData(current_thread, elapsed_counter_ms); sink->setRuntimeData(thread_status, elapsed_counter_ms);
result_chain.addSource(std::move(sink)); result_chain.addSource(std::move(sink));
} }
else if (auto * window_view = dynamic_cast<StorageWindowView *>(storage.get())) else if (auto * window_view = dynamic_cast<StorageWindowView *>(storage.get()))
{ {
auto sink = std::make_shared<PushingToWindowViewSink>(window_view->getInputHeader(), *window_view, storage, context); auto sink = std::make_shared<PushingToWindowViewSink>(window_view->getInputHeader(), *window_view, storage, context);
sink->setRuntimeData(current_thread, elapsed_counter_ms); sink->setRuntimeData(thread_status, elapsed_counter_ms);
result_chain.addSource(std::move(sink)); result_chain.addSource(std::move(sink));
} }
/// Do not push to destination table if the flag is set /// Do not push to destination table if the flag is set
@ -423,7 +428,7 @@ Chain buildPushingToViewsChain(
{ {
auto sink = storage->write(query_ptr, metadata_snapshot, context); auto sink = storage->write(query_ptr, metadata_snapshot, context);
metadata_snapshot->check(sink->getHeader().getColumnsWithTypeAndName()); metadata_snapshot->check(sink->getHeader().getColumnsWithTypeAndName());
sink->setRuntimeData(current_thread, elapsed_counter_ms); sink->setRuntimeData(thread_status, elapsed_counter_ms);
result_chain.addSource(std::move(sink)); result_chain.addSource(std::move(sink));
} }