fix behaviour with materialized_views_ignore_errors
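
materialized_views_ignore_errors is only meant to tolerate errors coming from
the attached (materialized) views. Before this change an exception raised by
the source table itself could be swallowed as well when the setting was
enabled, leaving a seemingly successful INSERT behind. buildPushingToViewsChain
now threads a view_level through the chain, and CopyingDataToViewsTransform
rethrows any exception arriving at view_level 0, i.e. one coming from the
source table.

A minimal sketch of the intended semantics, mirroring the new stateless test
at the bottom of this commit:

    create table testX (A Int64) engine=MergeTree
        partition by (intDiv(A, 10), throwIf(A = 2)) order by tuple();
    create materialized view testXA engine=MergeTree order by tuple()
        as select sleep(0.1) from testX;
    -- must fail: the exception comes from testX itself, not from a view
    insert into testX select number from numbers(20)
        settings materialized_views_ignore_errors = 1;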

Sema Checherinda 2024-12-10 16:24:46 +01:00
parent 689a154438
commit 0f57753013
11 changed files with 216 additions and 7 deletions

View File

@ -361,7 +361,7 @@ Chain InterpreterInsertQuery::buildSink(
else
{
out = buildPushingToViewsChain(table, metadata_snapshot, context_ptr,
- query_ptr, no_destination,
+ query_ptr, /* view_level */ 0, no_destination,
thread_status_holder, running_group, elapsed_counter_ms, async_insert);
}

View File

@ -127,7 +127,10 @@ void PushingPipelineExecutor::finish()
finished = true;
if (executor)
- executor->executeStep();
+ {
+     auto res = executor->executeStep();
+     /// executeStep() returns true while the pipeline still has work to do;
+     /// after finish() it must be fully executed.
+     chassert(!res);
+ }
}
void PushingPipelineExecutor::cancel()

View File

@ -1,3 +1,4 @@
+ #include <exception>
#include <Processors/Transforms/ExceptionKeepingTransform.h>
#include <Common/ThreadStatus.h>
#include <Common/Stopwatch.h>

View File

@ -35,6 +35,7 @@
#include <atomic>
#include <chrono>
#include <exception>
+ #include <memory>
@ -120,7 +121,7 @@ using ViewsDataPtr = std::shared_ptr<ViewsData>;
class CopyingDataToViewsTransform final : public IProcessor
{
public:
- CopyingDataToViewsTransform(const Block & header, ViewsDataPtr data);
+ CopyingDataToViewsTransform(const Block & header, ViewsDataPtr data, size_t view_level_);
String getName() const override { return "CopyingDataToViewsTransform"; }
Status prepare() override;
@ -129,6 +130,7 @@ public:
private:
InputPort & input;
ViewsDataPtr views_data;
+ size_t view_level;
};
/// For source chunk, execute view query over it.
@ -223,6 +225,7 @@ private:
/// Generates one chain part for every view in buildPushingToViewsChain
std::optional<Chain> generateViewChain(
ContextPtr context,
+ size_t view_level,
const StorageID & view_id,
ThreadGroupPtr running_group,
Chain & result_chain,
@ -400,6 +403,7 @@ std::optional<Chain> generateViewChain(
query = live_view->getInnerQuery();
out = buildPushingToViewsChain(
view, view_metadata_snapshot, insert_context, ASTPtr(),
+ view_level + 1,
/* no_destination= */ true,
thread_status_holder, running_group, view_counter_ms, async_insert, storage_header);
}
@ -409,12 +413,14 @@ std::optional<Chain> generateViewChain(
query = window_view->getMergeableQuery();
out = buildPushingToViewsChain(
view, view_metadata_snapshot, insert_context, ASTPtr(),
+ view_level + 1,
/* no_destination= */ true,
thread_status_holder, running_group, view_counter_ms, async_insert);
}
else
out = buildPushingToViewsChain(
view, view_metadata_snapshot, insert_context, ASTPtr(),
+ view_level + 1,
/* no_destination= */ false,
thread_status_holder, running_group, view_counter_ms, async_insert);
@ -466,12 +472,14 @@ Chain buildPushingToViewsChain(
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
const ASTPtr & query_ptr,
+ size_t view_level,
bool no_destination,
ThreadStatusesHolderPtr thread_status_holder,
ThreadGroupPtr running_group,
std::atomic_uint64_t * elapsed_counter_ms,
bool async_insert,
- const Block & live_view_header)
+ const Block & live_view_header
+ )
{
checkStackSize();
Chain result_chain;
@ -514,7 +522,7 @@ Chain buildPushingToViewsChain(
try
{
auto out = generateViewChain(
- context, view_id, running_group, result_chain,
+ context, view_level, view_id, running_group, result_chain,
views_data, thread_status_holder, async_insert, storage_header, disable_deduplication_for_children);
if (!out.has_value())
@ -554,7 +562,7 @@ Chain buildPushingToViewsChain(
for (const auto & chain : chains)
headers.push_back(chain.getOutputHeader());
- auto copying_data = std::make_shared<CopyingDataToViewsTransform>(storage_header, views_data);
+ auto copying_data = std::make_shared<CopyingDataToViewsTransform>(storage_header, views_data, view_level);
auto finalizing_views = std::make_shared<FinalizingViewsTransform>(std::move(headers), views_data);
auto out = copying_data->getOutputs().begin();
auto in = finalizing_views->getInputs().begin();
@ -726,10 +734,11 @@ static void logQueryViews(std::list<ViewRuntimeData> & views, ContextPtr context
}
- CopyingDataToViewsTransform::CopyingDataToViewsTransform(const Block & header, ViewsDataPtr data)
+ CopyingDataToViewsTransform::CopyingDataToViewsTransform(const Block & header, ViewsDataPtr data, size_t view_level_)
: IProcessor({header}, OutputPorts(data->views.size(), header))
, input(inputs.front())
, views_data(std::move(data))
+ , view_level(view_level_)
{
if (views_data->views.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "CopyingDataToViewsTransform cannot have zero outputs");
@ -765,6 +774,12 @@ IProcessor::Status CopyingDataToViewsTransform::prepare()
auto data = input.pullData();
if (data.exception)
{
+ // If view_level == 0 then the exception comes from the source table.
+ // There is no case in which an exception from the source table could be tolerated.
+ // Do not tolerate the incoming exception and do not pass it to the following processors.
+ if (view_level == 0)
+     std::rethrow_exception(data.exception);
if (!views_data->has_exception)
{
views_data->first_exception = data.exception;
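
With the check in place, an exception is only treated as tolerable when it was
raised underneath a view (view_level > 0); whether it is then swallowed is
still decided by materialized_views_ignore_errors further down the chain. A
sketch of the tolerated case (illustrative names; healthy source table,
throwing view):

    create table src (A Int64) engine=MergeTree order by tuple();
    create materialized view bad_mv engine=MergeTree order by tuple()
        as select throwIf(A = 1) from src;
    -- the error originates in the view, so the setting decides:
    insert into src values (1) settings materialized_views_ignore_errors = 1; -- succeeds
    insert into src values (1) settings materialized_views_ignore_errors = 0; -- fails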

View File

@ -60,6 +60,7 @@ Chain buildPushingToViewsChain(
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
const ASTPtr & query_ptr,
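+ /// Depth in the cascade of views: 0 when the chain is built for the source
+ /// table itself, incremented for every nested (materialized) view.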
+ size_t view_level,
/// It is true when we should not insert into table, but only to views.
/// Used e.g. for kafka. We should try to remove it somehow.
bool no_destination,

View File

@ -4,6 +4,7 @@
#include <Interpreters/PartLog.h>
#include <Processors/Transforms/DeduplicationTokenTransforms.h>
#include <DataTypes/ObjectUtils.h>
#include "Common/logger_useful.h"
#include <Common/ProfileEventsScope.h>
#include <Core/Settings.h>
@ -79,6 +80,7 @@ void MergeTreeSink::onStart()
void MergeTreeSink::onFinish()
{
+ /// onFinish() must never be called on a sink that was cancelled.
+ chassert(!isCancelled());
finishDelayedChunk();
}

View File

@ -0,0 +1,11 @@
<?xml version="1.0" encoding="utf-8"?>
<clickhouse>
    <profiles>
        <default>
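            <!-- Fail fast: a single S3 attempt and no adaptive timeouts, so failures
                 injected by the broken_s3 mock surface immediately instead of being
                 retried away. -->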
            <s3_retry_attempts>1</s3_retry_attempts>
            <s3_use_adaptive_timeouts>0</s3_use_adaptive_timeouts>
            <s3_validate_request_settings>0</s3_validate_request_settings>
        </default>
    </profiles>
</clickhouse>

View File

@ -0,0 +1,7 @@
<clickhouse>
    <query_log>
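        <!-- Keep system.query_log on the failure-injecting broken_s3 storage policy
             so the tests can break log flushes on demand. -->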
        <database>system</database>
        <table>query_log</table>
        <storage_policy>broken_s3</storage_policy>
    </query_log>
</clickhouse>

View File

@ -39,6 +39,18 @@ def cluster():
        ],
        with_minio=True,
    )
    cluster.add_instance(
        "node_with_query_log_on_s3",
        main_configs=[
            "configs/storage_conf.xml",
            "configs/query_log_conf.xml",
        ],
        user_configs=[
            "configs/setting.xml",
            "configs/no_s3_retries.xml",
        ],
        with_minio=True,
    )

    logging.info("Starting cluster...")
    cluster.start()
    logging.info("Cluster started")
@ -718,3 +730,118 @@ def test_no_key_found_disk(cluster, broken_s3):
)
assert s3_disk_no_key_errors_metric_value > 0


def test_node_with_query_log_on_s3(cluster, broken_s3):
    node = cluster.instances["node_with_query_log_on_s3"]

    node.query(
        """
        SYSTEM FLUSH LOGS
        """
    )

    node.query(
        """
        CREATE TABLE log_sink
        ENGINE = MergeTree()
        ORDER BY ()
        EMPTY AS
        SELECT *
        FROM system.query_log
        """
    )

    node.query(
        """
        CREATE MATERIALIZED VIEW log_sink_mv TO log_sink AS
        SELECT *
        FROM system.query_log
        """
    )

    node.query(
        """
        SELECT 1111
        """
    )

    node.query(
        """
        SYSTEM FLUSH LOGS
        """
    )

    node.query(
        """
        SELECT 2222
        """
    )

    broken_s3.setup_at_object_upload(count=100, after=0)

    # With uploads broken, this flush cannot persist new query_log entries.
    node.query(
        """
        SYSTEM FLUSH LOGS
        """
    )

    count_from_query_log = node.query(
        """
        SELECT count() FROM system.query_log WHERE query LIKE 'SELECT 2222%' AND type = 'QueryFinish'
        """
    )
    assert count_from_query_log == "0\n"


def test_exception_in_onFinish(cluster, broken_s3):
    node = cluster.instances["node_with_query_log_on_s3"]

    node.query(
        """
        CREATE TABLE source (i Int64)
        ENGINE = MergeTree()
        ORDER BY ()
        SETTINGS storage_policy='broken_s3'
        """
    )

    node.query(
        """
        CREATE TABLE source_sink
        ENGINE = MergeTree()
        ORDER BY ()
        EMPTY AS
        SELECT *
        FROM source
        """
    )

    node.query(
        """
        CREATE MATERIALIZED VIEW source_sink_mv TO source_sink AS
        SELECT *
        FROM source
        """
    )

    node.query(
        """
        INSERT INTO source SETTINGS materialized_views_ignore_errors=1 VALUES (1)
        """
    )

    broken_s3.setup_at_object_upload(count=100, after=0)

    # The sink's onFinish() throws on the broken storage; the error must fail
    # the INSERT even with materialized_views_ignore_errors=1.
    node.query_and_get_error(
        """
        INSERT INTO source SETTINGS materialized_views_ignore_errors=1 VALUES (2)
        """
    )

    count_from_source = node.query(
        """
        SELECT count() FROM source
        """
    )
    assert count_from_source == "1\n"

View File

@ -0,0 +1,14 @@
-- { echoOn }
insert into testX select number from numbers(20)
settings materialized_views_ignore_errors = 0; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO }
select count() from testX;
0
select count() from testXA;
0
insert into testX select number from numbers(20)
settings materialized_views_ignore_errors = 1; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO }
select count() from testX;
0
select count() from testXA;
0

View File

@ -0,0 +1,28 @@
-- more blocks to process
set max_block_size = 10;
set min_insert_block_size_rows = 10;
drop table if exists testX;
drop table if exists testXA;
create table testX (A Int64) engine=MergeTree partition by (intDiv(A, 10), throwIf(A=2)) order by tuple();
create materialized view testXA engine=MergeTree order by tuple() as select sleep(0.1) from testX;
-- { echoOn }
insert into testX select number from numbers(20)
settings materialized_views_ignore_errors = 0; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO }
select count() from testX;
select count() from testXA;
insert into testX select number from numbers(20)
settings materialized_views_ignore_errors = 1; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO }
select count() from testX;
select count() from testXA;
-- { echoOff }
drop table testX;
drop view testXA;