Merge pull request #31887 from ClickHouse/fix_cannot_create_empty_part

Parse partition key value from `partition_id` when need to create part in empty partition
This commit is contained in:
tavplubix 2021-12-01 15:38:46 +03:00 committed by GitHub
commit b623a387af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 204 additions and 64 deletions

View File

@ -12,7 +12,6 @@ class ASTPartition : public IAST
{
public:
ASTPtr value;
String fields_str; /// The extent of comma-separated partition expression fields without parentheses.
size_t fields_count = 0;
String id;

View File

@ -35,7 +35,6 @@ bool ParserPartition::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
return false;
size_t fields_count;
String fields_str;
const auto * tuple_ast = value->as<ASTFunction>();
bool surrounded_by_parens = false;
@ -58,7 +57,6 @@ bool ParserPartition::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
else
{
fields_count = 1;
fields_str = String(begin->begin, pos->begin - begin->begin);
}
}
else
@ -78,13 +76,10 @@ bool ParserPartition::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
--right_paren;
if (right_paren->type != TokenType::ClosingRoundBracket)
return false;
fields_str = String(left_paren->end, right_paren->begin - left_paren->end);
}
partition->value = value;
partition->children.push_back(value);
partition->fields_str = std::move(fields_str);
partition->fields_count = fields_count;
}

View File

@ -37,6 +37,7 @@ namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int SYNTAX_ERROR;
extern const int BAD_ARGUMENTS;
}
@ -342,6 +343,10 @@ ConstantExpressionTemplate::TemplateStructure::TemplateStructure(LiteralsInfo &
auto syntax_result = TreeRewriter(context).analyze(expression, literals.getNamesAndTypesList());
result_column_name = expression->getColumnName();
actions_on_literals = ExpressionAnalyzer(expression, syntax_result, context).getActions(false);
if (actions_on_literals->hasArrayJoin())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Array joins are not allowed in constant expressions for IN, VALUES, LIMIT and similar sections.");
}
size_t ConstantExpressionTemplate::TemplateStructure::getTemplateHash(const ASTPtr & expression,

View File

@ -26,6 +26,8 @@
#include <Interpreters/inplaceBlockConversions.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/convertFieldToType.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTNameTypePair.h>
@ -3646,56 +3648,54 @@ String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, ContextPtr loc
/// Re-parse partition key fields using the information about expected field types.
auto metadata_snapshot = getInMemoryMetadataPtr();
size_t fields_count = metadata_snapshot->getPartitionKey().sample_block.columns();
const Block & key_sample_block = metadata_snapshot->getPartitionKey().sample_block;
size_t fields_count = key_sample_block.columns();
if (partition_ast.fields_count != fields_count)
throw Exception(
"Wrong number of fields in the partition expression: " + toString(partition_ast.fields_count) +
", must be: " + toString(fields_count),
ErrorCodes::INVALID_PARTITION_VALUE);
throw Exception(ErrorCodes::INVALID_PARTITION_VALUE,
"Wrong number of fields in the partition expression: {}, must be: {}",
partition_ast.fields_count, fields_count);
if (auto * f = partition_ast.value->as<ASTFunction>())
{
assert(f->name == "tuple");
if (f->arguments && !f->arguments->as<ASTExpressionList>()->children.empty())
{
ASTPtr query = partition_ast.value->clone();
auto syntax_analyzer_result
= TreeRewriter(local_context)
.analyze(query, metadata_snapshot->getPartitionKey().sample_block.getNamesAndTypesList(), {}, {}, false, false);
auto actions = ExpressionAnalyzer(query, syntax_analyzer_result, local_context).getActions(true);
if (actions->hasArrayJoin())
throw Exception("The partition expression cannot contain array joins", ErrorCodes::INVALID_PARTITION_VALUE);
}
}
const FormatSettings format_settings;
Row partition_row(fields_count);
if (fields_count)
if (fields_count == 0)
{
ConcatReadBuffer buf;
buf.appendBuffer(std::make_unique<ReadBufferFromMemory>("(", 1));
buf.appendBuffer(std::make_unique<ReadBufferFromMemory>(partition_ast.fields_str.data(), partition_ast.fields_str.size()));
buf.appendBuffer(std::make_unique<ReadBufferFromMemory>(")", 1));
/// Function tuple(...) requires at least one argument, so empty key is a special case
assert(!partition_ast.fields_count);
assert(typeid_cast<ASTFunction *>(partition_ast.value.get()));
assert(partition_ast.value->as<ASTFunction>()->name == "tuple");
assert(partition_ast.value->as<ASTFunction>()->arguments);
bool empty_tuple = partition_ast.value->as<ASTFunction>()->arguments->children.empty();
if (!empty_tuple)
throw Exception(ErrorCodes::INVALID_PARTITION_VALUE, "Partition key is empty, expected 'tuple()' as partition key");
}
else if (fields_count == 1)
{
ASTPtr partition_value_ast = partition_ast.value;
if (auto * tuple = partition_value_ast->as<ASTFunction>())
{
assert(tuple->name == "tuple");
assert(tuple->arguments);
assert(tuple->arguments->children.size() == 1);
partition_value_ast = tuple->arguments->children[0];
}
/// Simple partition key, need to evaluate and cast
Field partition_key_value = evaluateConstantExpression(partition_value_ast, local_context).first;
partition_row[0] = convertFieldToTypeOrThrow(partition_key_value, *key_sample_block.getByPosition(0).type);
}
else
{
/// Complex key, need to evaluate, untuple and cast
Field partition_key_value = evaluateConstantExpression(partition_ast.value, local_context).first;
if (partition_key_value.getType() != Field::Types::Tuple)
throw Exception(ErrorCodes::INVALID_PARTITION_VALUE,
"Expected tuple for complex partition key, got {}", partition_key_value.getTypeName());
auto input_format = local_context->getInputFormat(
"Values",
buf,
metadata_snapshot->getPartitionKey().sample_block,
local_context->getSettingsRef().max_block_size);
QueryPipeline pipeline(std::move(input_format));
PullingPipelineExecutor executor(pipeline);
Block block;
executor.pull(block);
if (!block || !block.rows())
throw Exception(
"Could not parse partition value: `" + partition_ast.fields_str + "`",
ErrorCodes::INVALID_PARTITION_VALUE);
const Tuple & tuple = partition_key_value.get<Tuple>();
if (tuple.size() != fields_count)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Wrong number of fields in the partition expression: {}, must be: {}", tuple.size(), fields_count);
for (size_t i = 0; i < fields_count; ++i)
block.getByPosition(i).column->get(0, partition_row[i]);
partition_row[i] = convertFieldToTypeOrThrow(tuple[i], *key_sample_block.getByPosition(i).type);
}
MergeTreePartition partition(std::move(partition_row));
@ -3707,11 +3707,10 @@ String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, ContextPtr loc
if (existing_part_in_partition && existing_part_in_partition->partition.value != partition.value)
{
WriteBufferFromOwnString buf;
writeCString("Parsed partition value: ", buf);
partition.serializeText(*this, buf, format_settings);
writeCString(" doesn't match partition value for an existing part with the same partition ID: ", buf);
writeString(existing_part_in_partition->name, buf);
throw Exception(buf.str(), ErrorCodes::INVALID_PARTITION_VALUE);
partition.serializeText(*this, buf, FormatSettings{});
throw Exception(ErrorCodes::LOGICAL_ERROR, "Parsed partition value: {} "
"doesn't match partition value for an existing part with the same partition ID: {}",
buf.str(), existing_part_in_partition->name);
}
}

