Fixed Enum usage in PK expressions. [#METR-23473]

Added check for duplicates of column names in CREATE query.
This commit is contained in:
Vitaliy Lyudvichenko 2016-11-21 15:58:42 +03:00
parent 4cab31a60e
commit e07fc7cea5
9 changed files with 218 additions and 21 deletions

View File

@ -16,9 +16,17 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
/// Common non-template interface of the Enum data types (implemented by DataTypeEnum<Type>).
/// Allows converting a constant for an Enum column without knowing the underlying integer type.
class IDataTypeEnum : public IDataType
{
public:
/// Takes a Field holding either an element name (String) or an element value (integer)
/// and returns the element name.
virtual Field castToName(const Field & value_or_name) const = 0;
/// Takes a Field holding either an element name (String) or an element value (integer)
/// and returns the element value.
virtual Field castToValue(const Field & value_or_name) const = 0;
};
template <typename Type>
class DataTypeEnum final : public IDataType
class DataTypeEnum final : public IDataTypeEnum
{
public:
using FieldType = Type;
@ -70,6 +78,10 @@ public:
return it->second;
}
Field castToName(const Field & value_or_name) const override;
Field castToValue(const Field & value_or_name) const override;
DataTypePtr clone() const override;
void serializeBinary(const Field & field, WriteBuffer & ostr) const override;

View File

@ -223,7 +223,8 @@ public:
static const AtomMap atom_map;
/// Не учитывает секцию SAMPLE. all_columns - набор всех столбцов таблицы.
PKCondition(ASTPtr & query, const Context & context, const NamesAndTypesList & all_columns, const SortDescription & sort_descr);
PKCondition(ASTPtr & query, const Context & context, const NamesAndTypesList & all_columns, const SortDescription & sort_descr,
const Block & pk_sample_block);
/// Выполнимо ли условие в диапазоне ключей.
/// left_pk и right_pk должны содержать все поля из sort_descr в соответствующем порядке.
@ -332,6 +333,7 @@ private:
SortDescription sort_descr;
ColumnIndices pk_columns;
const Block & pk_sample_block;
};
}

View File

@ -1,6 +1,7 @@
#include <DB/IO/WriteBufferFromString.h>
#include <DB/DataTypes/DataTypeEnum.h>
#include <limits>
namespace DB
{
@ -239,6 +240,49 @@ Field DataTypeEnum<Type>::getDefault() const
return typename NearestFieldType<FieldType>::Type(values.front().second);
}
/// Ensures that the value fits into the range of Type.
/// Throws BAD_TYPE_OF_FIELD otherwise.
template <typename Type>
static void checkOverflow(Int64 value)
{
    const bool below_min = value < std::numeric_limits<Type>::min();
    const bool above_max = value > std::numeric_limits<Type>::max();

    if (below_min || above_max)
        throw Exception("DataTypeEnum: Unexpected value " + toString(value), ErrorCodes::BAD_TYPE_OF_FIELD);
}
/** Converts a Field holding either an element name or an element value to the element name.
  *
  * - String: must be a name of this Enum (verified via getValue); returned as is.
  * - Int64 / UInt64: must fit into the underlying Type and be a value of this Enum;
  *   the corresponding name is returned.
  *   UInt64 is accepted because Field stores non-negative integers as UInt64.
  *
  * Throws BAD_TYPE_OF_FIELD for an out-of-range number or for any other Field type;
  * getValue / getNameForValue report unknown names and values.
  */
