Resolve conflicts

alesapin 2019-08-26 21:08:58 +03:00
commit cf31187254
72 changed files with 594 additions and 326 deletions

View File

@ -865,7 +865,7 @@ bool TCPHandler::receiveData()
if (!(storage = query_context->tryGetExternalTable(external_table_name)))
{
NamesAndTypesList columns = block.getNamesAndTypesList();
storage = StorageMemory::create("_external", external_table_name, ColumnsDescription{columns});
storage = StorageMemory::create("_external", external_table_name, ColumnsDescription{columns}, ConstraintsDescription{});
storage->startup();
query_context->addExternalTable(external_table_name, storage);
}

View File

@ -160,7 +160,7 @@ void ExternalTablesHandler::handlePart(const Poco::Net::MessageHeader & header,
/// Create table
NamesAndTypesList columns = sample_block.getNamesAndTypesList();
StoragePtr storage = StorageMemory::create("_external", data.second, ColumnsDescription{columns});
StoragePtr storage = StorageMemory::create("_external", data.second, ColumnsDescription{columns}, ConstraintsDescription{});
storage->startup();
context.addExternalTable(data.second, storage);
BlockOutputStreamPtr output = storage->write(ASTPtr(), context);

View File

@ -1,34 +1,86 @@
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/CheckConstraintsBlockOutputStream.h>
#include <Functions/FunctionHelpers.h>
#include <common/find_symbols.h>
#include <Parsers/formatAST.h>
#include <Columns/ColumnsCommon.h>
#include <Common/assert_cast.h>
#include <Common/FieldVisitors.h>
namespace DB
{
namespace ErrorCodes
{
extern const int VIOLATED_CONSTRAINT;
}
CheckConstraintsBlockOutputStream::CheckConstraintsBlockOutputStream(
const String & table_,
const BlockOutputStreamPtr & output_,
const Block & header_,
const ConstraintsDescription & constraints_,
const Context & context_)
: table(table_),
output(output_),
header(header_),
constraints(constraints_),
expressions(constraints_.getExpressions(context_, header.getNamesAndTypesList()))
{
}
void CheckConstraintsBlockOutputStream::write(const Block & block)
{
for (size_t i = 0; i < expressions.size(); ++i)
if (block.rows() > 0)
{
Block res = block;
auto constraint_expr = expressions[i];
auto res_column_uint8 = executeOnBlock(res, constraint_expr);
if (!memoryIsByte(res_column_uint8->getRawDataBegin<1>(), res_column_uint8->byteSize(), 0x1))
Block block_to_calculate = block;
for (size_t i = 0; i < expressions.size(); ++i)
{
auto indices_wrong = findAllWrong(res_column_uint8->getRawDataBegin<1>(), res_column_uint8->byteSize());
std::string indices_str = "{";
for (size_t j = 0; j < indices_wrong.size(); ++j)
{
indices_str += std::to_string(indices_wrong[j]);
indices_str += (j != indices_wrong.size() - 1) ? ", " : "}";
}
auto constraint_expr = expressions[i];
throw Exception{"Violated constraint " + constraints.constraints[i]->name +
" in table " + table + " at indices " + indices_str + ", constraint expression: " +
serializeAST(*(constraints.constraints[i]->expr), true), ErrorCodes::VIOLATED_CONSTRAINT};
constraint_expr->execute(block_to_calculate);
ColumnWithTypeAndName res_column = block_to_calculate.getByPosition(block_to_calculate.columns() - 1);
const ColumnUInt8 & res_column_uint8 = assert_cast<const ColumnUInt8 &>(*res_column.column);
const UInt8 * data = res_column_uint8.getData().data();
size_t size = res_column_uint8.size();
/// Is violated.
if (!memoryIsByte(data, size, 1))
{
size_t row_idx = 0;
for (; row_idx < size; ++row_idx)
if (data[row_idx] != 1)
break;
Names related_columns = constraint_expr->getRequiredColumns();
std::stringstream exception_message;
exception_message << "Constraint " << backQuote(constraints.constraints[i]->name)
<< " for table " << backQuote(table)
<< " is violated at row " << (rows_written + row_idx + 1)
<< ". Expression: (" << serializeAST(*(constraints.constraints[i]->expr), true) << ")"
<< ". Column values";
bool first = true;
for (const auto & name : related_columns)
{
const IColumn & column = *block.getByName(name).column;
assert(row_idx < column.size());
exception_message << (first ? ": " : ", ")
<< backQuoteIfNeed(name) << " = " << applyVisitor(FieldVisitorToString(), column[row_idx]);
first = false;
}
throw Exception{exception_message.str(), ErrorCodes::VIOLATED_CONSTRAINT};
}
}
}
output->write(block);
rows_written += block.rows();
}
@ -48,32 +100,4 @@ void CheckConstraintsBlockOutputStream::writeSuffix()
output->writeSuffix();
}
const ColumnUInt8 *CheckConstraintsBlockOutputStream::executeOnBlock(
Block & block,
const ExpressionActionsPtr & constraint)
{
constraint->execute(block);
ColumnWithTypeAndName res_column = block.safeGetByPosition(block.columns() - 1);
return checkAndGetColumn<ColumnUInt8>(res_column.column.get());
}
std::vector<size_t> CheckConstraintsBlockOutputStream::findAllWrong(const void *data, size_t size)
{
std::vector<size_t> res;
if (size == 0)
return res;
auto ptr = reinterpret_cast<const uint8_t *>(data);
for (size_t i = 0; i < size; ++i)
{
if (*(ptr + i) == 0x0)
{
res.push_back(i);
}
}
return res;
}
}
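
The rewritten write() above evaluates every constraint expression over the inserted block, expects a UInt8 result column containing only ones, and on failure reports the first offending row together with the values of the columns the constraint refers to. Below is a simplified stand-alone sketch of that row-level check using only standard library types; checkConstraintColumn and its parameters are illustrative names, not ClickHouse API.

#include <cstddef>
#include <cstdint>
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>

/// Results of one constraint expression: 1 for a satisfied row, 0 for a violated row.
void checkConstraintColumn(const std::vector<uint8_t> & results,
                           const std::string & constraint_name,
                           size_t rows_already_written)
{
    for (size_t row_idx = 0; row_idx < results.size(); ++row_idx)
    {
        if (results[row_idx] != 1)
        {
            std::ostringstream message;
            message << "Constraint " << constraint_name
                    << " is violated at row " << (rows_already_written + row_idx + 1);
            throw std::runtime_error(message.str());
        }
    }
}

In the real stream the message also carries the table name, the serialized constraint expression and the offending column values, and the rows_written offset makes the reported row number refer to the whole INSERT rather than to the current block.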

View File

@ -2,16 +2,14 @@
#include <DataStreams/IBlockOutputStream.h>
#include <Storages/ConstraintsDescription.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/ExpressionBlockInputStream.h>
namespace DB
{
namespace ErrorCodes
{
extern const int VIOLATED_CONSTRAINT;
}
/** Check for constraints violation. If anything is found - throw an exception with detailed error message.
* Otherwise just pass block to output unchanged.
*/
class CheckConstraintsBlockOutputStream : public IBlockOutputStream
{
@ -21,14 +19,7 @@ public:
const BlockOutputStreamPtr & output_,
const Block & header_,
const ConstraintsDescription & constraints_,
const Context & context_)
: table(table_),
output(output_),
header(header_),
constraints(constraints_),
expressions(constraints_.getExpressions(context_, header.getNamesAndTypesList())),
rows_written(0)
{ }
const Context & context_);
Block getHeader() const override { return header; }
void write(const Block & block) override;
@ -39,14 +30,11 @@ public:
void writeSuffix() override;
private:
const ColumnUInt8* executeOnBlock(Block & block, const ExpressionActionsPtr & constraint);
std::vector<size_t> findAllWrong(const void *data, size_t size);
String table;
BlockOutputStreamPtr output;
Block header;
const ConstraintsDescription constraints;
const ConstraintsExpressions expressions;
size_t rows_written;
size_t rows_written = 0;
};
}

View File

@ -231,7 +231,7 @@ DatabaseMySQL::MySQLStorageInfo DatabaseMySQL::createStorageInfo(
{
const auto & mysql_table = StorageMySQL::create(
database_name, table_name, std::move(mysql_pool), mysql_database_name, table_name,
false, "", ColumnsDescription{columns_name_and_type}, global_context);
false, "", ColumnsDescription{columns_name_and_type}, ConstraintsDescription{}, global_context);
const auto & create_table_query = std::make_shared<ASTCreateQuery>();

View File

@ -83,6 +83,7 @@ std::pair<String, StoragePtr> createTableFromDefinition(
throw Exception("Missing definition of columns.", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED);
ColumnsDescription columns = InterpreterCreateQuery::getColumnsDescription(*ast_create_query.columns_list->columns, context);
ConstraintsDescription constraints = InterpreterCreateQuery::getConstraintsDescription(ast_create_query.columns_list->constraints);
return
{
@ -90,7 +91,7 @@ std::pair<String, StoragePtr> createTableFromDefinition(
StorageFactory::instance().get(
ast_create_query,
database_data_path, ast_create_query.table, database_name, context, context.getGlobalContext(),
columns,
columns, constraints,
true, has_force_restore_data_flag)
};
}

View File

@ -91,7 +91,7 @@ public:
Block sample = interpreter->getSampleBlock();
NamesAndTypesList columns = sample.getNamesAndTypesList();
StoragePtr external_storage = StorageMemory::create("_external", external_table_name, ColumnsDescription{columns});
StoragePtr external_storage = StorageMemory::create("_external", external_table_name, ColumnsDescription{columns}, ConstraintsDescription{});
external_storage->startup();
/** We replace the subquery with the name of the temporary table.

View File

@ -381,26 +381,37 @@ ColumnsDescription InterpreterCreateQuery::getColumnsDescription(const ASTExpres
}
ColumnsDescription InterpreterCreateQuery::setColumns(
ConstraintsDescription InterpreterCreateQuery::getConstraintsDescription(const ASTExpressionList * constraints)
{
ConstraintsDescription res;
if (constraints)
for (const auto & constraint : constraints->children)
res.constraints.push_back(std::dynamic_pointer_cast<ASTConstraintDeclaration>(constraint->clone()));
return res;
}
ColumnsDescription InterpreterCreateQuery::setProperties(
ASTCreateQuery & create, const Block & as_select_sample, const StoragePtr & as_storage) const
{
ColumnsDescription columns;
IndicesDescription indices;
ConstraintsDescription constraints;
if (create.columns_list)
{
if (create.columns_list->columns)
columns = getColumnsDescription(*create.columns_list->columns, context);
if (create.columns_list->indices)
for (const auto & index : create.columns_list->indices->children)
indices.indices.push_back(
std::dynamic_pointer_cast<ASTIndexDeclaration>(index->clone()));
if (create.columns_list->constraints)
for (const auto & constraint : create.columns_list->constraints->children)
constraints.constraints.push_back(
std::dynamic_pointer_cast<ASTConstraintDeclaration>(constraint->clone()));
std::dynamic_pointer_cast<ASTConstraintDeclaration>(constraint->clone()));
}
else if (!create.as_table.empty())
{
@ -556,6 +567,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
}
ColumnsDescription columns;
ConstraintsDescription constraints;
StoragePtr res;
if (create.as_table_function)
@ -567,7 +579,8 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
else
{
/// Set and retrieve list of columns.
columns = setColumns(create, as_select_sample, as_storage);
columns = setProperties(create, as_select_sample, as_storage);
constraints = getConstraintsDescription(create.columns_list->constraints);
/// Check low cardinality types in creating table if it was not allowed in setting
if (!create.attach && !context.getSettingsRef().allow_suspicious_low_cardinality_types && !create.is_materialized_view)
@ -635,6 +648,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
context,
context.getGlobalContext(),
columns,
constraints,
create.attach,
false);
}
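
The getConstraintsDescription() helper added above walks the CONSTRAINT clauses of the CREATE query and keeps a cloned ASTConstraintDeclaration for each of them, and setProperties() applies the same pattern to indices. A stand-alone mock of that filter-by-type pattern follows; the Mock* types are placeholders, not the real parser classes.

#include <memory>
#include <vector>

/// Placeholder AST types; the real code uses IAST and ASTConstraintDeclaration.
struct MockAST
{
    virtual ~MockAST() = default;
    virtual std::shared_ptr<MockAST> clone() const = 0;
};

struct MockConstraintDeclaration : MockAST
{
    std::shared_ptr<MockAST> clone() const override { return std::make_shared<MockConstraintDeclaration>(*this); }
};

/// Keep a cloned copy of every child that is a constraint declaration, skip everything else.
std::vector<std::shared_ptr<MockConstraintDeclaration>>
collectConstraints(const std::vector<std::shared_ptr<MockAST>> & children)
{
    std::vector<std::shared_ptr<MockConstraintDeclaration>> res;
    for (const auto & child : children)
        if (auto constraint = std::dynamic_pointer_cast<MockConstraintDeclaration>(child->clone()))
            res.push_back(constraint);
    return res;
}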

View File

@ -14,6 +14,7 @@ namespace DB
class Context;
class ASTCreateQuery;
class ASTExpressionList;
class ASTConstraintDeclaration;
/** Allows to create new table or database,
@ -45,13 +46,14 @@ public:
/// Obtain information about columns, their types, default values and column comments, for case when columns in CREATE query is specified explicitly.
static ColumnsDescription getColumnsDescription(const ASTExpressionList & columns, const Context & context);
static ConstraintsDescription getConstraintsDescription(const ASTExpressionList * constraints);
private:
BlockIO createDatabase(ASTCreateQuery & create);
BlockIO createTable(ASTCreateQuery & create);
/// Calculate list of columns of table and return it.
ColumnsDescription setColumns(ASTCreateQuery & create, const Block & as_select_sample, const StoragePtr & as_storage) const;
/// Calculate list of columns, constraints, indices, etc... of table and return columns.
ColumnsDescription setProperties(ASTCreateQuery & create, const Block & as_select_sample, const StoragePtr & as_storage) const;
void setEngine(ASTCreateQuery & create) const;
void checkAccess(const ASTCreateQuery & create);

View File

@ -121,8 +121,9 @@ BlockIO InterpreterInsertQuery::execute()
out = std::make_shared<AddingDefaultBlockOutputStream>(
out, query_sample_block, out->getHeader(), table->getColumns().getDefaults(), context);
out = std::make_shared<CheckConstraintsBlockOutputStream>(
query.table, out, query_sample_block, table->getConstraints(), context);
if (const auto & constraints = table->getConstraints(); !constraints.empty())
out = std::make_shared<CheckConstraintsBlockOutputStream>(query.table,
out, query_sample_block, table->getConstraints(), context);
auto out_wrapper = std::make_shared<CountingBlockOutputStream>(out);
out_wrapper->setProcessListElement(context.getProcessListElement());
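
The guard added above splices CheckConstraintsBlockOutputStream into the INSERT pipeline only when the target table actually declares constraints, so constraint-free tables keep the previous, shorter chain of output streams. The following stand-alone mock illustrates that conditional wrapping; the Mock* types are placeholders, not the real IBlockOutputStream hierarchy.

#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

/// Placeholder block and stream types; the real code uses Block and IBlockOutputStream.
struct MockBlock { size_t rows = 0; };

struct MockOutputStream
{
    virtual ~MockOutputStream() = default;
    virtual void write(const MockBlock & block) = 0;
};

struct MockCheckingStream : MockOutputStream
{
    explicit MockCheckingStream(std::shared_ptr<MockOutputStream> next_) : next(std::move(next_)) {}

    void write(const MockBlock & block) override
    {
        /// A real implementation would evaluate every constraint here and throw on violation.
        next->write(block);
    }

    std::shared_ptr<MockOutputStream> next;
};

/// Splice the checking stage into the pipeline only when there is something to check.
std::shared_ptr<MockOutputStream> wrapWithConstraintCheck(
    std::shared_ptr<MockOutputStream> out, const std::vector<std::string> & constraint_names)
{
    if (!constraint_names.empty())
        out = std::make_shared<MockCheckingStream>(std::move(out));
    return out;
}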

View File

@ -278,6 +278,7 @@ StoragePtr InterpreterSystemQuery::tryRestartReplica(const String & database_nam
std::string data_path = database->getDataPath();
auto columns = InterpreterCreateQuery::getColumnsDescription(*create.columns_list->columns, system_context);
auto constraints = InterpreterCreateQuery::getConstraintsDescription(create.columns_list->constraints);
StoragePtr table = StorageFactory::instance().get(create,
data_path,
@ -286,6 +287,7 @@ StoragePtr InterpreterSystemQuery::tryRestartReplica(const String & database_nam
system_context,
system_context.getGlobalContext(),
columns,
constraints,
create.attach,
false);

View File

@ -24,20 +24,8 @@ namespace ErrorCodes
extern const int UNKNOWN_SETTING;
}
IStorage::IStorage(ColumnsDescription columns_)
IStorage::IStorage(ColumnsDescription virtuals_) : virtuals(std::move(virtuals_))
{
setColumns(std::move(columns_));
}
IStorage::IStorage(ColumnsDescription columns_, ColumnsDescription virtuals_) : virtuals(std::move(virtuals_))
{
setColumns(std::move(columns_));
}
IStorage::IStorage(ColumnsDescription columns_, ColumnsDescription virtuals_, IndicesDescription indices_) : virtuals(std::move(virtuals_))
{
setColumns(std::move(columns_));
setIndices(std::move(indices_));
}
const ColumnsDescription & IStorage::getColumns() const

View File

@ -65,9 +65,7 @@ class IStorage : public std::enable_shared_from_this<IStorage>
{
public:
IStorage() = default;
explicit IStorage(ColumnsDescription columns_);
IStorage(ColumnsDescription columns_, ColumnsDescription virtuals_);
IStorage(ColumnsDescription columns_, ColumnsDescription virtuals_, IndicesDescription indices_);
explicit IStorage(ColumnsDescription virtuals_);
virtual ~IStorage() = default;
IStorage(const IStorage &) = delete;
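
Most of the remaining hunks in this commit follow from the change above: IStorage no longer accepts columns or indices in its constructors (only the optional virtual-columns description remains), so every concrete storage now calls setColumns(), setIndices() and setConstraints() in its own constructor body. A minimal mock of the pattern follows; the Mock* classes are placeholders standing in for the real ones.

#include <string>
#include <utility>
#include <vector>

/// Placeholder descriptions; the real code uses ColumnsDescription and ConstraintsDescription.
struct MockColumnsDescription { std::vector<std::string> column_names; };
struct MockConstraintsDescription { std::vector<std::string> constraint_expressions; };

class MockIStorage
{
public:
    void setColumns(MockColumnsDescription columns_) { columns = std::move(columns_); }
    void setConstraints(MockConstraintsDescription constraints_) { constraints = std::move(constraints_); }

protected:
    MockColumnsDescription columns;
    MockConstraintsDescription constraints;
};

class MockStorageMemory : public MockIStorage
{
public:
    MockStorageMemory(MockColumnsDescription columns_, MockConstraintsDescription constraints_)
    {
        /// Previously the columns were forwarded to the base class, e.g. IStorage{columns_}.
        setColumns(std::move(columns_));
        setConstraints(std::move(constraints_));
    }
};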

View File

@ -86,7 +86,6 @@ StorageKafka::StorageKafka(
size_t skip_broken_,
bool intermediate_commit_)
: IStorage(
columns_,
ColumnsDescription({{"_topic", std::make_shared<DataTypeString>()},
{"_key", std::make_shared<DataTypeString>()},
{"_offset", std::make_shared<DataTypeUInt64>()},
@ -108,6 +107,7 @@ StorageKafka::StorageKafka(
, skip_broken(skip_broken_)
, intermediate_commit(intermediate_commit_)
{
setColumns(columns_);
task = global_context.getSchedulePool().createTask(log->name(), [this]{ threadFunc(); });
task->deactivate();
}

View File

@ -218,9 +218,11 @@ StorageLiveView::StorageLiveView(
Context & local_context,
const ASTCreateQuery & query,
const ColumnsDescription & columns_)
: IStorage(columns_), table_name(table_name_),
: table_name(table_name_),
database_name(database_name_), global_context(local_context.getGlobalContext())
{
setColumns(columns_);
if (!query.select)
throw Exception("SELECT query is not specified for " + getName(), ErrorCodes::INCORRECT_QUERY);

View File

@ -127,7 +127,6 @@ MergeTreeData::MergeTreeData(
{
const auto settings = getSettings();
setProperties(order_by_ast_, primary_key_ast_, columns_, indices_, constraints_);
setConstraints(constraints_);
/// NOTE: using the same columns list as is read when performing actual merges.
merging_params.check(getColumns().getAllPhysical());

View File

@ -40,8 +40,11 @@ public:
protected:
StorageFromMergeTreeDataPart(const MergeTreeData::DataPartPtr & part_)
: IStorage(part_->storage.getColumns(), part_->storage.getVirtuals(), part_->storage.getIndices()), part(part_)
{}
: IStorage(part_->storage.getVirtuals()), part(part_)
{
setColumns(part_->storage.getColumns());
setIndices(part_->storage.getIndices());
}
private:
MergeTreeData::DataPartPtr part;

View File

@ -422,21 +422,21 @@ static StoragePtr create(const StorageFactory::Arguments & args)
switch (merging_params.mode)
{
default:
break;
case MergeTreeData::MergingParams::Summing:
add_optional_param("list of columns to sum");
break;
case MergeTreeData::MergingParams::Replacing:
add_optional_param("version");
break;
case MergeTreeData::MergingParams::Collapsing:
add_mandatory_param("sign column");
break;
case MergeTreeData::MergingParams::Graphite:
add_mandatory_param("'config_element_for_graphite_schema'");
break;
case MergeTreeData::MergingParams::VersionedCollapsing:
default:
break;
case MergeTreeData::MergingParams::Summing:
add_optional_param("list of columns to sum");
break;
case MergeTreeData::MergingParams::Replacing:
add_optional_param("version");
break;
case MergeTreeData::MergingParams::Collapsing:
add_mandatory_param("sign column");
break;
case MergeTreeData::MergingParams::Graphite:
add_mandatory_param("'config_element_for_graphite_schema'");
break;
case MergeTreeData::MergingParams::VersionedCollapsing:
{
add_mandatory_param("sign column");
add_mandatory_param("version");
@ -575,6 +575,7 @@ static StoragePtr create(const StorageFactory::Arguments & args)
ASTPtr ttl_table_ast;
IndicesDescription indices_description;
ConstraintsDescription constraints_description;
std::unique_ptr<MergeTreeSettings> storage_settings = std::make_unique<MergeTreeSettings>(args.context.getMergeTreeSettings());
if (is_extended_storage_def)
@ -601,12 +602,7 @@ static StoragePtr create(const StorageFactory::Arguments & args)
if (args.query.columns_list && args.query.columns_list->indices)
for (const auto & index : args.query.columns_list->indices->children)
indices_description.indices.push_back(
std::dynamic_pointer_cast<ASTIndexDeclaration>(index->clone()));
if (args.query.columns_list && args.query.columns_list->constraints)
for (const auto & constraint : args.query.columns_list->constraints->children)
constraints_description.constraints.push_back(
std::dynamic_pointer_cast<ASTConstraintDeclaration>(constraint->clone()));
std::dynamic_pointer_cast<ASTIndexDeclaration>(index->clone()));
storage_settings->loadFromQuery(*args.storage_def);
}
@ -644,14 +640,14 @@ static StoragePtr create(const StorageFactory::Arguments & args)
if (replicated)
return StorageReplicatedMergeTree::create(
zookeeper_path, replica_name, args.attach, args.data_path, args.database_name, args.table_name,
args.columns, indices_description, constraints_description,
args.columns, indices_description, args.constraints,
args.context, date_column_name, partition_by_ast, order_by_ast, primary_key_ast,
sample_by_ast, ttl_table_ast, merging_params, std::move(storage_settings),
args.has_force_restore_data_flag);
else
return StorageMergeTree::create(
args.data_path, args.database_name, args.table_name, args.columns, indices_description,
constraints_description, args.attach, args.context, date_column_name, partition_by_ast, order_by_ast,
args.constraints, args.attach, args.context, date_column_name, partition_by_ast, order_by_ast,
primary_key_ast, sample_by_ast, ttl_table_ast, merging_params, std::move(storage_settings),
args.has_force_restore_data_flag);
}

View File

@ -55,11 +55,12 @@ namespace ErrorCodes
}
StorageBuffer::StorageBuffer(const std::string & database_name_, const std::string & table_name_, const ColumnsDescription & columns_,
StorageBuffer::StorageBuffer(const std::string & database_name_, const std::string & table_name_,
const ColumnsDescription & columns_, const ConstraintsDescription & constraints_,
Context & context_,
size_t num_shards_, const Thresholds & min_thresholds_, const Thresholds & max_thresholds_,
const String & destination_database_, const String & destination_table_, bool allow_materialized_)
: IStorage{columns_},
:
table_name(table_name_), database_name(database_name_), global_context(context_),
num_shards(num_shards_), buffers(num_shards_),
min_thresholds(min_thresholds_), max_thresholds(max_thresholds_),
@ -67,6 +68,8 @@ StorageBuffer::StorageBuffer(const std::string & database_name_, const std::stri
no_destination(destination_database.empty() && destination_table.empty()),
allow_materialized(allow_materialized_), log(&Logger::get("StorageBuffer (" + table_name + ")"))
{
setColumns(columns_);
setConstraints(constraints_);
}
StorageBuffer::~StorageBuffer()
@ -746,7 +749,7 @@ void registerStorageBuffer(StorageFactory & factory)
return StorageBuffer::create(
args.database_name,
args.table_name, args.columns,
args.table_name, args.columns, args.constraints,
args.context,
num_buckets,
StorageBuffer::Thresholds{min_time, min_rows, min_bytes},

View File

@ -139,7 +139,8 @@ protected:
/** num_shards - the level of internal parallelism (the number of independent buffers)
* The buffer is flushed if all minimum thresholds or at least one of the maximum thresholds are exceeded.
*/
StorageBuffer(const std::string & database_name_, const std::string & table_name_, const ColumnsDescription & columns_,
StorageBuffer(const std::string & database_name_, const std::string & table_name_,
const ColumnsDescription & columns_, const ConstraintsDescription & constraints_,
Context & context_,
size_t num_shards_, const Thresholds & min_thresholds_, const Thresholds & max_thresholds_,
const String & destination_database_, const String & destination_table_, bool allow_materialized_);
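
The hunk above only threads ConstraintsDescription through the StorageBuffer constructor; the flush behaviour stated in the comment is unchanged. For clarity, here is a stand-alone illustration of that rule (all minimum thresholds exceeded, or any single maximum threshold exceeded); the types and the shouldFlush name are placeholders, not the real Thresholds struct.

#include <cstddef>
#include <ctime>

/// Placeholder thresholds; the real code uses StorageBuffer::Thresholds.
struct MockThresholds
{
    time_t time = 0;   /// seconds since the first row was buffered
    size_t rows = 0;
    size_t bytes = 0;
};

bool shouldFlush(const MockThresholds & current, const MockThresholds & min_thresholds, const MockThresholds & max_thresholds)
{
    bool all_min_exceeded = current.time >= min_thresholds.time
        && current.rows >= min_thresholds.rows
        && current.bytes >= min_thresholds.bytes;

    bool any_max_exceeded = current.time >= max_thresholds.time
        || current.rows >= max_thresholds.rows
        || current.bytes >= max_thresholds.bytes;

    return all_min_exceeded || any_max_exceeded;
}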

View File

@ -30,11 +30,13 @@ StorageDictionary::StorageDictionary(
const Context & context,
bool attach,
const String & dictionary_name_)
: IStorage{columns_}, table_name(table_name_),
: table_name(table_name_),
database_name(database_name_),
dictionary_name(dictionary_name_),
logger(&Poco::Logger::get("StorageDictionary"))
{
setColumns(columns_);
if (!attach)
{
const auto & dictionary = context.getExternalDictionaries().getDictionary(dictionary_name);

View File

@ -209,6 +209,7 @@ StorageDistributed::StorageDistributed(
const String & database_name_,
const String & table_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const String & remote_database_,
const String & remote_table_,
const String & cluster_name_,
@ -216,13 +217,20 @@ StorageDistributed::StorageDistributed(
const ASTPtr & sharding_key_,
const String & data_path_,
bool attach_)
: IStorage{columns_}, table_name(table_name_), database_name(database_name_),
: table_name(table_name_), database_name(database_name_),
remote_database(remote_database_), remote_table(remote_table_),
global_context(context_), cluster_name(global_context.getMacros()->expand(cluster_name_)), has_sharding_key(sharding_key_),
sharding_key_expr(sharding_key_ ? buildShardingKeyExpression(sharding_key_, global_context, getColumns().getAllPhysical(), false) : nullptr),
sharding_key_column_name(sharding_key_ ? sharding_key_->getColumnName() : String{}),
path(data_path_.empty() ? "" : (data_path_ + escapeForFileName(table_name) + '/'))
{
setColumns(columns_);
setConstraints(constraints_);
if (sharding_key_)
{
sharding_key_expr = buildShardingKeyExpression(sharding_key_, global_context, getColumns().getAllPhysical(), false);
sharding_key_column_name = sharding_key_->getColumnName();
}
/// Sanity check. Skip check if the table is already created to allow the server to start.
if (!attach_ && !cluster_name.empty())
{
@ -237,15 +245,16 @@ StorageDistributed::StorageDistributed(
const String & database_name_,
const String & table_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
ASTPtr remote_table_function_ptr_,
const String & cluster_name_,
const Context & context_,
const ASTPtr & sharding_key_,
const String & data_path_,
bool attach)
: StorageDistributed(database_name_, table_name_, columns_, String{}, String{}, cluster_name_, context_, sharding_key_, data_path_, attach)
: StorageDistributed(database_name_, table_name_, columns_, constraints_, String{}, String{}, cluster_name_, context_, sharding_key_, data_path_, attach)
{
remote_table_function_ptr = remote_table_function_ptr_;
remote_table_function_ptr = remote_table_function_ptr_;
}
@ -258,7 +267,7 @@ StoragePtr StorageDistributed::createWithOwnCluster(
const Context & context_)
{
auto res = ext::shared_ptr_helper<StorageDistributed>::create(
String{}, table_name_, columns_, remote_database_, remote_table_, String{}, context_, ASTPtr(), String(), false);
String{}, table_name_, columns_, ConstraintsDescription{}, remote_database_, remote_table_, String{}, context_, ASTPtr(), String(), false);
res->owned_cluster = owned_cluster_;
@ -274,7 +283,7 @@ StoragePtr StorageDistributed::createWithOwnCluster(
const Context & context_)
{
auto res = ext::shared_ptr_helper<StorageDistributed>::create(
String{}, table_name_, columns_, remote_table_function_ptr_, String{}, context_, ASTPtr(), String(), false);
String{}, table_name_, columns_, ConstraintsDescription{}, remote_table_function_ptr_, String{}, context_, ASTPtr(), String(), false);
res->owned_cluster = owned_cluster_;
@ -636,7 +645,7 @@ void registerStorageDistributed(StorageFactory & factory)
}
return StorageDistributed::create(
args.database_name, args.table_name, args.columns,
args.database_name, args.table_name, args.columns, args.constraints,
remote_database, remote_table, cluster_name,
args.context, sharding_key, args.data_path,
args.attach);

View File

@ -160,6 +160,7 @@ protected:
const String & database_name_,
const String & table_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const String & remote_database_,
const String & remote_table_,
const String & cluster_name_,
@ -172,6 +173,7 @@ protected:
const String & database_name,
const String & table_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
ASTPtr remote_table_function_ptr_,
const String & cluster_name_,
const Context & context_,

View File

@ -46,6 +46,7 @@ StoragePtr StorageFactory::get(
Context & local_context,
Context & context,
const ColumnsDescription & columns,
const ConstraintsDescription & constraints,
bool attach,
bool has_force_restore_data_flag) const
{
@ -154,6 +155,7 @@ StoragePtr StorageFactory::get(
.local_context = local_context,
.context = context,
.columns = columns,
.constraints = constraints,
.attach = attach,
.has_force_restore_data_flag = has_force_restore_data_flag
};

View File

@ -3,6 +3,7 @@
#include <Common/NamePrompter.h>
#include <Parsers/IAST_fwd.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/ConstraintsDescription.h>
#include <Storages/IStorage_fwd.h>
#include <ext/singleton.h>
#include <unordered_map>
@ -35,6 +36,7 @@ public:
Context & local_context;
Context & context;
const ColumnsDescription & columns;
const ConstraintsDescription & constraints;
bool attach;
bool has_force_restore_data_flag;
};
@ -49,6 +51,7 @@ public:
Context & local_context,
Context & context,
const ColumnsDescription & columns,
const ConstraintsDescription & constraints,
bool attach,
bool has_force_restore_data_flag) const;

View File

@ -72,10 +72,14 @@ StorageFile::StorageFile(
const std::string & table_name_,
const std::string & format_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
Context & context_)
: IStorage(columns_),
:
table_name(table_name_), database_name(database_name_), format_name(format_name_), context_global(context_), table_fd(table_fd_)
{
setColumns(columns_);
setConstraints(constraints_);
if (table_fd < 0) /// Will use file
{
use_table_fd = false;
@ -330,7 +334,7 @@ void registerStorageFile(StorageFactory & factory)
return StorageFile::create(
source_path, source_fd,
args.data_path,
args.database_name, args.table_name, format_name, args.columns,
args.database_name, args.table_name, format_name, args.columns, args.constraints,
args.context);
});
}

View File

@ -60,6 +60,7 @@ protected:
const std::string & table_name_,
const std::string & format_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
Context & context_);
private:

View File

@ -30,14 +30,16 @@ StorageHDFS::StorageHDFS(const String & uri_,
const std::string & table_name_,
const String & format_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
Context & context_)
: IStorage(columns_)
, uri(uri_)
: uri(uri_)
, format_name(format_name_)
, table_name(table_name_)
, database_name(database_name_)
, context(context_)
{
setColumns(columns_);
setConstraints(constraints_);
}
namespace
@ -175,7 +177,7 @@ void registerStorageHDFS(StorageFactory & factory)
String format_name = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
return StorageHDFS::create(url, args.database_name, args.table_name, format_name, args.columns, args.context);
return StorageHDFS::create(url, args.database_name, args.table_name, format_name, args.columns, args.constraints, args.context);
});
}

View File

@ -37,6 +37,7 @@ protected:
const String & table_name_,
const String & format_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
Context & context_);
private:

View File

@ -36,8 +36,9 @@ StorageJoin::StorageJoin(
ASTTableJoin::Kind kind_,
ASTTableJoin::Strictness strictness_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
bool overwrite)
: StorageSetOrJoinBase{path_, database_name_, table_name_, columns_}
: StorageSetOrJoinBase{path_, database_name_, table_name_, columns_, constraints_}
, key_names(key_names_)
, use_nulls(use_nulls_)
, limits(limits_)
@ -170,6 +171,7 @@ void registerStorageJoin(StorageFactory & factory)
kind,
strictness,
args.columns,
args.constraints,
join_any_take_last_row);
});
}

View File

@ -64,6 +64,7 @@ protected:
SizeLimits limits_,
ASTTableJoin::Kind kind_, ASTTableJoin::Strictness strictness_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
bool overwrite);
};

View File

@ -422,12 +422,15 @@ StorageLog::StorageLog(
const std::string & database_name_,
const std::string & table_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
size_t max_compress_block_size_)
: IStorage{columns_},
path(path_), table_name(table_name_), database_name(database_name_),
: path(path_), table_name(table_name_), database_name(database_name_),
max_compress_block_size(max_compress_block_size_),
file_checker(path + escapeForFileName(table_name) + '/' + "sizes.json")
{
setColumns(columns_);
setConstraints(constraints_);
if (path.empty())
throw Exception("Storage " + getName() + " requires data path", ErrorCodes::INCORRECT_FILE_NAME);
@ -644,7 +647,7 @@ void registerStorageLog(StorageFactory & factory)
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
return StorageLog::create(
args.data_path, args.database_name, args.table_name, args.columns,
args.data_path, args.database_name, args.table_name, args.columns, args.constraints,
args.context.getSettings().max_compress_block_size);
});
}

View File

@ -57,6 +57,7 @@ protected:
const std::string & database_name_,
const std::string & table_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
size_t max_compress_block_size_);
private:

View File

@ -99,9 +99,11 @@ StorageMaterializedView::StorageMaterializedView(
const ASTCreateQuery & query,
const ColumnsDescription & columns_,
bool attach_)
: IStorage{columns_}, table_name(table_name_),
: table_name(table_name_),
database_name(database_name_), global_context(local_context.getGlobalContext())
{
setColumns(columns_);
if (!query.select)
throw Exception("SELECT query is not specified for " + getName(), ErrorCodes::INCORRECT_QUERY);

View File

@ -31,6 +31,7 @@ public:
}
BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;
void drop() override;
void truncate(const ASTPtr &, const Context &) override;

View File

@ -74,9 +74,11 @@ private:
};
StorageMemory::StorageMemory(String database_name_, String table_name_, ColumnsDescription columns_description_)
: IStorage{std::move(columns_description_)}, database_name(std::move(database_name_)), table_name(std::move(table_name_))
StorageMemory::StorageMemory(String database_name_, String table_name_, ColumnsDescription columns_description_, ConstraintsDescription constraints_)
: database_name(std::move(database_name_)), table_name(std::move(table_name_))
{
setColumns(std::move(columns_description_));
setConstraints(std::move(constraints_));
}
@ -143,7 +145,7 @@ void registerStorageMemory(StorageFactory & factory)
"Engine " + args.engine_name + " doesn't support any arguments (" + toString(args.engine_args.size()) + " given)",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
return StorageMemory::create(args.database_name, args.table_name, args.columns);
return StorageMemory::create(args.database_name, args.table_name, args.columns, args.constraints);
});
}

View File

@ -55,7 +55,7 @@ private:
std::mutex mutex;
protected:
StorageMemory(String database_name_, String table_name_, ColumnsDescription columns_description_);
StorageMemory(String database_name_, String table_name_, ColumnsDescription columns_description_, ConstraintsDescription constraints_);
};
}

View File

@ -53,13 +53,14 @@ StorageMerge::StorageMerge(
const String & source_database_,
const String & table_name_regexp_,
const Context & context_)
: IStorage(columns_, ColumnsDescription({{"_table", std::make_shared<DataTypeString>()}}, true))
: IStorage(ColumnsDescription({{"_table", std::make_shared<DataTypeString>()}}, true))
, table_name(table_name_)
, database_name(database_name_)
, source_database(source_database_)
, table_name_regexp(table_name_regexp_)
, global_context(context_)
{
setColumns(columns_);
}

View File

@ -45,9 +45,9 @@ StorageMySQL::StorageMySQL(
const bool replace_query_,
const std::string & on_duplicate_clause_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const Context & context_)
: IStorage{columns_}
, table_name(table_name_)
: table_name(table_name_)
, database_name(database_name_)
, remote_database_name(remote_database_name_)
, remote_table_name(remote_table_name_)
@ -56,6 +56,8 @@ StorageMySQL::StorageMySQL(
, pool(std::move(pool_))
, global_context(context_)
{
setColumns(columns_);
setConstraints(constraints_);
}
@ -241,6 +243,7 @@ void registerStorageMySQL(StorageFactory & factory)
replace_query,
on_duplicate_clause,
args.columns,
args.constraints,
args.context);
});
}

View File

@ -28,6 +28,7 @@ public:
const bool replace_query_,
const std::string & on_duplicate_clause_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const Context & context_);
std::string getName() const override { return "MySQL"; }
@ -54,7 +55,6 @@ private:
bool replace_query;
std::string on_duplicate_clause;
mysqlxx::Pool pool;
Context global_context;
};

View File

@ -26,7 +26,7 @@ void registerStorageNull(StorageFactory & factory)
"Engine " + args.engine_name + " doesn't support any arguments (" + toString(args.engine_args.size()) + " given)",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
return StorageNull::create(args.database_name, args.table_name, args.columns);
return StorageNull::create(args.database_name, args.table_name, args.columns, args.constraints);
});
}

View File

@ -51,9 +51,11 @@ private:
String database_name;
protected:
StorageNull(String database_name_, String table_name_, ColumnsDescription columns_description_)
: IStorage{std::move(columns_description_)}, table_name(std::move(table_name_)), database_name(std::move(database_name_))
StorageNull(String database_name_, String table_name_, ColumnsDescription columns_description_, ConstraintsDescription constraints_)
: table_name(std::move(table_name_)), database_name(std::move(database_name_))
{
setColumns(std::move(columns_description_));
setConstraints(std::move(constraints_));
}
};

View File

@ -376,7 +376,7 @@ void StorageReplicatedMergeTree::createTableIfNotExists()
}
/** Verify that list of columns and table storage_settings match those specified in ZK (/ metadata).
/** Verify that list of columns and table storage_settings_ptr match those specified in ZK (/ metadata).
* If not, throw an exception.
*/
void StorageReplicatedMergeTree::checkTableStructure(bool skip_sanity_checks, bool allow_alter)
@ -637,8 +637,8 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
for (const auto & part : parts)
total_rows_on_filesystem += part->rows_count;
const auto storage_settings = getSettings();
bool insane = unexpected_parts_rows > total_rows_on_filesystem * storage_settings->replicated_max_ratio_of_wrong_parts;
const auto storage_settings_ptr = getSettings();
bool insane = unexpected_parts_rows > total_rows_on_filesystem * storage_settings_ptr->replicated_max_ratio_of_wrong_parts;
if (insane && !skip_sanity_checks)
{
@ -781,13 +781,13 @@ void StorageReplicatedMergeTree::checkPartChecksumsAndAddCommitOps(const zkutil:
if (!has_been_already_added)
{
const auto storage_settings = getSettings();
const auto storage_settings_ptr = getSettings();
String part_path = replica_path + "/parts/" + part_name;
ops.emplace_back(zkutil::makeCheckRequest(
zookeeper_path + "/columns", expected_columns_version));
if (storage_settings->use_minimalistic_part_header_in_zookeeper)
if (storage_settings_ptr->use_minimalistic_part_header_in_zookeeper)
{
ops.emplace_back(zkutil::makeCreateRequest(
part_path, local_part_header.toString(), zkutil::CreateMode::Persistent));
@ -1035,14 +1035,14 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
parts.push_back(part);
}
const auto storage_settings = getSettings();
const auto storage_settings_ptr = getSettings();
if (!have_all_parts)
{
/// If you do not have all the necessary parts, try to take some already merged part from someone.
LOG_DEBUG(log, "Don't have all parts for merge " << entry.new_part_name << "; will try to fetch it instead");
return false;
}
else if (entry.create_time + storage_settings->prefer_fetch_merged_part_time_threshold.totalSeconds() <= time(nullptr))
else if (entry.create_time + storage_settings_ptr->prefer_fetch_merged_part_time_threshold.totalSeconds() <= time(nullptr))
{
/// If entry is old enough, and have enough size, and part are exists in any replica,
/// then prefer fetching of merged part from replica.
@ -1051,7 +1051,7 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
for (const auto & part : parts)
sum_parts_bytes_on_disk += part->bytes_on_disk;
if (sum_parts_bytes_on_disk >= storage_settings->prefer_fetch_merged_part_size_threshold)
if (sum_parts_bytes_on_disk >= storage_settings_ptr->prefer_fetch_merged_part_size_threshold)
{
String replica = findReplicaHavingPart(entry.new_part_name, true); /// NOTE excessive ZK requests for same data later, may remove.
if (!replica.empty())
@ -1161,7 +1161,7 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedMergeTree::LogEntry & entry)
{
const String & source_part_name = entry.source_parts.at(0);
const auto storage_settings = getSettings();
const auto storage_settings_ptr = getSettings();
LOG_TRACE(log, "Executing log entry to mutate part " << source_part_name << " to " << entry.new_part_name);
DataPartPtr source_part = getActiveContainingPart(source_part_name);
@ -1181,8 +1181,8 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM
/// TODO - some better heuristic?
size_t estimated_space_for_result = MergeTreeDataMergerMutator::estimateNeededDiskSpace({source_part});
if (entry.create_time + storage_settings->prefer_fetch_merged_part_time_threshold.totalSeconds() <= time(nullptr)
&& estimated_space_for_result >= storage_settings->prefer_fetch_merged_part_size_threshold)
if (entry.create_time + storage_settings_ptr->prefer_fetch_merged_part_time_threshold.totalSeconds() <= time(nullptr)
&& estimated_space_for_result >= storage_settings_ptr->prefer_fetch_merged_part_size_threshold)
{
/// If entry is old enough, and have enough size, and some replica has the desired part,
/// then prefer fetching from replica.
@ -1276,21 +1276,21 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM
bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry)
{
String replica = findReplicaHavingCoveringPart(entry, true);
const auto storage_settings = getSettings();
const auto storage_settings_ptr = getSettings();
static std::atomic_uint total_fetches {0};
if (storage_settings->replicated_max_parallel_fetches && total_fetches >= storage_settings->replicated_max_parallel_fetches)
if (storage_settings_ptr->replicated_max_parallel_fetches && total_fetches >= storage_settings_ptr->replicated_max_parallel_fetches)
{
throw Exception("Too many total fetches from replicas, maximum: " + storage_settings->replicated_max_parallel_fetches.toString(),
throw Exception("Too many total fetches from replicas, maximum: " + storage_settings_ptr->replicated_max_parallel_fetches.toString(),
ErrorCodes::TOO_MANY_FETCHES);
}
++total_fetches;
SCOPE_EXIT({--total_fetches;});
if (storage_settings->replicated_max_parallel_fetches_for_table && current_table_fetches >= storage_settings->replicated_max_parallel_fetches_for_table)
if (storage_settings_ptr->replicated_max_parallel_fetches_for_table && current_table_fetches >= storage_settings_ptr->replicated_max_parallel_fetches_for_table)
{
throw Exception("Too many fetches from replicas for table, maximum: " + storage_settings->replicated_max_parallel_fetches_for_table.toString(),
throw Exception("Too many fetches from replicas for table, maximum: " + storage_settings_ptr->replicated_max_parallel_fetches_for_table.toString(),
ErrorCodes::TOO_MANY_FETCHES);
}
@ -2213,7 +2213,7 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
if (!is_leader)
return;
const auto storage_settings = getSettings();
const auto storage_settings_ptr = getSettings();
const bool deduplicate = false; /// TODO: read deduplicate option from table config
const bool force_ttl = false;
@ -2235,16 +2235,16 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
auto merges_and_mutations_queued = queue.countMergesAndPartMutations();
size_t merges_and_mutations_sum = merges_and_mutations_queued.first + merges_and_mutations_queued.second;
if (merges_and_mutations_sum >= storage_settings->max_replicated_merges_in_queue)
if (merges_and_mutations_sum >= storage_settings_ptr->max_replicated_merges_in_queue)
{
LOG_TRACE(log, "Number of queued merges (" << merges_and_mutations_queued.first << ") and part mutations ("
<< merges_and_mutations_queued.second << ") is greater than max_replicated_merges_in_queue ("
<< storage_settings->max_replicated_merges_in_queue << "), so won't select new parts to merge or mutate.");
<< storage_settings_ptr->max_replicated_merges_in_queue << "), so won't select new parts to merge or mutate.");
}
else
{
UInt64 max_source_parts_size_for_merge = merger_mutator.getMaxSourcePartsSizeForMerge(
storage_settings->max_replicated_merges_in_queue, merges_and_mutations_sum);
storage_settings_ptr->max_replicated_merges_in_queue, merges_and_mutations_sum);
UInt64 max_source_part_size_for_mutation = merger_mutator.getMaxSourcePartSizeForMutation();
FutureMergedMutatedPart future_merged_part;
@ -2256,7 +2256,7 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
}
/// If there are many mutations in queue it may happen, that we cannot enqueue enough merges to merge all new parts
else if (max_source_part_size_for_mutation > 0 && queue.countMutations() > 0
&& merges_and_mutations_queued.second < storage_settings->max_replicated_mutations_in_queue)
&& merges_and_mutations_queued.second < storage_settings_ptr->max_replicated_mutations_in_queue)
{
/// Choose a part to mutate.
DataPartsVector data_parts = getDataPartsVector();
@ -3029,11 +3029,11 @@ void StorageReplicatedMergeTree::assertNotReadonly() const
BlockOutputStreamPtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/, const Context & context)
{
const auto storage_settings = getSettings();
const auto storage_settings_ptr = getSettings();
assertNotReadonly();
const Settings & query_settings = context.getSettingsRef();
bool deduplicate = storage_settings->replicated_deduplication_window != 0 && query_settings.insert_deduplicate;
bool deduplicate = storage_settings_ptr->replicated_deduplication_window != 0 && query_settings.insert_deduplicate;
return std::make_shared<ReplicatedMergeTreeBlockOutputStream>(*this,
query_settings.insert_quorum, query_settings.insert_quorum_timeout.totalMilliseconds(), query_settings.max_partitions_per_insert_block, deduplicate);
@ -3067,7 +3067,7 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p
};
bool force_ttl = (final && (hasTableTTL() || hasAnyColumnTTL()));
const auto storage_settings = getSettings();
const auto storage_settings_ptr = getSettings();
if (!partition && final)
{
@ -3100,7 +3100,7 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p
if (!partition)
{
selected = merger_mutator.selectPartsToMerge(
future_merged_part, true, storage_settings->max_bytes_to_merge_at_max_space_in_pool, can_merge, &disable_reason);
future_merged_part, true, storage_settings_ptr->max_bytes_to_merge_at_max_space_in_pool, can_merge, &disable_reason);
}
else
{
@ -3153,9 +3153,9 @@ void StorageReplicatedMergeTree::alter(
if (params.isSettingsAlter())
{
/// We don't replicate storage_settings ALTER. It's local operation.
/// We don't replicate storage_settings_ptr ALTER. It's local operation.
/// Also we don't upgrade alter lock to table structure lock.
LOG_DEBUG(log, "ALTER storage_settings only");
LOG_DEBUG(log, "ALTER storage_settings_ptr only");
SettingsChanges new_changes;
params.applyForSettingsOnly(new_changes);
alterSettings(new_changes, query_context, table_lock_holder);
@ -3950,10 +3950,10 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
void StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields)
{
auto zookeeper = tryGetZooKeeper();
const auto storage_settings = getSettings();
const auto storage_settings_ptr = getSettings();
res.is_leader = is_leader;
res.can_become_leader = storage_settings->replicated_can_become_leader;
res.can_become_leader = storage_settings_ptr->replicated_can_become_leader;
res.is_readonly = is_readonly;
res.is_session_expired = !zookeeper || zookeeper->expired();
@ -4143,14 +4143,14 @@ void StorageReplicatedMergeTree::getReplicaDelays(time_t & out_absolute_delay, t
out_absolute_delay = getAbsoluteDelay();
out_relative_delay = 0;
const auto storage_settings = getSettings();
const auto storage_settings_ptr = getSettings();
/** Relative delay is the maximum difference of absolute delay from any other replica,
* (if this replica lags behind any other live replica, or zero, otherwise).
* Calculated only if the absolute delay is large enough.
*/
if (out_absolute_delay < static_cast<time_t>(storage_settings->min_relative_delay_to_yield_leadership))
if (out_absolute_delay < static_cast<time_t>(storage_settings_ptr->min_relative_delay_to_yield_leadership))
return;
auto zookeeper = getZooKeeper();
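
The hunks in this file mostly rename the local storage_settings variables to storage_settings_ptr. The comment above also spells out how replica lag is reported; the following stand-alone sketch restates that rule with illustrative names rather than the real member functions and settings.

#include <algorithm>
#include <ctime>
#include <vector>

/// Sketch of the rule quoted above: if this replica's absolute delay is already large
/// enough, the relative delay is how far it lags behind the least-delayed other replica;
/// otherwise zero is reported.
time_t relativeDelay(time_t own_absolute_delay,
                     const std::vector<time_t> & other_replicas_absolute_delays,
                     time_t min_absolute_delay_to_report)
{
    if (own_absolute_delay < min_absolute_delay_to_report || other_replicas_absolute_delays.empty())
        return 0;

    time_t least_delayed_other = *std::min_element(other_replicas_absolute_delays.begin(),
                                                   other_replicas_absolute_delays.end());

    return own_absolute_delay > least_delayed_other ? own_absolute_delay - least_delayed_other : 0;
}
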
@ -4971,7 +4971,7 @@ void StorageReplicatedMergeTree::getCommitPartOps(
const String & block_id_path) const
{
const String & part_name = part->name;
const auto storage_settings = getSettings();
const auto storage_settings_ptr = getSettings();
if (!block_id_path.empty())
{
@ -4989,7 +4989,7 @@ void StorageReplicatedMergeTree::getCommitPartOps(
zookeeper_path + "/columns",
columns_version));
if (storage_settings->use_minimalistic_part_header_in_zookeeper)
if (storage_settings_ptr->use_minimalistic_part_header_in_zookeeper)
{
ops.emplace_back(zkutil::makeCreateRequest(
replica_path + "/parts/" + part->name,
@ -5018,12 +5018,12 @@ void StorageReplicatedMergeTree::updatePartHeaderInZooKeeperAndCommit(
AlterDataPartTransaction & transaction)
{
String part_path = replica_path + "/parts/" + transaction.getPartName();
const auto storage_settings = getSettings();
const auto storage_settings_ptr = getSettings();
bool need_delete_columns_and_checksums_nodes = false;
try
{
if (storage_settings->use_minimalistic_part_header_in_zookeeper)
if (storage_settings_ptr->use_minimalistic_part_header_in_zookeeper)
{
auto part_header = ReplicatedMergeTreePartHeader::fromColumnsAndChecksums(
transaction.getNewColumns(), transaction.getNewChecksums());
@ -5203,9 +5203,9 @@ CheckResults StorageReplicatedMergeTree::checkData(const ASTPtr & query, const C
bool StorageReplicatedMergeTree::canUseAdaptiveGranularity() const
{
const auto storage_settings = getSettings();
return storage_settings->index_granularity_bytes != 0 &&
(storage_settings->enable_mixed_granularity_parts ||
const auto storage_settings_ptr = getSettings();
return storage_settings_ptr->index_granularity_bytes != 0 &&
(storage_settings_ptr->enable_mixed_granularity_parts ||
(!has_non_adaptive_index_granularity_parts && !other_replicas_fixed_granularity));
}

View File

@ -90,9 +90,13 @@ StorageSetOrJoinBase::StorageSetOrJoinBase(
const String & path_,
const String & database_name_,
const String & table_name_,
const ColumnsDescription & columns_)
: IStorage{columns_}, table_name(table_name_), database_name(database_name_)
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_)
: table_name(table_name_), database_name(database_name_)
{
setColumns(columns_);
setConstraints(constraints_);
if (path_.empty())
throw Exception("Join and Set storages require data path", ErrorCodes::INCORRECT_FILE_NAME);
@ -105,8 +109,9 @@ StorageSet::StorageSet(
const String & path_,
const String & database_name_,
const String & table_name_,
const ColumnsDescription & columns_)
: StorageSetOrJoinBase{path_, database_name_, table_name_, columns_},
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_)
: StorageSetOrJoinBase{path_, database_name_, table_name_, columns_, constraints_},
set(std::make_shared<Set>(SizeLimits(), false))
{
Block header = getSampleBlock();
@ -209,7 +214,7 @@ void registerStorageSet(StorageFactory & factory)
"Engine " + args.engine_name + " doesn't support any arguments (" + toString(args.engine_args.size()) + " given)",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
return StorageSet::create(args.data_path, args.database_name, args.table_name, args.columns);
return StorageSet::create(args.data_path, args.database_name, args.table_name, args.columns, args.constraints);
});
}

View File

@ -33,7 +33,8 @@ protected:
const String & path_,
const String & database_name_,
const String & table_name_,
const ColumnsDescription & columns_);
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_);
String path;
String table_name;
@ -81,7 +82,8 @@ protected:
const String & path_,
const String & database_name_,
const String & table_name_,
const ColumnsDescription & columns_);
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_);
};
}

View File

@ -198,14 +198,17 @@ StorageStripeLog::StorageStripeLog(
const std::string & database_name_,
const std::string & table_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
bool attach,
size_t max_compress_block_size_)
: IStorage{columns_},
path(path_), table_name(table_name_), database_name(database_name_),
: path(path_), table_name(table_name_), database_name(database_name_),
max_compress_block_size(max_compress_block_size_),
file_checker(path + escapeForFileName(table_name) + '/' + "sizes.json"),
log(&Logger::get("StorageStripeLog"))
{
setColumns(columns_);
setConstraints(constraints_);
if (path.empty())
throw Exception("Storage " + getName() + " requires data path", ErrorCodes::INCORRECT_FILE_NAME);
@ -316,7 +319,7 @@ void registerStorageStripeLog(StorageFactory & factory)
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
return StorageStripeLog::create(
args.data_path, args.database_name, args.table_name, args.columns,
args.data_path, args.database_name, args.table_name, args.columns, args.constraints,
args.attach, args.context.getSettings().max_compress_block_size);
});
}

View File

@ -74,6 +74,7 @@ protected:
const std::string & database_name_,
const std::string & table_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
bool attach,
size_t max_compress_block_size_);
};

View File

@ -327,14 +327,17 @@ StorageTinyLog::StorageTinyLog(
const std::string & database_name_,
const std::string & table_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
bool attach,
size_t max_compress_block_size_)
: IStorage{columns_},
path(path_), table_name(table_name_), database_name(database_name_),
: path(path_), table_name(table_name_), database_name(database_name_),
max_compress_block_size(max_compress_block_size_),
file_checker(path + escapeForFileName(table_name) + '/' + "sizes.json"),
log(&Logger::get("StorageTinyLog"))
{
setColumns(columns_);
setConstraints(constraints_);
if (path.empty())
throw Exception("Storage " + getName() + " requires data path", ErrorCodes::INCORRECT_FILE_NAME);
@ -450,7 +453,7 @@ void registerStorageTinyLog(StorageFactory & factory)
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
return StorageTinyLog::create(
args.data_path, args.database_name, args.table_name, args.columns,
args.data_path, args.database_name, args.table_name, args.columns, args.constraints,
args.attach, args.context.getSettings().max_compress_block_size);
});
}

View File

@ -78,6 +78,7 @@ protected:
const std::string & database_name_,
const std::string & table_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
bool attach,
size_t max_compress_block_size_);
};

View File

@ -24,14 +24,18 @@ namespace ErrorCodes
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
IStorageURLBase::IStorageURLBase(const Poco::URI & uri_,
IStorageURLBase::IStorageURLBase(
const Poco::URI & uri_,
const Context & context_,
const std::string & database_name_,
const std::string & table_name_,
const String & format_name_,
const ColumnsDescription & columns_)
: IStorage(columns_), uri(uri_), context_global(context_), format_name(format_name_), table_name(table_name_), database_name(database_name_)
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_)
: uri(uri_), context_global(context_), format_name(format_name_), table_name(table_name_), database_name(database_name_)
{
setColumns(columns_);
setConstraints(constraints_);
}
namespace
@ -214,7 +218,7 @@ void registerStorageURL(StorageFactory & factory)
String format_name = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
return StorageURL::create(uri, args.database_name, args.table_name, format_name, args.columns, args.context);
return StorageURL::create(uri, args.database_name, args.table_name, format_name, args.columns, args.constraints, args.context);
});
}
}

View File

@ -2,9 +2,9 @@
#include <Storages/IStorage.h>
#include <Poco/URI.h>
#include <common/logger_useful.h>
#include <ext/shared_ptr_helper.h>
namespace DB
{
/**
@ -19,7 +19,8 @@ public:
String getTableName() const override { return table_name; }
String getDatabaseName() const override { return database_name; }
BlockInputStreams read(const Names & column_names,
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
@ -31,12 +32,14 @@ public:
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override;
protected:
IStorageURLBase(const Poco::URI & uri_,
IStorageURLBase(
const Poco::URI & uri_,
const Context & context_,
const std::string & database_name_,
const std::string & table_name_,
const String & format_name_,
const ColumnsDescription & columns_);
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_);
Poco::URI uri;
const Context & context_global;
@ -48,13 +51,15 @@ private:
virtual std::string getReadMethod() const;
virtual std::vector<std::pair<std::string, std::string>> getReadURIParams(const Names & column_names,
virtual std::vector<std::pair<std::string, std::string>> getReadURIParams(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size) const;
virtual std::function<void(std::ostream &)> getReadPOSTDataCallback(const Names & column_names,
virtual std::function<void(std::ostream &)> getReadPOSTDataCallback(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
@ -63,16 +68,19 @@ private:
virtual Block getHeaderBlock(const Names & column_names) const = 0;
};
class StorageURL : public ext::shared_ptr_helper<StorageURL>, public IStorageURLBase
{
public:
StorageURL(const Poco::URI & uri_,
StorageURL(
const Poco::URI & uri_,
const std::string & database_name_,
const std::string & table_name_,
const String & format_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
Context & context_)
: IStorageURLBase(uri_, context_, database_name_, table_name_, format_name_, columns_)
: IStorageURLBase(uri_, context_, database_name_, table_name_, format_name_, columns_, constraints_)
{
}

View File

@ -31,8 +31,10 @@ StorageView::StorageView(
const String & table_name_,
const ASTCreateQuery & query,
const ColumnsDescription & columns_)
: IStorage{columns_}, table_name(table_name_), database_name(database_name_)
: table_name(table_name_), database_name(database_name_)
{
setColumns(columns_);
if (!query.select)
throw Exception("SELECT query is not specified for " + getName(), ErrorCodes::INCORRECT_QUERY);

View File

@ -30,7 +30,8 @@ StorageXDBC::StorageXDBC(
const ColumnsDescription & columns_,
const Context & context_,
const BridgeHelperPtr bridge_helper_)
: IStorageURLBase(Poco::URI(), context_, database_name_, table_name_, IXDBCBridgeHelper::DEFAULT_FORMAT, columns_)
/// Please add support for constraints as soon as StorageODBC or JDBC supports insertion.
: IStorageURLBase(Poco::URI(), context_, database_name_, table_name_, IXDBCBridgeHelper::DEFAULT_FORMAT, columns_, ConstraintsDescription{})
, bridge_helper(bridge_helper_)
, remote_database_name(remote_database_name_)
, remote_table_name(remote_table_name_)

View File

@ -1,59 +1,61 @@
#pragma once
#include <Storages/StorageURL.h>
#include <ext/shared_ptr_helper.h>
#include <Common/XDBCBridgeHelper.h>
namespace DB
{
/** Implements storage in the XDBC database.
* Use ENGINE = xdbc(connection_string, table_name)
* Example ENGINE = odbc('dsn=test', table)
* Read only.
*/
class StorageXDBC : public IStorageURLBase
{
public:
class StorageXDBC : public IStorageURLBase
{
public:
BlockInputStreams read(const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;
BlockInputStreams read(const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;
StorageXDBC(const std::string & database_name_,
const std::string & table_name_,
const std::string & remote_database_name,
const std::string & remote_table_name,
const ColumnsDescription & columns_,
const Context & context_, BridgeHelperPtr bridge_helper_);
StorageXDBC(const std::string & database_name_,
const std::string & table_name_,
const std::string & remote_database_name,
const std::string & remote_table_name,
const ColumnsDescription & columns_,
const Context & context_, BridgeHelperPtr bridge_helper_);
private:
private:
BridgeHelperPtr bridge_helper;
std::string remote_database_name;
std::string remote_table_name;
BridgeHelperPtr bridge_helper;
std::string remote_database_name;
std::string remote_table_name;
Poco::Logger * log;
Poco::Logger * log;
std::string getReadMethod() const override;
std::string getReadMethod() const override;
std::vector<std::pair<std::string, std::string>> getReadURIParams(const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size) const override;
std::vector<std::pair<std::string, std::string>> getReadURIParams(const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size) const override;
std::function<void(std::ostream &)> getReadPOSTDataCallback(const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size) const override;
std::function<void(std::ostream &)> getReadPOSTDataCallback(const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size) const override;
Block getHeaderBlock(const Names & column_names) const override;
Block getHeaderBlock(const Names & column_names) const override;
std::string getName() const override;
};
std::string getName() const override;
};
}

View File

@ -23,7 +23,7 @@ struct State
{
registerFunctions();
DatabasePtr database = std::make_shared<DatabaseMemory>("test");
database->attachTable("table", StorageMemory::create("test", "table", ColumnsDescription{columns}));
database->attachTable("table", StorageMemory::create("test", "table", ColumnsDescription{columns}, ConstraintsDescription{}));
context.makeGlobalContext();
context.addDatabase("test", database);
context.setCurrentDatabase("test");

View File

@ -25,7 +25,7 @@ try
names_and_types.emplace_back("a", std::make_shared<DataTypeUInt64>());
names_and_types.emplace_back("b", std::make_shared<DataTypeUInt8>());
StoragePtr table = StorageLog::create("./", "test", "test", ColumnsDescription{names_and_types}, 1048576);
StoragePtr table = StorageLog::create("./", "test", "test", ColumnsDescription{names_and_types}, ConstraintsDescription{}, 1048576);
table->startup();
auto context = Context::createGlobal();

View File

@ -15,6 +15,7 @@ StoragePtr TableFunctionFile::getStorage(
table_name,
format,
columns,
ConstraintsDescription{},
global_context);
}

View File

@ -16,6 +16,7 @@ StoragePtr TableFunctionHDFS::getStorage(
table_name,
format,
columns,
ConstraintsDescription{},
global_context);
}

View File

@ -127,6 +127,7 @@ StoragePtr TableFunctionMySQL::executeImpl(const ASTPtr & ast_function, const Co
replace_query,
on_duplicate_clause,
ColumnsDescription{columns},
ConstraintsDescription{},
context);
res->startup();

View File

@ -4,13 +4,14 @@
#include <TableFunctions/TableFunctionURL.h>
#include <Poco/URI.h>
namespace DB
{
StoragePtr TableFunctionURL::getStorage(
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context, const std::string & table_name) const
{
Poco::URI uri(source);
return StorageURL::create(uri, getDatabaseName(), table_name, format, columns, global_context);
return StorageURL::create(uri, getDatabaseName(), table_name, format, columns, ConstraintsDescription{}, global_context);
}
void registerTableFunctionURL(TableFunctionFactory & factory)

View File

@ -3,6 +3,7 @@
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
EXCEPTION_TEXT=violated
EXCEPTION_SUCCESS_TEXT=ok
$CLICKHOUSE_CLIENT --query="CREATE DATABASE IF NOT EXISTS test;"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS test_constraints;"
@ -20,7 +21,7 @@ $CLICKHOUSE_CLIENT --query="INSERT INTO test_constraints VALUES (1, 2);"
$CLICKHOUSE_CLIENT --query="SELECT * FROM test_constraints;"
# This one must throw an exception
EXCEPTION_TEXT="Violated constraint b_constraint in table test_constraints at indices {1, 3}"
$CLICKHOUSE_CLIENT --query="INSERT INTO test_constraints VALUES (3, 4), (1, 0), (3, 4), (6, 0);" 2>&1 \
| grep -q "$EXCEPTION_TEXT" && echo "$EXCEPTION_SUCCESS_TEXT" || echo "Did not throw an exception"
$CLICKHOUSE_CLIENT --query="SELECT * FROM test_constraints;"
@ -38,13 +39,11 @@ $CLICKHOUSE_CLIENT --query="CREATE TABLE test_constraints
ENGINE = MergeTree ORDER BY (a);"
# This one must throw an exception
EXCEPTION_TEXT="Violated constraint b_constraint in table test_constraints at indices {0}"
$CLICKHOUSE_CLIENT --query="INSERT INTO test_constraints VALUES (1, 2);" 2>&1 \
| grep -q "$EXCEPTION_TEXT" && echo "$EXCEPTION_SUCCESS_TEXT" || echo "Did not throw an exception"
$CLICKHOUSE_CLIENT --query="SELECT * FROM test_constraints;"
# This one must throw an exception
EXCEPTION_TEXT="Violated constraint a_constraint in table test_constraints at indices {1}"
$CLICKHOUSE_CLIENT --query="INSERT INTO test_constraints VALUES (5, 16), (10, 11), (9, 11), (8, 12);" 2>&1 \
| grep -q "$EXCEPTION_TEXT" && echo "$EXCEPTION_SUCCESS_TEXT" || echo "Did not throw an exception"
$CLICKHOUSE_CLIENT --query="SELECT * FROM test_constraints;"

View File

@ -3,6 +3,7 @@
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
EXCEPTION_TEXT=violated
EXCEPTION_SUCCESS_TEXT=ok
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS test_constraints;"
@ -20,7 +21,7 @@ $CLICKHOUSE_CLIENT --query="INSERT INTO test_constraints VALUES (1, 2);"
$CLICKHOUSE_CLIENT --query="SELECT * FROM test_constraints;"
# This one must throw an exception
EXCEPTION_TEXT="Violated constraint b_constraint in table test_constraints at indices"
$CLICKHOUSE_CLIENT --query="INSERT INTO test_constraints VALUES (1, 0);" 2>&1 \
| grep -q "$EXCEPTION_TEXT" && echo "$EXCEPTION_SUCCESS_TEXT" || echo "Did not throw an exception"
$CLICKHOUSE_CLIENT --query="SELECT * FROM test_constraints;"

View File

@ -0,0 +1,14 @@
0
0
3
0
0
3
0
0
3
0
0
3
CREATE TABLE default.constrained (`URL` String, CONSTRAINT is_yandex CHECK domainWithoutWWW(URL) = 'yandex.ru', CONSTRAINT is_utf8 CHECK isValidUTF8(URL)) ENGINE = Log
CREATE TABLE default.constrained2 (`URL` String, CONSTRAINT is_yandex CHECK domainWithoutWWW(URL) = 'yandex.ru', CONSTRAINT is_utf8 CHECK isValidUTF8(URL)) ENGINE = Log

View File

@ -0,0 +1,53 @@
DROP TABLE IF EXISTS constrained;
CREATE TABLE constrained (URL String, CONSTRAINT is_yandex CHECK domainWithoutWWW(URL) = 'yandex.ru', CONSTRAINT is_utf8 CHECK isValidUTF8(URL)) ENGINE = Null;
INSERT INTO constrained VALUES ('https://www.yandex.ru/?q=upyachka'), ('Hello'), ('test'); -- { serverError 469 }
INSERT INTO constrained VALUES ('https://www.yandex.ru/?q=upyachka'), ('ftp://yandex.ru/Hello'), ('https://yandex.ru/te\xFFst'); -- { serverError 469 }
INSERT INTO constrained VALUES ('https://www.yandex.ru/?q=upyachka'), ('ftp://yandex.ru/Hello'), (toValidUTF8('https://yandex.ru/te\xFFst'));
DROP TABLE constrained;
CREATE TABLE constrained (URL String, CONSTRAINT is_yandex CHECK domainWithoutWWW(URL) = 'yandex.ru', CONSTRAINT is_utf8 CHECK isValidUTF8(URL)) ENGINE = Memory;
INSERT INTO constrained VALUES ('https://www.yandex.ru/?q=upyachka'), ('Hello'), ('test'); -- { serverError 469 }
SELECT count() FROM constrained;
INSERT INTO constrained VALUES ('https://www.yandex.ru/?q=upyachka'), ('ftp://yandex.ru/Hello'), ('https://yandex.ru/te\xFFst'); -- { serverError 469 }
SELECT count() FROM constrained;
INSERT INTO constrained VALUES ('https://www.yandex.ru/?q=upyachka'), ('ftp://yandex.ru/Hello'), (toValidUTF8('https://yandex.ru/te\xFFst'));
SELECT count() FROM constrained;
DROP TABLE constrained;
CREATE TABLE constrained (URL String, CONSTRAINT is_yandex CHECK domainWithoutWWW(URL) = 'yandex.ru', CONSTRAINT is_utf8 CHECK isValidUTF8(URL)) ENGINE = StripeLog;
INSERT INTO constrained VALUES ('https://www.yandex.ru/?q=upyachka'), ('Hello'), ('test'); -- { serverError 469 }
SELECT count() FROM constrained;
INSERT INTO constrained VALUES ('https://www.yandex.ru/?q=upyachka'), ('ftp://yandex.ru/Hello'), ('https://yandex.ru/te\xFFst'); -- { serverError 469 }
SELECT count() FROM constrained;
INSERT INTO constrained VALUES ('https://www.yandex.ru/?q=upyachka'), ('ftp://yandex.ru/Hello'), (toValidUTF8('https://yandex.ru/te\xFFst'));
SELECT count() FROM constrained;
DROP TABLE constrained;
CREATE TABLE constrained (URL String, CONSTRAINT is_yandex CHECK domainWithoutWWW(URL) = 'yandex.ru', CONSTRAINT is_utf8 CHECK isValidUTF8(URL)) ENGINE = TinyLog;
INSERT INTO constrained VALUES ('https://www.yandex.ru/?q=upyachka'), ('Hello'), ('test'); -- { serverError 469 }
SELECT count() FROM constrained;
INSERT INTO constrained VALUES ('https://www.yandex.ru/?q=upyachka'), ('ftp://yandex.ru/Hello'), ('https://yandex.ru/te\xFFst'); -- { serverError 469 }
SELECT count() FROM constrained;
INSERT INTO constrained VALUES ('https://www.yandex.ru/?q=upyachka'), ('ftp://yandex.ru/Hello'), (toValidUTF8('https://yandex.ru/te\xFFst'));
SELECT count() FROM constrained;
DROP TABLE constrained;
CREATE TABLE constrained (URL String, CONSTRAINT is_yandex CHECK domainWithoutWWW(URL) = 'yandex.ru', CONSTRAINT is_utf8 CHECK isValidUTF8(URL)) ENGINE = Log;
INSERT INTO constrained VALUES ('https://www.yandex.ru/?q=upyachka'), ('Hello'), ('test'); -- { serverError 469 }
SELECT count() FROM constrained;
INSERT INTO constrained VALUES ('https://www.yandex.ru/?q=upyachka'), ('ftp://yandex.ru/Hello'), ('https://yandex.ru/te\xFFst'); -- { serverError 469 }
SELECT count() FROM constrained;
INSERT INTO constrained VALUES ('https://www.yandex.ru/?q=upyachka'), ('ftp://yandex.ru/Hello'), (toValidUTF8('https://yandex.ru/te\xFFst'));
SELECT count() FROM constrained;
DROP TABLE constrained;
DROP TABLE IF EXISTS constrained2;
CREATE TABLE constrained (URL String, CONSTRAINT is_yandex CHECK domainWithoutWWW(URL) = 'yandex.ru', CONSTRAINT is_utf8 CHECK isValidUTF8(URL)) ENGINE = Log;
CREATE TABLE constrained2 AS constrained;
SHOW CREATE TABLE constrained FORMAT TSVRaw;
SHOW CREATE TABLE constrained2 FORMAT TSVRaw;
INSERT INTO constrained VALUES ('https://www.yandex.ru/?q=upyachka'), ('Hello'), ('test'); -- { serverError 469 }
INSERT INTO constrained2 VALUES ('https://www.yandex.ru/?q=upyachka'), ('Hello'), ('test'); -- { serverError 469 }
DROP TABLE constrained;
DROP TABLE constrained2;

View File

@ -1,9 +1,9 @@
#!/usr/bin/env bash
#ccache -s
#ccache -s # uncomment to display CCache statistics
mkdir -p /server/build_docker
cd /server/build_docker
cmake -G Ninja /server -DCMAKE_C_COMPILER=`which clang-8` -DCMAKE_CXX_COMPILER=`which clang++-8` -DCMAKE_BUILD_TYPE=Debug
cmake -G Ninja /server -DCMAKE_C_COMPILER=`which gcc-8` -DCMAKE_CXX_COMPILER=`which g++-8`
# Set the number of build jobs to half the number of virtual CPU cores (rounded up).
# By default, ninja uses all virtual CPU cores, which leads to very high memory consumption without much improvement in build time.

View File

@ -1,13 +1,15 @@
# Enum
Enumerated type storing pairs of the `'string' = integer` format.
Enumerated type consisting of named values.
Named values must be declared as `'string' = integer` pairs. ClickHouse stores only the numbers, but supports operations with the values through their names.
ClickHouse supports:
- 8-bit `Enum`. It can contain up to 256 values with enumeration of `[-128, 127]`.
- 16-bit `Enum`. It can contain up to 65536 values with enumeration of `[-32768, 32767]`.
- 8-bit `Enum`. It can contain up to 256 values enumerated in the `[-128, 127]` range.
- 16-bit `Enum`. It can contain up to 65536 values enumerated in the `[-32768, 32767]` range.
ClickHouse automatically chooses a type for `Enum` at data insertion. Also, you can use `Enum8` or `Enum16` types to be sure in size of storage.
ClickHouse automatically chooses the type of `Enum` when data is inserted. You can also use the `Enum8` or `Enum16` types to be sure of the storage size.
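For instance, a sketch of a table that pins the storage size explicitly (the table and column names here are only illustrative):

```sql
CREATE TABLE t_enum_sized
(
    status Enum8('ok' = 1, 'error' = 2),
    country Enum16('RU' = 1, 'US' = 2, 'DE' = 3)
)
ENGINE = TinyLog
```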
## Usage examples
@ -21,7 +23,7 @@ CREATE TABLE t_enum
ENGINE = TinyLog
```
This column `x` can only store the values that are listed in the type definition: `'hello'` or `'world'`. If you try to save any other value, ClickHouse will generate an exception. ClickHouse automatically chooses the 8-bit size for enumeration of this `Enum`.
Column `x` can only store values that are listed in the type definition: `'hello'` or `'world'`. If you try to save any other value, ClickHouse will raise an exception. The 8-bit size for this `Enum` is chosen automatically.
```sql
:) INSERT INTO t_enum VALUES ('hello'), ('world'), ('hello')

View File

@ -29,7 +29,7 @@ These actions are described in detail below.
ADD COLUMN [IF NOT EXISTS] name [type] [default_expr] [AFTER name_after]
```
Adds a new column to the table with the specified `name`, `type`, and `default_expr` (see the section [Default expressions](create.md#create-default-values)).
Adds a new column to the table with the specified `name`, `type`, and `default_expr` (see the section [Default expressions](create.md#create-default-values)).
If the `IF NOT EXISTS` clause is included, the query won't return an error if the column already exists. If you specify `AFTER name_after` (the name of another column), the column is added after the specified one in the list of table columns. Otherwise, the column is added to the end of the table. Note that there is no way to add a column to the beginning of a table. For a chain of actions, `name_after` can be the name of a column that is added in one of the previous actions.
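For instance, a sketch of a query combining both clauses (the table and column names are illustrative):

```sql
ALTER TABLE visits ADD COLUMN IF NOT EXISTS browser String DEFAULT '' AFTER UserID
```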
@ -66,7 +66,7 @@ CLEAR COLUMN [IF EXISTS] name IN PARTITION partition_name
```
Resets all data in a column for a specified partition. Read more about setting the partition name in the section [How to specify the partition expression](#alter-how-to-specify-part-expr).
If the `IF EXISTS` clause is specified, the query won't return an error if the column doesn't exist.
Example:
@ -85,7 +85,7 @@ Adds a comment to the column. If the `IF EXISTS` clause is specified, the query
Each column can have one comment. If a comment already exists for the column, a new comment overwrites the previous comment.
Comments are stored in the `comment_expression` column returned by the [DESCRIBE TABLE](misc.md#misc-describe-table) query.
Comments are stored in the `comment_expression` column returned by the [DESCRIBE TABLE](misc.md#misc-describe-table) query.
Example:
@ -129,7 +129,7 @@ The `ALTER` query lets you create and delete separate elements (columns) in nest
There is no support for deleting columns in the primary key or the sampling key (columns that are used in the `ENGINE` expression). Changing the type for columns that are included in the primary key is only possible if this change does not cause the data to be modified (for example, you are allowed to add values to an Enum or to change a type from `DateTime` to `UInt32`).
If the `ALTER` query is not sufficient to make the table changes you need, you can create a new table, copy the data to it using the [INSERT SELECT](insert_into.md#insert_query_insert-select) query, then switch the tables using the [RENAME](misc.md#misc_operations-rename) query and delete the old table. You can use the [clickhouse-copier](../operations/utils/clickhouse-copier.md) as an alternative to the `INSERT SELECT` query.
If the `ALTER` query is not sufficient to make the table changes you need, you can create a new table, copy the data to it using the [INSERT SELECT](insert_into.md#insert_query_insert-select) query, then switch the tables using the [RENAME](misc.md#misc_operations-rename) query and delete the old table. You can use the [clickhouse-copier](../operations/utils/clickhouse-copier.md) as an alternative to the `INSERT SELECT` query.
The `ALTER` query blocks all reads and writes for the table. In other words, if a long `SELECT` is running at the time of the `ALTER` query, the `ALTER` query will wait for it to complete. At the same time, all new queries to the same table will wait while this `ALTER` is running.
@ -178,9 +178,9 @@ ALTER TABLE [db].name DROP CONSTRAINT constraint_name;
Queries will add or remove metadata about constraints from the table, so they are processed immediately.
Constraint check *will not be executed* on existing table if it was added. For now, we recommend to create new table and use `INSERT SELECT` query to fill new table.
Constraint check *will not be executed* on existing data if it was added.
All changes on distributed tables are broadcasting to ZooKeeper so will be applied on other replicas.
All changes on replicated tables are broadcast to ZooKeeper, so they will be applied on other replicas.
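As a rough sketch of the recommended workflow (all table and constraint names below are hypothetical):

```sql
-- Only data inserted after this point is checked against the constraint.
ALTER TABLE hits ADD CONSTRAINT c_domain CHECK domainWithoutWWW(URL) = 'yandex.ru';

-- To validate the existing rows as well, recreate the table and refill it.
CREATE TABLE hits_checked AS hits;
INSERT INTO hits_checked SELECT * FROM hits;
```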
### Manipulations With Partitions and Parts {#alter_manipulations-with-partitions}
@ -267,7 +267,7 @@ This query copies the data partition from the `table1` to `table2`. Note that da
For the query to run successfully, the following conditions must be met:
- Both tables must have the same structure.
- Both tables must have the same partition key.
- Both tables must have the same partition key.
#### CLEAR COLUMN IN PARTITION {#alter_clear-column-partition}
@ -289,13 +289,13 @@ ALTER TABLE visits CLEAR COLUMN hour in PARTITION 201902
ALTER TABLE table_name FREEZE [PARTITION partition_expr]
```
This query creates a local backup of a specified partition. If the `PARTITION` clause is omitted, the query creates the backup of all partitions at once.
This query creates a local backup of a specified partition. If the `PARTITION` clause is omitted, the query creates the backup of all partitions at once.
Note that for old-styled tables you can specify the prefix of the partition name (for example, '2019') - then the query creates the backup for all the corresponding partitions. Read about setting the partition expression in a section [How to specify the partition expression](#alter-how-to-specify-part-expr).
!!! note
The entire backup process is performed without stopping the server.
At the time of execution, for a data snapshot, the query creates hardlinks to the table data. Hardlinks are placed in the directory `/var/lib/clickhouse/shadow/N/...`, where:
- `/var/lib/clickhouse/` is the working ClickHouse directory specified in the config.
@ -348,7 +348,7 @@ ALTER TABLE users ATTACH PARTITION 201902;
```
Note that:
- The `ALTER ... FETCH PARTITION` query isn't replicated. It places the partition to the `detached` directory only on the local server.
- The `ALTER ... FETCH PARTITION` query isn't replicated. It places the partition to the `detached` directory only on the local server.
- The `ALTER TABLE ... ATTACH` query is replicated. It adds the data to all replicas. The data is added to one of the replicas from the `detached` directory, and to the others - from neighboring replicas.
Before downloading, the system checks if the partition exists and the table structure matches. The most appropriate replica is selected automatically from the healthy replicas.

View File

@ -2,17 +2,19 @@
All these functions don't follow the RFC. They are maximally simplified for improved performance.
## Functions that extract part of a URL
## Functions that Extract Parts of a URL
If there isn't anything similar in a URL, an empty string is returned.
If the relevant part isn't present in a URL, an empty string is returned.
### protocol
Returns the protocol. Examples: http, ftp, mailto, magnet...
Extracts the protocol from a URL.
Examples of typical returned values: http, https, ftp, mailto, tel, magnet...
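For example, the following query should return `https` (the URL is only an illustration):

```sql
SELECT protocol('https://yandex.com/time/')
```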
### domain
Extracts the host part from URL.
Extracts the hostname from a URL.
```
domain(url)
@ -23,7 +25,7 @@ domain(url)
- `url` — URL. Type: [String](../../data_types/string.md).
URL can be specified with or without scheme. Examples:
The URL can be specified with or without a scheme. Examples:
```
svn+ssh://some.svn-hosting.com:80/repo/trunk
@ -31,10 +33,18 @@ some.svn-hosting.com:80/repo/trunk
https://yandex.com/time/
```
For these examples, the `domain` function returns the following results:
```
some.svn-hosting.com
some.svn-hosting.com
yandex.com
```
**Returned values**
- Host name. If ClickHouse can parse input string as URL.
- Empty string. If ClickHouse cannot parse input string as URL.
- Host name. If ClickHouse can parse the input string as a URL.
- Empty string. If ClickHouse can't parse the input string as a URL.
Type: `String`.
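**Example**

```sql
SELECT domain('svn+ssh://some.svn-hosting.com:80/repo/trunk')
```

Result: `some.svn-hosting.com`.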
@ -55,7 +65,7 @@ Returns the domain and removes no more than one 'www.' from the beginning of it,
### topLevelDomain
Extracts the the top-level domain from URL.
Extracts the top-level domain from a URL.
```
topLevelDomain(url)
@ -65,7 +75,7 @@ topLevelDomain(url)
- `url` — URL. Type: [String](../../data_types/string.md).
URL can be specified with or without scheme. Examples:
The URL can be specified with or without a scheme. Examples:
```
svn+ssh://some.svn-hosting.com:80/repo/trunk
@ -75,8 +85,8 @@ https://yandex.com/time/
**Returned values**
- Domain name. If ClickHouse can parse input string as URL.
- Empty string. If ClickHouse cannot parse input string as URL.
- Domain name. If ClickHouse can parse the input string as a URL.
- Empty string. If ClickHouse cannot parse the input string as a URL.
Type: `String`.
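**Example**

```sql
SELECT topLevelDomain('svn+ssh://www.some.svn-hosting.com:80/repo/trunk')
```

Result: `com`.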

View File

@ -1,27 +1,32 @@
# Enum
# Enum8, Enum16
An enumerated data type containing named values.
Includes the `Enum8` and `Enum16` types. `Enum` stores a finite set of `'string' = integer` pairs. ClickHouse performs all operations on `Enum` data as if they were numbers, while the user works with string constants. In terms of performance this is more efficient than working with the `String` data type.
Named values are specified as `'string' = integer` pairs. ClickHouse stores only the numbers, but supports operations on the values through their names.
- `Enum8` is described by `'String' = Int8` pairs.
- `Enum16` is described by `'String' = Int16` pairs.
ClickHouse supports:
## Application examples
- 8-bit `Enum`. It can contain up to 256 values enumerated in the `[-128, 127]` range.
- 16-bit `Enum`. It can contain up to 65536 values enumerated in the `[-32768, 32767]` range.
ClickHouse automatically chooses the size of an `Enum` when data is inserted. You can also use the `Enum8` or `Enum16` types to be sure of the storage size.
## Usage examples
Let's create a table with a column of type `Enum8('hello' = 1, 'world' = 2)`.
```
```sql
CREATE TABLE t_enum
(
x Enum8('hello' = 1, 'world' = 2)
x Enum('hello' = 1, 'world' = 2)
)
ENGINE = TinyLog
```
Only values listed in the type definition, that is `'hello'` or `'world'`, can be stored in the `x` column. If you try to store any other value, ClickHouse will raise an exception.
Only values listed in the type definition, that is `'hello'` or `'world'`, can be stored in the `x` column. If you try to store any other value, ClickHouse will raise an exception. The 8-bit size for this `Enum` is chosen automatically.
```
:) INSERT INTO t_enum Values('hello'),('world'),('hello')
```sql
:) INSERT INTO t_enum VALUES ('hello'), ('world'), ('hello')
INSERT INTO t_enum VALUES
@ -35,12 +40,12 @@ INSERT INTO t_enum VALUES
Exception on client:
Code: 49. DB::Exception: Unknown element 'a' for type Enum8('hello' = 1, 'world' = 2)
Code: 49. DB::Exception: Unknown element 'a' for type Enum('hello' = 1, 'world' = 2)
```
When querying data from the table, ClickHouse outputs the string values from the `Enum`.
```
```sql
SELECT * FROM t_enum
┌─x─────┐
@ -49,10 +54,11 @@ SELECT * FROM t_enum
│ hello │
└───────┘
```
If you need to see the numeric equivalents of the strings, you have to cast the type.
```
SELECT CAST(x, 'Int8') FROM t_enum
If you need to see the numeric equivalents of the strings, you have to cast the `Enum` type to an integer type.
```sql
SELECT CAST(x AS Int8) FROM t_enum
┌─CAST(x, 'Int8')─┐
│ 1 │
@ -61,14 +67,14 @@ SELECT CAST(x, 'Int8') FROM t_enum
└─────────────────┘
```
To create a value of type Enum in a query, you also need the `CAST` function.
To create a value of type `Enum` in a query, you also need to use the `CAST` function.
```
SELECT toTypeName(CAST('a', 'Enum8(\'a\' = 1, \'b\' = 2)'))
```sql
SELECT toTypeName(CAST('a', 'Enum(\'a\' = 1, \'b\' = 2)'))
┌─toTypeName(CAST('a', 'Enum8(\'a\' = 1, \'b\' = 2)'))─┐
│ Enum8('a' = 1, 'b' = 2)
└─────────────────────────────────────────────────────
┌─toTypeName(CAST('a', 'Enum(\'a\' = 1, \'b\' = 2)'))─┐
│ Enum8('a' = 1, 'b' = 2) │
└─────────────────────────────────────────────────────┘
```
## General rules and usage specifics

View File

@ -29,7 +29,7 @@ ALTER TABLE [db].name [ON CLUSTER cluster] ADD|DROP|CLEAR|COMMENT|MODIFY COLUMN
ADD COLUMN [IF NOT EXISTS] name [type] [default_expr] [AFTER name_after]
```
Adds a new column to the table with the specified `name`, `type`, and `default_expr` (see the section [Default values](create.md#create-default-values)).
Adds a new column to the table with the specified `name`, `type`, and `default_expr` (see the section [Default values](create.md#create-default-values)).
If `IF NOT EXISTS` is specified, the query won't return an error if the column already exists. If `AFTER name_after` (the name of another column) is specified, the column is added after the specified one in the list of table columns. Otherwise, the column is added to the end of the table. Note that ClickHouse does not allow adding columns to the beginning of a table. For a chain of actions, `name_after` can be the name of a column that is added in one of the previous actions.
@ -84,7 +84,7 @@ COMMENT COLUMN [IF EXISTS] name 'Text comment'
Each column can have only one comment. When the query is executed, an existing comment is replaced with the new one.
Comments can be viewed in the `comment_expression` column of the [DESCRIBE TABLE](misc.md#misc-describe-table) query.
Comments can be viewed in the `comment_expression` column of the [DESCRIBE TABLE](misc.md#misc-describe-table) query.
Example:
@ -177,9 +177,9 @@ ALTER TABLE [db].name DROP CONSTRAINT constraint_name;
The queries add or remove metadata about constraints of the `[db].name` table, so they are processed instantly.
If a constraint is added to a non-empty table, *the constraint check will not be invoked*. If it is important to add a constraint to an existing table, it is recommended to create a new table with the required constraint and transfer the data with an `INSERT SELECT` query.
If a constraint is added to a non-empty table, *the existing data is not checked against the constraint*.
A query that changes constraints is replicated through ZooKeeper, just like index changes.
For Replicated tables, a query that changes constraints is replicated by storing the new metadata in ZooKeeper and applying the changes on all replicas.
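As a brief sketch (the table, column, and constraint names are made up for illustration):

```sql
ALTER TABLE visits ADD CONSTRAINT c_positive CHECK Duration >= 0;
ALTER TABLE visits DROP CONSTRAINT c_positive;
```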
### Manipulations with partitions and parts {#alter_manipulations-with-partitions}
@ -260,7 +260,7 @@ ALTER TABLE visits ATTACH PART 201901_2_2_0;
ALTER TABLE table2 REPLACE PARTITION partition_expr FROM table1
```
Copies a partition from the `table1` table to the `table2` table. The data in `table1` is not deleted.
Copies a partition from the `table1` table to the `table2` table. The data in `table1` is not deleted.
Keep in mind:
@ -297,19 +297,19 @@ ALTER TABLE table_name CLEAR INDEX index_name IN PARTITION partition_expr
ALTER TABLE table_name FREEZE [PARTITION partition_expr]
```
Creates a local backup of the specified partition. If the `PARTITION` clause is omitted, backups are created for all partitions at once.
Creates a local backup of the specified partition. If the `PARTITION` clause is omitted, backups are created for all partitions at once.
!!! note
Creating a backup does not require stopping the server.
Creating a backup does not require stopping the server.
For old-style tables, the partition name can be specified as a prefix (for example, '2019'). In this case backups are created for all the corresponding partitions. Read more about how to specify the partition name correctly in the section [How to specify the partition expression in ALTER queries](#alter-how-to-specify-part-expr).
The query does the following: for the current state of the table it creates hardlinks to the data of this table. The links are placed in the directory `/var/lib/clickhouse/shadow/N/...`, where:
- `/var/lib/clickhouse/` is the working ClickHouse directory specified in the configuration file;
- `N` is the incremental number of the backup.
The directory structure inside the backup is the same as inside `/var/lib/clickhouse/`. The query performs 'chmod' on all files, forbidding writes to them.
The directory structure inside the backup is the same as inside `/var/lib/clickhouse/`. The query performs 'chmod' on all files, forbidding writes to them.
Note that the `ALTER TABLE t FREEZE PARTITION` query is not replicated. It creates a backup only on the local server. After the backup is created, the data from `/var/lib/clickhouse/shadow/` can be copied to a remote server and then the local copy can be deleted.
@ -357,7 +357,7 @@ ALTER TABLE users ATTACH PARTITION 201902;
#### How to specify the partition expression in ALTER queries {#alter-how-to-specify-part-expr}
To specify the required partition in `ALTER ... PARTITION` queries, you can use:
To specify the required partition in `ALTER ... PARTITION` queries, you can use:
- The partition name. You can view the partition name in the `partition` column of the [system.parts](../operations/system_tables.md#system_tables-parts) system table. For example, `ALTER TABLE visits DETACH PARTITION 201901`.
- An arbitrary expression over the columns of the source table. Constants and constant expressions are also supported. For example, `ALTER TABLE visits DETACH PARTITION toYYYYMM(toDate('2019-01-25'))`.

View File

@ -119,9 +119,9 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
) ENGINE = engine
```
`boolean_expr_1` can be any boolean expression made up of comparison operators or functions. If one or more constraints are present, at insertion time the constraint expressions are checked for truth for each inserted row. If the body of an INSERT query contains invalid data, an exception with the violated constraint is thrown to the client.
`boolean_expr_1` can be any boolean expression made up of comparison operators or functions. If one or more constraints are present, at insertion time the constraint expressions are checked for truth for each inserted row. If the body of an INSERT query contains invalid data, the client receives an exception describing the violated constraint.
Adding a large number of constraints may negatively affect the performance of large `INSERT` queries.
Adding a large number of constraints may negatively affect the performance of `INSERT` queries.
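For example, a sketch of a table with two constraints that will be checked for every inserted row (the table and constraint names are illustrative):

```sql
CREATE TABLE constrained
(
    URL String,
    CONSTRAINT is_yandex CHECK domainWithoutWWW(URL) = 'yandex.ru',
    CONSTRAINT is_utf8 CHECK isValidUTF8(URL)
) ENGINE = Log
```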
### TTL expression

View File

@ -10,13 +10,94 @@
Returns the protocol. Examples: http, ftp, mailto, magnet...
### domain
Returns the domain. Cuts off the scheme, which is at most 16 bytes long.
Extracts the hostname from a URL.
```
domain(url)
```
**Parameters**
- `url` — URL. Type: [String](../../data_types/string.md).
The URL can be specified with or without a scheme. Examples:
```
svn+ssh://some.svn-hosting.com:80/repo/trunk
some.svn-hosting.com:80/repo/trunk
https://yandex.com/time/
```
For these examples, the `domain` function returns the following results:
```
some.svn-hosting.com
some.svn-hosting.com
yandex.com
```
**Returned values**
- Host name, if ClickHouse can parse the input string as a URL.
- Empty string, if ClickHouse cannot parse the input string as a URL.
Type: `String`.
**Example**
```sql
SELECT domain('svn+ssh://some.svn-hosting.com:80/repo/trunk')
```
```text
┌─domain('svn+ssh://some.svn-hosting.com:80/repo/trunk')─┐
│ some.svn-hosting.com │
└────────────────────────────────────────────────────────┘
```
### domainWithoutWWW
Returns the domain, removing at most one 'www.' from the beginning, if present.
Returns the domain with the 'www.' prefix removed, if it was present.
### topLevelDomain
Returns the top-level domain. Example: .ru.
Extracts the top-level domain from a URL.
```
topLevelDomain(url)
```
**Parameters**
- `url` — URL. Type: [String](../../data_types/string.md).
The URL can be specified with or without a scheme. Examples:
```
svn+ssh://some.svn-hosting.com:80/repo/trunk
some.svn-hosting.com:80/repo/trunk
https://yandex.com/time/
```
**Returned values**
- Domain name, if ClickHouse can parse the input string as a URL.
- Empty string, if ClickHouse cannot parse the input string as a URL.
Type: `String`.
**Example**
```sql
SELECT topLevelDomain('svn+ssh://www.some.svn-hosting.com:80/repo/trunk')
```
```text
┌─topLevelDomain('svn+ssh://www.some.svn-hosting.com:80/repo/trunk')─┐
│ com │
└────────────────────────────────────────────────────────────────────┘
```
### firstSignificantSubdomain
Возвращает "первый существенный поддомен". Это понятие является нестандартным и специфично для Яндекс.Метрики. Первый существенный поддомен - это домен второго уровня, если он не равен одному из com, net, org, co, или домен третьего уровня, иначе. Например, firstSignificantSubdomain('<https://news.yandex.ru/>') = 'yandex', firstSignificantSubdomain('<https://news.yandex.com.tr/>') = 'yandex'. Список "несущественных" доменов второго уровня и другие детали реализации могут изменяться в будущем.