Part movement between shards

Integrate query deduplication from #17348
This commit is contained in:
Nicolae Vartolomei 2020-11-24 14:24:48 +00:00
parent 96e84a44d1
commit 53d57ffb52
31 changed files with 1328 additions and 14 deletions
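The user-facing surface added here is an ALTER ... MOVE PART ... TO SHARD command, a pinned-parts mechanism that keeps a moving part out of merges, and a system.part_moves_between_shards table for monitoring. A minimal usage sketch, following the integration test in this commit (the table name, part name and ZooKeeper path are the test's own values, not requirements):

ALTER TABLE t MOVE PART 'all_0_0_0' TO SHARD '/clickhouse/shard_1/tables/t';
-- Progress is recorded per task; see the monitoring query sketched after the
-- system.part_moves_between_shards storage further down in this diff.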

View File

@ -440,7 +440,11 @@ class IColumn;
\
M(Bool, optimize_rewrite_sum_if_to_count_if, true, "Rewrite sumIf() and sum(if()) functions to countIf() when logically equivalent", 0) \
M(UInt64, insert_shard_id, 0, "If non zero, when insert into a distributed table, the data will be inserted into the shard `insert_shard_id` synchronously. Possible values range from 1 to `shards_number` of corresponding distributed table", 0) \
M(Bool, allow_experimental_query_deduplication, false, "Allow sending parts' UUIDs for a query in order to deduplicate data parts if any", 0) \
\
/** Experimental feature for moving data between shards. */ \
\
M(Bool, allow_experimental_query_deduplication, false, "Experimental data deduplication for SELECT queries based on part UUIDs", 0) \
M(Bool, experimental_query_deduplication_send_all_part_uuids, false, "If false, only part UUIDs for currently moving parts are sent. If true, all read part UUIDs are sent (useful only for testing).", 0) \
M(Bool, engine_file_empty_if_not_exists, false, "Allows to select data from a file engine table without file", 0) \
M(Bool, engine_file_truncate_on_insert, false, "Enables or disables truncate before insert in file engine tables", 0) \
M(Bool, allow_experimental_database_replicated, false, "Allow to create databases with Replicated engine", 0) \
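The query deduplication setting is renamed in place (and moved under the experimental block for part movement), with a testing-only companion added. A hedged usage sketch, using the Distributed table from the test in this commit: with the setting on, replicas send the UUIDs of pinned (currently moving) parts together with query results and the initiator drops duplicates, so a part that temporarily exists on both the source and the destination shard is counted only once, which is what test_deduplication_while_move below verifies.

SELECT count()
FROM t_d
SETTINGS allow_experimental_query_deduplication = 1;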

View File

@ -79,6 +79,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
ParserKeyword s_to_disk("TO DISK");
ParserKeyword s_to_volume("TO VOLUME");
ParserKeyword s_to_table("TO TABLE");
ParserKeyword s_to_shard("TO SHARD");
ParserKeyword s_delete("DELETE");
ParserKeyword s_update("UPDATE");
@ -295,6 +296,10 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
return false;
command->move_destination_type = DataDestinationType::TABLE;
}
else if (s_to_shard.ignore(pos))
{
command->move_destination_type = DataDestinationType::SHARD;
}
else
return false;
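With the new keyword the MOVE clause of ALTER parses four destination types. A sketch of the accepted forms (disk, volume, table and partition names are placeholders; TO SHARD is only implemented for MOVE PART, not MOVE PARTITION, as enforced later in StorageReplicatedMergeTree::movePartitionToShard):

ALTER TABLE t MOVE PARTITION 'p' TO DISK 'disk1';
ALTER TABLE t MOVE PARTITION 'p' TO VOLUME 'vol1';
ALTER TABLE t MOVE PARTITION 'p' TO TABLE other_db.other_table;
ALTER TABLE t MOVE PART 'all_0_0_0' TO SHARD '/clickhouse/shard_1/tables/t';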

View File

@ -10,6 +10,7 @@ enum class DataDestinationType
VOLUME,
TABLE,
DELETE,
SHARD,
};
}

View File

@ -151,6 +151,7 @@ MergeTreeData::MergeTreeData(
, log_name(table_id_.getNameForLogs())
, log(&Poco::Logger::get(log_name))
, storage_settings(std::move(storage_settings_))
, pinned_part_uuids(std::make_shared<PinnedPartUUIDs>())
, data_parts_by_info(data_parts_indexes.get<TagByInfo>())
, data_parts_by_state_and_info(data_parts_indexes.get<TagByStateAndInfo>())
, parts_mover(this)
@ -2920,6 +2921,11 @@ void MergeTreeData::movePartitionToVolume(const ASTPtr & partition, const String
throw Exception("Cannot move parts because moves are manually disabled", ErrorCodes::ABORTED);
}
void MergeTreeData::movePartitionToShard(const ASTPtr & /*partition*/, bool /*move_part*/, const String & /*to*/, const Context & /*query_context*/)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "MOVE PARTITION TO SHARD is not supported by storage {}", getName());
}
void MergeTreeData::fetchPartition(
const ASTPtr & /*partition*/,
const StorageMetadataPtr & /*metadata_snapshot*/,
@ -2969,10 +2975,16 @@ Pipe MergeTreeData::alterPartition(
break;
case PartitionCommand::MoveDestinationType::TABLE:
{
checkPartitionCanBeDropped(command.partition);
String dest_database = query_context->resolveDatabase(command.to_database);
auto dest_storage = DatabaseCatalog::instance().getTable({dest_database, command.to_table}, query_context);
movePartitionToTable(dest_storage, command.partition, query_context);
}
break;
case PartitionCommand::MoveDestinationType::SHARD:
movePartitionToShard(command.partition, command.part, command.move_destination_name, query_context);
break;
}
}
@ -4054,6 +4066,12 @@ catch (...)
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
MergeTreeData::PinnedPartUUIDsPtr MergeTreeData::getPinnedPartUUIDs() const
{
std::lock_guard lock(pinned_part_uuids_mutex);
return pinned_part_uuids;
}
MergeTreeData::CurrentlyMovingPartsTagger::CurrentlyMovingPartsTagger(MergeTreeMovingParts && moving_parts_, MergeTreeData & data_)
: parts_to_move(std::move(moving_parts_)), data(data_)
{

View File

@ -20,6 +20,7 @@
#include <Storages/IndicesDescription.h>
#include <Storages/MergeTree/MergeTreePartsMover.h>
#include <Storages/MergeTree/MergeTreeWriteAheadLog.h>
#include <Storages/MergeTree/PinnedPartUUIDs.h>
#include <Interpreters/PartLog.h>
#include <Disks/StoragePolicy.h>
#include <Interpreters/Aggregator.h>
@ -127,6 +128,8 @@ public:
using DataPartStates = std::initializer_list<DataPartState>;
using DataPartStateVector = std::vector<DataPartState>;
using PinnedPartUUIDsPtr = std::shared_ptr<const PinnedPartUUIDs>;
constexpr static auto FORMAT_VERSION_FILE_NAME = "format_version.txt";
constexpr static auto DETACHED_DIR_NAME = "detached";
@ -781,6 +784,8 @@ public:
/// Mutex for currently_moving_parts
mutable std::mutex moving_parts_mutex;
PinnedPartUUIDsPtr getPinnedPartUUIDs() const;
/// Return main processing background job, like merge/mutate/fetch and so on
virtual std::optional<JobAndPool> getDataProcessingJob() = 0;
/// Return job to move parts between disks/volumes and so on.
@ -835,6 +840,10 @@ protected:
/// Use get and set to receive readonly versions.
MultiVersion<MergeTreeSettings> storage_settings;
/// Used to determine which UUIDs to send to root query executor for deduplication.
mutable std::shared_mutex pinned_part_uuids_mutex;
PinnedPartUUIDsPtr pinned_part_uuids;
/// Work with data parts
struct TagByInfo{};
@ -970,6 +979,7 @@ protected:
virtual void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, ContextPtr context) = 0;
/// Makes sense only for replicated tables
virtual void movePartitionToShard(const ASTPtr & partition, bool move_part, const String & to, const Context & query_context);
virtual void fetchPartition(
const ASTPtr & partition,
const StorageMetadataPtr & metadata_snapshot,

View File

@ -291,7 +291,14 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
auto index_stats = std::make_unique<ReadFromMergeTree::IndexStats>();
if (query_context->getSettingsRef().allow_experimental_query_deduplication)
selectPartsToReadWithUUIDFilter(parts, part_values, minmax_idx_condition, minmax_columns_types, partition_pruner, max_block_numbers_to_read, query_context, part_filter_counters);
selectPartsToReadWithUUIDFilter(
parts,
part_values,
data.getPinnedPartUUIDs(),
minmax_idx_condition,
minmax_columns_types,
partition_pruner,
max_block_numbers_to_read,
query_context,
part_filter_counters);
else
selectPartsToRead(parts, part_values, minmax_idx_condition, minmax_columns_types, partition_pruner, max_block_numbers_to_read, part_filter_counters);
@ -1922,6 +1929,7 @@ void MergeTreeDataSelectExecutor::selectPartsToRead(
void MergeTreeDataSelectExecutor::selectPartsToReadWithUUIDFilter(
MergeTreeData::DataPartsVector & parts,
const std::unordered_set<String> & part_values,
MergeTreeData::PinnedPartUUIDsPtr pinned_part_uuids,
const std::optional<KeyCondition> & minmax_idx_condition,
const DataTypes & minmax_columns_types,
std::optional<PartitionPruner> & partition_pruner,
@ -1929,6 +1937,8 @@ void MergeTreeDataSelectExecutor::selectPartsToReadWithUUIDFilter(
ContextPtr query_context,
PartFilterCounters & counters) const
{
const Settings & settings = query_context->getSettingsRef();
/// select_parts prepares the parts that have to be read for the query;
/// it returns false if any duplicated part UUIDs were encountered.
auto select_parts = [&] (MergeTreeData::DataPartsVector & selected_parts) -> bool
@ -1984,11 +1994,14 @@ void MergeTreeDataSelectExecutor::selectPartsToReadWithUUIDFilter(
/// populate UUIDs and exclude ignored parts if enabled
if (part->uuid != UUIDHelpers::Nil)
{
if (settings.experimental_query_deduplication_send_all_part_uuids || pinned_part_uuids->contains(part->uuid))
{
auto result = temp_part_uuids.insert(part->uuid);
if (!result.second)
throw Exception("Found a part with the same UUID on the same replica.", ErrorCodes::LOGICAL_ERROR);
}
}
selected_parts.push_back(part);
}
@ -2011,7 +2024,8 @@ void MergeTreeDataSelectExecutor::selectPartsToReadWithUUIDFilter(
/// Process parts that have to be read for a query.
auto needs_retry = !select_parts(parts);
/// If any duplicated part UUIDs met during the first step, try to ignore them in second pass
/// If any duplicated part UUIDs were encountered during the first step, try to ignore them in the second pass.
/// This may happen when `prefer_localhost_replica` is set and the "distributed" stage runs in the same process as the "remote" stage.
if (needs_retry)
{
LOG_DEBUG(log, "Found duplicate uuids locally, will retry part selection without them");

View File

@ -150,6 +150,7 @@ private:
void selectPartsToReadWithUUIDFilter(
MergeTreeData::DataPartsVector & parts,
const std::unordered_set<String> & part_values,
MergeTreeData::PinnedPartUUIDsPtr pinned_part_uuids,
const std::optional<KeyCondition> & minmax_idx_condition,
const DataTypes & minmax_columns_types,
std::optional<PartitionPruner> & partition_pruner,

View File

@ -127,6 +127,7 @@ struct Settings;
M(UInt64, max_concurrent_queries, 0, "Max number of concurrently executed queries related to the MergeTree table (0 - disabled). Queries will still be limited by other max_concurrent_queries settings.", 0) \
M(UInt64, min_marks_to_honor_max_concurrent_queries, 0, "Minimal number of marks to honor the MergeTree-level's max_concurrent_queries (0 - disabled). Queries will still be limited by other max_concurrent_queries settings.", 0) \
M(UInt64, min_bytes_to_rebalance_partition_over_jbod, 0, "Minimal amount of bytes to enable part rebalance over JBOD array (0 - disabled).", 0) \
M(UInt64, part_moves_between_shards_delay_seconds, 30, "Time to wait before/after moving parts between shards.", 0) \
\
/** Obsolete settings. Kept for backward compatibility only. */ \
M(UInt64, min_relative_delay_to_yield_leadership, 120, "Obsolete setting, does nothing.", 0) \
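The new delay setting gives replicas time to observe the move before and after the source part is dropped (the SOURCE_DROP_PRE_DELAY / SOURCE_DROP_POST_DELAY states below). A sketch of setting it per table, assuming the usual MergeTree SETTINGS clause applies to it; the test configs in this commit set it globally to 3 seconds via the merge_tree server config instead:

CREATE TABLE t (v UInt64)
ENGINE = ReplicatedMergeTree('/clickhouse/shard_0/tables/t', 'r1')
ORDER BY tuple()
SETTINGS part_moves_between_shards_delay_seconds = 5;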

View File

@ -0,0 +1,411 @@
#include <Storages/MergeTree/PartMovesBetweenShardsOrchestrator.h>
#include <Storages/MergeTree/PinnedPartUUIDs.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <boost/range/adaptor/map.hpp>
#include <Poco/JSON/JSON.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
namespace DB
{
PartMovesBetweenShardsOrchestrator::PartMovesBetweenShardsOrchestrator(StorageReplicatedMergeTree & storage_)
: storage(storage_)
, zookeeper_path(storage.zookeeper_path)
, logger_name(storage.getStorageID().getFullTableName() + " (PartMovesBetweenShardsOrchestrator)")
, log(&Poco::Logger::get(logger_name))
, entries_znode_path(zookeeper_path + "/part_moves_shard")
{
task = storage.global_context.getSchedulePool().createTask(logger_name, [this]{ run(); });
}
void PartMovesBetweenShardsOrchestrator::run()
{
if (need_stop)
return;
auto sleep_ms = 10;
try
{
sync();
if (step())
sync();
else
sleep_ms = 3 * 1000;
}
catch (...)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
task->scheduleAfter(sleep_ms);
}
void PartMovesBetweenShardsOrchestrator::shutdown()
{
need_stop = true;
task->deactivate();
LOG_TRACE(log, "PartMovesBetweenShardsOrchestrator thread finished");
}
void PartMovesBetweenShardsOrchestrator::sync()
{
std::lock_guard lock(state_mutex);
entries.clear();
auto zk = storage.getZooKeeper();
Strings task_names = zk->getChildren(entries_znode_path);
for (auto const & task_name : task_names)
{
PartMovesBetweenShardsOrchestrator::Entry e;
Coordination::Stat stat;
e.znode_path = entries_znode_path + "/" + task_name;
auto entry_str = zk->get(e.znode_path, &stat);
e.fromString(entry_str);
e.version = stat.version;
e.znode_name = task_name;
entries[task_name] = std::move(e);
}
}
bool PartMovesBetweenShardsOrchestrator::step()
{
if (!storage.is_leader)
return false;
auto zk = storage.getZooKeeper();
std::optional<Entry> entry_to_process;
/// Try find an entry to process and copy it.
{
std::lock_guard lock(state_mutex);
for (auto const & entry : entries | boost::adaptors::map_values)
{
if (entry.state.value == EntryState::DONE || entry.state.value == EntryState::CANCELLED)
continue;
entry_to_process.emplace(entry);
break;
}
}
if (!entry_to_process.has_value())
return false;
try
{
/// Since some state transitions are long running (waiting on replicas' acknowledgement), we create this lock to avoid
/// other replicas trying to do the same work. All state transitions should be idempotent so it is safe to lose the
/// lock and have another replica retry.
///
/// Note: This blocks all other entries from being executed. Technical debt.
auto entry_node_holder = zkutil::EphemeralNodeHolder::create(entry_to_process->znode_path + "/lock_holder", *zk, storage.replica_name);
}
catch (const Coordination::Exception & e)
{
if (e.code == Coordination::Error::ZNODEEXISTS)
{
LOG_DEBUG(log, "Task {} is being processed by another replica", entry_to_process->znode_name);
return false;
}
throw;
}
try
{
stepEntry(entry_to_process.value());
}
catch (...)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
Entry entry_copy = entry_to_process.value();
entry_copy.last_exception_msg = getCurrentExceptionMessage(false);
entry_copy.update_time = std::time(nullptr);
zk->set(entry_copy.znode_path, entry_copy.toString(), entry_copy.version);
return false;
}
return true;
}
void PartMovesBetweenShardsOrchestrator::stepEntry(const Entry & entry)
{
auto zk = storage.getZooKeeper();
switch (entry.state.value)
{
case EntryState::DONE:
break;
case EntryState::CANCELLED:
break;
case EntryState::TODO:
{
/// State transition.
Entry entry_copy = entry;
entry_copy.state = EntryState::SYNC_SOURCE;
entry_copy.update_time = std::time(nullptr);
zk->set(entry_copy.znode_path, entry_copy.toString(), entry_copy.version);
}
break;
case EntryState::SYNC_SOURCE:
{
{
/// Log entry.
Coordination::Requests ops;
ReplicatedMergeTreeLogEntryData log_entry;
log_entry.type = ReplicatedMergeTreeLogEntryData::SYNC_PINNED_PART_UUIDS;
log_entry.create_time = std::time(nullptr);
log_entry.source_replica = storage.replica_name;
ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/log", "", -1));
ops.emplace_back(zkutil::makeCreateRequest(
zookeeper_path + "/log/log-", log_entry.toString(), zkutil::CreateMode::PersistentSequential));
Coordination::Responses responses;
Coordination::Error rc = zk->tryMulti(ops, responses);
zkutil::KeeperMultiException::check(rc, ops, responses);
String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*responses.back()).path_created;
log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
storage.waitForAllReplicasToProcessLogEntry(log_entry, true);
}
{
/// State transition.
Entry entry_copy = entry;
entry_copy.state = EntryState::SYNC_DESTINATION;
entry_copy.update_time = std::time(nullptr);
zk->set(entry_copy.znode_path, entry_copy.toString(), entry_copy.version);
}
}
break;
case EntryState::SYNC_DESTINATION:
{
{
/// Log entry.
Coordination::Requests ops;
ReplicatedMergeTreeLogEntryData log_entry;
log_entry.type = ReplicatedMergeTreeLogEntryData::SYNC_PINNED_PART_UUIDS;
log_entry.create_time = std::time(nullptr);
log_entry.source_replica = storage.replica_name;
log_entry.source_shard = zookeeper_path;
ops.emplace_back(zkutil::makeSetRequest(entry.to_shard + "/log", "", -1));
ops.emplace_back(zkutil::makeCreateRequest(
entry.to_shard + "/log/log-", log_entry.toString(), zkutil::CreateMode::PersistentSequential));
Coordination::Responses responses;
Coordination::Error rc = zk->tryMulti(ops, responses);
zkutil::KeeperMultiException::check(rc, ops, responses);
String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*responses.back()).path_created;
log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
storage.waitForAllTableReplicasToProcessLogEntry(entry.to_shard, log_entry, true);
}
{
/// State transition.
Entry entry_copy = entry;
entry_copy.state = EntryState::DESTINATION_FETCH;
entry_copy.update_time = std::time(nullptr);
zk->set(entry_copy.znode_path, entry_copy.toString(), entry_copy.version);
}
}
break;
case EntryState::DESTINATION_FETCH:
{
/// There is a chance that attach on destination will fail and this task will be left in the queue forever.
/// Make sure table structure doesn't change when there are part movements in progress.
///
/// DESTINATION_FETCH is tricky from idempotency standpoint. We must ensure that no orchestrator
/// issues a CLONE_PART_FROM_SHARD entry after the source part is dropped.
///
/// `makeSetRequest` on the entry is a sloppy mechanism for ensuring that the thread that makes the state
/// transition is the last one to issue a CLONE_PART_FROM_SHARD event. We assume here that if last event
/// was processed then all the ones prior to that succeeded as well.
///
/// If we could somehow create just one entry in the log and record the log entry name in the same
/// transaction, so that we could wait for it later, the problem would be solved.
{
Coordination::Requests ops;
ops.emplace_back(zkutil::makeSetRequest(entry.znode_path, entry.toString(), entry.version));
/// Log entry.
ReplicatedMergeTreeLogEntryData log_entry;
log_entry.type = ReplicatedMergeTreeLogEntryData::CLONE_PART_FROM_SHARD;
log_entry.create_time = std::time(nullptr);
log_entry.new_part_name = entry.part_name;
log_entry.source_replica = storage.replica_name;
log_entry.source_shard = zookeeper_path;
log_entry.block_id = toString(entry.task_uuid);
ops.emplace_back(zkutil::makeSetRequest(entry.to_shard + "/log", "", -1));
ops.emplace_back(zkutil::makeCreateRequest(
entry.to_shard + "/log/log-", log_entry.toString(), zkutil::CreateMode::PersistentSequential));
/// Submit.
Coordination::Responses responses;
Coordination::Error rc = zk->tryMulti(ops, responses);
zkutil::KeeperMultiException::check(rc, ops, responses);
String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*responses.back()).path_created;
log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
storage.waitForAllTableReplicasToProcessLogEntry(entry.to_shard, log_entry, true);
}
{
/// State transition.
Entry entry_copy = entry;
entry_copy.state = EntryState::SOURCE_DROP_PRE_DELAY;
entry_copy.update_time = std::time(nullptr);
zk->set(entry_copy.znode_path, entry_copy.toString(), entry_copy.version + 1);
}
}
break;
case EntryState::SOURCE_DROP_PRE_DELAY:
{
std::this_thread::sleep_for(std::chrono::seconds(storage.getSettings()->part_moves_between_shards_delay_seconds));
/// State transition.
Entry entry_copy = entry;
entry_copy.state = EntryState::SOURCE_DROP;
entry_copy.update_time = std::time(nullptr);
zk->set(entry_copy.znode_path, entry_copy.toString(), entry_copy.version);
}
break;
case EntryState::SOURCE_DROP:
{
{
ReplicatedMergeTreeLogEntry log_entry;
if (storage.dropPart(zk, entry.part_name, log_entry, false, false))
storage.waitForAllReplicasToProcessLogEntry(log_entry, true);
}
{
/// State transition.
Entry entry_copy = entry;
entry_copy.state = EntryState::SOURCE_DROP_POST_DELAY;
entry_copy.update_time = std::time(nullptr);
zk->set(entry_copy.znode_path, entry_copy.toString(), entry_copy.version);
}
}
break;
case EntryState::SOURCE_DROP_POST_DELAY:
{
std::this_thread::sleep_for(std::chrono::seconds(storage.getSettings()->part_moves_between_shards_delay_seconds));
/// State transition.
Entry entry_copy = entry;
entry_copy.state = EntryState::REMOVE_UUID_PIN;
entry_copy.update_time = std::time(nullptr);
zk->set(entry_copy.znode_path, entry_copy.toString(), entry_copy.version);
}
break;
case EntryState::REMOVE_UUID_PIN:
{
{
PinnedPartUUIDs src_pins;
PinnedPartUUIDs dst_pins;
{
String s = zk->get(zookeeper_path + "/pinned_part_uuids", &src_pins.stat);
src_pins.fromString(s);
}
{
String s = zk->get(entry.to_shard + "/pinned_part_uuids", &dst_pins.stat);
dst_pins.fromString(s);
}
src_pins.part_uuids.erase(entry.part_uuid);
dst_pins.part_uuids.erase(entry.part_uuid);
Coordination::Requests ops;
ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/pinned_part_uuids", src_pins.toString(), src_pins.stat.version));
ops.emplace_back(zkutil::makeSetRequest(entry.to_shard + "/pinned_part_uuids", dst_pins.toString(), dst_pins.stat.version));
zk->multi(ops);
}
/// State transition.
Entry entry_copy = entry;
entry_copy.state = EntryState::DONE;
entry_copy.update_time = std::time(nullptr);
zk->set(entry_copy.znode_path, entry_copy.toString(), entry_copy.version);
}
break;
}
}
std::vector<PartMovesBetweenShardsOrchestrator::Entry> PartMovesBetweenShardsOrchestrator::getEntries() const
{
std::lock_guard lock(state_mutex);
auto res = std::vector<Entry>();
for (const auto& e : entries)
res.push_back(e.second);
return res;
}
String PartMovesBetweenShardsOrchestrator::Entry::toString() const
{
Poco::JSON::Object json;
json.set(JSON_KEY_CREATE_TIME, DB::toString(create_time));
json.set(JSON_KEY_UPDATE_TIME, DB::toString(update_time));
json.set(JSON_KEY_TASK_UUID, DB::toString(task_uuid));
json.set(JSON_KEY_PART_NAME, part_name);
json.set(JSON_KEY_PART_UUID, DB::toString(part_uuid));
json.set(JSON_KEY_TO_SHARD, to_shard);
json.set(JSON_KEY_STATE, state.toString());
json.set(JSON_KEY_LAST_EX_MSG, last_exception_msg);
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
json.stringify(oss);
return oss.str();
}
void PartMovesBetweenShardsOrchestrator::Entry::fromString(const String & buf)
{
Poco::JSON::Parser parser;
auto json = parser.parse(buf).extract<Poco::JSON::Object::Ptr>();
create_time = parseFromString<time_t>(json->getValue<std::string>(JSON_KEY_CREATE_TIME));
update_time = parseFromString<time_t>(json->getValue<std::string>(JSON_KEY_UPDATE_TIME));
task_uuid = parseFromString<UUID>(json->getValue<std::string>(JSON_KEY_TASK_UUID));
part_name = json->getValue<std::string>(JSON_KEY_PART_NAME);
part_uuid = parseFromString<UUID>(json->getValue<std::string>(JSON_KEY_PART_UUID));
to_shard = json->getValue<std::string>(JSON_KEY_TO_SHARD);
state.value = EntryState::fromString(json->getValue<std::string>(JSON_KEY_STATE));
last_exception_msg = json->getValue<std::string>(JSON_KEY_LAST_EX_MSG);
}
}

View File

@ -0,0 +1,158 @@
#pragma once
#include <vector>
#include <common/logger_useful.h>
#include <common/types.h>
#include <Core/UUID.h>
#include <Core/BackgroundSchedulePool.h>
#include <IO/WriteHelpers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
class StorageReplicatedMergeTree;
/// Cross shard part movement workflow orchestration.
class PartMovesBetweenShardsOrchestrator
{
public:
struct EntryState
{
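/// Happy-path sequence driven by PartMovesBetweenShardsOrchestrator::stepEntry():
/// TODO -> SYNC_SOURCE -> SYNC_DESTINATION -> DESTINATION_FETCH
///     -> SOURCE_DROP_PRE_DELAY -> SOURCE_DROP -> SOURCE_DROP_POST_DELAY
///     -> REMOVE_UUID_PIN -> DONE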
enum Value
{
TODO,
SYNC_SOURCE,
SYNC_DESTINATION,
DESTINATION_FETCH,
SOURCE_DROP_PRE_DELAY,
SOURCE_DROP,
SOURCE_DROP_POST_DELAY,
REMOVE_UUID_PIN,
DONE,
CANCELLED,
};
EntryState(): value(TODO) {}
EntryState(Value value_): value(value_) {}
Value value;
String toString() const
{
switch (value)
{
case TODO: return "TODO";
case SYNC_SOURCE: return "SYNC_SOURCE";
case SYNC_DESTINATION: return "SYNC_DESTINATION";
case DESTINATION_FETCH: return "DESTINATION_FETCH";
case SOURCE_DROP_PRE_DELAY: return "SOURCE_DROP_PRE_DELAY";
case SOURCE_DROP: return "SOURCE_DROP";
case SOURCE_DROP_POST_DELAY: return "SOURCE_DROP_POST_DELAY";
case REMOVE_UUID_PIN: return "REMOVE_UUID_PIN";
case DONE: return "DONE";
case CANCELLED: return "CANCELLED";
}
throw Exception("Unknown EntryState: " + DB::toString<int>(value), ErrorCodes::LOGICAL_ERROR);
}
static EntryState::Value fromString(String in)
{
if (in == "TODO") return TODO;
else if (in == "SYNC_SOURCE") return SYNC_SOURCE;
else if (in == "SYNC_DESTINATION") return SYNC_DESTINATION;
else if (in == "DESTINATION_FETCH") return DESTINATION_FETCH;
else if (in == "SOURCE_DROP_PRE_DELAY") return SOURCE_DROP_PRE_DELAY;
else if (in == "SOURCE_DROP") return SOURCE_DROP;
else if (in == "SOURCE_DROP_POST_DELAY") return SOURCE_DROP_POST_DELAY;
else if (in == "REMOVE_UUID_PIN") return REMOVE_UUID_PIN;
else if (in == "DONE") return DONE;
else if (in == "CANCELLED") return CANCELLED;
else throw Exception("Unknown state: " + in, ErrorCodes::LOGICAL_ERROR);
}
};
struct Entry
{
friend class PartMovesBetweenShardsOrchestrator;
time_t create_time = 0;
time_t update_time = 0;
/// Globally unique identifier used for attaching parts on destination.
/// Using `part_uuid` results in part names being reused when moving parts back and forth.
UUID task_uuid;
String part_name;
UUID part_uuid;
String to_shard;
EntryState state;
String last_exception_msg;
String znode_name;
private:
/// Transient value for CAS.
uint32_t version;
String znode_path;
public:
String toString() const;
void fromString(const String & buf);
};
private:
static constexpr auto JSON_KEY_CREATE_TIME = "create_time";
static constexpr auto JSON_KEY_UPDATE_TIME = "update_time";
static constexpr auto JSON_KEY_TASK_UUID = "task_uuid";
static constexpr auto JSON_KEY_PART_NAME = "part_name";
static constexpr auto JSON_KEY_PART_UUID = "part_uuid";
static constexpr auto JSON_KEY_TO_SHARD = "to_shard";
static constexpr auto JSON_KEY_STATE = "state";
static constexpr auto JSON_KEY_LAST_EX_MSG = "last_exception";
public:
PartMovesBetweenShardsOrchestrator(StorageReplicatedMergeTree & storage_);
void start() { task->activateAndSchedule(); }
void wakeup() { task->schedule(); }
void shutdown();
void sync();
/// We could have one thread per Entry and worry about concurrency issues.
/// Or we could have a single thread trying to run one step at a time.
bool step();
std::vector<Entry> getEntries() const;
private:
void run();
void stepEntry(const Entry & entry);
private:
StorageReplicatedMergeTree & storage;
String zookeeper_path;
String logger_name;
Poco::Logger * log = nullptr;
std::atomic<bool> need_stop{false};
BackgroundSchedulePool::TaskHolder task;
mutable std::mutex state_mutex;
std::map<String, Entry> entries;
public:
String entries_znode_path;
};
}

View File

@ -0,0 +1,36 @@
#include "PinnedPartUUIDs.h"
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Poco/JSON/JSON.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
namespace DB
{
String PinnedPartUUIDs::toString() const
{
std::vector<UUID> vec(part_uuids.begin(), part_uuids.end());
Poco::JSON::Object json;
json.set(JSON_KEY_UUIDS, DB::toString(vec));
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
json.stringify(oss);
return oss.str();
}
void PinnedPartUUIDs::fromString(const String & buf)
{
Poco::JSON::Parser parser;
auto json = parser.parse(buf).extract<Poco::JSON::Object::Ptr>();
std::vector<UUID> vec = parseFromString<std::vector<UUID>>(json->getValue<std::string>(PinnedPartUUIDs::JSON_KEY_UUIDS));
part_uuids.clear();
std::copy(vec.begin(), vec.end(), std::inserter(part_uuids, part_uuids.begin()));
}
}

View File

@ -0,0 +1,27 @@
#pragma once
#include <Common/ZooKeeper/IKeeper.h>
#include <Core/UUID.h>
#include <set>
namespace DB
{
struct PinnedPartUUIDs
{
std::set<UUID> part_uuids;
Coordination::Stat stat{};
bool contains(const UUID & part_uuid) const
{
return part_uuids.contains(part_uuid);
}
String toString() const;
void fromString(const String & buf);
private:
static constexpr auto JSON_KEY_UUIDS = "part_uuids";
};
}

View File

@ -183,7 +183,7 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
}
void ReplicatedMergeTreeBlockOutputStream::writeExistingPart(MergeTreeData::MutableDataPartPtr & part)
void ReplicatedMergeTreeBlockOutputStream::writeExistingPart(MergeTreeData::MutableDataPartPtr & part, String block_id)
{
last_block_is_duplicate = false;
@ -199,7 +199,7 @@ void ReplicatedMergeTreeBlockOutputStream::writeExistingPart(MergeTreeData::Muta
try
{
commitPart(zookeeper, part, "");
commitPart(zookeeper, part, block_id);
PartLog::addNewPart(storage.getContext(), part, watch.elapsed());
}
catch (...)

View File

@ -40,7 +40,7 @@ public:
void write(const Block & block) override;
/// For ATTACHing existing data on filesystem.
void writeExistingPart(MergeTreeData::MutableDataPartPtr & part);
void writeExistingPart(MergeTreeData::MutableDataPartPtr & part, String block_id = "");
/// For proper deduplication in MaterializedViews
bool lastBlockIsDuplicate() const

View File

@ -53,6 +53,12 @@ void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const
out << "get\n" << new_part_name;
break;
case CLONE_PART_FROM_SHARD:
out << "clone_part_from_shard\n"
<< new_part_name << "\n"
<< "source_shard: " << source_shard;
break;
case ATTACH_PART:
out << "attach\n" << new_part_name << "\n"
<< "part_checksum: " << part_checksum;
@ -141,6 +147,10 @@ void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const
out << metadata_str;
break;
case SYNC_PINNED_PART_UUIDS:
out << "sync_pinned_part_uuids\n";
break;
default:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown log entry type: {}", static_cast<int>(type));
}
@ -305,6 +315,16 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in)
metadata_str.resize(metadata_size);
in.readStrict(&metadata_str[0], metadata_size);
}
else if (type_str == "sync_pinned_part_uuids")
{
type = SYNC_PINNED_PART_UUIDS;
}
else if (type_str == "clone_part_from_shard")
{
type = CLONE_PART_FROM_SHARD;
in >> new_part_name;
in >> "\nsource_shard: " >> source_shard;
}
if (!trailing_newline_found)
in >> "\n";

View File

@ -41,6 +41,8 @@ struct ReplicatedMergeTreeLogEntryData
REPLACE_RANGE, /// Drop certain range of partitions and replace them by new ones
MUTATE_PART, /// Apply one or several mutations to the part.
ALTER_METADATA, /// Apply alter modification according to global /metadata and /columns paths
SYNC_PINNED_PART_UUIDS, /// Synchronization point for ensuring that all replicas have up to date in-memory state.
CLONE_PART_FROM_SHARD, /// Clone part from another shard.
};
static String typeToString(Type type)
@ -56,6 +58,8 @@ struct ReplicatedMergeTreeLogEntryData
case ReplicatedMergeTreeLogEntryData::REPLACE_RANGE: return "REPLACE_RANGE";
case ReplicatedMergeTreeLogEntryData::MUTATE_PART: return "MUTATE_PART";
case ReplicatedMergeTreeLogEntryData::ALTER_METADATA: return "ALTER_METADATA";
case ReplicatedMergeTreeLogEntryData::SYNC_PINNED_PART_UUIDS: return "SYNC_PINNED_PART_UUIDS";
case ReplicatedMergeTreeLogEntryData::CLONE_PART_FROM_SHARD: return "CLONE_PART_FROM_SHARD";
default:
throw Exception("Unknown log entry type: " + DB::toString<int>(type), ErrorCodes::LOGICAL_ERROR);
}
@ -74,6 +78,7 @@ struct ReplicatedMergeTreeLogEntryData
Type type = EMPTY;
String source_replica; /// Empty string means that this entry was added to the queue immediately, and not copied from the log.
String source_shard;
String part_checksum; /// Part checksum for ATTACH_PART, empty otherwise.
@ -150,6 +155,14 @@ struct ReplicatedMergeTreeLogEntryData
return res;
}
/// Doesn't produce any part.
if (type == SYNC_PINNED_PART_UUIDS)
return {};
/// Doesn't produce any part by itself.
if (type == CLONE_PART_FROM_SHARD)
return {};
return {new_part_name};
}

View File

@ -1849,6 +1849,17 @@ ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate(
merges_version = queue_.pullLogsToQueue(zookeeper);
{
/// Here we avoid returning a version to be used in a lightweight transaction.
///
/// When the pinned parts set is changed, a log entry is added to the queue in the same transaction.
/// The log entry serves as a synchronization point, and it also increments `merges_version`.
///
/// If pinned parts are fetched after logs are pulled then we can safely say that it contains all locks up to `merges_version`.
String s = zookeeper->get(queue.zookeeper_path + "/pinned_part_uuids");
pinned_part_uuids.fromString(s);
}
Coordination::GetResponse quorum_status_response = quorum_status_future.get();
if (quorum_status_response.error == Coordination::Error::ZOK)
{
@ -1919,6 +1930,13 @@ bool ReplicatedMergeTreeMergePredicate::canMergeTwoParts(
for (const MergeTreeData::DataPartPtr & part : {left, right})
{
if (pinned_part_uuids.part_uuids.contains(part->uuid))
{
if (out_reason)
*out_reason = "Part " + part->name + " has uuid " + toString(part->uuid) + " which is currently pinned";
return false;
}
if (part->name == inprogress_quorum_part)
{
if (out_reason)
@ -2014,6 +2032,13 @@ bool ReplicatedMergeTreeMergePredicate::canMergeSinglePart(
const MergeTreeData::DataPartPtr & part,
String * out_reason) const
{
if (pinned_part_uuids.part_uuids.contains(part->uuid))
{
if (out_reason)
*out_reason = "Part " + part->name + " has uuid " + toString(part->uuid) + " which is currently pinned";
return false;
}
if (part->name == inprogress_quorum_part)
{
if (out_reason)

View File

@ -8,6 +8,7 @@
#include <Storages/MergeTree/ActiveDataPartSet.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeMutationStatus.h>
#include <Storages/MergeTree/PinnedPartUUIDs.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumAddedParts.h>
#include <Storages/MergeTree/ReplicatedMergeTreeAltersSequence.h>
@ -501,6 +502,9 @@ private:
/// (loaded at some later time than prev_virtual_parts).
std::unordered_map<String, std::set<Int64>> committing_blocks;
/// List of UUIDs for parts that have their identity "pinned".
PinnedPartUUIDs pinned_part_uuids;
/// Quorum state taken at some later time than prev_virtual_parts.
String inprogress_quorum_part;

View File

@ -13,6 +13,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
std::optional<PartitionCommand> PartitionCommand::parse(const ASTAlterCommand * command_ast)
{
if (command_ast->type == ASTAlterCommand::DROP_PARTITION)
@ -59,8 +64,11 @@ std::optional<PartitionCommand> PartitionCommand::parse(const ASTAlterCommand *
res.to_database = command_ast->to_database;
res.to_table = command_ast->to_table;
break;
default:
case DataDestinationType::SHARD:
res.move_destination_type = PartitionCommand::MoveDestinationType::SHARD;
break;
case DataDestinationType::DELETE:
throw Exception("ALTER with this destination type is not handled. This is a bug.", ErrorCodes::LOGICAL_ERROR);
}
if (res.move_destination_type != PartitionCommand::MoveDestinationType::TABLE)
res.move_destination_name = command_ast->move_destination_name;

View File

@ -62,6 +62,7 @@ struct PartitionCommand
DISK,
VOLUME,
TABLE,
SHARD,
};
std::optional<MoveDestinationType> move_destination_type;

View File

@ -17,6 +17,7 @@
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/PinnedPartUUIDs.h>
#include <Storages/MergeTree/PartitionPruner.h>
#include <Storages/MergeTree/ReplicatedMergeTreeTableMetadata.h>
#include <Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
@ -131,6 +132,7 @@ namespace ErrorCodes
extern const int NO_SUCH_DATA_PART;
extern const int INTERSERVER_SCHEME_DOESNT_MATCH;
extern const int DUPLICATE_DATA_PART;
extern const int BAD_ARGUMENTS;
}
namespace ActionLocks
@ -263,6 +265,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
, cleanup_thread(*this)
, part_check_thread(*this)
, restarting_thread(*this)
, part_moves_between_shards_orchestrator(*this)
, allow_renaming(allow_renaming_)
, replicated_fetches_pool_size(getContext()->getSettingsRef().background_fetches_pool_size)
{
@ -440,6 +443,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
}
createNewZooKeeperNodes();
syncPinnedPartUUIDs();
}
@ -560,6 +564,10 @@ void StorageReplicatedMergeTree::createNewZooKeeperNodes()
zookeeper->createIfNotExists(zookeeper_path + "/zero_copy_s3", String());
zookeeper->createIfNotExists(zookeeper_path + "/zero_copy_s3/shared", String());
}
/// Part movement.
zookeeper->createIfNotExists(zookeeper_path + "/part_moves_shard", String());
zookeeper->createIfNotExists(zookeeper_path + "/pinned_part_uuids", getPinnedPartUUIDs()->toString());
}
@ -1199,6 +1207,27 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
}
void StorageReplicatedMergeTree::syncPinnedPartUUIDs()
{
auto zookeeper = getZooKeeper();
Coordination::Stat stat;
String s = zookeeper->get(zookeeper_path + "/pinned_part_uuids", &stat);
std::lock_guard lock(pinned_part_uuids_mutex);
/// Unsure whether or not this can be called concurrently.
if (pinned_part_uuids->stat.version < stat.version)
{
auto new_pinned_part_uuids = std::make_shared<PinnedPartUUIDs>();
new_pinned_part_uuids->fromString(s);
new_pinned_part_uuids->stat = stat;
pinned_part_uuids = new_pinned_part_uuids;
}
}
void StorageReplicatedMergeTree::checkPartChecksumsAndAddCommitOps(const zkutil::ZooKeeperPtr & zookeeper,
const DataPartPtr & part, Coordination::Requests & ops, String part_name, NameSet * absent_replicas_paths)
{
@ -1487,6 +1516,12 @@ bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry)
break;
case LogEntry::ALTER_METADATA:
return executeMetadataAlter(entry);
case LogEntry::SYNC_PINNED_PART_UUIDS:
syncPinnedPartUUIDs();
return true;
case LogEntry::CLONE_PART_FROM_SHARD:
executeClonePartFromShard(entry);
return true;
default:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected log entry type: {}", static_cast<int>(entry.type));
}
@ -2479,6 +2514,47 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
}
void StorageReplicatedMergeTree::executeClonePartFromShard(const LogEntry & entry)
{
auto zookeeper = getZooKeeper();
Strings replicas = zookeeper->getChildren(entry.source_shard + "/replicas");
std::shuffle(replicas.begin(), replicas.end(), thread_local_rng);
String replica = replicas.front();
LOG_INFO(log, "Will clone part from shard " + entry.source_shard + " and replica " + replica);
MutableDataPartPtr part;
{
auto metadata_snapshot = getInMemoryMetadataPtr();
String source_replica_path = entry.source_shard + "/replicas/" + replica;
ReplicatedMergeTreeAddress address(getZooKeeper()->get(source_replica_path + "/host"));
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(global_context);
auto user_password = global_context.getInterserverCredentials();
String interserver_scheme = global_context.getInterserverScheme();
auto get_part = [&, address, timeouts, user_password, interserver_scheme]()
{
if (interserver_scheme != address.scheme)
throw Exception("Interserver schemes are different: '" + interserver_scheme
+ "' != '" + address.scheme + "', can't fetch part from " + address.host,
ErrorCodes::LOGICAL_ERROR);
return fetcher.fetchPart(
metadata_snapshot, entry.new_part_name, source_replica_path,
address.host, address.replication_port,
timeouts, user_password.first, user_password.second, interserver_scheme, true);
};
part = get_part();
ReplicatedMergeTreeBlockOutputStream output(*this, metadata_snapshot, 0, 0, 0, false, false, false);
output.writeExistingPart(part, "clone_part_from_shard_" + entry.block_id);
}
}
void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coordination::Stat source_is_lost_stat, zkutil::ZooKeeperPtr & zookeeper)
{
String source_path = zookeeper_path + "/replicas/" + source_replica;
@ -4005,6 +4081,8 @@ void StorageReplicatedMergeTree::startup()
/// between the assignment of queue_task_handle and queueTask that use the queue_task_handle.
background_executor.start();
startBackgroundMovesIfNeeded();
part_moves_between_shards_orchestrator.start();
}
catch (...)
{
@ -4035,6 +4113,7 @@ void StorageReplicatedMergeTree::shutdown()
restarting_thread.shutdown();
background_executor.finish();
part_moves_between_shards_orchestrator.shutdown();
{
auto lock = queue.lockQueue();
@ -5245,6 +5324,11 @@ void StorageReplicatedMergeTree::getQueue(LogEntriesData & res, String & replica
queue.getEntries(res);
}
std::vector<PartMovesBetweenShardsOrchestrator::Entry> StorageReplicatedMergeTree::getPartMovesBetweenShardsEntries()
{
return part_moves_between_shards_orchestrator.getEntries();
}
time_t StorageReplicatedMergeTree::getAbsoluteDelay() const
{
time_t min_unprocessed_insert_time = 0;
@ -6432,6 +6516,100 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
cleanLastPartNode(partition_id);
}
void StorageReplicatedMergeTree::movePartitionToShard(
const ASTPtr & partition, bool move_part, const String & to, const Context & /*query_context*/)
{
/// This is a lightweight operation that only optimistically checks if it could succeed and queues tasks.
if (!move_part)
throw Exception("MOVE PARTITION TO SHARD is not supported, use MOVE PART instead", ErrorCodes::NOT_IMPLEMENTED);
if (normalizeZooKeeperPath(zookeeper_path) == normalizeZooKeeperPath(to))
throw Exception("Source and destination are the same", ErrorCodes::BAD_ARGUMENTS);
auto zookeeper = getZooKeeper();
String part_name = partition->as<ASTLiteral &>().value.safeGet<String>();
auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
auto part = getPartIfExists(part_info, {MergeTreeDataPartState::Committed});
if (!part)
throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "Part {} not found locally", part_name);
if (part->uuid == UUIDHelpers::Nil)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Part {} does not have an uuid assigned and it can't be moved between shards", part_name);
ReplicatedMergeTreeMergePredicate merge_pred = queue.getMergePredicate(zookeeper);
/// The following block is pretty much copy & paste from StorageReplicatedMergeTree::dropPart to avoid conflicts while this is WIP.
/// Extract it to a common method and re-use it before merging.
{
if (partIsLastQuorumPart(part->info))
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Part {} is last inserted part with quorum in partition. Would not be able to drop", part_name);
}
/// canMergeSinglePart is overlapping with dropPart, let's try to use the same code.
String out_reason;
if (!merge_pred.canMergeSinglePart(part, &out_reason))
throw Exception(ErrorCodes::PART_IS_TEMPORARILY_LOCKED, "Part is busy, reason: " + out_reason);
}
{
/// Optimistic check that the destination table structure is compatible.
checkTableStructure(to, getInMemoryMetadataPtr());
}
PinnedPartUUIDs src_pins;
PinnedPartUUIDs dst_pins;
{
String s = zookeeper->get(zookeeper_path + "/pinned_part_uuids", &src_pins.stat);
src_pins.fromString(s);
}
{
String s = zookeeper->get(to + "/pinned_part_uuids", &dst_pins.stat);
dst_pins.fromString(s);
}
if (src_pins.part_uuids.contains(part->uuid) || dst_pins.part_uuids.contains(part->uuid))
throw Exception(ErrorCodes::PART_IS_TEMPORARILY_LOCKED, "Part {} has it's uuid ({}) already pinned.", part_name, toString(part->uuid));
src_pins.part_uuids.insert(part->uuid);
dst_pins.part_uuids.insert(part->uuid);
PartMovesBetweenShardsOrchestrator::Entry part_move_entry;
part_move_entry.create_time = std::time(nullptr);
part_move_entry.update_time = part_move_entry.create_time;
part_move_entry.task_uuid = UUIDHelpers::generateV4();
part_move_entry.part_name = part->name;
part_move_entry.part_uuid = part->uuid;
part_move_entry.to_shard = to;
Coordination::Requests ops;
ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/log", merge_pred.getVersion())); /// Make sure no new events were added to the log.
ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/pinned_part_uuids", src_pins.toString(), src_pins.stat.version));
ops.emplace_back(zkutil::makeSetRequest(to + "/pinned_part_uuids", dst_pins.toString(), dst_pins.stat.version));
ops.emplace_back(zkutil::makeCreateRequest(
part_moves_between_shards_orchestrator.entries_znode_path + "/task-",
part_move_entry.toString(),
zkutil::CreateMode::PersistentSequential));
Coordination::Responses responses;
Coordination::Error rc = zookeeper->tryMulti(ops, responses);
zkutil::KeeperMultiException::check(rc, ops, responses);
String task_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*responses.back()).path_created;
LOG_DEBUG(log, "Created task for part movement between shards at " + task_znode_path);
/// Force refresh local state for making system table up to date after this operation succeeds.
part_moves_between_shards_orchestrator.sync();
// TODO: Add support for `replication_alter_partitions_sync`.
}
void StorageReplicatedMergeTree::getCommitPartOps(
Coordination::Requests & ops,
MutableDataPartPtr & part,

View File

@ -20,6 +20,7 @@
#include <Storages/MergeTree/DataPartsExchange.h>
#include <Storages/MergeTree/ReplicatedMergeTreeAddress.h>
#include <Storages/MergeTree/LeaderElection.h>
#include <Storages/MergeTree/PartMovesBetweenShardsOrchestrator.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/PartLog.h>
@ -183,6 +184,8 @@ public:
using LogEntriesData = std::vector<ReplicatedMergeTreeLogEntryData>;
void getQueue(LogEntriesData & res, String & replica_name);
std::vector<PartMovesBetweenShardsOrchestrator::Entry> getPartMovesBetweenShardsEntries();
/// Get replica delay relative to current time.
time_t getAbsoluteDelay() const;
@ -252,6 +255,7 @@ private:
friend struct ReplicatedMergeTreeLogEntry;
friend class ScopedPartitionMergeLock;
friend class ReplicatedMergeTreeQueue;
friend class PartMovesBetweenShardsOrchestrator;
friend class MergeTreeData;
using MergeStrategyPicker = ReplicatedMergeTreeMergeStrategyPicker;
@ -305,7 +309,6 @@ private:
DataPartsExchange::Fetcher fetcher;
/// When activated, replica is initialized and startup() method could exit
Poco::Event startup_event;
@ -350,6 +353,8 @@ private:
/// A thread that processes reconnection to ZooKeeper when the session expires.
ReplicatedMergeTreeRestartingThread restarting_thread;
PartMovesBetweenShardsOrchestrator part_moves_between_shards_orchestrator;
/// True if replica was created for existing table with fixed granularity
bool other_replicas_fixed_granularity = false;
@ -387,6 +392,10 @@ private:
*/
void checkParts(bool skip_sanity_checks);
/// Synchronize the list of part uuids which are currently pinned. These should be sent to root query executor
/// to be used for deduplication.
void syncPinnedPartUUIDs();
/** Check that the part's checksum is the same as the checksum of the same part on some other replica.
* If no one has such a part, nothing checks.
* Not very reliable: if two replicas add a part almost at the same time, no checks will occur.
@ -457,6 +466,7 @@ private:
bool executeFetch(LogEntry & entry);
bool executeReplaceRange(const LogEntry & entry);
void executeClonePartFromShard(const LogEntry & entry);
/** Updates the queue.
*/
@ -633,6 +643,7 @@ private:
PartitionCommandsResultInfo attachPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, bool part, ContextPtr query_context) override;
void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, ContextPtr query_context) override;
void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, ContextPtr query_context) override;
void movePartitionToShard(const ASTPtr & partition, bool move_part, const String & to, const Context & query_context) override;
void fetchPartition(
const ASTPtr & partition,
const StorageMetadataPtr & metadata_snapshot,

View File

@ -0,0 +1,135 @@
#include <Access/ContextAccess.h>
#include <Columns/ColumnString.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypesNumber.h>
#include <Databases/IDatabase.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/System/StorageSystemPartMovesBetweenShards.h>
#include <Storages/VirtualColumnUtils.h>
#include <Common/typeid_cast.h>
namespace DB
{
NamesAndTypesList StorageSystemPartMovesBetweenShards::getNamesAndTypes()
{
return {
/// Table properties.
{ "database", std::make_shared<DataTypeString>() },
{ "table", std::make_shared<DataTypeString>() },
/// Constant element properties.
{ "task_name", std::make_shared<DataTypeString>() },
{ "task_uuid", std::make_shared<DataTypeUUID>() },
{ "create_time", std::make_shared<DataTypeDateTime>() },
{ "part_name", std::make_shared<DataTypeString>() },
{ "part_uuid", std::make_shared<DataTypeUUID>() },
{ "to_shard", std::make_shared<DataTypeString>() },
/// Processing status of item.
{ "update_time", std::make_shared<DataTypeDateTime>() },
{ "state", std::make_shared<DataTypeString>() },
{ "num_tries", std::make_shared<DataTypeUInt32>() },
{ "last_exception", std::make_shared<DataTypeString>() },
};
}
void StorageSystemPartMovesBetweenShards::fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const
{
const auto access = context.getAccess();
const bool check_access_for_databases = !access->isGranted(AccessType::SHOW_TABLES);
std::map<String, std::map<String, StoragePtr>> replicated_tables;
for (const auto & db : DatabaseCatalog::instance().getDatabases())
{
/// Check if database can contain replicated tables
if (!db.second->canContainMergeTreeTables())
continue;
const bool check_access_for_tables = check_access_for_databases && !access->isGranted(AccessType::SHOW_TABLES, db.first);
for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next())
{
const auto & table = iterator->table();
if (!table)
continue;
if (!dynamic_cast<const StorageReplicatedMergeTree *>(table.get()))
continue;
if (check_access_for_tables && !access->isGranted(AccessType::SHOW_TABLES, db.first, iterator->name()))
continue;
replicated_tables[db.first][iterator->name()] = table;
}
}
MutableColumnPtr col_database_mut = ColumnString::create();
MutableColumnPtr col_table_mut = ColumnString::create();
for (auto & db : replicated_tables)
{
for (auto & table : db.second)
{
col_database_mut->insert(db.first);
col_table_mut->insert(table.first);
}
}
ColumnPtr col_database_to_filter = std::move(col_database_mut);
ColumnPtr col_table_to_filter = std::move(col_table_mut);
/// Determine what tables are needed by the conditions in the query.
{
Block filtered_block
{
{ col_database_to_filter, std::make_shared<DataTypeString>(), "database" },
{ col_table_to_filter, std::make_shared<DataTypeString>(), "table" },
};
VirtualColumnUtils::filterBlockWithQuery(query_info.query, filtered_block, context);
if (!filtered_block.rows())
return;
col_database_to_filter = filtered_block.getByName("database").column;
col_table_to_filter = filtered_block.getByName("table").column;
}
for (size_t i = 0, tables_size = col_database_to_filter->size(); i < tables_size; ++i)
{
String database = (*col_database_to_filter)[i].safeGet<const String &>();
String table = (*col_table_to_filter)[i].safeGet<const String &>();
auto moves = dynamic_cast<StorageReplicatedMergeTree &>(*replicated_tables[database][table]).getPartMovesBetweenShardsEntries();
for (auto & entry : moves)
{
size_t col_num = 0;
/// Table properties.
res_columns[col_num++]->insert(database);
res_columns[col_num++]->insert(table);
/// Constant element properties.
res_columns[col_num++]->insert(entry.znode_name);
res_columns[col_num++]->insert(entry.task_uuid);
res_columns[col_num++]->insert(entry.create_time);
res_columns[col_num++]->insert(entry.part_name);
res_columns[col_num++]->insert(entry.part_uuid);
res_columns[col_num++]->insert(entry.to_shard);
/// Processing status of item.
res_columns[col_num++]->insert(entry.update_time);
res_columns[col_num++]->insert(entry.state.toString());
res_columns[col_num++]->insert(0);
res_columns[col_num++]->insert(entry.last_exception_msg);
}
}
}
}
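A monitoring sketch against the table registered below in attachSystemTablesServer (the database and table filter values are illustrative; filtering on them is pushed down by filterBlockWithQuery above):

SELECT task_name, part_name, part_uuid, to_shard, state, update_time, last_exception
FROM system.part_moves_between_shards
WHERE database = 'default' AND table = 't'
ORDER BY create_time DESC;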

View File

@ -0,0 +1,27 @@
#pragma once
#include <ext/shared_ptr_helper.h>
#include <Storages/System/IStorageSystemOneBlock.h>
namespace DB
{
class Context;
class StorageSystemPartMovesBetweenShards final : public ext::shared_ptr_helper<StorageSystemPartMovesBetweenShards>, public IStorageSystemOneBlock<StorageSystemPartMovesBetweenShards>
{
friend struct ext::shared_ptr_helper<StorageSystemPartMovesBetweenShards>;
public:
std::string getName() const override { return "SystemShardMoves"; }
static NamesAndTypesList getNamesAndTypes();
protected:
using IStorageSystemOneBlock::IStorageSystemOneBlock;
void fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const override;
};
}

View File

@ -25,6 +25,7 @@
#include <Storages/System/StorageSystemMutations.h>
#include <Storages/System/StorageSystemNumbers.h>
#include <Storages/System/StorageSystemOne.h>
#include <Storages/System/StorageSystemPartMovesBetweenShards.h>
#include <Storages/System/StorageSystemParts.h>
#include <Storages/System/StorageSystemPartsColumns.h>
#include <Storages/System/StorageSystemProcesses.h>
@ -144,6 +145,7 @@ void attachSystemTablesServer(IDatabase & system_database, bool has_zookeeper)
attach<StorageSystemGraphite>(system_database, "graphite_retentions");
attach<StorageSystemMacros>(system_database, "macros");
attach<StorageSystemReplicatedFetches>(system_database, "replicated_fetches");
attach<StorageSystemPartMovesBetweenShards>(system_database, "part_moves_between_shards");
if (has_zookeeper)
attach<StorageSystemZooKeeper>(system_database, "zookeeper");

View File

@ -0,0 +1,6 @@
<yandex>
<merge_tree>
<assign_part_uuids>1</assign_part_uuids>
<part_moves_between_shards_delay_seconds>3</part_moves_between_shards_delay_seconds>
</merge_tree>
</yandex>

View File

@ -0,0 +1,26 @@
<yandex>
<remote_servers>
<test_cluster>
<shard>
<replica>
<host>s0r0</host>
<port>9000</port>
</replica>
<replica>
<host>s0r1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>s1r0</host>
<port>9000</port>
</replica>
<replica>
<host>s1r1</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</yandex>

View File

@ -0,0 +1,162 @@
import random
import time
import pytest
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
cluster = ClickHouseCluster(__file__)
s0r0 = cluster.add_instance(
's0r0',
main_configs=['configs/remote_servers.xml', 'configs/merge_tree.xml'],
with_zookeeper=True)
s0r1 = cluster.add_instance(
's0r1',
main_configs=['configs/remote_servers.xml', 'configs/merge_tree.xml'],
with_zookeeper=True)
s1r0 = cluster.add_instance(
's1r0',
main_configs=['configs/remote_servers.xml', 'configs/merge_tree.xml'],
with_zookeeper=True)
s1r1 = cluster.add_instance(
's1r1',
main_configs=['configs/remote_servers.xml', 'configs/merge_tree.xml'],
with_zookeeper=True)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_move(started_cluster):
for shard_ix, rs in enumerate([[s0r0, s0r1], [s1r0, s1r1]]):
for replica_ix, r in enumerate(rs):
r.query("""
CREATE TABLE t(v UInt64)
ENGINE ReplicatedMergeTree('/clickhouse/shard_{}/tables/t', '{}')
ORDER BY tuple()
""".format(shard_ix, replica_ix))
s0r0.query("SYSTEM STOP MERGES t")
s0r0.query("INSERT INTO t VALUES (1)")
s0r0.query("INSERT INTO t VALUES (2)")
assert "2" == s0r0.query("SELECT count() FROM t").strip()
assert "0" == s1r0.query("SELECT count() FROM t").strip()
s0r0.query("ALTER TABLE t MOVE PART 'all_0_0_0' TO SHARD '/clickhouse/shard_1/tables/t'")
print(s0r0.query("SELECT * FROM system.part_moves_between_shards"))
s0r0.query("SYSTEM START MERGES t")
s0r0.query("OPTIMIZE TABLE t FINAL")
while True:
time.sleep(3)
print(s0r0.query("SELECT * FROM system.part_moves_between_shards"))
# Eventually.
if "DONE" == s0r0.query("SELECT state FROM system.part_moves_between_shards").strip():
break
for n in [s0r0, s0r1]:
assert "1" == n.query("SELECT count() FROM t").strip()
for n in [s1r0, s1r1]:
assert "1" == n.query("SELECT count() FROM t").strip()
# Move part back
s1r0.query("ALTER TABLE t MOVE PART 'all_0_0_0' TO SHARD '/clickhouse/shard_0/tables/t'")
while True:
time.sleep(3)
print(s1r0.query("SELECT * FROM system.part_moves_between_shards"))
# Eventually.
if "DONE" == s1r0.query("SELECT state FROM system.part_moves_between_shards").strip():
break
for n in [s0r0, s0r1]:
assert "2" == n.query("SELECT count() FROM t").strip()
for n in [s1r0, s1r1]:
assert "0" == n.query("SELECT count() FROM t").strip()
# Cleanup.
for n in started_cluster.instances.values():
n.query("DROP TABLE t SYNC")
def test_deduplication_while_move(started_cluster):
for shard_ix, rs in enumerate([[s0r0, s0r1], [s1r0, s1r1]]):
for replica_ix, r in enumerate(rs):
r.query("""
CREATE TABLE t(v UInt64)
ENGINE ReplicatedMergeTree('/clickhouse/shard_{}/tables/t', '{}')
ORDER BY tuple()
""".format(shard_ix, replica_ix))
r.query("""
CREATE TABLE t_d AS t
ENGINE Distributed('test_cluster', '', t)
""")
s0r0.query("SYSTEM STOP MERGES t")
s0r0.query("INSERT INTO t VALUES (1)")
s0r0.query("INSERT INTO t VALUES (2)")
s0r1.query("SYSTEM SYNC REPLICA t")
assert "2" == s0r0.query("SELECT count() FROM t").strip()
assert "0" == s1r0.query("SELECT count() FROM t").strip()
s0r0.query("ALTER TABLE t MOVE PART 'all_0_0_0' TO SHARD '/clickhouse/shard_1/tables/t'")
s0r0.query("SYSTEM START MERGES t")
expected = """
1
2
"""
# Verify that we get a consistent result at all times while the part is moving from one shard to another.
while "DONE" != s0r0.query("SELECT state FROM system.part_moves_between_shards ORDER BY create_time DESC LIMIT 1").strip():
n = random.choice(list(started_cluster.instances.values()))
assert TSV(n.query("SELECT * FROM t_d ORDER BY v", settings={
"allow_experimental_query_deduplication": 1
})) == TSV(expected)
def test_move_not_permitted(started_cluster):
for ix, n in enumerate([s0r0, s1r0]):
n.query("""
CREATE TABLE not_permitted(v_{} UInt64)
ENGINE ReplicatedMergeTree('/clickhouse/shard_{}/tables/not_permitted', 'r')
ORDER BY tuple()
""".format(ix, ix))
s0r0.query("INSERT INTO not_permitted VALUES (1)")
with pytest.raises(QueryRuntimeException) as exc:
s0r0.query("ALTER TABLE not_permitted MOVE PART 'all_0_0_0' TO SHARD '/clickhouse/shard_1/tables/not_permitted'")
assert "DB::Exception: Table columns structure in ZooKeeper is different from local table structure." in str(exc.value)
with pytest.raises(QueryRuntimeException) as exc:
s0r0.query("ALTER TABLE not_permitted MOVE PART 'all_0_0_0' TO SHARD '/clickhouse/shard_0/tables/not_permitted'")
assert "DB::Exception: Source and destination are the same" in str(exc.value)

View File

@ -0,0 +1,7 @@
<yandex>
<profiles>
<default>
<experimental_query_deduplication_send_all_part_uuids>1</experimental_query_deduplication_send_all_part_uuids>
</default>
</profiles>
</yandex>

View File

@ -11,15 +11,18 @@ cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
'node1',
main_configs=['configs/remote_servers.xml', 'configs/deduplication_settings.xml'])
main_configs=['configs/remote_servers.xml', 'configs/deduplication_settings.xml'],
user_configs=['configs/profiles.xml'])
node2 = cluster.add_instance(
'node2',
main_configs=['configs/remote_servers.xml', 'configs/deduplication_settings.xml'])
main_configs=['configs/remote_servers.xml', 'configs/deduplication_settings.xml'],
user_configs=['configs/profiles.xml'])
node3 = cluster.add_instance(
'node3',
main_configs=['configs/remote_servers.xml', 'configs/deduplication_settings.xml'])
main_configs=['configs/remote_servers.xml', 'configs/deduplication_settings.xml'],
user_configs=['configs/profiles.xml'])
@pytest.fixture(scope="module")