diff --git a/src/AggregateFunctions/AggregateFunctionSum.h b/src/AggregateFunctions/AggregateFunctionSum.h index 1038c8107a5..134b7e490d1 100644 --- a/src/AggregateFunctions/AggregateFunctionSum.h +++ b/src/AggregateFunctions/AggregateFunctionSum.h @@ -287,7 +287,7 @@ public: void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override { - const auto & column = static_cast(*columns[0]); + const auto & column = assert_cast(*columns[0]); if constexpr (is_big_int_v) this->data(place).add(static_cast(column.getData()[row_num])); else @@ -309,7 +309,7 @@ public: } else { - const auto & column = static_cast(*columns[0]); + const auto & column = assert_cast(*columns[0]); this->data(place).addMany(column.getData().data(), batch_size); } } @@ -327,7 +327,7 @@ public: } else { - const auto & column = static_cast(*columns[0]); + const auto & column = assert_cast(*columns[0]); this->data(place).addManyNotNull(column.getData().data(), null_map, batch_size); } } @@ -349,7 +349,7 @@ public: void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override { - auto & column = static_cast(to); + auto & column = assert_cast(to); column.getData().push_back(this->data(place).get()); } diff --git a/src/Storages/StorageMaterializedView.cpp b/src/Storages/StorageMaterializedView.cpp index 61fdbc0198b..9b5a4bad697 100644 --- a/src/Storages/StorageMaterializedView.cpp +++ b/src/Storages/StorageMaterializedView.cpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -24,6 +25,7 @@ #include #include #include +#include namespace DB @@ -130,7 +132,7 @@ Pipe StorageMaterializedView::read( void StorageMaterializedView::read( QueryPlan & query_plan, const Names & column_names, - const StorageMetadataPtr & /*metadata_snapshot*/, + const StorageMetadataPtr & metadata_snapshot, SelectQueryInfo & query_info, const Context & context, QueryProcessingStage::Enum processed_stage, @@ -139,15 +141,27 @@ void StorageMaterializedView::read( { auto storage = getTargetTable(); auto lock = storage->lockForShare(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout); - auto metadata_snapshot = storage->getInMemoryMetadataPtr(); + auto target_metadata_snapshot = storage->getInMemoryMetadataPtr(); if (query_info.order_optimizer) - query_info.input_order_info = query_info.order_optimizer->getInputOrder(metadata_snapshot, context); + query_info.input_order_info = query_info.order_optimizer->getInputOrder(target_metadata_snapshot, context); - storage->read(query_plan, column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams); + storage->read(query_plan, column_names, target_metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams); if (query_plan.isInitialized()) { + auto mv_header = getHeaderForProcessingStage(*this, column_names, metadata_snapshot, query_info, context, processed_stage); + auto target_header = getHeaderForProcessingStage(*storage, column_names, target_metadata_snapshot, query_info, context, processed_stage); + if (!blocksHaveEqualStructure(mv_header, target_header)) + { + auto converting_actions = ActionsDAG::makeConvertingActions(target_header.getColumnsWithTypeAndName(), + mv_header.getColumnsWithTypeAndName(), + ActionsDAG::MatchColumnsMode::Name); + auto converting_step = std::make_unique(query_plan.getCurrentDataStream(), converting_actions); + converting_step->setStepDescription("Convert target table structure to MaterializedView structure"); + query_plan.addStep(std::move(converting_step)); + } + StreamLocalLimits limits; SizeLimits leaf_limits; @@ -161,7 +175,7 @@ void StorageMaterializedView::read( nullptr, nullptr); - adding_limits_and_quota->setStepDescription("Lock destination table for Buffer"); + adding_limits_and_quota->setStepDescription("Lock destination table for MaterializedView"); query_plan.addStep(std::move(adding_limits_and_quota)); } } diff --git a/tests/queries/0_stateless/01182_materialized_view_different_structure.reference b/tests/queries/0_stateless/01182_materialized_view_different_structure.reference new file mode 100644 index 00000000000..a1f113394b2 --- /dev/null +++ b/tests/queries/0_stateless/01182_materialized_view_different_structure.reference @@ -0,0 +1,6 @@ +4999950000.000000 +4999950000 +1000 499500 499500 999 0 +1000 124716 499500 255 0 +1000 124716 99 0 +2000 249432 255 0 diff --git a/tests/queries/0_stateless/01182_materialized_view_different_structure.sql b/tests/queries/0_stateless/01182_materialized_view_different_structure.sql new file mode 100644 index 00000000000..751bcc9e48e --- /dev/null +++ b/tests/queries/0_stateless/01182_materialized_view_different_structure.sql @@ -0,0 +1,42 @@ +DROP TABLE IF EXISTS test_table; +DROP TABLE IF EXISTS numbers; +DROP TABLE IF EXISTS test_mv; +DROP TABLE IF EXISTS src; +DROP TABLE IF EXISTS dst; +DROP TABLE IF EXISTS mv; +DROP TABLE IF EXISTS dist; + +CREATE TABLE test_table (key UInt32, value Decimal(16, 6)) ENGINE = SummingMergeTree() ORDER BY key; +CREATE TABLE numbers (number UInt64) ENGINE=Memory; + +CREATE MATERIALIZED VIEW test_mv TO test_table (number UInt64, value Decimal(38, 6)) +AS SELECT number, sum(number) AS value FROM (SELECT *, toDecimal64(number, 6) AS val FROM numbers) GROUP BY number; + +INSERT INTO numbers SELECT * FROM numbers(100000); + +SELECT sum(value) FROM test_mv; +SELECT sum(value) FROM (SELECT number, sum(number) AS value FROM (SELECT *, toDecimal64(number, 6) AS val FROM numbers) GROUP BY number); + +CREATE TABLE src (n UInt64, s FixedString(16)) ENGINE=Memory; +CREATE TABLE dst (n UInt8, s String) ENGINE = Memory; +CREATE MATERIALIZED VIEW mv TO dst (n String) AS SELECT * FROM src; +SET allow_experimental_bigint_types=1; +CREATE TABLE dist (n Int128) ENGINE=Distributed(test_cluster_two_shards, currentDatabase(), mv); + +INSERT INTO src SELECT number, toString(number) FROM numbers(1000); +INSERT INTO mv SELECT toString(number + 1000) FROM numbers(1000); -- { serverError 53 } +INSERT INTO mv SELECT arrayJoin(['42', 'test']); -- { serverError 53 } + +SELECT count(), sum(n), sum(toInt64(s)), max(n), min(n) FROM src; +SELECT count(), sum(n), sum(toInt64(s)), max(n), min(n) FROM dst; +SELECT count(), sum(toInt64(n)), max(n), min(n) FROM mv; +SELECT count(), sum(toInt64(n)), max(n), min(n) FROM dist; -- { serverError 70 } +SELECT count(), sum(toInt64(n)), max(toUInt32(n)), min(toInt128(n)) FROM dist; + +DROP TABLE test_table; +DROP TABLE numbers; +DROP TABLE test_mv; +DROP TABLE src; +DROP TABLE dst; +DROP TABLE mv; +DROP TABLE dist;