mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-27 10:02:01 +00:00
Allow to ignore errors while pushing to MATERILIZED VIEW
This can be useful in the following scenarious: - you want to duplicate the data to another table and you don't care about the errors - you want to duplicate system.*_log to another server, you are adding materialized view that will push to Distributed table, but you don't want to miss original blocks in the local system.*_log - you want to push some data to a 3d party service, using i.e. URL engine. Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
This commit is contained in:
parent
2d4eae386a
commit
a110e0f022
@ -66,6 +66,8 @@ A materialized view is implemented as follows: when inserting data to the table
|
||||
Materialized views in ClickHouse use **column names** instead of column order during insertion into destination table. If some column names are not present in the `SELECT` query result, ClickHouse uses a default value, even if the column is not [Nullable](../../data-types/nullable.md). A safe practice would be to add aliases for every column when using Materialized views.
|
||||
|
||||
Materialized views in ClickHouse are implemented more like insert triggers. If there’s some aggregation in the view query, it’s applied only to the batch of freshly inserted data. Any changes to existing data of source table (like update, delete, drop partition, etc.) does not change the materialized view.
|
||||
|
||||
By default if the will be an error during pushing to the materialized view the data will not be inserted into the table to which the materialized view is attached. You can change this by setting `materialized_views_ignore_errors=true` setting for your `INSERT` query.
|
||||
:::
|
||||
|
||||
If you specify `POPULATE`, the existing table data is inserted into the view when creating it, as if making a `CREATE TABLE ... AS SELECT ...` . Otherwise, the query contains only the data inserted in the table after creating the view. We **do not recommend** using `POPULATE`, since data inserted in the table during the view creation will not be inserted in it.
|
||||
|
@ -508,6 +508,7 @@ class IColumn;
|
||||
M(Bool, allow_experimental_alter_materialized_view_structure, false, "Allow atomic alter on Materialized views. Work in progress.", 0) \
|
||||
M(Bool, enable_early_constant_folding, true, "Enable query optimization where we analyze function and subqueries results and rewrite query if there're constants there", 0) \
|
||||
M(Bool, deduplicate_blocks_in_dependent_materialized_views, false, "Should deduplicate blocks for materialized views if the block is not a duplicate for the table. Use true to always deduplicate in dependent tables.", 0) \
|
||||
M(Bool, materialized_views_ignore_errors, false, "Allows to ignore errors for MATERIALIZED VIEW, and deliver original block to the table regardless of MVs", 0) \
|
||||
M(Bool, use_compact_format_in_distributed_parts_names, true, "Changes format of directories names for distributed table insert parts.", 0) \
|
||||
M(Bool, validate_polygons, true, "Throw exception if polygon is invalid in function pointInPolygon (e.g. self-tangent, self-intersecting). If the setting is false, the function will accept invalid polygons but may silently return wrong result.", 0) \
|
||||
M(UInt64, max_parser_depth, DBMS_DEFAULT_MAX_PARSER_DEPTH, "Maximum parser depth (recursion depth of recursive descend parser).", 0) \
|
||||
|
@ -15,6 +15,7 @@
|
||||
#include <Storages/StorageMaterializedView.h>
|
||||
#include <Storages/StorageValues.h>
|
||||
#include <QueryPipeline/QueryPipelineBuilder.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/CurrentThread.h>
|
||||
#include <Common/MemoryTracker.h>
|
||||
#include <Common/ProfileEvents.h>
|
||||
@ -173,7 +174,7 @@ class FinalizingViewsTransform final : public IProcessor
|
||||
static InputPorts initPorts(std::vector<Block> headers);
|
||||
|
||||
public:
|
||||
FinalizingViewsTransform(std::vector<Block> headers, ViewsDataPtr data);
|
||||
FinalizingViewsTransform(std::vector<Block> headers, ViewsDataPtr data, bool materialized_views_ignore_errors_);
|
||||
|
||||
String getName() const override { return "FinalizingViewsTransform"; }
|
||||
Status prepare() override;
|
||||
@ -184,6 +185,7 @@ private:
|
||||
ViewsDataPtr views_data;
|
||||
std::vector<ExceptionStatus> statuses;
|
||||
std::exception_ptr any_exception;
|
||||
bool materialized_views_ignore_errors;
|
||||
};
|
||||
|
||||
|
||||
@ -407,7 +409,7 @@ Chain buildPushingToViewsChain(
|
||||
headers.push_back(chain.getOutputHeader());
|
||||
|
||||
auto copying_data = std::make_shared<CopyingDataToViewsTransform>(storage_header, views_data);
|
||||
auto finalizing_views = std::make_shared<FinalizingViewsTransform>(std::move(headers), views_data);
|
||||
auto finalizing_views = std::make_shared<FinalizingViewsTransform>(std::move(headers), views_data, settings.materialized_views_ignore_errors);
|
||||
auto out = copying_data->getOutputs().begin();
|
||||
auto in = finalizing_views->getInputs().begin();
|
||||
|
||||
@ -684,10 +686,11 @@ void PushingToWindowViewSink::consume(Chunk chunk)
|
||||
}
|
||||
|
||||
|
||||
FinalizingViewsTransform::FinalizingViewsTransform(std::vector<Block> headers, ViewsDataPtr data)
|
||||
FinalizingViewsTransform::FinalizingViewsTransform(std::vector<Block> headers, ViewsDataPtr data, bool materialized_views_ignore_errors_)
|
||||
: IProcessor(initPorts(std::move(headers)), {Block()})
|
||||
, output(outputs.front())
|
||||
, views_data(std::move(data))
|
||||
, materialized_views_ignore_errors(materialized_views_ignore_errors_)
|
||||
{
|
||||
statuses.resize(views_data->views.size());
|
||||
}
|
||||
@ -788,6 +791,13 @@ void FinalizingViewsTransform::work()
|
||||
auto & status = statuses[i];
|
||||
++i;
|
||||
|
||||
if (status.exception && materialized_views_ignore_errors)
|
||||
{
|
||||
auto exception = addStorageToException(status.exception, view.table_id);
|
||||
tryLogException(exception, &Poco::Logger::get("PushingToViews"), "Cannot push to the storage, ignoring the error");
|
||||
continue;
|
||||
}
|
||||
|
||||
if (status.exception)
|
||||
{
|
||||
if (!any_exception)
|
||||
|
@ -0,0 +1,17 @@
|
||||
-- { echoOn }
|
||||
select * from data_02572 order by key;
|
||||
insert into data_02572 values (1); -- { serverError UNKNOWN_TABLE }
|
||||
select * from data_02572 order by key;
|
||||
1
|
||||
insert into data_02572 settings materialized_views_ignore_errors=1 values (2);
|
||||
select * from data_02572 order by key;
|
||||
1
|
||||
2
|
||||
create table receiver_02572 as data_02572;
|
||||
insert into data_02572 values (3);
|
||||
select * from data_02572 order by key;
|
||||
1
|
||||
2
|
||||
3
|
||||
select * from receiver_02572 order by key;
|
||||
3
|
@ -0,0 +1,30 @@
|
||||
set prefer_localhost_replica=1;
|
||||
|
||||
drop table if exists data_02572;
|
||||
drop table if exists proxy_02572;
|
||||
drop table if exists push_to_proxy_mv_02572;
|
||||
drop table if exists receiver_02572;
|
||||
|
||||
create table data_02572 (key Int) engine=Memory();
|
||||
|
||||
create table proxy_02572 (key Int) engine=Distributed('test_shard_localhost', currentDatabase(), 'receiver_02572');
|
||||
-- ensure that insert fails
|
||||
insert into proxy_02572 values (1); -- { serverError UNKNOWN_TABLE }
|
||||
|
||||
-- proxy data with MV
|
||||
create materialized view push_to_proxy_mv_02572 to proxy_02572 as select * from data_02572;
|
||||
|
||||
-- { echoOn }
|
||||
select * from data_02572 order by key;
|
||||
|
||||
insert into data_02572 values (1); -- { serverError UNKNOWN_TABLE }
|
||||
select * from data_02572 order by key;
|
||||
|
||||
insert into data_02572 settings materialized_views_ignore_errors=1 values (2);
|
||||
select * from data_02572 order by key;
|
||||
|
||||
create table receiver_02572 as data_02572;
|
||||
|
||||
insert into data_02572 values (3);
|
||||
select * from data_02572 order by key;
|
||||
select * from receiver_02572 order by key;
|
Loading…
Reference in New Issue
Block a user