Fix a couple of bugs

- Don't use threads if there is only one dependent view, even when parallel_view_processing is enabled
- Reset current_thread when destroying the views' ThreadStatus
This commit is contained in:
Raúl Marín 2021-06-23 16:14:30 +02:00
parent 7c1fcc94b0
commit eb994d8f04
5 changed files with 33 additions and 25 deletions

View File

@ -31,8 +31,8 @@ Columns:
- `written_rows` ([UInt64](../../sql-reference/data-types/int-uint.md#uint-ranges)) — Number of written rows.
- `written_bytes` ([UInt64](../../sql-reference/data-types/int-uint.md#uint-ranges)) — Number of written bytes.
- `peak_memory_usage` ([Int64](../../sql-reference/data-types/int-uint.md)) — The maximum difference between the amount of allocated and freed memory in context of this view.
- `ProfileEvents.Names` ([Array(String)](../../sql-reference/data-types/array.md)) — Counters that measure different metrics for this thread. The description of them could be found in the table [system.events](#system_tables-events).
- `ProfileEvents.Values` ([Array(UInt64)](../../sql-reference/data-types/array.md)) — Values of metrics for this thread that are listed in the `ProfileEvents.Names` column.
- `ProfileEvents.Names` ([Array(String)](../../sql-reference/data-types/array.md)) — Counters that measure different metrics for this view. Their descriptions can be found in the table [system.events](#system_tables-events). Events of views that depend on this one are not included.
- `ProfileEvents.Values` ([Array(UInt64)](../../sql-reference/data-types/array.md)) — Values of metrics for this view that are listed in the `ProfileEvents.Names` column.
- `status` ([Enum8](../../sql-reference/data-types/enum.md)) — Status of the view. Values:
- `'Init' = 1` — The view was cancelled before writing anything to storage.
- `'WrittenPrefix' = 2` — The view was cancelled after writing its prefix to storage.

View File

@ -78,8 +78,6 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
insert_context->setSetting("min_insert_block_size_bytes", insert_settings.min_insert_block_size_bytes_for_materialized_views.value);
}
auto thread_group = CurrentThread::getGroup();
for (const auto & database_table : dependencies)
{
auto dependent_table = DatabaseCatalog::instance().getTable(database_table, getContext());
@ -137,14 +135,18 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
out = std::make_shared<PushingToViewsBlockOutputStream>(
dependent_table, dependent_metadata_snapshot, insert_context, ASTPtr());
auto * main_thread = current_thread;
/// We are creating a ThreadStatus per view to store its metrics individually
/// Since calling ThreadStatus() changes current_thread we save it and restore it after the calls
/// Later on, before doing any task related to a view, we'll switch to its ThreadStatus, do the work,
/// and switch back to the original thread_status.
auto * running_thread = current_thread;
auto thread_status = std::make_shared<ThreadStatus>();
current_thread = main_thread;
thread_status->attachQueryContext(getContext());
QueryViewsLogElement::ViewRuntimeStats runtime_stats{
target_name, type, thread_status, 0, std::chrono::system_clock::now(), QueryViewsLogElement::ViewStatus::INIT};
views.emplace_back(ViewInfo{std::move(query), database_table, std::move(out), nullptr, std::move(runtime_stats)});
current_thread = running_thread;
}
/// Do not push to destination table if the flag is set
@ -159,6 +161,14 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
}
}
/// Releases the per-view state while leaving the caller's current_thread pointing
/// at the same ThreadStatus it pointed at on entry.
PushingToViewsBlockOutputStream::~PushingToViewsBlockOutputStream()
{
/// ThreadStatus destructor modifies current_thread and we don't want that
/// Remember which ThreadStatus is active before any view is released.
auto * running_thread = current_thread;
/// Clearing the views releases each view's reference to its ThreadStatus
/// (destroying it if this was the last owner).
views.clear();
/// Put back the pointer that was active on entry.
current_thread = running_thread;
}
Block PushingToViewsBlockOutputStream::getHeader() const
{
@ -200,12 +210,11 @@ void PushingToViewsBlockOutputStream::write(const Block & block)
return;
const Settings & settings = getContext()->getSettingsRef();
const size_t max_threads = settings.parallel_view_processing ? settings.max_threads : 1;
const size_t max_threads = std::min(views.size(), (settings.parallel_view_processing ? static_cast<size_t>(settings.max_threads) : 1));
bool exception_happened = false;
if (max_threads > 1)
{
ThreadPool pool(std::min(max_threads, views.size()));
auto thread_group = CurrentThread::getGroup();
ThreadPool pool(max_threads);
std::atomic_uint8_t exception_count = 0;
for (auto & view : views)
{
@ -266,12 +275,11 @@ void PushingToViewsBlockOutputStream::writeSuffix()
/// It could have been done in PushingToViewsBlockOutputStream::process, however
/// it is not good if the insert into the main table fails but the insert into a view succeeds.
const Settings & settings = getContext()->getSettingsRef();
const size_t max_threads = settings.parallel_view_processing ? settings.max_threads : 1;
const size_t max_threads = std::min(views.size(), (settings.parallel_view_processing ? static_cast<size_t>(settings.max_threads) : 1));
bool exception_happened = false;
if (max_threads > 1)
{
ThreadPool pool(std::min(max_threads, views.size()));
auto thread_group = CurrentThread::getGroup();
ThreadPool pool(max_threads);
std::atomic_uint8_t exception_count = 0;
for (auto & view : views)
{
@ -313,8 +321,6 @@ void PushingToViewsBlockOutputStream::writeSuffix()
void PushingToViewsBlockOutputStream::flush()
{
LOG_DEBUG(log, "{} FLUSH CALLED", storage->getStorageID().getNameForLogs());
if (output)
output->flush();
@ -325,13 +331,13 @@ void PushingToViewsBlockOutputStream::flush()
void PushingToViewsBlockOutputStream::process(const Block & block, ViewInfo & view)
{
Stopwatch watch;
// Change thread context to store individual metrics per view
auto * source_thread = current_thread;
/// Change thread context to store individual metrics per view. Once the work is done, go back to the original thread
auto * running_thread = current_thread;
current_thread = view.runtime_stats.thread_status.get();
*current_thread->last_rusage = RUsageCounters::current();
SCOPE_EXIT({
current_thread->updatePerformanceCounters();
current_thread = source_thread;
current_thread = running_thread;
});
try
@ -400,13 +406,13 @@ void PushingToViewsBlockOutputStream::process(const Block & block, ViewInfo & vi
void PushingToViewsBlockOutputStream::process_prefix(ViewInfo & view)
{
Stopwatch watch;
// Change thread context to store individual metrics per view
auto * source_thread = current_thread;
/// Change thread context to store individual metrics per view. Once the work is done, go back to the original thread
auto * running_thread = current_thread;
current_thread = view.runtime_stats.thread_status.get();
*current_thread->last_rusage = RUsageCounters::current();
SCOPE_EXIT({
current_thread->updatePerformanceCounters();
current_thread = source_thread;
current_thread = running_thread;
});
try
@ -430,13 +436,13 @@ void PushingToViewsBlockOutputStream::process_prefix(ViewInfo & view)
void PushingToViewsBlockOutputStream::process_suffix(ViewInfo & view)
{
Stopwatch watch;
// Change thread context to store individual metrics per view
auto * source_thread = current_thread;
/// Change thread context to store individual metrics per view. Once the work is done, go back to the original thread
auto * running_thread = current_thread;
current_thread = view.runtime_stats.thread_status.get();
*current_thread->last_rusage = RUsageCounters::current();
SCOPE_EXIT({
current_thread->updatePerformanceCounters();
current_thread = source_thread;
current_thread = running_thread;
});
try

View File

@ -37,6 +37,8 @@ public:
const ASTPtr & query_ptr_,
bool no_destination = false);
~PushingToViewsBlockOutputStream() override;
Block getHeader() const override;
void write(const Block & block) override;

View File

@ -43,7 +43,7 @@ struct QueryViewsLogElement
{
String target_name;
ViewType type = ViewType::DEFAULT;
std::shared_ptr<ThreadStatus> thread_status = std::make_shared<ThreadStatus>();
std::shared_ptr<ThreadStatus> thread_status = nullptr;
UInt64 elapsed_ms = 0;
std::chrono::time_point<std::chrono::system_clock> event_time;
ViewStatus event_status = ViewStatus::INIT;

View File

@ -494,7 +494,7 @@ void ThreadStatus::logToQueryViewsLog(const ViewInfo & vinfo)
element.event_time_microseconds = time_in_microseconds(vinfo.runtime_stats.event_time);
element.view_duration_ms = vinfo.runtime_stats.elapsed_ms;
element.initial_query_id = query_id; // query_context_ptr->getInitialQueryId();
element.initial_query_id = query_id;
element.view_name = vinfo.table_id.getNameForLogs();
element.view_uuid = vinfo.table_id.uuid;
element.view_type = vinfo.runtime_stats.type;