ALTER UPDATE/DELETE ... IN PARTITION with partition pruning in ReplicatedMergeTree ()

Co-authored-by: Alexander Kazakov <Akazz@users.noreply.github.com>
This commit is contained in:
Vladimir Chebotarev 2020-11-10 13:23:46 +03:00 committed by GitHub
parent 607b84f290
commit 059357d51e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 493 additions and 89 deletions

View File

@ -5,6 +5,7 @@
#include <Interpreters/MutationsInterpreter.h>
#include <Interpreters/TreeRewriter.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/StorageFromMergeTreeDataPart.h>
#include <Processors/Transforms/FilterTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/CreatingSetsTransform.h>
@ -32,6 +33,7 @@ namespace DB
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
extern const int UNKNOWN_MUTATION_COMMAND;
@ -92,6 +94,7 @@ std::optional<String> findFirstNonDeterministicFunctionName(const MutationComman
if (finder_data.nondeterministic_function_name)
return finder_data.nondeterministic_function_name;
/// Currently UPDATE and DELETE both always have predicates so we can use fallthrough
[[fallthrough]];
}
@ -110,7 +113,7 @@ std::optional<String> findFirstNonDeterministicFunctionName(const MutationComman
return {};
}
ASTPtr prepareQueryAffectedAST(const std::vector<MutationCommand> & commands)
ASTPtr prepareQueryAffectedAST(const std::vector<MutationCommand> & commands, const StoragePtr & storage, const Context & context)
{
/// Execute `SELECT count() FROM storage WHERE predicate1 OR predicate2 OR ...` query.
/// The result can differ from the number of affected rows (e.g. if there is an UPDATE command that
@ -125,20 +128,23 @@ ASTPtr prepareQueryAffectedAST(const std::vector<MutationCommand> & commands)
count_func->arguments = std::make_shared<ASTExpressionList>();
select->select()->children.push_back(count_func);
if (commands.size() == 1)
select->setExpression(ASTSelectQuery::Expression::WHERE, commands[0].predicate->clone());
else
{
auto coalesced_predicates = std::make_shared<ASTFunction>();
coalesced_predicates->name = "or";
coalesced_predicates->arguments = std::make_shared<ASTExpressionList>();
coalesced_predicates->children.push_back(coalesced_predicates->arguments);
ASTs conditions;
for (const MutationCommand & command : commands)
coalesced_predicates->arguments->children.push_back(command.predicate->clone());
{
if (ASTPtr condition = getPartitionAndPredicateExpressionForMutationCommand(command, storage, context))
conditions.push_back(std::move(condition));
}
if (conditions.size() > 1)
{
auto coalesced_predicates = makeASTFunction("or");
coalesced_predicates->arguments->children = std::move(conditions);
select->setExpression(ASTSelectQuery::Expression::WHERE, std::move(coalesced_predicates));
}
else if (conditions.size() == 1)
{
select->setExpression(ASTSelectQuery::Expression::WHERE, std::move(conditions.front()));
}
return select;
}
@ -167,8 +173,9 @@ ColumnDependencies getAllColumnDependencies(const StorageMetadataPtr & metadata_
}
bool isStorageTouchedByMutations(
StoragePtr storage,
const StoragePtr & storage,
const StorageMetadataPtr & metadata_snapshot,
const std::vector<MutationCommand> & commands,
Context context_copy)
@ -176,16 +183,33 @@ bool isStorageTouchedByMutations(
if (commands.empty())
return false;
bool all_commands_can_be_skipped = true;
auto storage_from_merge_tree_data_part = std::dynamic_pointer_cast<StorageFromMergeTreeDataPart>(storage);
for (const MutationCommand & command : commands)
{
if (!command.predicate) /// The command touches all rows.
return true;
if (command.partition && !storage_from_merge_tree_data_part)
throw Exception("ALTER UPDATE/DELETE ... IN PARTITION is not supported for non-MergeTree tables", ErrorCodes::NOT_IMPLEMENTED);
if (command.partition && storage_from_merge_tree_data_part)
{
const String partition_id = storage_from_merge_tree_data_part->getPartitionIDFromQuery(command.partition, context_copy);
if (partition_id == storage_from_merge_tree_data_part->getPartitionId())
all_commands_can_be_skipped = false;
}
else
all_commands_can_be_skipped = false;
}
if (all_commands_can_be_skipped)
return false;
context_copy.setSetting("max_streams_to_max_threads_ratio", 1);
context_copy.setSetting("max_threads", 1);
ASTPtr select_query = prepareQueryAffectedAST(commands);
ASTPtr select_query = prepareQueryAffectedAST(commands, storage, context_copy);
/// Interpreter must be alive, when we use result of execute() method.
/// For some reason it may copy context and and give it into ExpressionBlockInputStream
@ -202,9 +226,42 @@ bool isStorageTouchedByMutations(
auto count = (*block.getByName("count()").column)[0].get<UInt64>();
return count != 0;
}
ASTPtr getPartitionAndPredicateExpressionForMutationCommand(
const MutationCommand & command,
const StoragePtr & storage,
const Context & context
)
{
ASTPtr partition_predicate_as_ast_func;
if (command.partition)
{
String partition_id;
auto storage_merge_tree = std::dynamic_pointer_cast<MergeTreeData>(storage);
auto storage_from_merge_tree_data_part = std::dynamic_pointer_cast<StorageFromMergeTreeDataPart>(storage);
if (storage_merge_tree)
partition_id = storage_merge_tree->getPartitionIDFromQuery(command.partition, context);
else if (storage_from_merge_tree_data_part)
partition_id = storage_from_merge_tree_data_part->getPartitionIDFromQuery(command.partition, context);
else
throw Exception("ALTER UPDATE/DELETE ... IN PARTITION is not supported for non-MergeTree tables", ErrorCodes::NOT_IMPLEMENTED);
partition_predicate_as_ast_func = makeASTFunction("equals",
std::make_shared<ASTIdentifier>("_partition_id"),
std::make_shared<ASTLiteral>(partition_id)
);
}
if (command.predicate && command.partition)
return makeASTFunction("and", command.predicate->clone(), std::move(partition_predicate_as_ast_func));
else
return command.predicate ? command.predicate->clone() : partition_predicate_as_ast_func;
}
MutationsInterpreter::MutationsInterpreter(
StoragePtr storage_,
const StorageMetadataPtr & metadata_snapshot_,
@ -349,7 +406,7 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
if (stages.empty() || !stages.back().column_to_updated.empty())
stages.emplace_back(context);
auto negated_predicate = makeASTFunction("isZeroOrNull", command.predicate->clone());
auto negated_predicate = makeASTFunction("isZeroOrNull", getPartitionAndPredicateExpressionForMutationCommand(command));
stages.back().filters.push_back(negated_predicate);
}
else if (command.type == MutationCommand::UPDATE)
@ -387,7 +444,7 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
const auto & update_expr = kv.second;
auto updated_column = makeASTFunction("CAST",
makeASTFunction("if",
command.predicate->clone(),
getPartitionAndPredicateExpressionForMutationCommand(command),
makeASTFunction("CAST",
update_expr->clone(),
type_literal),
@ -592,7 +649,7 @@ ASTPtr MutationsInterpreter::prepareInterpreterSelectQuery(std::vector<Stage> &
for (const String & column : stage.output_columns)
all_asts->children.push_back(std::make_shared<ASTIdentifier>(column));
auto syntax_result = TreeRewriter(context).analyze(all_asts, all_columns);
auto syntax_result = TreeRewriter(context).analyze(all_asts, all_columns, storage, metadata_snapshot);
if (context.hasQueryContext())
for (const auto & it : syntax_result->getScalars())
context.getQueryContext().addScalar(it.first, it.second);
@ -759,10 +816,10 @@ const Block & MutationsInterpreter::getUpdatedHeader() const
size_t MutationsInterpreter::evaluateCommandsSize()
{
for (const MutationCommand & command : commands)
if (unlikely(!command.predicate)) /// The command touches all rows.
if (unlikely(!command.predicate && !command.partition)) /// The command touches all rows.
return mutation_ast->size();
return std::max(prepareQueryAffectedAST(commands)->size(), mutation_ast->size());
return std::max(prepareQueryAffectedAST(commands, storage, context)->size(), mutation_ast->size());
}
std::optional<SortDescription> MutationsInterpreter::getStorageSortDescriptionIfPossible(const Block & header) const
@ -783,6 +840,11 @@ std::optional<SortDescription> MutationsInterpreter::getStorageSortDescriptionIf
return sort_description;
}
ASTPtr MutationsInterpreter::getPartitionAndPredicateExpressionForMutationCommand(const MutationCommand & command) const
{
return DB::getPartitionAndPredicateExpressionForMutationCommand(command, storage, context);
}
bool MutationsInterpreter::Stage::isAffectingAllColumns(const Names & storage_columns) const
{
/// is subset

View File

@ -20,7 +20,17 @@ using QueryPipelinePtr = std::unique_ptr<QueryPipeline>;
/// Return false if the data isn't going to be changed by mutations.
bool isStorageTouchedByMutations(
StoragePtr storage, const StorageMetadataPtr & metadata_snapshot, const std::vector<MutationCommand> & commands, Context context_copy);
const StoragePtr & storage,
const StorageMetadataPtr & metadata_snapshot,
const std::vector<MutationCommand> & commands,
Context context_copy
);
ASTPtr getPartitionAndPredicateExpressionForMutationCommand(
const MutationCommand & command,
const StoragePtr & storage,
const Context & context
);
/// Create an input stream that will read data from storage and apply mutation commands (UPDATEs, DELETEs, MATERIALIZEs)
/// to this data.
@ -59,6 +69,8 @@ private:
std::optional<SortDescription> getStorageSortDescriptionIfPossible(const Block & header) const;
ASTPtr getPartitionAndPredicateExpressionForMutationCommand(const MutationCommand & command) const;
StoragePtr storage;
StorageMetadataPtr metadata_snapshot;
MutationCommands commands;

View File

@ -90,7 +90,7 @@ void ASTAlterCommand::formatImpl(
column->formatImpl(settings, state, frame);
if (partition)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str<< " IN PARTITION " << (settings.hilite ? hilite_none : "");
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << " IN PARTITION " << (settings.hilite ? hilite_none : "");
partition->formatImpl(settings, state, frame);
}
}
@ -150,7 +150,7 @@ void ASTAlterCommand::formatImpl(
index->formatImpl(settings, state, frame);
if (partition)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str<< " IN PARTITION " << (settings.hilite ? hilite_none : "");
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << " IN PARTITION " << (settings.hilite ? hilite_none : "");
partition->formatImpl(settings, state, frame);
}
}
@ -161,7 +161,7 @@ void ASTAlterCommand::formatImpl(
index->formatImpl(settings, state, frame);
if (partition)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str<< " IN PARTITION " << (settings.hilite ? hilite_none : "");
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << " IN PARTITION " << (settings.hilite ? hilite_none : "");
partition->formatImpl(settings, state, frame);
}
}
@ -272,7 +272,15 @@ void ASTAlterCommand::formatImpl(
}
else if (type == ASTAlterCommand::DELETE)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "DELETE WHERE " << (settings.hilite ? hilite_none : "");
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "DELETE" << (settings.hilite ? hilite_none : "");
if (partition)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " IN PARTITION " << (settings.hilite ? hilite_none : "");
partition->formatImpl(settings, state, frame);
}
settings.ostr << (settings.hilite ? hilite_keyword : "") << " WHERE " << (settings.hilite ? hilite_none : "");
predicate->formatImpl(settings, state, frame);
}
else if (type == ASTAlterCommand::UPDATE)
@ -280,6 +288,12 @@ void ASTAlterCommand::formatImpl(
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "UPDATE " << (settings.hilite ? hilite_none : "");
update_assignments->formatImpl(settings, state, frame);
if (partition)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " IN PARTITION " << (settings.hilite ? hilite_none : "");
partition->formatImpl(settings, state, frame);
}
settings.ostr << (settings.hilite ? hilite_keyword : "") << " WHERE " << (settings.hilite ? hilite_none : "");
predicate->formatImpl(settings, state, frame);
}
@ -298,7 +312,7 @@ void ASTAlterCommand::formatImpl(
<< (settings.hilite ? hilite_none : "");
if (partition)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str<< " IN PARTITION " << (settings.hilite ? hilite_none : "");
settings.ostr << (settings.hilite ? hilite_keyword : "") << " IN PARTITION " << (settings.hilite ? hilite_none : "");
partition->formatImpl(settings, state, frame);
}
}

