Merge pull request #6652 from yandex/merge-constraints

Make CONSTRAINTs production ready.
alexey-milovidov 2019-08-26 18:49:43 +03:00 committed by GitHub
commit 418a396ba1
67 changed files with 416 additions and 249 deletions

View File

@ -865,7 +865,7 @@ bool TCPHandler::receiveData()
if (!(storage = query_context->tryGetExternalTable(external_table_name)))
{
NamesAndTypesList columns = block.getNamesAndTypesList();
storage = StorageMemory::create("_external", external_table_name, ColumnsDescription{columns});
storage = StorageMemory::create("_external", external_table_name, ColumnsDescription{columns}, ConstraintsDescription{});
storage->startup();
query_context->addExternalTable(external_table_name, storage);
}

View File

@ -160,7 +160,7 @@ void ExternalTablesHandler::handlePart(const Poco::Net::MessageHeader & header,
/// Create table
NamesAndTypesList columns = sample_block.getNamesAndTypesList();
StoragePtr storage = StorageMemory::create("_external", data.second, ColumnsDescription{columns});
StoragePtr storage = StorageMemory::create("_external", data.second, ColumnsDescription{columns}, ConstraintsDescription{});
storage->startup();
context.addExternalTable(data.second, storage);
BlockOutputStreamPtr output = storage->write(ASTPtr(), context);

View File

@ -1,34 +1,86 @@
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/CheckConstraintsBlockOutputStream.h>
#include <Functions/FunctionHelpers.h>
#include <common/find_symbols.h>
#include <Parsers/formatAST.h>
#include <Columns/ColumnsCommon.h>
#include <Common/assert_cast.h>
#include <Common/FieldVisitors.h>
namespace DB
{
namespace ErrorCodes
{
extern const int VIOLATED_CONSTRAINT;
}
CheckConstraintsBlockOutputStream::CheckConstraintsBlockOutputStream(
const String & table_,
const BlockOutputStreamPtr & output_,
const Block & header_,
const ConstraintsDescription & constraints_,
const Context & context_)
: table(table_),
output(output_),
header(header_),
constraints(constraints_),
expressions(constraints_.getExpressions(context_, header.getNamesAndTypesList()))
{
}
void CheckConstraintsBlockOutputStream::write(const Block & block)
{
for (size_t i = 0; i < expressions.size(); ++i)
{
Block res = block;
auto constraint_expr = expressions[i];
auto res_column_uint8 = executeOnBlock(res, constraint_expr);
if (!memoryIsByte(res_column_uint8->getRawDataBegin<1>(), res_column_uint8->byteSize(), 0x1))
{
auto indices_wrong = findAllWrong(res_column_uint8->getRawDataBegin<1>(), res_column_uint8->byteSize());
std::string indices_str = "{";
for (size_t j = 0; j < indices_wrong.size(); ++j)
{
indices_str += std::to_string(indices_wrong[j]);
indices_str += (j != indices_wrong.size() - 1) ? ", " : "}";
}
throw Exception{"Violated constraint " + constraints.constraints[i]->name +
" in table " + table + " at indices " + indices_str + ", constraint expression: " +
serializeAST(*(constraints.constraints[i]->expr), true), ErrorCodes::VIOLATED_CONSTRAINT};
}
}
if (block.rows() > 0)
{
Block block_to_calculate = block;
for (size_t i = 0; i < expressions.size(); ++i)
{
auto constraint_expr = expressions[i];
constraint_expr->execute(block_to_calculate);
ColumnWithTypeAndName res_column = block_to_calculate.getByPosition(block_to_calculate.columns() - 1);
const ColumnUInt8 & res_column_uint8 = assert_cast<const ColumnUInt8 &>(*res_column.column);
const UInt8 * data = res_column_uint8.getData().data();
size_t size = res_column_uint8.size();
/// Is violated.
if (!memoryIsByte(data, size, 1))
{
size_t row_idx = 0;
for (; row_idx < size; ++row_idx)
if (data[row_idx] != 1)
break;
Names related_columns = constraint_expr->getRequiredColumns();
std::stringstream exception_message;
exception_message << "Constraint " << backQuote(constraints.constraints[i]->name)
<< " for table " << backQuote(table)
<< " is violated at row " << (rows_written + row_idx + 1)
<< ". Expression: (" << serializeAST(*(constraints.constraints[i]->expr), true) << ")"
<< ". Column values";
bool first = true;
for (const auto & name : related_columns)
{
const IColumn & column = *block.getByName(name).column;
assert(row_idx < column.size());
exception_message << (first ? ": " : ", ")
<< backQuoteIfNeed(name) << " = " << applyVisitor(FieldVisitorToString(), column[row_idx]);
first = false;
}
throw Exception{exception_message.str(), ErrorCodes::VIOLATED_CONSTRAINT};
}
}
}
output->write(block);
rows_written += block.rows();
}
@ -48,32 +100,4 @@ void CheckConstraintsBlockOutputStream::writeSuffix()
output->writeSuffix();
}
const ColumnUInt8 *CheckConstraintsBlockOutputStream::executeOnBlock(
Block & block,
const ExpressionActionsPtr & constraint)
{
constraint->execute(block);
ColumnWithTypeAndName res_column = block.safeGetByPosition(block.columns() - 1);
return checkAndGetColumn<ColumnUInt8>(res_column.column.get());
}
std::vector<size_t> CheckConstraintsBlockOutputStream::findAllWrong(const void *data, size_t size)
{
std::vector<size_t> res;
if (size == 0)
return res;
auto ptr = reinterpret_cast<const uint8_t *>(data);
for (size_t i = 0; i < size; ++i)
{
if (*(ptr + i) == 0x0)
{
res.push_back(i);
}
}
return res;
}
}
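Editor's note: the rewritten write() above replaces the removed executeOnBlock()/findAllWrong() helpers with a two-step check — memoryIsByte() first tests whether every result byte equals 1, and only on failure does a per-row scan locate the first offending row, reported 1-based and offset by rows_written so the index refers to the whole INSERT rather than the current block. A minimal self-contained sketch of that fast-path-then-scan logic (plain C++ with hypothetical names, not ClickHouse's actual helpers):

    #include <cstdint>
    #include <cstdio>
    #include <optional>
    #include <vector>

    /// Hypothetical stand-in for memoryIsByte(): true if every byte equals `value`.
    static bool allBytesEqual(const std::uint8_t * data, std::size_t size, std::uint8_t value)
    {
        for (std::size_t i = 0; i < size; ++i)
            if (data[i] != value)
                return false;
        return true;
    }

    /// Returns the 1-based global index of the first violating row, if any.
    /// `rows_written` counts rows from previously checked blocks, as in the stream.
    static std::optional<std::size_t> firstViolation(const std::vector<std::uint8_t> & result, std::size_t rows_written)
    {
        if (allBytesEqual(result.data(), result.size(), 1))  /// fast path: no violation anywhere
            return std::nullopt;

        std::size_t row_idx = 0;
        for (; row_idx < result.size(); ++row_idx)  /// slow path: locate the first 0
            if (result[row_idx] != 1)
                break;

        return rows_written + row_idx + 1;
    }

    int main()
    {
        std::vector<std::uint8_t> constraint_result{1, 1, 0, 1};  /// CHECK expression evaluated per row
        if (auto row = firstViolation(constraint_result, /*rows_written=*/10))
            std::printf("violated at row %zu\n", *row);  /// prints: violated at row 13
    }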

View File

@ -2,16 +2,14 @@
#include <DataStreams/IBlockOutputStream.h>
#include <Storages/ConstraintsDescription.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/ExpressionBlockInputStream.h>
namespace DB
{
namespace ErrorCodes
{
extern const int VIOLATED_CONSTRAINT;
}
/** Check for constraints violation. If anything is found - throw an exception with detailed error message.
* Otherwise just pass block to output unchanged.
*/
class CheckConstraintsBlockOutputStream : public IBlockOutputStream
{
@ -21,14 +19,7 @@ public:
const BlockOutputStreamPtr & output_,
const Block & header_,
const ConstraintsDescription & constraints_,
const Context & context_)
: table(table_),
output(output_),
header(header_),
constraints(constraints_),
expressions(constraints_.getExpressions(context_, header.getNamesAndTypesList())),
rows_written(0)
{ }
const Context & context_);
Block getHeader() const override { return header; }
void write(const Block & block) override;
@ -39,14 +30,11 @@ public:
void writeSuffix() override;
private:
const ColumnUInt8* executeOnBlock(Block & block, const ExpressionActionsPtr & constraint);
std::vector<size_t> findAllWrong(const void *data, size_t size);
String table;
BlockOutputStreamPtr output;
Block header;
const ConstraintsDescription constraints;
const ConstraintsExpressions expressions;
size_t rows_written;
size_t rows_written = 0;
};
}

View File

@ -231,7 +231,7 @@ DatabaseMySQL::MySQLStorageInfo DatabaseMySQL::createStorageInfo(
{
const auto & mysql_table = StorageMySQL::create(
database_name, table_name, std::move(mysql_pool), mysql_database_name, table_name,
false, "", ColumnsDescription{columns_name_and_type}, global_context);
false, "", ColumnsDescription{columns_name_and_type}, ConstraintsDescription{}, global_context);
const auto & create_table_query = std::make_shared<ASTCreateQuery>();

View File

@ -83,6 +83,7 @@ std::pair<String, StoragePtr> createTableFromDefinition(
throw Exception("Missing definition of columns.", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED);
ColumnsDescription columns = InterpreterCreateQuery::getColumnsDescription(*ast_create_query.columns_list->columns, context);
ConstraintsDescription constraints = InterpreterCreateQuery::getConstraintsDescription(ast_create_query.columns_list->constraints);
return
{
@ -90,7 +91,7 @@ std::pair<String, StoragePtr> createTableFromDefinition(
StorageFactory::instance().get(
ast_create_query,
database_data_path, ast_create_query.table, database_name, context, context.getGlobalContext(),
columns,
columns, constraints,
true, has_force_restore_data_flag)
};
}

View File

@ -91,7 +91,7 @@ public:
Block sample = interpreter->getSampleBlock();
NamesAndTypesList columns = sample.getNamesAndTypesList();
StoragePtr external_storage = StorageMemory::create("_external", external_table_name, ColumnsDescription{columns});
StoragePtr external_storage = StorageMemory::create("_external", external_table_name, ColumnsDescription{columns}, ConstraintsDescription{});
external_storage->startup();
/** We replace the subquery with the name of the temporary table.

View File

@ -381,26 +381,37 @@ ColumnsDescription InterpreterCreateQuery::getColumnsDescription(const ASTExpres
}
ColumnsDescription InterpreterCreateQuery::setColumns(
ConstraintsDescription InterpreterCreateQuery::getConstraintsDescription(const ASTExpressionList * constraints)
{
ConstraintsDescription res;
if (constraints)
for (const auto & constraint : constraints->children)
res.constraints.push_back(std::dynamic_pointer_cast<ASTConstraintDeclaration>(constraint->clone()));
return res;
}
ColumnsDescription InterpreterCreateQuery::setProperties(
ASTCreateQuery & create, const Block & as_select_sample, const StoragePtr & as_storage) const
{
ColumnsDescription columns;
IndicesDescription indices;
ConstraintsDescription constraints;
if (create.columns_list)
{
if (create.columns_list->columns)
columns = getColumnsDescription(*create.columns_list->columns, context);
if (create.columns_list->indices)
for (const auto & index : create.columns_list->indices->children)
indices.indices.push_back(
std::dynamic_pointer_cast<ASTIndexDeclaration>(index->clone()));
if (create.columns_list->constraints)
for (const auto & constraint : create.columns_list->constraints->children)
constraints.constraints.push_back(
std::dynamic_pointer_cast<ASTConstraintDeclaration>(constraint->clone()));
}
else if (!create.as_table.empty())
{
@ -556,6 +567,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
}
ColumnsDescription columns;
ConstraintsDescription constraints;
StoragePtr res;
if (create.as_table_function)
@ -567,7 +579,8 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
else
{
/// Set and retrieve list of columns.
columns = setColumns(create, as_select_sample, as_storage);
columns = setProperties(create, as_select_sample, as_storage);
constraints = getConstraintsDescription(create.columns_list->constraints);
/// Check low cardinality types in creating table if it was not allowed in setting
if (!create.attach && !context.getSettingsRef().allow_suspicious_low_cardinality_types && !create.is_materialized_view)
@ -635,6 +648,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
context,
context.getGlobalContext(),
columns,
constraints,
create.attach,
false);
}
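Editor's note: getConstraintsDescription() above tolerates a missing CONSTRAINT list, then clones each child of the expression list and downcasts it with std::dynamic_pointer_cast<ASTConstraintDeclaration>. A toy sketch of that clone-and-downcast pattern over a polymorphic AST (all types here are simplified stand-ins, not ClickHouse's; the real code pushes the cast result directly, this sketch guards against nulls):

    #include <iostream>
    #include <memory>
    #include <string>
    #include <vector>

    struct IAST { virtual ~IAST() = default; virtual std::shared_ptr<IAST> clone() const = 0; };

    struct ConstraintDecl : IAST
    {
        std::string name, expr;
        ConstraintDecl(std::string n, std::string e) : name(std::move(n)), expr(std::move(e)) {}
        std::shared_ptr<IAST> clone() const override { return std::make_shared<ConstraintDecl>(*this); }
    };

    struct Description { std::vector<std::shared_ptr<ConstraintDecl>> constraints; };

    /// Mirrors the shape of getConstraintsDescription(): tolerate a null list,
    /// clone every child, keep the nodes that are constraint declarations.
    Description getConstraints(const std::vector<std::shared_ptr<IAST>> * list)
    {
        Description res;
        if (list)
            for (const auto & child : *list)
                if (auto decl = std::dynamic_pointer_cast<ConstraintDecl>(child->clone()))
                    res.constraints.push_back(std::move(decl));
        return res;
    }

    int main()
    {
        std::vector<std::shared_ptr<IAST>> children{std::make_shared<ConstraintDecl>("b_constraint", "b > 0")};
        for (const auto & c : getConstraints(&children).constraints)
            std::cout << c->name << ": " << c->expr << '\n';
    }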

View File

@ -14,6 +14,7 @@ namespace DB
class Context;
class ASTCreateQuery;
class ASTExpressionList;
class ASTConstraintDeclaration;
/** Allows to create new table or database,
@ -45,13 +46,14 @@ public:
/// Obtain information about columns, their types, default values and column comments, for the case when columns in the CREATE query are specified explicitly.
static ColumnsDescription getColumnsDescription(const ASTExpressionList & columns, const Context & context);
static ConstraintsDescription getConstraintsDescription(const ASTExpressionList * constraints);
private:
BlockIO createDatabase(ASTCreateQuery & create);
BlockIO createTable(ASTCreateQuery & create);
/// Calculate list of columns of table and return it.
ColumnsDescription setColumns(ASTCreateQuery & create, const Block & as_select_sample, const StoragePtr & as_storage) const;
/// Calculate the list of columns, constraints, indices, etc. of the table and return the columns.
ColumnsDescription setProperties(ASTCreateQuery & create, const Block & as_select_sample, const StoragePtr & as_storage) const;
void setEngine(ASTCreateQuery & create) const;
void checkAccess(const ASTCreateQuery & create);

View File

@ -121,8 +121,9 @@ BlockIO InterpreterInsertQuery::execute()
out = std::make_shared<AddingDefaultBlockOutputStream>(
out, query_sample_block, out->getHeader(), table->getColumns().getDefaults(), context);
out = std::make_shared<CheckConstraintsBlockOutputStream>(
query.table, out, query_sample_block, table->getConstraints(), context);
if (const auto & constraints = table->getConstraints(); !constraints.empty())
out = std::make_shared<CheckConstraintsBlockOutputStream>(query.table,
out, query_sample_block, table->getConstraints(), context);
auto out_wrapper = std::make_shared<CountingBlockOutputStream>(out);
out_wrapper->setProcessListElement(context.getProcessListElement());
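Editor's note: with this change InterpreterInsertQuery wires CheckConstraintsBlockOutputStream into the INSERT output chain only when the table actually declares constraints, so constraint-free inserts skip the check entirely. A toy decorator-chain sketch of that wiring (simplified stand-ins, not ClickHouse's classes):

    #include <iostream>
    #include <memory>
    #include <stdexcept>
    #include <vector>

    struct Block { std::vector<int> b; };  /// toy stand-in for DB::Block

    struct IBlockOutputStream
    {
        virtual ~IBlockOutputStream() = default;
        virtual void write(const Block & block) = 0;
    };

    struct TableSink : IBlockOutputStream
    {
        void write(const Block & block) override { std::cout << "wrote " << block.b.size() << " rows\n"; }
    };

    /// Checks a single toy constraint (`value > 0`) and forwards the block unchanged.
    struct CheckConstraintsStream : IBlockOutputStream
    {
        std::shared_ptr<IBlockOutputStream> next;
        explicit CheckConstraintsStream(std::shared_ptr<IBlockOutputStream> next_) : next(std::move(next_)) {}
        void write(const Block & block) override
        {
            for (int v : block.b)
                if (!(v > 0))
                    throw std::runtime_error("constraint violated");
            next->write(block);
        }
    };

    std::shared_ptr<IBlockOutputStream> buildChain(bool has_constraints)
    {
        std::shared_ptr<IBlockOutputStream> out = std::make_shared<TableSink>();
        if (has_constraints)  /// mirrors the new `if (!constraints.empty())` fast path
            out = std::make_shared<CheckConstraintsStream>(out);
        return out;
    }

    int main()
    {
        buildChain(false)->write({{-1, 2}});  /// no constraints: passes straight through
        try { buildChain(true)->write({{-1, 2}}); }
        catch (const std::exception & e) { std::cout << e.what() << '\n'; }
    }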

View File

@ -278,6 +278,7 @@ StoragePtr InterpreterSystemQuery::tryRestartReplica(const String & database_nam
std::string data_path = database->getDataPath();
auto columns = InterpreterCreateQuery::getColumnsDescription(*create.columns_list->columns, system_context);
auto constraints = InterpreterCreateQuery::getConstraintsDescription(create.columns_list->constraints);
StoragePtr table = StorageFactory::instance().get(create,
data_path,
@ -286,6 +287,7 @@ StoragePtr InterpreterSystemQuery::tryRestartReplica(const String & database_nam
system_context,
system_context.getGlobalContext(),
columns,
constraints,
create.attach,
false);

View File

@ -24,20 +24,8 @@ namespace ErrorCodes
extern const int UNKNOWN_SETTING;
}
IStorage::IStorage(ColumnsDescription columns_)
IStorage::IStorage(ColumnsDescription virtuals_) : virtuals(std::move(virtuals_))
{
setColumns(std::move(columns_));
}
IStorage::IStorage(ColumnsDescription columns_, ColumnsDescription virtuals_) : virtuals(std::move(virtuals_))
{
setColumns(std::move(columns_));
}
IStorage::IStorage(ColumnsDescription columns_, ColumnsDescription virtuals_, IndicesDescription indices_) : virtuals(std::move(virtuals_))
{
setColumns(std::move(columns_));
setIndices(std::move(indices_));
}
const ColumnsDescription & IStorage::getColumns() const

View File

@ -65,9 +65,7 @@ class IStorage : public std::enable_shared_from_this<IStorage>
{
public:
IStorage() = default;
explicit IStorage(ColumnsDescription columns_);
IStorage(ColumnsDescription columns_, ColumnsDescription virtuals_);
IStorage(ColumnsDescription columns_, ColumnsDescription virtuals_, IndicesDescription indices_);
explicit IStorage(ColumnsDescription virtuals_);
virtual ~IStorage() = default;
IStorage(const IStorage &) = delete;
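Editor's note: this is the pivot of the refactoring — the base class keeps only the virtual-columns constructor, and each storage now calls setColumns()/setIndices()/setConstraints() in its own constructor body, which is the mechanical change repeated across the storage files below. A toy model of the resulting pattern (illustrative names, not the real classes):

    #include <string>
    #include <utility>
    #include <vector>

    using ColumnsDescription = std::vector<std::string>;      /// toy stand-ins
    using ConstraintsDescription = std::vector<std::string>;

    class IStorageLike
    {
    public:
        IStorageLike() = default;
        explicit IStorageLike(ColumnsDescription virtuals_) : virtuals(std::move(virtuals_)) {}

        void setColumns(ColumnsDescription columns_) { columns = std::move(columns_); }
        void setConstraints(ConstraintsDescription constraints_) { constraints = std::move(constraints_); }

    private:
        ColumnsDescription columns;
        ColumnsDescription virtuals;       /// only virtual columns go through the base ctor now
        ConstraintsDescription constraints;
    };

    /// After the refactoring every storage follows this shape instead of
    /// passing `columns_` up to the base-class constructor.
    class StorageToy : public IStorageLike
    {
    public:
        StorageToy(ColumnsDescription columns_, ConstraintsDescription constraints_)
        {
            setColumns(std::move(columns_));
            setConstraints(std::move(constraints_));
        }
    };

    int main()
    {
        StorageToy storage({"a", "b"}, {"a > 0"});
        (void)storage;
    }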

View File

@ -86,7 +86,6 @@ StorageKafka::StorageKafka(
size_t skip_broken_,
bool intermediate_commit_)
: IStorage(
columns_,
ColumnsDescription({{"_topic", std::make_shared<DataTypeString>()},
{"_key", std::make_shared<DataTypeString>()},
{"_offset", std::make_shared<DataTypeUInt64>()},
@ -108,6 +107,7 @@ StorageKafka::StorageKafka(
, skip_broken(skip_broken_)
, intermediate_commit(intermediate_commit_)
{
setColumns(columns_);
task = global_context.getSchedulePool().createTask(log->name(), [this]{ threadFunc(); });
task->deactivate();
}

View File

@ -218,9 +218,11 @@ StorageLiveView::StorageLiveView(
Context & local_context,
const ASTCreateQuery & query,
const ColumnsDescription & columns_)
: IStorage(columns_), table_name(table_name_),
: table_name(table_name_),
database_name(database_name_), global_context(local_context.getGlobalContext())
{
setColumns(columns_);
if (!query.select)
throw Exception("SELECT query is not specified for " + getName(), ErrorCodes::INCORRECT_QUERY);

View File

@ -127,7 +127,6 @@ MergeTreeData::MergeTreeData(
{
const auto settings = getCOWSettings();
setProperties(order_by_ast_, primary_key_ast_, columns_, indices_, constraints_);
setConstraints(constraints_);
/// NOTE: using the same columns list as is read when performing actual merges.
merging_params.check(getColumns().getAllPhysical());

View File

@ -40,8 +40,11 @@ public:
protected:
StorageFromMergeTreeDataPart(const MergeTreeData::DataPartPtr & part_)
: IStorage(part_->storage.getColumns(), part_->storage.getVirtuals(), part_->storage.getIndices()), part(part_)
{}
: IStorage(part_->storage.getVirtuals()), part(part_)
{
setColumns(part_->storage.getColumns());
setIndices(part_->storage.getIndices());
}
private:
MergeTreeData::DataPartPtr part;

View File

@ -422,21 +422,21 @@ static StoragePtr create(const StorageFactory::Arguments & args)
switch (merging_params.mode)
{
default:
break;
case MergeTreeData::MergingParams::Summing:
add_optional_param("list of columns to sum");
break;
case MergeTreeData::MergingParams::Replacing:
add_optional_param("version");
break;
case MergeTreeData::MergingParams::Collapsing:
add_mandatory_param("sign column");
break;
case MergeTreeData::MergingParams::Graphite:
add_mandatory_param("'config_element_for_graphite_schema'");
break;
case MergeTreeData::MergingParams::VersionedCollapsing:
{
add_mandatory_param("sign column");
add_mandatory_param("version");
@ -574,7 +574,6 @@ static StoragePtr create(const StorageFactory::Arguments & args)
ASTPtr sample_by_ast;
ASTPtr ttl_table_ast;
IndicesDescription indices_description;
ConstraintsDescription constraints_description;
MutableMergeTreeSettingsPtr storage_settings = MergeTreeSettings::create(args.context.getMergeTreeSettings());
if (is_extended_storage_def)
@ -601,12 +600,7 @@ static StoragePtr create(const StorageFactory::Arguments & args)
if (args.query.columns_list && args.query.columns_list->indices)
for (const auto & index : args.query.columns_list->indices->children)
indices_description.indices.push_back(
std::dynamic_pointer_cast<ASTIndexDeclaration>(index->clone()));
if (args.query.columns_list && args.query.columns_list->constraints)
for (const auto & constraint : args.query.columns_list->constraints->children)
constraints_description.constraints.push_back(
std::dynamic_pointer_cast<ASTConstraintDeclaration>(constraint->clone()));
std::dynamic_pointer_cast<ASTIndexDeclaration>(index->clone()));
storage_settings->loadFromQuery(*args.storage_def);
}
@ -644,14 +638,14 @@ static StoragePtr create(const StorageFactory::Arguments & args)
if (replicated)
return StorageReplicatedMergeTree::create(
zookeeper_path, replica_name, args.attach, args.data_path, args.database_name, args.table_name,
args.columns, indices_description, constraints_description,
args.columns, indices_description, args.constraints,
args.context, date_column_name, partition_by_ast, order_by_ast, primary_key_ast,
sample_by_ast, ttl_table_ast, merging_params, std::move(storage_settings),
args.has_force_restore_data_flag);
else
return StorageMergeTree::create(
args.data_path, args.database_name, args.table_name, args.columns, indices_description,
constraints_description, args.attach, args.context, date_column_name, partition_by_ast, order_by_ast,
args.constraints, args.attach, args.context, date_column_name, partition_by_ast, order_by_ast,
primary_key_ast, sample_by_ast, ttl_table_ast, merging_params, std::move(storage_settings),
args.has_force_restore_data_flag);
}

View File

@ -55,11 +55,12 @@ namespace ErrorCodes
}
StorageBuffer::StorageBuffer(const std::string & database_name_, const std::string & table_name_, const ColumnsDescription & columns_,
StorageBuffer::StorageBuffer(const std::string & database_name_, const std::string & table_name_,
const ColumnsDescription & columns_, const ConstraintsDescription & constraints_,
Context & context_,
size_t num_shards_, const Thresholds & min_thresholds_, const Thresholds & max_thresholds_,
const String & destination_database_, const String & destination_table_, bool allow_materialized_)
: IStorage{columns_},
:
table_name(table_name_), database_name(database_name_), global_context(context_),
num_shards(num_shards_), buffers(num_shards_),
min_thresholds(min_thresholds_), max_thresholds(max_thresholds_),
@ -67,6 +68,8 @@ StorageBuffer::StorageBuffer(const std::string & database_name_, const std::stri
no_destination(destination_database.empty() && destination_table.empty()),
allow_materialized(allow_materialized_), log(&Logger::get("StorageBuffer (" + table_name + ")"))
{
setColumns(columns_);
setConstraints(constraints_);
}
StorageBuffer::~StorageBuffer()
@ -743,7 +746,7 @@ void registerStorageBuffer(StorageFactory & factory)
return StorageBuffer::create(
args.database_name,
args.table_name, args.columns,
args.table_name, args.columns, args.constraints,
args.context,
num_buckets,
StorageBuffer::Thresholds{min_time, min_rows, min_bytes},

View File

@ -140,7 +140,8 @@ protected:
/** num_shards - the level of internal parallelism (the number of independent buffers)
* The buffer is flushed if all minimum thresholds or at least one of the maximum thresholds are exceeded.
*/
StorageBuffer(const std::string & database_name_, const std::string & table_name_, const ColumnsDescription & columns_,
StorageBuffer(const std::string & database_name_, const std::string & table_name_,
const ColumnsDescription & columns_, const ConstraintsDescription & constraints_,
Context & context_,
size_t num_shards_, const Thresholds & min_thresholds_, const Thresholds & max_thresholds_,
const String & destination_database_, const String & destination_table_, bool allow_materialized_);

View File

@ -30,11 +30,13 @@ StorageDictionary::StorageDictionary(
const Context & context,
bool attach,
const String & dictionary_name_)
: IStorage{columns_}, table_name(table_name_),
: table_name(table_name_),
database_name(database_name_),
dictionary_name(dictionary_name_),
logger(&Poco::Logger::get("StorageDictionary"))
{
setColumns(columns_);
if (!attach)
{
const auto & dictionary = context.getExternalDictionaries().getDictionary(dictionary_name);

View File

@ -209,6 +209,7 @@ StorageDistributed::StorageDistributed(
const String & database_name_,
const String & table_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const String & remote_database_,
const String & remote_table_,
const String & cluster_name_,
@ -216,13 +217,20 @@ StorageDistributed::StorageDistributed(
const ASTPtr & sharding_key_,
const String & data_path_,
bool attach_)
: IStorage{columns_}, table_name(table_name_), database_name(database_name_),
: table_name(table_name_), database_name(database_name_),
remote_database(remote_database_), remote_table(remote_table_),
global_context(context_), cluster_name(global_context.getMacros()->expand(cluster_name_)), has_sharding_key(sharding_key_),
sharding_key_expr(sharding_key_ ? buildShardingKeyExpression(sharding_key_, global_context, getColumns().getAllPhysical(), false) : nullptr),
sharding_key_column_name(sharding_key_ ? sharding_key_->getColumnName() : String{}),
path(data_path_.empty() ? "" : (data_path_ + escapeForFileName(table_name) + '/'))
{
setColumns(columns_);
setConstraints(constraints_);
if (sharding_key_)
{
sharding_key_expr = buildShardingKeyExpression(sharding_key_, global_context, getColumns().getAllPhysical(), false);
sharding_key_column_name = sharding_key_->getColumnName();
}
/// Sanity check. Skip check if the table is already created to allow the server to start.
if (!attach_ && !cluster_name.empty())
{
@ -237,15 +245,16 @@ StorageDistributed::StorageDistributed(
const String & database_name_,
const String & table_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
ASTPtr remote_table_function_ptr_,
const String & cluster_name_,
const Context & context_,
const ASTPtr & sharding_key_,
const String & data_path_,
bool attach)
: StorageDistributed(database_name_, table_name_, columns_, String{}, String{}, cluster_name_, context_, sharding_key_, data_path_, attach)
: StorageDistributed(database_name_, table_name_, columns_, constraints_, String{}, String{}, cluster_name_, context_, sharding_key_, data_path_, attach)
{
remote_table_function_ptr = remote_table_function_ptr_;
}
@ -258,7 +267,7 @@ StoragePtr StorageDistributed::createWithOwnCluster(
const Context & context_)
{
auto res = ext::shared_ptr_helper<StorageDistributed>::create(
String{}, table_name_, columns_, remote_database_, remote_table_, String{}, context_, ASTPtr(), String(), false);
String{}, table_name_, columns_, ConstraintsDescription{}, remote_database_, remote_table_, String{}, context_, ASTPtr(), String(), false);
res->owned_cluster = owned_cluster_;
@ -274,7 +283,7 @@ StoragePtr StorageDistributed::createWithOwnCluster(
const Context & context_)
{
auto res = ext::shared_ptr_helper<StorageDistributed>::create(
String{}, table_name_, columns_, remote_table_function_ptr_, String{}, context_, ASTPtr(), String(), false);
String{}, table_name_, columns_, ConstraintsDescription{}, remote_table_function_ptr_, String{}, context_, ASTPtr(), String(), false);
res->owned_cluster = owned_cluster_;
@ -634,7 +643,7 @@ void registerStorageDistributed(StorageFactory & factory)
}
return StorageDistributed::create(
args.database_name, args.table_name, args.columns,
args.database_name, args.table_name, args.columns, args.constraints,
remote_database, remote_table, cluster_name,
args.context, sharding_key, args.data_path,
args.attach);

View File

@ -161,6 +161,7 @@ protected:
const String & database_name_,
const String & table_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const String & remote_database_,
const String & remote_table_,
const String & cluster_name_,
@ -173,6 +174,7 @@ protected:
const String & database_name,
const String & table_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
ASTPtr remote_table_function_ptr_,
const String & cluster_name_,
const Context & context_,

View File

@ -46,6 +46,7 @@ StoragePtr StorageFactory::get(
Context & local_context,
Context & context,
const ColumnsDescription & columns,
const ConstraintsDescription & constraints,
bool attach,
bool has_force_restore_data_flag) const
{
@ -154,6 +155,7 @@ StoragePtr StorageFactory::get(
.local_context = local_context,
.context = context,
.columns = columns,
.constraints = constraints,
.attach = attach,
.has_force_restore_data_flag = has_force_restore_data_flag
};

View File

@ -3,6 +3,7 @@
#include <Common/NamePrompter.h>
#include <Parsers/IAST_fwd.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/ConstraintsDescription.h>
#include <Storages/IStorage_fwd.h>
#include <ext/singleton.h>
#include <unordered_map>
@ -35,6 +36,7 @@ public:
Context & local_context;
Context & context;
const ColumnsDescription & columns;
const ConstraintsDescription & constraints;
bool attach;
bool has_force_restore_data_flag;
};
@ -49,6 +51,7 @@ public:
Context & local_context,
Context & context,
const ColumnsDescription & columns,
const ConstraintsDescription & constraints,
bool attach,
bool has_force_restore_data_flag) const;
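Editor's note: StorageFactory::Arguments gains a constraints field, so every registered engine creator receives args.constraints next to args.columns and can forward it to its storage. A toy registry sketch of that plumbing (simplified types and names, assumed for illustration):

    #include <functional>
    #include <iostream>
    #include <string>
    #include <unordered_map>
    #include <vector>

    using ColumnsDescription = std::vector<std::string>;      /// toy stand-ins
    using ConstraintsDescription = std::vector<std::string>;

    struct Arguments
    {
        const ColumnsDescription & columns;
        const ConstraintsDescription & constraints;  /// the new field threaded through
    };

    using Creator = std::function<void(const Arguments &)>;

    std::unordered_map<std::string, Creator> & registry()
    {
        static std::unordered_map<std::string, Creator> r;
        return r;
    }

    int main()
    {
        /// An engine creator now sees constraints alongside columns,
        /// mirroring `args.constraints` in registerStorageLog and friends.
        registry()["Toy"] = [](const Arguments & args)
        {
            std::cout << args.columns.size() << " columns, "
                      << args.constraints.size() << " constraints\n";
        };

        ColumnsDescription columns{"a", "b"};
        ConstraintsDescription constraints{"a > 0"};
        registry()["Toy"](Arguments{columns, constraints});
    }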

View File

@ -72,10 +72,14 @@ StorageFile::StorageFile(
const std::string & table_name_,
const std::string & format_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
Context & context_)
: IStorage(columns_),
:
table_name(table_name_), database_name(database_name_), format_name(format_name_), context_global(context_), table_fd(table_fd_)
{
setColumns(columns_);
setConstraints(constraints_);
if (table_fd < 0) /// Will use file
{
use_table_fd = false;
@ -330,7 +334,7 @@ void registerStorageFile(StorageFactory & factory)
return StorageFile::create(
source_path, source_fd,
args.data_path,
args.database_name, args.table_name, format_name, args.columns,
args.database_name, args.table_name, format_name, args.columns, args.constraints,
args.context);
});
}

View File

@ -60,6 +60,7 @@ protected:
const std::string & table_name_,
const std::string & format_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
Context & context_);
private:

View File

@ -30,14 +30,16 @@ StorageHDFS::StorageHDFS(const String & uri_,
const std::string & table_name_,
const String & format_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
Context & context_)
: IStorage(columns_)
, uri(uri_)
: uri(uri_)
, format_name(format_name_)
, table_name(table_name_)
, database_name(database_name_)
, context(context_)
{
setColumns(columns_);
setConstraints(constraints_);
}
namespace
@ -175,7 +177,7 @@ void registerStorageHDFS(StorageFactory & factory)
String format_name = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
return StorageHDFS::create(url, args.database_name, args.table_name, format_name, args.columns, args.context);
return StorageHDFS::create(url, args.database_name, args.table_name, format_name, args.columns, args.constraints, args.context);
});
}

View File

@ -37,6 +37,7 @@ protected:
const String & table_name_,
const String & format_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
Context & context_);
private:

View File

@ -36,8 +36,9 @@ StorageJoin::StorageJoin(
ASTTableJoin::Kind kind_,
ASTTableJoin::Strictness strictness_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
bool overwrite)
: StorageSetOrJoinBase{path_, database_name_, table_name_, columns_}
: StorageSetOrJoinBase{path_, database_name_, table_name_, columns_, constraints_}
, key_names(key_names_)
, use_nulls(use_nulls_)
, limits(limits_)
@ -170,6 +171,7 @@ void registerStorageJoin(StorageFactory & factory)
kind,
strictness,
args.columns,
args.constraints,
join_any_take_last_row);
});
}

View File

@ -64,6 +64,7 @@ protected:
SizeLimits limits_,
ASTTableJoin::Kind kind_, ASTTableJoin::Strictness strictness_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
bool overwrite);
};

View File

@ -422,12 +422,15 @@ StorageLog::StorageLog(
const std::string & database_name_,
const std::string & table_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
size_t max_compress_block_size_)
: IStorage{columns_},
path(path_), table_name(table_name_), database_name(database_name_),
: path(path_), table_name(table_name_), database_name(database_name_),
max_compress_block_size(max_compress_block_size_),
file_checker(path + escapeForFileName(table_name) + '/' + "sizes.json")
{
setColumns(columns_);
setConstraints(constraints_);
if (path.empty())
throw Exception("Storage " + getName() + " requires data path", ErrorCodes::INCORRECT_FILE_NAME);
@ -644,7 +647,7 @@ void registerStorageLog(StorageFactory & factory)
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
return StorageLog::create(
args.data_path, args.database_name, args.table_name, args.columns,
args.data_path, args.database_name, args.table_name, args.columns, args.constraints,
args.context.getSettings().max_compress_block_size);
});
}

View File

@ -57,6 +57,7 @@ protected:
const std::string & database_name_,
const std::string & table_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
size_t max_compress_block_size_);
private:

View File

@ -99,9 +99,11 @@ StorageMaterializedView::StorageMaterializedView(
const ASTCreateQuery & query,
const ColumnsDescription & columns_,
bool attach_)
: IStorage{columns_}, table_name(table_name_),
: table_name(table_name_),
database_name(database_name_), global_context(local_context.getGlobalContext())
{
setColumns(columns_);
if (!query.select)
throw Exception("SELECT query is not specified for " + getName(), ErrorCodes::INCORRECT_QUERY);

View File

@ -31,6 +31,7 @@ public:
}
BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;
void drop() override;
void truncate(const ASTPtr &, const Context &) override;

View File

@ -74,9 +74,11 @@ private:
};
StorageMemory::StorageMemory(String database_name_, String table_name_, ColumnsDescription columns_description_)
: IStorage{std::move(columns_description_)}, database_name(std::move(database_name_)), table_name(std::move(table_name_))
StorageMemory::StorageMemory(String database_name_, String table_name_, ColumnsDescription columns_description_, ConstraintsDescription constraints_)
: database_name(std::move(database_name_)), table_name(std::move(table_name_))
{
setColumns(std::move(columns_description_));
setConstraints(std::move(constraints_));
}
@ -143,7 +145,7 @@ void registerStorageMemory(StorageFactory & factory)
"Engine " + args.engine_name + " doesn't support any arguments (" + toString(args.engine_args.size()) + " given)",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
return StorageMemory::create(args.database_name, args.table_name, args.columns);
return StorageMemory::create(args.database_name, args.table_name, args.columns, args.constraints);
});
}

View File

@ -55,7 +55,7 @@ private:
std::mutex mutex;
protected:
StorageMemory(String database_name_, String table_name_, ColumnsDescription columns_description_);
StorageMemory(String database_name_, String table_name_, ColumnsDescription columns_description_, ConstraintsDescription constraints_);
};
}

View File

@ -53,13 +53,14 @@ StorageMerge::StorageMerge(
const String & source_database_,
const String & table_name_regexp_,
const Context & context_)
: IStorage(columns_, ColumnsDescription({{"_table", std::make_shared<DataTypeString>()}}, true))
: IStorage(ColumnsDescription({{"_table", std::make_shared<DataTypeString>()}}, true))
, table_name(table_name_)
, database_name(database_name_)
, source_database(source_database_)
, table_name_regexp(table_name_regexp_)
, global_context(context_)
{
setColumns(columns_);
}

View File

@ -45,9 +45,9 @@ StorageMySQL::StorageMySQL(
const bool replace_query_,
const std::string & on_duplicate_clause_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const Context & context_)
: IStorage{columns_}
, table_name(table_name_)
: table_name(table_name_)
, database_name(database_name_)
, remote_database_name(remote_database_name_)
, remote_table_name(remote_table_name_)
@ -56,6 +56,8 @@ StorageMySQL::StorageMySQL(
, pool(std::move(pool_))
, global_context(context_)
{
setColumns(columns_);
setConstraints(constraints_);
}
@ -241,6 +243,7 @@ void registerStorageMySQL(StorageFactory & factory)
replace_query,
on_duplicate_clause,
args.columns,
args.constraints,
args.context);
});
}

View File

@ -28,6 +28,7 @@ public:
const bool replace_query_,
const std::string & on_duplicate_clause_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const Context & context_);
std::string getName() const override { return "MySQL"; }
@ -54,7 +55,6 @@ private:
bool replace_query;
std::string on_duplicate_clause;
mysqlxx::Pool pool;
Context global_context;
};

View File

@ -26,7 +26,7 @@ void registerStorageNull(StorageFactory & factory)
"Engine " + args.engine_name + " doesn't support any arguments (" + toString(args.engine_args.size()) + " given)",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
return StorageNull::create(args.database_name, args.table_name, args.columns);
return StorageNull::create(args.database_name, args.table_name, args.columns, args.constraints);
});
}

View File

@ -52,9 +52,11 @@ private:
String database_name;
protected:
StorageNull(String database_name_, String table_name_, ColumnsDescription columns_description_)
: IStorage{std::move(columns_description_)}, table_name(std::move(table_name_)), database_name(std::move(database_name_))
StorageNull(String database_name_, String table_name_, ColumnsDescription columns_description_, ConstraintsDescription constraints_)
: table_name(std::move(table_name_)), database_name(std::move(database_name_))
{
setColumns(std::move(columns_description_));
setConstraints(std::move(constraints_));
}
};

View File

@ -90,9 +90,13 @@ StorageSetOrJoinBase::StorageSetOrJoinBase(
const String & path_,
const String & database_name_,
const String & table_name_,
const ColumnsDescription & columns_)
: IStorage{columns_}, table_name(table_name_), database_name(database_name_)
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_)
: table_name(table_name_), database_name(database_name_)
{
setColumns(columns_);
setConstraints(constraints_);
if (path_.empty())
throw Exception("Join and Set storages require data path", ErrorCodes::INCORRECT_FILE_NAME);
@ -105,8 +109,9 @@ StorageSet::StorageSet(
const String & path_,
const String & database_name_,
const String & table_name_,
const ColumnsDescription & columns_)
: StorageSetOrJoinBase{path_, database_name_, table_name_, columns_},
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_)
: StorageSetOrJoinBase{path_, database_name_, table_name_, columns_, constraints_},
set(std::make_shared<Set>(SizeLimits(), false))
{
Block header = getSampleBlock();
@ -209,7 +214,7 @@ void registerStorageSet(StorageFactory & factory)
"Engine " + args.engine_name + " doesn't support any arguments (" + toString(args.engine_args.size()) + " given)",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
return StorageSet::create(args.data_path, args.database_name, args.table_name, args.columns);
return StorageSet::create(args.data_path, args.database_name, args.table_name, args.columns, args.constraints);
});
}

View File

@ -33,7 +33,8 @@ protected:
const String & path_,
const String & database_name_,
const String & table_name_,
const ColumnsDescription & columns_);
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_);
String path;
String table_name;
@ -81,7 +82,8 @@ protected:
const String & path_,
const String & database_name_,
const String & table_name_,
const ColumnsDescription & columns_);
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_);
};
}

View File

@ -198,14 +198,17 @@ StorageStripeLog::StorageStripeLog(
const std::string & database_name_,
const std::string & table_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
bool attach,
size_t max_compress_block_size_)
: IStorage{columns_},
path(path_), table_name(table_name_), database_name(database_name_),
: path(path_), table_name(table_name_), database_name(database_name_),
max_compress_block_size(max_compress_block_size_),
file_checker(path + escapeForFileName(table_name) + '/' + "sizes.json"),
log(&Logger::get("StorageStripeLog"))
{
setColumns(columns_);
setConstraints(constraints_);
if (path.empty())
throw Exception("Storage " + getName() + " requires data path", ErrorCodes::INCORRECT_FILE_NAME);
@ -316,7 +319,7 @@ void registerStorageStripeLog(StorageFactory & factory)
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
return StorageStripeLog::create(
args.data_path, args.database_name, args.table_name, args.columns,
args.data_path, args.database_name, args.table_name, args.columns, args.constraints,
args.attach, args.context.getSettings().max_compress_block_size);
});
}

View File

@ -74,6 +74,7 @@ protected:
const std::string & database_name_,
const std::string & table_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
bool attach,
size_t max_compress_block_size_);
};

View File

@ -327,14 +327,17 @@ StorageTinyLog::StorageTinyLog(
const std::string & database_name_,
const std::string & table_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
bool attach,
size_t max_compress_block_size_)
: IStorage{columns_},
path(path_), table_name(table_name_), database_name(database_name_),
: path(path_), table_name(table_name_), database_name(database_name_),
max_compress_block_size(max_compress_block_size_),
file_checker(path + escapeForFileName(table_name) + '/' + "sizes.json"),
log(&Logger::get("StorageTinyLog"))
{
setColumns(columns_);
setConstraints(constraints_);
if (path.empty())
throw Exception("Storage " + getName() + " requires data path", ErrorCodes::INCORRECT_FILE_NAME);
@ -450,7 +453,7 @@ void registerStorageTinyLog(StorageFactory & factory)
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
return StorageTinyLog::create(
args.data_path, args.database_name, args.table_name, args.columns,
args.data_path, args.database_name, args.table_name, args.columns, args.constraints,
args.attach, args.context.getSettings().max_compress_block_size);
});
}

View File

@ -78,6 +78,7 @@ protected:
const std::string & database_name_,
const std::string & table_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
bool attach,
size_t max_compress_block_size_);
};

View File

@ -24,14 +24,18 @@ namespace ErrorCodes
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
IStorageURLBase::IStorageURLBase(const Poco::URI & uri_,
IStorageURLBase::IStorageURLBase(
const Poco::URI & uri_,
const Context & context_,
const std::string & database_name_,
const std::string & table_name_,
const String & format_name_,
const ColumnsDescription & columns_)
: IStorage(columns_), uri(uri_), context_global(context_), format_name(format_name_), table_name(table_name_), database_name(database_name_)
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_)
: uri(uri_), context_global(context_), format_name(format_name_), table_name(table_name_), database_name(database_name_)
{
setColumns(columns_);
setConstraints(constraints_);
}
namespace
@ -214,7 +218,7 @@ void registerStorageURL(StorageFactory & factory)
String format_name = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
return StorageURL::create(uri, args.database_name, args.table_name, format_name, args.columns, args.context);
return StorageURL::create(uri, args.database_name, args.table_name, format_name, args.columns, args.constraints, args.context);
});
}
}

View File

@ -2,9 +2,9 @@
#include <Storages/IStorage.h>
#include <Poco/URI.h>
#include <common/logger_useful.h>
#include <ext/shared_ptr_helper.h>
namespace DB
{
/**
@ -19,7 +19,8 @@ public:
String getTableName() const override { return table_name; }
String getDatabaseName() const override { return database_name; }
BlockInputStreams read(const Names & column_names,
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
@ -31,12 +32,14 @@ public:
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override;
protected:
IStorageURLBase(const Poco::URI & uri_,
IStorageURLBase(
const Poco::URI & uri_,
const Context & context_,
const std::string & database_name_,
const std::string & table_name_,
const String & format_name_,
const ColumnsDescription & columns_);
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_);
Poco::URI uri;
const Context & context_global;
@ -48,13 +51,15 @@ private:
virtual std::string getReadMethod() const;
virtual std::vector<std::pair<std::string, std::string>> getReadURIParams(const Names & column_names,
virtual std::vector<std::pair<std::string, std::string>> getReadURIParams(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size) const;
virtual std::function<void(std::ostream &)> getReadPOSTDataCallback(const Names & column_names,
virtual std::function<void(std::ostream &)> getReadPOSTDataCallback(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
@ -63,16 +68,19 @@ private:
virtual Block getHeaderBlock(const Names & column_names) const = 0;
};
class StorageURL : public ext::shared_ptr_helper<StorageURL>, public IStorageURLBase
{
public:
StorageURL(const Poco::URI & uri_,
StorageURL(
const Poco::URI & uri_,
const std::string & database_name_,
const std::string & table_name_,
const String & format_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
Context & context_)
: IStorageURLBase(uri_, context_, database_name_, table_name_, format_name_, columns_)
: IStorageURLBase(uri_, context_, database_name_, table_name_, format_name_, columns_, constraints_)
{
}

View File

@ -31,8 +31,10 @@ StorageView::StorageView(
const String & table_name_,
const ASTCreateQuery & query,
const ColumnsDescription & columns_)
: IStorage{columns_}, table_name(table_name_), database_name(database_name_)
: table_name(table_name_), database_name(database_name_)
{
setColumns(columns_);
if (!query.select)
throw Exception("SELECT query is not specified for " + getName(), ErrorCodes::INCORRECT_QUERY);

View File

@ -30,7 +30,8 @@ StorageXDBC::StorageXDBC(
const ColumnsDescription & columns_,
const Context & context_,
const BridgeHelperPtr bridge_helper_)
: IStorageURLBase(Poco::URI(), context_, database_name_, table_name_, IXDBCBridgeHelper::DEFAULT_FORMAT, columns_)
/// Please add support for constraints as soon as StorageODBC or JDBC supports insertion.
: IStorageURLBase(Poco::URI(), context_, database_name_, table_name_, IXDBCBridgeHelper::DEFAULT_FORMAT, columns_, ConstraintsDescription{})
, bridge_helper(bridge_helper_)
, remote_database_name(remote_database_name_)
, remote_table_name(remote_table_name_)

View File

@ -1,59 +1,61 @@
#pragma once
#include <Storages/StorageURL.h>
#include <ext/shared_ptr_helper.h>
#include <Common/XDBCBridgeHelper.h>
namespace DB
{
/** Implements storage in the XDBC database.
* Use ENGINE = xdbc(connection_string, table_name)
* Example ENGINE = odbc('dsn=test', table)
* Read only.
*/
class StorageXDBC : public IStorageURLBase
{
public:
BlockInputStreams read(const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;
StorageXDBC(const std::string & database_name_,
const std::string & table_name_,
const std::string & remote_database_name,
const std::string & remote_table_name,
const ColumnsDescription & columns_,
const Context & context_, BridgeHelperPtr bridge_helper_);
private:
BridgeHelperPtr bridge_helper;
std::string remote_database_name;
std::string remote_table_name;
Poco::Logger * log;
std::string getReadMethod() const override;
std::vector<std::pair<std::string, std::string>> getReadURIParams(const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size) const override;
std::function<void(std::ostream &)> getReadPOSTDataCallback(const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size) const override;
Block getHeaderBlock(const Names & column_names) const override;
std::string getName() const override;
};
}

View File

@ -23,7 +23,7 @@ struct State
{
registerFunctions();
DatabasePtr database = std::make_shared<DatabaseMemory>("test");
database->attachTable("table", StorageMemory::create("test", "table", ColumnsDescription{columns}));
database->attachTable("table", StorageMemory::create("test", "table", ColumnsDescription{columns}, ConstraintsDescription{}));
context.makeGlobalContext();
context.addDatabase("test", database);
context.setCurrentDatabase("test");

View File

@ -25,7 +25,7 @@ try
names_and_types.emplace_back("a", std::make_shared<DataTypeUInt64>());
names_and_types.emplace_back("b", std::make_shared<DataTypeUInt8>());
StoragePtr table = StorageLog::create("./", "test", "test", ColumnsDescription{names_and_types}, 1048576);
StoragePtr table = StorageLog::create("./", "test", "test", ColumnsDescription{names_and_types}, ConstraintsDescription{}, 1048576);
table->startup();
auto context = Context::createGlobal();

View File

@ -15,6 +15,7 @@ StoragePtr TableFunctionFile::getStorage(
table_name,
format,
columns,
ConstraintsDescription{},
global_context);
}

View File

@ -16,6 +16,7 @@ StoragePtr TableFunctionHDFS::getStorage(
table_name,
format,
columns,
ConstraintsDescription{},
global_context);
}

View File

@ -127,6 +127,7 @@ StoragePtr TableFunctionMySQL::executeImpl(const ASTPtr & ast_function, const Co
replace_query,
on_duplicate_clause,
ColumnsDescription{columns},
ConstraintsDescription{},
context);
res->startup();

View File

@ -4,13 +4,14 @@
#include <TableFunctions/TableFunctionURL.h>
#include <Poco/URI.h>
namespace DB
{
StoragePtr TableFunctionURL::getStorage(
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context, const std::string & table_name) const
{
Poco::URI uri(source);
return StorageURL::create(uri, getDatabaseName(), table_name, format, columns, global_context);
return StorageURL::create(uri, getDatabaseName(), table_name, format, columns, ConstraintsDescription{}, global_context);
}
void registerTableFunctionURL(TableFunctionFactory & factory)

View File

@ -3,6 +3,7 @@
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
EXCEPTION_TEXT=violated
EXCEPTION_SUCCESS_TEXT=ok
$CLICKHOUSE_CLIENT --query="CREATE DATABASE IF NOT EXISTS test;"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS test_constraints;"
@ -20,7 +21,7 @@ $CLICKHOUSE_CLIENT --query="INSERT INTO test_constraints VALUES (1, 2);"
$CLICKHOUSE_CLIENT --query="SELECT * FROM test_constraints;"
# This one must throw an exception
EXCEPTION_TEXT="Violated constraint b_constraint in table test_constraints at indices {1, 3}"
$CLICKHOUSE_CLIENT --query="INSERT INTO test_constraints VALUES (3, 4), (1, 0), (3, 4), (6, 0);" 2>&1 \
| grep -q "$EXCEPTION_TEXT" && echo "$EXCEPTION_SUCCESS_TEXT" || echo "Did not throw an exception"
$CLICKHOUSE_CLIENT --query="SELECT * FROM test_constraints;"
@ -38,13 +39,11 @@ $CLICKHOUSE_CLIENT --query="CREATE TABLE test_constraints
ENGINE = MergeTree ORDER BY (a);"
# This one must throw an exception
EXCEPTION_TEXT="Violated constraint b_constraint in table test_constraints at indices {0}"
$CLICKHOUSE_CLIENT --query="INSERT INTO test_constraints VALUES (1, 2);" 2>&1 \
| grep -q "$EXCEPTION_TEXT" && echo "$EXCEPTION_SUCCESS_TEXT" || echo "Did not throw an exception"
$CLICKHOUSE_CLIENT --query="SELECT * FROM test_constraints;"
# This one must throw an exception
EXCEPTION_TEXT="Violated constraint a_constraint in table test_constraints at indices {1}"
$CLICKHOUSE_CLIENT --query="INSERT INTO test_constraints VALUES (5, 16), (10, 11), (9, 11), (8, 12);" 2>&1 \
| grep -q "$EXCEPTION_TEXT" && echo "$EXCEPTION_SUCCESS_TEXT" || echo "Did not throw an exception"
$CLICKHOUSE_CLIENT --query="SELECT * FROM test_constraints;"

View File

@ -3,6 +3,7 @@
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
EXCEPTION_TEXT=violated
EXCEPTION_SUCCESS_TEXT=ok
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS test_constraints;"
@ -20,7 +21,7 @@ $CLICKHOUSE_CLIENT --query="INSERT INTO test_constraints VALUES (1, 2);"
$CLICKHOUSE_CLIENT --query="SELECT * FROM test_constraints;"
# This one must throw an exception
EXCEPTION_TEXT="Violated constraint b_constraint in table test_constraints at indices"
$CLICKHOUSE_CLIENT --query="INSERT INTO test_constraints VALUES (1, 0);" 2>&1 \
| grep -q "$EXCEPTION_TEXT" && echo "$EXCEPTION_SUCCESS_TEXT" || echo "Did not throw an exception"
$CLICKHOUSE_CLIENT --query="SELECT * FROM test_constraints;"

View File

@ -0,0 +1,14 @@
0
0
3
0
0
3
0
0
3
0
0
3
CREATE TABLE default.constrained (`URL` String, CONSTRAINT is_yandex CHECK domainWithoutWWW(URL) = 'yandex.ru', CONSTRAINT is_utf8 CHECK isValidUTF8(URL)) ENGINE = Log
CREATE TABLE default.constrained2 (`URL` String, CONSTRAINT is_yandex CHECK domainWithoutWWW(URL) = 'yandex.ru', CONSTRAINT is_utf8 CHECK isValidUTF8(URL)) ENGINE = Log

View File

@ -0,0 +1,53 @@
DROP TABLE IF EXISTS constrained;
CREATE TABLE constrained (URL String, CONSTRAINT is_yandex CHECK domainWithoutWWW(URL) = 'yandex.ru', CONSTRAINT is_utf8 CHECK isValidUTF8(URL)) ENGINE = Null;
INSERT INTO constrained VALUES ('https://www.yandex.ru/?q=upyachka'), ('Hello'), ('test'); -- { serverError 469 }
INSERT INTO constrained VALUES ('https://www.yandex.ru/?q=upyachka'), ('ftp://yandex.ru/Hello'), ('https://yandex.ru/te\xFFst'); -- { serverError 469 }
INSERT INTO constrained VALUES ('https://www.yandex.ru/?q=upyachka'), ('ftp://yandex.ru/Hello'), (toValidUTF8('https://yandex.ru/te\xFFst'));
DROP TABLE constrained;
CREATE TABLE constrained (URL String, CONSTRAINT is_yandex CHECK domainWithoutWWW(URL) = 'yandex.ru', CONSTRAINT is_utf8 CHECK isValidUTF8(URL)) ENGINE = Memory;
INSERT INTO constrained VALUES ('https://www.yandex.ru/?q=upyachka'), ('Hello'), ('test'); -- { serverError 469 }
SELECT count() FROM constrained;
INSERT INTO constrained VALUES ('https://www.yandex.ru/?q=upyachka'), ('ftp://yandex.ru/Hello'), ('https://yandex.ru/te\xFFst'); -- { serverError 469 }
SELECT count() FROM constrained;
INSERT INTO constrained VALUES ('https://www.yandex.ru/?q=upyachka'), ('ftp://yandex.ru/Hello'), (toValidUTF8('https://yandex.ru/te\xFFst'));
SELECT count() FROM constrained;
DROP TABLE constrained;
CREATE TABLE constrained (URL String, CONSTRAINT is_yandex CHECK domainWithoutWWW(URL) = 'yandex.ru', CONSTRAINT is_utf8 CHECK isValidUTF8(URL)) ENGINE = StripeLog;
INSERT INTO constrained VALUES ('https://www.yandex.ru/?q=upyachka'), ('Hello'), ('test'); -- { serverError 469 }
SELECT count() FROM constrained;
INSERT INTO constrained VALUES ('https://www.yandex.ru/?q=upyachka'), ('ftp://yandex.ru/Hello'), ('https://yandex.ru/te\xFFst'); -- { serverError 469 }
SELECT count() FROM constrained;
INSERT INTO constrained VALUES ('https://www.yandex.ru/?q=upyachka'), ('ftp://yandex.ru/Hello'), (toValidUTF8('https://yandex.ru/te\xFFst'));
SELECT count() FROM constrained;
DROP TABLE constrained;
CREATE TABLE constrained (URL String, CONSTRAINT is_yandex CHECK domainWithoutWWW(URL) = 'yandex.ru', CONSTRAINT is_utf8 CHECK isValidUTF8(URL)) ENGINE = TinyLog;
INSERT INTO constrained VALUES ('https://www.yandex.ru/?q=upyachka'), ('Hello'), ('test'); -- { serverError 469 }
SELECT count() FROM constrained;
INSERT INTO constrained VALUES ('https://www.yandex.ru/?q=upyachka'), ('ftp://yandex.ru/Hello'), ('https://yandex.ru/te\xFFst'); -- { serverError 469 }
SELECT count() FROM constrained;
INSERT INTO constrained VALUES ('https://www.yandex.ru/?q=upyachka'), ('ftp://yandex.ru/Hello'), (toValidUTF8('https://yandex.ru/te\xFFst'));
SELECT count() FROM constrained;
DROP TABLE constrained;
CREATE TABLE constrained (URL String, CONSTRAINT is_yandex CHECK domainWithoutWWW(URL) = 'yandex.ru', CONSTRAINT is_utf8 CHECK isValidUTF8(URL)) ENGINE = Log;
INSERT INTO constrained VALUES ('https://www.yandex.ru/?q=upyachka'), ('Hello'), ('test'); -- { serverError 469 }
SELECT count() FROM constrained;
INSERT INTO constrained VALUES ('https://www.yandex.ru/?q=upyachka'), ('ftp://yandex.ru/Hello'), ('https://yandex.ru/te\xFFst'); -- { serverError 469 }
SELECT count() FROM constrained;
INSERT INTO constrained VALUES ('https://www.yandex.ru/?q=upyachka'), ('ftp://yandex.ru/Hello'), (toValidUTF8('https://yandex.ru/te\xFFst'));
SELECT count() FROM constrained;
DROP TABLE constrained;
DROP TABLE IF EXISTS constrained2;
CREATE TABLE constrained (URL String, CONSTRAINT is_yandex CHECK domainWithoutWWW(URL) = 'yandex.ru', CONSTRAINT is_utf8 CHECK isValidUTF8(URL)) ENGINE = Log;
CREATE TABLE constrained2 AS constrained;
SHOW CREATE TABLE constrained FORMAT TSVRaw;
SHOW CREATE TABLE constrained2 FORMAT TSVRaw;
INSERT INTO constrained VALUES ('https://www.yandex.ru/?q=upyachka'), ('Hello'), ('test'); -- { serverError 469 }
INSERT INTO constrained2 VALUES ('https://www.yandex.ru/?q=upyachka'), ('Hello'), ('test'); -- { serverError 469 }
DROP TABLE constrained;
DROP TABLE constrained2;

View File

@ -1,9 +1,9 @@
#!/usr/bin/env bash
#ccache -s
#ccache -s # uncomment to display CCache statistics
mkdir -p /server/build_docker
cd /server/build_docker
cmake -G Ninja /server -DCMAKE_C_COMPILER=`which clang-8` -DCMAKE_CXX_COMPILER=`which clang++-8` -DCMAKE_BUILD_TYPE=Debug
cmake -G Ninja /server -DCMAKE_C_COMPILER=`which gcc-8` -DCMAKE_CXX_COMPILER=`which g++-8`
# Set the number of build jobs to the half of number of virtual CPU cores (rounded up).
# By default, ninja uses all virtual CPU cores, which leads to very high memory consumption without much improvement in build time.
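# As an illustrative sketch only (the exact command in this script may differ),
# the job count described above could be computed and passed to ninja like this:
# ninja -j $(( ($(nproc) + 1) / 2 ))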

View File

@ -29,7 +29,7 @@ These actions are described in detail below.
ADD COLUMN [IF NOT EXISTS] name [type] [default_expr] [AFTER name_after]
```
Adds a new column to the table with the specified `name`, `type`, and `default_expr` (see the section [Default expressions](create.md#create-default-values)).
If the `IF NOT EXISTS` clause is included, the query won't return an error if the column already exists. If you specify `AFTER name_after` (the name of another column), the column is added after the specified one in the list of table columns. Otherwise, the column is added to the end of the table. Note that there is no way to add a column to the beginning of a table. For a chain of actions, `name_after` can be the name of a column that is added in one of the previous actions.
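A minimal sketch of this syntax, assuming a hypothetical `visits` table with an existing `URL` column:

```sql
ALTER TABLE visits ADD COLUMN IF NOT EXISTS browser String DEFAULT '' AFTER URL;
```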
@ -66,7 +66,7 @@ CLEAR COLUMN [IF EXISTS] name IN PARTITION partition_name
```
Resets all data in a column for a specified partition. Read more about setting the partition name in the section [How to specify the partition expression](#alter-how-to-specify-part-expr).
If the `IF EXISTS` clause is specified, the query won't return an error if the column doesn't exist.
Example:
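A minimal illustration, assuming a `visits` table partitioned by month with an `hour` column:

```sql
ALTER TABLE visits CLEAR COLUMN hour IN PARTITION 201902;
```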
@ -85,7 +85,7 @@ Adds a comment to the column. If the `IF EXISTS` clause is specified, the query
Each column can have one comment. If a comment already exists for the column, a new comment overwrites the previous comment.
Comments are stored in the `comment_expression` column returned by the [DESCRIBE TABLE](misc.md#misc-describe-table) query.
Example:
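A minimal illustration, assuming the same hypothetical `visits` table with a `browser` column:

```sql
ALTER TABLE visits COMMENT COLUMN browser 'The browser used for accessing the site.';
```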
@ -129,7 +129,7 @@ The `ALTER` query lets you create and delete separate elements (columns) in nest
There is no support for deleting columns in the primary key or the sampling key (columns that are used in the `ENGINE` expression). Changing the type for columns that are included in the primary key is only possible if this change does not cause the data to be modified (for example, you are allowed to add values to an Enum or to change a type from `DateTime` to `UInt32`).
If the `ALTER` query is not sufficient to make the table changes you need, you can create a new table, copy the data to it using the [INSERT SELECT](insert_into.md#insert_query_insert-select) query, then switch the tables using the [RENAME](misc.md#misc_operations-rename) query and delete the old table. You can use the [clickhouse-copier](../operations/utils/clickhouse-copier.md) as an alternative to the `INSERT SELECT` query.
The `ALTER` query blocks all reads and writes for the table. In other words, if a long `SELECT` is running at the time of the `ALTER` query, the `ALTER` query will wait for it to complete. At the same time, all new queries to the same table will wait while this `ALTER` is running.
@ -178,9 +178,9 @@ ALTER TABLE [db].name DROP CONSTRAINT constraint_name;
These queries add or remove metadata about constraints from the table, so they are processed immediately.
The constraint check *will not be executed* on an existing table when a constraint is added. For now, we recommend creating a new table and filling it with an `INSERT SELECT` query.
The constraint check *will not be executed* on existing data when a constraint is added.
All changes on distributed tables are broadcast to ZooKeeper, so they will be applied on other replicas.
All changes on replicated tables are broadcast to ZooKeeper, so they will be applied on other replicas.
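A minimal sketch of the two queries, with illustrative table and constraint names:

```sql
ALTER TABLE visits ADD CONSTRAINT non_empty_url CHECK URL != '';
ALTER TABLE visits DROP CONSTRAINT non_empty_url;
```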
### Manipulations With Partitions and Parts {#alter_manipulations-with-partitions}
@ -267,7 +267,7 @@ This query copies the data partition from the `table1` to `table2`. Note that da
For the query to run successfully, the following conditions must be met:
- Both tables must have the same structure.
- Both tables must have the same partition key.
#### CLEAR COLUMN IN PARTITION {#alter_clear-column-partition}
@ -289,13 +289,13 @@ ALTER TABLE visits CLEAR COLUMN hour in PARTITION 201902
ALTER TABLE table_name FREEZE [PARTITION partition_expr]
```
This query creates a local backup of a specified partition. If the `PARTITION` clause is omitted, the query creates the backup of all partitions at once.
Note that for old-style tables you can specify the prefix of the partition name (for example, '2019'); then the query creates the backup for all the corresponding partitions. Read about setting the partition expression in a section [How to specify the partition expression](#alter-how-to-specify-part-expr).
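For illustration, both forms might look like this (the table name is hypothetical, and the quoted prefix form applies to old-style tables only):

```sql
ALTER TABLE visits FREEZE PARTITION 201902;
ALTER TABLE visits FREEZE PARTITION '2019';
```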
!!! note
The entire backup process is performed without stopping the server.
At the time of execution, the query creates hard links to the table data to form a data snapshot. The hard links are placed in the directory `/var/lib/clickhouse/shadow/N/...`, where:
- `/var/lib/clickhouse/` is the working ClickHouse directory specified in the config.
@ -348,7 +348,7 @@ ALTER TABLE users ATTACH PARTITION 201902;
```
Note that:
- The `ALTER ... FETCH PARTITION` query isn't replicated. It places the partition to the `detached` directory only on the local server.
- The `ALTER TABLE ... ATTACH` query is replicated. It adds the data to all replicas. The data is added to one of the replicas from the `detached` directory, and to the others - from neighboring replicas.
Before downloading, the system checks if the partition exists and the table structure matches. The most appropriate replica is selected automatically from the healthy replicas.

View File

@ -29,7 +29,7 @@ ALTER TABLE [db].name [ON CLUSTER cluster] ADD|DROP|CLEAR|COMMENT|MODIFY COLUMN
ADD COLUMN [IF NOT EXISTS] name [type] [default_expr] [AFTER name_after]
```
Adds a new column to the table with the specified `name`, `type`, and default expression `default_expr` (see the section [Default values](create.md#create-default-values)).
If `IF NOT EXISTS` is specified, the query won't return an error if the column already exists. If `AFTER name_after` (the name of another column) is specified, the column is added after the specified one in the list of table columns. Otherwise, the column is added to the end of the table. Note that ClickHouse does not allow adding columns to the beginning of a table. For a chain of actions, `name_after` can be the name of a column added in one of the previous actions.
@ -84,7 +84,7 @@ COMMENT COLUMN [IF EXISTS] name 'Text comment'
Each column can have only one comment. When the query is executed, any existing comment is replaced with the new one.
Comments can be viewed in the `comment_expression` column of the [DESCRIBE TABLE](misc.md#misc-describe-table) query.
Example:
@ -177,9 +177,9 @@ ALTER TABLE [db].name DROP CONSTRAINT constraint_name;
These queries add or remove metadata about the constraints of the table `[db].name`, so they are executed instantly.
If a constraint is added to a non-empty table, *the constraint check will not be invoked*. If it is important to add a constraint to an existing table, it is recommended to create a new table with the required constraint and use an `INSERT SELECT` query to move the data from one table to the other.
If a constraint is added to a non-empty table, *the existing data is not checked against the constraint*.
A query to change constraints, just as with indices, is replicated through ZooKeeper.
For Replicated tables, a query to change constraints is replicated by storing the new metadata in ZooKeeper and applying the changes on all replicas.
### Manipulations with Partitions and Parts {#alter_manipulations-with-partitions}
@ -260,7 +260,7 @@ ALTER TABLE visits ATTACH PART 201901_2_2_0;
ALTER TABLE table2 REPLACE PARTITION partition_expr FROM table1
```
Copies the partition from table `table1` to table `table2`. The data in `table1` is not deleted.
Keep in mind:
@ -297,19 +297,19 @@ ALTER TABLE table_name CLEAR INDEX index_name IN PARTITION partition_expr
ALTER TABLE table_name FREEZE [PARTITION partition_expr]
```
Creates a local backup of the specified partition. If the `PARTITION` expression is omitted, backups are created for all partitions at once.
!!! note
Creating a backup does not require stopping the server.
For old-style tables, the partition name can be specified as a prefix (for example, '2019'); in this case, backups are created for all the corresponding partitions. Read more about how to specify the partition name correctly in the section [How to specify the partition expression in ALTER queries](#alter-how-to-specify-part-expr).
The query does the following: for the current state of the table, it creates hard links to the data of this table. The links are placed in the directory `/var/lib/clickhouse/shadow/N/...`, where:
- `/var/lib/clickhouse/` is the working ClickHouse directory specified in the configuration file;
- `N` is the incremental number of the backup.
The directory structure inside the backup is the same as inside `/var/lib/clickhouse/`. The query also performs 'chmod' on all files, forbidding writes to them.
Note that the `ALTER TABLE t FREEZE PARTITION` query is not replicated. It creates a backup only on the local server. After the backup is created, the data from `/var/lib/clickhouse/shadow/` can be copied to a remote server, and the local copy can then be deleted.
@ -357,7 +357,7 @@ ALTER TABLE users ATTACH PARTITION 201902;
#### How to Specify the Partition Expression in ALTER Queries {#alter-how-to-specify-part-expr}
To specify the required partition in `ALTER ... PARTITION` queries, you can use:
- The partition name. You can view partition names in the `partition` column of the [system.parts](../operations/system_tables.md#system_tables-parts) system table. For example, `ALTER TABLE visits DETACH PARTITION 201901`.
- An arbitrary expression over the columns of the source table. Constants and constant expressions are also supported. For example, `ALTER TABLE visits DETACH PARTITION toYYYYMM(toDate('2019-01-25'))`.

View File

@ -119,9 +119,9 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
) ENGINE = engine
```
`boolean_expr_1` can be any boolean expression consisting of comparison operators or functions. If one or more constraints are defined, then at insert time the constraint expressions are checked for truth for each row of inserted data. If the body of an INSERT query contains invalid data, an exception about the violated constraint is thrown to the client.
`boolean_expr_1` can be any boolean expression consisting of comparison operators or functions. If one or more constraints are defined, then at insert time the constraint expressions are checked for truth for each row of inserted data. If the body of an INSERT query contains invalid data, the client receives an exception describing the violated constraint.
Adding a large number of constraints can negatively affect the performance of large `INSERT` queries.
Adding a large number of constraints can negatively affect the performance of `INSERT` queries.
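For example, the `constrained` table from the tests in this changeset declares two such constraints; inserting a row that fails either check throws an exception:

```sql
CREATE TABLE constrained
(
    URL String,
    CONSTRAINT is_yandex CHECK domainWithoutWWW(URL) = 'yandex.ru',
    CONSTRAINT is_utf8 CHECK isValidUTF8(URL)
) ENGINE = Memory;

INSERT INTO constrained VALUES ('Hello'); -- rejected: is_yandex is violated
```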
### TTL Expression