template <typename Type>
Field DataTypeEnum<Type>::castToName(const Field & value_or_name) const
{
    if (value_or_name.getType() == Field::Types::String)
    {
        getValue(value_or_name.get<String>()); /// Check correctness
        return value_or_name.get<String>();
    }
    else if (value_or_name.getType() == Field::Types::Int64)
    {
        Int64 value = value_or_name.get<Int64>();
        checkOverflow<Type>(value);
        return getNameForValue(static_cast<Type>(value)).toString();
    }
    else if (value_or_name.getType() == Field::Types::UInt64)
    {
        UInt64 value = value_or_name.get<UInt64>();
        /// Only the upper bound has to be checked for an unsigned number.
        if (value > static_cast<UInt64>(std::numeric_limits<Type>::max()))
            throw Exception("DataTypeEnum: Unexpected value " + toString(value), ErrorCodes::BAD_TYPE_OF_FIELD);
        return getNameForValue(static_cast<Type>(value)).toString();
    }
    else
        throw Exception(String("DataTypeEnum: Unsupported type of field ") + value_or_name.getTypeName(), ErrorCodes::BAD_TYPE_OF_FIELD);
}
/** Converts a Field holding either an element name or an element value to the element value.
  * The result is always an Int64 Field.
  *
  * - String: must be a name of this Enum; the corresponding value is returned.
  * - Int64 / UInt64: must fit into the underlying Type and be a value of this Enum
  *   (verified via getNameForValue); the number itself is returned.
  *   UInt64 is accepted because Field stores non-negative integers as UInt64.
  *
  * Throws BAD_TYPE_OF_FIELD for an out-of-range number or for any other Field type;
  * getValue / getNameForValue report unknown names and values.
  */
template <typename Type>
Field DataTypeEnum<Type>::castToValue(const Field & value_or_name) const
{
    if (value_or_name.getType() == Field::Types::String)
    {
        return static_cast<Int64>(getValue(value_or_name.get<String>()));
    }
    else if (value_or_name.getType() == Field::Types::Int64)
    {
        Int64 value = value_or_name.get<Int64>();
        checkOverflow<Type>(value);
        getNameForValue(static_cast<Type>(value)); /// Check correctness
        return value;
    }
    else if (value_or_name.getType() == Field::Types::UInt64)
    {
        UInt64 value = value_or_name.get<UInt64>();
        /// Only the upper bound has to be checked for an unsigned number.
        if (value > static_cast<UInt64>(std::numeric_limits<Type>::max()))
            throw Exception("DataTypeEnum: Unexpected value " + toString(value), ErrorCodes::BAD_TYPE_OF_FIELD);
        getNameForValue(static_cast<Type>(value)); /// Check correctness
        return static_cast<Int64>(value);
    }
    else
        throw Exception(String("DataTypeEnum: Unsupported type of field ") + value_or_name.getTypeName(), ErrorCodes::BAD_TYPE_OF_FIELD);
}
/// Явные инстанцирования.
template class DataTypeEnum<Int8>;

View File

@ -52,6 +52,7 @@ namespace ErrorCodes
extern const int ENGINE_REQUIRED;
extern const int TABLE_METADATA_ALREADY_EXISTS;
extern const int UNKNOWN_DATABASE_ENGINE;
extern const int DUPLICATE_COLUMN;
}
@ -395,6 +396,21 @@ InterpreterCreateQuery::ColumnsInfo InterpreterCreateQuery::setColumns(
create.children.push_back(new_columns);
create.columns = new_columns;
/// Check for duplicates
std::set<String> all_columns;
auto check_column_already_exists = [&all_columns](const NameAndTypePair & column_name_and_type)
{
if (!all_columns.emplace(column_name_and_type.name).second)
throw Exception("Column " + backQuoteIfNeed(column_name_and_type.name) + " already exists", ErrorCodes::DUPLICATE_COLUMN);
};
for (const auto & elem : *res.columns)
check_column_already_exists(elem);
for (const auto & elem : res.materialized_columns)
check_column_already_exists(elem);
for (const auto & elem : res.alias_columns)
check_column_already_exists(elem);
return res;
}

View File