View File

@ -20,6 +20,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int INVALID_PARTITION_VALUE;
}
namespace
@ -182,6 +183,8 @@ String MergeTreePartition::getID(const Block & partition_key_sample) const
/// In case all partition fields are represented by integral types, try to produce a human-readable ID.
/// Otherwise use a hex-encoded hash.
/// NOTE It will work in unexpected way if some partition key column is Nullable:
/// are_all_integral will be false if some value is NULL. Maybe we should fix it.
bool are_all_integral = true;
for (const Field & field : value)
{
@ -228,6 +231,94 @@ String MergeTreePartition::getID(const Block & partition_key_sample) const
return result;
}
std::optional<Row> MergeTreePartition::tryParseValueFromID(const String & partition_id, const Block & partition_key_sample)
{
size_t num_keys = partition_key_sample.columns();
Row res;
res.reserve(num_keys);
ReadBufferFromString buf(partition_id);
if (num_keys == 0)
{
checkString("all", buf);
assertEOF(buf);
return res;
}
enum KeyType { DATE, UNSIGNED, SIGNED };
std::vector<KeyType> key_types;
key_types.reserve(num_keys);
for (size_t i = 0; i < num_keys; ++i)
{
auto type = partition_key_sample.getByPosition(i).type;
/// NOTE Sometimes it's possible to parse Nullable key, but easier to ignore it.
if (type->isNullable())
return {};
/// We use Field::Types when serializing partition_id, let's get some Field to check type
Field sample_field = type->getDefault();
if (typeid_cast<const DataTypeDate *>(type.get()))
key_types.emplace_back(DATE);
else if (sample_field.getType() == Field::Types::UInt64)
key_types.emplace_back(UNSIGNED);
else if (sample_field.getType() == Field::Types::Int64)
key_types.emplace_back(SIGNED);
else
return {};
}
/// All columns are numeric, will parse partition value
for (size_t i = 0; i < num_keys; ++i)
{
switch (key_types[i])
{
case DATE:
{
UInt32 date_yyyymmdd;
readText(date_yyyymmdd, buf);
constexpr UInt32 min_yyyymmdd = 10000000;
constexpr UInt32 max_yyyymmdd = 99999999;
if (date_yyyymmdd < min_yyyymmdd || max_yyyymmdd < date_yyyymmdd)
throw Exception(
ErrorCodes::INVALID_PARTITION_VALUE, "Cannot parse partition_id: got unexpected Date: {}", date_yyyymmdd);
UInt32 date = DateLUT::instance().YYYYMMDDToDayNum(date_yyyymmdd);
res.emplace_back(date);
break;
}
case UNSIGNED:
{
UInt64 value;
readText(value, buf);
res.emplace_back(value);
break;
}
case SIGNED:
{
Int64 value;
readText(value, buf);
res.emplace_back(value);
break;
}
}
if (i + 1 != num_keys)
assertChar('-', buf);
}
assertEOF(buf);
String expected_partition_id = MergeTreePartition{res}.getID(partition_key_sample);
if (expected_partition_id != partition_id)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Partition ID was parsed incorrectly: expected {}, got {}",
expected_partition_id, partition_id);
return res;
}
void MergeTreePartition::serializeText(const MergeTreeData & storage, WriteBuffer & out, const FormatSettings & format_settings) const
{
auto metadata_snapshot = storage.getInMemoryMetadataPtr();

View File

@ -33,6 +33,8 @@ public:
String getID(const MergeTreeData & storage) const;
String getID(const Block & partition_key_sample) const;
static std::optional<Row> tryParseValueFromID(const String & partition_id, const Block & partition_key_sample);
void serializeText(const MergeTreeData & storage, WriteBuffer & out, const FormatSettings & format_settings) const;
void load(const MergeTreeData & storage, const DiskPtr & disk, const String & part_path);

View File

@ -7381,13 +7381,23 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP
{
auto lock = lockParts();
auto parts_in_partition = getDataPartsPartitionRange(new_part_info.partition_id);
if (parts_in_partition.empty())
if (!parts_in_partition.empty())
{
LOG_WARNING(log, "Empty part {} is not created instead of lost part because there are no parts in partition {} (it's empty), resolve this manually using DROP PARTITION.", lost_part_name, new_part_info.partition_id);
new_data_part->partition = (*parts_in_partition.begin())->partition;
}
else if (auto parsed_partition = MergeTreePartition::tryParseValueFromID(
new_part_info.partition_id,
metadata_snapshot->getPartitionKey().sample_block))
{
new_data_part->partition = MergeTreePartition(*parsed_partition);
}
else
{
LOG_WARNING(log, "Empty part {} is not created instead of lost part because there are no parts in partition {} (it's empty), "
"resolve this manually using DROP/DETACH PARTITION.", lost_part_name, new_part_info.partition_id);
return false;
}
new_data_part->partition = (*parts_in_partition.begin())->partition;
}
new_data_part->minmax_idx = std::move(minmax_idx);

View File

@ -178,18 +178,20 @@ def test_lost_part_mutation(start_cluster):
def test_lost_last_part(start_cluster):
for node in [node1, node2]:
node.query(
"CREATE TABLE mt3 (id UInt64) ENGINE ReplicatedMergeTree('/clickhouse/tables/t3', '{}') ORDER BY tuple()".format(node.name))
"CREATE TABLE mt3 (id UInt64, p String) ENGINE ReplicatedMergeTree('/clickhouse/tables/t3', '{}') "
"ORDER BY tuple() PARTITION BY p".format(node.name))
node1.query("SYSTEM STOP MERGES mt3")
node2.query("SYSTEM STOP REPLICATION QUEUES")
for i in range(1):
node1.query("INSERT INTO mt3 VALUES ({})".format(i))
node1.query("INSERT INTO mt3 VALUES ({}, 'x')".format(i))
# actually not important
node1.query("ALTER TABLE mt3 UPDATE id = 777 WHERE 1", settings={"mutations_sync": "0"})
remove_part_from_disk(node1, 'mt3', 'all_0_0_0')
partition_id = node1.query("select partitionId('x')").strip()
remove_part_from_disk(node1, 'mt3', '{}_0_0_0'.format(partition_id))
# other way to detect broken parts
node1.query("CHECK TABLE mt3")
@ -199,13 +201,13 @@ def test_lost_last_part(start_cluster):
for i in range(10):
result = node1.query("SELECT count() FROM system.replication_queue")
assert int(result) <= 1, "Have a lot of entries in queue {}".format(node1.query("SELECT * FROM system.replication_queue FORMAT Vertical"))
if node1.contains_in_log("Cannot create empty part") and node1.contains_in_log("DROP PARTITION"):
if node1.contains_in_log("Cannot create empty part") and node1.contains_in_log("DROP/DETACH PARTITION"):
break
time.sleep(1)
else:
assert False, "Don't have required messages in node1 log"
node1.query("ALTER TABLE mt3 DROP PARTITION ID 'all'")
node1.query("ALTER TABLE mt3 DROP PARTITION ID '{}'".format(partition_id))
assert_eq_with_retry(node1, "SELECT COUNT() FROM mt3", "0")
assert_eq_with_retry(node1, "SELECT COUNT() FROM system.replication_queue", "0")

