Merge pull request #72898 from ClickHouse/revert-72882-revert-72654-revert-72642-revert-72395-chesema-dtor-Finalizer

Revert "Revert "Revert "Revert "make d-tor Finalizer more obvious""""
This commit is contained in:
Sema Checherinda 2024-12-12 15:08:56 +00:00 committed by GitHub
commit 8fd1571802
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 282 additions and 27 deletions

View File

@ -303,6 +303,7 @@ static bool isTrivialSelect(const ASTPtr & select)
Chain InterpreterInsertQuery::buildChain(
const StoragePtr & table,
size_t view_level,
const StorageMetadataPtr & metadata_snapshot,
const Names & columns,
ThreadStatusesHolderPtr thread_status_holder,
@ -324,7 +325,7 @@ Chain InterpreterInsertQuery::buildChain(
if (check_access)
getContext()->checkAccess(AccessType::INSERT, table->getStorageID(), sample.getNames());
Chain sink = buildSink(table, metadata_snapshot, thread_status_holder, running_group, elapsed_counter_ms);
Chain sink = buildSink(table, view_level, metadata_snapshot, thread_status_holder, running_group, elapsed_counter_ms);
Chain chain = buildPreSinkChain(sink.getInputHeader(), table, metadata_snapshot, sample);
chain.appendChain(std::move(sink));
@ -333,6 +334,7 @@ Chain InterpreterInsertQuery::buildChain(
Chain InterpreterInsertQuery::buildSink(
const StoragePtr & table,
size_t view_level,
const StorageMetadataPtr & metadata_snapshot,
ThreadStatusesHolderPtr thread_status_holder,
ThreadGroupPtr running_group,
@ -361,7 +363,7 @@ Chain InterpreterInsertQuery::buildSink(
else
{
out = buildPushingToViewsChain(table, metadata_snapshot, context_ptr,
query_ptr, no_destination,
query_ptr, view_level, no_destination,
thread_status_holder, running_group, elapsed_counter_ms, async_insert);
}
@ -423,7 +425,14 @@ Chain InterpreterInsertQuery::buildPreSinkChain(
return out;
}
std::pair<std::vector<Chain>, std::vector<Chain>> InterpreterInsertQuery::buildPreAndSinkChains(size_t presink_streams, size_t sink_streams, StoragePtr table, const StorageMetadataPtr & metadata_snapshot, const Block & query_sample_block)
std::pair<std::vector<Chain>, std::vector<Chain>> InterpreterInsertQuery::buildPreAndSinkChains(
size_t presink_streams,
size_t sink_streams,
StoragePtr table,
size_t view_level,
const StorageMetadataPtr & metadata_snapshot,
const Block & query_sample_block
)
{
chassert(presink_streams > 0);
chassert(sink_streams > 0);
@ -439,7 +448,7 @@ std::pair<std::vector<Chain>, std::vector<Chain>> InterpreterInsertQuery::buildP
for (size_t i = 0; i < sink_streams; ++i)
{
auto out = buildSink(table, metadata_snapshot, /* thread_status_holder= */ nullptr,
auto out = buildSink(table, view_level, metadata_snapshot, /* thread_status_holder= */ nullptr,
running_group, /* elapsed_counter_ms= */ nullptr);
sink_chains.emplace_back(std::move(out));
@ -639,7 +648,7 @@ QueryPipeline InterpreterInsertQuery::buildInsertSelectPipeline(ASTInsertQuery &
auto [presink_chains, sink_chains] = buildPreAndSinkChains(
presink_streams_size, sink_streams_size,
table, metadata_snapshot, query_sample_block);
table, /* view_level */ 0, metadata_snapshot, query_sample_block);
pipeline.resize(presink_chains.size());
@ -693,7 +702,7 @@ QueryPipeline InterpreterInsertQuery::buildInsertPipeline(ASTInsertQuery & query
{
auto [presink_chains, sink_chains] = buildPreAndSinkChains(
/* presink_streams */1, /* sink_streams */1,
table, metadata_snapshot, query_sample_block);
table, /* view_level */ 0, metadata_snapshot, query_sample_block);
chain = std::move(presink_chains.front());
chain.appendChain(std::move(sink_chains.front()));

View File

@ -43,6 +43,7 @@ public:
Chain buildChain(
const StoragePtr & table,
size_t view_level,
const StorageMetadataPtr & metadata_snapshot,
const Names & columns,
ThreadStatusesHolderPtr thread_status_holder = {},
@ -79,13 +80,20 @@ private:
std::vector<std::unique_ptr<ReadBuffer>> owned_buffers;
std::pair<std::vector<Chain>, std::vector<Chain>> buildPreAndSinkChains(size_t presink_streams, size_t sink_streams, StoragePtr table, const StorageMetadataPtr & metadata_snapshot, const Block & query_sample_block);
std::pair<std::vector<Chain>, std::vector<Chain>> buildPreAndSinkChains(
size_t presink_streams,
size_t sink_streams,
StoragePtr table,
size_t view_level,
const StorageMetadataPtr & metadata_snapshot,
const Block & query_sample_block);
QueryPipeline buildInsertSelectPipeline(ASTInsertQuery & query, StoragePtr table);
QueryPipeline buildInsertPipeline(ASTInsertQuery & query, StoragePtr table);
Chain buildSink(
const StoragePtr & table,
size_t view_level,
const StorageMetadataPtr & metadata_snapshot,
ThreadStatusesHolderPtr thread_status_holder,
ThreadGroupPtr running_group,

View File

@ -127,7 +127,10 @@ void PushingPipelineExecutor::finish()
finished = true;
if (executor)
executor->executeStep();
{
auto res = executor->executeStep();
chassert(!res);
}
}
void PushingPipelineExecutor::cancel()

View File

@ -1,3 +1,4 @@
#include <exception>
#include <Processors/Transforms/ExceptionKeepingTransform.h>
#include <Common/ThreadStatus.h>
#include <Common/Stopwatch.h>

View File

@ -35,6 +35,7 @@
#include <atomic>
#include <chrono>
#include <exception>
#include <memory>
@ -120,7 +121,7 @@ using ViewsDataPtr = std::shared_ptr<ViewsData>;
class CopyingDataToViewsTransform final : public IProcessor
{
public:
CopyingDataToViewsTransform(const Block & header, ViewsDataPtr data);
CopyingDataToViewsTransform(const Block & header, ViewsDataPtr data, size_t view_level_);
String getName() const override { return "CopyingDataToViewsTransform"; }
Status prepare() override;
@ -129,6 +130,7 @@ public:
private:
InputPort & input;
ViewsDataPtr views_data;
size_t view_level;
};
/// For source chunk, execute view query over it.
@ -223,6 +225,7 @@ private:
/// Generates one chain part for every view in buildPushingToViewsChain
std::optional<Chain> generateViewChain(
ContextPtr context,
size_t view_level,
const StorageID & view_id,
ThreadGroupPtr running_group,
Chain & result_chain,
@ -369,7 +372,7 @@ std::optional<Chain> generateViewChain(
/// TODO: remove sql_security_type check after we turn `ignore_empty_sql_security_in_create_view_query=false`
bool check_access = !materialized_view->hasInnerTable() && materialized_view->getInMemoryMetadataPtr()->sql_security_type;
out = interpreter.buildChain(inner_table, inner_metadata_snapshot, insert_columns, thread_status_holder, view_counter_ms, check_access);
out = interpreter.buildChain(inner_table, view_level + 1, inner_metadata_snapshot, insert_columns, thread_status_holder, view_counter_ms, check_access);
if (interpreter.shouldAddSquashingFroStorage(inner_table))
{
@ -400,6 +403,7 @@ std::optional<Chain> generateViewChain(
query = live_view->getInnerQuery();
out = buildPushingToViewsChain(
view, view_metadata_snapshot, insert_context, ASTPtr(),
view_level + 1,
/* no_destination= */ true,
thread_status_holder, running_group, view_counter_ms, async_insert, storage_header);
}
@ -409,12 +413,14 @@ std::optional<Chain> generateViewChain(
query = window_view->getMergeableQuery();
out = buildPushingToViewsChain(
view, view_metadata_snapshot, insert_context, ASTPtr(),
view_level + 1,
/* no_destination= */ true,
thread_status_holder, running_group, view_counter_ms, async_insert);
}
else
out = buildPushingToViewsChain(
view, view_metadata_snapshot, insert_context, ASTPtr(),
view_level + 1,
/* no_destination= */ false,
thread_status_holder, running_group, view_counter_ms, async_insert);
@ -466,12 +472,14 @@ Chain buildPushingToViewsChain(
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
const ASTPtr & query_ptr,
size_t view_level,
bool no_destination,
ThreadStatusesHolderPtr thread_status_holder,
ThreadGroupPtr running_group,
std::atomic_uint64_t * elapsed_counter_ms,
bool async_insert,
const Block & live_view_header)
const Block & live_view_header
)
{
checkStackSize();
Chain result_chain;
@ -514,7 +522,7 @@ Chain buildPushingToViewsChain(
try
{
auto out = generateViewChain(
context, view_id, running_group, result_chain,
context, view_level, view_id, running_group, result_chain,
views_data, thread_status_holder, async_insert, storage_header, disable_deduplication_for_children);
if (!out.has_value())
@ -554,7 +562,7 @@ Chain buildPushingToViewsChain(
for (const auto & chain : chains)
headers.push_back(chain.getOutputHeader());
auto copying_data = std::make_shared<CopyingDataToViewsTransform>(storage_header, views_data);
auto copying_data = std::make_shared<CopyingDataToViewsTransform>(storage_header, views_data, view_level);
auto finalizing_views = std::make_shared<FinalizingViewsTransform>(std::move(headers), views_data);
auto out = copying_data->getOutputs().begin();
auto in = finalizing_views->getInputs().begin();
@ -726,10 +734,11 @@ static void logQueryViews(std::list<ViewRuntimeData> & views, ContextPtr context
}
CopyingDataToViewsTransform::CopyingDataToViewsTransform(const Block & header, ViewsDataPtr data)
CopyingDataToViewsTransform::CopyingDataToViewsTransform(const Block & header, ViewsDataPtr data, size_t view_level_)
: IProcessor({header}, OutputPorts(data->views.size(), header))
, input(inputs.front())
, views_data(std::move(data))
, view_level(view_level_)
{
if (views_data->views.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "CopyingDataToViewsTransform cannot have zero outputs");
@ -765,6 +774,12 @@ IProcessor::Status CopyingDataToViewsTransform::prepare()
auto data = input.pullData();
if (data.exception)
{
// If view_level == 0 than the exception comes from the source table.
// There is no case when we could tolerate exceptions from the source table.
// Do not tolerate incoming exception and do not pass it to the following processors.
if (view_level == 0)
std::rethrow_exception(data.exception);
if (!views_data->has_exception)
{
views_data->first_exception = data.exception;

View File

@ -60,6 +60,7 @@ Chain buildPushingToViewsChain(
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
const ASTPtr & query_ptr,
size_t view_level,
/// It is true when we should not insert into table, but only to views.
/// Used e.g. for kafka. We should try to remove it somehow.
bool no_destination,

View File

@ -1,3 +1,4 @@
#include <exception>
#include <Storages/MergeTree/MergeTreeSink.h>
#include <Storages/StorageMergeTree.h>
#include <Interpreters/PartLog.h>
@ -44,6 +45,8 @@ MergeTreeSink::~MergeTreeSink()
if (!delayed_chunk)
return;
chassert(isCancelled() || std::uncaught_exceptions());
for (auto & partition : delayed_chunk->partitions)
{
partition.temp_part.cancel();
@ -76,6 +79,7 @@ void MergeTreeSink::onStart()
void MergeTreeSink::onFinish()
{
chassert(!isCancelled());
finishDelayedChunk();
}

View File

@ -119,7 +119,7 @@ struct MergedBlockOutputStream::Finalizer::Impl
}
void finish();
void cancel();
void cancel() noexcept;
};
void MergedBlockOutputStream::Finalizer::finish()
@ -130,7 +130,7 @@ void MergedBlockOutputStream::Finalizer::finish()
to_finish->finish();
}
void MergedBlockOutputStream::Finalizer::cancel()
void MergedBlockOutputStream::Finalizer::cancel() noexcept
{
std::unique_ptr<Impl> to_cancel = std::move(impl);
impl.reset();
@ -167,7 +167,7 @@ void MergedBlockOutputStream::Finalizer::Impl::finish()
part->getDataPartStorage().removeFile(file_name);
}
void MergedBlockOutputStream::Finalizer::Impl::cancel()
void MergedBlockOutputStream::Finalizer::Impl::cancel() noexcept
{
writer.cancel();
@ -183,15 +183,8 @@ MergedBlockOutputStream::Finalizer::Finalizer(std::unique_ptr<Impl> impl_) : imp
MergedBlockOutputStream::Finalizer::~Finalizer()
{
try
{
if (impl)
finish();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
if (impl)
cancel();
}

View File

@ -55,7 +55,7 @@ public:
~Finalizer();
void finish();
void cancel();
void cancel() noexcept;
};
/// Finalize writing part and fill inner structures

View File

@ -179,6 +179,8 @@ ReplicatedMergeTreeSinkImpl<async_insert>::~ReplicatedMergeTreeSinkImpl()
if (!delayed_chunk)
return;
chassert(isCancelled() || std::uncaught_exceptions());
for (auto & partition : delayed_chunk->partitions)
{
partition.temp_part.cancel();

View File

@ -0,0 +1,11 @@
<?xml version="1.0" encoding="utf-8"?>
<clickhouse>
<profiles>
<default>
<s3_retry_attempts>1</s3_retry_attempts>
<s3_use_adaptive_timeouts>0</s3_use_adaptive_timeouts>
<s3_validate_request_settings>0</s3_validate_request_settings>
</default>
</profiles>
</clickhouse>

View File

@ -0,0 +1,7 @@
<clickhouse>
<query_log>
<database>system</database>
<table>query_log</table>
<storage_policy>broken_s3</storage_policy>
</query_log>
</clickhouse>

View File

@ -39,6 +39,18 @@ def cluster():
],
with_minio=True,
)
cluster.add_instance(
"node_with_query_log_on_s3",
main_configs=[
"configs/storage_conf.xml",
"configs/query_log_conf.xml",
],
user_configs=[
"configs/setting.xml",
"configs/no_s3_retries.xml",
],
with_minio=True,
)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
@ -718,3 +730,150 @@ def test_no_key_found_disk(cluster, broken_s3):
)
assert s3_disk_no_key_errors_metric_value > 0
def test_node_with_query_log_on_s3(cluster, broken_s3):
node = cluster.instances["node_with_query_log_on_s3"]
node.query(
"""
SYSTEM FLUSH LOGS
"""
)
node.query(
"""
DROP VIEW IF EXISTS log_sink_mv
"""
)
node.query(
"""
DROP TABLE IF EXISTS log_sink
"""
)
node.query(
"""
CREATE TABLE log_sink
ENGINE = MergeTree()
ORDER BY ()
EMPTY AS
SELECT *
FROM system.query_log
"""
)
node.query(
"""
CREATE MATERIALIZED VIEW log_sink_mv TO log_sink AS
SELECT *
FROM system.query_log
"""
)
node.query(
"""
SELECT 1111
"""
)
node.query(
"""
SYSTEM FLUSH LOGS
"""
)
node.query(
"""
SELECT 2222
"""
)
broken_s3.setup_at_object_upload(count=100, after=0)
node.query(
"""
SYSTEM FLUSH LOGS
"""
)
count_from_query_log = node.query(
"""
SELECT count() from system.query_log WHERE query like 'SELECT 2222%' AND type = 'QueryFinish'
"""
)
assert count_from_query_log == "0\n"
def test_exception_in_onFinish(cluster, broken_s3):
node = cluster.instances["node_with_query_log_on_s3"]
node.query(
"""
DROP VIEW IF EXISTS source_sink_mv
"""
)
node.query(
"""
DROP TABLE IF EXISTS source_sink
"""
)
node.query(
"""
DROP TABLE IF EXISTS source
"""
)
node.query(
"""
CREATE TABLE source (i Int64)
ENGINE = MergeTree()
ORDER BY ()
SETTINGS storage_policy='broken_s3'
"""
)
node.query(
"""
CREATE TABLE source_sink
ENGINE = MergeTree()
ORDER BY ()
EMPTY AS
SELECT *
FROM source
"""
)
node.query(
"""
CREATE MATERIALIZED VIEW source_sink_mv TO source_sink AS
SELECT *
FROM source
"""
)
node.query(
"""
INSERT INTO source SETTINGS materialized_views_ignore_errors=1 VALUES (1)
"""
)
broken_s3.setup_at_object_upload(count=100, after=0)
node.query_and_get_error(
"""
INSERT INTO source SETTINGS materialized_views_ignore_errors=1 VALUES (2)
"""
)
count_from_query_log = node.query(
"""
SELECT count() from source
"""
)
assert count_from_query_log == "1\n"

View File

@ -0,0 +1,14 @@
-- { echoOn }
insert into testX select number from numbers(20)
settings materialized_views_ignore_errors = 0; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO }
select count() from testX;
0
select count() from testXA;
0
insert into testX select number from numbers(20)
settings materialized_views_ignore_errors = 1; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO }
select count() from testX;
0
select count() from testXA;
0

View File

@ -0,0 +1,28 @@
-- more blocks to process
set max_block_size = 10;
set min_insert_block_size_rows = 10;
drop table if exists testX;
drop table if exists testXA;
create table testX (A Int64) engine=MergeTree partition by (intDiv(A, 10), throwIf(A=2)) order by tuple();
create materialized view testXA engine=MergeTree order by tuple() as select sleep(0.1) from testX;
-- { echoOn }
insert into testX select number from numbers(20)
settings materialized_views_ignore_errors = 0; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO }
select count() from testX;
select count() from testXA;
insert into testX select number from numbers(20)
settings materialized_views_ignore_errors = 1; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO }
select count() from testX;
select count() from testXA;
-- { echoOff }
drop table testX;
drop view testXA;