mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-28 18:42:26 +00:00
Merge pull request #15532 from ClickHouse/substitute_defaults_recursively
Fix defaults substitution for absent columns which depend on other defaults
This commit is contained in:
commit
77c9f7af79
@ -1555,9 +1555,8 @@ private:
|
|||||||
BlockInputStreamPtr block_input = context.getInputFormat(
|
BlockInputStreamPtr block_input = context.getInputFormat(
|
||||||
current_format, buf, sample, insert_format_max_block_size);
|
current_format, buf, sample, insert_format_max_block_size);
|
||||||
|
|
||||||
const auto & column_defaults = columns_description.getDefaults();
|
if (columns_description.hasDefaults())
|
||||||
if (!column_defaults.empty())
|
block_input = std::make_shared<AddingDefaultsBlockInputStream>(block_input, columns_description, context);
|
||||||
block_input = std::make_shared<AddingDefaultsBlockInputStream>(block_input, column_defaults, context);
|
|
||||||
|
|
||||||
BlockInputStreamPtr async_block_input = std::make_shared<AsynchronousBlockInputStream>(block_input);
|
BlockInputStreamPtr async_block_input = std::make_shared<AsynchronousBlockInputStream>(block_input);
|
||||||
|
|
||||||
|
@ -7,7 +7,7 @@ namespace DB
|
|||||||
|
|
||||||
void AddingDefaultBlockOutputStream::write(const Block & block)
|
void AddingDefaultBlockOutputStream::write(const Block & block)
|
||||||
{
|
{
|
||||||
output->write(addMissingDefaults(block, output_block.getNamesAndTypesList(), column_defaults, context));
|
output->write(addMissingDefaults(block, output_block.getNamesAndTypesList(), columns, context));
|
||||||
}
|
}
|
||||||
|
|
||||||
void AddingDefaultBlockOutputStream::flush()
|
void AddingDefaultBlockOutputStream::flush()
|
||||||
|
@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
#include <DataStreams/IBlockOutputStream.h>
|
#include <DataStreams/IBlockOutputStream.h>
|
||||||
#include <Columns/ColumnConst.h>
|
#include <Columns/ColumnConst.h>
|
||||||
#include <Storages/ColumnDefault.h>
|
#include <Storages/ColumnsDescription.h>
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
@ -23,10 +23,10 @@ public:
|
|||||||
const BlockOutputStreamPtr & output_,
|
const BlockOutputStreamPtr & output_,
|
||||||
const Block & header_,
|
const Block & header_,
|
||||||
const Block & output_block_,
|
const Block & output_block_,
|
||||||
const ColumnDefaults & column_defaults_,
|
const ColumnsDescription & columns_,
|
||||||
const Context & context_)
|
const Context & context_)
|
||||||
: output(output_), header(header_), output_block(output_block_),
|
: output(output_), header(header_), output_block(output_block_),
|
||||||
column_defaults(column_defaults_), context(context_)
|
columns(columns_), context(context_)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -43,7 +43,7 @@ private:
|
|||||||
const Block header;
|
const Block header;
|
||||||
/// Blocks after this stream should have this structure
|
/// Blocks after this stream should have this structure
|
||||||
const Block output_block;
|
const Block output_block;
|
||||||
const ColumnDefaults column_defaults;
|
const ColumnsDescription columns;
|
||||||
const Context & context;
|
const Context & context;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -127,11 +127,13 @@ static MutableColumnPtr mixColumns(const ColumnWithTypeAndName & col_read,
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
AddingDefaultsBlockInputStream::AddingDefaultsBlockInputStream(const BlockInputStreamPtr & input,
|
AddingDefaultsBlockInputStream::AddingDefaultsBlockInputStream(
|
||||||
const ColumnDefaults & column_defaults_,
|
const BlockInputStreamPtr & input,
|
||||||
const Context & context_)
|
const ColumnsDescription & columns_,
|
||||||
: column_defaults(column_defaults_),
|
const Context & context_)
|
||||||
context(context_)
|
: columns(columns_)
|
||||||
|
, column_defaults(columns.getDefaults())
|
||||||
|
, context(context_)
|
||||||
{
|
{
|
||||||
children.push_back(input);
|
children.push_back(input);
|
||||||
header = input->getHeader();
|
header = input->getHeader();
|
||||||
@ -169,7 +171,7 @@ Block AddingDefaultsBlockInputStream::readImpl()
|
|||||||
if (!evaluate_block.columns())
|
if (!evaluate_block.columns())
|
||||||
evaluate_block.insert({ColumnConst::create(ColumnUInt8::create(1, 0), res.rows()), std::make_shared<DataTypeUInt8>(), "_dummy"});
|
evaluate_block.insert({ColumnConst::create(ColumnUInt8::create(1, 0), res.rows()), std::make_shared<DataTypeUInt8>(), "_dummy"});
|
||||||
|
|
||||||
evaluateMissingDefaults(evaluate_block, header.getNamesAndTypesList(), column_defaults, context, false);
|
evaluateMissingDefaults(evaluate_block, header.getNamesAndTypesList(), columns, context, false);
|
||||||
|
|
||||||
std::unordered_map<size_t, MutableColumnPtr> mixed_columns;
|
std::unordered_map<size_t, MutableColumnPtr> mixed_columns;
|
||||||
|
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <DataStreams/IBlockInputStream.h>
|
#include <DataStreams/IBlockInputStream.h>
|
||||||
#include <Storages/ColumnDefault.h>
|
#include <Storages/ColumnsDescription.h>
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
@ -15,7 +15,7 @@ class AddingDefaultsBlockInputStream : public IBlockInputStream
|
|||||||
public:
|
public:
|
||||||
AddingDefaultsBlockInputStream(
|
AddingDefaultsBlockInputStream(
|
||||||
const BlockInputStreamPtr & input,
|
const BlockInputStreamPtr & input,
|
||||||
const ColumnDefaults & column_defaults_,
|
const ColumnsDescription & columns_,
|
||||||
const Context & context_);
|
const Context & context_);
|
||||||
|
|
||||||
String getName() const override { return "AddingDefaults"; }
|
String getName() const override { return "AddingDefaults"; }
|
||||||
@ -26,6 +26,7 @@ protected:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
Block header;
|
Block header;
|
||||||
|
const ColumnsDescription columns;
|
||||||
const ColumnDefaults column_defaults;
|
const ColumnDefaults column_defaults;
|
||||||
const Context & context;
|
const Context & context;
|
||||||
};
|
};
|
||||||
|
@ -64,9 +64,9 @@ InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery(
|
|||||||
{
|
{
|
||||||
StoragePtr storage = DatabaseCatalog::instance().getTable(ast_insert_query->table_id, context);
|
StoragePtr storage = DatabaseCatalog::instance().getTable(ast_insert_query->table_id, context);
|
||||||
auto metadata_snapshot = storage->getInMemoryMetadataPtr();
|
auto metadata_snapshot = storage->getInMemoryMetadataPtr();
|
||||||
auto column_defaults = metadata_snapshot->getColumns().getDefaults();
|
const auto & columns = metadata_snapshot->getColumns();
|
||||||
if (!column_defaults.empty())
|
if (columns.hasDefaults())
|
||||||
res_stream = std::make_shared<AddingDefaultsBlockInputStream>(res_stream, column_defaults, context);
|
res_stream = std::make_shared<AddingDefaultsBlockInputStream>(res_stream, columns, context);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,4 +1,3 @@
|
|||||||
#include <DataStreams/AddingDefaultBlockOutputStream.h>
|
|
||||||
#include <DataStreams/ConvertingBlockInputStream.h>
|
#include <DataStreams/ConvertingBlockInputStream.h>
|
||||||
#include <DataStreams/PushingToViewsBlockOutputStream.h>
|
#include <DataStreams/PushingToViewsBlockOutputStream.h>
|
||||||
#include <DataStreams/SquashingBlockInputStream.h>
|
#include <DataStreams/SquashingBlockInputStream.h>
|
||||||
|
@ -348,7 +348,7 @@ BlockIO InterpreterInsertQuery::execute()
|
|||||||
/// Actually we don't know structure of input blocks from query/table,
|
/// Actually we don't know structure of input blocks from query/table,
|
||||||
/// because some clients break insertion protocol (columns != header)
|
/// because some clients break insertion protocol (columns != header)
|
||||||
out = std::make_shared<AddingDefaultBlockOutputStream>(
|
out = std::make_shared<AddingDefaultBlockOutputStream>(
|
||||||
out, query_sample_block, out->getHeader(), metadata_snapshot->getColumns().getDefaults(), context);
|
out, query_sample_block, out->getHeader(), metadata_snapshot->getColumns(), context);
|
||||||
|
|
||||||
/// It's important to squash blocks as early as possible (before other transforms),
|
/// It's important to squash blocks as early as possible (before other transforms),
|
||||||
/// because other transforms may work inefficient if block size is small.
|
/// because other transforms may work inefficient if block size is small.
|
||||||
|
@ -6,16 +6,17 @@
|
|||||||
#include <Columns/ColumnArray.h>
|
#include <Columns/ColumnArray.h>
|
||||||
#include <Interpreters/inplaceBlockConversions.h>
|
#include <Interpreters/inplaceBlockConversions.h>
|
||||||
#include <Core/Block.h>
|
#include <Core/Block.h>
|
||||||
#include <Storages/ColumnDefault.h>
|
#include <Storages/ColumnsDescription.h>
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
Block addMissingDefaults(const Block & block,
|
Block addMissingDefaults(
|
||||||
const NamesAndTypesList & required_columns,
|
const Block & block,
|
||||||
const ColumnDefaults & column_defaults,
|
const NamesAndTypesList & required_columns,
|
||||||
const Context & context)
|
const ColumnsDescription & columns,
|
||||||
|
const Context & context)
|
||||||
{
|
{
|
||||||
/// For missing columns of nested structure, you need to create not a column of empty arrays, but a column of arrays of correct lengths.
|
/// For missing columns of nested structure, you need to create not a column of empty arrays, but a column of arrays of correct lengths.
|
||||||
/// First, remember the offset columns for all arrays in the block.
|
/// First, remember the offset columns for all arrays in the block.
|
||||||
@ -49,7 +50,7 @@ Block addMissingDefaults(const Block & block,
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (column_defaults.count(column.name))
|
if (columns.hasDefault(column.name))
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
String offsets_name = Nested::extractTableName(column.name);
|
String offsets_name = Nested::extractTableName(column.name);
|
||||||
@ -72,8 +73,8 @@ Block addMissingDefaults(const Block & block,
|
|||||||
res.insert(ColumnWithTypeAndName(std::move(new_column), column.type, column.name));
|
res.insert(ColumnWithTypeAndName(std::move(new_column), column.type, column.name));
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Computes explicitly specified values (in column_defaults) by default and materialized columns.
|
/// Computes explicitly specified values by default and materialized columns.
|
||||||
evaluateMissingDefaults(res, required_columns, column_defaults, context);
|
evaluateMissingDefaults(res, required_columns, columns, context);
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -10,7 +10,7 @@ namespace DB
|
|||||||
class Block;
|
class Block;
|
||||||
class Context;
|
class Context;
|
||||||
class NamesAndTypesList;
|
class NamesAndTypesList;
|
||||||
struct ColumnDefault;
|
class ColumnsDescription;
|
||||||
|
|
||||||
/** Adds three types of columns into block
|
/** Adds three types of columns into block
|
||||||
* 1. Columns, that are missed inside request, but present in table without defaults (missed columns)
|
* 1. Columns, that are missed inside request, but present in table without defaults (missed columns)
|
||||||
@ -21,7 +21,7 @@ struct ColumnDefault;
|
|||||||
Block addMissingDefaults(
|
Block addMissingDefaults(
|
||||||
const Block & block,
|
const Block & block,
|
||||||
const NamesAndTypesList & required_columns,
|
const NamesAndTypesList & required_columns,
|
||||||
const std::unordered_map<std::string, ColumnDefault> & column_defaults,
|
const ColumnsDescription & columns,
|
||||||
const Context & context);
|
const Context & context);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -2,7 +2,6 @@
|
|||||||
|
|
||||||
#include <Core/Block.h>
|
#include <Core/Block.h>
|
||||||
#include <Parsers/queryToString.h>
|
#include <Parsers/queryToString.h>
|
||||||
#include <Storages/ColumnDefault.h>
|
|
||||||
#include <Interpreters/TreeRewriter.h>
|
#include <Interpreters/TreeRewriter.h>
|
||||||
#include <Interpreters/ExpressionAnalyzer.h>
|
#include <Interpreters/ExpressionAnalyzer.h>
|
||||||
#include <Interpreters/ExpressionActions.h>
|
#include <Interpreters/ExpressionActions.h>
|
||||||
@ -14,6 +13,8 @@
|
|||||||
#include <utility>
|
#include <utility>
|
||||||
#include <DataTypes/DataTypesNumber.h>
|
#include <DataTypes/DataTypesNumber.h>
|
||||||
#include <Interpreters/RequiredSourceColumnsVisitor.h>
|
#include <Interpreters/RequiredSourceColumnsVisitor.h>
|
||||||
|
#include <Common/checkStackSize.h>
|
||||||
|
#include <Storages/ColumnsDescription.h>
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
@ -22,46 +23,46 @@ namespace DB
|
|||||||
namespace
|
namespace
|
||||||
{
|
{
|
||||||
|
|
||||||
ASTPtr defaultRequiredExpressions(Block & block, const NamesAndTypesList & required_columns, const ColumnDefaults & column_defaults)
|
/// Add all required expressions for missing columns calculation
|
||||||
|
void addDefaultRequiredExpressionsRecursively(Block & block, const String & required_column, const ColumnsDescription & columns, ASTPtr default_expr_list_accum, NameSet & added_columns)
|
||||||
|
{
|
||||||
|
checkStackSize();
|
||||||
|
if (block.has(required_column) || added_columns.count(required_column))
|
||||||
|
return;
|
||||||
|
|
||||||
|
auto column_default = columns.getDefault(required_column);
|
||||||
|
|
||||||
|
if (column_default)
|
||||||
|
{
|
||||||
|
/// expressions must be cloned to prevent modification by the ExpressionAnalyzer
|
||||||
|
auto column_default_expr = column_default->expression->clone();
|
||||||
|
|
||||||
|
/// Our default may depend on columns with default expr which not present in block
|
||||||
|
/// we have to add them to block too
|
||||||
|
RequiredSourceColumnsVisitor::Data columns_context;
|
||||||
|
RequiredSourceColumnsVisitor(columns_context).visit(column_default_expr);
|
||||||
|
NameSet required_columns_names = columns_context.requiredColumns();
|
||||||
|
|
||||||
|
auto cast_func = makeASTFunction("CAST", column_default_expr, std::make_shared<ASTLiteral>(columns.get(required_column).type->getName()));
|
||||||
|
default_expr_list_accum->children.emplace_back(setAlias(cast_func, required_column));
|
||||||
|
added_columns.emplace(required_column);
|
||||||
|
|
||||||
|
for (const auto & required_column_name : required_columns_names)
|
||||||
|
addDefaultRequiredExpressionsRecursively(block, required_column_name, columns, default_expr_list_accum, added_columns);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ASTPtr defaultRequiredExpressions(Block & block, const NamesAndTypesList & required_columns, const ColumnsDescription & columns)
|
||||||
{
|
{
|
||||||
ASTPtr default_expr_list = std::make_shared<ASTExpressionList>();
|
ASTPtr default_expr_list = std::make_shared<ASTExpressionList>();
|
||||||
|
|
||||||
|
NameSet added_columns;
|
||||||
for (const auto & column : required_columns)
|
for (const auto & column : required_columns)
|
||||||
{
|
addDefaultRequiredExpressionsRecursively(block, column.name, columns, default_expr_list, added_columns);
|
||||||
if (block.has(column.name))
|
|
||||||
continue;
|
|
||||||
|
|
||||||
const auto it = column_defaults.find(column.name);
|
|
||||||
|
|
||||||
if (it != column_defaults.end())
|
|
||||||
{
|
|
||||||
/// expressions must be cloned to prevent modification by the ExpressionAnalyzer
|
|
||||||
auto column_default_expr = it->second.expression->clone();
|
|
||||||
|
|
||||||
/// Our default may depend on columns with ALIAS as default expr which not present in block
|
|
||||||
/// we can easily add them from column_defaults struct
|
|
||||||
RequiredSourceColumnsVisitor::Data columns_context;
|
|
||||||
RequiredSourceColumnsVisitor(columns_context).visit(column_default_expr);
|
|
||||||
NameSet required_columns_names = columns_context.requiredColumns();
|
|
||||||
|
|
||||||
for (const auto & required_column_name : required_columns_names)
|
|
||||||
{
|
|
||||||
/// If we have such default column and it's alias than we should
|
|
||||||
/// add it into default_expression_list
|
|
||||||
if (auto rit = column_defaults.find(required_column_name);
|
|
||||||
rit != column_defaults.end() && rit->second.kind == ColumnDefaultKind::Alias)
|
|
||||||
{
|
|
||||||
default_expr_list->children.emplace_back(setAlias(rit->second.expression->clone(), required_column_name));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
auto cast_func = makeASTFunction("CAST", column_default_expr, std::make_shared<ASTLiteral>(column.type->getName()));
|
|
||||||
default_expr_list->children.emplace_back(setAlias(cast_func, it->first));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (default_expr_list->children.empty())
|
if (default_expr_list->children.empty())
|
||||||
return nullptr;
|
return nullptr;
|
||||||
|
|
||||||
return default_expr_list;
|
return default_expr_list;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -161,13 +162,13 @@ void performRequiredConversions(Block & block, const NamesAndTypesList & require
|
|||||||
|
|
||||||
void evaluateMissingDefaults(Block & block,
|
void evaluateMissingDefaults(Block & block,
|
||||||
const NamesAndTypesList & required_columns,
|
const NamesAndTypesList & required_columns,
|
||||||
const ColumnDefaults & column_defaults,
|
const ColumnsDescription & columns,
|
||||||
const Context & context, bool save_unneeded_columns)
|
const Context & context, bool save_unneeded_columns)
|
||||||
{
|
{
|
||||||
if (column_defaults.empty())
|
if (!columns.hasDefaults())
|
||||||
return;
|
return;
|
||||||
|
|
||||||
ASTPtr default_expr_list = defaultRequiredExpressions(block, required_columns, column_defaults);
|
ASTPtr default_expr_list = defaultRequiredExpressions(block, required_columns, columns);
|
||||||
executeExpressionsOnBlock(block, default_expr_list, save_unneeded_columns, required_columns, context);
|
executeExpressionsOnBlock(block, default_expr_list, save_unneeded_columns, required_columns, context);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -10,13 +10,13 @@ namespace DB
|
|||||||
class Block;
|
class Block;
|
||||||
class Context;
|
class Context;
|
||||||
class NamesAndTypesList;
|
class NamesAndTypesList;
|
||||||
struct ColumnDefault;
|
class ColumnsDescription;
|
||||||
|
|
||||||
/// Adds missing defaults to block according to required_columns
|
/// Adds missing defaults to block according to required_columns
|
||||||
/// using column_defaults map
|
/// using columns description
|
||||||
void evaluateMissingDefaults(Block & block,
|
void evaluateMissingDefaults(Block & block,
|
||||||
const NamesAndTypesList & required_columns,
|
const NamesAndTypesList & required_columns,
|
||||||
const std::unordered_map<std::string, ColumnDefault> & column_defaults,
|
const ColumnsDescription & columns,
|
||||||
const Context & context, bool save_unneeded_columns = true);
|
const Context & context, bool save_unneeded_columns = true);
|
||||||
|
|
||||||
/// Tries to convert columns in block to required_columns
|
/// Tries to convert columns in block to required_columns
|
||||||
|
@ -8,10 +8,10 @@ namespace DB
|
|||||||
AddingMissedTransform::AddingMissedTransform(
|
AddingMissedTransform::AddingMissedTransform(
|
||||||
Block header_,
|
Block header_,
|
||||||
Block result_header_,
|
Block result_header_,
|
||||||
const ColumnDefaults & column_defaults_,
|
const ColumnsDescription & columns_,
|
||||||
const Context & context_)
|
const Context & context_)
|
||||||
: ISimpleTransform(std::move(header_), std::move(result_header_), false)
|
: ISimpleTransform(std::move(header_), std::move(result_header_), false)
|
||||||
, column_defaults(column_defaults_), context(context_)
|
, columns(columns_), context(context_)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -20,7 +20,7 @@ void AddingMissedTransform::transform(Chunk & chunk)
|
|||||||
auto num_rows = chunk.getNumRows();
|
auto num_rows = chunk.getNumRows();
|
||||||
Block src = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
|
Block src = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
|
||||||
|
|
||||||
auto res = addMissingDefaults(src, getOutputPort().getHeader().getNamesAndTypesList(), column_defaults, context);
|
auto res = addMissingDefaults(src, getOutputPort().getHeader().getNamesAndTypesList(), columns, context);
|
||||||
chunk.setColumns(res.getColumns(), num_rows);
|
chunk.setColumns(res.getColumns(), num_rows);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <Processors/ISimpleTransform.h>
|
#include <Processors/ISimpleTransform.h>
|
||||||
#include <Storages/ColumnDefault.h>
|
#include <Storages/ColumnsDescription.h>
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
@ -20,7 +20,7 @@ public:
|
|||||||
AddingMissedTransform(
|
AddingMissedTransform(
|
||||||
Block header_,
|
Block header_,
|
||||||
Block result_header_,
|
Block result_header_,
|
||||||
const ColumnDefaults & column_defaults_,
|
const ColumnsDescription & columns_,
|
||||||
const Context & context_);
|
const Context & context_);
|
||||||
|
|
||||||
String getName() const override { return "AddingMissed"; }
|
String getName() const override { return "AddingMissed"; }
|
||||||
@ -28,7 +28,7 @@ public:
|
|||||||
private:
|
private:
|
||||||
void transform(Chunk &) override;
|
void transform(Chunk &) override;
|
||||||
|
|
||||||
const ColumnDefaults column_defaults;
|
const ColumnsDescription columns;
|
||||||
const Context & context;
|
const Context & context;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -378,6 +378,14 @@ bool ColumnsDescription::hasPhysical(const String & column_name) const
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
bool ColumnsDescription::hasDefaults() const
|
||||||
|
{
|
||||||
|
for (const auto & column : columns)
|
||||||
|
if (column.default_desc.expression)
|
||||||
|
return true;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
ColumnDefaults ColumnsDescription::getDefaults() const
|
ColumnDefaults ColumnsDescription::getDefaults() const
|
||||||
{
|
{
|
||||||
ColumnDefaults ret;
|
ColumnDefaults ret;
|
||||||
|
@ -109,6 +109,7 @@ public:
|
|||||||
|
|
||||||
ColumnDefaults getDefaults() const; /// TODO: remove
|
ColumnDefaults getDefaults() const; /// TODO: remove
|
||||||
bool hasDefault(const String & column_name) const;
|
bool hasDefault(const String & column_name) const;
|
||||||
|
bool hasDefaults() const;
|
||||||
std::optional<ColumnDefault> getDefault(const String & column_name) const;
|
std::optional<ColumnDefault> getDefault(const String & column_name) const;
|
||||||
|
|
||||||
/// Does column has non default specified compression codec
|
/// Does column has non default specified compression codec
|
||||||
|
@ -180,7 +180,7 @@ void IMergeTreeReader::evaluateMissingDefaults(Block additional_columns, Columns
|
|||||||
additional_columns.insert({res_columns[pos], name_and_type->type, name_and_type->name});
|
additional_columns.insert({res_columns[pos], name_and_type->type, name_and_type->name});
|
||||||
}
|
}
|
||||||
|
|
||||||
DB::evaluateMissingDefaults(additional_columns, columns, metadata_snapshot->getColumns().getDefaults(), storage.global_context);
|
DB::evaluateMissingDefaults(additional_columns, columns, metadata_snapshot->getColumns(), storage.global_context);
|
||||||
|
|
||||||
/// Move columns from block.
|
/// Move columns from block.
|
||||||
name_and_type = columns.begin();
|
name_and_type = columns.begin();
|
||||||
|
@ -224,7 +224,7 @@ Pipe StorageBuffer::read(
|
|||||||
pipe_from_dst.addSimpleTransform([&](const Block & stream_header)
|
pipe_from_dst.addSimpleTransform([&](const Block & stream_header)
|
||||||
{
|
{
|
||||||
return std::make_shared<AddingMissedTransform>(stream_header, header_after_adding_defaults,
|
return std::make_shared<AddingMissedTransform>(stream_header, header_after_adding_defaults,
|
||||||
metadata_snapshot->getColumns().getDefaults(), context);
|
metadata_snapshot->getColumns(), context);
|
||||||
});
|
});
|
||||||
|
|
||||||
pipe_from_dst.addSimpleTransform([&](const Block & stream_header)
|
pipe_from_dst.addSimpleTransform([&](const Block & stream_header)
|
||||||
|
@ -234,12 +234,12 @@ public:
|
|||||||
const Context & context_,
|
const Context & context_,
|
||||||
UInt64 max_block_size_,
|
UInt64 max_block_size_,
|
||||||
FilesInfoPtr files_info_,
|
FilesInfoPtr files_info_,
|
||||||
ColumnDefaults column_defaults_)
|
ColumnsDescription columns_description_)
|
||||||
: SourceWithProgress(getHeader(metadata_snapshot_, files_info_->need_path_column, files_info_->need_file_column))
|
: SourceWithProgress(getHeader(metadata_snapshot_, files_info_->need_path_column, files_info_->need_file_column))
|
||||||
, storage(std::move(storage_))
|
, storage(std::move(storage_))
|
||||||
, metadata_snapshot(metadata_snapshot_)
|
, metadata_snapshot(metadata_snapshot_)
|
||||||
, files_info(std::move(files_info_))
|
, files_info(std::move(files_info_))
|
||||||
, column_defaults(std::move(column_defaults_))
|
, columns_description(std::move(columns_description_))
|
||||||
, context(context_)
|
, context(context_)
|
||||||
, max_block_size(max_block_size_)
|
, max_block_size(max_block_size_)
|
||||||
{
|
{
|
||||||
@ -314,8 +314,8 @@ public:
|
|||||||
reader = FormatFactory::instance().getInput(
|
reader = FormatFactory::instance().getInput(
|
||||||
storage->format_name, *read_buf, metadata_snapshot->getSampleBlock(), context, max_block_size);
|
storage->format_name, *read_buf, metadata_snapshot->getSampleBlock(), context, max_block_size);
|
||||||
|
|
||||||
if (!column_defaults.empty())
|
if (columns_description.hasDefaults())
|
||||||
reader = std::make_shared<AddingDefaultsBlockInputStream>(reader, column_defaults, context);
|
reader = std::make_shared<AddingDefaultsBlockInputStream>(reader, columns_description, context);
|
||||||
|
|
||||||
reader->readPrefix();
|
reader->readPrefix();
|
||||||
}
|
}
|
||||||
@ -366,7 +366,7 @@ private:
|
|||||||
std::unique_ptr<ReadBuffer> read_buf;
|
std::unique_ptr<ReadBuffer> read_buf;
|
||||||
BlockInputStreamPtr reader;
|
BlockInputStreamPtr reader;
|
||||||
|
|
||||||
ColumnDefaults column_defaults;
|
ColumnsDescription columns_description;
|
||||||
|
|
||||||
const Context & context; /// TODO Untangle potential issues with context lifetime.
|
const Context & context; /// TODO Untangle potential issues with context lifetime.
|
||||||
UInt64 max_block_size;
|
UInt64 max_block_size;
|
||||||
@ -417,7 +417,7 @@ Pipe StorageFile::read(
|
|||||||
|
|
||||||
for (size_t i = 0; i < num_streams; ++i)
|
for (size_t i = 0; i < num_streams; ++i)
|
||||||
pipes.emplace_back(std::make_shared<StorageFileSource>(
|
pipes.emplace_back(std::make_shared<StorageFileSource>(
|
||||||
this_ptr, metadata_snapshot, context, max_block_size, files_info, metadata_snapshot->getColumns().getDefaults()));
|
this_ptr, metadata_snapshot, context, max_block_size, files_info, metadata_snapshot->getColumns()));
|
||||||
|
|
||||||
return Pipe::unitePipes(std::move(pipes));
|
return Pipe::unitePipes(std::move(pipes));
|
||||||
}
|
}
|
||||||
|
@ -71,7 +71,7 @@ namespace
|
|||||||
String name_,
|
String name_,
|
||||||
const Block & sample_block,
|
const Block & sample_block,
|
||||||
const Context & context,
|
const Context & context,
|
||||||
const ColumnDefaults & column_defaults,
|
const ColumnsDescription & columns,
|
||||||
UInt64 max_block_size,
|
UInt64 max_block_size,
|
||||||
const CompressionMethod compression_method,
|
const CompressionMethod compression_method,
|
||||||
const std::shared_ptr<Aws::S3::S3Client> & client,
|
const std::shared_ptr<Aws::S3::S3Client> & client,
|
||||||
@ -86,8 +86,8 @@ namespace
|
|||||||
read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromS3>(client, bucket, key), compression_method);
|
read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromS3>(client, bucket, key), compression_method);
|
||||||
reader = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
|
reader = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
|
||||||
|
|
||||||
if (!column_defaults.empty())
|
if (columns.hasDefaults())
|
||||||
reader = std::make_shared<AddingDefaultsBlockInputStream>(reader, column_defaults, context);
|
reader = std::make_shared<AddingDefaultsBlockInputStream>(reader, columns, context);
|
||||||
}
|
}
|
||||||
|
|
||||||
String getName() const override
|
String getName() const override
|
||||||
@ -312,7 +312,7 @@ Pipe StorageS3::read(
|
|||||||
getName(),
|
getName(),
|
||||||
metadata_snapshot->getSampleBlock(),
|
metadata_snapshot->getSampleBlock(),
|
||||||
context,
|
context,
|
||||||
metadata_snapshot->getColumns().getDefaults(),
|
metadata_snapshot->getColumns(),
|
||||||
max_block_size,
|
max_block_size,
|
||||||
chooseCompressionMethod(uri.endpoint, compression_method),
|
chooseCompressionMethod(uri.endpoint, compression_method),
|
||||||
client,
|
client,
|
||||||
|
@ -61,7 +61,7 @@ namespace
|
|||||||
String name_,
|
String name_,
|
||||||
const Block & sample_block,
|
const Block & sample_block,
|
||||||
const Context & context,
|
const Context & context,
|
||||||
const ColumnDefaults & column_defaults,
|
const ColumnsDescription & columns,
|
||||||
UInt64 max_block_size,
|
UInt64 max_block_size,
|
||||||
const ConnectionTimeouts & timeouts,
|
const ConnectionTimeouts & timeouts,
|
||||||
const CompressionMethod compression_method)
|
const CompressionMethod compression_method)
|
||||||
@ -81,7 +81,7 @@ namespace
|
|||||||
compression_method);
|
compression_method);
|
||||||
|
|
||||||
reader = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
|
reader = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
|
||||||
reader = std::make_shared<AddingDefaultsBlockInputStream>(reader, column_defaults, context);
|
reader = std::make_shared<AddingDefaultsBlockInputStream>(reader, columns, context);
|
||||||
}
|
}
|
||||||
|
|
||||||
String getName() const override
|
String getName() const override
|
||||||
@ -201,7 +201,7 @@ Pipe IStorageURLBase::read(
|
|||||||
getName(),
|
getName(),
|
||||||
getHeaderBlock(column_names, metadata_snapshot),
|
getHeaderBlock(column_names, metadata_snapshot),
|
||||||
context,
|
context,
|
||||||
metadata_snapshot->getColumns().getDefaults(),
|
metadata_snapshot->getColumns(),
|
||||||
max_block_size,
|
max_block_size,
|
||||||
ConnectionTimeouts::getHTTPTimeouts(context),
|
ConnectionTimeouts::getHTTPTimeouts(context),
|
||||||
chooseCompressionMethod(request_uri.getPath(), compression_method)));
|
chooseCompressionMethod(request_uri.getPath(), compression_method)));
|
||||||
|
@ -0,0 +1,7 @@
|
|||||||
|
1
|
||||||
|
1
|
||||||
|
1
|
||||||
|
1
|
||||||
|
1 [] [] [] 0
|
||||||
|
CREATE TABLE default.defaults_on_defaults\n(\n `key` UInt64,\n `Arr.C1` Array(UInt32) DEFAULT emptyArrayUInt32(),\n `Arr.C2` Array(UInt32) DEFAULT arrayResize(emptyArrayUInt32(), length(Arr.C1)),\n `Arr.C3` Array(UInt32) ALIAS arrayResize(emptyArrayUInt32(), length(Arr.C2)),\n `Arr.C4` Array(UInt32) DEFAULT arrayResize(emptyArrayUInt32(), length(Arr.C3)),\n `ArrLen` UInt64 DEFAULT length(Arr.C4)\n)\nENGINE = MergeTree()\nORDER BY tuple()\nSETTINGS index_granularity = 8192
|
||||||
|
1
|
@ -0,0 +1,36 @@
|
|||||||
|
DROP TABLE IF EXISTS defaults_on_defaults;
|
||||||
|
CREATE TABLE defaults_on_defaults (
|
||||||
|
key UInt64
|
||||||
|
)
|
||||||
|
ENGINE = MergeTree()
|
||||||
|
ORDER BY tuple();
|
||||||
|
|
||||||
|
INSERT INTO defaults_on_defaults values (1);
|
||||||
|
|
||||||
|
ALTER TABLE defaults_on_defaults ADD COLUMN `Arr.C1` Array(UInt32) DEFAULT emptyArrayUInt32();
|
||||||
|
|
||||||
|
ALTER TABLE defaults_on_defaults ADD COLUMN `Arr.C2` Array(UInt32) DEFAULT arrayResize(emptyArrayUInt32(), length(Arr.C1));
|
||||||
|
|
||||||
|
ALTER TABLE defaults_on_defaults ADD COLUMN `Arr.C3` Array(UInt32) ALIAS arrayResize(emptyArrayUInt32(), length(Arr.C2));
|
||||||
|
|
||||||
|
SELECT 1 from defaults_on_defaults where length(`Arr.C2`) = 0;
|
||||||
|
|
||||||
|
SELECT 1 from defaults_on_defaults where length(`Arr.C3`) = 0;
|
||||||
|
|
||||||
|
ALTER TABLE defaults_on_defaults ADD COLUMN `Arr.C4` Array(UInt32) DEFAULT arrayResize(emptyArrayUInt32(), length(Arr.C3));
|
||||||
|
|
||||||
|
SELECT 1 from defaults_on_defaults where length(`Arr.C4`) = 0;
|
||||||
|
|
||||||
|
ALTER TABLE defaults_on_defaults ADD COLUMN `ArrLen` UInt64 DEFAULT length(Arr.C4);
|
||||||
|
|
||||||
|
SELECT 1 from defaults_on_defaults where ArrLen = 0;
|
||||||
|
|
||||||
|
SELECT * from defaults_on_defaults where ArrLen = 0;
|
||||||
|
|
||||||
|
SHOW CREATE TABLE defaults_on_defaults;
|
||||||
|
|
||||||
|
OPTIMIZE TABLE defaults_on_defaults FINAL;
|
||||||
|
|
||||||
|
SELECT 1 from defaults_on_defaults where length(`Arr.C4`) = 0;
|
||||||
|
|
||||||
|
DROP TABLE IF EXISTS defaults_on_defaults;
|
Loading…
Reference in New Issue
Block a user