Added ConvertingTransform.

This commit is contained in:
Nikolai Kochetov 2019-03-25 20:30:57 +03:00
parent 827511c96d
commit 4b068ebd9c
2 changed files with 174 additions and 0 deletions

View File

@ -0,0 +1,125 @@
#include <Processors/Transforms/ConvertingTransform.h>
#include <Interpreters/castColumn.h>
#include <Columns/ColumnConst.h>
#include <Parsers/IAST.h>
#include <Common/typeid_cast.h>
namespace DB
{
namespace ErrorCodes
{
extern const int THERE_IS_NO_COLUMN;
extern const int BLOCKS_HAVE_DIFFERENT_STRUCTURE;
extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH;
}
static ColumnPtr castColumnWithDiagnostic(
const ColumnWithTypeAndName & src_elem,
const ColumnWithTypeAndName & res_elem,
const Context & context)
{
try
{
return castColumn(src_elem, res_elem.type, context);
}
catch (Exception & e)
{
e.addMessage("while converting source column " + backQuoteIfNeed(src_elem.name) +
" to destination column " + backQuoteIfNeed(res_elem.name));
throw;
}
}
ConvertingTransform::ConvertingTransform(
Block source_header,
Block result_header,
MatchColumnsMode mode,
const Context & context)
: ISimpleTransform(std::move(source_header), std::move(result_header), false)
, context(context)
{
auto & source = getInputPort().getHeader();
auto & result = getOutputPort().getHeader();
size_t num_input_columns = source.columns();
size_t num_result_columns = result.columns();
if (mode == MatchColumnsMode::Position && num_input_columns != num_result_columns)
throw Exception("Number of columns doesn't match", ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH);
for (size_t result_col_num = 0; result_col_num < num_result_columns; ++result_col_num)
{
const auto & res_elem = result_header.getByPosition(result_col_num);
switch (mode)
{
case MatchColumnsMode::Position:
conversion[result_col_num] = result_col_num;
break;
case MatchColumnsMode::Name:
if (source.has(res_elem.name))
conversion[result_col_num] = source.getPositionByName(res_elem.name);
else
throw Exception("Cannot find column " + backQuoteIfNeed(res_elem.name) + " in source stream",
ErrorCodes::THERE_IS_NO_COLUMN);
break;
}
const auto & src_elem = source.getByPosition(conversion[result_col_num]);
/// Check constants.
if (auto * res_const = typeid_cast<const ColumnConst *>(res_elem.column.get()))
{
if (auto * src_const = typeid_cast<const ColumnConst *>(src_elem.column.get()))
{
if (res_const->getField() != src_const->getField())
throw Exception("Cannot convert column " + backQuoteIfNeed(res_elem.name) + " because "
"it is constant but values of constants are different in source and result",
ErrorCodes::BLOCKS_HAVE_DIFFERENT_STRUCTURE);
}
else
throw Exception("Cannot convert column " + backQuoteIfNeed(res_elem.name) + " because "
"it is non constant in source stream but must be constant in result",
ErrorCodes::BLOCKS_HAVE_DIFFERENT_STRUCTURE);
}
/// Check conversion by dry run CAST function.
castColumnWithDiagnostic(src_elem, res_elem, context);
}
}
void ConvertingTransform::transform(Chunk & chunk)
{
auto & source = getInputPort().getHeader();
auto & result = getOutputPort().getHeader();
auto num_rows = chunk.getNumRows();
auto src_columns = chunk.detachColumns();
size_t num_res_columns = conversion.size();
Columns res_columns;
res_columns.reserve(num_res_columns);
for (size_t res_pos = 0; res_pos < num_res_columns; ++res_pos)
{
const auto & src_elem = source.getByPosition(conversion[res_pos]);
auto res_elem = result.getByPosition(res_pos);
ColumnPtr converted = castColumnWithDiagnostic(src_elem, res_elem, context);
if (!res_elem.column->isColumnConst())
converted = converted->convertToFullColumnIfConst();
res_columns.emplace_back(std::move(converted));
}
chunk.setColumns(std::move(res_columns), num_rows);
}
}

View File

@ -0,0 +1,49 @@
#include <Processors/ISimpleTransform.h>
#include <Core/ColumnNumbers.h>
namespace DB
{
/** Convert one block structure to another:
*
* Leaves only necessary columns;
*
* Columns are searched in source first by name;
* and if there is no column with same name, then by position.
*
* Converting types of matching columns (with CAST function).
*
* Materializing columns which are const in source and non-const in result,
* throw if they are const in result and non const in source,
* or if they are const and have different values.
*/
class ConvertingTransform : public ISimpleTransform
{
public:
enum class MatchColumnsMode
{
/// Require same number of columns in source and result. Match columns by corresponding positions, regardless to names.
Position,
/// Find columns in source by their names. Allow excessive columns in source.
Name,
};
ConvertingTransform(
Block source_header,
Block result_header,
MatchColumnsMode mode,
const Context & context);
String getName() const override { return "Converting"; }
protected:
void transform(Chunk & chunk) override;
private:
const Context & context;
/// How to construct result block. Position in source block, where to get each column.
ColumnNumbers conversion;
};
}