View File

@ -0,0 +1,37 @@
-- Tags: zookeeper
create table rmt1 (d DateTime, n int) engine=ReplicatedMergeTree('/test/01165/{database}/rmt', '1') order by n partition by toYYYYMMDD(d);
create table rmt2 (d DateTime, n int) engine=ReplicatedMergeTree('/test/01165/{database}/rmt', '2') order by n partition by toYYYYMMDD(d);
system stop replicated sends rmt1;
insert into rmt1 values (now(), arrayJoin([1, 2])); -- { clientError 36 }
insert into rmt1(n) select * from system.numbers limit arrayJoin([1, 2]); -- { serverError 36 }
insert into rmt1 values (now(), rand());
drop table rmt1;
system sync replica rmt2;
drop table rmt2;
create table rmt1 (d DateTime, n int) engine=ReplicatedMergeTree('/test/01165/{database}/rmt', '1') order by n partition by tuple();
create table rmt2 (d DateTime, n int) engine=ReplicatedMergeTree('/test/01165/{database}/rmt', '2') order by n partition by tuple();
system stop replicated sends rmt1;
insert into rmt1 values (now(), rand());
drop table rmt1;
system sync replica rmt2;
drop table rmt2;
create table rmt1 (n UInt8, m Int32, d Date, t DateTime) engine=ReplicatedMergeTree('/test/01165/{database}/rmt', '1') order by n partition by (n, m, d, t);
create table rmt2 (n UInt8, m Int32, d Date, t DateTime) engine=ReplicatedMergeTree('/test/01165/{database}/rmt', '2') order by n partition by (n, m, d, t);
system stop replicated sends rmt1;
insert into rmt1 values (rand(), rand(), now(), now());
insert into rmt1 values (rand(), rand(), now(), now());
insert into rmt1 values (rand(), rand(), now(), now());
drop table rmt1;
system sync replica rmt2;
drop table rmt2;

View File

@ -1,4 +1,4 @@
CREATE TABLE table_2009_part (`i` Int64, `d` Date, `s` String) ENGINE = MergeTree PARTITION BY toYYYYMM(d) ORDER BY i;
ALTER TABLE table_2009_part ATTACH PARTITION tuple(arrayJoin([0, 1])); -- {serverError 248}
ALTER TABLE table_2009_part ATTACH PARTITION tuple(toYYYYMM(toDate([arrayJoin([arrayJoin([arrayJoin([arrayJoin([3, materialize(NULL), arrayJoin([1025, materialize(NULL), materialize(NULL)]), NULL])])]), materialize(NULL)])], NULL))); -- {serverError 248}
ALTER TABLE table_2009_part ATTACH PARTITION tuple(arrayJoin([0, 1])); -- {serverError 36}
ALTER TABLE table_2009_part ATTACH PARTITION tuple(toYYYYMM(toDate([arrayJoin([arrayJoin([arrayJoin([arrayJoin([3, materialize(NULL), arrayJoin([1025, materialize(NULL), materialize(NULL)]), NULL])])]), materialize(NULL)])], NULL))); -- {serverError 36}