mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-21 17:20:50 +00:00
Merge branch 'issue_7878_2' of https://github.com/vzakaznikov/ClickHouse into vzakaznikov-issue_7878_2
This commit is contained in:
commit
25ea3d83e2
@ -1,4 +1,5 @@
|
||||
#include <DataStreams/ConvertingBlockInputStream.h>
|
||||
#include <Interpreters/addMissingDefaults.h>
|
||||
#include <Interpreters/castColumn.h>
|
||||
#include <Columns/ColumnConst.h>
|
||||
#include <Common/assert_cast.h>
|
||||
@ -34,8 +35,9 @@ ConvertingBlockInputStream::ConvertingBlockInputStream(
|
||||
const Context & context_,
|
||||
const BlockInputStreamPtr & input,
|
||||
const Block & result_header,
|
||||
MatchColumnsMode mode)
|
||||
: context(context_), header(result_header), conversion(header.columns())
|
||||
MatchColumnsMode mode,
|
||||
const ColumnDefaults & column_defaults_)
|
||||
: context(context_), header(result_header), column_defaults(column_defaults_), conversion(header.columns())
|
||||
{
|
||||
children.emplace_back(input);
|
||||
|
||||
@ -102,6 +104,7 @@ ConvertingBlockInputStream::ConvertingBlockInputStream(
|
||||
Block ConvertingBlockInputStream::readImpl()
|
||||
{
|
||||
Block src = children.back()->read();
|
||||
std::set<size_t> default_columns;
|
||||
|
||||
if (!src)
|
||||
return src;
|
||||
@ -113,9 +116,7 @@ Block ConvertingBlockInputStream::readImpl()
|
||||
|
||||
if (conversion[res_pos] == USE_DEFAULT)
|
||||
{
|
||||
// Create a column with default values
|
||||
auto column_with_defaults = res_elem.type->createColumn()->cloneResized(src.rows());
|
||||
res_elem.column = std::move(column_with_defaults);
|
||||
default_columns.insert(res_pos);
|
||||
continue;
|
||||
}
|
||||
|
||||
@ -128,6 +129,14 @@ Block ConvertingBlockInputStream::readImpl()
|
||||
|
||||
res_elem.column = std::move(converted);
|
||||
}
|
||||
|
||||
// replace missing columns with default value or expression if any
|
||||
if (!default_columns.empty())
|
||||
{
|
||||
res.erase(default_columns);
|
||||
res = addMissingDefaults(res, header.getNamesAndTypesList(), column_defaults, context);
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
|
@ -2,6 +2,7 @@
|
||||
|
||||
#include <unordered_map>
|
||||
#include <DataStreams/IBlockInputStream.h>
|
||||
#include <Storages/ColumnDefault.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -37,7 +38,8 @@ public:
|
||||
const Context & context,
|
||||
const BlockInputStreamPtr & input,
|
||||
const Block & result_header,
|
||||
MatchColumnsMode mode);
|
||||
MatchColumnsMode mode,
|
||||
const ColumnDefaults & column_defaults = {});
|
||||
|
||||
String getName() const override { return "Converting"; }
|
||||
Block getHeader() const override { return header; }
|
||||
@ -47,6 +49,8 @@ private:
|
||||
|
||||
const Context & context;
|
||||
Block header;
|
||||
/// Only used in NameOrDefault mode
|
||||
const ColumnDefaults column_defaults;
|
||||
|
||||
/// How to construct result block. Position in source block, where to get each column.
|
||||
using Conversion = std::vector<size_t>;
|
||||
|
@ -230,7 +230,16 @@ void PushingToViewsBlockOutputStream::process(const Block & block, size_t view_n
|
||||
/// and two-level aggregation is triggered).
|
||||
in = std::make_shared<SquashingBlockInputStream>(
|
||||
in, context.getSettingsRef().min_insert_block_size_rows, context.getSettingsRef().min_insert_block_size_bytes);
|
||||
in = std::make_shared<ConvertingBlockInputStream>(context, in, view.out->getHeader(), ConvertingBlockInputStream::MatchColumnsMode::NameOrDefault);
|
||||
|
||||
auto view_table = context.getTable(view.table_id);
|
||||
|
||||
if (auto * materialized_view = dynamic_cast<const StorageMaterializedView *>(view_table.get()))
|
||||
{
|
||||
StoragePtr inner_table = materialized_view->getTargetTable();
|
||||
in = std::make_shared<ConvertingBlockInputStream>(context, in, view.out->getHeader(), ConvertingBlockInputStream::MatchColumnsMode::NameOrDefault, inner_table->getColumns().getDefaults());
|
||||
}
|
||||
else
|
||||
in = std::make_shared<ConvertingBlockInputStream>(context, in, view.out->getHeader(), ConvertingBlockInputStream::MatchColumnsMode::Name);
|
||||
}
|
||||
else
|
||||
in = std::make_shared<OneBlockInputStream>(block);
|
||||
|
@ -1,5 +1,6 @@
|
||||
-- Create dictionary, since dictGet*() uses DB::Context in executeImpl()
|
||||
-- (To cover scope of the Context in DB::PushingToViewsBlockOutputStream::process)
|
||||
DROP TABLE IF EXISTS mv;
|
||||
DROP DATABASE IF EXISTS dict_in_01023;
|
||||
CREATE DATABASE dict_in_01023;
|
||||
|
||||
|
@ -1,3 +1,4 @@
|
||||
DROP TABLE IF EXISTS mv;
|
||||
DROP TABLE IF EXISTS mv_source;
|
||||
DROP TABLE IF EXISTS mv_target;
|
||||
|
||||
|
@ -0,0 +1,8 @@
|
||||
1
|
||||
1
|
||||
2
|
||||
3
|
||||
1 2
|
||||
1 2
|
||||
2 3
|
||||
3 4
|
@ -0,0 +1,16 @@
|
||||
DROP TABLE IF EXISTS mv;
|
||||
DROP TABLE IF EXISTS mv_source;
|
||||
DROP TABLE IF EXISTS mv_target;
|
||||
|
||||
CREATE TABLE mv_source (`a` UInt64) ENGINE = MergeTree ORDER BY tuple();
|
||||
CREATE TABLE mv_target (`a` UInt64) ENGINE = MergeTree ORDER BY tuple();
|
||||
|
||||
CREATE MATERIALIZED VIEW mv TO mv_target AS SELECT * FROM mv_source;
|
||||
|
||||
INSERT INTO mv_source VALUES (1);
|
||||
|
||||
ALTER TABLE mv_target ADD COLUMN b UInt8 DEFAULT a + 1;
|
||||
INSERT INTO mv_source VALUES (1),(2),(3);
|
||||
|
||||
SELECT * FROM mv ORDER BY a;
|
||||
SELECT * FROM mv_target ORDER BY a;
|
Loading…
Reference in New Issue
Block a user