Merge pull request #1941 from yandex/unsorted-mergetree

Fix MergeTree idiosyncrasies
This commit is contained in:
alexey-milovidov 2018-02-22 00:36:59 +03:00 committed by GitHub
commit d4dcb9412c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 207 additions and 47 deletions

View File

@ -1111,6 +1111,8 @@ public:
return std::make_shared<DataTypeUUID>();
}
bool isDeterministic() override { return false; }
void executeImpl(Block & block, const ColumnNumbers & /*arguments*/, size_t result) override
{
auto col_res = ColumnVector<UInt128>::create();

View File

@ -1208,6 +1208,8 @@ public:
return std::make_shared<DataTypeDateTime>();
}
bool isDeterministic() override { return false; }
void executeImpl(Block & block, const ColumnNumbers & /*arguments*/, size_t result) override
{
block.getByPosition(result).column = DataTypeUInt32().createColumnConst(
@ -1235,6 +1237,8 @@ public:
return std::make_shared<DataTypeDate>();
}
bool isDeterministic() override { return false; }
void executeImpl(Block & block, const ColumnNumbers & /*arguments*/, size_t result) override
{
block.getByPosition(result).column = DataTypeUInt16().createColumnConst(
@ -1262,6 +1266,8 @@ public:
return std::make_shared<DataTypeDate>();
}
bool isDeterministic() override { return false; }
void executeImpl(Block & block, const ColumnNumbers & /*arguments*/, size_t result) override
{
block.getByPosition(result).column = DataTypeUInt16().createColumnConst(

View File

@ -218,6 +218,8 @@ public:
bool useDefaultImplementationForConstants() const override { return true; }
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {1}; }
bool isDeterministic() override { return false; }
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result) override
{
/// The dictionary key that defines the "point of view".
@ -312,6 +314,8 @@ public:
return std::make_shared<DataTypeUInt8>();
}
bool isDeterministic() override { return false; }
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result) override
{
/// The dictionary key that defines the "point of view".
@ -446,6 +450,8 @@ public:
bool useDefaultImplementationForConstants() const override { return true; }
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {1}; }
bool isDeterministic() override { return false; }
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result) override
{
/// The dictionary key that defines the "point of view".
@ -720,6 +726,8 @@ public:
bool useDefaultImplementationForConstants() const override { return true; }
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {1}; }
bool isDeterministic() override { return false; }
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result) override
{
RegionsNames::Language language = RegionsNames::Language::RU;

View File

@ -94,6 +94,8 @@ private:
return std::make_shared<DataTypeUInt8>();
}
bool isDeterministic() override { return false; }
void executeImpl(Block & block, const ColumnNumbers & arguments, const size_t result) override
{
const auto dict_name_col = checkAndGetColumnConst<ColumnString>(block.getByPosition(arguments[0]).column.get());
@ -274,6 +276,8 @@ private:
return std::make_shared<DataTypeString>();
}
bool isDeterministic() override { return false; }
void executeImpl(Block & block, const ColumnNumbers & arguments, const size_t result) override
{
const auto dict_name_col = checkAndGetColumnConst<ColumnString>(block.getByPosition(arguments[0]).column.get());
@ -535,6 +539,8 @@ private:
return std::make_shared<DataTypeString>();
}
bool isDeterministic() override { return false; }
void executeImpl(Block & block, const ColumnNumbers & arguments, const size_t result) override
{
const auto dict_name_col = checkAndGetColumnConst<ColumnString>(block.getByPosition(arguments[0]).column.get());
@ -821,6 +827,8 @@ private:
return std::make_shared<DataType>();
}
bool isDeterministic() override { return false; }
void executeImpl(Block & block, const ColumnNumbers & arguments, const size_t result) override
{
const auto dict_name_col = checkAndGetColumnConst<ColumnString>(block.getByPosition(arguments[0]).column.get());
@ -1134,6 +1142,8 @@ private:
return std::make_shared<DataType>();
}
bool isDeterministic() override { return false; }
void executeImpl(Block & block, const ColumnNumbers & arguments, const size_t result) override
{
const auto dict_name_col = checkAndGetColumnConst<ColumnString>(block.getByPosition(arguments[0]).column.get());
@ -1379,6 +1389,8 @@ private:
return std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>());
}
bool isDeterministic() override { return false; }
void executeImpl(Block & block, const ColumnNumbers & arguments, const size_t result) override
{
const auto dict_name_col = checkAndGetColumnConst<ColumnString>(block.getByPosition(arguments[0]).column.get());
@ -1549,6 +1561,8 @@ private:
return std::make_shared<DataTypeUInt8>();
}
bool isDeterministic() override { return false; }
void executeImpl(Block & block, const ColumnNumbers & arguments, const size_t result) override
{
const auto dict_name_col = checkAndGetColumnConst<ColumnString>(block.getByPosition(arguments[0]).column.get());

View File

@ -23,6 +23,8 @@ public:
bool isVariadic() const override { return true; }
bool isDeterministic() override { return false; }
size_t getNumberOfArguments() const override { return 0; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override;

View File

@ -104,6 +104,8 @@ public:
return std::make_shared<DataTypeString>();
}
bool isDeterministic() override { return false; }
void executeImpl(Block & block, const ColumnNumbers & /*arguments*/, const size_t result) override
{
block.getByPosition(result).column = DataTypeString().createColumnConst(block.rows(), db_name);
@ -126,6 +128,8 @@ public:
return name;
}
bool isDeterministic() override { return false; }
bool isDeterministicInScopeOfQuery() override
{
return false;
@ -391,6 +395,8 @@ public:
return name;
}
bool isDeterministic() override { return false; }
bool isDeterministicInScopeOfQuery() override
{
return false;
@ -434,6 +440,8 @@ public:
return 0;
}
bool isDeterministic() override { return false; }
bool isDeterministicInScopeOfQuery() override
{
return false;
@ -482,6 +490,8 @@ public:
return 0;
}
bool isDeterministic() override { return false; }
bool isDeterministicInScopeOfQuery() override
{
return false;
@ -524,6 +534,8 @@ public:
return 0;
}
bool isDeterministic() override { return false; }
bool isDeterministicInScopeOfQuery() override
{
return false;
@ -889,6 +901,8 @@ public:
}
/** It could return many different values for single argument. */
bool isDeterministic() override { return false; }
bool isDeterministicInScopeOfQuery() override
{
return false;
@ -1288,6 +1302,8 @@ public:
return std::make_shared<DataTypeUInt32>();
}
bool isDeterministic() override { return false; }
void executeImpl(Block & block, const ColumnNumbers & /*arguments*/, size_t result) override
{
block.getByPosition(result).column = DataTypeUInt32().createColumnConst(block.rows(), static_cast<UInt64>(uptime));
@ -1323,6 +1339,8 @@ public:
return std::make_shared<DataTypeString>();
}
bool isDeterministic() override { return false; }
void executeImpl(Block & block, const ColumnNumbers & /*arguments*/, size_t result) override
{
block.getByPosition(result).column = DataTypeString().createColumnConst(block.rows(), DateLUT::instance().getTimeZone());
@ -1355,6 +1373,8 @@ public:
return 1;
}
bool isDeterministic() override { return false; }
bool isDeterministicInScopeOfQuery() override
{
return false;
@ -1632,6 +1652,8 @@ public:
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override;
bool isDeterministic() override { return false; }
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result) override;
private:

View File

@ -126,6 +126,9 @@ public:
* (even for distributed query), but not deterministic it general.
* Example: now(). Another example: functions that work with periodically updated dictionaries.
*/
virtual bool isDeterministic() { return true; }
virtual bool isDeterministicInScopeOfQuery() { return true; }
/** Lets you know if the function is monotonic in a range of values.
@ -320,6 +323,8 @@ public:
bool isInjective(const Block & sample_block) override { return function->isInjective(sample_block); }
bool isDeterministic() override { return function->isDeterministic(); }
bool isDeterministicInScopeOfQuery() override { return function->isDeterministicInScopeOfQuery(); }
bool hasInformationAboutMonotonicity() const override { return function->hasInformationAboutMonotonicity(); }

View File

@ -113,12 +113,12 @@ MergeTreeData::MergeTreeData(
{
merging_params.check(columns);
if (!primary_expr_ast) /// TODO Allow tables without primary key.
if (!primary_expr_ast)
throw Exception("Primary key cannot be empty", ErrorCodes::BAD_ARGUMENTS);
initPrimaryKey();
if (sampling_expression && (!primary_expr_ast || !primary_key_sample.has(sampling_expression->getColumnName()))
if (sampling_expression && (!primary_key_sample.has(sampling_expression->getColumnName()))
&& !attach && !settings.compatibility_allow_sampling_expression_not_in_primary_key) /// This is for backward compatibility.
throw Exception("Sampling expression must be present in the primary key", ErrorCodes::BAD_ARGUMENTS);
@ -178,22 +178,37 @@ MergeTreeData::MergeTreeData(
}
static void checkForAllowedKeyColumns(const ColumnWithTypeAndName & element, const std::string & key_name)
static void checkKeyExpression(const ExpressionActions & expr, const Block & sample_block, const String & key_name)
{
const ColumnPtr & column = element.column;
if (column && (column->isColumnConst() || column->isDummy()))
throw Exception{key_name + " key cannot contain constants", ErrorCodes::ILLEGAL_COLUMN};
for (const ExpressionAction & action : expr.getActions())
{
if (action.type == ExpressionAction::ARRAY_JOIN)
throw Exception(key_name + " key cannot contain array joins");
if (element.type->isNullable())
throw Exception{key_name + " key cannot contain nullable columns", ErrorCodes::ILLEGAL_COLUMN};
if (action.type == ExpressionAction::APPLY_FUNCTION)
{
IFunctionBase & func = *action.function;
if (!func.isDeterministic())
throw Exception(key_name + " key cannot contain non-deterministic functions, "
"but contains function " + func.getName(),
ErrorCodes::BAD_ARGUMENTS);
}
}
for (const ColumnWithTypeAndName & element : sample_block)
{
const ColumnPtr & column = element.column;
if (column && (column->isColumnConst() || column->isDummy()))
throw Exception{key_name + " key cannot contain constants", ErrorCodes::ILLEGAL_COLUMN};
if (element.type->isNullable())
throw Exception{key_name + " key cannot contain nullable columns", ErrorCodes::ILLEGAL_COLUMN};
}
}
void MergeTreeData::initPrimaryKey()
{
if (!primary_expr_ast)
return;
auto addSortDescription = [](SortDescription & descr, const ASTPtr & expr_ast)
{
descr.reserve(descr.size() + expr_ast->children.size());
@ -216,14 +231,9 @@ void MergeTreeData::initPrimaryKey()
primary_key_sample = projected_expr->getSampleBlock();
}
checkKeyExpression(*primary_expr, primary_key_sample, "Primary");
size_t primary_key_size = primary_key_sample.columns();
/// A primary key cannot contain constants. It is meaningless.
/// (And also couldn't work because primary key is serialized with method of IDataType that doesn't support constants).
/// Also a primary key must not contain any nullable column.
for (size_t i = 0; i < primary_key_size; ++i)
checkForAllowedKeyColumns(primary_key_sample.getByPosition(i), "Primary");
primary_key_data_types.resize(primary_key_size);
for (size_t i = 0; i < primary_key_size; ++i)
primary_key_data_types[i] = primary_key_sample.getByPosition(i).type;
@ -238,8 +248,7 @@ void MergeTreeData::initPrimaryKey()
ExpressionAnalyzer(secondary_sort_expr_ast, context, nullptr, getColumnsList()).getActions(true);
auto secondary_key_sample = projected_expr->getSampleBlock();
for (size_t i = 0; i < secondary_key_sample.columns(); ++i)
checkForAllowedKeyColumns(secondary_key_sample.getByPosition(i), "Secondary");
checkKeyExpression(*secondary_sort_expr, secondary_key_sample, "Secondary");
}
}
@ -253,14 +262,11 @@ void MergeTreeData::initPartitionKey()
for (const ASTPtr & ast : partition_expr_ast->children)
{
String col_name = ast->getColumnName();
partition_expr_columns.emplace_back(col_name);
const ColumnWithTypeAndName & element = partition_expr->getSampleBlock().getByName(col_name);
checkForAllowedKeyColumns(element, "Partition");
partition_expr_column_types.emplace_back(element.type);
partition_key_sample.insert(partition_expr->getSampleBlock().getByName(col_name));
}
checkKeyExpression(*partition_expr, partition_key_sample, "Partition");
/// Add all columns used in the partition key to the min-max index.
const NamesAndTypesList & minmax_idx_columns_with_types = partition_expr->getRequiredColumnsWithTypes();
minmax_idx_expr = std::make_shared<ExpressionActions>(minmax_idx_columns_with_types, context.getSettingsRef());
@ -2031,7 +2037,7 @@ String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, const Context
/// Re-parse partition key fields using the information about expected field types.
size_t fields_count = partition_expr_column_types.size();
size_t fields_count = partition_key_sample.columns();
if (partition_ast.fields_count != fields_count)
throw Exception(
"Wrong number of fields in the partition expression: " + toString(partition_ast.fields_count) +
@ -2047,12 +2053,8 @@ String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, const Context
ReadBufferFromMemory right_paren_buf(")", 1);
ConcatReadBuffer buf({&left_paren_buf, &fields_buf, &right_paren_buf});
Block header;
for (size_t i = 0; i < fields_count; ++i)
header.insert(ColumnWithTypeAndName(partition_expr_column_types[i], partition_expr_columns[i]));
ValuesRowInputStream input_stream(buf, header, context, /* interpret_expressions = */true);
MutableColumns columns = header.cloneEmptyColumns();
ValuesRowInputStream input_stream(buf, partition_key_sample, context, /* interpret_expressions = */true);
MutableColumns columns = partition_key_sample.cloneEmptyColumns();
if (!input_stream.read(columns))
throw Exception(

View File

@ -524,8 +524,7 @@ public:
ASTPtr partition_expr_ast;
ExpressionActionsPtr partition_expr;
Names partition_expr_columns;
DataTypes partition_expr_column_types;
Block partition_key_sample;
ExpressionActionsPtr minmax_idx_expr;
Names minmax_idx_columns;

View File

@ -81,9 +81,9 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block
data.partition_expr->execute(block_copy);
ColumnRawPtrs partition_columns;
partition_columns.reserve(data.partition_expr_columns.size());
for (const String & name : data.partition_expr_columns)
partition_columns.emplace_back(block_copy.getByName(name).column.get());
partition_columns.reserve(data.partition_key_sample.columns());
for (const ColumnWithTypeAndName & element : data.partition_key_sample)
partition_columns.emplace_back(block_copy.getByName(element.name).column.get());
PODArray<size_t> partition_num_to_first_row;
IColumn::Selector selector;
@ -103,7 +103,9 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block
if (partitions_count == 1)
{
/// A typical case is when there is one partition (you do not need to split anything).
result.emplace_back(std::move(block_copy), get_partition(0));
/// NOTE: returning a copy of the original block so that calculated partition key columns
/// do not interfere with possible calculated primary key columns of the same name.
result.emplace_back(Block(block), get_partition(0));
return result;
}

View File

@ -23,7 +23,7 @@ static ReadBufferFromFile openForReading(const String & path)
/// So if you want to change this method, be sure to guarantee compatibility with existing table data.
String MergeTreePartition::getID(const MergeTreeData & storage) const
{
if (value.size() != storage.partition_expr_columns.size())
if (value.size() != storage.partition_key_sample.columns())
throw Exception("Invalid partition key size: " + toString(value.size()), ErrorCodes::LOGICAL_ERROR);
if (value.empty())
@ -51,7 +51,7 @@ String MergeTreePartition::getID(const MergeTreeData & storage) const
if (i > 0)
result += '-';
if (typeid_cast<const DataTypeDate *>(storage.partition_expr_column_types[i].get()))
if (typeid_cast<const DataTypeDate *>(storage.partition_key_sample.getByPosition(i).type.get()))
result += toString(DateLUT::instance().toNumYYYYMMDD(DayNum_t(value[i].safeGet<UInt64>())));
else
result += applyVisitor(to_string_visitor, value[i]);
@ -79,7 +79,7 @@ String MergeTreePartition::getID(const MergeTreeData & storage) const
void MergeTreePartition::serializeTextQuoted(const MergeTreeData & storage, WriteBuffer & out) const
{
size_t key_size = storage.partition_expr_column_types.size();
size_t key_size = storage.partition_key_sample.columns();
if (key_size == 0)
{
@ -95,7 +95,7 @@ void MergeTreePartition::serializeTextQuoted(const MergeTreeData & storage, Writ
if (i > 0)
writeCString(", ", out);
const DataTypePtr & type = storage.partition_expr_column_types[i];
const DataTypePtr & type = storage.partition_key_sample.getByPosition(i).type;
auto column = type->createColumn();
column->insert(value[i]);
type->serializeTextQuoted(*column, 0, out);
@ -111,9 +111,9 @@ void MergeTreePartition::load(const MergeTreeData & storage, const String & part
return;
ReadBufferFromFile file = openForReading(part_path + "partition.dat");
value.resize(storage.partition_expr_column_types.size());
for (size_t i = 0; i < storage.partition_expr_column_types.size(); ++i)
storage.partition_expr_column_types[i]->deserializeBinary(value[i], file);
value.resize(storage.partition_key_sample.columns());
for (size_t i = 0; i < storage.partition_key_sample.columns(); ++i)
storage.partition_key_sample.getByPosition(i).type->deserializeBinary(value[i], file);
}
void MergeTreePartition::store(const MergeTreeData & storage, const String & part_path, MergeTreeDataPartChecksums & checksums) const
@ -124,7 +124,7 @@ void MergeTreePartition::store(const MergeTreeData & storage, const String & par
WriteBufferFromFile out(part_path + "partition.dat");
HashingWriteBuffer out_hashing(out);
for (size_t i = 0; i < value.size(); ++i)
storage.partition_expr_column_types[i]->serializeBinary(value[i], out_hashing);
storage.partition_key_sample.getByPosition(i).type->serializeBinary(value[i], out_hashing);
out_hashing.next();
checksums.files["partition.dat"].file_size = out_hashing.count();
checksums.files["partition.dat"].file_hash = out_hashing.getHash();

View File

@ -586,6 +586,10 @@ static StoragePtr create(const StorageFactory::Arguments & args)
if (args.storage_def->order_by)
primary_expr_list = extractKeyExpressionList(*args.storage_def->order_by);
else
throw Exception("You must provide an ORDER BY expression in the table definition. "
"If you don't want this table to be sorted, use ORDER BY tuple()",
ErrorCodes::BAD_ARGUMENTS);
if (args.storage_def->sample_by)
sampling_expression = args.storage_def->sample_by->ptr();

View File

@ -0,0 +1,14 @@
*** MergeTree ***
1 a
5 b
2 c
4 d
3 e
*** ReplacingMergeTree ***
1 a 5
---
1 a 5
*** CollapsingMergeTree ***
3 c 1
---
3 c 1

View File

@ -0,0 +1,56 @@
SELECT '*** MergeTree ***';
DROP TABLE IF EXISTS test.unsorted;
CREATE TABLE test.unsorted (x UInt32, y String) ENGINE MergeTree ORDER BY tuple();
INSERT INTO test.unsorted VALUES (1, 'a'), (5, 'b');
INSERT INTO test.unsorted VALUES (2, 'c'), (4, 'd');
INSERT INTO test.unsorted VALUES (3, 'e');
OPTIMIZE TABLE test.unsorted PARTITION tuple() FINAL;
SELECT * FROM test.unsorted;
DROP TABLE test.unsorted;
SELECT '*** ReplacingMergeTree ***';
DROP TABLE IF EXISTS test.unsorted_replacing;
CREATE TABLE test.unsorted_replacing (x UInt32, s String, v UInt32) ENGINE ReplacingMergeTree(v) ORDER BY tuple();
INSERT INTO test.unsorted_replacing VALUES (1, 'a', 5), (5, 'b', 4);
INSERT INTO test.unsorted_replacing VALUES (2, 'c', 3), (4, 'd', 2);
INSERT INTO test.unsorted_replacing VALUES (3, 'e', 1);
SELECT * FROM test.unsorted_replacing FINAL;
SELECT '---';
OPTIMIZE TABLE test.unsorted_replacing PARTITION tuple() FINAL;
SELECT * FROM test.unsorted_replacing;
DROP TABLE test.unsorted_replacing;
SELECT '*** CollapsingMergeTree ***';
DROP TABLE IF EXISTS test.unsorted_collapsing;
CREATE TABLE test.unsorted_collapsing (x UInt32, s String, sign Int8) ENGINE CollapsingMergeTree(sign) ORDER BY tuple();
INSERT INTO test.unsorted_collapsing VALUES (1, 'a', 1);
INSERT INTO test.unsorted_collapsing VALUES (1, 'a', -1), (2, 'b', 1);
INSERT INTO test.unsorted_collapsing VALUES (2, 'b', -1), (3, 'c', 1);
SELECT * FROM test.unsorted_collapsing FINAL;
SELECT '---';
OPTIMIZE TABLE test.unsorted_collapsing PARTITION tuple() FINAL;
SELECT * FROM test.unsorted_collapsing;
DROP TABLE test.unsorted_collapsing;

View File

@ -0,0 +1,6 @@
2018-02-19 12:00:00
2018-02-20 12:00:00
2018-02-21 12:00:00
---
2018-02-19 12:00:00
2018-02-21 12:00:00

View File

@ -0,0 +1,18 @@
DROP TABLE IF EXISTS test.partition_and_primary_keys_using_same_expression;
CREATE TABLE test.partition_and_primary_keys_using_same_expression(dt DateTime)
ENGINE MergeTree PARTITION BY toDate(dt) ORDER BY toDayOfWeek(toDate(dt));
INSERT INTO test.partition_and_primary_keys_using_same_expression
VALUES ('2018-02-19 12:00:00');
INSERT INTO test.partition_and_primary_keys_using_same_expression
VALUES ('2018-02-20 12:00:00'), ('2018-02-21 12:00:00');
SELECT * FROM test.partition_and_primary_keys_using_same_expression ORDER BY dt;
SELECT '---';
ALTER TABLE test.partition_and_primary_keys_using_same_expression DROP PARTITION '2018-02-20';
SELECT * FROM test.partition_and_primary_keys_using_same_expression ORDER BY dt;
DROP TABLE test.partition_and_primary_keys_using_same_expression;