Modifications after merging constraints support

This commit is contained in:
Alexey Milovidov 2019-08-24 16:00:04 +03:00
parent f10cf3b082
commit a6997aa83f
6 changed files with 79 additions and 88 deletions

View File

@ -1,34 +1,65 @@
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/CheckConstraintsBlockOutputStream.h>
#include <Functions/FunctionHelpers.h>
#include <common/find_symbols.h>
#include <Parsers/formatAST.h>
#include <Columns/ColumnsCommon.h>
#include <Common/assert_cast.h>
namespace DB
{
namespace ErrorCodes
{
extern const int VIOLATED_CONSTRAINT;
}
CheckConstraintsBlockOutputStream::CheckConstraintsBlockOutputStream(
const String & table_,
const BlockOutputStreamPtr & output_,
const Block & header_,
const ConstraintsDescription & constraints_,
const Context & context_)
: table(table_),
output(output_),
header(header_),
constraints(constraints_),
expressions(constraints_.getExpressions(context_, header.getNamesAndTypesList()))
{
}
void CheckConstraintsBlockOutputStream::write(const Block & block)
{
if (block.rows() > 0)
{
for (size_t i = 0; i < expressions.size(); ++i)
{
Block res = block;
Block block_to_calculate = block;
auto constraint_expr = expressions[i];
auto res_column_uint8 = executeOnBlock(res, constraint_expr);
if (!memoryIsByte(res_column_uint8->getRawDataBegin<1>(), res_column_uint8->byteSize(), 0x1))
constraint_expr->execute(block_to_calculate);
ColumnWithTypeAndName res_column = block_to_calculate.getByPosition(block_to_calculate.columns() - 1);
const ColumnUInt8 & res_column_uint8 = assert_cast<const ColumnUInt8 &>(*res_column.column);
const UInt8 * data = res_column_uint8.getData().data();
size_t size = res_column_uint8.size();
/// Is violated.
if (!memoryIsByte(data, size, 1))
{
auto indices_wrong = findAllWrong(res_column_uint8->getRawDataBegin<1>(), res_column_uint8->byteSize());
std::string indices_str = "{";
for (size_t j = 0; j < indices_wrong.size(); ++j)
{
indices_str += std::to_string(indices_wrong[j]);
indices_str += (j != indices_wrong.size() - 1) ? ", " : "}";
}
size_t row_idx = 0;
for (; row_idx < size; ++row_idx)
if (data[row_idx] != 1)
break;
throw Exception{"Violated constraint " + constraints.constraints[i]->name +
" in table " + table + " at indices " + indices_str + ", constraint expression: " +
" in table " + table + " at row " + std::to_string(row_idx) + ", constraint expression: " +
serializeAST(*(constraints.constraints[i]->expr), true), ErrorCodes::VIOLATED_CONSTRAINT};
}
}
}
output->write(block);
rows_written += block.rows();
}
@ -48,32 +79,4 @@ void CheckConstraintsBlockOutputStream::writeSuffix()
output->writeSuffix();
}
const ColumnUInt8 *CheckConstraintsBlockOutputStream::executeOnBlock(
Block & block,
const ExpressionActionsPtr & constraint)
{
constraint->execute(block);
ColumnWithTypeAndName res_column = block.safeGetByPosition(block.columns() - 1);
return checkAndGetColumn<ColumnUInt8>(res_column.column.get());
}
std::vector<size_t> CheckConstraintsBlockOutputStream::findAllWrong(const void *data, size_t size)
{
std::vector<size_t> res;
if (size == 0)
return res;
auto ptr = reinterpret_cast<const uint8_t *>(data);
for (size_t i = 0; i < size; ++i)
{
if (*(ptr + i) == 0x0)
{
res.push_back(i);
}
}
return res;
}
}

View File

