Views: Once started, finish all concurrent views before throwing

Since sequential processing executes things in phases too, try to finish
all (non errored out) views before throwing any exception
This commit is contained in:
Raúl Marín 2021-06-28 13:32:27 +02:00
parent ce501bcf25
commit 69577a782f
7 changed files with 55 additions and 60 deletions

View File

@ -34,10 +34,10 @@ Columns:
- `ProfileEvents.Names` ([Array(String)](../../sql-reference/data-types/array.md)) — Counters that measure different metrics for this view. The description of them could be found in the table [system.events](#system_tables-events). It does not include events of views dependent on this one.
- `ProfileEvents.Values` ([Array(UInt64)](../../sql-reference/data-types/array.md)) — Values of metrics for this view that are listed in the `ProfileEvents.Names` column.
- `status` ([Enum8](../../sql-reference/data-types/enum.md)) — Status of the view. Values:
- `'Init' = 1` — The view was cancelled before writing anything to storage.
- `'WrittenPrefix' = 2` — The view was cancelled after writing its prefix to storage.
- `'WrittenBlock' = 3` — The view was cancelled after writing its blocks to storage. It might have materialized the input wholly, partially or none at all.
- `'WrittenSuffix' = 4` — The view wrote its suffix to storage. It completed successfully.
- `'QueryStart' = 1` — Successful start the view execution. Should not appear.
- `'QueryFinish' = 2` — Successful end of the view execution.
- `'ExceptionBeforeStart' = 3` — Exception before the start of the view execution.
- `'ExceptionWhileProcessing' = 4` — Exception during the view execution.
- `exception_code` ([Int32](../../sql-reference/data-types/int-uint.md)) — Code of an exception.
- `exception` ([String](../../sql-reference/data-types/string.md)) — Exception message.
- `stack_trace` ([String](../../sql-reference/data-types/string.md)) — [Stack trace](https://en.wikipedia.org/wiki/Stack_trace). An empty string, if the query was completed successfully.

View File

@ -149,7 +149,12 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
thread_status->memory_tracker.setParent(running_memory_tracker);
QueryViewsLogElement::ViewRuntimeStats runtime_stats{
target_name, type, thread_status, 0, std::chrono::system_clock::now(), QueryViewsLogElement::ViewStatus::INIT};
target_name,
type,
thread_status,
0,
std::chrono::system_clock::now(),
QueryViewsLogElement::ViewStatus::EXCEPTION_BEFORE_START};
views.emplace_back(ViewInfo{std::move(query), database_table, std::move(out), nullptr, std::move(runtime_stats)});
current_thread = running_thread;
@ -210,45 +215,30 @@ void PushingToViewsBlockOutputStream::write(const Block & block)
return;
/// Don't process materialized views if this block is duplicate
if (!getContext()->getSettingsRef().deduplicate_blocks_in_dependent_materialized_views && replicated_output && replicated_output->lastBlockIsDuplicate())
const Settings & settings = getContext()->getSettingsRef();
if (!settings.deduplicate_blocks_in_dependent_materialized_views && replicated_output && replicated_output->lastBlockIsDuplicate())
return;
const Settings & settings = getContext()->getSettingsRef();
const size_t max_threads = std::min(views.size(), (settings.parallel_view_processing ? static_cast<size_t>(settings.max_threads) : 1));
bool exception_happened = false;
if (max_threads > 1)
{
ThreadPool pool(max_threads);
std::atomic_uint8_t exception_count = 0;
for (auto & view : views)
{
pool.scheduleOrThrowOnError([&] {
setThreadName("PushingToViews");
if (exception_count.load(std::memory_order_relaxed))
return;
process(block, view);
if (view.exception)
exception_count.fetch_add(1, std::memory_order_relaxed);
});
}
pool.wait();
exception_happened = exception_count.load(std::memory_order_relaxed) != 0;
}
else
{
for (auto & view : views)
{
process(block, view);
if (view.exception)
{
exception_happened = true;
break;
}
}
}
if (exception_happened)
check_exceptions_in_views();
}
void PushingToViewsBlockOutputStream::writePrefix()
@ -262,7 +252,7 @@ void PushingToViewsBlockOutputStream::writePrefix()
if (view.exception)
{
log_query_views();
throw;
std::rethrow_exception(view.exception);
}
}
}
@ -287,10 +277,13 @@ void PushingToViewsBlockOutputStream::writeSuffix()
std::atomic_uint8_t exception_count = 0;
for (auto & view : views)
{
if (view.exception)
{
exception_happened = true;
continue;
}
pool.scheduleOrThrowOnError([&] {
setThreadName("PushingToViews");
if (exception_count.load(std::memory_order_relaxed))
return;
process_suffix(view);
if (view.exception)
@ -298,18 +291,20 @@ void PushingToViewsBlockOutputStream::writeSuffix()
});
}
pool.wait();
exception_happened = exception_count.load(std::memory_order_relaxed) != 0;
exception_happened |= exception_count.load(std::memory_order_relaxed) != 0;
}
else
{
for (auto & view : views)
{
process_suffix(view);
if (view.exception)
{
exception_happened = true;
break;
continue;
}
process_suffix(view);
if (view.exception)
exception_happened = true;
}
}
if (exception_happened)
@ -392,16 +387,15 @@ void PushingToViewsBlockOutputStream::process(const Block & block, ViewInfo & vi
}
in->readSuffix();
view.runtime_stats.setStatus(QueryViewsLogElement::ViewStatus::WRITTEN_BLOCK);
}
catch (Exception & ex)
{
ex.addMessage("while pushing to view " + view.table_id.getNameForLogs());
view.exception = std::current_exception();
view.set_exception(std::current_exception());
}
catch (...)
{
view.exception = std::current_exception();
view.set_exception(std::current_exception());
}
view.runtime_stats.elapsed_ms += watch.elapsedMilliseconds();
@ -422,16 +416,15 @@ void PushingToViewsBlockOutputStream::process_prefix(ViewInfo & view)
try
{
view.out->writePrefix();
view.runtime_stats.setStatus(QueryViewsLogElement::ViewStatus::WRITTEN_PREFIX);
}
catch (Exception & ex)
{
ex.addMessage("while writing prefix to view " + view.table_id.getNameForLogs());
view.exception = std::current_exception();
view.set_exception(std::current_exception());
}
catch (...)
{
view.exception = std::current_exception();
view.set_exception(std::current_exception());
}
view.runtime_stats.elapsed_ms += watch.elapsedMilliseconds();
}
@ -452,16 +445,16 @@ void PushingToViewsBlockOutputStream::process_suffix(ViewInfo & view)
try
{
view.out->writeSuffix();
view.runtime_stats.setStatus(QueryViewsLogElement::ViewStatus::WRITTEN_SUFFIX);
view.runtime_stats.setStatus(QueryViewsLogElement::ViewStatus::QUERY_FINISH);
}
catch (Exception & ex)
{
ex.addMessage("while writing suffix to view " + view.table_id.getNameForLogs());
view.exception = std::current_exception();
view.set_exception(std::current_exception());
}
catch (...)
{
view.exception = std::current_exception();
view.set_exception(std::current_exception());
}
view.runtime_stats.elapsed_ms += watch.elapsedMilliseconds();
if (!view.exception)
@ -491,12 +484,13 @@ void PushingToViewsBlockOutputStream::log_query_views()
{
const auto & settings = getContext()->getSettingsRef();
const UInt64 min_query_duration = settings.log_queries_min_query_duration_ms.totalMilliseconds();
const QueryViewsLogElement::ViewStatus min_status = settings.log_queries_min_type;
if (views.empty() || !settings.log_queries || !settings.log_query_views)
return;
for (auto & view : views)
{
if (min_query_duration && view.runtime_stats.elapsed_ms <= min_query_duration)
if ((min_query_duration && view.runtime_stats.elapsed_ms <= min_query_duration) || (view.runtime_stats.event_status < min_status))
continue;
try

View File

@ -23,6 +23,12 @@ struct ViewInfo
BlockOutputStreamPtr out;
std::exception_ptr exception;
QueryViewsLogElement::ViewRuntimeStats runtime_stats;
void set_exception(std::exception_ptr e)
{
exception = e;
runtime_stats.setStatus(QueryViewsLogElement::ViewStatus::EXCEPTION_WHILE_PROCESSING);
}
};
/** Writes data to the specified table and to all dependent materialized views.

View File

@ -19,10 +19,10 @@ namespace DB
Block QueryViewsLogElement::createBlock()
{
auto view_status_datatype = std::make_shared<DataTypeEnum8>(DataTypeEnum8::Values{
{"Init", static_cast<Int8>(ViewStatus::INIT)},
{"WrittenPrefix", static_cast<Int8>(ViewStatus::WRITTEN_PREFIX)},
{"WrittenBlock", static_cast<Int8>(ViewStatus::WRITTEN_BLOCK)},
{"WrittenSuffix", static_cast<Int8>(ViewStatus::WRITTEN_SUFFIX)}});
{"QueryStart", static_cast<Int8>(QUERY_START)},
{"QueryFinish", static_cast<Int8>(QUERY_FINISH)},
{"ExceptionBeforeStart", static_cast<Int8>(EXCEPTION_BEFORE_START)},
{"ExceptionWhileProcessing", static_cast<Int8>(EXCEPTION_WHILE_PROCESSING)}});
auto view_type_datatype = std::make_shared<DataTypeEnum8>(DataTypeEnum8::Values{
{"Default", static_cast<Int8>(ViewType::DEFAULT)},

View File

@ -23,14 +23,7 @@ class ThreadStatus;
struct QueryViewsLogElement
{
enum class ViewStatus : int8_t
{
INIT = 1,
WRITTEN_PREFIX = 2,
WRITTEN_BLOCK = 3,
WRITTEN_SUFFIX = 4
};
using ViewStatus = QueryLogElementType;
enum class ViewType : int8_t
{
@ -46,7 +39,7 @@ struct QueryViewsLogElement
std::shared_ptr<ThreadStatus> thread_status = nullptr;
UInt64 elapsed_ms = 0;
std::chrono::time_point<std::chrono::system_clock> event_time;
ViewStatus event_status = ViewStatus::INIT;
ViewStatus event_status = ViewStatus::QUERY_START;
void setStatus(ViewStatus s)
{

View File

@ -1,6 +1,7 @@
{"stage":"Query log rows","read_rows":"100","written_rows":"201","databases":["_table_function","default"],"tables":["_table_function.numbers","default.table_a","default.table_b","default.table_b_live_view","default.table_c"],"views":["default.matview_a_to_b","default.matview_b_to_c","default.table_b_live_view"]}
{"stage":"Depending views","view_name":"default.matview_a_to_b","view_target":"default.table_b","view_query":"SELECT toFloat64(a) AS a, b AS count FROM default.table_a"}
{"stage":"Depending views","view_name":"default.matview_b_to_c","view_target":"default.table_c","view_query":"SELECT sum(a) AS a FROM default.table_b"}
{"stage":"Depending views","view_name":"default.table_b_live_view","view_target":"default.table_b_live_view","view_query":"SELECT sum(a + b) FROM default.table_b"}
{"stage":"Query log rows","read_rows":"100","written_rows":"202","databases":["_table_function","default"],"tables":["_table_function.numbers","default.`.inner_id.12312312-1234-8765-8420-000000000001`","default.table_a","default.table_b","default.table_b_live_view","default.table_c"],"views":["default.matview_a_to_b","default.matview_b_to_c","default.matview_b_to_c_inner","default.table_b_live_view"]}
{"stage":"Depending views","view_name":"default.matview_a_to_b","status":"QueryFinish","view_target":"default.table_b","view_query":"SELECT toFloat64(a) AS a, b AS count FROM default.table_a"}
{"stage":"Depending views","view_name":"default.matview_b_to_c","status":"QueryFinish","view_target":"default.table_c","view_query":"SELECT sum(a) AS a FROM default.table_b"}
{"stage":"Depending views","view_name":"default.matview_b_to_c_inner","status":"QueryFinish","view_target":"default.`.inner_id.12312312-1234-8765-8420-000000000001`","view_query":"SELECT sum(a) AS a FROM default.table_b"}
{"stage":"Depending views","view_name":"default.table_b_live_view","status":"QueryFinish","view_target":"default.table_b_live_view","view_query":"SELECT sum(a + b) FROM default.table_b"}
{"stage":"Query log rows 2","read_rows":"50","written_rows":"100","databases":["_table_function","default"],"tables":["_table_function.numbers","default.table_d","default.table_e","default.table_f"],"views":["default.matview_join_d_e"]}
{"stage":"Depending views 2","view_name":"default.matview_join_d_e","view_target":"default.table_f","view_query":"SELECT table_d.a AS a, table_e.count AS count FROM default.table_d LEFT JOIN default.table_e ON table_d.a = table_e.a"}
{"stage":"Depending views 2","view_name":"default.matview_join_d_e","status":"QueryFinish","view_target":"default.table_f","view_query":"SELECT table_d.a AS a, table_e.count AS count FROM default.table_d LEFT JOIN default.table_e ON table_d.a = table_e.a"}

View File

@ -14,12 +14,9 @@ CREATE TABLE table_f (a Float64, count Int64) ENGINE MergeTree ORDER BY a;
-- SETUP MATERIALIZED VIEWS
CREATE MATERIALIZED VIEW matview_a_to_b TO table_b AS SELECT toFloat64(a) AS a, b AS count FROM table_a;
CREATE MATERIALIZED VIEW matview_b_to_c TO table_c AS SELECT SUM(a) as a FROM table_b;
CREATE MATERIALIZED VIEW matview_b_to_c_inner UUID '12312312-1234-8765-8420-000000000001' ENGINE TinyLog AS SELECT SUM(a) as a FROM table_b;
CREATE MATERIALIZED VIEW matview_join_d_e TO table_f AS SELECT table_d.a as a, table_e.count as count FROM table_d LEFT JOIN table_e ON table_d.a = table_e.a;
-- We don't include test materialized views here since the name is based on the uuid on atomic databases
-- and that makes the test harder to read
-- SETUP LIVE VIEW
---- table_b_live_view (Int64)
DROP TABLE IF EXISTS table_b_live_view;
@ -56,6 +53,7 @@ FORMAT JSONEachRow;
SELECT
'Depending views' as stage,
view_name,
status,
view_target,
view_query
FROM system.query_views_log
@ -88,6 +86,7 @@ FORMAT JSONEachRow;
SELECT
'Depending views 2' as stage,
view_name,
status,
view_target,
view_query
FROM system.query_views_log
@ -107,6 +106,8 @@ ORDER BY view_name
DROP TABLE table_b_live_view;
DROP TABLE matview_a_to_b;
DROP TABLE matview_b_to_c;
DROP TABLE matview_b_to_c_inner SYNC;
DROP TABLE matview_join_d_e;
DROP TABLE table_f;
DROP TABLE table_e;
DROP TABLE table_d;