Merge pull request #15532 from ClickHouse/substitute_defaults_recursively

Fix defaults substitution for absent columns which depend on other defaults
This commit is contained in:
alesapin 2020-10-02 21:42:01 +03:00 committed by GitHub
commit 77c9f7af79
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 147 additions and 92 deletions

View File

@ -1555,9 +1555,8 @@ private:
BlockInputStreamPtr block_input = context.getInputFormat(
current_format, buf, sample, insert_format_max_block_size);
const auto & column_defaults = columns_description.getDefaults();
if (!column_defaults.empty())
block_input = std::make_shared<AddingDefaultsBlockInputStream>(block_input, column_defaults, context);
if (columns_description.hasDefaults())
block_input = std::make_shared<AddingDefaultsBlockInputStream>(block_input, columns_description, context);
BlockInputStreamPtr async_block_input = std::make_shared<AsynchronousBlockInputStream>(block_input);

View File

@ -7,7 +7,7 @@ namespace DB
void AddingDefaultBlockOutputStream::write(const Block & block)
{
output->write(addMissingDefaults(block, output_block.getNamesAndTypesList(), column_defaults, context));
output->write(addMissingDefaults(block, output_block.getNamesAndTypesList(), columns, context));
}
void AddingDefaultBlockOutputStream::flush()

View File

@ -2,7 +2,7 @@
#include <DataStreams/IBlockOutputStream.h>
#include <Columns/ColumnConst.h>
#include <Storages/ColumnDefault.h>
#include <Storages/ColumnsDescription.h>
namespace DB
@ -23,10 +23,10 @@ public:
const BlockOutputStreamPtr & output_,
const Block & header_,
const Block & output_block_,
const ColumnDefaults & column_defaults_,
const ColumnsDescription & columns_,
const Context & context_)
: output(output_), header(header_), output_block(output_block_),
column_defaults(column_defaults_), context(context_)
columns(columns_), context(context_)
{
}
@ -43,7 +43,7 @@ private:
const Block header;
/// Blocks after this stream should have this structure
const Block output_block;
const ColumnDefaults column_defaults;
const ColumnsDescription columns;
const Context & context;
};

View File

@ -127,11 +127,13 @@ static MutableColumnPtr mixColumns(const ColumnWithTypeAndName & col_read,
}
AddingDefaultsBlockInputStream::AddingDefaultsBlockInputStream(const BlockInputStreamPtr & input,
const ColumnDefaults & column_defaults_,
AddingDefaultsBlockInputStream::AddingDefaultsBlockInputStream(
const BlockInputStreamPtr & input,
const ColumnsDescription & columns_,
const Context & context_)
: column_defaults(column_defaults_),
context(context_)
: columns(columns_)
, column_defaults(columns.getDefaults())
, context(context_)
{
children.push_back(input);
header = input->getHeader();
@ -169,7 +171,7 @@ Block AddingDefaultsBlockInputStream::readImpl()
if (!evaluate_block.columns())
evaluate_block.insert({ColumnConst::create(ColumnUInt8::create(1, 0), res.rows()), std::make_shared<DataTypeUInt8>(), "_dummy"});
evaluateMissingDefaults(evaluate_block, header.getNamesAndTypesList(), column_defaults, context, false);
evaluateMissingDefaults(evaluate_block, header.getNamesAndTypesList(), columns, context, false);
std::unordered_map<size_t, MutableColumnPtr> mixed_columns;

View File

@ -1,7 +1,7 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Storages/ColumnDefault.h>
#include <Storages/ColumnsDescription.h>
namespace DB
@ -15,7 +15,7 @@ class AddingDefaultsBlockInputStream : public IBlockInputStream
public:
AddingDefaultsBlockInputStream(
const BlockInputStreamPtr & input,
const ColumnDefaults & column_defaults_,
const ColumnsDescription & columns_,
const Context & context_);
String getName() const override { return "AddingDefaults"; }
@ -26,6 +26,7 @@ protected:
private:
Block header;
const ColumnsDescription columns;
const ColumnDefaults column_defaults;
const Context & context;
};

View File

@ -64,9 +64,9 @@ InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery(
{
StoragePtr storage = DatabaseCatalog::instance().getTable(ast_insert_query->table_id, context);
auto metadata_snapshot = storage->getInMemoryMetadataPtr();
auto column_defaults = metadata_snapshot->getColumns().getDefaults();
if (!column_defaults.empty())
res_stream = std::make_shared<AddingDefaultsBlockInputStream>(res_stream, column_defaults, context);
const auto & columns = metadata_snapshot->getColumns();
if (columns.hasDefaults())
res_stream = std::make_shared<AddingDefaultsBlockInputStream>(res_stream, columns, context);
}
}

View File

@ -1,4 +1,3 @@
#include <DataStreams/AddingDefaultBlockOutputStream.h>
#include <DataStreams/ConvertingBlockInputStream.h>
#include <DataStreams/PushingToViewsBlockOutputStream.h>
#include <DataStreams/SquashingBlockInputStream.h>

View File

@ -348,7 +348,7 @@ BlockIO InterpreterInsertQuery::execute()
/// Actually we don't know structure of input blocks from query/table,
/// because some clients break insertion protocol (columns != header)
out = std::make_shared<AddingDefaultBlockOutputStream>(
out, query_sample_block, out->getHeader(), metadata_snapshot->getColumns().getDefaults(), context);
out, query_sample_block, out->getHeader(), metadata_snapshot->getColumns(), context);
/// It's important to squash blocks as early as possible (before other transforms),
/// because other transforms may work inefficient if block size is small.