@ -33,6 +33,7 @@
#include <DB/DataStreams/AggregatingSortedBlockInputStream.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypeDate.h>
#include <DB/DataTypes/DataTypeEnum.h>
#include <DB/Common/VirtualColumnUtils.h>
@ -194,9 +195,11 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
SortDescription sort_descr = data.getSortDescription();
PKCondition key_condition(query, context, available_real_and_virtual_columns, sort_descr);
Block pk_sample_block = data.getPrimaryExpression()->getSampleBlock();
PKCondition key_condition(query, context, available_real_and_virtual_columns, sort_descr, pk_sample_block);
PKCondition date_condition(query, context, available_real_and_virtual_columns,
SortDescription(1, SortColumnDescription(data.date_column_name, 1)));
SortDescription(1, SortColumnDescription(data.date_column_name, 1)), pk_sample_block);
if (settings.force_primary_key && key_condition.alwaysUnknownOrTrue())
{

View File

@ -33,7 +33,7 @@ BlocksWithDateIntervals MergeTreeDataWriter::splitBlockIntoParts(const Block & b
/// Минимальная и максимальная дата.
UInt16 min_date = std::numeric_limits<UInt16>::max();
UInt16 max_date = std::numeric_limits<UInt16>::min();
for (ColumnUInt16::Container_t::const_iterator it = dates.begin(); it != dates.end(); ++it)
for (auto it = dates.begin(); it != dates.end(); ++it)
{
if (*it < min_date)
min_date = *it;

View File

@ -2,10 +2,14 @@
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/Interpreters/ExpressionAnalyzer.h>
#include <DB/Interpreters/ExpressionActions.h>
#include <DB/DataTypes/DataTypeEnum.h>
#include <DB/DataTypes/DataTypeDate.h>
#include <DB/DataTypes/DataTypeString.h>
#include <DB/Columns/ColumnSet.h>
#include <DB/Columns/ColumnTuple.h>
#include <DB/Parsers/ASTSet.h>
#include <DB/Functions/FunctionFactory.h>
#include <DB/Core/FieldVisitors.h>
namespace DB
@ -187,8 +191,9 @@ Block PKCondition::getBlockWithConstants(
}
PKCondition::PKCondition(ASTPtr & query, const Context & context, const NamesAndTypesList & all_columns, const SortDescription & sort_descr_)
: sort_descr(sort_descr_)
PKCondition::PKCondition(ASTPtr & query, const Context & context, const NamesAndTypesList & all_columns,
const SortDescription & sort_descr_, const Block & pk_sample_block_)
: sort_descr(sort_descr_), pk_sample_block(pk_sample_block_)
{
for (size_t i = 0; i < sort_descr.size(); ++i)
{
@ -202,7 +207,7 @@ PKCondition::PKCondition(ASTPtr & query, const Context & context, const NamesAnd
*/
Block block_with_constants = getBlockWithConstants(query, context, all_columns);
/// Преобразуем секцию WHERE в обратную польскую строку.
/// Transform the WHERE section to Reverse Polish notation
ASTSelectQuery & select = typeid_cast<ASTSelectQuery &>(*query);
if (select.where_expression)
{
@ -348,6 +353,47 @@ bool PKCondition::isPrimaryKeyPossiblyWrappedByMonotonicFunctionsImpl(
}
/// Tries to adjust the constant src_value (of type src_type) so that it can be compared
/// with values of desired_type. Returns false if the types are considered inconvertible
/// or if the conversion threw; src_value may be replaced on success.
/// NOTE: Keep in mind that such behavior could be incompatible with ordinary expressions.
/// TODO: Use common methods for type conversions.
static bool tryCastValueToType(const DataTypePtr & desired_type, const DataTypePtr & src_type, Field & src_value)
{
    /// Same type on both sides: nothing to convert.
    if (desired_type->getName() == src_type->getName())
        return true;

    /// Any exception thrown while converting means "cannot convert".
    try
    {
        /// Enum: a name (String) or a number is turned into the Enum's numeric value.
        if (auto enum_type = dynamic_cast<const IDataTypeEnum *>(desired_type.get()))
        {
            src_value = enum_type->castToValue(src_value);
            return true;
        }

        /// Date: a 'YYYY-MM-DD' String is parsed; any other source is accepted unchanged.
        if (typeid_cast<const DataTypeDate *>(desired_type.get()))
        {
            const bool src_is_string = typeid_cast<const DataTypeString *>(src_type.get()) != nullptr;
            if (src_is_string)
                src_value = stringToDateOrDateTime(src_value.safeGet<String>());
            return true;
        }

        /// Numeric types are mutually convertible; everything else is rejected.
        return desired_type->behavesAsNumber() && src_type->behavesAsNumber();
    }
    catch (...)
    {
        return false;
    }
}
bool PKCondition::atomFromAST(ASTPtr & node, const Context & context, Block & block_with_constants, RPNElement & out)
{
/** Функции < > = != <= >= in notIn, у которых один агрумент константа, другой - один из столбцов первичного ключа,
@ -362,33 +408,32 @@ bool PKCondition::atomFromAST(ASTPtr & node, const Context & context, Block & bl
if (args.size() != 2)
return false;
/// Если true, слева константа.
bool inverted;
size_t column;
size_t data_arg_pos; /// Argument number of data (non-const argument)
size_t const_column_num; /// Column number of a constant in block_with_constants
RPNElement::MonotonicFunctionsChain chain;
if (getConstant(args[1], block_with_constants, value)
&& isPrimaryKeyPossiblyWrappedByMonotonicFunctions(args[0], context, column, chain))
&& isPrimaryKeyPossiblyWrappedByMonotonicFunctions(args[0], context, const_column_num, chain))
{
inverted = false;
data_arg_pos = 0;
}
else if (getConstant(args[0], block_with_constants, value)
&& isPrimaryKeyPossiblyWrappedByMonotonicFunctions(args[1], context, column, chain))
&& isPrimaryKeyPossiblyWrappedByMonotonicFunctions(args[1], context, const_column_num, chain))
{
inverted = true;
data_arg_pos = 1;
}
else if (typeid_cast<const ASTSet *>(args[1].get())
&& isPrimaryKeyPossiblyWrappedByMonotonicFunctions(args[0], context, column, chain))
&& isPrimaryKeyPossiblyWrappedByMonotonicFunctions(args[0], context, const_column_num, chain))
{
inverted = false;
data_arg_pos = 0;
}
else
return false;
std::string func_name = func->name;
/// Заменим <const> <sign> <column> на <column> <-sign> <const>
if (inverted)
/// Replace <const> <sign> <data> with <data> <-sign> <const>
if (data_arg_pos == 1)
{
if (func_name == "less")
func_name = "greater";
@ -400,18 +445,30 @@ bool PKCondition::atomFromAST(ASTPtr & node, const Context & context, Block & bl
func_name = "greaterOrEquals";
else if (func_name == "in" || func_name == "notIn" || func_name == "like")
{
/// const IN x не имеет смысла (в отличие от x IN const).
/// "const IN data_column" doesn't make sense (unlike "data_column IN const")
return false;
}
}
out.key_column = column;
out.key_column = const_column_num;
out.monotonic_functions_chain = std::move(chain);
const auto atom_it = atom_map.find(func_name);
if (atom_it == std::end(atom_map))
return false;
std::string data_column_name = args[data_arg_pos]->getColumnName();
const DataTypePtr & data_column_type = pk_sample_block.getByName(data_column_name).type;
const DataTypePtr & const_type = block_with_constants.getByPosition(const_column_num).type;
if (!tryCastValueToType(data_column_type, const_type, value))
{
throw Exception("Primary key expression contains comparison between inconvertible types: " +
data_column_type->getName() + " and " + const_type->getName() +
" inside " + func->getColumnName(),
ErrorCodes::BAD_TYPE_OF_FIELD);
}
return atom_it->second(out, value, node);
}
else if (getConstant(node, block_with_constants, value)) /// Для случаев, когда написано, например, WHERE 0 AND something

View File

@ -0,0 +1,24 @@
17783603973940894060
17783603973940894060
1466602371144054938
1466602371144054938
7021055163408905169
7021055163408905169
15043609445345024647
15043609445345024647
17783603973940894060
17783603973940894060
17783603973940894060
17783603973940894060
15043609445345024647
15043609445345024647
1466602371144054938
1466602371144054938
1466602371144054938
1466602371144054938
3447905173014179293
3447905173014179293
3051197876967004596
3051197876967004596
463667963421364848
463667963421364848

View File

@ -0,0 +1,39 @@
-- Checks that conditions on an Enum column work when the column is used as the primary key.
-- x is the primary key expression of the table; d is an ordinary column holding the same values.
-- Every query is run twice (on x, then on d); both runs must print the same hash.
DROP TABLE IF EXISTS test.enum_pk;
CREATE TABLE test.enum_pk (date Date DEFAULT '0000-00-00', x Enum8('0' = 0, '1' = 1, '2' = 2), d Enum8('0' = 0, '1' = 1, '2' = 2)) ENGINE = MergeTree(date, x, 1);
-- x and d always receive the same value in each row.
INSERT INTO test.enum_pk (x, d) VALUES ('0', '0')('1', '1')('0', '0')('1', '1')('1', '1')('0', '0')('0', '0')('2', '2')('0', '0')('1', '1')('1', '1')('1', '1')('1', '1')('0', '0');
-- Comparison with a string literal.
SELECT cityHash64(groupArray(x)) FROM test.enum_pk WHERE x = '0';
SELECT cityHash64(groupArray(d)) FROM test.enum_pk WHERE d = '0';
SELECT cityHash64(groupArray(x)) FROM test.enum_pk WHERE x != '0';
SELECT cityHash64(groupArray(d)) FROM test.enum_pk WHERE d != '0';
SELECT cityHash64(groupArray(x)) FROM test.enum_pk WHERE x = '1';
SELECT cityHash64(groupArray(d)) FROM test.enum_pk WHERE d = '1';
SELECT cityHash64(groupArray(x)) FROM test.enum_pk WHERE x != '1';
SELECT cityHash64(groupArray(d)) FROM test.enum_pk WHERE d != '1';
-- Comparison with a constant expression that yields a String.
SELECT cityHash64(groupArray(x)) FROM test.enum_pk WHERE x = toString(0);
SELECT cityHash64(groupArray(d)) FROM test.enum_pk WHERE d = toString(0);
-- The Enum comparison is nested inside further comparisons and functions.
SELECT cityHash64(groupArray(x)) FROM test.enum_pk WHERE (x = toString(0)) > 0;
SELECT cityHash64(groupArray(d)) FROM test.enum_pk WHERE (d = toString(0)) > 0;
SELECT cityHash64(groupArray(x)) FROM test.enum_pk WHERE ((x != toString(1)) > 0) > 0;
SELECT cityHash64(groupArray(d)) FROM test.enum_pk WHERE ((d != toString(1)) > 0) > 0;
SELECT cityHash64(groupArray(x)) FROM test.enum_pk WHERE ((x != toString(0)) != 0) > 0;
SELECT cityHash64(groupArray(d)) FROM test.enum_pk WHERE ((d != toString(0)) != 0) > 0;
SELECT cityHash64(groupArray(x)) FROM test.enum_pk WHERE (-(x != toString(0)) = -1) > 0;
SELECT cityHash64(groupArray(d)) FROM test.enum_pk WHERE (-(d != toString(0)) = -1) > 0;
-- Condition that does not reference any column.
SELECT cityHash64(groupArray(x)) FROM test.enum_pk WHERE 1 = 1;
SELECT cityHash64(groupArray(d)) FROM test.enum_pk WHERE 1 = 1;
-- Several Enum comparisons combined with OR / AND.
SELECT cityHash64(groupArray(x)) FROM test.enum_pk WHERE (x = '0' OR x = '1');
SELECT cityHash64(groupArray(d)) FROM test.enum_pk WHERE (d = '0' OR d = '1');
SELECT cityHash64(groupArray(x)) FROM test.enum_pk WHERE (x != '0' AND x != '1');
SELECT cityHash64(groupArray(d)) FROM test.enum_pk WHERE (d != '0' AND d != '1');