diff --git a/dbms/src/Processors/Transforms/ConvertingTransform.cpp b/dbms/src/Processors/Transforms/ConvertingTransform.cpp new file mode 100644 index 00000000000..0aa38313d52 --- /dev/null +++ b/dbms/src/Processors/Transforms/ConvertingTransform.cpp @@ -0,0 +1,125 @@ +#include + +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int THERE_IS_NO_COLUMN; + extern const int BLOCKS_HAVE_DIFFERENT_STRUCTURE; + extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH; +} + +static ColumnPtr castColumnWithDiagnostic( + const ColumnWithTypeAndName & src_elem, + const ColumnWithTypeAndName & res_elem, + const Context & context) +{ + try + { + return castColumn(src_elem, res_elem.type, context); + } + catch (Exception & e) + { + e.addMessage("while converting source column " + backQuoteIfNeed(src_elem.name) + + " to destination column " + backQuoteIfNeed(res_elem.name)); + throw; + } +} + +ConvertingTransform::ConvertingTransform( + Block source_header, + Block result_header, + MatchColumnsMode mode, + const Context & context) + : ISimpleTransform(std::move(source_header), std::move(result_header), false) + , context(context) +{ + auto & source = getInputPort().getHeader(); + auto & result = getOutputPort().getHeader(); + + size_t num_input_columns = source.columns(); + size_t num_result_columns = result.columns(); + + if (mode == MatchColumnsMode::Position && num_input_columns != num_result_columns) + throw Exception("Number of columns doesn't match", ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH); + + for (size_t result_col_num = 0; result_col_num < num_result_columns; ++result_col_num) + { + const auto & res_elem = result_header.getByPosition(result_col_num); + + switch (mode) + { + case MatchColumnsMode::Position: + conversion[result_col_num] = result_col_num; + break; + + case MatchColumnsMode::Name: + if (source.has(res_elem.name)) + conversion[result_col_num] = source.getPositionByName(res_elem.name); + else + throw Exception("Cannot find column " + backQuoteIfNeed(res_elem.name) + " in source stream", + ErrorCodes::THERE_IS_NO_COLUMN); + break; + } + + const auto & src_elem = source.getByPosition(conversion[result_col_num]); + + /// Check constants. + + if (auto * res_const = typeid_cast(res_elem.column.get())) + { + if (auto * src_const = typeid_cast(src_elem.column.get())) + { + if (res_const->getField() != src_const->getField()) + throw Exception("Cannot convert column " + backQuoteIfNeed(res_elem.name) + " because " + "it is constant but values of constants are different in source and result", + ErrorCodes::BLOCKS_HAVE_DIFFERENT_STRUCTURE); + } + else + throw Exception("Cannot convert column " + backQuoteIfNeed(res_elem.name) + " because " + "it is non constant in source stream but must be constant in result", + ErrorCodes::BLOCKS_HAVE_DIFFERENT_STRUCTURE); + } + + /// Check conversion by dry run CAST function. + + castColumnWithDiagnostic(src_elem, res_elem, context); + } +} + +void ConvertingTransform::transform(Chunk & chunk) +{ + auto & source = getInputPort().getHeader(); + auto & result = getOutputPort().getHeader(); + + auto num_rows = chunk.getNumRows(); + auto src_columns = chunk.detachColumns(); + + size_t num_res_columns = conversion.size(); + + Columns res_columns; + res_columns.reserve(num_res_columns); + + for (size_t res_pos = 0; res_pos < num_res_columns; ++res_pos) + { + const auto & src_elem = source.getByPosition(conversion[res_pos]); + auto res_elem = result.getByPosition(res_pos); + + ColumnPtr converted = castColumnWithDiagnostic(src_elem, res_elem, context); + + if (!res_elem.column->isColumnConst()) + converted = converted->convertToFullColumnIfConst(); + + res_columns.emplace_back(std::move(converted)); + } + + chunk.setColumns(std::move(res_columns), num_rows); +} + +} diff --git a/dbms/src/Processors/Transforms/ConvertingTransform.h b/dbms/src/Processors/Transforms/ConvertingTransform.h new file mode 100644 index 00000000000..d6e6219316a --- /dev/null +++ b/dbms/src/Processors/Transforms/ConvertingTransform.h @@ -0,0 +1,49 @@ +#include +#include + +namespace DB +{ + +/** Convert one block structure to another: + * + * Leaves only necessary columns; + * + * Columns are searched in source first by name; + * and if there is no column with same name, then by position. + * + * Converting types of matching columns (with CAST function). + * + * Materializing columns which are const in source and non-const in result, + * throw if they are const in result and non const in source, + * or if they are const and have different values. + */ +class ConvertingTransform : public ISimpleTransform +{ +public: + enum class MatchColumnsMode + { + /// Require same number of columns in source and result. Match columns by corresponding positions, regardless to names. + Position, + /// Find columns in source by their names. Allow excessive columns in source. + Name, + }; + + ConvertingTransform( + Block source_header, + Block result_header, + MatchColumnsMode mode, + const Context & context); + + String getName() const override { return "Converting"; } + +protected: + void transform(Chunk & chunk) override; + +private: + const Context & context; + + /// How to construct result block. Position in source block, where to get each column. + ColumnNumbers conversion; +}; + +}