View File

@ -103,7 +103,7 @@ public:
*/
ASTPtr constraint;
/** Used in DROP PARTITION and ATTACH PARTITION FROM queries.
/** Used in DROP PARTITION, ATTACH PARTITION FROM, UPDATE, DELETE queries.
* The value or ID of the partition is stored here.
*/
ASTPtr partition;

View File

@ -55,6 +55,12 @@ const char * ParserComparisonExpression::operators[] =
nullptr
};
const char * ParserComparisonExpression::overlapping_operators_to_skip[] =
{
"IN PARTITION",
nullptr
};
const char * ParserLogicalNotExpression::operators[] =
{
"NOT", "not",
@ -137,6 +143,14 @@ bool ParserLeftAssociativeBinaryOperatorList::parseImpl(Pos & pos, ASTPtr & node
/// try to find any of the valid operators
const char ** it;
Expected stub;
for (it = overlapping_operators_to_skip; *it; ++it)
if (ParserKeyword{*it}.checkWithoutMoving(pos, stub))
break;
if (*it)
break;
for (it = operators; *it; it += 2)
if (parseOperator(pos, *it, expected))
break;

View File

@ -82,6 +82,7 @@ class ParserLeftAssociativeBinaryOperatorList : public IParserBase
{
private:
Operators_t operators;
Operators_t overlapping_operators_to_skip = { (const char *[]){ nullptr } };
ParserPtr first_elem_parser;
ParserPtr remaining_elem_parser;
@ -93,6 +94,11 @@ public:
{
}
ParserLeftAssociativeBinaryOperatorList(Operators_t operators_, Operators_t overlapping_operators_to_skip_, ParserPtr && first_elem_parser_)
: operators(operators_), overlapping_operators_to_skip(overlapping_operators_to_skip_), first_elem_parser(std::move(first_elem_parser_))
{
}
ParserLeftAssociativeBinaryOperatorList(Operators_t operators_, ParserPtr && first_elem_parser_,
ParserPtr && remaining_elem_parser_)
: operators(operators_), first_elem_parser(std::move(first_elem_parser_)),
@ -284,7 +290,8 @@ class ParserComparisonExpression : public IParserBase
{
private:
static const char * operators[];
ParserLeftAssociativeBinaryOperatorList operator_parser {operators, std::make_unique<ParserBetweenExpression>()};
static const char * overlapping_operators_to_skip[];
ParserLeftAssociativeBinaryOperatorList operator_parser {operators, overlapping_operators_to_skip, std::make_unique<ParserBetweenExpression>()};
protected:
const char * getName() const override{ return "comparison expression"; }

View File

@ -79,7 +79,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
ParserKeyword s_to_volume("TO VOLUME");
ParserKeyword s_to_table("TO TABLE");
ParserKeyword s_delete_where("DELETE WHERE");
ParserKeyword s_delete("DELETE");
ParserKeyword s_update("UPDATE");
ParserKeyword s_where("WHERE");
ParserKeyword s_to("TO");
@ -506,8 +506,17 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
command->type = ASTAlterCommand::MODIFY_SAMPLE_BY;
}
else if (s_delete_where.ignore(pos, expected))
else if (s_delete.ignore(pos, expected))
{
if (s_in_partition.ignore(pos, expected))
{
if (!parser_partition.parse(pos, command->partition, expected))
return false;
}
if (!s_where.ignore(pos, expected))
return false;
if (!parser_exp_elem.parse(pos, command->predicate, expected))
return false;
@ -518,6 +527,12 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
if (!parser_assignment_list.parse(pos, command->update_assignments, expected))
return false;
if (s_in_partition.ignore(pos, expected))
{
if (!parser_partition.parse(pos, command->partition, expected))
return false;
}
if (!s_where.ignore(pos, expected))
return false;

View File

@ -10,7 +10,7 @@ namespace DB
* ALTER TABLE [db.]name [ON CLUSTER cluster]
* [ADD COLUMN [IF NOT EXISTS] col_name type [AFTER col_after],]
* [DROP COLUMN [IF EXISTS] col_to_drop, ...]
* [CLEAR COLUMN [IF EXISTS] col_to_clear [IN PARTITION partition],]
* [CLEAR COLUMN [IF EXISTS] col_to_clear[ IN PARTITION partition],]
* [MODIFY COLUMN [IF EXISTS] col_to_modify type, ...]
* [RENAME COLUMN [IF EXISTS] col_name TO col_name]
* [MODIFY PRIMARY KEY (a, b, c...)]
@ -19,8 +19,12 @@ namespace DB
* [DROP|DETACH|ATTACH PARTITION|PART partition, ...]
* [FETCH PARTITION partition FROM ...]
* [FREEZE [PARTITION] [WITH NAME name]]
* [DELETE WHERE ...]
* [UPDATE col_name = expr, ... WHERE ...]
* [DELETE[ IN PARTITION partition] WHERE ...]
* [UPDATE col_name = expr, ...[ IN PARTITION partition] WHERE ...]
* [ADD INDEX [IF NOT EXISTS] index_name [AFTER index_name]]
* [DROP INDEX [IF EXISTS] index_name]
* [CLEAR INDEX [IF EXISTS] index_name IN PARTITION partition]
* [MATERIALIZE INDEX [IF EXISTS] index_name [IN PARTITION partition]]
* ALTER LIVE VIEW [db.name]
* [REFRESH]
*/

View File

@ -9,17 +9,17 @@ struct BoolMask
BoolMask() {}
BoolMask(bool can_be_true_, bool can_be_false_) : can_be_true(can_be_true_), can_be_false(can_be_false_) {}
BoolMask operator &(const BoolMask & m)
BoolMask operator &(const BoolMask & m) const
{
return BoolMask(can_be_true && m.can_be_true, can_be_false || m.can_be_false);
return {can_be_true && m.can_be_true, can_be_false || m.can_be_false};
}
BoolMask operator |(const BoolMask & m)
BoolMask operator |(const BoolMask & m) const
{
return BoolMask(can_be_true || m.can_be_true, can_be_false && m.can_be_false);
return {can_be_true || m.can_be_true, can_be_false && m.can_be_false};
}
BoolMask operator !()
BoolMask operator !() const
{
return BoolMask(can_be_false, can_be_true);
return {can_be_false, can_be_true};
}
/// If mask is (true, true), then it can no longer change under operation |.

View File

@ -1,6 +1,7 @@
#include <Storages/MergeTree/EphemeralLockInZooKeeper.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <common/logger_useful.h>
#include <common/types.h>
namespace DB
@ -71,13 +72,13 @@ EphemeralLockInZooKeeper::~EphemeralLockInZooKeeper()
EphemeralLocksInAllPartitions::EphemeralLocksInAllPartitions(
const String & block_numbers_path, const String & path_prefix, const String & temp_path,
zkutil::ZooKeeper & zookeeper_)
: zookeeper(zookeeper_)
: zookeeper(&zookeeper_)
{
std::vector<String> holders;
while (true)
{
Coordination::Stat partitions_stat;
Strings partitions = zookeeper.getChildren(block_numbers_path, &partitions_stat);
Strings partitions = zookeeper->getChildren(block_numbers_path, &partitions_stat);
if (holders.size() < partitions.size())
{
@ -85,7 +86,7 @@ EphemeralLocksInAllPartitions::EphemeralLocksInAllPartitions(
for (size_t i = 0; i < partitions.size() - holders.size(); ++i)
{
String path = temp_path + "/abandonable_lock-";
holder_futures.push_back(zookeeper.asyncCreate(path, {}, zkutil::CreateMode::EphemeralSequential));
holder_futures.push_back(zookeeper->asyncCreate(path, {}, zkutil::CreateMode::EphemeralSequential));
}
for (auto & future : holder_futures)
{
@ -104,7 +105,7 @@ EphemeralLocksInAllPartitions::EphemeralLocksInAllPartitions(
lock_ops.push_back(zkutil::makeCheckRequest(block_numbers_path, partitions_stat.version));
Coordination::Responses lock_responses;
Coordination::Error rc = zookeeper.tryMulti(lock_ops, lock_responses);
Coordination::Error rc = zookeeper->tryMulti(lock_ops, lock_responses);
if (rc == Coordination::Error::ZBADVERSION)
{
LOG_TRACE(&Poco::Logger::get("EphemeralLocksInAllPartitions"), "Someone has inserted a block in a new partition while we were creating locks. Retry.");
@ -131,13 +132,16 @@ EphemeralLocksInAllPartitions::EphemeralLocksInAllPartitions(
void EphemeralLocksInAllPartitions::unlock()
{
if (!zookeeper)
return;
std::vector<zkutil::ZooKeeper::FutureMulti> futures;
for (const auto & lock : locks)
{
Coordination::Requests unlock_ops;
unlock_ops.emplace_back(zkutil::makeRemoveRequest(lock.path, -1));
unlock_ops.emplace_back(zkutil::makeRemoveRequest(lock.holder_path, -1));
futures.push_back(zookeeper.asyncMulti(unlock_ops));
futures.push_back(zookeeper->asyncMulti(unlock_ops));
}
for (auto & future : futures)

View File

@ -1,9 +1,14 @@
#pragma once
#include "ReplicatedMergeTreeMutationEntry.h"
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/Exception.h>
#include <IO/ReadHelpers.h>
#include <map>
#include <optional>
namespace DB
{
@ -87,13 +92,30 @@ private:
/// Acquires block number locks in all partitions.
class EphemeralLocksInAllPartitions : private boost::noncopyable
class EphemeralLocksInAllPartitions : public boost::noncopyable
{
public:
EphemeralLocksInAllPartitions(
const String & block_numbers_path, const String & path_prefix, const String & temp_path,
zkutil::ZooKeeper & zookeeper_);
EphemeralLocksInAllPartitions() = default;
EphemeralLocksInAllPartitions(EphemeralLocksInAllPartitions && rhs) noexcept
: zookeeper(rhs.zookeeper)
, locks(std::move(rhs.locks))
{
rhs.zookeeper = nullptr;
}
EphemeralLocksInAllPartitions & operator=(EphemeralLocksInAllPartitions && rhs) noexcept
{
zookeeper = rhs.zookeeper;
rhs.zookeeper = nullptr;
locks = std::move(rhs.locks);
return *this;
}
struct LockInfo
{
String path;
@ -110,8 +132,51 @@ public:
~EphemeralLocksInAllPartitions();
private:
zkutil::ZooKeeper & zookeeper;
zkutil::ZooKeeper * zookeeper = nullptr;
std::vector<LockInfo> locks;
};
/// This class allows scoped manipulations with block numbers locked in certain partitions
/// See StorageReplicatedMergeTree::allocateBlockNumbersInAffectedPartitions and alter()/mutate() methods
class PartitionBlockNumbersHolder
{
public:
PartitionBlockNumbersHolder(const PartitionBlockNumbersHolder &) = delete;
PartitionBlockNumbersHolder & operator=(const PartitionBlockNumbersHolder &) = delete;
using BlockNumbersType = ReplicatedMergeTreeMutationEntry::BlockNumbersType;
PartitionBlockNumbersHolder() = default;
PartitionBlockNumbersHolder(
BlockNumbersType block_numbers_, std::optional<EphemeralLocksInAllPartitions> locked_block_numbers_holder)
: block_numbers(std::move(block_numbers_))
, multiple_partitions_holder(std::move(locked_block_numbers_holder))
{
}
PartitionBlockNumbersHolder(
BlockNumbersType block_numbers_, std::optional<EphemeralLockInZooKeeper> locked_block_numbers_holder)
: block_numbers(std::move(block_numbers_))
, single_partition_holder(std::move(locked_block_numbers_holder))
{
}
PartitionBlockNumbersHolder & operator=(PartitionBlockNumbersHolder &&) = default;
const BlockNumbersType & getBlockNumbers() const { return block_numbers; }
void reset()
{
multiple_partitions_holder.reset();
single_partition_holder.reset();
block_numbers.clear();
}
private:
BlockNumbersType block_numbers;
std::optional<EphemeralLocksInAllPartitions> multiple_partitions_holder;
std::optional<EphemeralLockInZooKeeper> single_partition_holder;
};
}

View File

@ -35,9 +35,10 @@ struct ReplicatedMergeTreeMutationEntry
/// Replica which initiated mutation
String source_replica;
/// Accured numbers of blocks
/// Acquired block numbers
/// partition_id -> block_number
std::map<String, Int64> block_numbers;
using BlockNumbersType = std::map<String, Int64>;
BlockNumbersType block_numbers;
/// Mutation commands which will give to MUTATE_PART entries
MutationCommands commands;

View File

@ -45,6 +45,16 @@ public:
return part->storage.getVirtuals();
}
String getPartitionId() const
{
return part->info.partition_id;
}
String getPartitionIDFromQuery(const ASTPtr & ast, const Context & context) const
{
return part->storage.getPartitionIDFromQuery(ast, context);
}
protected:
StorageFromMergeTreeDataPart(const MergeTreeData::DataPartPtr & part_)
: IStorage(getIDFromPart(part_))

View File

@ -2,11 +2,13 @@
#include <IO/Operators.h>
#include <Parsers/formatAST.h>
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/ASTColumnDeclaration.h>
#include <Parsers/ParserAlterQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ASTAssignment.h>
#include <Parsers/ASTColumnDeclaration.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Common/typeid_cast.h>
#include <Common/quoteString.h>
#include <Core/Defines.h>
@ -32,6 +34,7 @@ std::optional<MutationCommand> MutationCommand::parse(ASTAlterCommand * command,
res.ast = command->ptr();
res.type = DELETE;
res.predicate = command->predicate;
res.partition = command->partition;
return res;
}
else if (command->type == ASTAlterCommand::UPDATE)
@ -40,6 +43,7 @@ std::optional<MutationCommand> MutationCommand::parse(ASTAlterCommand * command,
res.ast = command->ptr();
res.type = UPDATE;
res.predicate = command->predicate;
res.partition = command->partition;
for (const ASTPtr & assignment_ast : command->update_assignments->children)
{
const auto & assignment = assignment_ast->as<ASTAssignment &>();
@ -124,6 +128,7 @@ std::shared_ptr<ASTAlterCommandList> MutationCommands::ast() const
return res;
}
void MutationCommands::writeText(WriteBuffer & out) const
{
std::stringstream commands_ss;

View File

@ -43,8 +43,10 @@ struct MutationCommand
/// Columns with corresponding actions
std::unordered_map<String, ASTPtr> column_to_update_expression;
/// For MATERIALIZE INDEX
/// For MATERIALIZE INDEX.
String index_name;
/// For MATERIALIZE INDEX, UPDATE and DELETE.
ASTPtr partition;
/// For reads, drops and etc.

View File

@ -3915,6 +3915,60 @@ bool StorageReplicatedMergeTree::executeMetadataAlter(const StorageReplicatedMer
}
std::set<String> StorageReplicatedMergeTree::getPartitionIdsAffectedByCommands(
const MutationCommands & commands, const Context & query_context) const
{
std::set<String> affected_partition_ids;
for (const auto & command : commands)
{
if (!command.partition)
{
affected_partition_ids.clear();
break;
}
affected_partition_ids.insert(
getPartitionIDFromQuery(command.partition, query_context)
);
}
return affected_partition_ids;
}
PartitionBlockNumbersHolder StorageReplicatedMergeTree::allocateBlockNumbersInAffectedPartitions(
const MutationCommands & commands, const Context & query_context, const zkutil::ZooKeeperPtr & zookeeper) const
{
const std::set<String> mutation_affected_partition_ids = getPartitionIdsAffectedByCommands(commands, query_context);
if (mutation_affected_partition_ids.size() == 1)
{
const auto & affected_partition_id = *mutation_affected_partition_ids.cbegin();
auto block_number_holder = allocateBlockNumber(affected_partition_id, zookeeper);
if (!block_number_holder.has_value())
return {};
auto block_number = block_number_holder->getNumber(); /// Avoid possible UB due to std::move
return {{{affected_partition_id, block_number}}, std::move(block_number_holder)};
}
else
{
/// TODO: Implement optimal block number aqcuisition algorithm in multiple (but not all) partitions
EphemeralLocksInAllPartitions lock_holder(
zookeeper_path + "/block_numbers", "block-", zookeeper_path + "/temp", *zookeeper);
PartitionBlockNumbersHolder::BlockNumbersType block_numbers;
for (const auto & lock : lock_holder.getLocks())
{
if (mutation_affected_partition_ids.empty() || mutation_affected_partition_ids.count(lock.partition_id))
block_numbers[lock.partition_id] = lock.number;
}
return {std::move(block_numbers), std::move(lock_holder)};
}
}
void StorageReplicatedMergeTree::alter(
const AlterCommands & commands, const Context & query_context, TableLockHolder & table_lock_holder)
{
@ -3942,7 +3996,7 @@ void StorageReplicatedMergeTree::alter(
return queryToString(query);
};
auto zookeeper = getZooKeeper();
const auto zookeeper = getZooKeeper();
std::optional<ReplicatedMergeTreeLogEntryData> alter_entry;
std::optional<String> mutation_znode;
@ -3953,10 +4007,6 @@ void StorageReplicatedMergeTree::alter(
alter_entry.emplace();
mutation_znode.reset();
/// We can safely read structure, because we guarded with alter_intention_lock
if (is_readonly)
throw Exception("Can't ALTER readonly table", ErrorCodes::TABLE_IS_READ_ONLY);
auto current_metadata = getInMemoryMetadataPtr();
StorageInMemoryMetadata future_metadata = *current_metadata;
@ -4029,27 +4079,23 @@ void StorageReplicatedMergeTree::alter(
ops.emplace_back(zkutil::makeCreateRequest(
zookeeper_path + "/log/log-", alter_entry->toString(), zkutil::CreateMode::PersistentSequential));
std::optional<EphemeralLocksInAllPartitions> lock_holder;
/// Now we will prepare mutations record.
/// This code pretty same with mutate() function but process results slightly differently.
PartitionBlockNumbersHolder partition_block_numbers_holder;
if (alter_entry->have_mutation)
{
String mutations_path = zookeeper_path + "/mutations";
const String mutations_path(zookeeper_path + "/mutations");
ReplicatedMergeTreeMutationEntry mutation_entry;
mutation_entry.source_replica = replica_name;
mutation_entry.commands = maybe_mutation_commands;
mutation_entry.alter_version = new_metadata_version;
mutation_entry.source_replica = replica_name;
mutation_entry.commands = std::move(maybe_mutation_commands);
Coordination::Stat mutations_stat;
zookeeper->get(mutations_path, &mutations_stat);
lock_holder.emplace(
zookeeper_path + "/block_numbers", "block-", zookeeper_path + "/temp", *zookeeper);
for (const auto & lock : lock_holder->getLocks())
mutation_entry.block_numbers[lock.partition_id] = lock.number;
partition_block_numbers_holder =
allocateBlockNumbersInAffectedPartitions(mutation_entry.commands, query_context, zookeeper);
mutation_entry.block_numbers = partition_block_numbers_holder.getBlockNumbers();
mutation_entry.create_time = time(nullptr);
ops.emplace_back(zkutil::makeSetRequest(mutations_path, String(), mutations_stat.version));
@ -4060,6 +4106,11 @@ void StorageReplicatedMergeTree::alter(
Coordination::Responses results;
Coordination::Error rc = zookeeper->tryMulti(ops, results);
/// For the sake of constitency with mechanics of concurrent background process of assigning parts merge tasks
/// this placeholder must be held up until the moment of committing into ZK of the mutation entry
/// See ReplicatedMergeTreeMergePredicate::canMergeTwoParts() method
partition_block_numbers_holder.reset();
if (rc == Coordination::Error::ZOK)
{
if (alter_entry->have_mutation)
@ -4398,7 +4449,7 @@ void StorageReplicatedMergeTree::rename(const String & new_path_to_table_data, c
}
bool StorageReplicatedMergeTree::existsNodeCached(const std::string & path)
bool StorageReplicatedMergeTree::existsNodeCached(const std::string & path) const
{
{
std::lock_guard lock(existing_nodes_cache_mutex);
@ -4420,7 +4471,7 @@ bool StorageReplicatedMergeTree::existsNodeCached(const std::string & path)
std::optional<EphemeralLockInZooKeeper>
StorageReplicatedMergeTree::allocateBlockNumber(
const String & partition_id, zkutil::ZooKeeperPtr & zookeeper, const String & zookeeper_block_id_path)
const String & partition_id, const zkutil::ZooKeeperPtr & zookeeper, const String & zookeeper_block_id_path) const
{
/// Lets check for duplicates in advance, to avoid superfluous block numbers allocation
Coordination::Requests deduplication_check_ops;
@ -5063,44 +5114,46 @@ void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, const
/// After all needed parts are mutated (i.e. all active parts have the mutation version greater than
/// the version of this mutation), the mutation is considered done and can be deleted.
ReplicatedMergeTreeMutationEntry entry;
entry.source_replica = replica_name;
entry.commands = commands;
ReplicatedMergeTreeMutationEntry mutation_entry;
mutation_entry.source_replica = replica_name;
mutation_entry.commands = commands;
String mutations_path = zookeeper_path + "/mutations";
const String mutations_path = zookeeper_path + "/mutations";
const auto zookeeper = getZooKeeper();
/// Update the mutations_path node when creating the mutation and check its version to ensure that
/// nodes for mutations are created in the same order as the corresponding block numbers.
/// Should work well if the number of concurrent mutation requests is small.
while (true)
{
auto zookeeper = getZooKeeper();
Coordination::Stat mutations_stat;
zookeeper->get(mutations_path, &mutations_stat);
EphemeralLocksInAllPartitions block_number_locks(
zookeeper_path + "/block_numbers", "block-", zookeeper_path + "/temp", *zookeeper);
PartitionBlockNumbersHolder partition_block_numbers_holder =
allocateBlockNumbersInAffectedPartitions(mutation_entry.commands, query_context, zookeeper);
for (const auto & lock : block_number_locks.getLocks())
entry.block_numbers[lock.partition_id] = lock.number;
entry.create_time = time(nullptr);
mutation_entry.block_numbers = partition_block_numbers_holder.getBlockNumbers();
mutation_entry.create_time = time(nullptr);
/// The following version check guarantees the linearizability property for any pair of mutations:
/// mutation with higher sequence number is guaranteed to have higher block numbers in every partition
/// (and thus will be applied strictly according to sequence numbers of mutations)
Coordination::Requests requests;
requests.emplace_back(zkutil::makeSetRequest(mutations_path, String(), mutations_stat.version));
requests.emplace_back(zkutil::makeCreateRequest(
mutations_path + "/", entry.toString(), zkutil::CreateMode::PersistentSequential));
mutations_path + "/", mutation_entry.toString(), zkutil::CreateMode::PersistentSequential));
Coordination::Responses responses;
Coordination::Error rc = zookeeper->tryMulti(requests, responses);
partition_block_numbers_holder.reset();
if (rc == Coordination::Error::ZOK)
{
const String & path_created =
dynamic_cast<const Coordination::CreateResponse *>(responses[1].get())->path_created;
entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1);
LOG_TRACE(log, "Created mutation with ID {}", entry.znode_name);
mutation_entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1);
LOG_TRACE(log, "Created mutation with ID {}", mutation_entry.znode_name);
break;
}
else if (rc == Coordination::Error::ZBADVERSION)
@ -5112,7 +5165,7 @@ void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, const
throw Coordination::Exception("Unable to create a mutation znode", rc);
}
waitMutation(entry.znode_name, query_context.getSettingsRef().mutations_sync);
waitMutation(mutation_entry.znode_name, query_context.getSettingsRef().mutations_sync);
}
void StorageReplicatedMergeTree::waitMutation(const String & znode_name, size_t mutations_sync) const

View File

@ -506,8 +506,8 @@ private:
/// Creates new block number if block with such block_id does not exist
std::optional<EphemeralLockInZooKeeper> allocateBlockNumber(
const String & partition_id, zkutil::ZooKeeperPtr & zookeeper,
const String & zookeeper_block_id_path = "");
const String & partition_id, const zkutil::ZooKeeperPtr & zookeeper,
const String & zookeeper_block_id_path = "") const;
/** Wait until all replicas, including this, execute the specified action from the log.
* If replicas are added at the same time, it can not wait the added replica .
@ -531,9 +531,9 @@ private:
bool getFakePartCoveringAllPartsInPartition(const String & partition_id, MergeTreePartInfo & part_info, bool for_replace_partition = false);
/// Check for a node in ZK. If it is, remember this information, and then immediately answer true.
std::unordered_set<std::string> existing_nodes_cache;
std::mutex existing_nodes_cache_mutex;
bool existsNodeCached(const std::string & path);
mutable std::unordered_set<std::string> existing_nodes_cache;
mutable std::mutex existing_nodes_cache_mutex;
bool existsNodeCached(const std::string & path) const;
void getClearBlocksInPartitionOps(Coordination::Requests & ops, zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num);
/// Remove block IDs from `blocks/` in ZooKeeper for the given partition ID in the given block number range.
@ -565,6 +565,11 @@ private:
MutationCommands getFirtsAlterMutationCommandsForPart(const DataPartPtr & part) const override;
void startBackgroundMovesIfNeeded() override;
std::set<String> getPartitionIdsAffectedByCommands(const MutationCommands & commands, const Context & query_context) const;
PartitionBlockNumbersHolder allocateBlockNumbersInAffectedPartitions(
const MutationCommands & commands, const Context & query_context, const zkutil::ZooKeeperPtr & zookeeper) const;
protected:
/** If not 'attach', either creates a new table in ZK, or adds a replica to an existing table.
*/

View File

@ -0,0 +1,16 @@
<yandex>
<remote_servers>
<test_cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</yandex>

View File

@ -0,0 +1,17 @@
<yandex>
<shutdown_wait_unfinished>3</shutdown_wait_unfinished>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/log.log</log>
<errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
<size>1000M</size>
<count>10</count>
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
</logger>
<part_log>
<database>system</database>
<table>part_log</table>
<flush_interval_milliseconds>500</flush_interval_milliseconds>
</part_log>
</yandex>

View File

@ -0,0 +1,98 @@
import pytest
import helpers.client
import helpers.cluster
cluster = helpers.cluster.ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/logs_config.xml', 'configs/cluster.xml'],
with_zookeeper=True, stay_alive=True)
node2 = cluster.add_instance('node2', main_configs=['configs/logs_config.xml', 'configs/cluster.xml'],
with_zookeeper=True, stay_alive=True)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_trivial_alter_in_partition_merge_tree_without_where(started_cluster):
try:
name = "test_trivial_alter_in_partition_merge_tree_without_where"
node1.query("DROP TABLE IF EXISTS {}".format(name))
node1.query("CREATE TABLE {} (p Int64, x Int64) ENGINE=MergeTree() ORDER BY tuple() PARTITION BY p".format(name))
node1.query("INSERT INTO {} VALUES (1, 2), (2, 3)".format(name))
with pytest.raises(helpers.client.QueryRuntimeException):
node1.query("ALTER TABLE {} UPDATE x = x + 1 IN PARTITION 1 SETTINGS mutations_sync = 2".format(name))
assert node1.query("SELECT sum(x) FROM {}".format(name)).splitlines() == ["5"]
with pytest.raises(helpers.client.QueryRuntimeException):
node1.query("ALTER TABLE {} UPDATE x = x + 1 IN PARTITION 2 SETTINGS mutations_sync = 2".format(name))
assert node1.query("SELECT sum(x) FROM {}".format(name)).splitlines() == ["5"]
with pytest.raises(helpers.client.QueryRuntimeException):
node1.query("ALTER TABLE {} DELETE IN PARTITION 1 SETTINGS mutations_sync = 2".format(name))
assert node1.query("SELECT sum(x) FROM {}".format(name)).splitlines() == ["5"]
with pytest.raises(helpers.client.QueryRuntimeException):
node1.query("ALTER TABLE {} DELETE IN PARTITION 2 SETTINGS mutations_sync = 2".format(name))
assert node1.query("SELECT sum(x) FROM {}".format(name)).splitlines() == ["5"]
finally:
node1.query("DROP TABLE IF EXISTS {}".format(name))
def test_trivial_alter_in_partition_merge_tree_with_where(started_cluster):
try:
name = "test_trivial_alter_in_partition_merge_tree_with_where"
node1.query("DROP TABLE IF EXISTS {}".format(name))
node1.query("CREATE TABLE {} (p Int64, x Int64) ENGINE=MergeTree() ORDER BY tuple() PARTITION BY p".format(name))
node1.query("INSERT INTO {} VALUES (1, 2), (2, 3)".format(name))
node1.query("ALTER TABLE {} UPDATE x = x + 1 IN PARTITION 2 WHERE p = 2 SETTINGS mutations_sync = 2".format(name))
assert node1.query("SELECT sum(x) FROM {}".format(name)).splitlines() == ["6"]
node1.query("ALTER TABLE {} UPDATE x = x + 1 IN PARTITION 1 WHERE p = 2 SETTINGS mutations_sync = 2".format(name))
assert node1.query("SELECT sum(x) FROM {}".format(name)).splitlines() == ["6"]
node1.query("ALTER TABLE {} DELETE IN PARTITION 2 WHERE p = 2 SETTINGS mutations_sync = 2".format(name))
assert node1.query("SELECT sum(x) FROM {}".format(name)).splitlines() == ["2"]
node1.query("ALTER TABLE {} DELETE IN PARTITION 1 WHERE p = 2 SETTINGS mutations_sync = 2".format(name))
assert node1.query("SELECT sum(x) FROM {}".format(name)).splitlines() == ["2"]
finally:
node1.query("DROP TABLE IF EXISTS {}".format(name))
def test_trivial_alter_in_partition_replicated_merge_tree(started_cluster):
try:
name = "test_trivial_alter_in_partition_replicated_merge_tree"
node1.query("DROP TABLE IF EXISTS {}".format(name))
node2.query("DROP TABLE IF EXISTS {}".format(name))
for node in (node1, node2):
node.query(
"CREATE TABLE {name} (p Int64, x Int64) ENGINE=ReplicatedMergeTree('/clickhouse/{name}', '{{instance}}') ORDER BY tuple() PARTITION BY p"
.format(name=name))
node1.query("INSERT INTO {} VALUES (1, 2)".format(name))
node2.query("INSERT INTO {} VALUES (2, 3)".format(name))
node1.query("ALTER TABLE {} UPDATE x = x + 1 IN PARTITION 2 WHERE 1 SETTINGS mutations_sync = 2".format(name))
for node in (node1, node2):
assert node.query("SELECT sum(x) FROM {}".format(name)).splitlines() == ["6"]
node1.query("ALTER TABLE {} UPDATE x = x + 1 IN PARTITION 1 WHERE p = 2 SETTINGS mutations_sync = 2".format(name))
for node in (node1, node2):
assert node.query("SELECT sum(x) FROM {}".format(name)).splitlines() == ["6"]
with pytest.raises(helpers.client.QueryRuntimeException):
node1.query("ALTER TABLE {} DELETE IN PARTITION 2 SETTINGS mutations_sync = 2".format(name))
for node in (node1, node2):
assert node.query("SELECT sum(x) FROM {}".format(name)).splitlines() == ["6"]
node1.query("ALTER TABLE {} DELETE IN PARTITION 2 WHERE p = 2 SETTINGS mutations_sync = 2".format(name))
for node in (node1, node2):
assert node.query("SELECT sum(x) FROM {}".format(name)).splitlines() == ["2"]
node1.query("ALTER TABLE {} DELETE IN PARTITION 1 WHERE p = 2 SETTINGS mutations_sync = 2".format(name))
for node in (node1, node2):
assert node.query("SELECT sum(x) FROM {}".format(name)).splitlines() == ["2"]
finally:
node1.query("DROP TABLE IF EXISTS {}".format(name))
node2.query("DROP TABLE IF EXISTS {}".format(name))