ClickHouse/dbms/src/Storages/PkCondition.cpp

287 lines
7.9 KiB
C++
Raw Normal View History

2012-12-05 12:44:55 +00:00
#include <DB/Storages/PkCondition.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
namespace DB
{
PkCondition::PkCondition(ASTPtr query, const Context & context_, const SortDescription & sort_descr_)
: context(context_), sort_descr(sort_descr_)
{
for (size_t i = 0; i < sort_descr.size(); ++i)
{
pk_columns[sort_descr[i].column_name] = i;
}
/** Вычисление выражений, зависящих только от констант.
* Чтобы индекс мог использоваться, если написано, например WHERE Date = toDate(now()).
*/
Expression expr_for_constant_folding(query, context);
Block block_with_constants;
/// В блоке должен быть хотя бы один столбец, чтобы у него было известно число строк.
ColumnWithNameAndType dummy_column;
dummy_column.name = "_dummy";
dummy_column.type = new DataTypeUInt8;
dummy_column.column = new ColumnConstUInt8(1, 0);
block_with_constants.insert(dummy_column);
expr_for_constant_folding.execute(block_with_constants, 0, true);
/// Преобразуем секцию WHERE в обратную польскую строку.
ASTSelectQuery & select = dynamic_cast<ASTSelectQuery &>(*query);
if (select.where_expression)
{
traverseAST(select.where_expression, block_with_constants);
}
else
{
rpn.push_back(RPNElement(RPNElement::FUNCTION_UNKNOWN));
}
}
/** Получить значение константного выражения.
* Вернуть false, если выражение не константно.
*/
static bool getConstant(ASTPtr & expr, Block & block_with_constants, Field & value)
{
String column_name = expr->getColumnName();
if (ASTLiteral * lit = dynamic_cast<ASTLiteral *>(&*expr))
{
/// литерал
value = lit->value;
return true;
}
else if (block_with_constants.has(column_name) && block_with_constants.getByName(column_name).column->isConst())
{
/// выражение, вычислившееся в константу
value = (*block_with_constants.getByName(column_name).column)[0];
return true;
}
else
return false;
}
void PkCondition::traverseAST(ASTPtr & node, Block & block_with_constants)
{
RPNElement element;
if (ASTFunction * func = dynamic_cast<ASTFunction *>(&*node))
{
if (operatorFromAST(func, element))
{
ASTs & args = dynamic_cast<ASTExpressionList &>(*func->arguments).children;
for (size_t i = 0; i < args.size(); ++i)
{
traverseAST(args[i], block_with_constants);
}
rpn.push_back(element);
return;
}
}
if (!atomFromAST(node, block_with_constants, element))
{
element.function = RPNElement::FUNCTION_UNKNOWN;
}
rpn.push_back(element);
}
bool PkCondition::atomFromAST(ASTPtr & node, Block & block_with_constants, RPNElement & out)
{
/// Фнукции < > = != <= >=, у которых один агрумент константа, другой - один из столбцов первичного ключа.
if (ASTFunction * func = dynamic_cast<ASTFunction *>(&*node))
{
ASTs & args = dynamic_cast<ASTExpressionList &>(*func->arguments).children;
if (args.size() != 2)
return false;
/// Если true, слева константа.
bool inverted;
size_t column;
Field value;
if (pk_columns.count(args[0]->getColumnName()) && getConstant(args[1], block_with_constants, value))
{
inverted = false;
column = pk_columns[args[0]->getColumnName()];
}
else if (pk_columns.count(args[1]->getColumnName()) && getConstant(args[0], block_with_constants, value))
{
inverted = true;
column = pk_columns[args[1]->getColumnName()];
}
else
return false;
std::string func_name = func->name;
/// Заменим <const> <sign> <column> на <column> <-sign> <const>
if (inverted)
{
if (func_name == "less")
func_name = "greater";
else if (func_name == "greater")
func_name = "less";
else if (func_name == "greaterOrEquals")
func_name = "lessOrEquals";
else if (func_name == "lessOrEquals")
func_name = "greaterOrEquals";
}
out.function = RPNElement::FUNCTION_IN_RANGE;
out.key_column = column;
if (func->name == "notEquals")
{
out.function = RPNElement::FUNCTION_NOT_IN_RANGE;
out.range = Range(value);
}
else if (func->name == "equals")
out.range = Range(value);
else if (func->name == "less")
out.range = Range::RightBounded(value, false);
else if (func->name == "greater")
out.range = Range::LeftBounded(value, false);
else if (func->name == "lessOrEquals")
out.range = Range::RightBounded(value, true);
else if (func->name == "greaterOrEquals")
out.range = Range::LeftBounded(value, true);
else
return false;
return true;
}
return false;
}
bool PkCondition::operatorFromAST(ASTFunction * func, RPNElement & out)
{
/// Фнукции AND, OR, NOT.
ASTs & args = dynamic_cast<ASTExpressionList &>(*func->arguments).children;
if (func->name == "not")
{
if (args.size() != 1)
return false;
out.function = RPNElement::FUNCTION_NOT;
}
else
{
if (args.size() != 2)
return false;
if (func->name == "and")
out.function = RPNElement::FUNCTION_AND;
else if (func->name == "or")
out.function = RPNElement::FUNCTION_OR;
else
return false;
}
return true;
}
String PkCondition::toString()
{
String res;
for (size_t i = 0; i < rpn.size(); ++i)
{
if (i)
res += ", ";
res += rpn[i].toString();
}
return res;
}
/// Множество значений булевой переменной. То есть два булевых значения: может ли быть true, может ли быть false.
struct BoolMask
{
bool can_be_true;
bool can_be_false;
BoolMask() {}
BoolMask(bool can_be_true_, bool can_be_false_) : can_be_true(can_be_true_), can_be_false(can_be_false_) {}
BoolMask operator &(const BoolMask & m)
{
return BoolMask(can_be_true && m.can_be_true, can_be_false || m.can_be_false);
}
BoolMask operator |(const BoolMask & m)
{
return BoolMask(can_be_true || m.can_be_true, can_be_false && m.can_be_false);
}
BoolMask operator !()
{
return BoolMask(can_be_false, can_be_true);
}
};
bool PkCondition::mayBeTrueInRange(const Row & left_pk, const Row & right_pk)
{
/// Найдем диапазоны элементов ключа.
std::vector<Range> key_ranges(sort_descr.size(), Range());
for (size_t i = 0; i < sort_descr.size(); ++i)
{
if (left_pk[i] == right_pk[i])
{
key_ranges[i] = Range(left_pk[i]);
}
else
{
key_ranges[i] = Range(left_pk[i], true, right_pk[i], true);
break;
}
}
std::vector<BoolMask> rpn_stack;
for (size_t i = 0; i < rpn.size(); ++i)
{
RPNElement & element = rpn[i];
if (element.function == RPNElement::FUNCTION_UNKNOWN)
{
rpn_stack.push_back(BoolMask(true, true));
}
else if (element.function == RPNElement::FUNCTION_NOT_IN_RANGE || element.function == RPNElement::FUNCTION_IN_RANGE)
{
Range & key_range = key_ranges[element.key_column];
bool intersects = element.range.intersectsRange(key_range);
bool contains = element.range.containsRange(key_range);
rpn_stack.push_back(BoolMask(intersects, !contains));
if (element.function == RPNElement::FUNCTION_NOT_IN_RANGE)
rpn_stack.back() = !rpn_stack.back();
}
else if (element.function == RPNElement::FUNCTION_NOT)
{
rpn_stack.back() = !rpn_stack.back();
}
else if (element.function == RPNElement::FUNCTION_AND)
{
BoolMask arg1 = rpn_stack.back();
rpn_stack.pop_back();
BoolMask arg2 = rpn_stack.back();
rpn_stack.back() = arg1 & arg2;
}
else if (element.function == RPNElement::FUNCTION_OR)
{
BoolMask arg1 = rpn_stack.back();
rpn_stack.pop_back();
BoolMask arg2 = rpn_stack.back();
rpn_stack.back() = arg1 | arg2;
}
else
throw Exception("Unexpected function type in PkCondition::RPNElement", ErrorCodes::LOGICAL_ERROR);
}
if (rpn_stack.size() != 1)
throw Exception("Unexpected stack size in PkCondition::mayBeTrueInRange", ErrorCodes::LOGICAL_ERROR);
return rpn_stack[0].can_be_true;
}
}