fix segfault on aggregation when MV has unexpected structure
This commit is contained in:
parent 1e4b32aa30
commit 6560ec3ed5
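A materialized view created with a TO target table can declare a structure that matches neither its inner SELECT nor the target table. Reading through such a view returned blocks in the target table's structure while the query expected the view's declared structure, so downstream aggregation code cast the columns to the wrong concrete column type, which could segfault. The change below inserts a converting ExpressionStep in StorageMaterializedView::read when the two headers differ, and replaces the unchecked static_cast calls in the affected aggregate function with assert_cast so that any remaining mismatch surfaces as a bad-cast error instead of undefined behaviour.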
@@ -287,7 +287,7 @@ public:
 
     void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
     {
-        const auto & column = static_cast<const ColVecType &>(*columns[0]);
+        const auto & column = assert_cast<const ColVecType &>(*columns[0]);
         if constexpr (is_big_int_v<T>)
             this->data(place).add(static_cast<TResult>(column.getData()[row_num]));
         else
@@ -309,7 +309,7 @@ public:
         }
         else
         {
-            const auto & column = static_cast<const ColVecType &>(*columns[0]);
+            const auto & column = assert_cast<const ColVecType &>(*columns[0]);
             this->data(place).addMany(column.getData().data(), batch_size);
         }
     }
@@ -327,7 +327,7 @@ public:
         }
         else
        {
-            const auto & column = static_cast<const ColVecType &>(*columns[0]);
+            const auto & column = assert_cast<const ColVecType &>(*columns[0]);
             this->data(place).addManyNotNull(column.getData().data(), null_map, batch_size);
         }
     }
@@ -349,7 +349,7 @@ public:
 
     void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
     {
-        auto & column = static_cast<ColVecResult &>(to);
+        auto & column = assert_cast<ColVecResult &>(to);
         column.getData().push_back(this->data(place).get());
     }
 
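For context on the static_cast to assert_cast replacements above: assert_cast is ClickHouse's checked counterpart to static_cast for column downcasts. The snippet below is only a minimal sketch of the idea; checked_ref_cast is a hypothetical name, and the real assert_cast differs in details such as when the check is compiled in.

#include <typeinfo>
#include <type_traits>

/// Hypothetical checked downcast illustrating the idea behind assert_cast:
/// like static_cast, but verify the dynamic type first, so a column of an
/// unexpected type raises std::bad_cast instead of being reinterpreted and
/// read past its real bounds.
template <typename To, typename From>
To checked_ref_cast(From & from)
{
    if (typeid(from) != typeid(std::remove_cvref_t<To>))
        throw std::bad_cast();
    return static_cast<To>(from);
}

/// Usage mirroring the diff above:
///     const auto & column = checked_ref_cast<const ColVecType &>(*columns[0]);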
@@ -11,6 +11,7 @@
 #include <Interpreters/InterpreterRenameQuery.h>
 #include <Interpreters/getTableExpressions.h>
 #include <Interpreters/AddDefaultDatabaseVisitor.h>
+#include <Interpreters/getHeaderForProcessingStage.h>
 #include <Access/AccessFlags.h>
 #include <DataStreams/IBlockInputStream.h>
 #include <DataStreams/IBlockOutputStream.h>
@@ -24,6 +25,7 @@
 #include <Common/checkStackSize.h>
 #include <Processors/Sources/SourceFromInputStream.h>
 #include <Processors/QueryPlan/SettingQuotaAndLimitsStep.h>
+#include <Processors/QueryPlan/ExpressionStep.h>
 
 
 namespace DB
@@ -130,7 +132,7 @@ Pipe StorageMaterializedView::read(
 void StorageMaterializedView::read(
     QueryPlan & query_plan,
     const Names & column_names,
-    const StorageMetadataPtr & /*metadata_snapshot*/,
+    const StorageMetadataPtr & metadata_snapshot,
     SelectQueryInfo & query_info,
     const Context & context,
     QueryProcessingStage::Enum processed_stage,
@@ -139,15 +141,27 @@ void StorageMaterializedView::read(
 {
     auto storage = getTargetTable();
     auto lock = storage->lockForShare(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
-    auto metadata_snapshot = storage->getInMemoryMetadataPtr();
+    auto target_metadata_snapshot = storage->getInMemoryMetadataPtr();
 
     if (query_info.order_optimizer)
-        query_info.input_order_info = query_info.order_optimizer->getInputOrder(metadata_snapshot, context);
+        query_info.input_order_info = query_info.order_optimizer->getInputOrder(target_metadata_snapshot, context);
 
-    storage->read(query_plan, column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
+    storage->read(query_plan, column_names, target_metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
 
     if (query_plan.isInitialized())
     {
+        auto mv_header = getHeaderForProcessingStage(*this, column_names, metadata_snapshot, query_info, context, processed_stage);
+        auto target_header = getHeaderForProcessingStage(*storage, column_names, target_metadata_snapshot, query_info, context, processed_stage);
+        if (!blocksHaveEqualStructure(mv_header, target_header))
+        {
+            auto converting_actions = ActionsDAG::makeConvertingActions(target_header.getColumnsWithTypeAndName(),
+                                                                        mv_header.getColumnsWithTypeAndName(),
+                                                                        ActionsDAG::MatchColumnsMode::Name);
+            auto converting_step = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), converting_actions);
+            converting_step->setStepDescription("Convert target table structure to MaterializedView structure");
+            query_plan.addStep(std::move(converting_step));
+        }
+
         StreamLocalLimits limits;
         SizeLimits leaf_limits;
 
@@ -161,7 +175,7 @@ void StorageMaterializedView::read(
             nullptr,
             nullptr);
 
-        adding_limits_and_quota->setStepDescription("Lock destination table for Buffer");
+        adding_limits_and_quota->setStepDescription("Lock destination table for MaterializedView");
         query_plan.addStep(std::move(adding_limits_and_quota));
     }
 }
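The new block in StorageMaterializedView::read compares the header the query expects from the view (mv_header) with the header produced by the target table (target_header) and, when they differ, appends an ExpressionStep built by ActionsDAG::makeConvertingActions, which matches columns by name and casts them to the expected types. Below is a toy, ClickHouse-free sketch of that by-name conversion; ColumnDesc, Header and convertToExpectedHeader are made-up illustrative names, not the real API.

#include <map>
#include <stdexcept>
#include <string>
#include <vector>

/// Toy model of the conversion step: reorder the source columns to match the
/// expected header by name, failing loudly if a required column is missing.
/// The real ActionsDAG::makeConvertingActions additionally inserts cast
/// actions when the types differ; that part is only hinted at here.
struct ColumnDesc { std::string name; std::string type; };
using Header = std::vector<ColumnDesc>;

Header convertToExpectedHeader(const Header & source, const Header & expected)
{
    std::map<std::string, ColumnDesc> by_name;
    for (const auto & col : source)
        by_name[col.name] = col;

    Header converted;
    for (const auto & col : expected)
    {
        auto it = by_name.find(col.name);
        if (it == by_name.end())
            throw std::runtime_error("no column " + col.name + " in target table");
        ColumnDesc out = it->second;
        if (out.type != col.type)
            out.type = col.type;  /// a real implementation would add a CAST here
        converted.push_back(out);
    }
    return converted;
}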
@@ -0,0 +1,6 @@
+4999950000.000000
+4999950000
+1000 499500 499500 999 0
+1000 124716 499500 255 0
+1000 124716 99 0
+2000 249432 255 0
@@ -0,0 +1,42 @@
+DROP TABLE IF EXISTS test_table;
+DROP TABLE IF EXISTS numbers;
+DROP TABLE IF EXISTS test_mv;
+DROP TABLE IF EXISTS src;
+DROP TABLE IF EXISTS dst;
+DROP TABLE IF EXISTS mv;
+DROP TABLE IF EXISTS dist;
+
+CREATE TABLE test_table (key UInt32, value Decimal(16, 6)) ENGINE = SummingMergeTree() ORDER BY key;
+CREATE TABLE numbers (number UInt64) ENGINE=Memory;
+
+CREATE MATERIALIZED VIEW test_mv TO test_table (number UInt64, value Decimal(38, 6))
+AS SELECT number, sum(number) AS value FROM (SELECT *, toDecimal64(number, 6) AS val FROM numbers) GROUP BY number;
+
+INSERT INTO numbers SELECT * FROM numbers(100000);
+
+SELECT sum(value) FROM test_mv;
+SELECT sum(value) FROM (SELECT number, sum(number) AS value FROM (SELECT *, toDecimal64(number, 6) AS val FROM numbers) GROUP BY number);
+
+CREATE TABLE src (n UInt64, s FixedString(16)) ENGINE=Memory;
+CREATE TABLE dst (n UInt8, s String) ENGINE = Memory;
+CREATE MATERIALIZED VIEW mv TO dst (n String) AS SELECT * FROM src;
+SET allow_experimental_bigint_types=1;
+CREATE TABLE dist (n Int128) ENGINE=Distributed(test_cluster_two_shards, currentDatabase(), mv);
+
+INSERT INTO src SELECT number, toString(number) FROM numbers(1000);
+INSERT INTO mv SELECT toString(number + 1000) FROM numbers(1000); -- { serverError 53 }
+INSERT INTO mv SELECT arrayJoin(['42', 'test']); -- { serverError 53 }
+
+SELECT count(), sum(n), sum(toInt64(s)), max(n), min(n) FROM src;
+SELECT count(), sum(n), sum(toInt64(s)), max(n), min(n) FROM dst;
+SELECT count(), sum(toInt64(n)), max(n), min(n) FROM mv;
+SELECT count(), sum(toInt64(n)), max(n), min(n) FROM dist; -- { serverError 70 }
+SELECT count(), sum(toInt64(n)), max(toUInt32(n)), min(toInt128(n)) FROM dist;
+
+DROP TABLE test_table;
+DROP TABLE numbers;
+DROP TABLE test_mv;
+DROP TABLE src;
+DROP TABLE dst;
+DROP TABLE mv;
+DROP TABLE dist;
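The new test exercises both halves of the fix: the test_mv queries check that aggregating over a TO materialized view whose declared Decimal(38, 6) column differs from the inner query's type produces the same sum as running the inner query directly (the first two lines of the reference output), while the src/dst/mv/dist queries check that reads through a view whose declared column n String matches neither the source (UInt64) nor the target table (UInt8) are converted instead of crashing, with incompatible inserts and casts rejected under the annotated serverError codes.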