@ -2,16 +2,14 @@
#include <DataStreams/IBlockOutputStream.h>
#include <Storages/ConstraintsDescription.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/ExpressionBlockInputStream.h>
namespace DB
{
namespace ErrorCodes
{
extern const int VIOLATED_CONSTRAINT;
}
/** Check for constraints violation. If anything is found - throw an exception with detailed error message.
* Otherwise just pass block to output unchanged.
*/
class CheckConstraintsBlockOutputStream : public IBlockOutputStream
{
@ -21,14 +19,7 @@ public:
const BlockOutputStreamPtr & output_,
const Block & header_,
const ConstraintsDescription & constraints_,
const Context & context_)
: table(table_),
output(output_),
header(header_),
constraints(constraints_),
expressions(constraints_.getExpressions(context_, header.getNamesAndTypesList())),
rows_written(0)
{ }
const Context & context_);
Block getHeader() const override { return header; }
void write(const Block & block) override;
@ -39,14 +30,11 @@ public:
void writeSuffix() override;
private:
const ColumnUInt8* executeOnBlock(Block & block, const ExpressionActionsPtr & constraint);
std::vector<size_t> findAllWrong(const void *data, size_t size);
String table;
BlockOutputStreamPtr output;
Block header;
const ConstraintsDescription constraints;
const ConstraintsExpressions expressions;
size_t rows_written;
size_t rows_written = 0;
};
}

View File

@ -1,9 +1,9 @@
#!/usr/bin/env bash
#ccache -s
#ccache -s # uncomment to display CCache statistics
mkdir -p /server/build_docker
cd /server/build_docker
cmake -G Ninja /server -DCMAKE_C_COMPILER=`which clang-8` -DCMAKE_CXX_COMPILER=`which clang++-8` -DCMAKE_BUILD_TYPE=Debug
cmake -G Ninja /server -DCMAKE_C_COMPILER=`which gcc-8` -DCMAKE_CXX_COMPILER=`which g++-8`
# Set the number of build jobs to the half of number of virtual CPU cores (rounded up).
# By default, ninja use all virtual CPU cores, that leads to very high memory consumption without much improvement in build time.

View File

@ -178,9 +178,9 @@ ALTER TABLE [db].name DROP CONSTRAINT constraint_name;
Queries will add or remove metadata about constraints from table so they are processed immediately.
Constraint check *will not be executed* on existing table if it was added. For now, we recommend to create new table and use `INSERT SELECT` query to fill new table.
Constraint check *will not be executed* on existing data if it was added.
All changes on distributed tables are broadcasting to ZooKeeper so will be applied on other replicas.
All changes on replicated tables are broadcasting to ZooKeeper so will be applied on other replicas.
### Manipulations With Partitions and Parts {#alter_manipulations-with-partitions}

View File

@ -177,9 +177,9 @@ ALTER TABLE [db].name DROP CONSTRAINT constraint_name;
Запросы выполняют добавление или удаление метаданных об ограничениях таблицы `[db].name`, поэтому выполняются мнгновенно.
Если ограничение появилось для непустой таблицы, то *проверка ограничения вызвана не будет*. Если же важно добавить ограничение на существующую таблицу, то рекомендуется создать новую таблицу с нужным ограничением и выполнить `INSERT SELECT` запрос для перекачки данных из одной таблицы в другую.
Если ограничение появилось для непустой таблицы, то *проверка ограничения для имеющихся данных не производится*.
Запрос на изменение ограничений так же, как и с индексами, реплицируется через ZooKeeper.
Запрос на изменение ограничений для Replicated таблиц реплицируется, сохраняя новые метаданные в ZooKeeper и применяя изменения на всех репликах.
### Манипуляции с партициями и кусками {#alter_manipulations-with-partitions}

View File

@ -119,9 +119,9 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
) ENGINE = engine
```
`boolean_expr_1` может быть любым булевым выражением, состоящим из операторов сравнения или функций. При наличии одного или нескольких ограничений в момент вставки данных выражения ограничений будут проверяться на истинность для каждой вставляемой строки данных. В случае, если в теле INSERT запроса придут некорректные данные — клиентов будет выкинуто исключение с нарушенным ограничением.
`boolean_expr_1` может быть любым булевым выражением, состоящим из операторов сравнения или функций. При наличии одного или нескольких ограничений в момент вставки данных выражения ограничений будут проверяться на истинность для каждой вставляемой строки данных. В случае, если в теле INSERT запроса придут некорректные данные — клиент получит исключение с описанием нарушенного ограничения.
Добавление большого числа ограничений может негативно повлиять на производительность объёмных `INSERT` запросов.
Добавление большого числа ограничений может негативно повлиять на производительность `INSERT` запросов.
### Выражение для TTL