View File

@ -6,15 +6,16 @@
#include <Columns/ColumnArray.h>
#include <Interpreters/inplaceBlockConversions.h>
#include <Core/Block.h>
#include <Storages/ColumnDefault.h>
#include <Storages/ColumnsDescription.h>
namespace DB
{
Block addMissingDefaults(const Block & block,
Block addMissingDefaults(
const Block & block,
const NamesAndTypesList & required_columns,
const ColumnDefaults & column_defaults,
const ColumnsDescription & columns,
const Context & context)
{
/// For missing columns of nested structure, you need to create not a column of empty arrays, but a column of arrays of correct lengths.
@ -49,7 +50,7 @@ Block addMissingDefaults(const Block & block,
continue;
}
if (column_defaults.count(column.name))
if (columns.hasDefault(column.name))
continue;
String offsets_name = Nested::extractTableName(column.name);
@ -72,8 +73,8 @@ Block addMissingDefaults(const Block & block,
res.insert(ColumnWithTypeAndName(std::move(new_column), column.type, column.name));
}
/// Computes explicitly specified values (in column_defaults) by default and materialized columns.
evaluateMissingDefaults(res, required_columns, column_defaults, context);
/// Computes explicitly specified values by default and materialized columns.
evaluateMissingDefaults(res, required_columns, columns, context);
return res;
}

View File

@ -10,7 +10,7 @@ namespace DB
class Block;
class Context;
class NamesAndTypesList;
struct ColumnDefault;
class ColumnsDescription;
/** Adds three types of columns into block
* 1. Columns, that are missed inside request, but present in table without defaults (missed columns)
@ -21,7 +21,7 @@ struct ColumnDefault;
Block addMissingDefaults(
const Block & block,
const NamesAndTypesList & required_columns,
const std::unordered_map<std::string, ColumnDefault> & column_defaults,
const ColumnsDescription & columns,
const Context & context);
}

View File

@ -2,7 +2,6 @@
#include <Core/Block.h>
#include <Parsers/queryToString.h>
#include <Storages/ColumnDefault.h>
#include <Interpreters/TreeRewriter.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/ExpressionActions.h>
@ -14,6 +13,8 @@
#include <utility>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/RequiredSourceColumnsVisitor.h>
#include <Common/checkStackSize.h>
#include <Storages/ColumnsDescription.h>
namespace DB
@ -22,46 +23,46 @@ namespace DB
namespace
{
ASTPtr defaultRequiredExpressions(Block & block, const NamesAndTypesList & required_columns, const ColumnDefaults & column_defaults)
/// Add all required expressions for missing columns calculation
void addDefaultRequiredExpressionsRecursively(Block & block, const String & required_column, const ColumnsDescription & columns, ASTPtr default_expr_list_accum, NameSet & added_columns)
{
ASTPtr default_expr_list = std::make_shared<ASTExpressionList>();
checkStackSize();
if (block.has(required_column) || added_columns.count(required_column))
return;
for (const auto & column : required_columns)
{
if (block.has(column.name))
continue;
auto column_default = columns.getDefault(required_column);
const auto it = column_defaults.find(column.name);
if (it != column_defaults.end())
if (column_default)
{
/// expressions must be cloned to prevent modification by the ExpressionAnalyzer
auto column_default_expr = it->second.expression->clone();
auto column_default_expr = column_default->expression->clone();
/// Our default may depend on columns with ALIAS as default expr which not present in block
/// we can easily add them from column_defaults struct
/// Our default may depend on columns with default expr which not present in block
/// we have to add them to block too
RequiredSourceColumnsVisitor::Data columns_context;
RequiredSourceColumnsVisitor(columns_context).visit(column_default_expr);
NameSet required_columns_names = columns_context.requiredColumns();
auto cast_func = makeASTFunction("CAST", column_default_expr, std::make_shared<ASTLiteral>(columns.get(required_column).type->getName()));
default_expr_list_accum->children.emplace_back(setAlias(cast_func, required_column));
added_columns.emplace(required_column);
for (const auto & required_column_name : required_columns_names)
{
/// If we have such default column and it's alias than we should
/// add it into default_expression_list
if (auto rit = column_defaults.find(required_column_name);
rit != column_defaults.end() && rit->second.kind == ColumnDefaultKind::Alias)
{
default_expr_list->children.emplace_back(setAlias(rit->second.expression->clone(), required_column_name));
addDefaultRequiredExpressionsRecursively(block, required_column_name, columns, default_expr_list_accum, added_columns);
}
}
auto cast_func = makeASTFunction("CAST", column_default_expr, std::make_shared<ASTLiteral>(column.type->getName()));
default_expr_list->children.emplace_back(setAlias(cast_func, it->first));
}
}
ASTPtr defaultRequiredExpressions(Block & block, const NamesAndTypesList & required_columns, const ColumnsDescription & columns)
{
ASTPtr default_expr_list = std::make_shared<ASTExpressionList>();
NameSet added_columns;
for (const auto & column : required_columns)
addDefaultRequiredExpressionsRecursively(block, column.name, columns, default_expr_list, added_columns);
if (default_expr_list->children.empty())
return nullptr;
return default_expr_list;
}
@ -161,13 +162,13 @@ void performRequiredConversions(Block & block, const NamesAndTypesList & require
void evaluateMissingDefaults(Block & block,
const NamesAndTypesList & required_columns,
const ColumnDefaults & column_defaults,
const ColumnsDescription & columns,
const Context & context, bool save_unneeded_columns)
{
if (column_defaults.empty())
if (!columns.hasDefaults())
return;
ASTPtr default_expr_list = defaultRequiredExpressions(block, required_columns, column_defaults);
ASTPtr default_expr_list = defaultRequiredExpressions(block, required_columns, columns);
executeExpressionsOnBlock(block, default_expr_list, save_unneeded_columns, required_columns, context);
}

View File

@ -10,13 +10,13 @@ namespace DB
class Block;
class Context;
class NamesAndTypesList;
struct ColumnDefault;
class ColumnsDescription;
/// Adds missing defaults to block according to required_columns
/// using column_defaults map
/// using columns description
void evaluateMissingDefaults(Block & block,
const NamesAndTypesList & required_columns,
const std::unordered_map<std::string, ColumnDefault> & column_defaults,
const ColumnsDescription & columns,
const Context & context, bool save_unneeded_columns = true);
/// Tries to convert columns in block to required_columns

View File

@ -8,10 +8,10 @@ namespace DB
AddingMissedTransform::AddingMissedTransform(
Block header_,
Block result_header_,
const ColumnDefaults & column_defaults_,
const ColumnsDescription & columns_,
const Context & context_)
: ISimpleTransform(std::move(header_), std::move(result_header_), false)
, column_defaults(column_defaults_), context(context_)
, columns(columns_), context(context_)
{
}
@ -20,7 +20,7 @@ void AddingMissedTransform::transform(Chunk & chunk)
auto num_rows = chunk.getNumRows();
Block src = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
auto res = addMissingDefaults(src, getOutputPort().getHeader().getNamesAndTypesList(), column_defaults, context);
auto res = addMissingDefaults(src, getOutputPort().getHeader().getNamesAndTypesList(), columns, context);
chunk.setColumns(res.getColumns(), num_rows);
}

View File

@ -1,7 +1,7 @@
#pragma once
#include <Processors/ISimpleTransform.h>
#include <Storages/ColumnDefault.h>
#include <Storages/ColumnsDescription.h>
namespace DB
@ -20,7 +20,7 @@ public:
AddingMissedTransform(
Block header_,
Block result_header_,
const ColumnDefaults & column_defaults_,
const ColumnsDescription & columns_,
const Context & context_);
String getName() const override { return "AddingMissed"; }
@ -28,7 +28,7 @@ public:
private:
void transform(Chunk &) override;
const ColumnDefaults column_defaults;
const ColumnsDescription columns;
const Context & context;
};

View File

@ -378,6 +378,14 @@ bool ColumnsDescription::hasPhysical(const String & column_name) const
}
bool ColumnsDescription::hasDefaults() const
{
for (const auto & column : columns)
if (column.default_desc.expression)
return true;
return false;
}
ColumnDefaults ColumnsDescription::getDefaults() const
{
ColumnDefaults ret;

View File

@ -109,6 +109,7 @@ public:
ColumnDefaults getDefaults() const; /// TODO: remove
bool hasDefault(const String & column_name) const;
bool hasDefaults() const;
std::optional<ColumnDefault> getDefault(const String & column_name) const;
/// Does column has non default specified compression codec

View File

@ -180,7 +180,7 @@ void IMergeTreeReader::evaluateMissingDefaults(Block additional_columns, Columns
additional_columns.insert({res_columns[pos], name_and_type->type, name_and_type->name});
}
DB::evaluateMissingDefaults(additional_columns, columns, metadata_snapshot->getColumns().getDefaults(), storage.global_context);
DB::evaluateMissingDefaults(additional_columns, columns, metadata_snapshot->getColumns(), storage.global_context);
/// Move columns from block.
name_and_type = columns.begin();

View File

@ -224,7 +224,7 @@ Pipe StorageBuffer::read(
pipe_from_dst.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<AddingMissedTransform>(stream_header, header_after_adding_defaults,
metadata_snapshot->getColumns().getDefaults(), context);
metadata_snapshot->getColumns(), context);
});
pipe_from_dst.addSimpleTransform([&](const Block & stream_header)

View File

@ -234,12 +234,12 @@ public:
const Context & context_,
UInt64 max_block_size_,
FilesInfoPtr files_info_,
ColumnDefaults column_defaults_)
ColumnsDescription columns_description_)
: SourceWithProgress(getHeader(metadata_snapshot_, files_info_->need_path_column, files_info_->need_file_column))
, storage(std::move(storage_))
, metadata_snapshot(metadata_snapshot_)
, files_info(std::move(files_info_))
, column_defaults(std::move(column_defaults_))
, columns_description(std::move(columns_description_))
, context(context_)
, max_block_size(max_block_size_)
{
@ -314,8 +314,8 @@ public:
reader = FormatFactory::instance().getInput(
storage->format_name, *read_buf, metadata_snapshot->getSampleBlock(), context, max_block_size);
if (!column_defaults.empty())
reader = std::make_shared<AddingDefaultsBlockInputStream>(reader, column_defaults, context);
if (columns_description.hasDefaults())
reader = std::make_shared<AddingDefaultsBlockInputStream>(reader, columns_description, context);
reader->readPrefix();
}
@ -366,7 +366,7 @@ private:
std::unique_ptr<ReadBuffer> read_buf;
BlockInputStreamPtr reader;
ColumnDefaults column_defaults;
ColumnsDescription columns_description;
const Context & context; /// TODO Untangle potential issues with context lifetime.
UInt64 max_block_size;
@ -417,7 +417,7 @@ Pipe StorageFile::read(
for (size_t i = 0; i < num_streams; ++i)
pipes.emplace_back(std::make_shared<StorageFileSource>(
this_ptr, metadata_snapshot, context, max_block_size, files_info, metadata_snapshot->getColumns().getDefaults()));
this_ptr, metadata_snapshot, context, max_block_size, files_info, metadata_snapshot->getColumns()));
return Pipe::unitePipes(std::move(pipes));
}

View File

@ -71,7 +71,7 @@ namespace
String name_,
const Block & sample_block,
const Context & context,
const ColumnDefaults & column_defaults,
const ColumnsDescription & columns,
UInt64 max_block_size,
const CompressionMethod compression_method,
const std::shared_ptr<Aws::S3::S3Client> & client,
@ -86,8 +86,8 @@ namespace
read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromS3>(client, bucket, key), compression_method);
reader = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
if (!column_defaults.empty())
reader = std::make_shared<AddingDefaultsBlockInputStream>(reader, column_defaults, context);
if (columns.hasDefaults())
reader = std::make_shared<AddingDefaultsBlockInputStream>(reader, columns, context);
}
String getName() const override
@ -312,7 +312,7 @@ Pipe StorageS3::read(
getName(),
metadata_snapshot->getSampleBlock(),
context,
metadata_snapshot->getColumns().getDefaults(),
metadata_snapshot->getColumns(),
max_block_size,
chooseCompressionMethod(uri.endpoint, compression_method),
client,

View File

@ -61,7 +61,7 @@ namespace
String name_,
const Block & sample_block,
const Context & context,
const ColumnDefaults & column_defaults,
const ColumnsDescription & columns,
UInt64 max_block_size,
const ConnectionTimeouts & timeouts,
const CompressionMethod compression_method)
@ -81,7 +81,7 @@ namespace
compression_method);
reader = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
reader = std::make_shared<AddingDefaultsBlockInputStream>(reader, column_defaults, context);
reader = std::make_shared<AddingDefaultsBlockInputStream>(reader, columns, context);
}
String getName() const override
@ -201,7 +201,7 @@ Pipe IStorageURLBase::read(
getName(),
getHeaderBlock(column_names, metadata_snapshot),
context,
metadata_snapshot->getColumns().getDefaults(),
metadata_snapshot->getColumns(),
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(context),
chooseCompressionMethod(request_uri.getPath(), compression_method)));

View File

@ -0,0 +1,7 @@
1
1
1
1
1 [] [] [] 0
CREATE TABLE default.defaults_on_defaults\n(\n `key` UInt64,\n `Arr.C1` Array(UInt32) DEFAULT emptyArrayUInt32(),\n `Arr.C2` Array(UInt32) DEFAULT arrayResize(emptyArrayUInt32(), length(Arr.C1)),\n `Arr.C3` Array(UInt32) ALIAS arrayResize(emptyArrayUInt32(), length(Arr.C2)),\n `Arr.C4` Array(UInt32) DEFAULT arrayResize(emptyArrayUInt32(), length(Arr.C3)),\n `ArrLen` UInt64 DEFAULT length(Arr.C4)\n)\nENGINE = MergeTree()\nORDER BY tuple()\nSETTINGS index_granularity = 8192
1

View File

@ -0,0 +1,36 @@
DROP TABLE IF EXISTS defaults_on_defaults;
CREATE TABLE defaults_on_defaults (
key UInt64
)
ENGINE = MergeTree()
ORDER BY tuple();
INSERT INTO defaults_on_defaults values (1);
ALTER TABLE defaults_on_defaults ADD COLUMN `Arr.C1` Array(UInt32) DEFAULT emptyArrayUInt32();
ALTER TABLE defaults_on_defaults ADD COLUMN `Arr.C2` Array(UInt32) DEFAULT arrayResize(emptyArrayUInt32(), length(Arr.C1));
ALTER TABLE defaults_on_defaults ADD COLUMN `Arr.C3` Array(UInt32) ALIAS arrayResize(emptyArrayUInt32(), length(Arr.C2));
SELECT 1 from defaults_on_defaults where length(`Arr.C2`) = 0;
SELECT 1 from defaults_on_defaults where length(`Arr.C3`) = 0;
ALTER TABLE defaults_on_defaults ADD COLUMN `Arr.C4` Array(UInt32) DEFAULT arrayResize(emptyArrayUInt32(), length(Arr.C3));
SELECT 1 from defaults_on_defaults where length(`Arr.C4`) = 0;
ALTER TABLE defaults_on_defaults ADD COLUMN `ArrLen` UInt64 DEFAULT length(Arr.C4);
SELECT 1 from defaults_on_defaults where ArrLen = 0;
SELECT * from defaults_on_defaults where ArrLen = 0;
SHOW CREATE TABLE defaults_on_defaults;
OPTIMIZE TABLE defaults_on_defaults FINAL;
SELECT 1 from defaults_on_defaults where length(`Arr.C4`) = 0;
DROP TABLE IF EXISTS defaults_on_defaults;